queue

package
v0.5.0
Published: Jun 23, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package queue is a durable work queue with at-least-once delivery, safe for concurrent consumers across processes.

The Postgres implementation (PG) claims work with SELECT … FOR UPDATE SKIP LOCKED, so N consumers (goroutines or replicas) can share one table without extra coordination: each ready task goes to exactly one consumer at a time. A Queue plugs straight into worker.Processor: its Claim/MarkDone/MarkFailed methods satisfy worker.Source[Task] and worker.Sink[Task], so the claim -> handle -> ack/retry loop needs no additional glue code. Schedule enqueues work; an optional unique Name deduplicates retries of the same logical task. An InMem implementation with identical semantics is provided for unit tests and local runs without a database.

Wiring (Postgres + worker.Processor)

Schedule produces work; a worker.Processor consumes it by leasing a batch (Claim), running a Handler, and recording the outcome (MarkDone / MarkFailed):

q := queue.NewPG(log, db, queue.Options{LeaseTimeout: 5 * time.Minute})
if err := q.EnsureSchema(ctx); err != nil { // tests; in prod run Schema() as a migration
    return err
}

// Producer: enqueue a task. A Name makes the enqueue idempotent.
_, err := q.Schedule(ctx, queue.ScheduleParams{
    Name:    "send-welcome:42",
    Kind:    "email.welcome",
    Payload: payload,
    Delay:   0,
})
if err != nil {
    return err
}

// Consumer: q is both the Source (Claim) and Sink (MarkDone/MarkFailed).
handler := worker.HandlerFunc[queue.Task](func(ctx context.Context, t queue.Task) error {
    return dispatch(ctx, t.Kind, t.Payload)
})
proc := worker.NewProcessor[queue.Task](log, q, handler, q, worker.ProcessorConfig{
    Name:       "queue",
    BatchSize:  100,
    IsTerminal: func(err error) bool { return errors.Is(err, errPermanent) },
})

group := worker.NewGroup(log, 10*time.Second)
group.Add(worker.NewPacedLoop(log, worker.LoopConfig{}, proc.PacedTick()))
if err := group.Run(ctx); err != nil {
    return err
}

Schema

Schema returns the DDL for the fixed queue_tasks table and its ready-task index; embed it in a migration. PG.EnsureSchema runs the same DDL directly and is convenient in tests.

Semantics

Claim leases up to limit ready tasks (run_at <= now and not currently leased, or with an expired lease), bumps each task's attempt count, and stamps a fresh LeaseID that MarkDone/MarkFailed must echo back. MarkDone acknowledges success and removes the task. MarkFailed with terminal=false releases the lease so a later Claim retries the task; terminal=true parks it as a dead letter. When a lease no longer matches — because it expired and another consumer reclaimed the task — the mark is a no-op and returns ErrLeaseLost. PG.Cleanup reaps done/dead-lettered rows finished before a cutoff; run it periodically via worker.Loop.

Options

  • Options.LeaseTimeout: how long a claimed task stays leased before another consumer may reclaim it (default 5m). NewInMem takes the same timeout as a plain argument.
  • ScheduleParams: Name (optional dedup key — empty means always insert), Kind (handler routing key, required), Payload (opaque body), Delay (postpones the earliest claim time).

The backing table is fixed (queue_tasks); all time math and lease-id generation happen in Go, keeping the SQL free of NOW()/gen_random_uuid().

Variables

var ErrLeaseLost = errors.New("queue: lease lost")

ErrLeaseLost is returned by MarkDone/MarkFailed when the task's lease no longer matches — another consumer reclaimed it after the lease expired. The mark is a no-op; the other consumer now owns the task.

Functions

func Schema

func Schema() string

Schema returns the DDL that creates the queue_tasks table and its index. Use it in a migration, or call PG.EnsureSchema in tests.

Types

type InMem

type InMem struct {
	// contains filtered or unexported fields
}

InMem is an in-memory Queue with the same lease/retry/dedup semantics as PG, for unit tests and local runs without a database. It is safe for concurrent use. It is not durable and does not coordinate across processes.

func NewInMem

func NewInMem(leaseTimeout time.Duration) *InMem

NewInMem builds an in-memory queue. leaseTimeout defaults to 5m when <= 0.

func (*InMem) Claim

func (q *InMem) Claim(_ context.Context, now time.Time, limit int) ([]Task, error)

Claim implements Queue.

func (*InMem) Len

func (q *InMem) Len() int

Len reports the number of tasks still in the queue (including dead letters). Test helper.

func (*InMem) MarkDone

func (q *InMem) MarkDone(_ context.Context, t Task, _ time.Time) error

MarkDone implements Queue.

func (*InMem) MarkFailed

func (q *InMem) MarkFailed(_ context.Context, t Task, errMsg string, terminal bool, now time.Time) error

MarkFailed implements Queue.

func (*InMem) Schedule

func (q *InMem) Schedule(_ context.Context, p ScheduleParams) (bool, error)

Schedule implements Queue.

type Options

type Options struct {
	// LeaseTimeout is how long a claimed task stays leased before another
	// consumer may reclaim it, guarding against a consumer that died mid-process
	// (default 5m).
	LeaseTimeout time.Duration
}

Options configures a PG queue.

type PG

type PG struct {
	// contains filtered or unexported fields
}

PG is a Postgres-backed Queue. Every query is an inline const right next to the method that runs it — no string-templated query cache, no fmt.Sprintf — so the SQL reads as SQL. The backing table is fixed (queue_tasks), and all time math (lease cutoff) and id generation (lease_id) happen in Go, so the SQL stays free of NOW()/gen_random_uuid()/interval casts.

PG is safe for concurrent use by multiple goroutines and processes; Claim coordinates consumers via FOR UPDATE SKIP LOCKED so each ready task is handed to exactly one consumer at a time.

func NewPG

func NewPG(log *logger.Logger, db *sqlx.DB, opts Options) *PG

NewPG builds a Postgres queue. It does not create the table; run a migration with Schema() or call EnsureSchema (handy in tests).

func (*PG) Claim

func (q *PG) Claim(ctx context.Context, now time.Time, limit int) ([]Task, error)

Claim atomically leases up to limit ready tasks (run_at <= now, not currently leased or lease expired) under a fresh lease id, bumping their attempt count. A single CTE + UPDATE … RETURNING does selection and lease assignment in one round-trip; FOR UPDATE SKIP LOCKED lets consumers split the work. The lease cutoff is computed in Go (now - leaseTimeout) and the lease id is generated Go-side and shared by the whole batch (one per Claim).

func (*PG) Cleanup

func (q *PG) Cleanup(ctx context.Context, before time.Time) (int64, error)

Cleanup removes dead-lettered/done tasks finished before the given time, returning how many rows were deleted. Run it periodically via worker.Loop.

func (*PG) EnsureSchema

func (q *PG) EnsureSchema(ctx context.Context) error

EnsureSchema creates the backing table and index if they do not exist.

func (*PG) MarkDone

func (q *PG) MarkDone(ctx context.Context, t Task, _ time.Time) error

MarkDone removes a successfully processed task. The lease_id predicate is the lease guard: a consumer that reclaimed the task after the lease expired holds a different lease_id, so the DELETE matches no row and we return ErrLeaseLost.

func (*PG) MarkFailed

func (q *PG) MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error

MarkFailed releases a failed task for retry (terminal=false: clears the lease and makes it claimable now) or parks it as a dead letter (terminal=true: sets done_at so Claim skips it but Cleanup can reap it). Guarded by the same lease predicate as MarkDone.

func (*PG) Schedule

func (q *PG) Schedule(ctx context.Context, p ScheduleParams) (bool, error)

Schedule enqueues a task. An empty Name is replaced with a unique value so the call always inserts.

type Queue

type Queue interface {
	// Schedule enqueues a task, returning inserted=false if Name already exists.
	Schedule(ctx context.Context, p ScheduleParams) (inserted bool, err error)
	// Claim leases up to limit ready tasks (run_at <= now and not currently
	// leased), bumping their attempt count. Each returned Task carries a fresh
	// LeaseID that MarkDone/MarkFailed must echo back.
	Claim(ctx context.Context, now time.Time, limit int) ([]Task, error)
	// MarkDone acknowledges successful processing and removes the task.
	MarkDone(ctx context.Context, t Task, now time.Time) error
	// MarkFailed records a failure. terminal=true parks the task as a dead letter;
	// terminal=false releases the lease so a later Claim retries it.
	MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error
}

Queue is a durable work queue. Implementations are safe for concurrent use by multiple consumers. Claim/MarkDone/MarkFailed mirror worker.Source/Sink.

type ScheduleParams

type ScheduleParams struct {
	// Name is an optional dedup key. When set, scheduling the same Name twice is a
	// no-op (Schedule reports inserted=false). When empty, the queue assigns a
	// unique name so every call enqueues a distinct task.
	Name string
	// Kind routes the task to a handler; required.
	Kind string
	// Payload is the opaque task body.
	Payload []byte
	// Delay postpones the earliest claim time (RunAt = now + Delay). Zero means
	// claimable immediately.
	Delay time.Duration
}

ScheduleParams describes a task to enqueue.

type Task

type Task struct {
	ID        int64
	Name      string
	Kind      string
	Payload   []byte
	CreatedAt time.Time
	RunAt     time.Time
	Attempts  int
	LeaseID   string
	LastError string
}

Task is a unit of queued work — a pure core type with no db tags (the PG store owns that mapping via taskRow). Payload is opaque to the queue; consumers route on Kind and decode Payload (typically JSON) themselves.
