Documentation
¶
Index ¶
- Constants
- Variables
- func CreateRiverUIHandler(_ context.Context, client *Client) (*riverui.Handler, error)
- type CleanupArgs
- type CleanupWorker
- type Client
- func (c *Client) DeleteJob(ctx context.Context, encodedJobID string) error
- func (c *Client) GetDashboardStats(ctx context.Context) (*DashboardStats, error)
- func (c *Client) GetJobResults(ctx context.Context, encodedJobID string) (json.RawMessage, string, error)
- func (c *Client) GetJobStatus(ctx context.Context, jobID string) (*JobStatus, error)
- func (c *Client) InsertJob(ctx context.Context, args ScrapeJobArgs) (string, error)
- func (c *Client) InsertWorkerDeleteJob(ctx context.Context, args WorkerDeleteArgs) error
- func (c *Client) InsertWorkerProvisionJob(ctx context.Context, args WorkerProvisionArgs) error
- func (c *Client) ListJobs(ctx context.Context, state string, limit int, cursor string) (*JobListResult, error)
- func (c *Client) RiverClient() *river.Client[pgx.Tx]
- func (c *Client) Start(ctx context.Context) error
- func (c *Client) StartRetryPromoter(ctx context.Context)
- func (c *Client) Stop(ctx context.Context) error
- type DashboardStats
- type JobDeleteArgs
- type JobDeleteWorker
- type JobListItem
- type JobListResult
- type JobStatus
- type ScrapeJobArgs
- type ScrapeManager
- type ScrapeWatchdogMetrics
- type ScrapeWorker
- type WorkerDeleteArgs
- type WorkerDeleteWorker
- type WorkerHealthCheckArgs
- type WorkerHealthCheckWorker
- type WorkerProvisionArgs
- type WorkerProvisionWorker
Constants ¶
const QueueMaintenance = "maintenance"
QueueMaintenance is the queue name for maintenance jobs like deletion.
Variables ¶
var ValidJobStates = map[string]rivertype.JobState{ "available": rivertype.JobStateAvailable, "cancelled": rivertype.JobStateCancelled, "completed": rivertype.JobStateCompleted, "discarded": rivertype.JobStateDiscarded, "pending": rivertype.JobStatePending, "retryable": rivertype.JobStateRetryable, "running": rivertype.JobStateRunning, "scheduled": rivertype.JobStateScheduled, }
ValidJobStates maps each state name accepted as a job-list filter to its corresponding River job state.
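Functions ¶
func CreateRiverUIHandler ¶
func CreateRiverUIHandler(_ context.Context, client *Client) (*riverui.Handler, error)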
Types ¶
type CleanupArgs ¶
type CleanupArgs struct{}
CleanupArgs holds the (empty) arguments for a periodic job that cleans up expired sessions, stale rate limits, and old scrape results.
func (CleanupArgs) InsertOpts ¶
func (CleanupArgs) InsertOpts() river.InsertOpts
func (CleanupArgs) Kind ¶
func (CleanupArgs) Kind() string
type CleanupWorker ¶
type CleanupWorker struct {
river.WorkerDefaults[CleanupArgs]
// contains filtered or unexported fields
}
CleanupWorker performs periodic database cleanup.
func (*CleanupWorker) Work ¶
func (w *CleanupWorker) Work(ctx context.Context, _ *river.Job[CleanupArgs]) error
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
NewClient creates a new Client for the API server. It processes maintenance jobs (worker provisioning/deletion) and can insert scrape jobs. encryptionKey is needed to decrypt secrets from app_config at job execution time.
func NewWorkerClient ¶
func NewWorkerClient(dbPool *pgxpool.Pool, manager ScrapeManager) (*Client, error)
NewWorkerClient creates a new Client for worker mode. This client only processes scrape jobs. Maintenance jobs (worker provisioning, deletion) are handled by the server's River client. Worker-mode concurrency is intentionally fixed to one River worker per process.
func (*Client) DeleteJob ¶
func (c *Client) DeleteJob(ctx context.Context, encodedJobID string) error
DeleteJob queues a background job to delete a scrape job and its results. It returns immediately after validation; the actual deletion happens asynchronously.
func (*Client) GetDashboardStats ¶
func (c *Client) GetDashboardStats(ctx context.Context) (*DashboardStats, error)
GetDashboardStats fetches job and result counts for the dashboard.
func (*Client) GetJobResults ¶
func (c *Client) GetJobResults(ctx context.Context, encodedJobID string) (json.RawMessage, string, error)
GetJobResults fetches the raw JSON results and keyword for a job by its encoded ID.
func (*Client) GetJobStatus ¶
func (c *Client) GetJobStatus(ctx context.Context, jobID string) (*JobStatus, error)
func (*Client) InsertJob ¶
func (c *Client) InsertJob(ctx context.Context, args ScrapeJobArgs) (string, error)
func (*Client) InsertWorkerDeleteJob ¶
func (c *Client) InsertWorkerDeleteJob(ctx context.Context, args WorkerDeleteArgs) error
InsertWorkerDeleteJob queues a background job to delete a cloud worker.
func (*Client) InsertWorkerProvisionJob ¶
func (c *Client) InsertWorkerProvisionJob(ctx context.Context, args WorkerProvisionArgs) error
InsertWorkerProvisionJob queues a background job to provision a cloud worker.
func (*Client) ListJobs ¶
func (c *Client) ListJobs(ctx context.Context, state string, limit int, cursor string) (*JobListResult, error)
ListJobs returns a paginated list of jobs with optional state filtering.
func (*Client) RiverClient ¶
func (c *Client) RiverClient() *river.Client[pgx.Tx]
RiverClient returns the underlying River client for use with River UI.
func (*Client) Start ¶
func (c *Client) Start(ctx context.Context) error
func (*Client) StartRetryPromoter ¶
func (c *Client) StartRetryPromoter(ctx context.Context)
StartRetryPromoter runs a background goroutine that periodically promotes retryable scrape jobs for immediate retry.
func (*Client) Stop ¶
func (c *Client) Stop(ctx context.Context) error
type DashboardStats ¶
DashboardStats holds summary statistics for the admin dashboard.
type JobDeleteArgs ¶
type JobDeleteArgs struct {
JobID int64 `json:"job_id"`
}
JobDeleteArgs contains the arguments for deleting a scrape job.
func (JobDeleteArgs) InsertOpts ¶
func (JobDeleteArgs) InsertOpts() river.InsertOpts
InsertOpts returns the insert options for this job type.
func (JobDeleteArgs) Kind ¶
func (JobDeleteArgs) Kind() string
Kind returns the job type identifier.
type JobDeleteWorker ¶
type JobDeleteWorker struct {
river.WorkerDefaults[JobDeleteArgs]
// contains filtered or unexported fields
}
JobDeleteWorker handles job deletion in the background.
func (*JobDeleteWorker) Work ¶
func (w *JobDeleteWorker) Work(ctx context.Context, job *river.Job[JobDeleteArgs]) error
Work deletes the job and its associated results.
type JobListItem ¶
type JobListItem struct {
JobID string
Status string
Keyword string
CreatedAt time.Time
StartedAt *time.Time
CompletedAt *time.Time
ResultCount int
Error string
}
JobListItem represents a job in the list (without full results).
type JobListResult ¶
type JobListResult struct {
Jobs []JobListItem
NextCursor string
HasMore bool
}
JobListResult represents the result of listing jobs.
type ScrapeJobArgs ¶
type ScrapeJobArgs struct {
Keyword string `json:"keyword"`
Lang string `json:"lang"`
MaxDepth int `json:"max_depth"`
Email bool `json:"email"`
GeoCoordinates string `json:"geo_coordinates"`
Zoom int `json:"zoom"`
Radius float64 `json:"radius"`
FastMode bool `json:"fast_mode"`
ExtraReviews bool `json:"extra_reviews"`
TimeoutSecs int `json:"timeout"` // timeout in seconds
}
func (ScrapeJobArgs) InsertOpts ¶
func (ScrapeJobArgs) InsertOpts() river.InsertOpts
func (ScrapeJobArgs) Kind ¶
func (ScrapeJobArgs) Kind() string
type ScrapeManager ¶
type ScrapeManager interface {
JobDone()
SubmitJob(ctx context.Context, job scrapemate.IJob) error
RegisterJob(jobID string, riverJobID int64, keyword string) <-chan scraper.FlushResult
MarkDone(jobID string)
ForceFlush(jobID string)
}
ScrapeManager is the interface that ScrapeWorker uses to interact with the scraper lifecycle and result collection.
type ScrapeWatchdogMetrics ¶
func GetScrapeWatchdogMetrics ¶
func GetScrapeWatchdogMetrics() ScrapeWatchdogMetrics
GetScrapeWatchdogMetrics returns cumulative watchdog counters for scrape jobs.
type ScrapeWorker ¶
type ScrapeWorker struct {
river.WorkerDefaults[ScrapeJobArgs]
Manager ScrapeManager
}
func (*ScrapeWorker) NextRetryAt ¶
func (w *ScrapeWorker) NextRetryAt(_ *river.Job[ScrapeJobArgs]) time.Time
NextRetryAt returns a fallback retry time two minutes from now. In practice, the retry promoter goroutine periodically calls JobRetry(), so failed jobs are usually retried much sooner.
func (*ScrapeWorker) Timeout ¶
func (w *ScrapeWorker) Timeout(job *river.Job[ScrapeJobArgs]) time.Duration
Timeout sets the deadline of River's job context for scrape jobs. Keep it longer than Work()'s internal timeout so there is still time to force-flush and save results after that internal timeout fires.
func (*ScrapeWorker) Work ¶
func (w *ScrapeWorker) Work(ctx context.Context, job *river.Job[ScrapeJobArgs]) error
type WorkerDeleteArgs ¶
type WorkerDeleteArgs struct {
ResourceID int `json:"resource_id"`
Provider string `json:"provider"`
ProviderResourceID string `json:"provider_resource_id"`
}
WorkerDeleteArgs contains only non-sensitive arguments for deletion.
func (WorkerDeleteArgs) InsertOpts ¶
func (WorkerDeleteArgs) InsertOpts() river.InsertOpts
func (WorkerDeleteArgs) Kind ¶
func (WorkerDeleteArgs) Kind() string
type WorkerDeleteWorker ¶
type WorkerDeleteWorker struct {
river.WorkerDefaults[WorkerDeleteArgs]
// contains filtered or unexported fields
}
WorkerDeleteWorker handles worker deletion in the background.
func (*WorkerDeleteWorker) Work ¶
func (w *WorkerDeleteWorker) Work(ctx context.Context, job *river.Job[WorkerDeleteArgs]) error
type WorkerHealthCheckArgs ¶
type WorkerHealthCheckArgs struct{}
WorkerHealthCheckArgs holds the (empty) arguments for a periodic job that checks the health of all active workers.
func (WorkerHealthCheckArgs) InsertOpts ¶
func (WorkerHealthCheckArgs) InsertOpts() river.InsertOpts
func (WorkerHealthCheckArgs) Kind ¶
func (WorkerHealthCheckArgs) Kind() string
type WorkerHealthCheckWorker ¶
type WorkerHealthCheckWorker struct {
river.WorkerDefaults[WorkerHealthCheckArgs]
// contains filtered or unexported fields
}
WorkerHealthCheckWorker checks health of all active provisioned workers.
func (*WorkerHealthCheckWorker) Work ¶
func (w *WorkerHealthCheckWorker) Work(ctx context.Context, _ *river.Job[WorkerHealthCheckArgs]) error
type WorkerProvisionArgs ¶
type WorkerProvisionArgs struct {
ResourceID int `json:"resource_id"`
Provider string `json:"provider"`
Name string `json:"name"`
Region string `json:"region"`
Size string `json:"size"`
Concurrency int `json:"concurrency"` // Number of worker containers on the host.
MaxJobsPerCycle int `json:"max_jobs_per_cycle"`
FastMode bool `json:"fast_mode"`
Proxies string `json:"proxies"`
}
WorkerProvisionArgs contains only non-sensitive arguments for provisioning. Secrets (API tokens, DB URL, registry creds) are fetched at runtime from app_config.
func (WorkerProvisionArgs) InsertOpts ¶
func (WorkerProvisionArgs) InsertOpts() river.InsertOpts
func (WorkerProvisionArgs) Kind ¶
func (WorkerProvisionArgs) Kind() string
type WorkerProvisionWorker ¶
type WorkerProvisionWorker struct {
river.WorkerDefaults[WorkerProvisionArgs]
// contains filtered or unexported fields
}
WorkerProvisionWorker handles worker provisioning in the background.
func (*WorkerProvisionWorker) Timeout ¶
func (w *WorkerProvisionWorker) Timeout(_ *river.Job[WorkerProvisionArgs]) time.Duration
func (*WorkerProvisionWorker) Work ¶
func (w *WorkerProvisionWorker) Work(ctx context.Context, job *river.Job[WorkerProvisionArgs]) error