broker

package
v0.33.13
Published: Apr 28, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package broker provides Redis-backed task scheduling, dispatch, and coordination primitives for the hover crawl execution pipeline.

Index

Constants

const DefaultOutboxMaxAttempts = 10

DefaultOutboxMaxAttempts caps the worst-case stuck-row age at MaxAttempts × MaxBackoff (10 × 5 min = 50 min at defaults).

const RunningCountersKey = keyPrefix + "running"

HASH: jobID → in-flight task count.

Variables

This section is empty.

Functions

func ConsumerGroup

func ConsumerGroup(jobID string) string

func DomainConfigKey

func DomainConfigKey(domain string) string

HASH: base_delay_ms, adaptive_delay_ms, floor_ms, success_streak, error_streak.

func DomainGateKey

func DomainGateKey(domain string) string

String time-gate. SET NX PX {delay_ms} caps per-domain dispatch rate.
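
A minimal client-side sketch of the gate pattern this key backs, assuming go-redis/v9; TryAcquire performs the same check server-side in a single Lua script:

func gateSketch(ctx context.Context, rdb *redis.Client, domain string, delayMS int) (time.Duration, error) {
	key := DomainGateKey(domain)
	// SET NX PX: the first caller in the window wins; the TTL is the window.
	ok, err := rdb.SetNX(ctx, key, 1, time.Duration(delayMS)*time.Millisecond).Result()
	if err != nil || ok {
		return 0, err // acquired, or the call itself failed
	}
	// Gate held by an earlier dispatch: the remaining TTL is the pushback.
	return rdb.PTTL(ctx, key).Result()
}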

func DomainInflightKey

func DomainInflightKey(domain string) string

HASH: jobID → inflight task count for this domain+job pair.

func FormatScheduleEntry

func FormatScheduleEntry(taskID, jobID string, pageID int, host, path string, priority float64, retryCount int, sourceType, sourceURL, taskType string, lighthouseRunID int64) string

Pipe-delimited (avoids JSON overhead on the scheduling hot path).

Current: taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL|taskType|lighthouseRunID
Legacy:  taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL

ParseScheduleEntry accepts both so a deploy rolls forward without a ZSET flush; legacy entries default to taskType='crawl'.
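
A round-trip sketch with illustrative values:

member := FormatScheduleEntry(
	"task-1", "job-1", 42, "example.com", "/pricing",
	0.5, 0, "sitemap", "https://example.com/sitemap.xml", "crawl", 0)

entry, err := ParseScheduleEntry(member, float64(time.Now().UnixMilli()))
if err != nil {
	// malformed member
}
// A 9-field legacy member parses the same way, with entry.TaskType == "crawl".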

func LighthouseConsumerGroup added in v0.33.10

func LighthouseConsumerGroup(jobID string) string

func LighthouseStreamKey added in v0.33.10

func LighthouseStreamKey(jobID string) string

Distinct stream so crawl workers and analysis can scale independently.

func ScheduleKey

func ScheduleKey(jobID string) string

ZSET; score = earliest runnable unix-ms.

func StreamKey

func StreamKey(jobID string) string

Types

type BatchError added in v0.33.1

type BatchError struct {
	FailedIndices []int
	Total         int
	Err           error
}

BatchError is returned by ScheduleBatch on partial pipeline failure. Extract it with errors.As and retry FailedIndices.
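
A retry sketch:

err := scheduler.ScheduleBatch(ctx, entries)
var be *BatchError
if errors.As(err, &be) {
	// Partial failure: only the failed subset needs another attempt.
	retry := make([]ScheduleEntry, 0, len(be.FailedIndices))
	for _, i := range be.FailedIndices {
		retry = append(retry, entries[i])
	}
	err = scheduler.ScheduleBatch(ctx, retry)
}
// Any other non-nil err means the pipeline itself failed; retry everything.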

func (*BatchError) Error added in v0.33.1

func (e *BatchError) Error() string

func (*BatchError) Unwrap added in v0.33.1

func (e *BatchError) Unwrap() error

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(cfg Config) (*Client, error)

func (*Client) ClearAll added in v0.33.10

func (c *Client) ClearAll(ctx context.Context) (int, error)

ClearAll deletes every hover:* key the broker writes. Does NOT FLUSHDB — safe on shared Redis.

func (*Client) Close

func (c *Client) Close() error

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

func (*Client) RDB

func (c *Client) RDB() *redis.Client

func (*Client) ReclaimTerminalJobKeys added in v0.33.10

func (c *Client) ReclaimTerminalJobKeys(ctx context.Context, filter TerminalFilter) (ReclaimReport, error)

One-off backfill sweeper. Idempotent.

func (*Client) RemoveJobKeys added in v0.33.10

func (c *Client) RemoveJobKeys(ctx context.Context, jobID string) error

RemoveJobKeys clears all per-job broker state for a terminal job. XGroupDestroy errors are tolerated so a partially-cleaned or lighthouse-less job doesn't abort the rest of the cleanup.

func (*Client) SweepOrphanInflight added in v0.33.12

func (c *Client) SweepOrphanInflight(ctx context.Context, activeJobIDs []string) (int, error)

SweepOrphanInflight removes dom:flight fields for jobs absent from activeJobIDs. Drift source: SIGKILL bypasses the graceful drain so dispatcher increments without a matching pacer.Release decrement. dom:flight has no dedicated reconciler.

type ConcurrencyChecker

type ConcurrencyChecker interface {
	CanDispatch(ctx context.Context, jobID string) (bool, error)
}

type Config

type Config struct {
	URL          string
	PoolSize     int
	TLSEnabled   bool
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	MaxRetries   int
}

func ConfigFromEnv

func ConfigFromEnv() Config

ConfigFromEnv infers TLS from the URL scheme (rediss://) unless REDIS_TLS_ENABLED overrides.
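
A sketch of the rule as documented; REDIS_URL is an assumed variable name and the real parsing may differ:

u, err := url.Parse(os.Getenv("REDIS_URL"))
tlsEnabled := err == nil && u.Scheme == "rediss"
if v, ok := os.LookupEnv("REDIS_TLS_ENABLED"); ok {
	tlsEnabled = v == "true" // explicit override wins over the scheme
}
cfg := Config{URL: os.Getenv("REDIS_URL"), TLSEnabled: tlsEnabled}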

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reads via XREADGROUP and reclaims stale messages via XAUTOCLAIM.

func NewConsumer

func NewConsumer(client *Client, opts ConsumerOpts) *Consumer

func (*Consumer) Ack

func (c *Consumer) Ack(ctx context.Context, jobID string, messageIDs ...string) error

func (*Consumer) PendingCount added in v0.33.1

func (c *Consumer) PendingCount(ctx context.Context, jobID string) (int64, error)

PendingCount returns the PEL size — the authoritative source of "currently running" for a job. RunningCounters mirrors this and can drift under partial failures.

func (*Consumer) Read

func (c *Consumer) Read(ctx context.Context, jobID string) ([]StreamMessage, error)

Read blocks for up to opts.BlockTimeout and returns nil (not an error) when no messages are ready.

func (*Consumer) ReadNonBlocking

func (c *Consumer) ReadNonBlocking(ctx context.Context, jobID string) ([]StreamMessage, error)

ReadNonBlocking returns immediately when no messages are ready.

func (*Consumer) ReclaimStale

func (c *Consumer) ReclaimStale(ctx context.Context, jobID string) (reclaimed []StreamMessage, deadLetter []StreamMessage, err error)

ReclaimStale walks the XAUTOCLAIM cursor until "0-0" so a burst of stuck messages drains in one tick. Messages over MaxDeliveries are returned as deadLetter candidates — caller owns final disposition and must ACK/NACK or they'll be reclaimed again next sweep.
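
A consumption-loop sketch; process and recordDeadLetter stand in for caller-owned logic, and a real loop would sweep on ClaimInterval rather than after every read:

for {
	msgs, err := consumer.Read(ctx, jobID) // blocks up to BlockTimeout
	if err != nil {
		return err
	}
	for _, m := range msgs {
		if process(ctx, m) == nil {
			_ = consumer.Ack(ctx, jobID, m.MessageID)
		} // on failure, leave un-ACKed: ReclaimStale redelivers it
	}
	reclaimed, dead, err := consumer.ReclaimStale(ctx, jobID)
	if err != nil {
		return err
	}
	for _, m := range reclaimed {
		if process(ctx, m) == nil {
			_ = consumer.Ack(ctx, jobID, m.MessageID)
		}
	}
	for _, m := range dead {
		recordDeadLetter(ctx, m)                  // caller-owned disposition
		_ = consumer.Ack(ctx, jobID, m.MessageID) // ACK or it returns next sweep
	}
}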

type ConsumerOpts

type ConsumerOpts struct {
	// ConsumerName: "worker-{machineID}-{goroutineID}".
	ConsumerName  string
	BlockTimeout  time.Duration
	Count         int64
	ClaimInterval time.Duration
	MinIdleTime   time.Duration
	MaxDeliveries int64
	// AutoclaimCount is the per-call XAUTOCLAIM COUNT.
	AutoclaimCount int64
	// AutoclaimMaxPerSweep caps total reclaimed per sweep so one
	// pathological job can't starve the rest of the reclaim loop.
	AutoclaimMaxPerSweep int
}

func DefaultConsumerOpts

func DefaultConsumerOpts(consumerName string) ConsumerOpts

type DBSyncFunc

type DBSyncFunc func(ctx context.Context, counts map[string]int64) error

func DefaultDBSyncFunc

func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc

No outer tx: wrapping the per-job UPDATEs in one transaction held row locks that deadlocked with the update_job_queue_counters AFTER trigger and saturated the bulk DB pool. The skew metric loses tx-snapshot consistency, but Redis/PG already drift between ticks.
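
A sketch of the per-job shape this implies; jobs.running_count is an assumed column, not the package's actual schema:

func sketchDBSync(db *sql.DB) DBSyncFunc {
	return func(ctx context.Context, counts map[string]int64) error {
		for jobID, n := range counts {
			// One autocommit UPDATE per job: row locks are held for a
			// single statement, so the AFTER trigger can't deadlock
			// across jobs.
			if _, err := db.ExecContext(ctx,
				`UPDATE jobs SET running_count = $1 WHERE id = $2`,
				n, jobID); err != nil {
				return err
			}
		}
		return nil
	}
}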

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher moves due items from per-job Redis ZSETs into per-job Redis Streams.

func NewDispatcher

func NewDispatcher(
	client *Client,
	scheduler *Scheduler,
	pacer *DomainPacer,
	counters *RunningCounters,
	jobLister JobLister,
	concCheck ConcurrencyChecker,
	opts DispatcherOpts,
) *Dispatcher

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context)

Run blocks until ctx is cancelled. Start as a goroutine.

func (*Dispatcher) SetOnFirstDispatch added in v0.33.12

func (d *Dispatcher) SetOnFirstDispatch(fn func(ctx context.Context, jobID string) error)

SetOnFirstDispatch installs an idempotent hook fired the first time dispatchJob publishes a task for a jobID. A non-nil error return triggers a retry on the next dispatch.
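
A hook sketch; jobs.started_at is an assumed column:

d.SetOnFirstDispatch(func(ctx context.Context, jobID string) error {
	// Idempotent by construction: the IS NULL guard makes re-runs no-ops.
	_, err := db.ExecContext(ctx,
		`UPDATE jobs SET started_at = now() WHERE id = $1 AND started_at IS NULL`,
		jobID)
	return err // non-nil: the dispatcher retries on the next dispatch
})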

func (*Dispatcher) SetReconciler added in v0.33.12

func (d *Dispatcher) SetReconciler(r Reconciler)

SetReconciler installs the self-heal target. Nil disables self-heal and is tolerated throughout the hot path.

type DispatcherOpts

type DispatcherOpts struct {
	ScanInterval time.Duration
	BatchSize    int64

	// ParallelJobs caps per-tick dispatch goroutines. Serial dispatch
	// scaled O(N_jobs × batch) and produced a ~70× backlog under 100
	// jobs. Default 32; override via REDIS_DISPATCH_PARALLEL_JOBS.
	ParallelJobs int

	// StuckThreshold gates the self-heal reconcile. Default 30s, env
	// REDIS_DISPATCH_STUCK_THRESHOLD_S; rate-limited to one trigger
	// per 2× threshold per job.
	StuckThreshold time.Duration
}

DispatcherOpts controls the dispatcher's scan behaviour.

func DefaultDispatcherOpts

func DefaultDispatcherOpts() DispatcherOpts

type DomainPacer

type DomainPacer struct {
	// contains filtered or unexported fields
}

func NewDomainPacer

func NewDomainPacer(client *Client, cfg PacerConfig) *DomainPacer

func (*DomainPacer) DecrementInflight

func (p *DomainPacer) DecrementInflight(ctx context.Context, domain, jobID string) error

func (*DomainPacer) FlushAdaptiveDelays added in v0.33.1

func (p *DomainPacer) FlushAdaptiveDelays(ctx context.Context) (int, error)

Restores pre-merge behaviour: the old in-memory limiter reset on every worker restart, whereas the Redis-backed state carries a 24h TTL, so a single 429 spike can pin a domain at the 60s floor for a full day. Call on worker startup to wipe the slate.
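
Startup usage:

n, err := pacer.FlushAdaptiveDelays(ctx)
if err != nil {
	log.Printf("flush adaptive delays: %v", err)
} else {
	log.Printf("flushed adaptive delay state for %d entries", n)
}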

func (*DomainPacer) GetInflight

func (p *DomainPacer) GetInflight(ctx context.Context, domain, jobID string) (int64, error)

func (*DomainPacer) IncrementInflight

func (p *DomainPacer) IncrementInflight(ctx context.Context, domain, jobID string) error

func (*DomainPacer) Release

func (p *DomainPacer) Release(ctx context.Context, domain, jobID string, success, rateLimited bool) error

func (*DomainPacer) Seed

func (p *DomainPacer) Seed(ctx context.Context, domain string, baseDelayMS, adaptiveDelayMS, floorMS int) error

HSETNX preserves existing values, so callers may re-seed safely.

func (*DomainPacer) TryAcquire

func (p *DomainPacer) TryAcquire(ctx context.Context, domain string) (PaceResult, error)

Single Lua EVALSHA — the prior three-call form (HMGET → SET NX PX → PTTL) was the dominant dispatcher round-trip cost under multi-job loads.
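
A sketch of what the consolidated script plausibly does; the package's actual Lua is unexported and may differ:

var tryAcquireSketch = redis.NewScript(`
local delay = redis.call('HGET', KEYS[1], 'adaptive_delay_ms') or ARGV[1]
if redis.call('SET', KEYS[2], '1', 'NX', 'PX', delay) then
  return -1                        -- gate acquired
end
return redis.call('PTTL', KEYS[2]) -- pushback in milliseconds
`)

res, err := tryAcquireSketch.Run(ctx, rdb,
	[]string{DomainConfigKey(domain), DomainGateKey(domain)},
	defaultDelayMS).Int64()
// res == -1 → PaceResult{Acquired: true}; otherwise RetryAfter is res ms.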

type JobLister

type JobLister interface {
	ActiveJobIDs(ctx context.Context) ([]string, error)
}

type OutboxSweeperOpts

type OutboxSweeperOpts struct {
	Interval    time.Duration
	BatchSize   int
	BaseBackoff time.Duration
	MaxBackoff  time.Duration
	MaxAttempts int
	// StatementTimeout bounds tick DB work; guards against a wedged
	// sweeper tx holding locks indefinitely. 0 keeps DB default.
	StatementTimeout time.Duration
}

func DefaultOutboxSweeperOpts

func DefaultOutboxSweeperOpts() OutboxSweeperOpts

DefaultOutboxSweeperOpts: 500ms interval (the previous 5s interval starved end-to-end latency on small jobs); 15s StatementTimeout (HOVER-K3: pool acquires ate several seconds of tick budget under bulk-lane load).

type PaceResult

type PaceResult struct {
	Acquired bool
	// Only meaningful when Acquired is false.
	RetryAfter time.Duration
}

type PacerConfig

type PacerConfig struct {
	SuccessThreshold int
	DelayStepMS      int
	// Defaults to DelayStepMS. Higher = faster recovery than growth, so
	// a 429 spike doesn't throttle a domain for 20 minutes.
	DelayStepDownMS int
	MaxDelayMS      int
	// Floor on RetryAfter so a near-zero gate TTL doesn't tight-loop
	// the dispatcher (Dispatcher tick is 100ms).
	MinPushbackMS int
}

func DefaultPacerConfig

func DefaultPacerConfig() PacerConfig

type Probe

type Probe struct {
	// contains filtered or unexported fields
}

func NewProbe

func NewProbe(client *Client, db *sql.DB, lister JobLister, opts ProbeOpts) *Probe

db may be nil on the API side. Zero opts fields fall back to defaults.

func (*Probe) Run

func (p *Probe) Run(ctx context.Context)

type ProbeOpts

type ProbeOpts struct {
	Interval time.Duration
	// Bounds a single tick so a slow Redis/DB call can't stall the loop.
	TickTimeout time.Duration
}

func DefaultProbeOpts

func DefaultProbeOpts() ProbeOpts

type ReclaimReport added in v0.33.10

type ReclaimReport struct {
	CandidatesScanned int
	TerminalJobs      int
	Cleaned           int
	Failed            int
	// First failure only, to avoid retaining one error per failed job.
	FirstError error
}

type Reconciler added in v0.33.12

type Reconciler interface {
	TriggerReconcile(ctx context.Context)
}

Reconciler is the dispatcher's self-heal target when CanDispatch keeps refusing dispatch despite due ZSET work (the signature of hover:running counter drift). Implementations must be safe for concurrent invocation and should debounce a flood of triggers to at most one in-flight reconcile.
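
A minimal conforming implementation, sketched with an atomic in-flight guard:

type debouncedReconciler struct {
	inFlight  atomic.Bool
	reconcile func(ctx context.Context) // the actual repair work
}

func (r *debouncedReconciler) TriggerReconcile(ctx context.Context) {
	if !r.inFlight.CompareAndSwap(false, true) {
		return // a reconcile is already running; collapse the flood
	}
	go func() {
		defer r.inFlight.Store(false)
		r.reconcile(ctx)
	}()
}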

type RunningCounters

type RunningCounters struct {
	// contains filtered or unexported fields
}

func NewRunningCounters

func NewRunningCounters(client *Client) *RunningCounters

func (*RunningCounters) Decrement

func (rc *RunningCounters) Decrement(ctx context.Context, jobID string) (int64, error)

func (*RunningCounters) Get

func (rc *RunningCounters) Get(ctx context.Context, jobID string) (int64, error)

func (*RunningCounters) GetAll

func (rc *RunningCounters) GetAll(ctx context.Context) (map[string]int64, error)

func (*RunningCounters) Increment

func (rc *RunningCounters) Increment(ctx context.Context, jobID string) (int64, error)

func (*RunningCounters) Reconcile

func (rc *RunningCounters) Reconcile(ctx context.Context, counts map[string]int64) error

func (*RunningCounters) RemoveJob

func (rc *RunningCounters) RemoveJob(ctx context.Context, jobID string) error

func (*RunningCounters) StartDBSync

func (rc *RunningCounters) StartDBSync(ctx context.Context, interval time.Duration, syncFn DBSyncFunc)

type ScheduleEntry

type ScheduleEntry struct {
	TaskID          string
	JobID           string
	PageID          int
	Host            string
	Path            string
	Priority        float64
	RetryCount      int
	SourceType      string
	SourceURL       string
	RunAt           time.Time
	TaskType        string
	LighthouseRunID int64
}

ScheduleEntry is encoded as a pipe-delimited ZSET member with the run-at unix-ms as the score. TaskType routes to a stream: "crawl" → StreamKey, "lighthouse" → LighthouseStreamKey.

func ParseScheduleEntry

func ParseScheduleEntry(member string, score float64) (ScheduleEntry, error)

ParseScheduleEntry accepts both the 9-field legacy format and the 11-field current format so a rolling deploy drains without a flush.

func (ScheduleEntry) Member

func (e ScheduleEntry) Member() string

func (ScheduleEntry) Score

func (e ScheduleEntry) Score() float64

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages delayed scheduling via Redis sorted sets. NewSchedulerWithDB enables dual-write of Reschedule's run-at into tasks.run_at so pacing push-backs survive a Redis flush.

func NewScheduler

func NewScheduler(client *Client) *Scheduler

NewScheduler creates a Scheduler without Postgres mirroring.

func NewSchedulerWithDB

func NewSchedulerWithDB(client *Client, db *sql.DB) *Scheduler

func (*Scheduler) DueItems

func (s *Scheduler) DueItems(ctx context.Context, jobID string, now time.Time, limit int64) ([]ScheduleEntry, error)

DueItems returns up to limit entries with score ≤ now. Items are not removed — caller ZREMs after successful dispatch.
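
The intended calling shape; publish stands in for the stream write the Dispatcher performs internally:

entries, err := scheduler.DueItems(ctx, jobID, time.Now(), 64)
if err != nil {
	return err
}
for _, e := range entries {
	if err := publish(ctx, e); err != nil {
		continue // still in the ZSET: stays due and is retried next tick
	}
	// ZREM only after a successful publish: at-least-once, never lost.
	if err := scheduler.Remove(ctx, jobID, e.Member()); err != nil {
		return err
	}
}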

func (*Scheduler) PendingCount

func (s *Scheduler) PendingCount(ctx context.Context, jobID string) (int64, error)

func (*Scheduler) Remove

func (s *Scheduler) Remove(ctx context.Context, jobID, member string) error

func (*Scheduler) RemoveJobSchedule

func (s *Scheduler) RemoveJobSchedule(ctx context.Context, jobID string) error

func (*Scheduler) Reschedule

func (s *Scheduler) Reschedule(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error

Reschedule pushes an existing entry's run-at later. With a *sql.DB configured, it dual-writes Postgres first so a crash between the two writes leaves the durable store with the newer time.

TODO(run-at-reconcile): once tasks have a dedicated 'scheduled' status, add a startup sweep that re-seeds ZSET from tasks.run_at for scheduled rows missing a ZSET member.

func (*Scheduler) RescheduleZSet added in v0.33.1

func (s *Scheduler) RescheduleZSet(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error

RescheduleZSet is Reschedule without the Postgres mirror, for the hot pacer-pushback path. The synchronous UPDATE backed up the ZSET to 80k+ entries at 100 paced ops/sec on 2026-04-22. Safe because OutboxSweeper rehydrates from tasks.run_at if Redis loses state.

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error

func (*Scheduler) ScheduleAndAck

func (s *Scheduler) ScheduleAndAck(ctx context.Context, entry ScheduleEntry, ackJobID, messageID string) error

ScheduleAndAck atomically enqueues the retry and ACKs the original in one MULTI/EXEC. Two-step would let XAUTOCLAIM redeliver a stuck PEL entry and double-crawl if Ack failed after Schedule succeeded.
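
The equivalent go-redis shape (a sketch; the method's internals are unexported):

pipe := rdb.TxPipeline() // MULTI/EXEC
pipe.ZAdd(ctx, ScheduleKey(entry.JobID),
	redis.Z{Score: entry.Score(), Member: entry.Member()})
pipe.XAck(ctx, StreamKey(ackJobID), ConsumerGroup(ackJobID), messageID)
_, err := pipe.Exec(ctx)
// The retry enqueue and the ACK stand or fall together: on failure the
// PEL entry survives for XAUTOCLAIM instead of the task being dropped.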

func (*Scheduler) ScheduleBatch

func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error

ScheduleBatch returns *BatchError when the pipeline ran but individual ZADDs failed; a non-BatchError means the pipeline itself failed and all entries must be treated as failed.

type StreamMessage

type StreamMessage struct {
	MessageID  string
	TaskID     string
	JobID      string
	PageID     int
	Host       string
	Path       string
	Priority   float64
	RetryCount int
	SourceType string
	SourceURL  string
}

StreamMessage is a parsed task envelope read from a Redis Stream.

type Sweeper

type Sweeper struct {
	// contains filtered or unexported fields
}

Sweeper drains task_outbox into Redis via Scheduler.ScheduleBatch. Multi-replica safe: FOR UPDATE SKIP LOCKED partitions due rows across sweepers.
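
The claim-query shape SKIP LOCKED implies; task_outbox column names here are assumptions:

rows, err := tx.QueryContext(ctx, `
	SELECT id, payload
	  FROM task_outbox
	 WHERE next_attempt_at <= now()
	 ORDER BY next_attempt_at
	 LIMIT $1
	   FOR UPDATE SKIP LOCKED`, batchSize)
// Rows already locked by a concurrent sweeper are skipped rather than
// waited on, so replicas partition the due set instead of serializing.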

func NewOutboxSweeper

func NewOutboxSweeper(db *sql.DB, scheduler *Scheduler, opts OutboxSweeperOpts) *Sweeper

func (*Sweeper) Run

func (s *Sweeper) Run(ctx context.Context)

Run drives the sweeper loop until ctx is cancelled.

func (*Sweeper) Tick

func (s *Sweeper) Tick(ctx context.Context) error

Tick runs a single sweep iteration. Exported for tests.

type TerminalFilter added in v0.33.10

type TerminalFilter func(ctx context.Context, jobIDs []string) ([]string, error)

TerminalFilter returns the subset of jobIDs that are terminal in Postgres. Kept as a standalone function type so this package stays free of SQL.
