Documentation
¶
Overview ¶
Package queue is a durable work queue with at-least-once delivery, safe for concurrent consumers across processes.
The Postgres implementation (PG) claims work with SELECT … FOR UPDATE SKIP LOCKED, so running N consumers (goroutines or replicas) over one table just works — each ready task goes to exactly one consumer at a time. A Queue plugs straight into worker.Processor: its Claim/MarkDone/MarkFailed methods satisfy worker.Source[Task] and worker.Sink[Task], so the reliable claim -> handle -> ack/retry loop comes for free. Schedule enqueues work; an optional unique Name deduplicates retries of the same logical task. An InMem implementation with identical semantics is provided for unit tests and local runs without a database.
Wiring (Postgres + worker.Processor) ¶
Schedule produces work; a worker.Processor consumes it by leasing a batch (Claim), running a Handler, and recording the outcome (MarkDone / MarkFailed):
q := queue.NewPG(log, db, queue.Options{LeaseTimeout: 5 * time.Minute})
if err := q.EnsureSchema(ctx); err != nil { // tests; in prod run Schema() as a migration
return err
}
// Producer: enqueue a task. A Name makes the enqueue idempotent.
_, err := q.Schedule(ctx, queue.ScheduleParams{
Name: "send-welcome:42",
Kind: "email.welcome",
Payload: payload,
Delay: 0,
})
// Consumer: q is both the Source (Claim) and Sink (MarkDone/MarkFailed).
handler := worker.HandlerFunc[queue.Task](func(ctx context.Context, t queue.Task) error {
return dispatch(ctx, t.Kind, t.Payload)
})
proc := worker.NewProcessor[queue.Task](log, q, handler, q, worker.ProcessorConfig{
Name: "queue",
BatchSize: 100,
IsTerminal: func(err error) bool { return errors.Is(err, errPermanent) },
})
group := worker.NewGroup(log, 10*time.Second)
group.Add(worker.NewPacedLoop(log, worker.LoopConfig{}, proc.PacedTick()))
if err := group.Run(ctx); err != nil {
return err
}
Schema ¶
Schema returns the DDL for the fixed queue_tasks table and its ready-task index; embed it in a migration. PG.EnsureSchema runs the same DDL directly and is convenient in tests.
Semantics ¶
Claim leases up to limit ready tasks (run_at <= now and not currently leased, or with an expired lease), bumps each task's attempt count, and stamps a fresh LeaseID that MarkDone/MarkFailed must echo back. MarkDone acknowledges success and removes the task. MarkFailed with terminal=false releases the lease so a later Claim retries the task; terminal=true parks it as a dead letter. When a lease no longer matches — because it expired and another consumer reclaimed the task — the mark is a no-op and returns ErrLeaseLost. PG.Cleanup reaps done/dead-lettered rows finished before a cutoff; run it periodically via worker.Loop.
Options ¶
- Options.LeaseTimeout: how long a claimed task stays leased before another consumer may reclaim it (default 5m). NewInMem takes the same timeout as a plain argument.
- ScheduleParams: Name (optional dedup key — empty means always insert), Kind (handler routing key, required), Payload (opaque body), Delay (postpones the earliest claim time).
The backing table is fixed (queue_tasks); all time math and lease-id generation happen in Go, keeping the SQL free of NOW()/gen_random_uuid().
Index ¶
- Variables
- func Schema() string
- type InMem
- func (q *InMem) Claim(_ context.Context, now time.Time, limit int) ([]Task, error)
- func (q *InMem) Len() int
- func (q *InMem) MarkDone(_ context.Context, t Task, _ time.Time) error
- func (q *InMem) MarkFailed(_ context.Context, t Task, errMsg string, terminal bool, now time.Time) error
- func (q *InMem) Schedule(_ context.Context, p ScheduleParams) (bool, error)
- type Options
- type PG
- func (q *PG) Claim(ctx context.Context, now time.Time, limit int) ([]Task, error)
- func (q *PG) Cleanup(ctx context.Context, before time.Time) (int64, error)
- func (q *PG) EnsureSchema(ctx context.Context) error
- func (q *PG) MarkDone(ctx context.Context, t Task, _ time.Time) error
- func (q *PG) MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error
- func (q *PG) Schedule(ctx context.Context, p ScheduleParams) (bool, error)
- type Queue
- type ScheduleParams
- type Task
Constants ¶
This section is empty.
Variables ¶
var ErrLeaseLost = errors.New("queue: lease lost")
ErrLeaseLost is returned by MarkDone/MarkFailed when the task's lease no longer matches — another consumer reclaimed it after the lease expired. The mark is a no-op; the other consumer now owns the task.
Functions ¶
Types ¶
type InMem ¶
type InMem struct {
// contains filtered or unexported fields
}
InMem is an in-memory Queue with the same lease/retry/dedup semantics as PG, for unit tests and local runs without a database. It is safe for concurrent use. It is not durable and does not coordinate across processes.
func (*InMem) Len ¶
Len reports the number of tasks still in the queue (including dead letters). Test helper.
type Options ¶
type Options struct {
// LeaseTimeout is how long a claimed task stays leased before another
// consumer may reclaim it, guarding against a consumer that died mid-process
// (default 5m).
LeaseTimeout time.Duration
}
Options configures a PG queue.
type PG ¶
type PG struct {
// contains filtered or unexported fields
}
PG is a Postgres-backed Queue. Every query is an inline const right next to the method that runs it — no string-templated query cache, no fmt.Sprintf — so the SQL reads as SQL. The backing table is fixed (queue_tasks), and all time math (lease cutoff) and id generation (lease_id) happen in Go, so the SQL stays free of NOW()/gen_random_uuid()/interval casts.
PG is safe for concurrent use by multiple goroutines and processes; Claim coordinates consumers via FOR UPDATE SKIP LOCKED so each ready task is handed to exactly one consumer at a time.
func NewPG ¶
NewPG builds a Postgres queue. It does not create the table; run a migration with Schema() or call EnsureSchema (handy in tests).
func (*PG) Claim ¶
Claim atomically leases up to limit ready tasks (run_at <= now, not currently leased or lease expired) under a fresh lease id, bumping their attempt count. A single CTE + UPDATE … RETURNING does selection and lease assignment in one round-trip; FOR UPDATE SKIP LOCKED lets consumers split the work. The lease cutoff is computed in Go (now - leaseTimeout) and the lease id is generated Go-side and shared by the whole batch (one per Claim).
func (*PG) Cleanup ¶
Cleanup removes dead-lettered/done tasks finished before the given time, returning how many rows were deleted. Run it periodically via worker.Loop.
func (*PG) EnsureSchema ¶
EnsureSchema creates the backing table and index if they do not exist.
func (*PG) MarkDone ¶
MarkDone removes a successfully processed task. The lease_id predicate is the lease guard: a consumer that reclaimed the task after the lease expired holds a different lease_id, so the DELETE matches no row and we return ErrLeaseLost.
func (*PG) MarkFailed ¶
func (q *PG) MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error
MarkFailed releases a failed task for retry (terminal=false: clears the lease and makes it claimable now) or parks it as a dead letter (terminal=true: sets done_at so Claim skips it but Cleanup can reap it). Guarded by the same lease predicate as MarkDone.
type Queue ¶
type Queue interface {
// Schedule enqueues a task, returning inserted=false if Name already exists.
Schedule(ctx context.Context, p ScheduleParams) (inserted bool, err error)
// Claim leases up to limit ready tasks (run_at <= now and not currently
// leased), bumping their attempt count. Each returned Task carries a fresh
// LeaseID that MarkDone/MarkFailed must echo back.
Claim(ctx context.Context, now time.Time, limit int) ([]Task, error)
// MarkDone acknowledges successful processing and removes the task.
MarkDone(ctx context.Context, t Task, now time.Time) error
// MarkFailed records a failure. terminal=true parks the task as a dead letter;
// terminal=false releases the lease so a later Claim retries it.
MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error
}
Queue is a durable work queue. Implementations are safe for concurrent use by multiple consumers. Claim/MarkDone/MarkFailed mirror worker.Source/Sink.
type ScheduleParams ¶
type ScheduleParams struct {
// Name is an optional dedup key. When set, scheduling the same Name twice is a
// no-op (Schedule reports inserted=false). When empty, the queue assigns a
// unique name so every call enqueues a distinct task.
Name string
// Kind routes the task to a handler; required.
Kind string
// Payload is the opaque task body.
Payload []byte
// Delay postpones the earliest claim time (RunAt = now + Delay). Zero means
// claimable immediately.
Delay time.Duration
}
ScheduleParams describes a task to enqueue.
type Task ¶
type Task struct {
ID int64
Name string
Kind string
Payload []byte
CreatedAt time.Time
RunAt time.Time
Attempts int
LeaseID string
LastError string
}
Task is a unit of queued work — a pure core type with no db tags (the PG store owns that mapping via taskRow). Payload is opaque to the queue; consumers route on Kind and decode Payload (typically JSON) themselves.