queue

package
v0.9.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 8, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package queue provides a work queue with single-worker delivery semantics (competing consumers). Each enqueued job is claimed by one worker at a time, although a claim can lapse and be taken over by another worker as described below. Delivery is at-least-once, so handlers MUST be idempotent.

Idempotency contract

A job MAY be delivered to a handler more than once when:

  • A handler runs longer than the configured visibility timeout. After the visibility timeout expires, another worker may re-claim the same job and run the handler again concurrently. Both invocations may succeed.
  • A handler succeeds but the backend acknowledgement (DELETE / ack) fails. The job will be re-delivered after the visibility timeout.
  • A worker crashes mid-handler. The job becomes visible again after the visibility timeout and will be re-delivered.

Make handler effects idempotent (use unique keys, upserts, or check-then-act patterns inside a transaction). Do not assume "the job ran" implies "exactly one effect was produced".
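The contract above can be illustrated with a minimal sketch that deduplicates on the job ID. The Job type here is a local stand-in for the documented fields, and Ledger, NewLedger, and deliver are hypothetical names; the in-memory set stands in for a unique key or upsert in a real datastore.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Job is a local stand-in for queue.Job, reduced to the fields used here.
type Job struct {
	ID      string
	Payload []byte
}

// Ledger records which job IDs have already produced their effect.
// Hypothetical: a real handler would rely on a unique key or upsert in its
// datastore rather than on process memory.
type Ledger struct {
	mu      sync.Mutex
	applied map[string]bool
	effects int
}

func NewLedger() *Ledger {
	return &Ledger{applied: make(map[string]bool)}
}

// Handle applies the effect at most once per job ID, so a redelivery after a
// failed ack or an expired visibility timeout is a no-op.
func (l *Ledger) Handle(ctx context.Context, job *Job) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.applied[job.ID] {
		return nil // duplicate delivery: acknowledge without repeating the effect
	}
	l.applied[job.ID] = true
	l.effects++
	return nil
}

// Effects returns how many times the side effect actually ran.
func (l *Ledger) Effects() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.effects
}

// deliver simulates the same job being delivered several times.
func deliver(l *Ledger, job *Job, times int) int {
	for i := 0; i < times; i++ {
		_ = l.Handle(context.Background(), job)
	}
	return l.Effects()
}

func main() {
	l := NewLedger()
	fmt.Println(deliver(l, &Job{ID: "job-1"}, 3)) // one effect despite three deliveries
}
```

The mutex also covers the case where two workers run the handler concurrently for the same job, which the visibility timeout bullet above allows.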

Shutdown behavior

Consumer.Close stops claiming new jobs and waits up to WithShutdownTimeout for in-flight handlers to return. Handlers that exceed the shutdown timeout keep running as orphaned goroutines until they return on their own, because Go provides no mechanism to interrupt a running goroutine. If the worker process exits before such a handler returns, its job is re-claimed after the visibility timeout.
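A bounded wait of this kind is commonly built from a sync.WaitGroup and a timer. The sketch below is one way to do it and is not taken from the package source; waitWithTimeout and drained are hypothetical helper names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout reports whether every in-flight handler tracked by wg
// returned within timeout. On timeout nothing is cancelled: the handlers keep
// running as orphaned goroutines.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

// drained runs one simulated handler and waits for it with a shutdown timeout.
// Both arguments are in milliseconds.
func drained(handlerMillis, timeoutMillis int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(handlerMillis) * time.Millisecond)
	}()
	return waitWithTimeout(&wg, time.Duration(timeoutMillis)*time.Millisecond)
}

func main() {
	fmt.Println(drained(10, 1000)) // handler returns before the timeout
	fmt.Println(drained(1000, 10)) // handler outlives the timeout
}
```

Handlers that should stop promptly on shutdown still need to watch their context or another signal themselves; the wait above only bounds how long the caller blocks.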

Topics with no subscribers

Durable backends (sqlite, pgx, redis) accumulate jobs for topics that have no subscriber. There is no automatic expiry. Operators are responsible for monitoring queue depth.

Constants

const (
	// DefaultAppName matches pubsub.DefaultAppName so apps can share namespacing.
	DefaultAppName = "app"
	// DefaultNamespace matches pubsub.DefaultNamespace.
	DefaultNamespace = "default"

	// DefaultMaxRetries is intentionally conservative compared to libraries
	// like riverqueue (25). Most workloads should override this per-Subscribe.
	DefaultMaxRetries = 3

	// DefaultPollInterval is the fallback wake-up cadence for backends that
	// poll (sqlite, redis idle, pgx fallback). Push-based wake-ups (NOTIFY,
	// BLPOP, Cond) preempt this interval where supported.
	DefaultPollInterval = time.Second

	// DefaultVisibilityTimeout is how long a claimed job stays hidden from
	// other workers before becoming re-claimable.
	DefaultVisibilityTimeout = 30 * time.Second

	// DefaultShutdownTimeout is how long Consumer.Close waits for in-flight
	// handlers before returning.
	DefaultShutdownTimeout = 30 * time.Second
)

Variables

var ErrSkipRetry = errors.New("queue: skip retry")

ErrSkipRetry, when returned (or wrapped) from a Handler, signals that the job has permanently failed and should be routed to the dead letter topic immediately, regardless of the remaining retry budget.

Functions

func DeadLetterTopic

func DeadLetterTopic(formattedTopic string) string

DeadLetterTopic returns the default dead letter topic name for a given already-formatted topic. A dead letter topic is just another queue topic; subscribe to it to inspect, alert on, or reprocess failed jobs.

func DefaultBackoff

func DefaultBackoff(attempt int) time.Duration

DefaultBackoff implements exponential backoff (1s base, 5 min cap) with uniform ±25% jitter. Attempt values large enough to overflow the computation are clamped to the cap.
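The following sketch shows one way such a backoff could be computed. The doubling factor, the order in which the cap and the jitter are applied, and the names backoffNoJitter and backoff are assumptions; only the 1s base, the 5 min cap, and the ±25% jitter come from the description above.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	baseDelay = time.Second
	maxDelay  = 5 * time.Minute
)

// backoffNoJitter doubles baseDelay once per prior attempt and clamps the
// result to maxDelay. Returning as soon as the cap is reached means very
// large attempt values can never overflow the duration.
func backoffNoJitter(attempt int) time.Duration {
	d := baseDelay
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

// backoff scales the clamped delay by a uniform factor in [0.75, 1.25).
// Assumption: jitter is applied after clamping, so a jittered delay may
// exceed the cap by up to 25%.
func backoff(attempt int) time.Duration {
	factor := 0.75 + rand.Float64()*0.5
	return time.Duration(float64(backoffNoJitter(attempt)) * factor)
}

func main() {
	for attempt := 1; attempt <= 10; attempt++ {
		fmt.Println(attempt, backoffNoJitter(attempt), backoff(attempt))
	}
}
```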

func FormatTopic

func FormatTopic(app, namespace, topic string) string

FormatTopic formats a topic name as "<app>.<namespace>.<topic>". This matches the pubsub package convention so an application can share namespace configuration between pubsub and queue.
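As an illustration of the naming convention, the sketch below re-implements the documented formats locally: "<app>.<namespace>.<topic>" for topics and "<topic>.dead" for the default dead letter topic. The lowercase function names and the "emails" topic are hypothetical.

```go
package main

import "fmt"

// formatTopic mirrors the documented "<app>.<namespace>.<topic>" layout.
func formatTopic(app, namespace, topic string) string {
	return app + "." + namespace + "." + topic
}

// deadLetterTopic mirrors the documented default of "<topic>.dead" for an
// already-formatted topic.
func deadLetterTopic(formattedTopic string) string {
	return formattedTopic + ".dead"
}

func main() {
	topic := formatTopic("app", "default", "emails")
	fmt.Println(topic)                  // app.default.emails
	fmt.Println(deadLetterTopic(topic)) // app.default.emails.dead
}
```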

func ResolveSubscribeConfig

func ResolveSubscribeConfig(cfg *SubscribeConfig, formattedTopic string)

ResolveSubscribeConfig fills in package defaults for any zero-valued field in cfg. Backend implementations call this once per Subscribe call after applying user options.
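A default-filling step of this kind might look like the sketch below. It uses a local copy of the config struct without the Backoff field, takes its default values from the constants and option descriptions on this page, and is not the package's actual implementation; resolve is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultAppName           = "app"
	defaultNamespace         = "default"
	defaultConcurrency       = 1
	defaultMaxRetries        = 3
	defaultPollInterval      = time.Second
	defaultVisibilityTimeout = 30 * time.Second
	defaultShutdownTimeout   = 30 * time.Second
)

// SubscribeConfig is a local stand-in for queue.SubscribeConfig.
type SubscribeConfig struct {
	App, Namespace    string
	Concurrency       int
	MaxRetries        int
	PollInterval      time.Duration
	VisibilityTimeout time.Duration
	ShutdownTimeout   time.Duration
	DeadLetterTopic   string
}

// resolve fills every zero-valued field with its default and leaves fields
// the caller already set untouched.
func resolve(cfg *SubscribeConfig, formattedTopic string) {
	if cfg.App == "" {
		cfg.App = defaultAppName
	}
	if cfg.Namespace == "" {
		cfg.Namespace = defaultNamespace
	}
	if cfg.Concurrency == 0 {
		cfg.Concurrency = defaultConcurrency
	}
	if cfg.MaxRetries == 0 {
		cfg.MaxRetries = defaultMaxRetries
	}
	if cfg.PollInterval == 0 {
		cfg.PollInterval = defaultPollInterval
	}
	if cfg.VisibilityTimeout == 0 {
		cfg.VisibilityTimeout = defaultVisibilityTimeout
	}
	if cfg.ShutdownTimeout == 0 {
		cfg.ShutdownTimeout = defaultShutdownTimeout
	}
	if cfg.DeadLetterTopic == "" {
		cfg.DeadLetterTopic = formattedTopic + ".dead"
	}
}

func main() {
	cfg := SubscribeConfig{Concurrency: 4}
	resolve(&cfg, "app.default.emails")
	fmt.Printf("%+v\n", cfg)
}
```

One consequence of zero-value detection is that an explicit zero cannot be distinguished from "not set"; in this sketch a MaxRetries of 0 resolves to the default.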

func SkipRetry

func SkipRetry(err error) error

SkipRetry wraps err so it satisfies errors.Is(err, ErrSkipRetry) while still preserving the wrapped error chain. Returning queue.SkipRetry(myErr) from a handler routes the job to the dead letter topic on this attempt.
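One way to get both properties, matching the sentinel and preserving the chain, is a small wrapper type with Is and Unwrap methods. The sketch below is an assumption about how this could be done, not the package's source; skipRetryError, errBadPayload, matches, and the nil handling are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSkipRetry is a local copy of the documented sentinel.
var ErrSkipRetry = errors.New("queue: skip retry")

// errBadPayload is a hypothetical application error.
var errBadPayload = errors.New("bad payload")

// skipRetryError matches ErrSkipRetry through Is and exposes the original
// error through Unwrap, so errors.Is and errors.As still see the full chain.
type skipRetryError struct{ err error }

func (e *skipRetryError) Error() string        { return ErrSkipRetry.Error() + ": " + e.err.Error() }
func (e *skipRetryError) Is(target error) bool { return target == ErrSkipRetry }
func (e *skipRetryError) Unwrap() error        { return e.err }

// SkipRetry wraps err. Assumption: a nil err yields the bare sentinel.
func SkipRetry(err error) error {
	if err == nil {
		return ErrSkipRetry
	}
	return &skipRetryError{err: err}
}

// matches is a thin wrapper over errors.Is used by the example below.
func matches(err, target error) bool {
	return errors.Is(err, target)
}

func main() {
	err := SkipRetry(errBadPayload)
	fmt.Println(matches(err, ErrSkipRetry))  // true
	fmt.Println(matches(err, errBadPayload)) // true
	fmt.Println(err)
}
```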

Types

type BackoffFunc

type BackoffFunc func(attempt int) time.Duration

BackoffFunc returns the delay before the given retry attempt. attempt is 1 after the first failure, 2 after the second, and so on.

type Clock

type Clock interface {
	Now() time.Time
	NewTimer(d time.Duration) Timer
}

Clock abstracts time for deterministic testing. The default implementation uses the real time package; queuetest provides a controllable FakeClock.

Backends accept a Clock via an internal WithClock option in their respective sub-packages. The option is not part of the public API: it is reachable only from inside this module, through internal helper functions, to keep the public surface small.

func RealClock

func RealClock() Clock

RealClock returns the production Clock implementation.

type Consumer

type Consumer interface {
	Close() error
}

Consumer is the lifetime handle for a subscription. Close stops claiming new jobs and waits for in-flight handlers up to the configured shutdown timeout.

type EnqueueConfig

type EnqueueConfig struct {
	App       string
	Namespace string
	Priority  Priority
	// RunAt is the absolute time at which the job becomes eligible. Zero
	// means "now". WithDelay and WithRunAt both populate this field; WithRunAt
	// always wins (last-wins, but RunAt is also "stronger" semantically).
	RunAt time.Time
}

EnqueueConfig is the resolved configuration for a single Enqueue call. Backends populate the App, Namespace, and Priority defaults from their own service-level config, then apply EnqueueOptions to overlay caller intent.

type EnqueueOption

type EnqueueOption interface {
	Apply(*EnqueueConfig)
}

EnqueueOption configures a single Enqueue call.

func WithDelay

func WithDelay(d time.Duration) EnqueueOption

WithDelay schedules the job to run no earlier than now+d.

func WithEnqueueApp

func WithEnqueueApp(app string) EnqueueOption

WithEnqueueApp overrides the topic app prefix for this enqueue only.

func WithEnqueueNamespace

func WithEnqueueNamespace(ns string) EnqueueOption

WithEnqueueNamespace overrides the topic namespace for this enqueue only.

func WithPriority

func WithPriority(p Priority) EnqueueOption

WithPriority assigns the job's priority.

func WithRunAt

func WithRunAt(t time.Time) EnqueueOption

WithRunAt schedules the job to run no earlier than t. Overrides WithDelay.

type EnqueueOptionFunc

type EnqueueOptionFunc func(*EnqueueConfig)

EnqueueOptionFunc adapts a function to EnqueueOption.

func (EnqueueOptionFunc) Apply

func (f EnqueueOptionFunc) Apply(cfg *EnqueueConfig)

Apply calls f(cfg).
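The option types above follow the functional options pattern. The sketch below shows how such options could be overlaid on backend defaults; the types are local stand-ins with RunAt omitted, and resolveEnqueue and the "billing" namespace are hypothetical.

```go
package main

import "fmt"

type Priority int8

const PriorityHigh Priority = 1

// EnqueueConfig is a local stand-in for queue.EnqueueConfig without RunAt.
type EnqueueConfig struct {
	App       string
	Namespace string
	Priority  Priority
}

type EnqueueOption interface {
	Apply(*EnqueueConfig)
}

// EnqueueOptionFunc lets a plain function satisfy EnqueueOption.
type EnqueueOptionFunc func(*EnqueueConfig)

func (f EnqueueOptionFunc) Apply(cfg *EnqueueConfig) { f(cfg) }

func WithPriority(p Priority) EnqueueOption {
	return EnqueueOptionFunc(func(cfg *EnqueueConfig) { cfg.Priority = p })
}

func WithEnqueueNamespace(ns string) EnqueueOption {
	return EnqueueOptionFunc(func(cfg *EnqueueConfig) { cfg.Namespace = ns })
}

// resolveEnqueue starts from the backend defaults and applies each option in
// the order given, so a later option overrides an earlier one on the same field.
func resolveEnqueue(defaults EnqueueConfig, opts ...EnqueueOption) EnqueueConfig {
	cfg := defaults
	for _, opt := range opts {
		opt.Apply(&cfg)
	}
	return cfg
}

func main() {
	defaults := EnqueueConfig{App: "app", Namespace: "default"}
	cfg := resolveEnqueue(defaults, WithPriority(PriorityHigh), WithEnqueueNamespace("billing"))
	fmt.Printf("%+v\n", cfg)
}
```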

type Enqueuer

type Enqueuer interface {
	Enqueue(ctx context.Context, topic string, payload []byte, opts ...EnqueueOption) error
}

Enqueuer enqueues jobs onto a topic.

type Handler

type Handler func(ctx context.Context, job *Job) error

Handler processes a single job. Returning nil acknowledges the job and the backend removes it from the queue. Returning a non-nil error schedules a retry according to the configured backoff and max retry count, unless the error matches ErrSkipRetry, in which case the job is routed straight to the dead letter topic on this attempt.
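The three outcomes described above can be sketched as a small decision function. How the retry budget is counted against Job.Attempt is an assumption here (the retries used so far are taken to be attempt-1, and any negative limit is treated as unlimited); decide and errTransient are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSkipRetry is a local copy of the documented sentinel.
var ErrSkipRetry = errors.New("queue: skip retry")

// errTransient is a hypothetical retryable failure.
var errTransient = errors.New("temporary failure")

// decide maps a handler result to "ack", "retry", or "dead-letter".
// attempt is 1 on first delivery, as documented for Job.Attempt.
func decide(err error, attempt, maxRetries int) string {
	switch {
	case err == nil:
		return "ack"
	case errors.Is(err, ErrSkipRetry):
		return "dead-letter" // permanent failure, retry budget ignored
	case maxRetries < 0:
		return "retry" // unlimited retries
	case attempt-1 >= maxRetries:
		return "dead-letter" // retry budget exhausted
	default:
		return "retry"
	}
}

func main() {
	fmt.Println(decide(nil, 1, 3))
	fmt.Println(decide(errTransient, 1, 3))
	fmt.Println(decide(errTransient, 4, 3))
	fmt.Println(decide(fmt.Errorf("invalid payload: %w", ErrSkipRetry), 1, 3))
}
```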

type Job

type Job struct {
	// ID is the backend-assigned unique identifier (UUID for durable backends,
	// monotonic for inmem). Always non-empty when delivered to a Handler.
	ID string
	// Topic is the fully formatted topic name as it appears in the backend
	// (i.e. after FormatTopic).
	Topic string
	// Payload is the opaque body supplied to Enqueue.
	Payload []byte
	// Attempt is 1 on first delivery, 2 on the first retry, and so on.
	Attempt int
	// EnqueuedAt is the time the job was first enqueued (preserved across
	// retries).
	EnqueuedAt time.Time
}

Job is the unit of work delivered to a Handler.

type Priority

type Priority int8

Priority controls relative ordering of jobs in the same topic. Higher priority jobs are delivered before lower priority jobs once they are eligible to run.

const (
	PriorityLow    Priority = -1
	PriorityNormal Priority = 0
	PriorityHigh   Priority = 1
)
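The ordering rule can be sketched as "filter by eligibility, then prefer the highest priority". The pending type, the use of Unix seconds for eligibility, the nextJob name, and the tie-break by slice order are all assumptions made for this illustration.

```go
package main

import (
	"fmt"
	"sort"
)

type Priority int8

const (
	PriorityLow    Priority = -1
	PriorityNormal Priority = 0
	PriorityHigh   Priority = 1
)

// pending is a hypothetical queued job; RunAt is an eligibility time in Unix
// seconds to keep the example free of wall-clock dependencies.
type pending struct {
	ID       string
	Priority Priority
	RunAt    int64
}

// nextJob returns the ID of the highest-priority job that is eligible at now.
// Jobs of equal priority keep their slice order because the sort is stable.
func nextJob(jobs []pending, now int64) (string, bool) {
	eligible := make([]pending, 0, len(jobs))
	for _, j := range jobs {
		if j.RunAt <= now {
			eligible = append(eligible, j)
		}
	}
	if len(eligible) == 0 {
		return "", false
	}
	sort.SliceStable(eligible, func(a, b int) bool {
		return eligible[a].Priority > eligible[b].Priority
	})
	return eligible[0].ID, true
}

func main() {
	jobs := []pending{
		{ID: "low", Priority: PriorityLow, RunAt: 0},
		{ID: "high-later", Priority: PriorityHigh, RunAt: 100},
		{ID: "normal", Priority: PriorityNormal, RunAt: 0},
	}
	id, _ := nextJob(jobs, 10)
	fmt.Println(id) // normal: the high-priority job is not yet eligible
	id, _ = nextJob(jobs, 100)
	fmt.Println(id) // high-later
}
```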

type QueueConfig

type QueueConfig struct {
	App       string
	Namespace string
}

QueueConfig is the service-level configuration shared by Enqueue and Subscribe.

type QueueOption

type QueueOption interface {
	Apply(*QueueConfig)
}

QueueOption configures a backend constructor.

func WithApp

func WithApp(app string) QueueOption

WithApp sets the service-level app name.

func WithNamespace

func WithNamespace(ns string) QueueOption

WithNamespace sets the service-level namespace.

type QueueOptionFunc

type QueueOptionFunc func(*QueueConfig)

QueueOptionFunc adapts a function to QueueOption.

func (QueueOptionFunc) Apply

func (f QueueOptionFunc) Apply(cfg *QueueConfig)

Apply calls f(cfg).

type Service

type Service interface {
	Enqueuer
	Subscriber
	// Close releases backend resources. It is safe to call more than once;
	// calls after the first return nil.
	Close(ctx context.Context) error
}

Service is the union of Enqueuer and Subscriber plus a backend Close. Backend implementations expose this as their public type.

type SubscribeConfig

type SubscribeConfig struct {
	App       string
	Namespace string

	Concurrency       int
	MaxRetries        int
	PollInterval      time.Duration
	VisibilityTimeout time.Duration
	ShutdownTimeout   time.Duration

	// DeadLetterTopic is the topic that exhausted jobs are routed to. Zero
	// value means "<topic>.dead".
	DeadLetterTopic string

	// Backoff returns the delay before the given retry attempt. Zero value
	// means DefaultBackoff.
	Backoff BackoffFunc
}

SubscribeConfig is the resolved configuration for a single Subscribe call.

type SubscribeOption

type SubscribeOption interface {
	Apply(*SubscribeConfig)
}

SubscribeOption configures a single Subscribe call.

func WithBackoff

func WithBackoff(b BackoffFunc) SubscribeOption

WithBackoff overrides the default backoff function.

func WithConcurrency

func WithConcurrency(n int) SubscribeOption

WithConcurrency sets the number of worker goroutines for this subscription. Default 1. Values < 1 are clamped to 1.

func WithDeadLetterTopic

func WithDeadLetterTopic(topic string) SubscribeOption

WithDeadLetterTopic sets a custom dead letter topic. Default "<original-topic>.dead".

func WithMaxRetries

func WithMaxRetries(n int) SubscribeOption

WithMaxRetries sets the maximum number of retries before routing to the dead letter topic. -1 means infinite retries. Default DefaultMaxRetries.

func WithPollInterval

func WithPollInterval(d time.Duration) SubscribeOption

WithPollInterval sets how often a worker re-polls the backend when idle. Push-based wake-ups (NOTIFY, BLPOP, Cond) preempt this interval where supported.

func WithShutdownTimeout

func WithShutdownTimeout(d time.Duration) SubscribeOption

WithShutdownTimeout sets how long Consumer.Close waits for in-flight handlers before returning.

func WithSubscribeApp

func WithSubscribeApp(app string) SubscribeOption

WithSubscribeApp overrides the topic app prefix for this subscription.

func WithSubscribeNamespace

func WithSubscribeNamespace(ns string) SubscribeOption

WithSubscribeNamespace overrides the topic namespace for this subscription.

func WithVisibilityTimeout

func WithVisibilityTimeout(d time.Duration) SubscribeOption

WithVisibilityTimeout sets how long a claimed job stays hidden from other workers. If a handler runs longer than this timeout, another worker may re-claim the job and run it concurrently.

type SubscribeOptionFunc

type SubscribeOptionFunc func(*SubscribeConfig)

SubscribeOptionFunc adapts a function to SubscribeOption.

func (SubscribeOptionFunc) Apply

func (f SubscribeOptionFunc) Apply(cfg *SubscribeConfig)

Apply calls f(cfg).

type Subscriber

type Subscriber interface {
	Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Consumer, error)
}

Subscriber registers a Handler against a topic. The returned Consumer controls the lifetime of the worker(s).

type Timer

type Timer interface {
	C() <-chan time.Time
	Stop() bool
	Reset(d time.Duration) bool
}

Timer mirrors time.Timer minus the parts queue does not need. Stop returns true if the call stops the timer; false if the timer already fired.
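Because time.Timer exposes its channel as a field rather than a method, a production implementation needs a thin adapter. The sketch below shows one possible shape using local copies of the two interfaces; realClock, realTimer, firesWithin, and stopsPendingTimer are hypothetical names, and the package's RealClock may be built differently.

```go
package main

import (
	"fmt"
	"time"
)

type Timer interface {
	C() <-chan time.Time
	Stop() bool
	Reset(d time.Duration) bool
}

type Clock interface {
	Now() time.Time
	NewTimer(d time.Duration) Timer
}

// realTimer adapts *time.Timer, whose channel is a field, to the Timer
// interface, whose channel is a method.
type realTimer struct{ t *time.Timer }

func (r realTimer) C() <-chan time.Time        { return r.t.C }
func (r realTimer) Stop() bool                 { return r.t.Stop() }
func (r realTimer) Reset(d time.Duration) bool { return r.t.Reset(d) }

// realClock delegates to the standard time package.
type realClock struct{}

func (realClock) Now() time.Time                 { return time.Now() }
func (realClock) NewTimer(d time.Duration) Timer { return realTimer{t: time.NewTimer(d)} }

// firesWithin reports whether a timer of timerMillis fires before waitMillis elapse.
func firesWithin(c Clock, timerMillis, waitMillis int) bool {
	t := c.NewTimer(time.Duration(timerMillis) * time.Millisecond)
	defer t.Stop()
	select {
	case <-t.C():
		return true
	case <-time.After(time.Duration(waitMillis) * time.Millisecond):
		return false
	}
}

// stopsPendingTimer reports the result of stopping a timer that has not fired.
func stopsPendingTimer(c Clock) bool {
	return c.NewTimer(time.Hour).Stop()
}

func main() {
	var c Clock = realClock{}
	fmt.Println(firesWithin(c, 5, 1000)) // true
	fmt.Println(stopsPendingTimer(c))    // true
}
```

Code written against Clock rather than the time package can then be driven by a controllable clock in tests, which is the purpose the Clock description gives for the abstraction.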

Directories

Path	Synopsis
inmem	Package inmem provides a non-durable, in-process queue.Service.
pgx	Package pgx provides a durable queue.Service backed by PostgreSQL using jackc/pgx/v5.
queuetest	Package queuetest provides a backend-agnostic conformance test suite for queue.Service implementations and a deterministic FakeClock for use in backend-specific tests.
redis	Package redis provides a durable queue.Service backed by Redis.
sqlite	Package sqlite provides a durable, embedded queue.Service backed by SQLite.
