scraper

package
v1.10.2
Published: Mar 8, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CentralWriter

type CentralWriter struct {
	OnResultsSaved func(count int)
	// contains filtered or unexported fields
}

CentralWriter tracks exactly one in-flight River job, receives ScrapeMate results, and flushes them to the database when the exit monitor signals done.

func NewCentralWriter

func NewCentralWriter(db *pgxpool.Pool, saveFn SaveFunc) *CentralWriter

NewCentralWriter creates a new CentralWriter. Pass nil for saveFn to use the default PostgreSQL saver built from db.

func (*CentralWriter) AddResult

func (cw *CentralWriter) AddResult(jobID string, entry *gmaps.Entry)

AddResult appends an entry for the currently tracked job.

func (*CentralWriter) Discard

func (cw *CentralWriter) Discard(jobID string)

Discard drops the tracked job without persisting results.

func (*CentralWriter) Flush

func (cw *CentralWriter) Flush(jobID string)

Flush saves results to DB, signals completion, and clears tracked state. Idempotent: a second call for the same jobID is a no-op.
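The idempotency guarantee can be illustrated with a minimal stand-in. The `flushOnce` type below is a hypothetical simplification, not the package's unexported implementation: it records which job IDs have already been flushed and makes the second call a no-op.

```go
// flushOnce sketches the idempotency guarantee of Flush: only the first
// call for a given jobID performs work (hypothetical simplification).
type flushOnce struct {
	flushed map[string]bool
	count   int // number of real flushes performed
}

func (f *flushOnce) flush(jobID string) {
	if f.flushed == nil {
		f.flushed = make(map[string]bool)
	}
	if f.flushed[jobID] {
		return // second call for the same jobID is a no-op
	}
	f.flushed[jobID] = true
	f.count++
}
```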

func (*CentralWriter) FlushQueueDepth

func (cw *CentralWriter) FlushQueueDepth() int

FlushQueueDepth is retained for health-endpoint compatibility.

func (*CentralWriter) ForceFlush

func (cw *CentralWriter) ForceFlush(jobID string)

ForceFlush immediately flushes results for a job (used on timeout/shutdown).

func (*CentralWriter) MarkDone

func (cw *CentralWriter) MarkDone(jobID string)

MarkDone is called by the exit monitor when a job is complete.

func (*CentralWriter) RegisterJob

func (cw *CentralWriter) RegisterJob(jobID string, riverJobID int64, keyword string) <-chan FlushResult

RegisterJob registers the active River job and returns a completion channel that receives the flush result.
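The completion-channel pattern can be sketched with a toy stand-in. `miniWriter` and `flushResult` below are hypothetical simplifications of CentralWriter and FlushResult, showing the flow a River worker relies on: register, accumulate results, then receive the flush outcome on the returned channel.

```go
// flushResult mirrors the shape of the package's FlushResult type
// (hypothetical stand-in).
type flushResult struct {
	ResultCount int
	Err         error
}

// miniWriter is a toy stand-in for CentralWriter, showing the
// completion-channel pattern behind RegisterJob and Flush.
type miniWriter struct {
	results []string
	done    chan flushResult
}

// registerJob returns the channel the worker will block on.
func (w *miniWriter) registerJob() <-chan flushResult {
	w.done = make(chan flushResult, 1) // buffered so flush never blocks
	return w.done
}

// addResult buffers one scraped result for the tracked job.
func (w *miniWriter) addResult(r string) { w.results = append(w.results, r) }

// flush reports the outcome on the channel and clears tracked state.
func (w *miniWriter) flush() {
	w.done <- flushResult{ResultCount: len(w.results)}
	w.results = nil
}
```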

func (*CentralWriter) Run

func (cw *CentralWriter) Run(ctx context.Context, in <-chan scrapemate.Result) error

Run processes results from the ScrapeMate scraper and buffers them for the currently registered River job.

func (*CentralWriter) TrackedJobs

func (cw *CentralWriter) TrackedJobs() int

TrackedJobs returns how many jobs are currently tracked in-memory.

type FlushResult

type FlushResult struct {
	ResultCount int
	Err         error
}

FlushResult is sent to the River worker when results are flushed to DB.

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

Provider is a simple FIFO bridge between River workers and ScrapeMate. Root jobs are submitted by the River worker and child jobs are pushed by ScrapeMate.

func NewProvider

func NewProvider(bufferSize int) *Provider

NewProvider creates a new Provider with the given buffer size. If bufferSize <= 0, it defaults to 100.
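The documented fallback can be sketched as a small helper. `effectiveBufferSize` is a hypothetical name; the real clamping happens inside the unexported constructor logic.

```go
// effectiveBufferSize mirrors the documented default for NewProvider:
// values <= 0 fall back to 100 (hypothetical helper).
func effectiveBufferSize(n int) int {
	if n <= 0 {
		return 100
	}
	return n
}
```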

func (*Provider) Close

func (p *Provider) Close()

Close closes the provider and job output channel.

func (*Provider) Jobs

func (p *Provider) Jobs(_ context.Context) (<-chan scrapemate.IJob, <-chan error)

Jobs returns channels for ScrapeMate to consume jobs and errors.

func (*Provider) Push

func (p *Provider) Push(ctx context.Context, job scrapemate.IJob) error

Push adds a child job to the provider queue.

func (*Provider) Submit

func (p *Provider) Submit(ctx context.Context, job scrapemate.IJob) error

Submit adds a root job to the provider queue.

type SaveFunc

type SaveFunc func(ctx context.Context, riverJobID int64, keyword string, entries []*gmaps.Entry) error

SaveFunc persists results. The default writes to PostgreSQL; tests can supply a lightweight substitute via NewCentralWriter.
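A test substitute might look like the sketch below. The `entry` type is a hypothetical stand-in for gmaps.Entry, and the context parameter of the real SaveFunc is omitted for brevity; the point is a saver that captures entries in memory instead of writing to PostgreSQL.

```go
// entry is a hypothetical stand-in for gmaps.Entry.
type entry struct{ Title string }

// recordingSaver mirrors the SaveFunc shape (context omitted for brevity)
// and captures what it is asked to persist instead of writing to the
// database, the kind of lightweight substitute tests can pass in.
type recordingSaver struct{ saved []*entry }

func (r *recordingSaver) save(riverJobID int64, keyword string, entries []*entry) error {
	r.saved = append(r.saved, entries...)
	return nil
}
```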

type ScraperManager

type ScraperManager struct {

	// OnJobComplete is called after each job finishes (for stats tracking).
	OnJobComplete func()
	// contains filtered or unexported fields
}

ScraperManager manages the scraper lifecycle, restarting it after a configured number of jobs to prevent memory leaks.
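The restart threshold can be sketched as a simple counter. `jobCounter` is a hypothetical simplification of the manager's unexported bookkeeping: once maxJobs completions have been observed, it signals that the scraper should be recycled.

```go
// jobCounter sketches the restart threshold ScraperManager uses: after
// maxJobs completed jobs it signals a restart (hypothetical
// simplification of the unexported logic).
type jobCounter struct {
	done    int64
	maxJobs int64
}

// jobDone records one completion and reports whether the threshold
// has been reached. A maxJobs of 0 disables restarts.
func (c *jobCounter) jobDone() (restart bool) {
	c.done++
	return c.maxJobs > 0 && c.done >= c.maxJobs
}
```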

func NewScraperManager

func NewScraperManager(dbPool *pgxpool.Pool, concurrency int, fastMode, debug bool, maxJobs int64, proxies []string) *ScraperManager

NewScraperManager creates a new ScraperManager.

func (*ScraperManager) ActiveJobs

func (m *ScraperManager) ActiveJobs() int64

ActiveJobs returns the number of currently active scrape jobs.

func (*ScraperManager) CentralWriter

func (m *ScraperManager) CentralWriter() *CentralWriter

CentralWriter returns the CentralWriter instance.

func (*ScraperManager) ForceFlush

func (m *ScraperManager) ForceFlush(jobID string)

ForceFlush delegates to CentralWriter.

func (*ScraperManager) JobDone

func (m *ScraperManager) JobDone()

JobDone is called after each River job completes.

func (*ScraperManager) MarkDone

func (m *ScraperManager) MarkDone(jobID string)

MarkDone delegates to CentralWriter.

func (*ScraperManager) RegisterJob

func (m *ScraperManager) RegisterJob(jobID string, riverJobID int64, keyword string) <-chan FlushResult

RegisterJob delegates to CentralWriter.

func (*ScraperManager) Run

func (m *ScraperManager) Run(ctx context.Context) error

Run starts the scraper manager loop. It creates a new scraper, runs it until the job threshold is reached, then restarts.

func (*ScraperManager) SubmitJob

func (m *ScraperManager) SubmitJob(ctx context.Context, job scrapemate.IJob) error

SubmitJob submits a job to the current provider.
