Documentation ¶
Index ¶
- type CentralWriter
- func NewCentralWriter(db *pgxpool.Pool, saveFn SaveFunc) *CentralWriter
- func (cw *CentralWriter) AddResult(jobID string, entry *gmaps.Entry)
- func (cw *CentralWriter) Discard(jobID string)
- func (cw *CentralWriter) Flush(jobID string)
- func (cw *CentralWriter) FlushQueueDepth() int
- func (cw *CentralWriter) ForceFlush(jobID string)
- func (cw *CentralWriter) MarkDone(jobID string)
- func (cw *CentralWriter) RegisterJob(jobID string, riverJobID int64, keyword string) <-chan FlushResult
- func (cw *CentralWriter) Run(ctx context.Context, in <-chan scrapemate.Result) error
- func (cw *CentralWriter) TrackedJobs() int
- type FlushResult
- type Provider
- func NewProvider(bufferSize int) *Provider
- func (p *Provider) Close()
- type SaveFunc
- type ScraperManager
- func NewScraperManager(dbPool *pgxpool.Pool, concurrency int, fastMode, debug bool, maxJobs int64, proxies []string) *ScraperManager
- func (m *ScraperManager) ActiveJobs() int64
- func (m *ScraperManager) CentralWriter() *CentralWriter
- func (m *ScraperManager) ForceFlush(jobID string)
- func (m *ScraperManager) JobDone()
- func (m *ScraperManager) MarkDone(jobID string)
- func (m *ScraperManager) RegisterJob(jobID string, riverJobID int64, keyword string) <-chan FlushResult
- func (m *ScraperManager) Run(ctx context.Context) error
- func (m *ScraperManager) SubmitJob(ctx context.Context, job scrapemate.IJob) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CentralWriter ¶
type CentralWriter struct {
OnResultsSaved func(count int)
// contains filtered or unexported fields
}
CentralWriter tracks exactly one in-flight River job, receives ScrapeMate results, and flushes them to the database when the exit monitor signals done.
func NewCentralWriter ¶
func NewCentralWriter(db *pgxpool.Pool, saveFn SaveFunc) *CentralWriter
NewCentralWriter creates a new CentralWriter. Pass nil for saveFn to use the default PostgreSQL saver built from db.
func (*CentralWriter) AddResult ¶
func (cw *CentralWriter) AddResult(jobID string, entry *gmaps.Entry)
AddResult appends an entry for the currently tracked job.
func (*CentralWriter) Discard ¶
func (cw *CentralWriter) Discard(jobID string)
Discard drops the tracked job without persisting results.
func (*CentralWriter) Flush ¶
func (cw *CentralWriter) Flush(jobID string)
Flush saves results to DB, signals completion, and clears tracked state. Idempotent: a second call for the same jobID is a no-op.
func (*CentralWriter) FlushQueueDepth ¶
func (cw *CentralWriter) FlushQueueDepth() int
FlushQueueDepth is kept for health endpoint compatibility.
func (*CentralWriter) ForceFlush ¶
func (cw *CentralWriter) ForceFlush(jobID string)
ForceFlush immediately flushes results for a job (used on timeout/shutdown).
func (*CentralWriter) MarkDone ¶
func (cw *CentralWriter) MarkDone(jobID string)
MarkDone is called by the exit monitor when a job is complete.
func (*CentralWriter) RegisterJob ¶
func (cw *CentralWriter) RegisterJob(jobID string, riverJobID int64, keyword string) <-chan FlushResult
RegisterJob registers the active River job and returns a completion channel that receives the flush result.
func (*CentralWriter) Run ¶
func (cw *CentralWriter) Run(ctx context.Context, in <-chan scrapemate.Result) error
Run processes results from the ScrapeMate scraper and buffers them for the currently registered River job.
func (*CentralWriter) TrackedJobs ¶
func (cw *CentralWriter) TrackedJobs() int
TrackedJobs returns how many jobs are currently tracked in-memory.
type FlushResult ¶
FlushResult is sent to the River worker when results are flushed to DB.
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
Provider is a simple FIFO bridge between River workers and ScrapeMate. Root jobs are submitted by the River worker and child jobs are pushed by ScrapeMate.
func NewProvider ¶
func NewProvider(bufferSize int) *Provider
NewProvider creates a new Provider with the given buffer size. If bufferSize <= 0, it defaults to 100.
func (*Provider) Close ¶
func (p *Provider) Close()
Close closes the provider and job output channel.
type SaveFunc ¶
type SaveFunc func(ctx context.Context, riverJobID int64, keyword string, entries []*gmaps.Entry) error
SaveFunc persists results. The default writes to PostgreSQL; tests can supply a lightweight substitute via NewCentralWriter.
type ScraperManager ¶
type ScraperManager struct {
// OnJobComplete is called after each job finishes (for stats tracking).
OnJobComplete func()
// contains filtered or unexported fields
}
ScraperManager manages the scraper lifecycle, restarting it after a configured number of jobs to prevent memory leaks.
func NewScraperManager ¶
func NewScraperManager(dbPool *pgxpool.Pool, concurrency int, fastMode, debug bool, maxJobs int64, proxies []string) *ScraperManager
NewScraperManager creates a new ScraperManager.
func (*ScraperManager) ActiveJobs ¶
func (m *ScraperManager) ActiveJobs() int64
ActiveJobs returns the number of currently active scrape jobs.
func (*ScraperManager) CentralWriter ¶
func (m *ScraperManager) CentralWriter() *CentralWriter
CentralWriter returns the CentralWriter instance.
func (*ScraperManager) ForceFlush ¶
func (m *ScraperManager) ForceFlush(jobID string)
ForceFlush delegates to CentralWriter.
func (*ScraperManager) JobDone ¶
func (m *ScraperManager) JobDone()
JobDone is called after each River job completes.
func (*ScraperManager) MarkDone ¶
func (m *ScraperManager) MarkDone(jobID string)
MarkDone delegates to CentralWriter.
func (*ScraperManager) RegisterJob ¶
func (m *ScraperManager) RegisterJob(jobID string, riverJobID int64, keyword string) <-chan FlushResult
RegisterJob delegates to CentralWriter.
func (*ScraperManager) Run ¶
func (m *ScraperManager) Run(ctx context.Context) error
Run starts the scraper manager loop. It creates a new scraper, runs it until the job threshold is reached, then restarts.
func (*ScraperManager) SubmitJob ¶
func (m *ScraperManager) SubmitJob(ctx context.Context, job scrapemate.IJob) error
SubmitJob submits a job to the current provider.