Documentation ¶
Overview ¶
Package broker provides Redis-backed task scheduling, dispatch, and coordination primitives for the hover crawl execution pipeline.
Index ¶
- Constants
- func ConsumerGroup(jobID string) string
- func DomainConfigKey(domain string) string
- func DomainGateKey(domain string) string
- func DomainInflightKey(domain string) string
- func FormatScheduleEntry(taskID, jobID string, pageID int, host, path string, priority float64, ...) string
- func ScheduleKey(jobID string) string
- func StreamKey(jobID string) string
- type BatchError
- type Client
- type ConcurrencyChecker
- type Config
- type Consumer
- func (c *Consumer) Ack(ctx context.Context, jobID string, messageIDs ...string) error
- func (c *Consumer) PendingCount(ctx context.Context, jobID string) (int64, error)
- func (c *Consumer) Read(ctx context.Context, jobID string) ([]StreamMessage, error)
- func (c *Consumer) ReadNonBlocking(ctx context.Context, jobID string) ([]StreamMessage, error)
- func (c *Consumer) ReclaimStale(ctx context.Context, jobID string) (reclaimed []StreamMessage, deadLetter []StreamMessage, err error)
- type ConsumerOpts
- type DBSyncFunc
- type Dispatcher
- type DispatcherOpts
- type DomainPacer
- func (p *DomainPacer) DecrementInflight(ctx context.Context, domain, jobID string) error
- func (p *DomainPacer) FlushAdaptiveDelays(ctx context.Context) (int, error)
- func (p *DomainPacer) GetInflight(ctx context.Context, domain, jobID string) (int64, error)
- func (p *DomainPacer) IncrementInflight(ctx context.Context, domain, jobID string) error
- func (p *DomainPacer) Release(ctx context.Context, domain, jobID string, success, rateLimited bool) error
- func (p *DomainPacer) Seed(ctx context.Context, domain string, baseDelayMS, adaptiveDelayMS, floorMS int) error
- func (p *DomainPacer) TryAcquire(ctx context.Context, domain string) (PaceResult, error)
- type JobLister
- type OutboxSweeperOpts
- type PaceResult
- type PacerConfig
- type Probe
- type ProbeOpts
- type RunningCounters
- func (rc *RunningCounters) Decrement(ctx context.Context, jobID string) (int64, error)
- func (rc *RunningCounters) Get(ctx context.Context, jobID string) (int64, error)
- func (rc *RunningCounters) GetAll(ctx context.Context) (map[string]int64, error)
- func (rc *RunningCounters) Increment(ctx context.Context, jobID string) (int64, error)
- func (rc *RunningCounters) Reconcile(ctx context.Context, counts map[string]int64) error
- func (rc *RunningCounters) RemoveJob(ctx context.Context, jobID string) error
- func (rc *RunningCounters) StartDBSync(ctx context.Context, interval time.Duration, syncFn DBSyncFunc)
- type ScheduleEntry
- type Scheduler
- func (s *Scheduler) DueItems(ctx context.Context, jobID string, now time.Time, limit int64) ([]ScheduleEntry, error)
- func (s *Scheduler) PendingCount(ctx context.Context, jobID string) (int64, error)
- func (s *Scheduler) Remove(ctx context.Context, jobID, member string) error
- func (s *Scheduler) RemoveJobSchedule(ctx context.Context, jobID string) error
- func (s *Scheduler) Reschedule(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
- func (s *Scheduler) RescheduleZSet(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
- func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error
- func (s *Scheduler) ScheduleAndAck(ctx context.Context, entry ScheduleEntry, ackJobID, messageID string) error
- func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error
- type StreamMessage
- type Sweeper
Constants ¶
const DefaultOutboxMaxAttempts = 10
DefaultOutboxMaxAttempts is the retry cap before a row is dead-lettered. Chosen so the worst-case age of a row stuck in backoff is bounded by MaxAttempts × MaxBackoff — at the defaults, 10 × 5 min = 50 min, which caps the oldest-age gauge even if a subset of rows can never be dispatched.
const RunningCountersKey = keyPrefix + "running"
RunningCountersKey is a single hash whose fields are job IDs and values are the number of tasks currently in-flight.
Variables ¶
This section is empty.
Functions ¶
func ConsumerGroup ¶
func ConsumerGroup(jobID string) string
ConsumerGroup returns the consumer group name for a job stream.
func DomainConfigKey ¶
func DomainConfigKey(domain string) string
DomainConfigKey returns the key of the per-domain hash storing adaptive delay state: base_delay_ms, adaptive_delay_ms, floor_ms, success_streak, error_streak.
func DomainGateKey ¶
func DomainGateKey(domain string) string
DomainGateKey returns the key of a short-lived string used as a time gate. SET NX PX {delay_ms} on this key prevents dispatching to the same domain faster than its configured rate.
func DomainInflightKey ¶
func DomainInflightKey(domain string) string
DomainInflightKey returns the key of the per-domain hash whose fields are job IDs and values are the number of inflight tasks for that domain+job pair.
func FormatScheduleEntry ¶
func FormatScheduleEntry(taskID, jobID string, pageID int, host, path string, priority float64, retryCount int, sourceType, sourceURL string) string
FormatScheduleEntry builds the member value stored in the schedule ZSET. Entries are serialised as compact pipe-delimited strings to avoid JSON overhead on the hot scheduling path.
Format: taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL
func ScheduleKey ¶
func ScheduleKey(jobID string) string
ScheduleKey returns the per-job schedule key: a sorted set keyed by job, scored by the earliest runnable unix-millisecond timestamp.
Types ¶
type BatchError ¶ added in v0.33.1
BatchError is returned by ScheduleBatch when some (but not all) entries in the pipeline failed. FailedIndices lists the indices within the input slice whose ZADD returned an error; the remaining entries were scheduled successfully. Err is the first per-entry error encountered, for logging.
Callers that need to retry only the failures can extract the *BatchError with errors.As and use FailedIndices to partition the batch. Callers that treat any failure as fatal can simply check err != nil; the error message includes the failure count.
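A minimal retry sketch, assuming FailedIndices is an []int field on *BatchError and the package is imported as broker (neither is shown above):

err := scheduler.ScheduleBatch(ctx, entries)
var be *broker.BatchError
if errors.As(err, &be) {
	// Keep only the entries whose ZADD failed and retry those.
	failed := make([]broker.ScheduleEntry, 0, len(be.FailedIndices))
	for _, i := range be.FailedIndices {
		failed = append(failed, entries[i])
	}
	err = scheduler.ScheduleBatch(ctx, failed)
}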
func (*BatchError) Error ¶ added in v0.33.1
func (e *BatchError) Error() string
func (*BatchError) Unwrap ¶ added in v0.33.1
func (e *BatchError) Unwrap() error
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps a go-redis client with convenience helpers.
type ConcurrencyChecker ¶
type ConcurrencyChecker interface {
// CanDispatch returns true if the job has room for another task.
CanDispatch(ctx context.Context, jobID string) (bool, error)
}
ConcurrencyChecker determines whether a job has capacity for more in-flight tasks.
type Config ¶
type Config struct {
URL string
PoolSize int
TLSEnabled bool
ReadTimeout time.Duration
WriteTimeout time.Duration
MaxRetries int
}
Config holds Redis connection parameters, typically loaded from environment variables.
func ConfigFromEnv ¶
func ConfigFromEnv() Config
ConfigFromEnv builds a Config from the process environment. TLS is inferred from the URL scheme (rediss:// = TLS) unless REDIS_TLS_ENABLED is explicitly set.
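A sketch of the inference rule; REDIS_URL is an assumed variable name (only REDIS_TLS_ENABLED is documented above):

u, err := url.Parse(os.Getenv("REDIS_URL")) // assumed variable name
tlsEnabled := err == nil && u.Scheme == "rediss"
if v, ok := os.LookupEnv("REDIS_TLS_ENABLED"); ok {
	tlsEnabled, _ = strconv.ParseBool(v) // explicit setting overrides the scheme
}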
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reads from one or more job streams via XREADGROUP and reclaims stale messages via XAUTOCLAIM.
func NewConsumer ¶
func NewConsumer(client *Client, opts ConsumerOpts) *Consumer
NewConsumer creates a Consumer.
func (*Consumer) Ack ¶
func (c *Consumer) Ack(ctx context.Context, jobID string, messageIDs ...string) error
Ack acknowledges one or more messages, removing them from the pending entries list (PEL).
func (*Consumer) PendingCount ¶ added in v0.33.1
func (c *Consumer) PendingCount(ctx context.Context, jobID string) (int64, error)
PendingCount returns the number of messages in the pending entries list (PEL) for a job's consumer group — i.e. tasks that have been delivered to a worker but not yet ACKed. This is the authoritative source of "currently running" for a given job; the RunningCounters HASH in Redis is a fast-path mirror that can drift under partial failures. Returns 0 when the stream or group does not yet exist.
func (*Consumer) Read ¶
func (c *Consumer) Read(ctx context.Context, jobID string) ([]StreamMessage, error)
Read fetches new messages from the given job's stream. It blocks for up to opts.BlockTimeout if no messages are available. Returns nil (not error) when no messages are ready.
func (*Consumer) ReadNonBlocking ¶
func (c *Consumer) ReadNonBlocking(ctx context.Context, jobID string) ([]StreamMessage, error)
ReadNonBlocking is like Read but returns immediately if no messages are available. Useful for round-robin scanning across multiple jobs.
func (*Consumer) ReclaimStale ¶
func (c *Consumer) ReclaimStale(ctx context.Context, jobID string) (reclaimed []StreamMessage, deadLetter []StreamMessage, err error)
ReclaimStale uses XAUTOCLAIM to take ownership of messages that have been pending longer than MinIdleTime. Returns the reclaimed messages. Messages that have been delivered more than MaxDeliveries times are returned separately as dead-letter candidates.
A single call sweeps the full PEL by following the XAUTOCLAIM cursor until it returns to "0-0", so a burst of stuck messages drains in one tick rather than one-batch-per-30s. Per-call safety caps keep any single sweep bounded when the PEL is pathologically large.
Note: ReclaimStale does NOT ACK messages in the returned deadLetter slice. The caller owns final disposition and must ACK or NACK each dead-letter message explicitly — otherwise the same messages will be reclaimed again on the next XAUTOCLAIM sweep.
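One reclaim tick, sketched; markTaskFailed and redeliver stand in for caller-side handling and are hypothetical:

reclaimed, deadLetter, err := c.ReclaimStale(ctx, jobID)
if err != nil {
	return err
}
for _, msg := range deadLetter {
	// Record the permanent failure, then ACK so the next sweep skips it.
	markTaskFailed(msg.TaskID)
	if err := c.Ack(ctx, jobID, msg.MessageID); err != nil {
		return err
	}
}
for _, msg := range reclaimed {
	redeliver(msg) // feed back into the normal processing path
}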
type ConsumerOpts ¶
type ConsumerOpts struct {
// ConsumerName uniquely identifies this consumer within the group.
// Typically "worker-{machineID}-{goroutineID}".
ConsumerName string
// BlockTimeout is the XREADGROUP BLOCK duration. Default 2s.
BlockTimeout time.Duration
// Count is the max messages per XREADGROUP call. Default 1.
Count int64
// ClaimInterval is how often the XAUTOCLAIM sweep runs. Default 30s.
ClaimInterval time.Duration
// MinIdleTime is the XAUTOCLAIM min-idle-time. Messages pending
// longer than this are reclaimed. Default 3min (TaskStaleTimeout).
MinIdleTime time.Duration
// MaxDeliveries is the maximum number of times a message can be
// delivered before it is treated as a permanent failure. Default 3.
MaxDeliveries int64
// AutoclaimCount is the per-call XAUTOCLAIM COUNT used by ReclaimStale.
// Default 100. Override via REDIS_AUTOCLAIM_COUNT.
AutoclaimCount int64
// AutoclaimMaxPerSweep is the safety cap on messages reclaimed per
// ReclaimStale invocation across the cursor loop, so one pathological
// job cannot starve the other jobs the reclaim loop still has to scan.
// Default 1000. Override via REDIS_AUTOCLAIM_MAX_PER_SWEEP.
AutoclaimMaxPerSweep int
}
ConsumerOpts controls stream reading behaviour.
func DefaultConsumerOpts ¶
func DefaultConsumerOpts(consumerName string) ConsumerOpts
DefaultConsumerOpts returns production defaults.
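A minimal read/ack loop, assuming client is an existing *Client and handleTask is a hypothetical per-message handler:

c := broker.NewConsumer(client, broker.DefaultConsumerOpts("worker-1-0"))
for {
	msgs, err := c.Read(ctx, jobID) // blocks up to BlockTimeout; nil when idle
	if err != nil {
		return err
	}
	for _, m := range msgs {
		if err := handleTask(ctx, m); err != nil {
			continue // leave it in the PEL; ReclaimStale will retry it
		}
		if err := c.Ack(ctx, jobID, m.MessageID); err != nil {
			return err
		}
	}
}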
type DBSyncFunc ¶
DBSyncFunc is called periodically to write running counters back to Postgres for API visibility. The function receives a map of jobID -> count.
func DefaultDBSyncFunc ¶
func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc
DefaultDBSyncFunc returns a DBSyncFunc that updates the jobs table running_tasks column using the provided *sql.DB.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher is a long-running goroutine that moves due items from per-job Redis ZSETs into per-job Redis Streams.
func NewDispatcher ¶
func NewDispatcher(
	client *Client,
	scheduler *Scheduler,
	pacer *DomainPacer,
	counters *RunningCounters,
	jobLister JobLister,
	concCheck ConcurrencyChecker,
	opts DispatcherOpts,
) *Dispatcher
NewDispatcher creates a Dispatcher.
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run(ctx context.Context)
Run is the dispatcher's main loop. It blocks until ctx is cancelled. Start it as a goroutine.
type DispatcherOpts ¶
type DispatcherOpts struct {
// ScanInterval is how often the dispatcher sweeps all active job
// ZSETs for due items. Default 100ms.
ScanInterval time.Duration
// BatchSize is the maximum number of ZSET entries fetched per job
// per scan tick. Default 50.
BatchSize int64
// ParallelJobs caps how many per-job dispatch goroutines run
// concurrently inside a single tick. Previously the tick processed
// jobs serially, so the per-task Redis round-trip cost scaled as
// O(N_jobs × batch); under a 100-job workload that serialised the
// dispatcher into a ~70× backlog. Default 32. Override via
// REDIS_DISPATCH_PARALLEL_JOBS.
ParallelJobs int
}
DispatcherOpts controls the dispatcher's scan behaviour.
func DefaultDispatcherOpts ¶
func DefaultDispatcherOpts() DispatcherOpts
DefaultDispatcherOpts returns production defaults, optionally overridden by environment variables.
type DomainPacer ¶
type DomainPacer struct {
// contains filtered or unexported fields
}
DomainPacer coordinates per-domain request rate across all worker instances using Redis-backed time gates and adaptive delay state.
func NewDomainPacer ¶
func NewDomainPacer(client *Client, cfg PacerConfig) *DomainPacer
NewDomainPacer creates a DomainPacer.
func (*DomainPacer) DecrementInflight ¶
func (p *DomainPacer) DecrementInflight(ctx context.Context, domain, jobID string) error
DecrementInflight reduces the per-domain per-job inflight counter.
func (*DomainPacer) FlushAdaptiveDelays ¶ added in v0.33.1
func (p *DomainPacer) FlushAdaptiveDelays(ctx context.Context) (int, error)
FlushAdaptiveDelays removes all accumulated per-domain adaptive delay state (hover:dom:cfg:* keys). Before the merge, the DomainLimiter was in-memory and reset on every worker restart, so a bad afternoon of 429s from a flaky target could never throttle crawls for longer than the worker's lifetime. After the merge, the state lives in Redis with a 24h TTL, which means a single spike can keep a domain at the 60s adaptive floor for a full day.
Call this on worker startup to restore the pre-merge behaviour: the pacer still grows the delay on 429s during the run, but the slate is clean on each deploy. Returns the number of keys deleted.
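A minimal startup call, with illustrative logging:

n, err := pacer.FlushAdaptiveDelays(ctx)
if err != nil {
	return err
}
log.Printf("pacer: cleared adaptive delay state for %d domains", n)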
func (*DomainPacer) GetInflight ¶
func (p *DomainPacer) GetInflight(ctx context.Context, domain, jobID string) (int64, error)
GetInflight returns the current inflight count for a domain+job.
func (*DomainPacer) IncrementInflight ¶
func (p *DomainPacer) IncrementInflight(ctx context.Context, domain, jobID string) error
IncrementInflight bumps the per-domain per-job inflight counter.
func (*DomainPacer) Release ¶
func (p *DomainPacer) Release(ctx context.Context, domain, jobID string, success, rateLimited bool) error
Release is called after a crawl completes. It updates the adaptive delay state based on the outcome.
func (*DomainPacer) Seed ¶
func (p *DomainPacer) Seed(ctx context.Context, domain string, baseDelayMS, adaptiveDelayMS, floorMS int) error
Seed initialises the domain config hash with base delay values (typically from robots.txt and Postgres domain record). Safe to call multiple times — uses HSETNX so existing values are preserved.
func (*DomainPacer) TryAcquire ¶
func (p *DomainPacer) TryAcquire(ctx context.Context, domain string) (PaceResult, error)
TryAcquire attempts to set the domain time-gate. If the gate is already held (domain was recently accessed), it returns the remaining wait time. This is non-blocking.
Implemented as a single Lua EVALSHA so the effective-delay read and SET NX PX / PTTL fallback all execute server-side. The earlier three-call version (HMGET → SET NX PX → PTTL) was the dominant dispatcher round-trip cost under multi-job workloads.
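The dispatcher-side decision for a due entry, sketched; dispatchToStream is a hypothetical XADD step:

res, err := pacer.TryAcquire(ctx, entry.Host)
if err != nil {
	return err
}
if !res.Acquired {
	// Push the task back by the remaining gate time. RescheduleZSet
	// (see Scheduler below) keeps this on the Redis-only hot path.
	return scheduler.RescheduleZSet(ctx, entry, time.Now().Add(res.RetryAfter))
}
return dispatchToStream(ctx, entry) // gate held: safe to dispatch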
type JobLister ¶
JobLister returns the set of active job IDs the dispatcher should scan. Implementations typically query Postgres.
type OutboxSweeperOpts ¶
type OutboxSweeperOpts struct {
// Interval between sweep ticks. Default: 500ms.
Interval time.Duration
// BatchSize caps how many rows are claimed per tick. Default: 200.
BatchSize int
// BaseBackoff is the first retry delay on ScheduleBatch failure.
// Default: 2s. Each subsequent attempt doubles up to MaxBackoff.
BaseBackoff time.Duration
// MaxBackoff caps the retry delay. Default: 5 minutes.
MaxBackoff time.Duration
// MaxAttempts is the retry cap before a row is moved to
// task_outbox_dead. Default: 10.
MaxAttempts int
// StatementTimeout bounds each sweep tick's total DB work. Guards
// against a pathological sweeper tx holding locks indefinitely. 0
// leaves the DB's default in place. Default: 5s.
StatementTimeout time.Duration
}
OutboxSweeperOpts configures a Sweeper.
func DefaultOutboxSweeperOpts ¶
func DefaultOutboxSweeperOpts() OutboxSweeperOpts
DefaultOutboxSweeperOpts returns sensible production defaults.
The sweep interval was lowered from 5s to 500ms because the outbox sits on the hot path between newly-authored tasks and the Redis ZSET that dispatchers poll. At 5s, each just-completed task waited up to 5s for its newly-discovered siblings to reach a worker, which dominated end-to-end throughput on small jobs. The sweep is an index-only SKIP LOCKED query; running it 10× more often is cheap.
type PaceResult ¶
type PaceResult struct {
// Acquired is true if the domain time-gate was successfully set.
Acquired bool
// RetryAfter is how long the caller should wait before retrying.
// Only meaningful when Acquired is false.
RetryAfter time.Duration
}
PaceResult is returned by TryAcquire.
type PacerConfig ¶
type PacerConfig struct {
// SuccessThreshold is the number of consecutive successes before
// the adaptive delay decreases. Default 5. Override via
// GNH_RATE_LIMIT_SUCCESS_THRESHOLD. Lower values recover faster
// after a transient rate-limit event.
SuccessThreshold int
// DelayStepMS is the amount (milliseconds) the adaptive delay
// GROWS on each rate-limited release. Default 500.
DelayStepMS int
// DelayStepDownMS is the amount (milliseconds) the adaptive delay
// SHRINKS on each success once SuccessThreshold is reached. Defaults
// to DelayStepMS (symmetric growth and recovery). Setting it higher
// than DelayStepMS makes recovery faster than growth, which is
// usually the right choice: a single spike of 429s shouldn't
// throttle a domain for 20 minutes.
DelayStepDownMS int
// MaxDelayMS caps the adaptive delay. Default 60_000 (60s).
MaxDelayMS int
// MinPushbackMS is the floor applied to PaceResult.RetryAfter when
// the gate is already held. Prevents the dispatcher from tight-looping
// through rate-limited tasks whose gate TTL is near-zero — without
// this floor a domain with a 1ms residual delay would be re-fetched
// every Dispatcher tick (100ms). Named after the pre-merge constant
// domainDelayPause (internal/jobs/worker.go:147, 100ms default).
MinPushbackMS int
}
PacerConfig holds the tuning knobs for domain pacing, mirroring the constants from the in-memory DomainLimiter it replaces.
func DefaultPacerConfig ¶
func DefaultPacerConfig() PacerConfig
DefaultPacerConfig returns production defaults.
type Probe ¶
type Probe struct {
// contains filtered or unexported fields
}
Probe periodically scrapes Tier 1 gauges that have no natural emission site (stream length, ZSET depth, pending count, outbox backlog + age, Redis PING, pool stats) and feeds them to the observability package. Intended to run once per process.
type ProbeOpts ¶
type ProbeOpts struct {
// Interval between probe ticks. Default 5s.
Interval time.Duration
// TickTimeout bounds a single tick so a slow Redis or DB call
// cannot stall the whole probe loop. Default 3s.
TickTimeout time.Duration
}
ProbeOpts configures a broker Probe.
func DefaultProbeOpts ¶
func DefaultProbeOpts() ProbeOpts
DefaultProbeOpts returns production defaults.
type RunningCounters ¶
type RunningCounters struct {
// contains filtered or unexported fields
}
RunningCounters tracks how many tasks are currently in-flight per job using a single Redis HASH. This replaces the in-memory atomic counters and async DB flush loops in the old worker pool.
func NewRunningCounters ¶
func NewRunningCounters(client *Client) *RunningCounters
NewRunningCounters creates a RunningCounters.
func (*RunningCounters) Decrement ¶
func (rc *RunningCounters) Decrement(ctx context.Context, jobID string) (int64, error)
Decrement atomically reduces the running count for a job and returns the new count. The hash field is removed when the count reaches zero.
func (*RunningCounters) Increment ¶
func (rc *RunningCounters) Increment(ctx context.Context, jobID string) (int64, error)
Increment atomically bumps the running count for a job. Returns the new count.
func (*RunningCounters) Reconcile ¶
func (rc *RunningCounters) Reconcile(ctx context.Context, counts map[string]int64) error
Reconcile sets the running counters from an authoritative source (typically a Postgres query on startup). This overwrites any stale state in Redis.
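A hedged startup sketch. The jobs table and running_tasks column are named under DefaultDBSyncFunc above; the exact query is an assumption:

rows, err := db.QueryContext(ctx,
	`SELECT id, running_tasks FROM jobs WHERE running_tasks > 0`)
if err != nil {
	return err
}
defer rows.Close()
counts := make(map[string]int64)
for rows.Next() {
	var jobID string
	var n int64
	if err := rows.Scan(&jobID, &n); err != nil {
		return err
	}
	counts[jobID] = n
}
if err := rows.Err(); err != nil {
	return err
}
return rc.Reconcile(ctx, counts)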
func (*RunningCounters) RemoveJob ¶
func (rc *RunningCounters) RemoveJob(ctx context.Context, jobID string) error
RemoveJob clears the running counter for a specific job (e.g. on job completion/cancellation).
func (*RunningCounters) StartDBSync ¶
func (rc *RunningCounters) StartDBSync(ctx context.Context, interval time.Duration, syncFn DBSyncFunc)
StartDBSync runs a background loop that periodically reads all running counters from Redis and calls syncFn to persist them to Postgres. Blocks until ctx is cancelled.
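Typical wiring; the 5s interval here is illustrative, not a package default:

go rc.StartDBSync(ctx, 5*time.Second, broker.DefaultDBSyncFunc(sqlDB))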
type ScheduleEntry ¶
type ScheduleEntry struct {
TaskID string
JobID string
PageID int
Host string
Path string
Priority float64
RetryCount int
SourceType string
SourceURL string
RunAt time.Time
}
ScheduleEntry contains the data needed to schedule a task into the Redis ZSET. The entry is stored as a pipe-delimited member string; the score is the earliest unix-millisecond timestamp at which the task may run.
func ParseScheduleEntry ¶
func ParseScheduleEntry(member string, score float64) (ScheduleEntry, error)
ParseScheduleEntry reconstructs a ScheduleEntry from its pipe-delimited ZSET member string plus the score.
func (ScheduleEntry) Member ¶
func (e ScheduleEntry) Member() string
Member returns the pipe-delimited string stored in the ZSET.
func (ScheduleEntry) Score ¶
func (e ScheduleEntry) Score() float64
Score returns the ZSET score (unix milliseconds).
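A round-trip sketch. RunAt survives at millisecond precision because the score is unix milliseconds:

e := broker.ScheduleEntry{
	TaskID: "t-1", JobID: "j-1", PageID: 42,
	Host: "example.com", Path: "/products",
	Priority: 1.0, RunAt: time.Now().Add(30 * time.Second),
}
back, err := broker.ParseScheduleEntry(e.Member(), e.Score())
if err != nil {
	return err
}
_ = back // equal to e, with RunAt truncated to the millisecond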
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages delayed task scheduling via Redis sorted sets.
When constructed via NewSchedulerWithDB, Reschedule dual-writes the new run-at time to the tasks.run_at column so pacing push-backs survive a Redis flush. The db dependency is optional (nil in unit tests that don't exercise the durability path).
func NewScheduler ¶
NewScheduler creates a Scheduler without Postgres mirroring. Reschedule writes only to the Redis ZSET. Suitable for tests that don't need the durability path.
func NewSchedulerWithDB ¶
NewSchedulerWithDB creates a Scheduler that mirrors Reschedule run-at updates to the tasks.run_at column. This is the production constructor.
func (*Scheduler) DueItems ¶
func (s *Scheduler) DueItems(ctx context.Context, jobID string, now time.Time, limit int64) ([]ScheduleEntry, error)
DueItems returns up to limit entries whose score is <= now from the given job's ZSET. Items are returned but not removed — the caller is responsible for ZREM after successful dispatch.
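The peek-then-remove contract, sketched; dispatchToStream is hypothetical and Remove is the ZREM step:

due, err := s.DueItems(ctx, jobID, time.Now(), 50)
if err != nil {
	return err
}
for _, e := range due {
	if err := dispatchToStream(ctx, e); err != nil {
		continue // left in the ZSET, so it is still due on the next tick
	}
	if err := s.Remove(ctx, jobID, e.Member()); err != nil {
		return err
	}
}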
func (*Scheduler) PendingCount ¶
func (s *Scheduler) PendingCount(ctx context.Context, jobID string) (int64, error)
PendingCount returns the total number of scheduled items for a job.
func (*Scheduler) RemoveJobSchedule ¶
func (s *Scheduler) RemoveJobSchedule(ctx context.Context, jobID string) error
RemoveJobSchedule deletes the entire ZSET for a job (used on job cancellation or completion).
func (*Scheduler) Reschedule ¶
func (s *Scheduler) Reschedule(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
Reschedule updates the score (run-at time) for an existing entry in the ZSET. Used when the dispatcher cannot dispatch yet (domain pacing, concurrency limit) and needs to push the item back.
When the Scheduler was constructed with a *sql.DB, the new run-at is also written to the tasks.run_at column. Postgres is written first so that if the process crashes or Redis is flushed between the two writes, the durable store holds the newer time; the next dispatch attempt will see the correct pacing window.
TODO(run-at-reconcile): once the task-lifecycle redesign lands (a dedicated 'scheduled' status flipped by the dispatcher on XADD), add a startup sweep that re-seeds the ZSET from tasks.run_at for status='scheduled' rows with no ZSET member. Blocked on the outbox PR.
func (*Scheduler) RescheduleZSet ¶ added in v0.33.1
func (s *Scheduler) RescheduleZSet(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
RescheduleZSet is Reschedule without the Postgres mirror. Use this on the hot pacer push-back path where every dispatcher iteration would otherwise issue a synchronous UPDATE — at 100 paced ops/sec the per-op DB latency stalled the single dispatcher goroutine and the ZSET backed up to 80k+ entries on 2026-04-22.
Safety: pacer push-back is ephemeral (seconds, not minutes). If Redis loses the ZSET between a push-back and dispatch, the outbox Sweeper rehydrates from tasks.run_at. The authoritative run_at in Postgres is written on initial enqueue; a missed push-back just means the task is re-attempted slightly sooner than its pacer gate allows, and the next TryAcquire will push it back again. No task is lost.
func (*Scheduler) Schedule ¶
func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error
Schedule adds a single task to the job's ZSET.
func (*Scheduler) ScheduleAndAck ¶
func (s *Scheduler) ScheduleAndAck(ctx context.Context, entry ScheduleEntry, ackJobID, messageID string) error
ScheduleAndAck atomically enqueues a retry into the job's ZSET and acknowledges (removes) the original stream message in a single Redis MULTI/EXEC. This prevents the two-step race where Schedule succeeds but Ack fails — which would leave the retry queued while the original stays in the PEL, allowing XAUTOCLAIM to redeliver it and causing a duplicate crawl.
Redis MULTI/EXEC is atomic on a single server: either both operations apply or neither does. The caller receives a single error to act on.
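A retry sketch built from a failed StreamMessage; the 30s backoff is illustrative:

entry := broker.ScheduleEntry{
	TaskID: msg.TaskID, JobID: msg.JobID, PageID: msg.PageID,
	Host: msg.Host, Path: msg.Path, Priority: msg.Priority,
	RetryCount: msg.RetryCount + 1,
	SourceType: msg.SourceType, SourceURL: msg.SourceURL,
	RunAt: time.Now().Add(30 * time.Second),
}
if err := s.ScheduleAndAck(ctx, entry, msg.JobID, msg.MessageID); err != nil {
	return err // neither the retry nor the ACK applied; safe to call again
}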
func (*Scheduler) ScheduleBatch ¶
func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error
ScheduleBatch adds multiple tasks to their respective job ZSETs using a pipeline for efficiency.
Returns nil on full success. Returns a *BatchError when the pipeline completed but individual ZADDs failed; callers can recover FailedIndices via errors.As (see BatchError) to partition the batch. Returns a non-BatchError error when the pipeline itself could not execute (e.g. Redis unreachable), in which case callers must treat all entries as failed.
type StreamMessage ¶
type StreamMessage struct {
// MessageID is the Redis stream entry ID (e.g. "1234567890-0").
MessageID string
TaskID string
JobID string
PageID int
Host string
Path string
Priority float64
RetryCount int
SourceType string
SourceURL string
}
StreamMessage is a parsed task envelope read from a Redis Stream.
type Sweeper ¶
type Sweeper struct {
// contains filtered or unexported fields
}
Sweeper polls task_outbox for due rows and pushes them into Redis via Scheduler.ScheduleBatch. Deletes rows on success; bumps attempts and run_at on failure.
Safe to run multiple replicas: each claim tx uses FOR UPDATE SKIP LOCKED so replicas partition the due rows rather than contending.
func NewOutboxSweeper ¶
func NewOutboxSweeper(db *sql.DB, scheduler *Scheduler, opts OutboxSweeperOpts) *Sweeper
NewOutboxSweeper constructs a Sweeper.