Documentation ¶
Overview ¶
Package queue provides a work queue with single-worker delivery semantics (competing consumers). Each enqueued job is claimed by one worker at a time; the claim lasts until the handler returns or the visibility timeout expires. Delivery is at-least-once, so handlers MUST be idempotent.
Idempotency contract ¶
A job MAY be delivered to a handler more than once when:
- A handler runs longer than the configured visibility timeout. After the visibility timeout expires, another worker may re-claim the same job and run the handler again concurrently. Both invocations may succeed.
- A handler succeeds but the backend acknowledgement (DELETE / ack) fails. The job will be re-delivered after the visibility timeout.
- A worker crashes mid-handler. The job becomes visible again after the visibility timeout and will be re-delivered.
Make handler effects idempotent (use unique keys, upserts, or check-then-act patterns inside a transaction). Do not assume "the job ran" implies "exactly one effect was produced".
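As a minimal sketch of the unique-key approach, the example below records each job ID before applying its effect, so a redelivery becomes a no-op. The `ledger` type, the `deliver` helper, and the `(ctx, job)` handler signature are assumptions made for illustration; the in-memory map stands in for a unique constraint or upsert in a real store.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Job mirrors only the documented Job fields this sketch needs.
type Job struct {
	ID      string
	Payload []byte
	Attempt int
}

// ledger is a hypothetical stand-in for a table with a unique key on job ID.
type ledger struct {
	mu      sync.Mutex
	applied map[string]bool
	total   int
}

func newLedger() *ledger { return &ledger{applied: make(map[string]bool)} }

// handle applies the job's effect at most once per job ID.
// The (ctx, job) signature is an assumption, not the package's Handler type.
func (l *ledger) handle(ctx context.Context, job Job) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.applied[job.ID] {
		return nil // effect already produced; acknowledge again
	}
	l.applied[job.ID] = true
	l.total += len(job.Payload)
	return nil
}

// deliver simulates the same job being delivered several times and
// returns the accumulated effect.
func deliver(l *ledger, job Job, times int) int {
	for i := 1; i <= times; i++ {
		job.Attempt = i
		_ = l.handle(context.Background(), job)
	}
	return l.total
}

func main() {
	l := newLedger()
	job := Job{ID: "job-1", Payload: []byte("abc")}
	fmt.Println(deliver(l, job, 2)) // 3: the second delivery changes nothing
}
```

The check and the write happen under one lock here; with a database, the equivalent is doing both inside a single transaction.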
Shutdown behavior ¶
Consumer.Close stops claiming new jobs and waits up to WithShutdownTimeout for in-flight handlers to return. Handlers that exceed the shutdown timeout continue running as orphaned goroutines until they return on their own; Go provides no mechanism to interrupt a running goroutine. If the worker process exits before such a handler returns, its job is re-claimed after the visibility timeout.
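The wait-with-deadline behavior described above can be sketched with a sync.WaitGroup and a timer. This is an illustration of the pattern under stated assumptions, not the package's implementation; `waitWithTimeout`, `runHandler`, and `drained` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout reports whether every handler tracked by wg returned
// before the timeout elapsed.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		// Handlers still running are left behind as orphaned goroutines.
		return false
	}
}

// runHandler starts one simulated in-flight handler that takes d to return.
func runHandler(wg *sync.WaitGroup, d time.Duration) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(d)
	}()
}

// drained starts a handler lasting handlerMs milliseconds and reports
// whether it finished within timeoutMs milliseconds.
func drained(handlerMs, timeoutMs int) bool {
	var wg sync.WaitGroup
	runHandler(&wg, time.Duration(handlerMs)*time.Millisecond)
	return waitWithTimeout(&wg, time.Duration(timeoutMs)*time.Millisecond)
}

func main() {
	fmt.Println(drained(10, 500)) // handler finishes well inside the timeout
	fmt.Println(drained(500, 10)) // handler outlives the timeout
}
```

Handlers that want to stop early on shutdown would need to watch a context or channel themselves, since nothing here can interrupt them.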
Topics with no subscribers ¶
Durable backends (sqlite, pgx, redis) accumulate jobs for topics that have no subscriber. There is no automatic expiry. Operators are responsible for monitoring queue depth.
Index ¶
- Constants
- Variables
- func DeadLetterTopic(formattedTopic string) string
- func DefaultBackoff(attempt int) time.Duration
- func FormatTopic(app, namespace, topic string) string
- func ResolveSubscribeConfig(cfg *SubscribeConfig, formattedTopic string)
- func SkipRetry(err error) error
- type BackoffFunc
- type Clock
- type Consumer
- type EnqueueConfig
- type EnqueueOption
- type EnqueueOptionFunc
- type Enqueuer
- type Handler
- type Job
- type Priority
- type QueueConfig
- type QueueOption
- type QueueOptionFunc
- type Service
- type SubscribeConfig
- type SubscribeOption
- func WithBackoff(b BackoffFunc) SubscribeOption
- func WithConcurrency(n int) SubscribeOption
- func WithDeadLetterTopic(topic string) SubscribeOption
- func WithMaxRetries(n int) SubscribeOption
- func WithPollInterval(d time.Duration) SubscribeOption
- func WithShutdownTimeout(d time.Duration) SubscribeOption
- func WithSubscribeApp(app string) SubscribeOption
- func WithSubscribeNamespace(ns string) SubscribeOption
- func WithVisibilityTimeout(d time.Duration) SubscribeOption
- type SubscribeOptionFunc
- type Subscriber
- type Timer
Constants ¶
const (
	// DefaultAppName matches pubsub.DefaultAppName so apps can share namespacing.
	DefaultAppName = "app"
	// DefaultNamespace matches pubsub.DefaultNamespace.
	DefaultNamespace = "default"
	// DefaultMaxRetries is intentionally conservative compared to libraries
	// like riverqueue (25). Most workloads should override this per-Subscribe.
	DefaultMaxRetries = 3
	// DefaultPollInterval is the fallback wake-up cadence for backends that
	// poll (sqlite, redis idle, pgx fallback). Push-based wake-ups (NOTIFY,
	// BLPOP, Cond) preempt this interval where supported.
	DefaultPollInterval = time.Second
	// DefaultVisibilityTimeout is how long a claimed job stays hidden from
	// other workers before becoming re-claimable.
	DefaultVisibilityTimeout = 30 * time.Second
	// DefaultShutdownTimeout is how long Consumer.Close waits for in-flight
	// handlers before returning.
	DefaultShutdownTimeout = 30 * time.Second
)
Variables ¶
var ErrSkipRetry = errors.New("queue: skip retry")
ErrSkipRetry, when returned (or wrapped) from a Handler, signals that the job has permanently failed and should be routed to the dead letter topic immediately, regardless of the remaining retry budget.
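A short sketch of the wrapping case: the sentinel is redeclared locally so the example compiles on its own, and `validate` and `isPermanent` are hypothetical helpers. It assumes the sentinel is matched with errors.Is, which is the usual way to detect a wrapped error but is not confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSkipRetry is redeclared here only to keep the example self-contained.
var ErrSkipRetry = errors.New("queue: skip retry")

// validate is a hypothetical handler step: an empty payload can never
// succeed, so it wraps ErrSkipRetry to mark the failure as permanent.
func validate(payload []byte) error {
	if len(payload) == 0 {
		return fmt.Errorf("empty payload: %w", ErrSkipRetry)
	}
	return nil
}

// isPermanent reports whether err carries ErrSkipRetry anywhere in its chain.
func isPermanent(err error) bool {
	return errors.Is(err, ErrSkipRetry)
}

func main() {
	fmt.Println(isPermanent(validate(nil)))          // true
	fmt.Println(isPermanent(errors.New("timeout")))  // false
	fmt.Println(isPermanent(validate([]byte("ok")))) // false
}
```

Wrapping with %w keeps the original message for logs while still letting the sentinel be detected.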
Functions ¶
func DeadLetterTopic ¶
func DeadLetterTopic(formattedTopic string) string
DeadLetterTopic returns the default dead letter topic name for a given already-formatted topic. A dead letter topic is just another queue topic; subscribe to it to inspect, alert on, or reprocess failed jobs.
func DefaultBackoff ¶
func DefaultBackoff(attempt int) time.Duration
DefaultBackoff implements exponential backoff (1s base, 5 min cap) with uniform ±25% jitter. attempt overflow is handled by clamping to the cap.
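One way such a schedule could be computed is sketched below. It is not the package's source: `sketchBackoff` is a hypothetical name, and applying jitter after the cap (so a delay can land up to 25% above 5 minutes) is an assumption.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	baseDelay = time.Second
	maxDelay  = 5 * time.Minute
)

// sketchBackoff doubles the delay per attempt starting from baseDelay,
// clamps it to maxDelay, then applies uniform jitter of ±25%.
func sketchBackoff(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := maxDelay
	// 1s<<9 is 512s, already above the cap, so larger shifts are never
	// computed. This also keeps huge attempt values from overflowing.
	if shift := attempt - 1; shift < 9 {
		d = baseDelay << shift
		if d > maxDelay {
			d = maxDelay
		}
	}
	factor := 0.75 + rand.Float64()*0.5 // uniform in [0.75, 1.25)
	return time.Duration(float64(d) * factor)
}

func main() {
	for _, attempt := range []int{1, 2, 5, 9, 1000} {
		fmt.Printf("attempt %d: %v\n", attempt, sketchBackoff(attempt))
	}
}
```

Jitter spreads out retries from jobs that failed at the same moment, so they do not all hit the backend again in lockstep.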
func FormatTopic ¶
func FormatTopic(app, namespace, topic string) string
FormatTopic formats a topic name as "<app>.<namespace>.<topic>". This matches the pubsub package convention so an application can share namespace configuration between pubsub and queue.
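The naming scheme, together with the documented default dead letter suffix ".dead", can be illustrated with two local helpers. `formatTopic` and `deadLetterTopic` are stand-ins written for this example, not the package functions themselves.

```go
package main

import "fmt"

// formatTopic builds "<app>.<namespace>.<topic>".
func formatTopic(app, namespace, topic string) string {
	return app + "." + namespace + "." + topic
}

// deadLetterTopic appends the documented default suffix to an
// already-formatted topic.
func deadLetterTopic(formattedTopic string) string {
	return formattedTopic + ".dead"
}

func main() {
	t := formatTopic("app", "default", "emails")
	fmt.Println(t)                  // app.default.emails
	fmt.Println(deadLetterTopic(t)) // app.default.emails.dead
}
```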
func ResolveSubscribeConfig ¶
func ResolveSubscribeConfig(cfg *SubscribeConfig, formattedTopic string)
ResolveSubscribeConfig fills in package defaults for any zero-valued field in cfg. Backend implementations call this once per Subscribe call after applying user options.
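The default-filling step might look roughly like the sketch below. It uses a reduced local copy of the config struct and the default values from the Constants section; the `resolve` function is hypothetical and the real implementation may differ.

```go
package main

import (
	"fmt"
	"time"
)

// subscribeConfig is a reduced local copy of the documented struct.
type subscribeConfig struct {
	Concurrency       int
	MaxRetries        int
	PollInterval      time.Duration
	VisibilityTimeout time.Duration
	ShutdownTimeout   time.Duration
	DeadLetterTopic   string
}

// resolve fills every zero-valued field with its documented default and
// leaves explicitly set fields untouched.
func resolve(cfg *subscribeConfig, formattedTopic string) {
	if cfg.Concurrency == 0 {
		cfg.Concurrency = 1
	}
	if cfg.MaxRetries == 0 {
		cfg.MaxRetries = 3
	}
	if cfg.PollInterval == 0 {
		cfg.PollInterval = time.Second
	}
	if cfg.VisibilityTimeout == 0 {
		cfg.VisibilityTimeout = 30 * time.Second
	}
	if cfg.ShutdownTimeout == 0 {
		cfg.ShutdownTimeout = 30 * time.Second
	}
	if cfg.DeadLetterTopic == "" {
		cfg.DeadLetterTopic = formattedTopic + ".dead"
	}
}

func main() {
	cfg := subscribeConfig{Concurrency: 4, VisibilityTimeout: 2 * time.Minute}
	resolve(&cfg, "app.default.emails")
	fmt.Printf("%+v\n", cfg)
}
```

A consequence of zero-value defaulting, at least in this sketch, is that an explicit 0 cannot be told apart from "unset".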
Types ¶
type BackoffFunc ¶
BackoffFunc returns the delay before the given retry attempt. attempt is 1 after the first failure, 2 after the second, and so on.
type Clock ¶
Clock abstracts time for deterministic testing. The default implementation uses the real time package; queuetest provides a controllable FakeClock.
Backends accept a Clock through an unexported WithClock option in their respective sub-packages. The option is reachable only inside this module, via internal helper functions, to keep the public surface small.
type Consumer ¶
type Consumer interface {
Close() error
}
Consumer is the lifetime handle for a subscription. Close stops claiming new jobs and waits for in-flight handlers up to the configured shutdown timeout.
type EnqueueConfig ¶
type EnqueueConfig struct {
App string
Namespace string
Priority Priority
// RunAt is the absolute time at which the job becomes eligible. Zero
// means "now". WithDelay and WithRunAt both populate this field; WithRunAt
// always wins (last-wins, but RunAt is also "stronger" semantically).
RunAt time.Time
}
EnqueueConfig is the resolved configuration for a single Enqueue call. Backends populate the App, Namespace, and Priority defaults from their own service-level config, then apply EnqueueOptions to overlay caller intent.
type EnqueueOption ¶
type EnqueueOption interface {
Apply(*EnqueueConfig)
}
EnqueueOption configures a single Enqueue call.
func WithDelay ¶
func WithDelay(d time.Duration) EnqueueOption
WithDelay schedules the job to run no earlier than now+d.
func WithEnqueueApp ¶
func WithEnqueueApp(app string) EnqueueOption
WithEnqueueApp overrides the topic app prefix for this enqueue only.
func WithEnqueueNamespace ¶
func WithEnqueueNamespace(ns string) EnqueueOption
WithEnqueueNamespace overrides the topic namespace for this enqueue only.
func WithPriority ¶
func WithPriority(p Priority) EnqueueOption
WithPriority assigns the job's priority.
func WithRunAt ¶
func WithRunAt(t time.Time) EnqueueOption
WithRunAt schedules the job to run no earlier than t. Overrides WithDelay.
type EnqueueOptionFunc ¶
type EnqueueOptionFunc func(*EnqueueConfig)
EnqueueOptionFunc adapts a function to EnqueueOption.
func (EnqueueOptionFunc) Apply ¶
func (f EnqueueOptionFunc) Apply(cfg *EnqueueConfig)
Apply calls f(cfg).
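The adapter makes it possible to write an option as a plain closure. The sketch below redeclares the documented types locally (with RunAt omitted) so it compiles on its own; `withTenant` and `resolve` are hypothetical names.

```go
package main

import "fmt"

// Local copies of the documented types, reduced to what the sketch needs.
type Priority int8

type EnqueueConfig struct {
	App       string
	Namespace string
	Priority  Priority
}

type EnqueueOption interface {
	Apply(*EnqueueConfig)
}

type EnqueueOptionFunc func(*EnqueueConfig)

// Apply calls f(cfg), which is what lets a closure satisfy EnqueueOption.
func (f EnqueueOptionFunc) Apply(cfg *EnqueueConfig) { f(cfg) }

// withTenant is a hypothetical caller-defined option that sets two fields.
func withTenant(ns string, p Priority) EnqueueOption {
	return EnqueueOptionFunc(func(cfg *EnqueueConfig) {
		cfg.Namespace = ns
		cfg.Priority = p
	})
}

// resolve overlays opts onto the defaults in order, so later options win.
func resolve(defaults EnqueueConfig, opts ...EnqueueOption) EnqueueConfig {
	cfg := defaults
	for _, o := range opts {
		o.Apply(&cfg)
	}
	return cfg
}

func main() {
	cfg := resolve(EnqueueConfig{App: "app", Namespace: "default"}, withTenant("billing", 5))
	fmt.Printf("%+v\n", cfg) // {App:app Namespace:billing Priority:5}
}
```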
type Enqueuer ¶
type Enqueuer interface {
Enqueue(ctx context.Context, topic string, payload []byte, opts ...EnqueueOption) error
}
Enqueuer enqueues jobs onto a topic.
type Handler ¶
Handler processes a single job. Returning nil acknowledges the job and the backend removes it from the queue. Returning a non-nil error schedules a retry according to the configured backoff and max retry count, unless the error matches ErrSkipRetry, in which case the job is routed straight to the dead letter topic on this attempt.
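The three outcomes can be summarized as a small decision function. This is a sketch under assumptions: `decide` and the `outcome` values are hypothetical, and it assumes that attempt N has used N-1 retries and that a negative retry limit means unlimited, in line with the WithMaxRetries description.

```go
package main

import (
	"errors"
	"fmt"
)

// Redeclared locally so the example is self-contained.
var ErrSkipRetry = errors.New("queue: skip retry")

// errTransient is a hypothetical retryable failure.
var errTransient = errors.New("transient failure")

type outcome string

const (
	ack        outcome = "ack"
	retry      outcome = "retry"
	deadLetter outcome = "dead-letter"
)

// decide maps a handler result to what happens to the job next.
func decide(err error, attempt, maxRetries int) outcome {
	switch {
	case err == nil:
		return ack
	case errors.Is(err, ErrSkipRetry):
		return deadLetter // permanent failure, budget is ignored
	case maxRetries >= 0 && attempt-1 >= maxRetries:
		return deadLetter // retry budget exhausted
	default:
		return retry
	}
}

func main() {
	fmt.Println(decide(nil, 1, 3))          // ack
	fmt.Println(decide(errTransient, 1, 3)) // retry
	fmt.Println(decide(errTransient, 4, 3)) // dead-letter
	fmt.Println(decide(ErrSkipRetry, 1, 3)) // dead-letter
}
```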
type Job ¶
type Job struct {
// ID is the backend-assigned unique identifier (UUID for durable backends,
// monotonic for inmem). Always non-empty when delivered to a Handler.
ID string
// Topic is the fully formatted topic name as it appears in the backend
// (i.e. after FormatTopic).
Topic string
// Payload is the opaque body supplied to Enqueue.
Payload []byte
// Attempt is 1 on first delivery, 2 on the first retry, and so on.
Attempt int
// EnqueuedAt is the time the job was first enqueued (preserved across
// retries).
EnqueuedAt time.Time
}
Job is the unit of work delivered to a Handler.
type Priority ¶
type Priority int8
Priority controls relative ordering of jobs in the same topic. Higher priority jobs are delivered before lower priority jobs once they are eligible to run.
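To illustrate "once they are eligible to run", the sketch below picks the next job from a small in-memory list: jobs whose RunAt is still in the future are skipped, and the rest are ordered by priority. Breaking ties by enqueue time is an assumption, and `pending`, `next`, and `pick` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type Priority int8

// pending is a hypothetical in-memory view of a queued job.
type pending struct {
	ID         string
	Priority   Priority
	RunAt      time.Time // zero means eligible immediately
	EnqueuedAt time.Time
}

// next returns the ID of the job to deliver at time now, or "" if no
// job is eligible yet.
func next(jobs []pending, now time.Time) string {
	var eligible []pending
	for _, j := range jobs {
		if !j.RunAt.After(now) {
			eligible = append(eligible, j)
		}
	}
	if len(eligible) == 0 {
		return ""
	}
	sort.SliceStable(eligible, func(a, b int) bool {
		if eligible[a].Priority != eligible[b].Priority {
			return eligible[a].Priority > eligible[b].Priority
		}
		return eligible[a].EnqueuedAt.Before(eligible[b].EnqueuedAt)
	})
	return eligible[0].ID
}

// pick evaluates a fixed two-job queue at base+offsetSeconds.
func pick(offsetSeconds int) string {
	base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	jobs := []pending{
		{ID: "low", Priority: 0, EnqueuedAt: base},
		{ID: "high", Priority: 10, RunAt: base.Add(time.Minute), EnqueuedAt: base},
	}
	return next(jobs, base.Add(time.Duration(offsetSeconds)*time.Second))
}

func main() {
	fmt.Println(pick(0))  // low: the high-priority job is not eligible yet
	fmt.Println(pick(60)) // high: now eligible, so it goes first
}
```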
type QueueConfig ¶
QueueConfig is the service-level configuration shared by Enqueue and Subscribe.
type QueueOption ¶
type QueueOption interface {
Apply(*QueueConfig)
}
QueueOption configures a backend constructor.
func WithNamespace ¶
func WithNamespace(ns string) QueueOption
WithNamespace sets the service-level namespace.
type QueueOptionFunc ¶
type QueueOptionFunc func(*QueueConfig)
QueueOptionFunc adapts a function to QueueOption.
type Service ¶
type Service interface {
Enqueuer
Subscriber
// Close releases backend resources. Safe to call once; subsequent calls
// return nil.
Close(ctx context.Context) error
}
Service is the union of Enqueuer and Subscriber plus a backend Close. Backend implementations expose this as their public type.
type SubscribeConfig ¶
type SubscribeConfig struct {
App string
Namespace string
Concurrency int
MaxRetries int
PollInterval time.Duration
VisibilityTimeout time.Duration
ShutdownTimeout time.Duration
// DeadLetterTopic is the topic that exhausted jobs are routed to. Zero
// value means "<topic>.dead".
DeadLetterTopic string
// Backoff returns the delay before the given retry attempt. Zero value
// means DefaultBackoff.
Backoff BackoffFunc
}
SubscribeConfig is the resolved configuration for a single Subscribe call.
type SubscribeOption ¶
type SubscribeOption interface {
Apply(*SubscribeConfig)
}
SubscribeOption configures a single Subscribe call.
func WithBackoff ¶
func WithBackoff(b BackoffFunc) SubscribeOption
WithBackoff overrides the default backoff function.
func WithConcurrency ¶
func WithConcurrency(n int) SubscribeOption
WithConcurrency sets the number of worker goroutines for this subscription. Default 1. Values < 1 are clamped to 1.
func WithDeadLetterTopic ¶
func WithDeadLetterTopic(topic string) SubscribeOption
WithDeadLetterTopic sets a custom dead letter topic. Default "<original-topic>.dead".
func WithMaxRetries ¶
func WithMaxRetries(n int) SubscribeOption
WithMaxRetries sets the maximum number of retries before routing to the dead letter topic. -1 means infinite retries. Default DefaultMaxRetries.
func WithPollInterval ¶
func WithPollInterval(d time.Duration) SubscribeOption
WithPollInterval sets how often a worker re-polls the backend when idle. Push-based wake-ups (NOTIFY, BLPOP, Cond) preempt this interval where supported.
func WithShutdownTimeout ¶
func WithShutdownTimeout(d time.Duration) SubscribeOption
WithShutdownTimeout sets how long Consumer.Close waits for in-flight handlers before returning.
func WithSubscribeApp ¶
func WithSubscribeApp(app string) SubscribeOption
WithSubscribeApp overrides the topic app prefix for this subscription.
func WithSubscribeNamespace ¶
func WithSubscribeNamespace(ns string) SubscribeOption
WithSubscribeNamespace overrides the topic namespace for this subscription.
func WithVisibilityTimeout ¶
func WithVisibilityTimeout(d time.Duration) SubscribeOption
WithVisibilityTimeout sets how long a claimed job stays hidden from other workers. Handlers exceeding this timeout will see their job re-claimed.
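The re-claim behavior can be modeled with a single "visible at" timestamp per job. The sketch below is a simplified model rather than any backend's actual storage scheme; `record`, `claim`, and `simulate` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// record is a hypothetical stored job with its visibility deadline.
type record struct {
	ID        string
	VisibleAt time.Time // hidden from claims until this instant
}

// claim hides the job for timeout if it is currently visible and
// reports whether the claim succeeded.
func claim(r *record, now time.Time, timeout time.Duration) bool {
	if now.Before(r.VisibleAt) {
		return false
	}
	r.VisibleAt = now.Add(timeout)
	return true
}

// simulate has worker A claim at t=0 and worker B try bSeconds later,
// both with a 30s visibility timeout and no acknowledgement in between.
func simulate(bSeconds int) (aOK, bOK bool) {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	r := &record{ID: "job-1"}
	aOK = claim(r, start, 30*time.Second)
	bOK = claim(r, start.Add(time.Duration(bSeconds)*time.Second), 30*time.Second)
	return aOK, bOK
}

func main() {
	a, b := simulate(10)
	fmt.Println(a, b) // true false: the job is still hidden
	a, b = simulate(45)
	fmt.Println(a, b) // true true: the timeout expired, so B re-claims it
}
```

In the second case worker A may still be running its handler, which is why the visibility timeout should comfortably exceed the longest expected handler run.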
type SubscribeOptionFunc ¶
type SubscribeOptionFunc func(*SubscribeConfig)
SubscribeOptionFunc adapts a function to SubscribeOption.
func (SubscribeOptionFunc) Apply ¶
func (f SubscribeOptionFunc) Apply(cfg *SubscribeConfig)
Apply calls f(cfg).
type Subscriber ¶
type Subscriber interface {
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Consumer, error)
}
Subscriber registers a Handler against a topic. The returned Consumer controls the lifetime of the worker(s).
Directories ¶
| Path | Synopsis |
|---|---|
| inmem | Package inmem provides a non-durable, in-process queue.Service. |
| pgx | Package pgx provides a durable queue.Service backed by PostgreSQL using jackc/pgx/v5. |
| queuetest | Package queuetest provides a backend-agnostic conformance test suite for queue.Service implementations and a deterministic FakeClock for use in backend-specific tests. |
| redis | Package redis provides a durable queue.Service backed by Redis. |
| sqlite | Package sqlite provides a durable, embedded queue.Service backed by SQLite. |