Documentation ¶
Overview ¶
Package broker provides Redis-backed task scheduling, dispatch, and coordination primitives for the hover crawl execution pipeline.
Index ¶
- Constants
- func ConsumerGroup(jobID string) string
- func DomainConfigKey(domain string) string
- func DomainGateKey(domain string) string
- func DomainInflightKey(domain string) string
- func FormatScheduleEntry(taskID, jobID string, pageID int, host, path string, priority float64, ...) string
- func LighthouseConsumerGroup(jobID string) string
- func LighthouseStreamKey(jobID string) string
- func ScheduleKey(jobID string) string
- func StreamKey(jobID string) string
- type BatchError
- type Client
- func (c *Client) ClearAll(ctx context.Context) (int, error)
- func (c *Client) Close() error
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) RDB() *redis.Client
- func (c *Client) ReclaimTerminalJobKeys(ctx context.Context, filter TerminalFilter) (ReclaimReport, error)
- func (c *Client) RemoveJobKeys(ctx context.Context, jobID string) error
- func (c *Client) SweepOrphanInflight(ctx context.Context, activeJobIDs []string) (int, error)
- type ConcurrencyChecker
- type Config
- type Consumer
- func (c *Consumer) Ack(ctx context.Context, jobID string, messageIDs ...string) error
- func (c *Consumer) PendingCount(ctx context.Context, jobID string) (int64, error)
- func (c *Consumer) Read(ctx context.Context, jobID string) ([]StreamMessage, error)
- func (c *Consumer) ReadNonBlocking(ctx context.Context, jobID string) ([]StreamMessage, error)
- func (c *Consumer) ReclaimStale(ctx context.Context, jobID string) (reclaimed []StreamMessage, deadLetter []StreamMessage, err error)
- type ConsumerOpts
- type DBSyncFunc
- type Dispatcher
- type DispatcherOpts
- type DomainPacer
- func (p *DomainPacer) DecrementInflight(ctx context.Context, domain, jobID string) error
- func (p *DomainPacer) FlushAdaptiveDelays(ctx context.Context) (int, error)
- func (p *DomainPacer) GetInflight(ctx context.Context, domain, jobID string) (int64, error)
- func (p *DomainPacer) IncrementInflight(ctx context.Context, domain, jobID string) error
- func (p *DomainPacer) Release(ctx context.Context, domain, jobID string, success, rateLimited bool) error
- func (p *DomainPacer) Seed(ctx context.Context, domain string, baseDelayMS, adaptiveDelayMS, floorMS int) error
- func (p *DomainPacer) TryAcquire(ctx context.Context, domain string) (PaceResult, error)
- type JobLister
- type OutboxSweeperOpts
- type PaceResult
- type PacerConfig
- type Probe
- type ProbeOpts
- type ReclaimReport
- type Reconciler
- type RunningCounters
- func (rc *RunningCounters) Decrement(ctx context.Context, jobID string) (int64, error)
- func (rc *RunningCounters) Get(ctx context.Context, jobID string) (int64, error)
- func (rc *RunningCounters) GetAll(ctx context.Context) (map[string]int64, error)
- func (rc *RunningCounters) Increment(ctx context.Context, jobID string) (int64, error)
- func (rc *RunningCounters) Reconcile(ctx context.Context, counts map[string]int64) error
- func (rc *RunningCounters) RemoveJob(ctx context.Context, jobID string) error
- func (rc *RunningCounters) StartDBSync(ctx context.Context, interval time.Duration, syncFn DBSyncFunc)
- type ScheduleEntry
- type Scheduler
- func (s *Scheduler) DueItems(ctx context.Context, jobID string, now time.Time, limit int64) ([]ScheduleEntry, error)
- func (s *Scheduler) PendingCount(ctx context.Context, jobID string) (int64, error)
- func (s *Scheduler) Remove(ctx context.Context, jobID, member string) error
- func (s *Scheduler) RemoveJobSchedule(ctx context.Context, jobID string) error
- func (s *Scheduler) Reschedule(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
- func (s *Scheduler) RescheduleZSet(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
- func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error
- func (s *Scheduler) ScheduleAndAck(ctx context.Context, entry ScheduleEntry, ackJobID, messageID string) error
- func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error
- type StreamMessage
- type Sweeper
- type TerminalFilter
Constants ¶
const DefaultOutboxMaxAttempts = 10
DefaultOutboxMaxAttempts caps the worst-case stuck-row age at MaxAttempts × MaxBackoff (10 × 5 min = 50 min at defaults).
const RunningCountersKey = keyPrefix + "running"
HASH: jobID → in-flight task count.
Variables ¶
This section is empty.
Functions ¶
func ConsumerGroup ¶
func DomainConfigKey ¶
HASH: base_delay_ms, adaptive_delay_ms, floor_ms, success_streak, error_streak.
func DomainGateKey ¶
String time-gate. SET NX PX {delay_ms} caps per-domain dispatch rate.
func DomainInflightKey ¶
HASH: jobID → inflight task count for this domain+job pair.
func FormatScheduleEntry ¶
func FormatScheduleEntry(taskID, jobID string, pageID int, host, path string, priority float64, retryCount int, sourceType, sourceURL, taskType string, lighthouseRunID int64) string
Pipe-delimited (avoids JSON overhead on the scheduling hot path).
Current: taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL|taskType|lighthouseRunID
Legacy: taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL
ParseScheduleEntry accepts both so a deploy rolls forward without a ZSET flush; legacy entries default to taskType='crawl'.
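The encoding and the legacy fallback can be sketched as follows. This is an illustrative stand-in, not the package's implementation; the helper names (formatEntry, parseTaskType) are invented for the example.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatEntry sketches the 11-field pipe encoding described above.
func formatEntry(taskID, jobID string, pageID int, host, path string,
	priority float64, retryCount int, sourceType, sourceURL, taskType string,
	lighthouseRunID int64) string {
	return strings.Join([]string{
		taskID, jobID, strconv.Itoa(pageID), host, path,
		strconv.FormatFloat(priority, 'f', -1, 64), strconv.Itoa(retryCount),
		sourceType, sourceURL, taskType, strconv.FormatInt(lighthouseRunID, 10),
	}, "|")
}

// parseTaskType shows the documented legacy fallback: a 9-field entry
// predates taskType, so it defaults to "crawl".
func parseTaskType(member string) (string, error) {
	fields := strings.Split(member, "|")
	switch len(fields) {
	case 9: // legacy format
		return "crawl", nil
	case 11: // current format
		return fields[9], nil
	default:
		return "", fmt.Errorf("schedule entry: want 9 or 11 fields, got %d", len(fields))
	}
}

func main() {
	m := formatEntry("t1", "j1", 42, "example.com", "/a", 1.5, 0, "seed", "https://example.com/a", "lighthouse", 7)
	fmt.Println(m)
	tt, _ := parseTaskType(m)
	fmt.Println(tt) // lighthouse
	legacy, _ := parseTaskType("t1|j1|42|example.com|/a|1.5|0|seed|https://example.com/a")
	fmt.Println(legacy) // crawl
}
```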
func LighthouseConsumerGroup ¶ added in v0.33.10
func LighthouseStreamKey ¶ added in v0.33.10
Distinct stream so crawl workers and analysis can scale independently.
Types ¶
type BatchError ¶ added in v0.33.1
BatchError is returned by ScheduleBatch on partial pipeline failure. Type-assert via errors.As to retry FailedIndices.
func (*BatchError) Error ¶ added in v0.33.1
func (e *BatchError) Error() string
func (*BatchError) Unwrap ¶ added in v0.33.1
func (e *BatchError) Unwrap() error
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) ClearAll ¶ added in v0.33.10
ClearAll deletes every hover:* key the broker writes. Does NOT FLUSHDB — safe on shared Redis.
func (*Client) ReclaimTerminalJobKeys ¶ added in v0.33.10
func (c *Client) ReclaimTerminalJobKeys(ctx context.Context, filter TerminalFilter) (ReclaimReport, error)
One-off backfill sweeper. Idempotent.
func (*Client) RemoveJobKeys ¶ added in v0.33.10
RemoveJobKeys clears all per-job broker state for a terminal job. XGroupDestroy errors are tolerated so a partially-cleaned or lighthouse-less job doesn't abort the rest of the cleanup.
func (*Client) SweepOrphanInflight ¶ added in v0.33.12
SweepOrphanInflight removes dom:flight fields for jobs absent from activeJobIDs. Drift source: SIGKILL bypasses the graceful drain so dispatcher increments without a matching pacer.Release decrement. dom:flight has no dedicated reconciler.
type ConcurrencyChecker ¶
type Config ¶
type Config struct {
URL string
PoolSize int
TLSEnabled bool
ReadTimeout time.Duration
WriteTimeout time.Duration
MaxRetries int
}
func ConfigFromEnv ¶
func ConfigFromEnv() Config
ConfigFromEnv infers TLS from the URL scheme (rediss://) unless REDIS_TLS_ENABLED overrides.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reads via XREADGROUP and reclaims stale messages via XAUTOCLAIM.
func NewConsumer ¶
func NewConsumer(client *Client, opts ConsumerOpts) *Consumer
func (*Consumer) PendingCount ¶ added in v0.33.1
PendingCount returns the PEL size — the authoritative source of "currently running" for a job. RunningCounters mirrors this and can drift under partial failures.
func (*Consumer) Read ¶
Read blocks up to opts.BlockTimeout. Returns nil (not error) when no messages are ready.
func (*Consumer) ReadNonBlocking ¶
ReadNonBlocking returns immediately when no messages are ready.
func (*Consumer) ReclaimStale ¶
func (c *Consumer) ReclaimStale(ctx context.Context, jobID string) (reclaimed []StreamMessage, deadLetter []StreamMessage, err error)
ReclaimStale walks the XAUTOCLAIM cursor until "0-0" so a burst of stuck messages drains in one tick. Messages over MaxDeliveries are returned as deadLetter candidates — caller owns final disposition and must ACK/NACK or they'll be reclaimed again next sweep.
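The reclaimed/deadLetter split can be illustrated with the delivery-count rule alone (the msg type and partition helper are invented for the sketch; the real code reads delivery counts from Redis pending-entry metadata):

```go
package main

import "fmt"

// msg is a cut-down stand-in for StreamMessage plus its delivery count.
type msg struct {
	ID         string
	Deliveries int64
}

// partition sketches ReclaimStale's split: messages at or under
// maxDeliveries are redelivered; the rest are dead-letter candidates the
// caller must ACK/NACK or they will be reclaimed again next sweep.
func partition(msgs []msg, maxDeliveries int64) (reclaimed, deadLetter []msg) {
	for _, m := range msgs {
		if m.Deliveries > maxDeliveries {
			deadLetter = append(deadLetter, m)
		} else {
			reclaimed = append(reclaimed, m)
		}
	}
	return reclaimed, deadLetter
}

func main() {
	r, d := partition([]msg{{"1-0", 2}, {"2-0", 6}}, 5)
	fmt.Println(len(r), len(d)) // 1 1
}
```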
type ConsumerOpts ¶
type ConsumerOpts struct {
// ConsumerName: "worker-{machineID}-{goroutineID}".
ConsumerName string
BlockTimeout time.Duration
Count int64
ClaimInterval time.Duration
MinIdleTime time.Duration
MaxDeliveries int64
// AutoclaimCount is the per-call XAUTOCLAIM COUNT.
AutoclaimCount int64
// AutoclaimMaxPerSweep caps total reclaimed per sweep so one
// pathological job can't starve the rest of the reclaim loop.
AutoclaimMaxPerSweep int
}
func DefaultConsumerOpts ¶
func DefaultConsumerOpts(consumerName string) ConsumerOpts
type DBSyncFunc ¶
func DefaultDBSyncFunc ¶
func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc
No outer tx: wrapping the per-job UPDATEs in one transaction held row locks that deadlocked with the update_job_queue_counters AFTER trigger and saturated the bulk DB pool. The skew metric loses tx-snapshot consistency, but Redis/PG already drift between ticks.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher moves due items from per-job Redis ZSETs into per-job Redis Streams.
func NewDispatcher ¶
func NewDispatcher(client *Client, scheduler *Scheduler, pacer *DomainPacer, counters *RunningCounters, jobLister JobLister, concCheck ConcurrencyChecker, opts DispatcherOpts) *Dispatcher
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run(ctx context.Context)
Run blocks until ctx is cancelled. Start as a goroutine.
func (*Dispatcher) SetOnFirstDispatch ¶ added in v0.33.12
func (d *Dispatcher) SetOnFirstDispatch(fn func(ctx context.Context, jobID string) error)
SetOnFirstDispatch installs an idempotent hook fired the first time dispatchJob publishes a task for a jobID. A non-nil return triggers retry on the next dispatch.
func (*Dispatcher) SetReconciler ¶ added in v0.33.12
func (d *Dispatcher) SetReconciler(r Reconciler)
SetReconciler installs the self-heal target. Nil disables self-heal and is tolerated throughout the hot path.
type DispatcherOpts ¶
type DispatcherOpts struct {
ScanInterval time.Duration
BatchSize int64
// ParallelJobs caps per-tick dispatch goroutines. Serial dispatch
// scaled O(N_jobs × batch) and produced a ~70× backlog under 100
// jobs. Default 32; override via REDIS_DISPATCH_PARALLEL_JOBS.
ParallelJobs int
// StuckThreshold gates the self-heal reconcile. Default 30s, env
// REDIS_DISPATCH_STUCK_THRESHOLD_S; rate-limited to one trigger
// per 2× threshold per job.
StuckThreshold time.Duration
}
DispatcherOpts controls the dispatcher's scan behaviour.
func DefaultDispatcherOpts ¶
func DefaultDispatcherOpts() DispatcherOpts
type DomainPacer ¶
type DomainPacer struct {
// contains filtered or unexported fields
}
func NewDomainPacer ¶
func NewDomainPacer(client *Client, cfg PacerConfig) *DomainPacer
func (*DomainPacer) DecrementInflight ¶
func (p *DomainPacer) DecrementInflight(ctx context.Context, domain, jobID string) error
func (*DomainPacer) FlushAdaptiveDelays ¶ added in v0.33.1
func (p *DomainPacer) FlushAdaptiveDelays(ctx context.Context) (int, error)
Restores pre-merge behaviour: the in-memory limiter reset on each worker restart, but the Redis-backed state has a 24h TTL, so a single 429 spike can pin a domain at the 60s floor for a full day. Call on worker startup to wipe the slate.
func (*DomainPacer) GetInflight ¶
func (*DomainPacer) IncrementInflight ¶
func (p *DomainPacer) IncrementInflight(ctx context.Context, domain, jobID string) error
func (*DomainPacer) Seed ¶
func (p *DomainPacer) Seed(ctx context.Context, domain string, baseDelayMS, adaptiveDelayMS, floorMS int) error
HSETNX preserves existing values, so callers may re-seed safely.
func (*DomainPacer) TryAcquire ¶
func (p *DomainPacer) TryAcquire(ctx context.Context, domain string) (PaceResult, error)
Single Lua EVALSHA — the prior three-call form (HMGET → SET NX PX → PTTL) was the dominant dispatcher round-trip cost under multi-job loads.
type OutboxSweeperOpts ¶
type OutboxSweeperOpts struct {
Interval time.Duration
BatchSize int
BaseBackoff time.Duration
MaxBackoff time.Duration
MaxAttempts int
// StatementTimeout bounds tick DB work; guards against a wedged
// sweeper tx holding locks indefinitely. 0 keeps DB default.
StatementTimeout time.Duration
}
func DefaultOutboxSweeperOpts ¶
func DefaultOutboxSweeperOpts() OutboxSweeperOpts
DefaultOutboxSweeperOpts: 500ms interval (5s starved end-to-end latency on small jobs); 15s StatementTimeout (HOVER-K3 — pool acquire ate several seconds of tick budget under bulk-lane load).
type PaceResult ¶
type PacerConfig ¶
type PacerConfig struct {
SuccessThreshold int
DelayStepMS int
// Defaults to DelayStepMS. Higher = faster recovery than growth, so
// a 429 spike doesn't throttle a domain for 20 minutes.
DelayStepDownMS int
MaxDelayMS int
// Floor on RetryAfter so a near-zero gate TTL doesn't tight-loop
// the dispatcher (Dispatcher tick is 100ms).
MinPushbackMS int
}
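The asymmetric step-up/step-down intent of these fields can be sketched as follows. This is a model of the documented behaviour, not the broker's actual Lua script; the step function and streak handling are assumptions.

```go
package main

import "fmt"

// cfg mirrors the PacerConfig fields the sketch uses.
type cfg struct {
	SuccessThreshold int
	DelayStepMS      int
	DelayStepDownMS  int
	MaxDelayMS       int
}

// step adjusts the adaptive delay: a rate-limited response steps it up
// (clamped at MaxDelayMS) and resets the streak; SuccessThreshold
// consecutive successes step it down faster than it grew, so a 429
// spike doesn't throttle a domain for long.
func step(delayMS, successStreak int, rateLimited bool, c cfg) (int, int) {
	if rateLimited {
		delayMS += c.DelayStepMS
		if delayMS > c.MaxDelayMS {
			delayMS = c.MaxDelayMS
		}
		return delayMS, 0
	}
	successStreak++
	if successStreak >= c.SuccessThreshold {
		delayMS -= c.DelayStepDownMS
		if delayMS < 0 {
			delayMS = 0
		}
		successStreak = 0
	}
	return delayMS, successStreak
}

func main() {
	c := cfg{SuccessThreshold: 3, DelayStepMS: 1000, DelayStepDownMS: 2000, MaxDelayMS: 60000}
	d, s := step(0, 0, true, c) // a 429 steps the delay up
	fmt.Println(d, s)           // 1000 0
	for i := 0; i < 3; i++ {    // three successes step it back down
		d, s = step(d, s, false, c)
	}
	fmt.Println(d, s) // 0 0
}
```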
func DefaultPacerConfig ¶
func DefaultPacerConfig() PacerConfig
type Probe ¶
type Probe struct {
// contains filtered or unexported fields
}
type ProbeOpts ¶
type ProbeOpts struct {
Interval time.Duration
// Bounds a single tick so a slow Redis/DB call can't stall the loop.
TickTimeout time.Duration
}
func DefaultProbeOpts ¶
func DefaultProbeOpts() ProbeOpts
type ReclaimReport ¶ added in v0.33.10
type Reconciler ¶ added in v0.33.12
Reconciler is the dispatcher's self-heal target when CanDispatch keeps refusing dispatch despite due ZSET work — the signature of `hover:running` counter drift. Implementations must be safe for concurrent invocation and should debounce a flood of triggers to at most one in-flight reconcile.
type RunningCounters ¶
type RunningCounters struct {
// contains filtered or unexported fields
}
func NewRunningCounters ¶
func NewRunningCounters(client *Client) *RunningCounters
func (*RunningCounters) RemoveJob ¶
func (rc *RunningCounters) RemoveJob(ctx context.Context, jobID string) error
func (*RunningCounters) StartDBSync ¶
func (rc *RunningCounters) StartDBSync(ctx context.Context, interval time.Duration, syncFn DBSyncFunc)
type ScheduleEntry ¶
type ScheduleEntry struct {
TaskID string
JobID string
PageID int
Host string
Path string
Priority float64
RetryCount int
SourceType string
SourceURL string
RunAt time.Time
TaskType string
LighthouseRunID int64
}
ScheduleEntry is encoded as a pipe-delimited ZSET member with the run-at unix-ms as the score. TaskType routes to a stream: "crawl" → StreamKey, "lighthouse" → LighthouseStreamKey.
func ParseScheduleEntry ¶
func ParseScheduleEntry(member string, score float64) (ScheduleEntry, error)
ParseScheduleEntry accepts both the 9-field legacy format and the 11-field current format so a rolling deploy drains without a flush.
func (ScheduleEntry) Member ¶
func (e ScheduleEntry) Member() string
func (ScheduleEntry) Score ¶
func (e ScheduleEntry) Score() float64
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages delayed scheduling via Redis sorted sets. NewSchedulerWithDB enables dual-write of Reschedule's run-at into tasks.run_at so pacing push-backs survive a Redis flush.
func NewScheduler ¶
NewScheduler creates a Scheduler without Postgres mirroring.
func (*Scheduler) DueItems ¶
func (s *Scheduler) DueItems(ctx context.Context, jobID string, now time.Time, limit int64) ([]ScheduleEntry, error)
DueItems returns up to limit entries with score ≤ now. Items are not removed — caller ZREMs after successful dispatch.
func (*Scheduler) PendingCount ¶
func (*Scheduler) RemoveJobSchedule ¶
func (*Scheduler) Reschedule ¶
Reschedule pushes an existing entry's run-at later. With *sql.DB configured it dual-writes Postgres first so a crash between writes leaves the durable store with the newer time.
TODO(run-at-reconcile): once tasks have a dedicated 'scheduled' status, add a startup sweep that re-seeds ZSET from tasks.run_at for scheduled rows missing a ZSET member.
func (*Scheduler) RescheduleZSet ¶ added in v0.33.1
func (s *Scheduler) RescheduleZSet(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
RescheduleZSet is Reschedule without the Postgres mirror, for the hot pacer-pushback path. The synchronous UPDATE backed up the ZSET to 80k+ entries at 100 paced ops/sec on 2026-04-22. Safe because OutboxSweeper rehydrates from tasks.run_at if Redis loses state.
func (*Scheduler) Schedule ¶
func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error
func (*Scheduler) ScheduleAndAck ¶
func (s *Scheduler) ScheduleAndAck(ctx context.Context, entry ScheduleEntry, ackJobID, messageID string) error
ScheduleAndAck atomically enqueues the retry and ACKs the original in one MULTI/EXEC. A two-step version would let XAUTOCLAIM redeliver the stuck PEL entry and double-crawl if Ack failed after Schedule succeeded.
func (*Scheduler) ScheduleBatch ¶
func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error
ScheduleBatch returns *BatchError when the pipeline ran but individual ZADDs failed; a non-BatchError means the pipeline itself failed and all entries must be treated as failed.
type StreamMessage ¶
type StreamMessage struct {
MessageID string
TaskID string
JobID string
PageID int
Host string
Path string
Priority float64
RetryCount int
SourceType string
SourceURL string
}
StreamMessage is a parsed task envelope read from a Redis Stream.
type Sweeper ¶
type Sweeper struct {
// contains filtered or unexported fields
}
Sweeper drains task_outbox into Redis via Scheduler.ScheduleBatch. Multi-replica safe: FOR UPDATE SKIP LOCKED partitions due rows across sweepers.
func NewOutboxSweeper ¶
func NewOutboxSweeper(db *sql.DB, scheduler *Scheduler, opts OutboxSweeperOpts) *Sweeper