Documentation ¶
Overview ¶
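Package queue is a durable work queue with at-least-once delivery, safe for concurrent consumers across processes. The Postgres implementation claims work with SELECT ... FOR UPDATE SKIP LOCKED, so N consumers (goroutines or replicas) can share one table without extra coordination: each task is leased to exactly one consumer at a time. Because delivery is at-least-once, a task whose lease expires can be claimed again, so handlers should tolerate seeing the same task more than once.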
A Queue plugs straight into worker.Processor: its Claim/MarkDone/MarkFailed methods satisfy worker.Source[Task] and worker.Sink[Task], so the claim -> handle -> ack/retry loop needs no extra glue code. Schedule enqueues work; an optional Name acts as a dedup key, so scheduling the same logical task again is a no-op.
Index ¶
- Variables
- func Schema(table string) string
- type InMem
- func (q *InMem) Claim(_ context.Context, now time.Time, limit int) ([]Task, error)
- func (q *InMem) Len() int
- func (q *InMem) MarkDone(_ context.Context, t Task, _ time.Time) error
- func (q *InMem) MarkFailed(_ context.Context, t Task, errMsg string, terminal bool, now time.Time) error
- func (q *InMem) Schedule(_ context.Context, p ScheduleParams) (bool, error)
- type Options
- type PG
- func (q *PG) Claim(ctx context.Context, now time.Time, limit int) ([]Task, error)
- func (q *PG) Cleanup(ctx context.Context, before time.Time) (int64, error)
- func (q *PG) EnsureSchema(ctx context.Context) error
- func (q *PG) MarkDone(ctx context.Context, t Task, _ time.Time) error
- func (q *PG) MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error
- func (q *PG) Schedule(ctx context.Context, p ScheduleParams) (bool, error)
- type Queue
- type ScheduleParams
- type Task
Constants ¶
This section is empty.
Variables ¶
var ErrLeaseLost = errors.New("queue: lease lost")
ErrLeaseLost is returned by MarkDone/MarkFailed when the task's lease no longer matches, meaning another consumer reclaimed the task after the lease expired. The mark is a no-op; the other consumer now owns the task.
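One way a consumer might treat this error is sketched below. The sketch redeclares ErrLeaseLost locally so it compiles on its own, and ackResult is a hypothetical helper, not part of the package. It assumes that losing the lease is not worth surfacing as a failure, since the other consumer will report the outcome.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrLeaseLost mirrors the package variable so this sketch is self-contained.
var ErrLeaseLost = errors.New("queue: lease lost")

// ackResult (hypothetical) classifies the error returned by MarkDone or
// MarkFailed. owned reports whether this consumer's mark took effect;
// surfaced is any error still worth reporting to the caller.
func ackResult(err error) (owned bool, surfaced error) {
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, ErrLeaseLost):
		// Another consumer reclaimed the task; the mark was a no-op.
		return false, nil
	default:
		return false, err
	}
}

func main() {
	// errors.Is also matches when the sentinel has been wrapped.
	wrapped := fmt.Errorf("mark done: %w", ErrLeaseLost)
	owned, err := ackResult(wrapped)
	fmt.Println(owned, err) // prints "false <nil>"
}
```

Matching with errors.Is rather than == keeps the check working if a caller wraps the error with extra context.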
Functions ¶
Types ¶
type InMem ¶
type InMem struct {
	// contains filtered or unexported fields
}
InMem is an in-memory Queue with the same lease/retry/dedup semantics as PG, for unit tests and local runs without a database. It is safe for concurrent use. It is not durable and does not coordinate across processes.
func (*InMem) Len ¶
Len reports the number of tasks still in the queue, including dead letters. It is intended as a test helper.
type Options ¶
type Options struct {
	// Table is the backing table name (default "queue_tasks").
	Table string
	// LeaseTimeout is how long a claimed task stays leased before another
	// consumer may reclaim it, guarding against a consumer that died mid-process
	// (default 5m).
	LeaseTimeout time.Duration
}
Options configures a PG queue.
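The defaults named in the field comments could be applied as in the following sketch. Options is redeclared locally so the block is self-contained, and withDefaults is a hypothetical helper; how the package itself fills in zero values is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// Options mirrors the documented struct so the sketch compiles on its own.
type Options struct {
	Table        string
	LeaseTimeout time.Duration
}

// withDefaults (hypothetical) fills zero-valued fields with the defaults
// stated in the field comments: "queue_tasks" and 5 minutes.
func withDefaults(o Options) Options {
	if o.Table == "" {
		o.Table = "queue_tasks"
	}
	if o.LeaseTimeout <= 0 {
		o.LeaseTimeout = 5 * time.Minute
	}
	return o
}

func main() {
	o := withDefaults(Options{LeaseTimeout: 30 * time.Second})
	fmt.Println(o.Table, o.LeaseTimeout) // prints "queue_tasks 30s"
}
```

When choosing LeaseTimeout, note that a value shorter than the slowest handler lets a second consumer reclaim a task that is still being processed.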
type PG ¶
type PG struct {
	// contains filtered or unexported fields
}
PG is a Postgres-backed Queue. It is safe for concurrent use by multiple goroutines and processes; Claim coordinates consumers via FOR UPDATE SKIP LOCKED so each ready task is handed to exactly one consumer at a time.
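To illustrate the general FOR UPDATE SKIP LOCKED claim pattern, here is a sketch of the kind of statement such a queue might run. It is not the package's actual query. The column lease_expires_at is an assumption made for illustration (the other column names come from the db tags on Task), a single lease ID is passed per call for brevity, and dead-letter filtering is omitted.

```go
package main

import "fmt"

// claimTemplate is an illustrative claim query, not the package's own SQL.
// Parameters: $1 now, $2 limit, $3 new lease ID, $4 new lease expiry.
// lease_expires_at is a hypothetical column; dead-letter state is ignored.
const claimTemplate = `
WITH ready AS (
    SELECT id
    FROM %[1]s
    WHERE run_at <= $1
      AND (lease_expires_at IS NULL OR lease_expires_at <= $1)
    ORDER BY run_at
    LIMIT $2
    FOR UPDATE SKIP LOCKED  -- rows locked by another claimer are skipped, not waited on
)
UPDATE %[1]s AS t
SET lease_id = $3,
    lease_expires_at = $4,
    attempts = t.attempts + 1
FROM ready
WHERE t.id = ready.id
RETURNING t.id, t.name, t.kind, t.payload, t.created_at, t.run_at,
          t.attempts, t.lease_id, t.last_error`

// claimSQL substitutes the table name. The name is interpolated as an
// identifier, so it must come from trusted configuration, never user input.
func claimSQL(table string) string {
	return fmt.Sprintf(claimTemplate, table)
}

func main() {
	fmt.Println(claimSQL("queue_tasks"))
}
```

The row locks taken inside the CTE are what keep two concurrent claimers from selecting the same row: the second one skips locked rows instead of blocking on them.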
func NewPG ¶
NewPG builds a Postgres queue. It does not create the table; run a migration with Schema(table) or call EnsureSchema (handy in tests).
func (*PG) Cleanup ¶
Cleanup removes dead-lettered/done tasks finished before the given time, returning how many rows were deleted. Run it periodically via worker.Loop.
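The worker.Loop API is not shown in this documentation, so the sketch below substitutes a plain time.Ticker loop to illustrate periodic cleanup. The cleaner interface copies the Cleanup signature from the index; runCleanup, fakeCleaner, and the retention parameter are hypothetical names introduced for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cleaner is the slice of *PG this sketch needs.
type cleaner interface {
	Cleanup(ctx context.Context, before time.Time) (int64, error)
}

// runCleanup (hypothetical stand-in for worker.Loop) deletes tasks finished
// more than retention ago on every tick, until ctx is cancelled.
func runCleanup(ctx context.Context, q cleaner, interval, retention time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case now := <-ticker.C:
			if _, err := q.Cleanup(ctx, now.Add(-retention)); err != nil {
				return err
			}
		}
	}
}

// fakeCleaner counts calls and cancels the loop after stopAfter of them,
// so the sketch runs without a database.
type fakeCleaner struct {
	calls     int
	stopAfter int
	cancel    context.CancelFunc
}

func (f *fakeCleaner) Cleanup(_ context.Context, _ time.Time) (int64, error) {
	f.calls++
	if f.calls >= f.stopAfter {
		f.cancel()
	}
	return 0, nil
}

func demo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	f := &fakeCleaner{stopAfter: 3, cancel: cancel}
	err := runCleanup(ctx, f, time.Millisecond, 24*time.Hour)
	return f.calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls >= 3, err) // prints "true context canceled"
}
```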
func (*PG) EnsureSchema ¶
EnsureSchema creates the backing table and index if they do not exist.
type Queue ¶
type Queue interface {
	// Schedule enqueues a task, returning inserted=false if Name already exists.
	Schedule(ctx context.Context, p ScheduleParams) (inserted bool, err error)
	// Claim leases up to limit ready tasks (run_at <= now and not currently
	// leased), bumping their attempt count. Each returned Task carries a fresh
	// LeaseID that MarkDone/MarkFailed must echo back.
	Claim(ctx context.Context, now time.Time, limit int) ([]Task, error)
	// MarkDone acknowledges successful processing and removes the task.
	MarkDone(ctx context.Context, t Task, now time.Time) error
	// MarkFailed records a failure. terminal=true parks the task as a dead letter;
	// terminal=false releases the lease so a later Claim retries it.
	MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error
}
Queue is a durable work queue. Implementations are safe for concurrent use by multiple consumers. Claim/MarkDone/MarkFailed mirror worker.Source/Sink.
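The claim, handle, and ack/retry cycle described by this interface can be sketched by hand as follows. This is not how worker.Processor is implemented. Task and source are trimmed local copies of the documented types, stubQueue is a hypothetical in-memory stand-in, and maxAttempts is an assumed retry budget that the package does not define.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Task and source are trimmed local copies of the documented types.
type Task struct {
	ID       int64
	Kind     string
	Attempts int
}

type source interface {
	Claim(ctx context.Context, now time.Time, limit int) ([]Task, error)
	MarkDone(ctx context.Context, t Task, now time.Time) error
	MarkFailed(ctx context.Context, t Task, errMsg string, terminal bool, now time.Time) error
}

const maxAttempts = 3 // hypothetical retry budget, not defined by the package

// drainOnce claims one batch, runs handle on each task, and acks or fails it.
// It returns how many tasks were claimed.
func drainOnce(ctx context.Context, q source, handle func(Task) error) (int, error) {
	tasks, err := q.Claim(ctx, time.Now(), 10)
	if err != nil {
		return 0, err
	}
	for _, t := range tasks {
		if herr := handle(t); herr != nil {
			// Dead-letter once the budget is spent; otherwise release for retry.
			terminal := t.Attempts >= maxAttempts
			err = q.MarkFailed(ctx, t, herr.Error(), terminal, time.Now())
		} else {
			err = q.MarkDone(ctx, t, time.Now())
		}
		if err != nil {
			return 0, err
		}
	}
	return len(tasks), nil
}

// stubQueue is a single-goroutine stand-in with no leases or timing.
type stubQueue struct {
	pending    []Task
	done, dead int
}

func (s *stubQueue) Claim(_ context.Context, _ time.Time, limit int) ([]Task, error) {
	n := len(s.pending)
	if n > limit {
		n = limit
	}
	out := make([]Task, n)
	copy(out, s.pending[:n])
	s.pending = s.pending[n:]
	for i := range out {
		out[i].Attempts++ // Claim bumps the attempt count
	}
	return out, nil
}

func (s *stubQueue) MarkDone(_ context.Context, _ Task, _ time.Time) error {
	s.done++
	return nil
}

func (s *stubQueue) MarkFailed(_ context.Context, t Task, _ string, terminal bool, _ time.Time) error {
	if terminal {
		s.dead++
	} else {
		s.pending = append(s.pending, t)
	}
	return nil
}

func demo() (done, dead int) {
	q := &stubQueue{pending: []Task{{ID: 1, Kind: "ok"}, {ID: 2, Kind: "bad"}}}
	handle := func(t Task) error {
		if t.Kind == "bad" {
			return errors.New("boom")
		}
		return nil
	}
	for {
		n, err := drainOnce(context.Background(), q, handle)
		if err != nil || n == 0 {
			break
		}
	}
	return q.done, q.dead
}

func main() {
	fmt.Println(demo()) // prints "1 1"
}
```

In the demo, the "bad" task is retried until its third attempt and is then dead-lettered, while the "ok" task is acknowledged on the first pass.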
type ScheduleParams ¶
type ScheduleParams struct {
	// Name is an optional dedup key. When set, scheduling the same Name twice is a
	// no-op (Schedule reports inserted=false). When empty, the queue assigns a
	// unique name so every call enqueues a distinct task.
	Name string
	// Kind routes the task to a handler; required.
	Kind string
	// Payload is the opaque task body.
	Payload []byte
	// Delay postpones the earliest claim time (RunAt = now + Delay). Zero means
	// claimable immediately.
	Delay time.Duration
}
ScheduleParams describes a task to enqueue.
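The dedup and delay rules in the field comments can be seen in a small map-backed sketch. toyQueue and its schedule method are hypothetical and model only Name deduplication and RunAt = now + Delay. The "auto-N" naming scheme and the error returned for an empty Kind are assumptions; the package's real behavior in those cases is not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ScheduleParams mirrors the documented struct.
type ScheduleParams struct {
	Name    string
	Kind    string
	Payload []byte
	Delay   time.Duration
}

// toyQueue (hypothetical) tracks only the earliest claim time per task name.
type toyQueue struct {
	runAt map[string]time.Time
	seq   int
}

func (q *toyQueue) schedule(now time.Time, p ScheduleParams) (bool, error) {
	if p.Kind == "" {
		// Assumed behavior: the docs only say Kind is required.
		return false, errors.New("kind is required")
	}
	name := p.Name
	if name == "" {
		// Empty Name: assign a unique one so the call always enqueues.
		q.seq++
		name = fmt.Sprintf("auto-%d", q.seq)
	}
	if _, exists := q.runAt[name]; exists {
		return false, nil // duplicate Name: no-op, inserted=false
	}
	q.runAt[name] = now.Add(p.Delay) // RunAt = now + Delay
	return true, nil
}

func demo() []bool {
	q := &toyQueue{runAt: map[string]time.Time{}}
	now := time.Now()
	var got []bool
	for _, p := range []ScheduleParams{
		{Name: "invoice-42", Kind: "email"},
		{Name: "invoice-42", Kind: "email"}, // same Name: deduplicated
		{Kind: "email"},                     // unnamed: always inserted
		{Kind: "email", Delay: time.Minute},
	} {
		inserted, _ := q.schedule(now, p)
		got = append(got, inserted)
	}
	return got
}

func main() {
	fmt.Println(demo()) // prints "[true false true true]"
}
```

A Name derived from the business event (for example an invoice ID) makes a retried producer call safe, because the second Schedule reports inserted=false instead of enqueuing a duplicate.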
type Task ¶
type Task struct {
	ID        int64     `db:"id"`
	Name      string    `db:"name"`
	Kind      string    `db:"kind"`
	Payload   []byte    `db:"payload"`
	CreatedAt time.Time `db:"created_at"`
	RunAt     time.Time `db:"run_at"`
	Attempts  int       `db:"attempts"`
	LeaseID   string    `db:"lease_id"`
	LastError string    `db:"last_error"`
}
Task is a unit of queued work. Payload is opaque to the queue; consumers route on Kind and decode Payload (typically JSON) themselves.
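Routing on Kind and decoding a JSON Payload might look like the sketch below. Task is trimmed to the two fields involved; the "email.send" kind, the sendEmail payload shape, and the dispatch function are all hypothetical names chosen for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Task is trimmed to the fields this sketch uses.
type Task struct {
	Kind    string
	Payload []byte
}

// sendEmail is a hypothetical payload shape for the "email.send" kind.
type sendEmail struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// dispatch (hypothetical) routes on Kind and decodes Payload for that kind.
func dispatch(t Task) (string, error) {
	switch t.Kind {
	case "email.send":
		var p sendEmail
		if err := json.Unmarshal(t.Payload, &p); err != nil {
			return "", fmt.Errorf("decode %s payload: %w", t.Kind, err)
		}
		return "email to " + p.To, nil
	default:
		return "", fmt.Errorf("unknown kind %q", t.Kind)
	}
}

func main() {
	out, err := dispatch(Task{
		Kind:    "email.send",
		Payload: []byte(`{"to":"a@example.com","subject":"hi"}`),
	})
	fmt.Println(out, err) // prints "email to a@example.com <nil>"
}
```

An unknown Kind or an undecodable Payload will usually fail the same way on every retry, so a consumer may prefer to pass terminal=true to MarkFailed in those cases.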