queue

package
v0.3.0
Published: Jun 8, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package queue is part of the GoFastr framework. See https://github.com/DonaldMurillo/gofastr for documentation.

Package queue provides a pluggable job queue with in-memory and Redis backends, a goroutine pool for concurrent processing, and scheduled job support.

Constants

This section is empty.

Variables

var (
	ErrQueueClosed = errors.New("queue is closed")
	ErrNoJob       = errors.New("no job available")
)

Sentinel errors.

Functions

This section is empty.

Types

type Browsable

type Browsable interface {
	// ListJobs returns up to limit jobs in the given status; pass an
	// empty status to return all jobs regardless of state. Jobs are
	// ordered newest-first by created_at.
	ListJobs(ctx context.Context, status string, limit int) ([]Job, error)
	// Stats returns counts grouped by status. Cheap by design — admin
	// dashboards may poll it.
	Stats(ctx context.Context) (JobStats, error)
}

Browsable is the optional read-only inspection interface. DBQueue implements it so admin tooling can list and aggregate jobs without guessing at the underlying schema, and MemoryQueue implements it for its retained dead-letter jobs only. RedisQueue may implement it later; admin code that depends on it should type-assert.

type DBQueue

type DBQueue struct {
	// contains filtered or unexported fields
}

DBQueue is a SQL-backed queue. Jobs persist in a single table; Dequeue claims a row atomically so multiple consumers can race safely.

Postgres uses FOR UPDATE SKIP LOCKED, the canonical pattern for queue fan-out without distributed locks. SQLite uses a SERIALIZABLE-friendly SELECT-then-UPDATE inside a transaction; the database-level write lock SQLite takes on BEGIN IMMEDIATE serialises writers naturally.

func NewDBQueue

func NewDBQueue(db *sql.DB, opts ...DBQueueOption) (*DBQueue, error)

NewDBQueue constructs a DBQueue and ensures its backing table exists. It probes the dialect once via SELECT version() and falls back to SQLite when the probe does not identify Postgres. It panics if the table name contains unsafe characters.

func (*DBQueue) Ack

func (q *DBQueue) Ack(ctx context.Context, jobID string) error

Ack permanently removes the job — work is done, no replay needed.

func (*DBQueue) Close

func (q *DBQueue) Close() error

Close stops worker goroutines started by Start. Idempotent.

func (*DBQueue) Dequeue

func (q *DBQueue) Dequeue(ctx context.Context, types ...string) (Job, error)

Dequeue claims the highest-priority eligible job in a single atomic step. Returns ErrNoJob when nothing is ready (no pending row whose scheduled_at has passed).

func (*DBQueue) Enqueue

func (q *DBQueue) Enqueue(ctx context.Context, job Job) error

Enqueue inserts a job. Fills in ID/CreatedAt/MaxAttempts/ScheduledAt defaults when zero-valued so callers can pass {Type, Payload} only.

func (*DBQueue) ListJobs

func (q *DBQueue) ListJobs(ctx context.Context, status string, limit int) ([]Job, error)

ListJobs implements Browsable. Returns up to limit jobs in the supplied status, newest-first. Empty status returns all jobs regardless of state. limit <= 0 defaults to 100.

func (*DBQueue) Nack

func (q *DBQueue) Nack(ctx context.Context, jobID string) error

Nack returns a claimed job to the queue (status=pending) when it still has retry attempts left; otherwise marks it 'failed' for later inspection. When backoff is enabled (see WithBackoff), a requeued job's scheduled_at is pushed into the future so a flapping handler can't burn through every attempt in a tight loop.

func (*DBQueue) RegisterHandler

func (q *DBQueue) RegisterHandler(jobType string, h Handler)

RegisterHandler binds a job type to a handler. Safe to call concurrently with a running worker loop.

func (*DBQueue) Replay

func (q *DBQueue) Replay(ctx context.Context, jobID string) error

Replay implements Replayable: it resets a terminally-failed job to pending so a worker picks it up again — attempts cleared, scheduled immediately. The `AND status='failed'` clause makes it idempotent and safe: replaying an unknown, pending, running, or claimed job matches no row and is a no-op, so it can never double-run an in-flight job or resurrect a non-terminal one.

func (*DBQueue) SetLeaseTimeout

func (q *DBQueue) SetLeaseTimeout(d time.Duration)

SetLeaseTimeout adjusts the in-flight lease duration after construction. See WithLeaseTimeout for semantics. Safe to call concurrently with a running worker loop.

func (*DBQueue) Start

func (q *DBQueue) Start(ctx context.Context)

Start launches q.workers polling goroutines. Each loops Dequeue → handle → Ack/Nack until Close. A worker goroutine that dies for any reason (including a panic escaping the handler-recover guard) is respawned so the pool can never be permanently drained by a poison message.

func (*DBQueue) Stats

func (q *DBQueue) Stats(ctx context.Context) (JobStats, error)

Stats implements Browsable. Aggregates per-status counts over the whole table. Cheap: a single GROUP BY scan.

type DBQueueOption

type DBQueueOption func(*DBQueue)

DBQueueOption configures DBQueue construction.

func WithBackoff

func WithBackoff(base, max time.Duration) DBQueueOption

WithBackoff enables exponential retry backoff. On a Nack with retries remaining, scheduled_at is advanced by base*2^(attempts-1) — so the first retry waits ~base, the second ~2*base, and so on — capped at max. A non-positive base disables backoff (jobs retry immediately, the default). A non-positive max means uncapped. Mirrors the webhook battery's retry backoff so the two batteries behave consistently.

func WithLeaseTimeout

func WithLeaseTimeout(d time.Duration) DBQueueOption

WithLeaseTimeout sets how long a claimed-but-unacked job may stay in-flight before it is considered abandoned (the worker crashed/was killed) and becomes eligible for re-dequeue. Defaults to 5 minutes.

func WithTable

func WithTable(name string) DBQueueOption

WithTable overrides the default "queue_jobs" table name.

func WithWorkers

func WithWorkers(n int) DBQueueOption

WithWorkers sets the number of background worker goroutines started by Start(). Defaults to 1 when not set.

type Handler

type Handler func(ctx context.Context, job Job) error

Handler processes a job. Return a non-nil error to trigger a retry.

type Job

type Job struct {
	ID          string          `json:"id"`
	Type        string          `json:"type"`
	Payload     json.RawMessage `json:"payload"`
	Priority    int             `json:"priority"`
	Attempts    int             `json:"attempts"`
	MaxAttempts int             `json:"max_attempts"`
	CreatedAt   time.Time       `json:"created_at"`
	ScheduledAt time.Time       `json:"scheduled_at"`
}

Job represents a unit of work enqueued for asynchronous processing.

type JobStats

type JobStats map[string]int

JobStats is a snapshot of job counts grouped by status. The keys are status names ("pending", "running", "failed", "dead").

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

MemoryQueue is an in-memory queue backed by a goroutine pool.

func NewMemoryQueue

func NewMemoryQueue(workers int) *MemoryQueue

NewMemoryQueue creates a new in-memory queue with the given number of workers. The internal job channel is buffered to 1024 jobs.

func (*MemoryQueue) Ack

func (q *MemoryQueue) Ack(_ context.Context, jobID string) error

Ack confirms a manually-dequeued job is done, discarding any tracked in-flight copy. For jobs processed by the automatic worker pool it is a no-op (those are auto-acknowledged after successful handler execution).

func (*MemoryQueue) Close

func (q *MemoryQueue) Close() error

Close drains pending jobs and waits for all workers to finish.

func (*MemoryQueue) Dequeue

func (q *MemoryQueue) Dequeue(ctx context.Context, types ...string) (Job, error)

Dequeue retrieves the next job from the channel. This is useful for manual consumption without relying on the automatic worker pool.

func (*MemoryQueue) Enqueue

func (q *MemoryQueue) Enqueue(ctx context.Context, job Job) (err error)

Enqueue adds a job to the buffered channel. If the job has no ID, one is generated. Uses recover to handle the race between Close() closing the channel and this method sending to it.

func (*MemoryQueue) ListJobs

func (q *MemoryQueue) ListJobs(_ context.Context, status string, limit int) ([]Job, error)

ListJobs implements Browsable for the in-memory backend. The only state it can enumerate is the retained dead-letter set, so it returns those jobs for status "failed" (or an empty/"all" status), newest-first, and nothing for any other status — pending/claimed jobs live transiently on an unscannable channel. limit <= 0 defaults to 100.

func (*MemoryQueue) Nack

func (q *MemoryQueue) Nack(ctx context.Context, jobID string) error

Nack marks a manually-dequeued job as failed: it increments the attempt counter and re-enqueues the job when retries remain, otherwise drops it. The job must have been handed out by Dequeue (the in-flight set is consulted by ID). Jobs processed by the automatic worker pool retry internally and are never in the in-flight set; calling Nack for one is a harmless no-op.

func (*MemoryQueue) RegisterHandler

func (q *MemoryQueue) RegisterHandler(jobType string, handler Handler)

RegisterHandler registers a handler function for a given job type.

func (*MemoryQueue) Replay

func (q *MemoryQueue) Replay(ctx context.Context, jobID string) error

Replay implements Replayable: it moves a retained terminally-failed job back onto the pending set with Attempts reset to 0 so Dequeue returns it again. Idempotent and safe: replaying an unknown or non-failed id matches no retained job and is a no-op (nil), never a double-enqueue.

func (*MemoryQueue) Start

func (q *MemoryQueue) Start()

Start launches the worker goroutines. Must be called before enqueuing jobs if you want automatic processing. Workers will call the registered handlers.

func (*MemoryQueue) Stats

func (q *MemoryQueue) Stats(_ context.Context) (JobStats, error)

Stats implements Browsable. The in-memory backend can only count the retained dead-letter jobs (pending/claimed jobs are transient on the channel), so it reports those under "failed".

type Queue

type Queue interface {
	// Enqueue adds a job to the queue.
	Enqueue(ctx context.Context, job Job) error
	// Dequeue retrieves and removes the next available job, optionally filtered by type.
	Dequeue(ctx context.Context, types ...string) (Job, error)
	// Ack confirms successful processing of a job.
	Ack(ctx context.Context, jobID string) error
	// Nack marks a job as failed and triggers retry logic.
	Nack(ctx context.Context, jobID string) error
	// Close gracefully shuts down the queue, draining in-progress work.
	Close() error
}

Queue is the interface that every queue backend must implement.

type RedisClient

type RedisClient interface {
	LPush(ctx context.Context, key string, values ...interface{}) error
	RPop(ctx context.Context, key string) (string, error)
	HSet(ctx context.Context, key string, values ...interface{}) error
	HGet(ctx context.Context, key, field string) (string, error)
	// HGetAll returns every field→value pair in the hash. Used by Reclaim to
	// scan the processing set for expired in-flight jobs.
	HGetAll(ctx context.Context, key string) (map[string]string, error)
	HDel(ctx context.Context, key string, fields ...string) error
	Del(ctx context.Context, keys ...string) error
	// LRange returns the elements of the list at key in the inclusive range
	// [start, stop]; negative indices count from the tail (-1 is the last
	// element). Used by Replay to read the dead-letter list.
	LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
	// LRem removes up to count occurrences of value from the list at key and
	// returns the number removed. Used by Replay to pull one entry off the
	// dead-letter list.
	LRem(ctx context.Context, key string, count int64, value interface{}) (int64, error)
}

RedisClient defines the minimal Redis operations needed by RedisQueue. This is an interface so callers can inject any Redis client (go-redis, redigo, etc.) without this package importing a specific driver.

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue implements the Queue interface backed by Redis lists and hashes. It supports a visibility timeout for in-flight jobs and a dead letter queue for jobs that exceed MaxAttempts.

func NewRedisQueue

func NewRedisQueue(client RedisClient, queueName string) *RedisQueue

NewRedisQueue creates a new Redis-backed queue.

func (*RedisQueue) Ack

func (q *RedisQueue) Ack(ctx context.Context, jobID string) error

Ack removes a single job from the processing queue after successful handling.

func (*RedisQueue) Close

func (q *RedisQueue) Close() error

Close is a no-op for RedisQueue — the caller manages the Redis connection.

func (*RedisQueue) Dequeue

func (q *RedisQueue) Dequeue(ctx context.Context, types ...string) (Job, error)

Dequeue pops a job from the Redis list and moves it to the processing queue. If types are specified, only jobs matching one of those types are returned; non-matching jobs are pushed back onto the list.

func (*RedisQueue) Enqueue

func (q *RedisQueue) Enqueue(ctx context.Context, job Job) error

Enqueue pushes a job onto the Redis list, applying defaults for ID, CreatedAt, and MaxAttempts when not set.

func (*RedisQueue) Nack

func (q *RedisQueue) Nack(ctx context.Context, jobID string) error

Nack handles a failed job. If retries remain, it re-enqueues the job; otherwise it moves it to the dead letter queue.

func (*RedisQueue) Reclaim

func (q *RedisQueue) Reclaim(ctx context.Context) (int, error)

Reclaim scans the processing set for in-flight jobs whose visibility timeout has passed (the worker that claimed them crashed before Ack/Nack), re-enqueues them onto the main list, and removes the stale processing entry. Returns the number of jobs re-delivered. Call it periodically (e.g. from a background ticker) to make in-flight Redis work crash-safe.

func (*RedisQueue) Replay

func (q *RedisQueue) Replay(ctx context.Context, jobID string) error

Replay implements Replayable: it pulls a terminally-failed job off the dead-letter list and re-enqueues it onto the main queue with its attempts counter reset, so it gets a full set of retries again. It is idempotent — replaying an unknown job ID is a no-op (returns nil), matching DBQueue.Replay.

The entry is LPush'd back onto the main queue first and only removed from the dead list on success, so a failure between the two ops leaves the job on the dead list (recoverable) rather than dropping it. A crash in that window can leave one copy on each list; the next Replay/Dequeue tolerates the duplicate.

func (*RedisQueue) SetVisibilityTimeout

func (q *RedisQueue) SetVisibilityTimeout(d time.Duration)

SetVisibilityTimeout configures how long a job can be in-flight before it is considered abandoned and eligible for re-delivery.

type Replayable

type Replayable interface {
	// Replay resets a terminally-failed job back to pending so it is picked up
	// again (attempts counter cleared, scheduled immediately). It MUST only
	// touch terminal ('failed') rows and be idempotent: replaying an unknown,
	// pending, or running job is a no-op, not an error or a double-run.
	Replay(ctx context.Context, jobID string) error
}

Replayable is the optional capability for re-queuing a dead-lettered job. DBQueue, MemoryQueue, and RedisQueue each document a Replay method that implements it. Admin tooling type-asserts for it and only offers a "replay" action when the backend supports it.

type ScheduleBuilder

type ScheduleBuilder struct {
	// contains filtered or unexported fields
}

ScheduleBuilder provides a fluent API for building scheduled jobs.

func (*ScheduleBuilder) Job

func (b *ScheduleBuilder) Job(jobType string, payload any) *ScheduleBuilder

Job sets the job type and payload for the scheduled job.

func (*ScheduleBuilder) Register

func (b *ScheduleBuilder) Register()

Register adds the scheduled job to the scheduler.

type ScheduledJob

type ScheduledJob struct {
	Type     string          `json:"type"`
	Payload  json.RawMessage `json:"payload"`
	Interval time.Duration   `json:"interval"`
	NextRun  time.Time       `json:"next_run"`
}

ScheduledJob defines a recurring job configuration.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler enqueues recurring jobs onto one or more Queue backends.

func NewScheduler

func NewScheduler(queues ...Queue) *Scheduler

NewScheduler creates a new Scheduler that dispatches to the given queues.

func (*Scheduler) Every

func (s *Scheduler) Every(interval time.Duration) *ScheduleBuilder

Every returns a ScheduleBuilder that starts with the given interval.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context)

Start begins the scheduling loop. It blocks until ctx is cancelled.
