Documentation ¶
Overview ¶
Package vtq implements a Visibility Timeout Queue backed by SQLite.
Rows in the queue are invisible to consumers for a configurable duration after being claimed. If the holder processes the row successfully, it deletes (acks) the row. If the holder crashes or exceeds the timeout, the row becomes visible again automatically and another instance can claim it.
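The claim, timeout, and ack cycle described above can be modelled in a few lines. The following is a minimal in-memory sketch, not the package's SQLite implementation: the `row` and `memQueue` types are hypothetical, timestamps are plain milliseconds, and it assumes that a row is claimable once `visibleAt <= now` and that every claim increments the attempt counter.

```go
package main

import "fmt"

// row is a hypothetical in-memory stand-in for one queue row.
type row struct {
	id        string
	visibleAt int64 // milliseconds since epoch
	attempts  int
}

// memQueue models the visibility rule only; it is not the SQLite implementation.
type memQueue struct {
	rows         []*row
	visibilityMs int64
}

// claim hides the first visible row for visibilityMs and returns it,
// or returns nil when every row is still invisible.
func (q *memQueue) claim(nowMs int64) *row {
	for _, r := range q.rows {
		if r.visibleAt <= nowMs {
			r.visibleAt = nowMs + q.visibilityMs
			r.attempts++ // assumption: each claim counts as one attempt
			return r
		}
	}
	return nil
}

// ack removes the row, as a holder does after successful processing.
// It reports whether a row with that id existed.
func (q *memQueue) ack(id string) bool {
	for i, r := range q.rows {
		if r.id == id {
			q.rows = append(q.rows[:i], q.rows[i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	q := &memQueue{visibilityMs: 30_000, rows: []*row{{id: "job-1"}}}

	a := q.claim(0) // instance A claims the row, then crashes
	fmt.Println("A claimed:", a.id)
	fmt.Println("B at 10s sees a job:", q.claim(10_000) != nil)

	b := q.claim(31_000) // timeout elapsed, the row is visible again
	fmt.Println("B claimed:", b.id, "attempt", b.attempts)
	fmt.Println("acked:", q.ack(b.id), "remaining:", len(q.rows))
}
```

The point of the model is that no explicit "release" step exists: a crashed holder simply never acks, and the passage of time makes the row claimable again.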
This single primitive covers three distributed patterns, depending on how the row count and the visibility timeout are calibrated:
- 1 row, N instances → leader election
- N rows, N instances → work distribution (TDMA)
- visibility < processing time under load → elastic overflow
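As an illustration of the first pattern in the list above, the sketch below treats a single row as a lease: whichever instance holds the claim is the leader, and it keeps leadership by renewing before the row becomes visible again. The `lease` type and `tryLead` method are hypothetical names for this example, and renewal is simplified to "now plus visibility", which may differ from how Extend computes the new deadline.

```go
package main

import "fmt"

// lease models the single queue row used for leader election.
type lease struct {
	holder    string
	visibleAt int64 // milliseconds; the row is claimable once now >= visibleAt
}

// tryLead lets instance claim the row if it is visible, or renew it if the
// instance already holds it. It reports whether instance is the leader.
func (l *lease) tryLead(instance string, nowMs, visibilityMs int64) bool {
	switch {
	case nowMs >= l.visibleAt: // row is visible: anyone may claim it
		l.holder = instance
	case l.holder != instance: // row is hidden and held by someone else
		return false
	}
	l.visibleAt = nowMs + visibilityMs
	return true
}

func main() {
	l := &lease{}
	const visibility = 1000

	fmt.Println("a leads at 0:", l.tryLead("a", 0, visibility))
	fmt.Println("b leads at 500:", l.tryLead("b", 500, visibility))
	fmt.Println("a renews at 900:", l.tryLead("a", 900, visibility))
	// Instance a stops renewing; its lease runs out at 1900.
	fmt.Println("b leads at 1500:", l.tryLead("b", 1500, visibility))
	fmt.Println("b leads at 2000:", l.tryLead("b", 2000, visibility))
}
```

With N rows the same rule gives each instance at most one row per claim, which is the work-distribution case.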
The queue is pure SQLite — no external broker, no cloud dependency.
Expected schema (created automatically by EnsureTable):
CREATE TABLE IF NOT EXISTS vtq_jobs (
    id         TEXT PRIMARY KEY,
    queue      TEXT NOT NULL DEFAULT '',
    payload    BLOB,
    visible_at INTEGER NOT NULL DEFAULT 0, -- milliseconds since epoch
    created_at INTEGER NOT NULL,           -- milliseconds since epoch
    attempts   INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_vtq_visible ON vtq_jobs (queue, visible_at);
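Because visible_at and created_at are stored as integer milliseconds since the epoch, code that reads the table directly has to convert to and from time.Time. The helpers below are a small sketch using the standard library's UnixMilli functions; the names `toMillis`, `fromMillis`, and `visibleNow` are hypothetical, and the `visible_at <= now` comparison is an assumption about how visibility is evaluated.

```go
package main

import (
	"fmt"
	"time"
)

// toMillis converts a time to the integer form used by the schema columns.
func toMillis(t time.Time) int64 { return t.UnixMilli() }

// fromMillis converts a stored column value back to a time.Time.
func fromMillis(ms int64) time.Time { return time.UnixMilli(ms) }

// visibleNow reports whether a row with the given visible_at value would be
// claimable at now, assuming the rule visible_at <= now.
func visibleNow(visibleAtMs int64, now time.Time) bool {
	return visibleAtMs <= toMillis(now)
}

func main() {
	now := time.Now()
	hiddenUntil := toMillis(now.Add(30 * time.Second))

	fmt.Println("stored value:", hiddenUntil)
	fmt.Println("visible now:", visibleNow(hiddenUntil, now))
	fmt.Println("visible in 31s:", visibleNow(hiddenUntil, now.Add(31*time.Second)))
}
```

Sub-millisecond precision is dropped by the conversion, so a round trip through the table truncates a time.Time to the millisecond.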
Index ¶
- Variables
- type Handler
- type Job
- type Options
- type Q
- func (q *Q) Ack(ctx context.Context, id string) error
- func (q *Q) BatchClaim(ctx context.Context, n int) ([]*Job, error)
- func (q *Q) Claim(ctx context.Context) (*Job, error)
- func (q *Q) EnsureTable(ctx context.Context) error
- func (q *Q) Extend(ctx context.Context, job *Job, extra time.Duration) error
- func (q *Q) Len(ctx context.Context) (int, error)
- func (q *Q) Nack(ctx context.Context, id string) error
- func (q *Q) Publish(ctx context.Context, id string, payload []byte) error
- func (q *Q) Purge(ctx context.Context) error
- func (q *Q) Run(ctx context.Context, handler Handler)
- func (q *Q) RunBatch(ctx context.Context, batchSize, maxConcurrency int, handler Handler)
Variables ¶
var ErrNotHolder = errors.New("vtq: extend failed, caller is not the current holder")
ErrNotHolder is returned by Extend when the caller no longer holds the claim — either because the visibility timeout expired or another consumer re-claimed the job.
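Since ErrNotHolder is a sentinel value, callers would normally test for it with errors.Is, which also matches when the error has been wrapped with %w on the way up. The sketch below defines a local stand-in for the variable so that it compiles without the package; `extendStub` and `lostClaim` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotHolder is a local stand-in for vtq.ErrNotHolder.
var errNotHolder = errors.New("vtq: extend failed, caller is not the current holder")

// extendStub is a hypothetical replacement for a call to Extend. It returns
// the sentinel wrapped with extra context when the claim has been lost.
func extendStub(stillHolder bool) error {
	if stillHolder {
		return nil
	}
	return fmt.Errorf("extend job-1: %w", errNotHolder)
}

// lostClaim reports whether err means the caller no longer holds the job.
func lostClaim(err error) bool {
	return errors.Is(err, errNotHolder)
}

func main() {
	for _, held := range []bool{true, false} {
		err := extendStub(held)
		if lostClaim(err) {
			fmt.Println("claim lost, stop working on this job:", err)
			continue
		}
		fmt.Println("claim extended")
	}
}
```

Comparing with == would miss the wrapped case, which is why errors.Is is the safer check.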
Types ¶
type Job ¶
type Job struct {
    ID        string
    Queue     string
    Payload   []byte
    VisibleAt time.Time
    CreatedAt time.Time
    Attempts  int
}
Job is a row in the queue.
type Options ¶
type Options struct {
    // Queue is the logical queue name. Multiple queues can coexist in the
    // same table. Default: "" (empty string — the default queue).
    Queue string
    // Visibility is how long a claimed job stays invisible. Default: 30s.
    Visibility time.Duration
    // PollInterval is the delay between claim attempts in the Run loop.
    // Default: 1s.
    PollInterval time.Duration
    // MaxAttempts limits how many times a job can be redelivered before
    // being discarded. 0 means unlimited. Default: 0.
    MaxAttempts int
    // Logger overrides the default slog logger.
    Logger *slog.Logger
}
Options configures queue behaviour.
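The documented defaults suggest that zero values are replaced when the queue is created. How the package actually does this is not shown here, so the following is only a sketch of the idea: `options` is a local mirror of the documented fields (Logger omitted), and `withDefaults` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// options mirrors the documented Options fields, without the Logger.
type options struct {
	Queue        string
	Visibility   time.Duration
	PollInterval time.Duration
	MaxAttempts  int
}

// withDefaults fills in the documented defaults for unset durations.
// Queue and MaxAttempts already default to their zero values.
func withDefaults(o options) options {
	if o.Visibility <= 0 {
		o.Visibility = 30 * time.Second
	}
	if o.PollInterval <= 0 {
		o.PollInterval = time.Second
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", withDefaults(options{}))
	fmt.Printf("%+v\n", withDefaults(options{Queue: "emails", Visibility: 5 * time.Minute}))
}
```

A practical consequence of zero-means-default is that a visibility of exactly zero cannot be requested through such a struct.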
type Q ¶
type Q struct {
    // contains filtered or unexported fields
}
Q is the queue handle.
func New ¶
New creates a queue handle. Call EnsureTable once at startup, then Publish and Claim (or Run) as needed.
func (*Q) BatchClaim ¶
BatchClaim atomically claims up to n visible jobs. It returns an empty (non-nil) slice when no jobs are available.
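The distinction between an empty slice and a nil slice matters mostly when the result is serialized or compared against nil. The sketch below uses a local `job` type and a hypothetical `claimNone` function standing in for a BatchClaim call that found nothing; it shows that ranging and len behave the same for both, while encoding/json does not.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job is a local stand-in for the package's Job type.
type job struct {
	ID string
}

// claimNone imitates a batch claim that found no visible jobs:
// it returns an empty, non-nil slice.
func claimNone() []*job {
	return []*job{}
}

// encode returns the JSON form of jobs, or a description of the error.
func encode(jobs []*job) string {
	b, err := json.Marshal(jobs)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	jobs := claimNone()
	fmt.Println("nil:", jobs == nil, "len:", len(jobs))

	// Ranging over an empty slice simply runs zero iterations.
	for _, j := range jobs {
		fmt.Println("handling", j.ID)
	}

	fmt.Println("empty slice as JSON:", encode(jobs))
	fmt.Println("nil slice as JSON:", encode(nil))
}
```

Callers can therefore check `len(jobs) == 0` to detect an idle queue without a separate nil check.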
func (*Q) Claim ¶
Claim atomically picks the oldest visible job, marks it invisible for the configured visibility duration, and returns it. It returns (nil, nil) if no job is available.
func (*Q) EnsureTable ¶
EnsureTable creates the vtq_jobs table and index if they don't exist.
func (*Q) Extend ¶
Extend pushes the visibility timeout forward for a job that needs more processing time (heartbeat pattern). It returns ErrNotHolder if the visibility timeout has already expired or another consumer re-claimed the job. On success it updates job.VisibleAt so subsequent Extend calls use the correct baseline.
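The heartbeat pattern mentioned above can be sketched as a loop that calls Extend on a ticker until the work finishes or the claim is lost. The `extender` interface mirrors the documented Extend signature with a local `job` type; `heartbeat`, `flaky`, and `demo` are hypothetical, and `errNotHolder` is a local stand-in for ErrNotHolder.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// job is a local stand-in for the package's Job type.
type job struct {
	ID string
}

// errNotHolder is a local stand-in for vtq.ErrNotHolder.
var errNotHolder = errors.New("not the current holder")

// extender mirrors the documented Extend signature.
type extender interface {
	Extend(ctx context.Context, j *job, extra time.Duration) error
}

// heartbeat calls Extend once per tick. It returns nil when ctx is
// cancelled and the Extend error as soon as a call fails.
func heartbeat(ctx context.Context, q extender, j *job, every, extra time.Duration) error {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-t.C:
			if err := q.Extend(ctx, j, extra); err != nil {
				return err
			}
		}
	}
}

// flaky is a fake extender that loses the claim on its third call.
type flaky struct {
	calls int
}

func (f *flaky) Extend(ctx context.Context, j *job, extra time.Duration) error {
	f.calls++
	if f.calls >= 3 {
		return errNotHolder
	}
	return nil
}

// demo runs the heartbeat against the fake until the claim is lost.
func demo() (int, bool) {
	f := &flaky{}
	err := heartbeat(context.Background(), f, &job{ID: "job-1"}, time.Millisecond, 30*time.Second)
	return f.calls, errors.Is(err, errNotHolder)
}

func main() {
	calls, lost := demo()
	fmt.Println("extend calls:", calls, "claim lost:", lost)
}
```

In a real consumer the heartbeat would typically run in its own goroutine next to the handler, with the context cancelled when the handler returns and the tick interval chosen well below the visibility duration.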