Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Command = &cli.Command{ Name: "worker", Usage: "Start the scraper worker", Flags: []cli.Flag{ &cli.StringFlag{ Name: "database-url", Usage: "PostgreSQL connection string", Value: "postgres://postgres:postgres@localhost:5432/gmaps_pro?sslmode=disable", Sources: cli.EnvVars(saas.EnvDatabaseURL), }, &cli.IntFlag{ Name: "concurrency", Aliases: []string{"c"}, Usage: "Requested worker instances for host provisioning (each process runs one River job at a time)", Value: 1, Sources: cli.EnvVars(saas.EnvConcurrency), }, &cli.BoolFlag{ Name: "fast", Usage: "Enable fast mode (stealth HTTP requests)", Value: false, Sources: cli.EnvVars(saas.EnvFastMode), }, &cli.BoolFlag{ Name: "debug", Usage: "Enable debug mode (headful browser)", Value: false, Sources: cli.EnvVars(saas.EnvDebug), }, &cli.Int64Flag{ Name: "max-jobs-per-cycle", Usage: "Maximum jobs before restarting scraper", Value: 100, Sources: cli.EnvVars(saas.EnvMaxJobsPerCycle), }, &cli.StringFlag{ Name: "proxies", Usage: "Comma-separated list of proxy URLs", Value: "", Sources: cli.EnvVars(saas.EnvProxies), }, }, Action: func(ctx context.Context, cmd *cli.Command) error { ctx, cancel := context.WithCancel(ctx) defer cancel() sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigChan log.Info("shutdown signal received") cancel() }() configuredConcurrency := max(1, cmd.Int("concurrency")) concurrency := 1 workerStats.startedAt = time.Now() workerStats.concurrency = concurrency maxRiverWorkers := 1 fastMode := cmd.Bool("fast") debug := cmd.Bool("debug") maxJobsPerCycle := cmd.Int64("max-jobs-per-cycle") proxies := parseProxies(cmd.String("proxies")) dsn := cmd.String("database-url") dbMaxConns := int32(3) dbMinConns := int32(1) dbPool, err := postgres.Connect(ctx, dsn, postgres.WithMaxConns(dbMaxConns), postgres.WithMinConns(dbMinConns), ) if err != nil { return err } defer dbPool.Close() log.Info("worker configuration", "configured_container_instances", configuredConcurrency, "scraper_concurrency", concurrency, "river_max_workers", maxRiverWorkers, "db_max_conns", dbMaxConns, "db_min_conns", dbMinConns, "fast_mode", fastMode, "max_jobs_per_cycle", maxJobsPerCycle, "proxy_count", len(proxies), ) manager := scraper.NewScraperManager(dbPool, concurrency, fastMode, debug, maxJobsPerCycle, proxies) manager.OnJobComplete = IncrementJobsProcessed manager.CentralWriter().OnResultsSaved = AddResultsCollected go runHealthServer(ctx, manager) client, err := rqueue.NewWorkerClient(dbPool, manager) if err != nil { return err } log.Info("starting River worker") if err := client.Start(ctx); err != nil { return err } client.StartRetryPromoter(ctx) log.Info("starting scraper manager") if err := manager.Run(ctx); err != nil { if ctx.Err() != nil { log.Info("scraper manager stopped due to shutdown") } else { return err } } stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second) defer stopCancel() if err := client.Stop(stopCtx); err != nil { log.Error("error stopping River client", "error", err) } log.Info("worker shutdown complete") return nil }, }
Functions ¶
func AddResultsCollected ¶
func AddResultsCollected(count int)
AddResultsCollected adds to the results collected counter. Called by the scraper manager after saving results.
func IncrementJobsProcessed ¶
func IncrementJobsProcessed()
IncrementJobsProcessed increments the jobs processed counter. Called by the scraper manager after each job completes.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.