queue

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package queue is a durable work queue with at-least-once delivery, safe for concurrent consumers across processes. The Postgres implementation claims work with SELECT ... FOR UPDATE SKIP LOCKED, so running N consumers (goroutines or replicas) over one table just works — each task goes to exactly one consumer at a time.

A Queue plugs straight into worker.Processor: its Claim/MarkDone/MarkFailed methods satisfy worker.Source[Task] and worker.Sink[Task], so the reliable claim -> handle -> ack/retry loop comes for free. Schedule enqueues work; an optional unique Name deduplicates retries of the same logical task.

Index

Constants

This section is empty.

Variables

View Source
var ErrLeaseLost = errors.New("queue: lease lost")

ErrLeaseLost is returned by MarkDone/MarkFailed when the task's lease no longer matches — another consumer reclaimed it after the lease expired. The mark is a no-op; the other consumer now owns the task.

Functions

func Schema

func Schema(table string) string

Schema returns the DDL that creates the backing table (and its index) for the given table name. Use it in a migration, or call PG.EnsureSchema in tests.

Types

type InMem

type InMem struct {
	// contains filtered or unexported fields
}

InMem is an in-memory Queue with the same lease/retry/dedup semantics as PG, for unit tests and local runs without a database. It is safe for concurrent use. It is not durable and does not coordinate across processes.

func NewInMem

func NewInMem(leaseTimeout time.Duration) *InMem

NewInMem builds an in-memory queue. leaseTimeout defaults to 5m when <= 0.

func (*InMem) Claim

func (q *InMem) Claim(_ context.Context, now time.Time, limit int) ([]Task, error)

Claim implements Queue.

func (*InMem) Len

func (q *InMem) Len() int

Len reports the number of tasks still in the queue (including dead letters). Test helper.

func (*InMem) MarkDone

func (q *InMem) MarkDone(_ context.Context, t Task, _ time.Time) error

MarkDone implements Queue.

func (*InMem) MarkFailed

func (q *InMem) MarkFailed(_ context.Context, t Task, errMsg string, terminal bool, now time.Time) error

MarkFailed implements Queue.

func (*InMem) Schedule

func (q *InMem) Schedule(_ context.Context, p ScheduleParams) (bool, error)

Schedule implements Queue.

type Options

type Options struct {
	// Table is the backing table name (default "queue_tasks").
	Table string
	// LeaseTimeout is how long a claimed task stays leased before another
	// consumer may reclaim it, guarding against a consumer that died mid-process
	// (default 5m).
	LeaseTimeout time.Duration
}

Options configures a PG queue.

type PG

type PG struct {
	// contains filtered or unexported fields
}

PG is a Postgres-backed Queue. It is safe for concurrent use by multiple goroutines and processes; Claim coordinates consumers via FOR UPDATE SKIP LOCKED so each ready task is handed to exactly one consumer at a time.

func NewPG

func NewPG(log *logger.Logger, db *sqlx.DB, opts Options) *PG

NewPG builds a Postgres queue. It does not create the table; run a migration with Schema(table) or call EnsureSchema (handy in tests).

func (*PG) Claim

func (q *PG) Claim(ctx context.Context, now time.Time, limit int) ([]Task, error)

Claim leases up to limit ready tasks.

func (*PG) Cleanup

func (q *PG) Cleanup(ctx context.Context, before time.Time) (int64, error)

Cleanup removes dead-lettered/done tasks finished before the given time, returning how many rows were deleted. Run it periodically via worker.Loop.

func (*PG) EnsureSchema

func (q *PG) EnsureSchema(ctx context.Context) error

EnsureSchema creates the backing table and index if they do not exist.

func (*PG) MarkDone

func (q *PG) MarkDone(ctx context.Context, t Task, _ time.Time) error

MarkDone removes a successfully processed task.

func (*PG) MarkFailed

func (q *PG) MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error

MarkFailed releases (retryable) or parks (terminal) a failed task.

func (*PG) Schedule

func (q *PG) Schedule(ctx context.Context, p ScheduleParams) (bool, error)

Schedule enqueues a task. An empty Name is replaced with a unique value so the call always inserts.

type Queue

type Queue interface {
	// Schedule enqueues a task, returning inserted=false if Name already exists.
	Schedule(ctx context.Context, p ScheduleParams) (inserted bool, err error)
	// Claim leases up to limit ready tasks (run_at <= now and not currently
	// leased), bumping their attempt count. Each returned Task carries a fresh
	// LeaseID that MarkDone/MarkFailed must echo back.
	Claim(ctx context.Context, now time.Time, limit int) ([]Task, error)
	// MarkDone acknowledges successful processing and removes the task.
	MarkDone(ctx context.Context, t Task, now time.Time) error
	// MarkFailed records a failure. terminal=true parks the task as a dead letter;
	// terminal=false releases the lease so a later Claim retries it.
	MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error
}

Queue is a durable work queue. Implementations are safe for concurrent use by multiple consumers. Claim/MarkDone/MarkFailed mirror worker.Source/Sink.

type ScheduleParams

type ScheduleParams struct {
	// Name is an optional dedup key. When set, scheduling the same Name twice is a
	// no-op (Schedule reports inserted=false). When empty, the queue assigns a
	// unique name so every call enqueues a distinct task.
	Name string
	// Kind routes the task to a handler; required.
	Kind string
	// Payload is the opaque task body.
	Payload []byte
	// Delay postpones the earliest claim time (RunAt = now + Delay). Zero means
	// claimable immediately.
	Delay time.Duration
}

ScheduleParams describes a task to enqueue.

type Task

type Task struct {
	ID        int64     `db:"id"`
	Name      string    `db:"name"`
	Kind      string    `db:"kind"`
	Payload   []byte    `db:"payload"`
	CreatedAt time.Time `db:"created_at"`
	RunAt     time.Time `db:"run_at"`
	Attempts  int       `db:"attempts"`
	LeaseID   string    `db:"lease_id"`
	LastError string    `db:"last_error"`
}

Task is a unit of queued work. Payload is opaque to the queue; consumers route on Kind and decode Payload (typically JSON) themselves.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL