cmdworker

package
v1.10.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Command is the "worker" CLI subcommand. It connects to PostgreSQL, builds a
// scraper manager, starts a River worker client backed by that manager, and
// runs the manager until it exits or SIGINT/SIGTERM cancels the context.
//
// Flags (each may also be supplied via the listed saas.Env* variable):
//   - database-url: PostgreSQL connection string.
//   - concurrency (-c): requested worker instances; it is only logged here,
//     the scraper manager itself is always created with concurrency 1.
//   - fast: enable fast mode (stealth HTTP requests).
//   - debug: enable debug mode (headful browser).
//   - max-jobs-per-cycle: maximum jobs before restarting the scraper.
//   - proxies: comma-separated proxy URLs, parsed by parseProxies.
var Command = &cli.Command{
	Name:  "worker",
	Usage: "Start the scraper worker",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:    "database-url",
			Usage:   "PostgreSQL connection string",
			Value:   "postgres://postgres:postgres@localhost:5432/gmaps_pro?sslmode=disable",
			Sources: cli.EnvVars(saas.EnvDatabaseURL),
		},
		&cli.IntFlag{
			Name:    "concurrency",
			Aliases: []string{"c"},
			Usage:   "Requested worker instances for host provisioning (each process runs one River job at a time)",
			Value:   1,
			Sources: cli.EnvVars(saas.EnvConcurrency),
		},
		&cli.BoolFlag{
			Name:    "fast",
			Usage:   "Enable fast mode (stealth HTTP requests)",
			Value:   false,
			Sources: cli.EnvVars(saas.EnvFastMode),
		},
		&cli.BoolFlag{
			Name:    "debug",
			Usage:   "Enable debug mode (headful browser)",
			Value:   false,
			Sources: cli.EnvVars(saas.EnvDebug),
		},
		&cli.Int64Flag{
			Name:    "max-jobs-per-cycle",
			Usage:   "Maximum jobs before restarting scraper",
			Value:   100,
			Sources: cli.EnvVars(saas.EnvMaxJobsPerCycle),
		},
		&cli.StringFlag{
			Name:    "proxies",
			Usage:   "Comma-separated list of proxy URLs",
			Value:   "",
			Sources: cli.EnvVars(saas.EnvProxies),
		},
	},
	Action: func(ctx context.Context, cmd *cli.Command) error {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		// Translate SIGINT/SIGTERM into context cancellation. The handler is
		// deregistered when Action returns, and the goroutine also exits once
		// ctx is cancelled for any other reason, so neither outlives this call.
		sigChan := make(chan os.Signal, 1)
		signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
		defer signal.Stop(sigChan)

		go func() {
			select {
			case <-sigChan:
				log.Info("shutdown signal received")
				cancel()
			case <-ctx.Done():
			}
		}()

		// The --concurrency value is only reported (for host provisioning);
		// this process always runs the scraper manager with concurrency 1.
		configuredConcurrency := max(1, cmd.Int("concurrency"))
		concurrency := 1

		workerStats.startedAt = time.Now()
		workerStats.concurrency = concurrency

		maxRiverWorkers := 1
		fastMode := cmd.Bool("fast")
		debug := cmd.Bool("debug")
		maxJobsPerCycle := cmd.Int64("max-jobs-per-cycle")
		proxies := parseProxies(cmd.String("proxies"))

		dsn := cmd.String("database-url")
		dbMaxConns := int32(3)
		dbMinConns := int32(1)
		dbPool, err := postgres.Connect(ctx, dsn,
			postgres.WithMaxConns(dbMaxConns),
			postgres.WithMinConns(dbMinConns),
		)
		if err != nil {
			return err
		}
		defer dbPool.Close()

		log.Info("worker configuration",
			"configured_container_instances", configuredConcurrency,
			"scraper_concurrency", concurrency,
			"river_max_workers", maxRiverWorkers,
			"db_max_conns", dbMaxConns,
			"db_min_conns", dbMinConns,
			"fast_mode", fastMode,
			"max_jobs_per_cycle", maxJobsPerCycle,
			"proxy_count", len(proxies),
		)

		manager := scraper.NewScraperManager(dbPool, concurrency, fastMode, debug, maxJobsPerCycle, proxies)
		manager.OnJobComplete = IncrementJobsProcessed
		manager.CentralWriter().OnResultsSaved = AddResultsCollected

		go runHealthServer(ctx, manager)

		client, err := rqueue.NewWorkerClient(dbPool, manager)
		if err != nil {
			return err
		}

		log.Info("starting River worker")
		if err := client.Start(ctx); err != nil {
			return err
		}

		// stopClient stops the River client with its own 10s budget derived
		// from context.Background(), because ctx may already be cancelled by
		// the time shutdown starts.
		stopClient := func() {
			stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer stopCancel()

			if err := client.Stop(stopCtx); err != nil {
				log.Error("error stopping River client", "error", err)
			}
		}

		client.StartRetryPromoter(ctx)

		log.Info("starting scraper manager")
		if err := manager.Run(ctx); err != nil {
			if ctx.Err() == nil {
				// Unexpected manager failure. First cancel ctx, which the retry
				// promoter and health server were started with. Then stop the
				// River client. Both happen before the deferred dbPool.Close, so
				// the client is not left running against a closed pool.
				cancel()
				stopClient()
				return err
			}
			log.Info("scraper manager stopped due to shutdown")
		}

		stopClient()

		log.Info("worker shutdown complete")

		return nil
	},
}

Functions

func AddResultsCollected

func AddResultsCollected(count int)

AddResultsCollected adds count to the results-collected counter. It is called by the scraper manager after results are saved.

func IncrementJobsProcessed

func IncrementJobsProcessed()

IncrementJobsProcessed increments the jobs-processed counter. It is called by the scraper manager after each job completes.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL