broker

package
v0.33.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package broker provides Redis-backed task scheduling, dispatch, and coordination primitives for the hover crawl execution pipeline.

Index

Constants

View Source
const DefaultOutboxMaxAttempts = 10

DefaultOutboxMaxAttempts is the retry cap before a row is dead-lettered. Chosen so the worst-case age of a row stuck in backoff is bounded by MaxAttempts × MaxBackoff — at the defaults, 10 × 5 min = 50 min, which caps the oldest-age gauge even if a subset of rows can never be dispatched.

View Source
const RunningCountersKey = keyPrefix + "running"

RunningCountersKey is a single hash whose fields are job IDs and values are the number of tasks currently in-flight.

Variables

This section is empty.

Functions

func ConsumerGroup

func ConsumerGroup(jobID string) string

ConsumerGroup returns the consumer group name for a job stream.

func DomainConfigKey

func DomainConfigKey(domain string) string

DomainConfigKey is a hash storing adaptive delay state: base_delay_ms, adaptive_delay_ms, floor_ms, success_streak, error_streak.

func DomainGateKey

func DomainGateKey(domain string) string

DomainGateKey is a short-lived string used as a time gate. SET NX PX {delay_ms} prevents dispatching to the same domain faster than its configured rate.

func DomainInflightKey

func DomainInflightKey(domain string) string

DomainInflightKey is a hash whose fields are job IDs and values are the number of inflight tasks for that domain+job pair.

func FormatScheduleEntry

func FormatScheduleEntry(taskID, jobID string, pageID int, host, path string, priority float64, retryCount int, sourceType, sourceURL string) string

ScheduleEntry is the member value stored inside the schedule ZSET. It is serialised as a compact pipe-delimited string to avoid JSON overhead on the hot scheduling path.

Format: taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL

func ScheduleKey

func ScheduleKey(jobID string) string

Schedule keys — sorted sets keyed by job, scored by earliest runnable unix-millisecond timestamp.

func StreamKey

func StreamKey(jobID string) string

Stream keys — per-job streams that hold ready-to-run task envelopes.

Types

type BatchError added in v0.33.1

type BatchError struct {
	FailedIndices []int
	Total         int
	Err           error
}

BatchError is returned by ScheduleBatch when some (but not all) entries in the pipeline failed. FailedIndices lists the indices within the input slice whose ZADD returned an error; the remaining entries were scheduled successfully. Err is the first per-entry error encountered, for logging.

Callers that need to retry only the failures can type-assert via errors.As and use FailedIndices to partition the batch. Callers that treat any failure as fatal can just check err != nil; the error message includes the failure count.

func (*BatchError) Error added in v0.33.1

func (e *BatchError) Error() string

func (*BatchError) Unwrap added in v0.33.1

func (e *BatchError) Unwrap() error

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps a go-redis client with convenience helpers.

func NewClient

func NewClient(cfg Config) (*Client, error)

NewClient parses cfg.URL and returns a connected Client.

func (*Client) Close

func (c *Client) Close() error

Close releases the underlying connection pool.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping verifies the connection is alive.

func (*Client) RDB

func (c *Client) RDB() *redis.Client

RDB exposes the raw go-redis client for packages that need direct access (e.g. Lua scripts, pipelines).

type ConcurrencyChecker

type ConcurrencyChecker interface {
	// CanDispatch returns true if the job has room for another task.
	CanDispatch(ctx context.Context, jobID string) (bool, error)
}

ConcurrencyChecker determines whether a job has capacity for more in-flight tasks.

type Config

type Config struct {
	URL        string
	PoolSize   int
	TLSEnabled bool

	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	MaxRetries   int
}

Config holds Redis connection parameters, typically loaded from environment variables.

func ConfigFromEnv

func ConfigFromEnv() Config

ConfigFromEnv builds a Config from the process environment. TLS is inferred from the URL scheme (rediss:// = TLS) unless REDIS_TLS_ENABLED is explicitly set.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reads from one or more job streams via XREADGROUP and reclaims stale messages via XAUTOCLAIM.

func NewConsumer

func NewConsumer(client *Client, opts ConsumerOpts) *Consumer

NewConsumer creates a Consumer.

func (*Consumer) Ack

func (c *Consumer) Ack(ctx context.Context, jobID string, messageIDs ...string) error

Ack acknowledges one or more messages, removing them from the pending entries list (PEL).

func (*Consumer) PendingCount added in v0.33.1

func (c *Consumer) PendingCount(ctx context.Context, jobID string) (int64, error)

PendingCount returns the number of messages in the pending entries list (PEL) for a job's consumer group — i.e. tasks that have been delivered to a worker but not yet ACKed. This is the authoritative source of "currently running" for a given job; the RunningCounters HASH in Redis is a fast-path mirror that can drift under partial failures. Returns 0 when the stream or group does not yet exist.

func (*Consumer) Read

func (c *Consumer) Read(ctx context.Context, jobID string) ([]StreamMessage, error)

Read fetches new messages from the given job's stream. It blocks for up to opts.BlockTimeout if no messages are available. Returns nil (not error) when no messages are ready.

func (*Consumer) ReadNonBlocking

func (c *Consumer) ReadNonBlocking(ctx context.Context, jobID string) ([]StreamMessage, error)

ReadNonBlocking is like Read but returns immediately if no messages are available. Useful for round-robin scanning across multiple jobs.

func (*Consumer) ReclaimStale

func (c *Consumer) ReclaimStale(ctx context.Context, jobID string) (reclaimed []StreamMessage, deadLetter []StreamMessage, err error)

ReclaimStale uses XAUTOCLAIM to take ownership of messages that have been pending longer than MinIdleTime. Returns the reclaimed messages. Messages that have been delivered more than MaxDeliveries times are returned separately as dead-letter candidates.

A single call sweeps the full PEL by following the XAUTOCLAIM cursor until it returns to "0-0", so a burst of stuck messages drains in one tick rather than one-batch-per-30s. Per-call safety caps keep any single sweep bounded when the PEL is pathologically large.

Note: ReclaimStale does NOT ACK messages in the returned deadLetter slice. The caller owns final disposition and must ACK or NACK each dead-letter message explicitly — otherwise the same messages will be reclaimed again on the next XAUTOCLAIM sweep.

type ConsumerOpts

type ConsumerOpts struct {
	// ConsumerName uniquely identifies this consumer within the group.
	// Typically "worker-{machineID}-{goroutineID}".
	ConsumerName string

	// BlockTimeout is the XREADGROUP BLOCK duration. Default 2s.
	BlockTimeout time.Duration

	// Count is the max messages per XREADGROUP call. Default 1.
	Count int64

	// ClaimInterval is how often the XAUTOCLAIM sweep runs. Default 30s.
	ClaimInterval time.Duration

	// MinIdleTime is the XAUTOCLAIM min-idle-time. Messages pending
	// longer than this are reclaimed. Default 3min (TaskStaleTimeout).
	MinIdleTime time.Duration

	// MaxDeliveries is the maximum number of times a message can be
	// delivered before it is treated as a permanent failure. Default 3.
	MaxDeliveries int64

	// AutoclaimCount is the per-call XAUTOCLAIM COUNT used by ReclaimStale.
	// Default 100. Override via REDIS_AUTOCLAIM_COUNT.
	AutoclaimCount int64

	// AutoclaimMaxPerSweep is the safety cap on messages reclaimed per
	// ReclaimStale invocation across the cursor loop, so one pathological
	// job cannot starve the other jobs the reclaim loop still has to scan.
	// Default 1000. Override via REDIS_AUTOCLAIM_MAX_PER_SWEEP.
	AutoclaimMaxPerSweep int
}

ConsumerOpts controls stream reading behaviour.

func DefaultConsumerOpts

func DefaultConsumerOpts(consumerName string) ConsumerOpts

DefaultConsumerOpts returns production defaults.

type DBSyncFunc

type DBSyncFunc func(ctx context.Context, counts map[string]int64) error

DBSyncFunc is called periodically to write running counters back to Postgres for API visibility. The function receives a map of jobID -> count.

func DefaultDBSyncFunc

func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc

DefaultDBSyncFunc returns a DBSyncFunc that updates the jobs table running_tasks column using the provided *sql.DB.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher is a long-running goroutine that moves due items from per-job Redis ZSETs into per-job Redis Streams.

func NewDispatcher

func NewDispatcher(
	client *Client,
	scheduler *Scheduler,
	pacer *DomainPacer,
	counters *RunningCounters,
	jobLister JobLister,
	concCheck ConcurrencyChecker,
	opts DispatcherOpts,
) *Dispatcher

NewDispatcher creates a Dispatcher.

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context)

Run is the dispatcher's main loop. It blocks until ctx is cancelled. Start it as a goroutine.

type DispatcherOpts

type DispatcherOpts struct {
	// ScanInterval is how often the dispatcher sweeps all active job
	// ZSETs for due items. Default 100ms.
	ScanInterval time.Duration

	// BatchSize is the maximum number of ZSET entries fetched per job
	// per scan tick. Default 50.
	BatchSize int64

	// ParallelJobs caps how many per-job dispatch goroutines run
	// concurrently inside a single tick. Pre-change the tick processed
	// jobs serially, which made the per-task Redis round-trip cost scale
	// O(N_jobs × batch). Under a 100-job workload that serialised the
	// dispatcher into a ~70× backlog. Default 32. Override via
	// REDIS_DISPATCH_PARALLEL_JOBS.
	ParallelJobs int
}

DispatcherOpts controls the dispatcher's scan behaviour.

func DefaultDispatcherOpts

func DefaultDispatcherOpts() DispatcherOpts

DefaultDispatcherOpts returns production defaults, optionally overridden by environment variables.

type DomainPacer

type DomainPacer struct {
	// contains filtered or unexported fields
}

DomainPacer coordinates per-domain request rate across all worker instances using Redis-backed time gates and adaptive delay state.

func NewDomainPacer

func NewDomainPacer(client *Client, cfg PacerConfig) *DomainPacer

NewDomainPacer creates a DomainPacer.

func (*DomainPacer) DecrementInflight

func (p *DomainPacer) DecrementInflight(ctx context.Context, domain, jobID string) error

DecrementInflight reduces the per-domain per-job inflight counter.

func (*DomainPacer) FlushAdaptiveDelays added in v0.33.1

func (p *DomainPacer) FlushAdaptiveDelays(ctx context.Context) (int, error)

FlushAdaptiveDelays removes all accumulated per-domain adaptive delay state (hover:dom:cfg:* keys). Pre-merge the DomainLimiter was in-memory and reset on every worker restart — so a bad afternoon of 429s from a flaky target could never throttle crawls for longer than the worker's lifetime. Post-merge the state lives in Redis with a 24h TTL, which means a single spike can keep a domain at the 60s adaptive floor for a full day.

Call this on worker startup to restore the pre-merge behaviour: the pacer still grows delay on 429 during this run, but the slate is clean on each deploy. Returns the number of keys deleted.

func (*DomainPacer) GetInflight

func (p *DomainPacer) GetInflight(ctx context.Context, domain, jobID string) (int64, error)

GetInflight returns the current inflight count for a domain+job.

func (*DomainPacer) IncrementInflight

func (p *DomainPacer) IncrementInflight(ctx context.Context, domain, jobID string) error

IncrementInflight bumps the per-domain per-job inflight counter.

func (*DomainPacer) Release

func (p *DomainPacer) Release(ctx context.Context, domain, jobID string, success, rateLimited bool) error

Release is called after a crawl completes. It updates the adaptive delay state based on the outcome.

func (*DomainPacer) Seed

func (p *DomainPacer) Seed(ctx context.Context, domain string, baseDelayMS, adaptiveDelayMS, floorMS int) error

Seed initialises the domain config hash with base delay values (typically from robots.txt and Postgres domain record). Safe to call multiple times — uses HSETNX so existing values are preserved.

func (*DomainPacer) TryAcquire

func (p *DomainPacer) TryAcquire(ctx context.Context, domain string) (PaceResult, error)

TryAcquire attempts to set the domain time-gate. If the gate is already held (domain was recently accessed), it returns the remaining wait time. This is non-blocking.

Implemented as a single Lua EVALSHA so the effective-delay read and SET NX PX / PTTL fallback all execute server-side. The earlier three-call version (HMGET → SET NX PX → PTTL) was the dominant dispatcher round-trip cost under multi-job workloads.

type JobLister

type JobLister interface {
	ActiveJobIDs(ctx context.Context) ([]string, error)
}

JobLister returns the set of active job IDs the dispatcher should scan. Implementations typically query Postgres.

type OutboxSweeperOpts

type OutboxSweeperOpts struct {
	// Interval between sweep ticks. Default: 500ms.
	Interval time.Duration
	// BatchSize caps how many rows are claimed per tick. Default: 200.
	BatchSize int
	// BaseBackoff is the first retry delay on ScheduleBatch failure.
	// Default: 2s. Each subsequent attempt doubles up to MaxBackoff.
	BaseBackoff time.Duration
	// MaxBackoff caps the retry delay. Default: 5 minutes.
	MaxBackoff time.Duration
	// MaxAttempts is the retry cap before a row is moved to
	// task_outbox_dead. Default: 10.
	MaxAttempts int
	// StatementTimeout bounds each sweep tick's total DB work. Guards
	// against a pathological sweeper tx holding locks indefinitely. 0
	// leaves the DB's default in place. Default: 5s.
	StatementTimeout time.Duration
}

OutboxSweeperOpts configures a Sweeper.

func DefaultOutboxSweeperOpts

func DefaultOutboxSweeperOpts() OutboxSweeperOpts

DefaultOutboxSweeperOpts returns sensible production defaults.

The sweep interval was lowered from 5s to 500ms because the outbox sits on the hot path between newly-authored tasks and the Redis ZSET that dispatchers poll. At 5s, each just-completed task waited up to 5s for its newly-discovered siblings to reach a worker, which dominated end-to-end throughput on small jobs. The sweep is an index-only SKIP LOCKED query; running it 10× more often is cheap.

StatementTimeout was raised from 5s to 15s after HOVER-K3: when the shared queue pool saturates under bulk-lane load, pool acquire alone can eat several seconds of the tick budget, leaving sub-second headroom for the actual SELECT/UPDATE work and surfacing as "bump attempts: context deadline exceeded". 15s is comfortably shorter than session/idle timeouts but tolerates pool wait spikes.

type PaceResult

type PaceResult struct {
	// Acquired is true if the domain time-gate was successfully set.
	Acquired bool

	// RetryAfter is how long the caller should wait before retrying.
	// Only meaningful when Acquired is false.
	RetryAfter time.Duration
}

PaceResult is returned by TryAcquire.

type PacerConfig

type PacerConfig struct {
	// SuccessThreshold is the number of consecutive successes before
	// the adaptive delay decreases. Default 5. Override via
	// GNH_RATE_LIMIT_SUCCESS_THRESHOLD. Lower values recover faster
	// after a transient rate-limit event.
	SuccessThreshold int

	// DelayStepMS is the amount (milliseconds) the adaptive delay
	// GROWS on each rate-limited release. Default 500.
	DelayStepMS int

	// DelayStepDownMS is the amount (milliseconds) the adaptive delay
	// SHRINKS on each success once SuccessThreshold is reached. Default
	// falls back to DelayStepMS (symmetric growth/recovery). Setting
	// this higher than DelayStepMS makes recovery faster than the
	// growth on rate-limit, which is usually the right default — a
	// single spike of 429s shouldn't throttle a domain for 20 minutes.
	DelayStepDownMS int

	// MaxDelayMS caps the adaptive delay. Default 60_000 (60s).
	MaxDelayMS int

	// MinPushbackMS is the floor applied to PaceResult.RetryAfter when
	// the gate is already held. Prevents the dispatcher from tight-looping
	// through rate-limited tasks whose gate TTL is near-zero — without
	// this floor a domain with a 1ms residual delay would be re-fetched
	// every Dispatcher tick (100ms). Named after the pre-merge constant
	// domainDelayPause (internal/jobs/worker.go:147, 100ms default).
	MinPushbackMS int
}

PacerConfig holds the tuning knobs for domain pacing, mirroring the constants from the current in-memory DomainLimiter.

func DefaultPacerConfig

func DefaultPacerConfig() PacerConfig

DefaultPacerConfig returns production defaults.

type Probe

type Probe struct {
	// contains filtered or unexported fields
}

Probe periodically scrapes Tier 1 gauges that have no natural emission site (stream length, ZSET depth, pending count, outbox backlog + age, Redis PING, pool stats) and feeds them to the observability package. Intended to run once per process.

func NewProbe

func NewProbe(client *Client, db *sql.DB, lister JobLister, opts ProbeOpts) *Probe

NewProbe constructs a Probe. db may be nil on the API side if the outbox is only scraped by the worker. Zero-valued opts fields are back-filled from DefaultProbeOpts so the defaults have a single source of truth.

func (*Probe) Run

func (p *Probe) Run(ctx context.Context)

Run drives the probe loop until ctx is cancelled. Errors are logged and the loop continues; telemetry gaps are preferable to crashes.

type ProbeOpts

type ProbeOpts struct {
	// Interval between probe ticks. Default 5s.
	Interval time.Duration
	// TickTimeout bounds a single tick so a slow Redis or DB call
	// cannot stall the whole probe loop. Default 3s.
	TickTimeout time.Duration
}

ProbeOpts configures a broker Probe.

func DefaultProbeOpts

func DefaultProbeOpts() ProbeOpts

DefaultProbeOpts returns production defaults.

type RunningCounters

type RunningCounters struct {
	// contains filtered or unexported fields
}

RunningCounters tracks how many tasks are currently in-flight per job using a single Redis HASH. This replaces the in-memory atomic counters and async DB flush loops in the old worker pool.

func NewRunningCounters

func NewRunningCounters(client *Client) *RunningCounters

NewRunningCounters creates a RunningCounters.

func (*RunningCounters) Decrement

func (rc *RunningCounters) Decrement(ctx context.Context, jobID string) (int64, error)

Decrement atomically reduces the running count for a job. Returns the new count. Cleans up zero entries.

func (*RunningCounters) Get

func (rc *RunningCounters) Get(ctx context.Context, jobID string) (int64, error)

Get returns the current running count for a single job.

func (*RunningCounters) GetAll

func (rc *RunningCounters) GetAll(ctx context.Context) (map[string]int64, error)

GetAll returns the running counts for all jobs.

func (*RunningCounters) Increment

func (rc *RunningCounters) Increment(ctx context.Context, jobID string) (int64, error)

Increment atomically bumps the running count for a job. Returns the new count.

func (*RunningCounters) Reconcile

func (rc *RunningCounters) Reconcile(ctx context.Context, counts map[string]int64) error

Reconcile sets the running counters from an authoritative source (typically a Postgres query on startup). This overwrites any stale state in Redis.

func (*RunningCounters) RemoveJob

func (rc *RunningCounters) RemoveJob(ctx context.Context, jobID string) error

RemoveJob clears the running counter for a specific job (e.g. on job completion/cancellation).

func (*RunningCounters) StartDBSync

func (rc *RunningCounters) StartDBSync(ctx context.Context, interval time.Duration, syncFn DBSyncFunc)

StartDBSync runs a background loop that periodically reads all running counters from Redis and calls syncFn to persist them to Postgres. Blocks until ctx is cancelled.

type ScheduleEntry

type ScheduleEntry struct {
	TaskID     string
	JobID      string
	PageID     int
	Host       string
	Path       string
	Priority   float64
	RetryCount int
	SourceType string
	SourceURL  string
	RunAt      time.Time
}

ScheduleEntry contains the data needed to schedule a task into the Redis ZSET. The entry is stored as a pipe-delimited member string; the score is the earliest unix-millisecond timestamp at which the task may run.

func ParseScheduleEntry

func ParseScheduleEntry(member string, score float64) (ScheduleEntry, error)

ParseScheduleEntry reconstructs a ScheduleEntry from its pipe-delimited ZSET member string plus the score.

func (ScheduleEntry) Member

func (e ScheduleEntry) Member() string

Member returns the pipe-delimited string stored in the ZSET.

func (ScheduleEntry) Score

func (e ScheduleEntry) Score() float64

Score returns the ZSET score (unix milliseconds).

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages delayed task scheduling via Redis sorted sets.

When constructed via NewSchedulerWithDB, Reschedule dual-writes the new run-at time to the tasks.run_at column so pacing push-backs survive a Redis flush. The db dependency is optional (nil in unit tests that don't exercise the durability path).

func NewScheduler

func NewScheduler(client *Client) *Scheduler

NewScheduler creates a Scheduler without Postgres mirroring. Reschedule writes only to the Redis ZSET. Suitable for tests that don't need the durability path.

func NewSchedulerWithDB

func NewSchedulerWithDB(client *Client, db *sql.DB) *Scheduler

NewSchedulerWithDB creates a Scheduler that mirrors Reschedule run-at updates to the tasks.run_at column. This is the production constructor.

func (*Scheduler) DueItems

func (s *Scheduler) DueItems(ctx context.Context, jobID string, now time.Time, limit int64) ([]ScheduleEntry, error)

DueItems returns up to limit entries whose score is <= now from the given job's ZSET. Items are returned but not removed — the caller is responsible for ZREM after successful dispatch.

func (*Scheduler) PendingCount

func (s *Scheduler) PendingCount(ctx context.Context, jobID string) (int64, error)

PendingCount returns the total number of scheduled items for a job.

func (*Scheduler) Remove

func (s *Scheduler) Remove(ctx context.Context, jobID, member string) error

Remove deletes a task from the job's ZSET (e.g. on cancellation).

func (*Scheduler) RemoveJobSchedule

func (s *Scheduler) RemoveJobSchedule(ctx context.Context, jobID string) error

RemoveJobSchedule deletes the entire ZSET for a job (used on job cancellation or completion).

func (*Scheduler) Reschedule

func (s *Scheduler) Reschedule(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error

Reschedule updates the score (run-at time) for an existing entry in the ZSET. Used when the dispatcher cannot dispatch yet (domain pacing, concurrency limit) and needs to push the item back.

When the Scheduler was constructed with a *sql.DB, the new run-at is also written to the tasks.run_at column. Postgres is written first so that if the process crashes or Redis is flushed between the two writes, the durable store holds the newer time; the next dispatch attempt will see the correct pacing window.

TODO(run-at-reconcile): once the task-lifecycle redesign lands (a dedicated 'scheduled' status flipped by the dispatcher on XADD), add a startup sweep that re-seeds the ZSET from tasks.run_at for status='scheduled' rows with no ZSET member. Blocked on the outbox PR.

func (*Scheduler) RescheduleZSet added in v0.33.1

func (s *Scheduler) RescheduleZSet(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error

RescheduleZSet is Reschedule without the Postgres mirror. Use this on the hot pacer push-back path where every dispatcher iteration would otherwise issue a synchronous UPDATE — at 100 paced ops/sec the per-op DB latency stalled the single dispatcher goroutine and the ZSET backed up to 80k+ entries on 2026-04-22.

Safety: pacer push-back is ephemeral (seconds, not minutes). If Redis loses the ZSET between a push-back and dispatch, OutboxSweeper rehydrates from tasks.run_at. The authoritative run_at in Postgres is written on initial enqueue; a missed push-back just means the task is re-attempted slightly sooner than its pacer gate allows — the next TryAcquire will push it back again. No task is lost.

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error

Schedule adds a single task to the job's ZSET.

func (*Scheduler) ScheduleAndAck

func (s *Scheduler) ScheduleAndAck(ctx context.Context, entry ScheduleEntry, ackJobID, messageID string) error

ScheduleAndAck atomically enqueues a retry into the job's ZSET and acknowledges (removes) the original stream message in a single Redis MULTI/EXEC. This prevents the two-step race where Schedule succeeds but Ack fails — which would leave the retry queued while the original stays in the PEL, allowing XAUTOCLAIM to redeliver it and causing a duplicate crawl.

Redis MULTI/EXEC is atomic on a single server: either both operations apply or neither does. The caller receives a single error to act on.

func (*Scheduler) ScheduleBatch

func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error

ScheduleBatch adds multiple tasks to their respective job ZSETs using a pipeline for efficiency.

Returns nil on full success. Returns a *BatchError when the pipeline completed but individual ZADDs failed — callers can partition the batch via err.(*BatchError).FailedIndices. Returns a non-BatchError error when the pipeline itself could not execute (e.g. Redis unreachable), in which case callers must treat all entries as failed.

type StreamMessage

type StreamMessage struct {
	// MessageID is the Redis stream entry ID (e.g. "1234567890-0").
	MessageID string

	TaskID     string
	JobID      string
	PageID     int
	Host       string
	Path       string
	Priority   float64
	RetryCount int
	SourceType string
	SourceURL  string
}

StreamMessage is a parsed task envelope read from a Redis Stream.

type Sweeper

type Sweeper struct {
	// contains filtered or unexported fields
}

Sweeper polls task_outbox for due rows and pushes them into Redis via Scheduler.ScheduleBatch. Deletes rows on success; bumps attempts and run_at on failure.

Safe to run multiple replicas: each claim tx uses FOR UPDATE SKIP LOCKED so replicas partition the due rows rather than contending.

func NewOutboxSweeper

func NewOutboxSweeper(db *sql.DB, scheduler *Scheduler, opts OutboxSweeperOpts) *Sweeper

NewOutboxSweeper constructs a Sweeper.

func (*Sweeper) Run

func (s *Sweeper) Run(ctx context.Context)

Run drives the sweeper loop until ctx is cancelled. Errors from individual ticks are logged; the loop keeps going.

func (*Sweeper) Tick

func (s *Sweeper) Tick(ctx context.Context) error

Tick runs a single sweep iteration. Exported for tests so they can deterministically trigger a sweep without waiting for the ticker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL