vtq

package
v0.1.0
Published: Mar 1, 2026 License: MIT Imports: 6 Imported by: 0

README

vtq — Visibility Timeout Queue (SQLite)

Pure-Go, single-table job queue with SQS-style visibility timeout. One primitive covers three patterns; the only difference between them is how the row count and the visibility timeout are calibrated.

         Publish ──► ┌──────────────┐
                     │  vtq_jobs    │
                     │              │  visible_at <= now()
         Claim  ◄─── │  id          │ ◄── consumer polls
                     │  payload     │
                     │  visible_at ─┤── invisible while held
                     │  attempts    │
                     └──────┬───────┘
                            │
              ┌─────────────┼─────────────┐
              ▼             ▼             ▼
           Ack           Nack         Timeout
         (delete)    (reappear now)  (reappear later)

Patterns

Configuration                             Result
1 row, N instances                        Leader election: one holder, automatic failover
N rows, N instances                       Work distribution (TDMA)
visibility < processing time under load   Elastic overflow: slow instance loses rows, others pick up

Quick start

db, err := sql.Open("sqlite", "app.db")
if err != nil {
    log.Fatal(err)
}

q := vtq.New(db, vtq.Options{
    Queue:      "emails",
    Visibility: 30 * time.Second,
})
if err := q.EnsureTable(ctx); err != nil {
    log.Fatal(err)
}

// Producer
if err := q.Publish(ctx, "msg-001", []byte(`{"to":"a@b.com"}`)); err != nil {
    log.Fatal(err)
}

// Consumer (blocks until ctx is cancelled)
q.Run(ctx, func(ctx context.Context, job *vtq.Job) error {
    // Returning nil acks the job (deleted).
    // Returning an error nacks it (reappears immediately).
    return sendEmail(job.Payload)
})

Leader election

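q := vtq.New(db, vtq.Options{
    Queue:      "leader",
    Visibility: 10 * time.Second,
})
if err := q.EnsureTable(ctx); err != nil {
    log.Fatal(err)
}

// Seed the token once.
if err := q.Publish(ctx, "leader-token", nil); err != nil {
    log.Fatal(err)
}

// Each instance tries to claim.
for {
    job, err := q.Claim(ctx)
    if err != nil {
        log.Printf("claim: %v", err)
    }
    if job != nil {
        // I am the leader: renew before the timeout expires.
        go keepAlive(ctx, q, job, 5*time.Second)
        doLeaderWork(ctx)
        break
    }
    time.Sleep(time.Second)
}

// keepAlive extends the claim every interval until ctx is cancelled or the
// claim is lost. Extend takes the *vtq.Job, not just its ID.
func keepAlive(ctx context.Context, q *vtq.Q, job *vtq.Job, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := q.Extend(ctx, job, 10*time.Second); err != nil {
                // vtq.ErrNotHolder means leadership was lost; stop renewing.
                return
            }
        }
    }
}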

API

Function                                            Description
New(db, Options) *Q                                 Create a queue handle
EnsureTable(ctx)                                    Create table + index if missing
Publish(ctx, id, payload)                           Insert a visible job
Claim(ctx) (*Job, error)                            Atomically pick the oldest visible job
BatchClaim(ctx, n) ([]*Job, error)                  Atomically claim up to n visible jobs
Ack(ctx, id)                                        Delete a processed job
Nack(ctx, id)                                       Make a job visible immediately
Extend(ctx, job, duration)                          Push visibility timeout forward
Purge(ctx)                                          Delete all jobs in the queue
Len(ctx) (int, error)                               Count total jobs (visible + invisible)
Run(ctx, Handler)                                   Poll loop: claim, handle, ack/nack
RunBatch(ctx, batchSize, maxConcurrency, Handler)   Batch poll loop with bounded concurrency

Options

Field          Default          Description
Queue          ""               Logical queue name (multiple queues share one table)
Visibility     30s              How long a claimed job stays invisible
PollInterval   1s               Delay between claim attempts in Run
MaxAttempts    0                Max redeliveries before discard (0 = unlimited)
Logger         slog.Default()   Structured logger

Schema

CREATE TABLE IF NOT EXISTS vtq_jobs (
    id          TEXT PRIMARY KEY,
    queue       TEXT NOT NULL DEFAULT '',
    payload     BLOB,
    visible_at  INTEGER NOT NULL DEFAULT 0,
    created_at  INTEGER NOT NULL,
    attempts    INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_vtq_visible ON vtq_jobs (queue, visible_at);

Combining with dbsync for HA

┌──────────┐  vtq: claim leader-token  ┌──────────┐
│ Instance │ ◄────────────────────────► │  SQLite   │
│ A (leader)                            │ vtq_jobs  │
│          │ ──── dbsync snapshots ───► │          │
└──────────┘                            └──────────┘
                                              ▲
┌──────────┐  vtq: claim → nil (blocked)      │
│ Instance │ ◄────────────────────────────────┘
│ B (standby)   reads via dbsync replica
└──────────┘
     │
     └── If A crashes → visibility expires → B claims → becomes leader
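The failover path in the diagram can be traced with a small in-memory model. This is only a sketch: `token`, `lease`, `claim`, and `extend` are hypothetical names, times are plain milliseconds, and it assumes that a renewal sets the deadline to now plus the extension and is rejected once the stored deadline no longer matches the holder's view.

```go
package main

import "fmt"

// token models the single leader-token row; visibleAt is in milliseconds.
type token struct{ visibleAt int64 }

// lease is the holder's view of its claim, similar to Job.VisibleAt.
type lease struct{ visibleAt int64 }

// claim succeeds only when the token is currently visible.
func (t *token) claim(now, visibility int64) *lease {
	if t.visibleAt > now {
		return nil // someone else holds the token
	}
	t.visibleAt = now + visibility
	return &lease{visibleAt: t.visibleAt}
}

// extend renews the lease only if the caller still holds it: the stored
// deadline must match the caller's view and must not have passed.
// Assumption: the new deadline is now+extra.
func (t *token) extend(l *lease, now, extra int64) bool {
	if t.visibleAt != l.visibleAt || t.visibleAt <= now {
		return false
	}
	t.visibleAt = now + extra
	l.visibleAt = t.visibleAt
	return true
}

func main() {
	tok := &token{}

	a := tok.claim(0, 10_000)
	fmt.Println(a != nil)                         // true: A is the leader
	fmt.Println(tok.claim(1_000, 10_000) == nil)  // true: B is blocked
	fmt.Println(tok.extend(a, 5_000, 10_000))     // true: A renews until 15000

	// A crashes here and stops renewing.
	fmt.Println(tok.claim(14_000, 10_000) == nil) // true: token still hidden
	b := tok.claim(15_000, 10_000)
	fmt.Println(b != nil)                         // true: B takes over
	fmt.Println(tok.extend(a, 16_000, 10_000))    // false: A is no longer the holder
}
```

The last line mirrors the situation in which Extend reports ErrNotHolder: a former leader that comes back late cannot renew a claim that another instance has since taken.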

Documentation

Overview

Package vtq implements a Visibility Timeout Queue backed by SQLite.

Rows in the queue are invisible to other consumers for a configurable duration after being claimed. If the holder processes the row successfully, it acks the row, which deletes it. If the holder crashes or exceeds the timeout, the row reappears automatically and another instance can claim it.
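The claim, ack, nack, and timeout transitions can be illustrated without a database. The following is a minimal sketch, not the package's implementation: `memQueue` and its methods are hypothetical, and the clock is passed in explicitly as milliseconds so the behaviour is deterministic.

```go
package main

import "fmt"

// row keeps only the columns that drive visibility. Times are milliseconds,
// matching the INTEGER columns in the schema.
type row struct {
	createdAt int64
	visibleAt int64
	attempts  int
}

// memQueue is a hypothetical in-memory stand-in for the vtq_jobs table.
type memQueue struct {
	rows       map[string]*row
	visibility int64 // how long a claimed row stays hidden, in ms
}

// publish inserts a row that is visible immediately.
func (q *memQueue) publish(id string, now int64) {
	q.rows[id] = &row{createdAt: now, visibleAt: now}
}

// claim hides the oldest visible row until now+visibility and returns its ID.
// It returns "" when no row is visible.
func (q *memQueue) claim(now int64) string {
	best := ""
	for id, r := range q.rows {
		if r.visibleAt > now {
			continue // still held by another consumer
		}
		if best == "" || r.createdAt < q.rows[best].createdAt {
			best = id
		}
	}
	if best != "" {
		q.rows[best].visibleAt = now + q.visibility
		q.rows[best].attempts++
	}
	return best
}

// ack deletes a processed row.
func (q *memQueue) ack(id string) { delete(q.rows, id) }

// nack makes a held row visible again right away.
func (q *memQueue) nack(id string, now int64) {
	if r, ok := q.rows[id]; ok {
		r.visibleAt = now
	}
}

func main() {
	q := &memQueue{rows: map[string]*row{}, visibility: 30_000}
	q.publish("msg-001", 0)

	fmt.Println(q.claim(1_000))       // msg-001 (visible, so it is claimed)
	fmt.Println(q.claim(2_000) == "") // true (hidden until 31000)
	fmt.Println(q.claim(31_000))      // msg-001 (holder timed out)
	q.nack("msg-001", 32_000)
	fmt.Println(q.claim(32_000))      // msg-001 (nack made it visible)
	q.ack("msg-001")
	fmt.Println(len(q.rows))          // 0 (ack deleted the row)
}
```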

This single primitive covers three distributed patterns through calibration:

  • 1 row, N instances → leader election
  • N rows, N instances → work distribution (TDMA)
  • visibility < processing time under load → elastic overflow

The queue is pure SQLite — no external broker, no cloud dependency.

Expected schema (created automatically by EnsureTable):

CREATE TABLE IF NOT EXISTS vtq_jobs (
    id          TEXT PRIMARY KEY,
    queue       TEXT NOT NULL DEFAULT '',
    payload     BLOB,
    visible_at  INTEGER NOT NULL DEFAULT 0,  -- milliseconds since epoch
    created_at  INTEGER NOT NULL,             -- milliseconds since epoch
    attempts    INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_vtq_visible ON vtq_jobs (queue, visible_at);

Index

Constants

This section is empty.

Variables

var ErrNotHolder = errors.New("vtq: extend failed, caller is not the current holder")

ErrNotHolder is returned by Extend when the caller no longer holds the claim — either because the visibility timeout expired or another consumer re-claimed the job.

Functions

This section is empty.

Types

type Handler

type Handler func(ctx context.Context, job *Job) error

Handler processes a claimed job. Return nil to ack, non-nil to nack.

type Job

type Job struct {
	ID        string
	Queue     string
	Payload   []byte
	VisibleAt time.Time
	CreatedAt time.Time
	Attempts  int
}

Job is a row in the queue.

type Options

type Options struct {
	// Queue is the logical queue name. Multiple queues can coexist in the
	// same table. Default: "" (empty string — the default queue).
	Queue string
	// Visibility is how long a claimed job stays invisible. Default: 30s.
	Visibility time.Duration
	// PollInterval is the delay between claim attempts in the Run loop.
	// Default: 1s.
	PollInterval time.Duration
	// MaxAttempts limits how many times a job can be redelivered before
	// being discarded. 0 means unlimited. Default: 0.
	MaxAttempts int
	// Logger overrides the default slog logger.
	Logger *slog.Logger
}

Options configures queue behaviour.

type Q

type Q struct {
	// contains filtered or unexported fields
}

Q is the queue handle.

func New

func New(db *sql.DB, opts Options) *Q

New creates a queue handle. Call EnsureTable once at startup, then Publish and Claim (or Run) as needed.

func (*Q) Ack

func (q *Q) Ack(ctx context.Context, id string) error

Ack deletes a successfully processed job.

func (*Q) BatchClaim

func (q *Q) BatchClaim(ctx context.Context, n int) ([]*Job, error)

BatchClaim atomically claims up to n visible jobs. It returns an empty (non-nil) slice when no jobs are available.

func (*Q) Claim

func (q *Q) Claim(ctx context.Context) (*Job, error)

Claim atomically picks the oldest visible job, marks it invisible for the configured visibility duration, and returns it. Returns nil, nil if no job is available.

func (*Q) EnsureTable

func (q *Q) EnsureTable(ctx context.Context) error

EnsureTable creates the vtq_jobs table and index if they don't exist.

func (*Q) Extend

func (q *Q) Extend(ctx context.Context, job *Job, extra time.Duration) error

Extend pushes the visibility timeout forward for a job that needs more processing time (heartbeat pattern). It returns ErrNotHolder if the visibility timeout has already expired or another consumer re-claimed the job. On success it updates job.VisibleAt so subsequent Extend calls use the correct baseline.

func (*Q) Len

func (q *Q) Len(ctx context.Context) (int, error)

Len returns the total number of jobs (visible + invisible) in the queue.

func (*Q) Nack

func (q *Q) Nack(ctx context.Context, id string) error

Nack makes a job immediately visible again so another consumer can pick it up.

func (*Q) Publish

func (q *Q) Publish(ctx context.Context, id string, payload []byte) error

Publish inserts a job that is immediately visible.

func (*Q) Purge

func (q *Q) Purge(ctx context.Context) error

Purge deletes all jobs in the queue.

func (*Q) Run

func (q *Q) Run(ctx context.Context, handler Handler)

Run polls for visible jobs and calls handler for each one. It blocks until ctx is cancelled.

func (*Q) RunBatch

func (q *Q) RunBatch(ctx context.Context, batchSize, maxConcurrency int, handler Handler)

RunBatch polls in batches and processes jobs with bounded concurrency. It blocks until ctx is cancelled, draining in-flight handlers before returning.
