Documentation ¶
Overview ¶
Package queue is part of the GoFastr framework. See https://github.com/DonaldMurillo/gofastr for documentation.
Package queue provides a pluggable job queue with in-memory, SQL, and Redis backends, a goroutine pool for concurrent processing, and scheduled job support.
Index ¶
- Variables
- type Browsable
- type DBQueue
- func (q *DBQueue) Ack(ctx context.Context, jobID string) error
- func (q *DBQueue) Close() error
- func (q *DBQueue) Dequeue(ctx context.Context, types ...string) (Job, error)
- func (q *DBQueue) Enqueue(ctx context.Context, job Job) error
- func (q *DBQueue) ListJobs(ctx context.Context, status string, limit int) ([]Job, error)
- func (q *DBQueue) Nack(ctx context.Context, jobID string) error
- func (q *DBQueue) RegisterHandler(jobType string, h Handler)
- func (q *DBQueue) Replay(ctx context.Context, jobID string) error
- func (q *DBQueue) SetLeaseTimeout(d time.Duration)
- func (q *DBQueue) Start(ctx context.Context)
- func (q *DBQueue) Stats(ctx context.Context) (JobStats, error)
- type DBQueueOption
- type Handler
- type Job
- type JobStats
- type MemoryQueue
- func (q *MemoryQueue) Ack(_ context.Context, jobID string) error
- func (q *MemoryQueue) Close() error
- func (q *MemoryQueue) Dequeue(ctx context.Context, types ...string) (Job, error)
- func (q *MemoryQueue) Enqueue(ctx context.Context, job Job) (err error)
- func (q *MemoryQueue) ListJobs(_ context.Context, status string, limit int) ([]Job, error)
- func (q *MemoryQueue) Nack(ctx context.Context, jobID string) error
- func (q *MemoryQueue) RegisterHandler(jobType string, handler Handler)
- func (q *MemoryQueue) Replay(ctx context.Context, jobID string) error
- func (q *MemoryQueue) Start()
- func (q *MemoryQueue) Stats(_ context.Context) (JobStats, error)
- type Queue
- type RedisClient
- type RedisQueue
- func (q *RedisQueue) Ack(ctx context.Context, jobID string) error
- func (q *RedisQueue) Close() error
- func (q *RedisQueue) Dequeue(ctx context.Context, types ...string) (Job, error)
- func (q *RedisQueue) Enqueue(ctx context.Context, job Job) error
- func (q *RedisQueue) Nack(ctx context.Context, jobID string) error
- func (q *RedisQueue) Reclaim(ctx context.Context) (int, error)
- func (q *RedisQueue) Replay(ctx context.Context, jobID string) error
- func (q *RedisQueue) SetVisibilityTimeout(d time.Duration)
- type Replayable
- type ScheduleBuilder
- type ScheduledJob
- type Scheduler
Constants ¶
This section is empty.
Variables ¶
var (
	ErrQueueClosed = errors.New("queue is closed")
	ErrNoJob       = errors.New("no job available")
)
Sentinel errors.
Functions ¶
This section is empty.
Types ¶
type Browsable ¶
type Browsable interface {
// ListJobs returns up to limit jobs in the given status; pass an
// empty status to return all jobs regardless of state. Jobs are
// ordered newest-first by created_at.
ListJobs(ctx context.Context, status string, limit int) ([]Job, error)
// Stats returns counts grouped by status. Cheap by design — admin
// dashboards may poll it.
Stats(ctx context.Context) (JobStats, error)
}
Browsable is the optional read-only inspection interface. DBQueue implements it so admin tooling can list and aggregate jobs without guessing at the underlying schema, and MemoryQueue implements it for its retained dead-letter set. RedisQueue does not implement it yet, so admin code that depends on it should type-assert.
type DBQueue ¶
type DBQueue struct {
// contains filtered or unexported fields
}
DBQueue is a SQL-backed queue. Jobs persist in a single table; Dequeue claims a row atomically so multiple consumers can race safely.
Postgres uses FOR UPDATE SKIP LOCKED, the canonical pattern for queue fan-out without distributed locks. SQLite uses a SERIALIZABLE-friendly SELECT-then-UPDATE inside a transaction; the database-level write lock SQLite takes on BEGIN IMMEDIATE serialises writers naturally.
func NewDBQueue ¶
func NewDBQueue(db *sql.DB, opts ...DBQueueOption) (*DBQueue, error)
NewDBQueue constructs a DBQueue and ensures its backing table exists. Probes the dialect once via SELECT version(); falls back to SQLite. Panics if the table name contains unsafe characters.
func (*DBQueue) Dequeue ¶
Dequeue claims the highest-priority eligible job in a single atomic step. Returns ErrNoJob when nothing is ready (no pending row whose scheduled_at has passed).
func (*DBQueue) Enqueue ¶
Enqueue inserts a job. Fills in ID/CreatedAt/MaxAttempts/ScheduledAt defaults when zero-valued so callers can pass {Type, Payload} only.
func (*DBQueue) ListJobs ¶
ListJobs implements Browsable. Returns up to limit jobs in the supplied status, newest-first. Empty status returns all jobs regardless of state. limit <= 0 defaults to 100.
func (*DBQueue) Nack ¶
Nack returns a claimed job to the queue (status=pending) when it still has retry attempts left; otherwise marks it 'failed' for later inspection. When backoff is enabled (see WithBackoff), a requeued job's scheduled_at is pushed into the future so a flapping handler can't burn through every attempt in a tight loop.
func (*DBQueue) RegisterHandler ¶
RegisterHandler binds a job type to a handler. Safe to call concurrently with a running worker loop.
func (*DBQueue) Replay ¶
Replay implements Replayable: it resets a terminally-failed job to pending so a worker picks it up again — attempts cleared, scheduled immediately. The `AND status='failed'` clause makes it idempotent and safe: replaying an unknown, pending, running, or claimed job matches no row and is a no-op, so it can never double-run an in-flight job or resurrect a non-terminal one.
func (*DBQueue) SetLeaseTimeout ¶
SetLeaseTimeout adjusts the in-flight lease duration after construction. See WithLeaseTimeout for semantics. Safe to call concurrently with a running worker loop.
type DBQueueOption ¶
type DBQueueOption func(*DBQueue)
DBQueueOption configures DBQueue construction.
func WithBackoff ¶
func WithBackoff(base, max time.Duration) DBQueueOption
WithBackoff enables exponential retry backoff. On a Nack with retries remaining, scheduled_at is advanced by base*2^(attempts-1) — so the first retry waits ~base, the second ~2*base, and so on — capped at max. A non-positive base disables backoff (jobs retry immediately, the default). A non-positive max means uncapped. Mirrors the webhook battery's retry backoff so the two batteries behave consistently.
func WithLeaseTimeout ¶
func WithLeaseTimeout(d time.Duration) DBQueueOption
WithLeaseTimeout sets how long a claimed-but-unacked job may stay in-flight before it is considered abandoned (the worker crashed/was killed) and becomes eligible for re-dequeue. Defaults to 5 minutes.
func WithTable ¶
func WithTable(name string) DBQueueOption
WithTable overrides the default "queue_jobs" table name.
func WithWorkers ¶
func WithWorkers(n int) DBQueueOption
WithWorkers sets the number of background worker goroutines started by Start(). Defaults to 1 when not set.
type Job ¶
type Job struct {
ID string `json:"id"`
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
Priority int `json:"priority"`
Attempts int `json:"attempts"`
MaxAttempts int `json:"max_attempts"`
CreatedAt time.Time `json:"created_at"`
ScheduledAt time.Time `json:"scheduled_at"`
}
Job represents a unit of work enqueued for asynchronous processing.
type JobStats ¶
JobStats is a snapshot of job counts grouped by status. The keys are status names ("pending", "running", "failed", "dead").
type MemoryQueue ¶
type MemoryQueue struct {
// contains filtered or unexported fields
}
MemoryQueue is an in-memory queue backed by a goroutine pool.
func NewMemoryQueue ¶
func NewMemoryQueue(workers int) *MemoryQueue
NewMemoryQueue creates a new in-memory queue with the given number of workers. The internal job channel is buffered to 1024 jobs.
func (*MemoryQueue) Ack ¶
func (q *MemoryQueue) Ack(_ context.Context, jobID string) error
Ack confirms a manually-dequeued job is done, discarding any tracked in-flight copy. For jobs processed by the automatic worker pool it is a no-op (those are auto-acknowledged after successful handler execution).
func (*MemoryQueue) Close ¶
func (q *MemoryQueue) Close() error
Close drains pending jobs and waits for all workers to finish.
func (*MemoryQueue) Dequeue ¶
Dequeue retrieves the next job from the channel. This is useful for manual consumption without relying on the automatic worker pool.
func (*MemoryQueue) Enqueue ¶
func (q *MemoryQueue) Enqueue(ctx context.Context, job Job) (err error)
Enqueue adds a job to the buffered channel. If the job has no ID, one is generated. Uses recover to handle the race between Close() closing the channel and this method sending to it.
func (*MemoryQueue) ListJobs ¶
ListJobs implements Browsable for the in-memory backend. The only state it can enumerate is the retained dead-letter set, so it returns those jobs for status "failed" (or an empty/"all" status), newest-first, and nothing for any other status — pending/claimed jobs live transiently on an unscannable channel. limit <= 0 defaults to 100.
func (*MemoryQueue) Nack ¶
func (q *MemoryQueue) Nack(ctx context.Context, jobID string) error
Nack marks a manually-dequeued job as failed: it increments the attempt counter and re-enqueues the job when retries remain, otherwise drops it. The job must have been handed out by Dequeue (the in-flight set is consulted by ID). Jobs processed by the automatic worker pool retry internally and are never in the in-flight set; calling Nack for one is a harmless no-op.
func (*MemoryQueue) RegisterHandler ¶
func (q *MemoryQueue) RegisterHandler(jobType string, handler Handler)
RegisterHandler registers a handler function for a given job type.
func (*MemoryQueue) Replay ¶
func (q *MemoryQueue) Replay(ctx context.Context, jobID string) error
Replay implements Replayable: it moves a retained terminally-failed job back onto the pending set with Attempts reset to 0 so Dequeue returns it again. Idempotent and safe: replaying an unknown or non-failed id matches no retained job and is a no-op (nil), never a double-enqueue.
func (*MemoryQueue) Start ¶
func (q *MemoryQueue) Start()
Start launches the worker goroutines. Must be called before enqueuing jobs if you want automatic processing. Workers will call the registered handlers.
type Queue ¶
type Queue interface {
// Enqueue adds a job to the queue.
Enqueue(ctx context.Context, job Job) error
// Dequeue retrieves and removes the next available job, optionally filtered by type.
Dequeue(ctx context.Context, types ...string) (Job, error)
// Ack confirms successful processing of a job.
Ack(ctx context.Context, jobID string) error
// Nack marks a job as failed and triggers retry logic.
Nack(ctx context.Context, jobID string) error
// Close gracefully shuts down the queue, draining in-progress work.
Close() error
}
Queue is the interface that every queue backend must implement.
type RedisClient ¶
type RedisClient interface {
LPush(ctx context.Context, key string, values ...interface{}) error
RPop(ctx context.Context, key string) (string, error)
HSet(ctx context.Context, key string, values ...interface{}) error
HGet(ctx context.Context, key, field string) (string, error)
// HGetAll returns every field→value pair in the hash. Used by Reclaim to
// scan the processing set for expired in-flight jobs.
HGetAll(ctx context.Context, key string) (map[string]string, error)
HDel(ctx context.Context, key string, fields ...string) error
Del(ctx context.Context, keys ...string) error
// LRange returns the elements of the list at key in the inclusive range
// [start, stop]; negative indices count from the tail (-1 is the last
// element). Used by Replay to read the dead-letter list.
LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
// LRem removes up to count occurrences of value from the list at key and
// returns the number removed. Used by Replay to pull one entry off the
// dead-letter list.
LRem(ctx context.Context, key string, count int64, value interface{}) (int64, error)
}
RedisClient defines the minimal Redis operations needed by RedisQueue. This is an interface so callers can inject any Redis client (go-redis, redigo, etc.) without this package importing a specific driver.
type RedisQueue ¶
type RedisQueue struct {
// contains filtered or unexported fields
}
RedisQueue implements the Queue interface backed by Redis lists and hashes. It supports a visibility timeout for in-flight jobs and a dead letter queue for jobs that exceed MaxAttempts.
func NewRedisQueue ¶
func NewRedisQueue(client RedisClient, queueName string) *RedisQueue
NewRedisQueue creates a new Redis-backed queue.
func (*RedisQueue) Ack ¶
func (q *RedisQueue) Ack(ctx context.Context, jobID string) error
Ack removes a single job from the processing queue after successful handling.
func (*RedisQueue) Close ¶
func (q *RedisQueue) Close() error
Close is a no-op for RedisQueue — the caller manages the Redis connection.
func (*RedisQueue) Dequeue ¶
Dequeue pops a job from the Redis list and moves it to the processing queue. If types are specified, only jobs matching one of those types are returned; non-matching jobs are pushed back onto the list.
func (*RedisQueue) Enqueue ¶
func (q *RedisQueue) Enqueue(ctx context.Context, job Job) error
Enqueue pushes a job onto the Redis list, applying defaults for ID, CreatedAt, and MaxAttempts when not set.
func (*RedisQueue) Nack ¶
func (q *RedisQueue) Nack(ctx context.Context, jobID string) error
Nack handles a failed job. If retries remain, it re-enqueues the job; otherwise it moves it to the dead letter queue.
func (*RedisQueue) Reclaim ¶
func (q *RedisQueue) Reclaim(ctx context.Context) (int, error)
Reclaim scans the processing set for in-flight jobs whose visibility timeout has passed (the worker that claimed them crashed before Ack/Nack), re-enqueues them onto the main list, and removes the stale processing entry. Returns the number of jobs re-delivered. Call it periodically (e.g. from a background ticker) to make in-flight Redis work crash-safe.
func (*RedisQueue) Replay ¶
func (q *RedisQueue) Replay(ctx context.Context, jobID string) error
Replay implements Replayable: it pulls a terminally-failed job off the dead-letter list and re-enqueues it onto the main queue with its attempts counter reset, so it gets a full set of retries again. It is idempotent — replaying an unknown job ID is a no-op (returns nil), matching DBQueue.Replay.
The entry is LPush'd back onto the main queue first and only removed from the dead list on success, so a failure between the two ops leaves the job on the dead list (recoverable) rather than dropping it. A crash in that window can leave one copy on each list; the next Replay/Dequeue tolerates the duplicate.
func (*RedisQueue) SetVisibilityTimeout ¶
func (q *RedisQueue) SetVisibilityTimeout(d time.Duration)
SetVisibilityTimeout configures how long a job can be in-flight before it is considered abandoned and eligible for re-delivery.
type Replayable ¶
type Replayable interface {
// Replay resets a terminally-failed job back to pending so it is picked up
// again (attempts counter cleared, scheduled immediately). It MUST only
// touch terminal ('failed') rows and be idempotent: replaying an unknown,
// pending, or running job is a no-op, not an error or a double-run.
Replay(ctx context.Context, jobID string) error
}
Replayable is the optional capability for re-queuing a dead-lettered job. DBQueue, MemoryQueue, and RedisQueue each provide a Replay method that implements it. Admin tooling type-asserts for it and only offers a "replay" action when the backend supports it.
type ScheduleBuilder ¶
type ScheduleBuilder struct {
// contains filtered or unexported fields
}
ScheduleBuilder provides a fluent API for building scheduled jobs.
func (*ScheduleBuilder) Job ¶
func (b *ScheduleBuilder) Job(jobType string, payload any) *ScheduleBuilder
Job sets the job type and payload for the scheduled job.
func (*ScheduleBuilder) Register ¶
func (b *ScheduleBuilder) Register()
Register adds the scheduled job to the scheduler.
type ScheduledJob ¶
type ScheduledJob struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
Interval time.Duration `json:"interval"`
NextRun time.Time `json:"next_run"`
}
ScheduledJob defines a recurring job configuration.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler enqueues recurring jobs onto one or more Queue backends.
func NewScheduler ¶
NewScheduler creates a new Scheduler that dispatches to the given queues.