asynqpg

package module
v0.1.4
Published: May 9, 2026 License: MIT Imports: 12 Imported by: 1

README

asynqpg

Distributed task queue for Go, backed by PostgreSQL.

Consumers pull tasks concurrently without contention – Postgres hands each worker its own batch and skips rows already claimed by others, so throughput scales with workers. Producers can enqueue inside the same transaction as your business logic, so a job appears if and only if your write commits.

Features

  • Postgres-native – tasks stored in a single table; no Redis or external broker required
  • Safe concurrent processing – Postgres hands each worker its own batch and skips rows already claimed by others, so multiple consumers never touch the same task
  • Transactional enqueue – pass *sqlx.Tx to EnqueueTx so jobs commit atomically with your business logic
  • Flexible retries – exponential or constant backoff, snooze without consuming attempts, skip-retry for permanent errors
  • Delayed & scheduled tasks – WithDelay or direct ProcessAt for future processing
  • Idempotency tokens – EnqueueMany deduplicates by (type, idempotency_token) at the DB layer
  • Per-type worker pools – independent concurrency, timeout, and middleware per task type
  • Leader-elected maintenance – a single node holds the lease for stuck-task rescue and cleanup
  • Web dashboard – embedded React SPA with Overview, Tasks, Workers, and Maintenance pages; ⌘K palette and j/k navigation
  • Pluggable auth – Basic Auth out of the box; OAuth providers (GitHub included)
  • OpenTelemetry – built-in counters, histograms, and distributed tracing for all operations

Installation

go get github.com/yakser/asynqpg

Requires Go 1.25+ and PostgreSQL 14+.

Database Setup

Apply migrations to create the asynqpg_tasks table:

make up         # start PostgreSQL in Docker
make migrate    # apply migrations

Or use testcontainers-go in tests – no manual setup needed.

Quick Start

Producer
package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "github.com/yakser/asynqpg"
    "github.com/yakser/asynqpg/producer"
)

func main() {
    db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }

    p, err := producer.New(producer.Config{Pool: db})
    if err != nil {
        log.Fatal(err)
    }

    payload, _ := json.Marshal(map[string]string{"to": "user@example.com", "subject": "Hello"})

    _, err = p.Enqueue(context.Background(), asynqpg.NewTask("email:send", payload,
        asynqpg.WithMaxRetry(5),
        asynqpg.WithDelay(10*time.Second),
    ))
    if err != nil {
        log.Fatal(err)
    }
}
Consumer
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "github.com/yakser/asynqpg"
    "github.com/yakser/asynqpg/consumer"
)

func main() {
    db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }

    c, err := consumer.New(consumer.Config{Pool: db})
    if err != nil {
        log.Fatal(err)
    }

    if err := c.RegisterTaskHandler("email:send",
        consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
            fmt.Printf("Processing task %d: %s\n", task.ID, task.Type)
            // process task...
            return nil
        }),
        consumer.WithWorkersCount(5),
        consumer.WithTimeout(30*time.Second),
    ); err != nil {
        log.Fatal(err)
    }

    if err := c.Start(); err != nil {
        log.Fatal(err)
    }

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh

    if err := c.Stop(); err != nil {
        log.Printf("shutdown error: %v", err)
    }
}

Architecture

Task Lifecycle
flowchart LR
    %% Styles
    classDef pending   fill:#f59e0b,stroke:#92400e,color:#000
    classDef running   fill:#3b82f6,stroke:#1e40af,color:#fff
    classDef completed fill:#22c55e,stroke:#166534,color:#000
    classDef failed    fill:#ef4444,stroke:#991b1b,color:#fff
    classDef cancelled fill:#6b7280,stroke:#374151,color:#fff
    classDef edge      fill:#f3f4f6,stroke:#d1d5db,color:#374151

    %% States
    P["Pending"]:::pending
    T["Running"]:::running

    subgraph Final
        C["Completed"]:::completed
        F["Failed"]:::failed
        Ca["Cancelled"]:::cancelled
    end

    %% Entry points
    Enqueue(["Enqueue"]):::edge --> P
    EnqueueDelayed(["Enqueue (with delay)"]):::edge --> P

    %% Normal processing
    P -- "fetched by consumer" --> T
    T -- "handler success" --> C
    T -- "handler error, attempts_left > 0" --> P
    T -- "handler error, attempts_left = 0" --> F

    %% Rescuer (stuck tasks)
    T -- "stuck, attempts_left > 0 (Rescuer)" --> P
    T -- "stuck, attempts_left = 0 (Rescuer)" --> F

    %% Manual actions via client API
    F  -- "manual retry" --> P
    Ca -- "manual retry" --> P
    P  -- "manual cancel" --> Ca
    F  -- "manual cancel" --> Ca

    %% Cleaner (maintenance, leader-only)
    C  -. "auto-delete (Cleaner)" .-> Deleted(["Deleted"]):::edge
    F  -. "auto-delete (Cleaner)" .-> Deleted
    Ca -. "auto-delete (Cleaner)" .-> Deleted

    %% Manual delete via client API
    P  -- "manual delete" --> Deleted
    C  -- "manual delete" --> Deleted
    F  -- "manual delete" --> Deleted
    Ca -- "manual delete" --> Deleted

Tasks are fetched using SELECT ... FOR NO KEY UPDATE SKIP LOCKED, enabling safe concurrent processing across multiple consumers. The blocked_till column serves as both a delay mechanism (delayed enqueue, retry backoff) and a distributed lock expiry – a task re-appears for fetching only after blocked_till passes.
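
The dual role of blocked_till can be pictured with a small in-memory model. This is only a sketch: the task, queue, claim, and scenario names are hypothetical, a mutex stands in for Postgres row locking, and the lease duration is an assumed parameter. The library itself does this in SQL.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// task is a hypothetical stand-in for a row in the tasks table.
type task struct {
	ID          int64
	BlockedTill time.Time
}

// queue is an in-memory model; the mutex plays the role of row locking.
type queue struct {
	mu    sync.Mutex
	tasks []*task
}

// claim returns up to limit tasks whose BlockedTill has passed and pushes
// their BlockedTill forward by lease, so other callers skip them until the
// lease expires.
func (q *queue) claim(now time.Time, limit int, lease time.Duration) []int64 {
	q.mu.Lock()
	defer q.mu.Unlock()

	var ids []int64
	for _, t := range q.tasks {
		if len(ids) == limit {
			break
		}
		if t.BlockedTill.After(now) {
			continue // delayed, backing off, or held by another worker
		}
		t.BlockedTill = now.Add(lease)
		ids = append(ids, t.ID)
	}
	return ids
}

// scenario runs two workers at the same instant, then a third fetch later.
func scenario() [][]int64 {
	now := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	q := &queue{tasks: []*task{
		{ID: 1, BlockedTill: now},
		{ID: 2, BlockedTill: now},
		{ID: 3, BlockedTill: now.Add(time.Minute)}, // enqueued with a delay
	}}
	lease := 30 * time.Second
	return [][]int64{
		q.claim(now, 1, lease),                    // worker A
		q.claim(now, 5, lease),                    // worker B skips A's task
		q.claim(now.Add(2*time.Minute), 5, lease), // leases and delay have passed
	}
}

func main() {
	for _, ids := range scenario() {
		fmt.Println(ids)
	}
}
```

In this model, a task that is never completed becomes fetchable again once its lease passes, which is the same property that lets a stuck task be picked up again.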

Packages
| Package | Purpose |
| --- | --- |
| producer/ | Enqueue tasks: Enqueue, EnqueueTx, EnqueueMany, EnqueueManyTx |
| consumer/ | Fetch and process tasks with configurable worker pools |
| client/ | Task inspection and management (get, list, cancel, retry, delete) |
| ui/ | HTTP handler serving REST API + embedded React SPA |

Producer

Create a producer with producer.New:

p, err := producer.New(producer.Config{
    Pool:            db,              // required: *sqlx.DB
    Logger:          slog.Default(),  // optional
    DefaultMaxRetry: 3,               // optional, default: 3
    MeterProvider:   mp,              // optional: OTel metrics
    TracerProvider:  tp,              // optional: OTel traces
})
Enqueue Methods
// Single task
id, err := p.Enqueue(ctx, task)

// Within an existing transaction (atomic with your business logic)
id, err = p.EnqueueTx(ctx, tx, task)

// Batch enqueue (skips duplicates by idempotency token)
ids, err := p.EnqueueMany(ctx, tasks)

// Batch within a transaction
ids, err = p.EnqueueManyTx(ctx, tx, tasks)
Task Options
asynqpg.NewTask("type", payload,
    asynqpg.WithMaxRetry(5),                       // max retry attempts
    asynqpg.WithDelay(10*time.Second),              // delay before first processing
    asynqpg.WithIdempotencyToken("unique-token"),   // deduplicate enqueues
)

// Schedule a task for a specific time
task := asynqpg.NewTask("report:generate", payload)
task.ProcessAt = time.Date(2026, 1, 1, 9, 0, 0, 0, time.UTC)
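
To make the idempotency-token behavior concrete, here is a minimal in-memory sketch of deduplication by (type, idempotency_token). The store, enqueueMany, and tok names are hypothetical, and the rule that a task without a token is always inserted is an assumption; the library enforces uniqueness in the database, not in Go.

```go
package main

import "fmt"

// task is a hypothetical stand-in carrying only the fields that matter here.
type task struct {
	Type  string
	Token *string // nil means no idempotency token
}

type key struct{ typ, token string }

// store models a table with a uniqueness rule on (type, idempotency_token).
type store struct {
	seen   map[key]bool
	nextID int64
}

func newStore() *store { return &store{seen: map[key]bool{}} }

// enqueueMany inserts tasks and returns the IDs of those actually stored.
// A task whose (type, token) pair already exists is skipped.
func (s *store) enqueueMany(tasks []task) []int64 {
	var ids []int64
	for _, t := range tasks {
		if t.Token != nil {
			k := key{t.Type, *t.Token}
			if s.seen[k] {
				continue
			}
			s.seen[k] = true
		}
		s.nextID++
		ids = append(ids, s.nextID)
	}
	return ids
}

// tok returns a pointer to s, mirroring the *string token field.
func tok(s string) *string { return &s }

func main() {
	s := newStore()
	ids := s.enqueueMany([]task{
		{"email:send", tok("welcome-42")},
		{"email:send", tok("welcome-42")}, // duplicate pair, skipped
		{"sms:send", tok("welcome-42")},   // same token, different type
		{"email:send", nil},               // no token, assumed always inserted
	})
	fmt.Println(len(ids), "of 4 tasks stored")
}
```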

Consumer

Create a consumer with consumer.New:

c, err := consumer.New(consumer.Config{
    Pool:              db,               // required
    ClientID:          "worker-1",       // optional, for leader election
    RetryPolicy:       retryPolicy,      // optional, default: exponential backoff
    FetchInterval:     100*time.Millisecond,
    ShutdownTimeout:   30*time.Second,
    // Retention for completed/failed/cancelled tasks
    CompletedRetention: 24*time.Hour,
    FailedRetention:    7*24*time.Hour,
    CancelledRetention: 24*time.Hour,

    // DisableMaintenance: true,    // set to disable rescuer + cleaner
    // DisableBatchCompleter: true, // set to disable batch completions
})
Handler Registration
err := c.RegisterTaskHandler("email:send", handler,
    consumer.WithWorkersCount(10),          // goroutines for this task type
    consumer.WithMaxAttempts(5),            // override default
    consumer.WithTimeout(30*time.Second),   // per-task execution timeout
)

Implement consumer.TaskHandler or use the TaskHandlerFunc adapter:

type TaskHandler interface {
    Handle(ctx context.Context, task *asynqpg.TaskInfo) error
}
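
The combination of a per-type worker count and a per-task timeout can be sketched with the standard library alone. This is an illustration of the general pattern, not the consumer's implementation; runPool, handle, and demoPool are hypothetical names, and the durations are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// handle simulates a handler that needs `work` to finish but honors ctx.
func handle(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runPool processes jobs with a fixed number of goroutines and gives each
// job its own timeout. It reports how many jobs succeeded and timed out.
func runPool(workers int, timeout time.Duration, jobs []time.Duration) (ok, timedOut int) {
	ch := make(chan time.Duration)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for work := range ch {
				ctx, cancel := context.WithTimeout(context.Background(), timeout)
				err := handle(ctx, work)
				cancel()

				mu.Lock()
				switch {
				case err == nil:
					ok++
				case errors.Is(err, context.DeadlineExceeded):
					timedOut++
				}
				mu.Unlock()
			}
		}()
	}

	for _, j := range jobs {
		ch <- j
	}
	close(ch)
	wg.Wait()
	return ok, timedOut
}

// demoPool runs three fast jobs and one that exceeds the timeout.
func demoPool() (int, int) {
	return runPool(3, 200*time.Millisecond, []time.Duration{0, 0, 5 * time.Second, 0})
}

func main() {
	ok, timedOut := demoPool()
	fmt.Println("ok:", ok, "timed out:", timedOut)
}
```

A handler only benefits from the timeout if it watches ctx, as handle does here; a handler that ignores its context keeps running past the deadline.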
Retry Policies

Exponential backoff (default) – attempt^4 seconds with 10% jitter, capped at 24h:

&asynqpg.DefaultRetryPolicy{MaxRetryDelay: 24 * time.Hour}

Constant delay:

&asynqpg.ConstantRetryPolicy{Delay: 5 * time.Second}
SkipRetry

If a handler encounters a permanent error that should not be retried (e.g., invalid payload, business logic rejection), it can return asynqpg.ErrSkipRetry to immediately fail the task, skipping all remaining retry attempts:

func (h *EmailHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    var payload EmailPayload
    if err := json.Unmarshal(task.Payload, &payload); err != nil {
        // Invalid payload – retrying won't help
        return fmt.Errorf("bad payload: %w", asynqpg.ErrSkipRetry)
    }
    // process task...
    return nil
}

ErrSkipRetry is detected with errors.Is, so it can be wrapped with additional context via fmt.Errorf("...: %w", asynqpg.ErrSkipRetry).

Snooze

Sometimes a handler needs to defer processing without counting it as a failure. TaskSnooze reschedules the task after a given duration without counting it as an attempt – attempts_left and attempts_elapsed remain unchanged:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if !isExternalServiceReady() {
        // Try again in 30 seconds, doesn't count as a failed attempt
        return asynqpg.TaskSnooze(30 * time.Second)
    }
    // process task...
    return nil
}

If you want to reschedule and count it as a failed attempt (with error message stored), use TaskSnoozeWithError:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if err := callExternalAPI(); err != nil {
        // Retry in 1 minute, counts as an attempt, error message is stored
        return fmt.Errorf("api unavailable: %w", asynqpg.TaskSnoozeWithError(1 * time.Minute))
    }
    // process task...
    return nil
}

Key differences:

| | TaskSnooze | TaskSnoozeWithError |
| --- | --- | --- |
| Counts as attempt | No | Yes |
| Stores error message | No | Yes |
| Respects max retries | No (unlimited snoozes) | Yes (fails when exhausted) |
| Use case | External dependency not ready | Transient error with custom delay |

Both are detected with errors.As, so they can be wrapped with fmt.Errorf. Both constructors panic on a negative duration; a zero duration makes the task immediately available.
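
The sketch below shows how wrapped return values can still be told apart with errors.Is and errors.As. It uses local stand-ins (errSkipRetry, snoozeError, snoozeWithErrError) instead of the real asynqpg types, and classify and wrap are hypothetical helpers; how the consumer actually dispatches on these errors is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical stand-ins for asynqpg.ErrSkipRetry and the snooze error types.
var errSkipRetry = errors.New("skip retry for the task")

type snoozeError struct{ Duration time.Duration }

func (e *snoozeError) Error() string { return "snooze for " + e.Duration.String() }

type snoozeWithErrError struct{ Duration time.Duration }

func (e *snoozeWithErrError) Error() string {
	return "snooze with error for " + e.Duration.String()
}

// classify reports how a caller could interpret a handler's return value.
func classify(err error) string {
	var s *snoozeError
	var se *snoozeWithErrError
	switch {
	case err == nil:
		return "completed"
	case errors.As(err, &s):
		return "snoozed " + s.Duration.String()
	case errors.As(err, &se):
		return "snoozed-with-error " + se.Duration.String()
	case errors.Is(err, errSkipRetry):
		return "failed"
	default:
		return "retried"
	}
}

// wrap adds context the way a handler would with fmt.Errorf and %w.
func wrap(err error) error { return fmt.Errorf("handler: %w", err) }

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(wrap(&snoozeError{Duration: 30 * time.Second})))
	fmt.Println(classify(wrap(&snoozeWithErrError{Duration: time.Minute})))
	fmt.Println(classify(wrap(errSkipRetry)))
	fmt.Println(classify(errors.New("connection reset")))
}
```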

Task vs TaskInfo

The library uses two distinct structs to separate concerns:

  • Task (root package) -- the input struct for enqueueing. Contains only fields you set when creating a task: Type, Payload, Delay, ProcessAt, MaxRetry, IdempotencyToken.
  • TaskInfo (root package) -- the runtime struct passed to handlers. Contains all database-assigned fields needed during processing:
func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    _ = task.ID               // database ID
    _ = task.Type             // task type
    _ = task.Payload          // task payload
    _ = task.AttemptsLeft     // remaining retry attempts
    _ = task.AttemptsElapsed  // number of attempts already made
    _ = task.CreatedAt        // when the task was first enqueued
    _ = task.Messages         // error messages from previous failed attempts
    _ = task.AttemptedAt      // when the current processing attempt started
    // ...
    return nil
}

The client package has its own client.TaskInfo -- a full database read model returned by client.GetTask / client.ListTasks for inspection and management. It includes additional fields like Status, BlockedTill, UpdatedAt, and FinalizedAt.

Context Utilities

Task metadata is also available via context helpers, useful in middleware and utilities:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    id, _        := asynqpg.GetTaskID(ctx)         // database ID
    retry, _     := asynqpg.GetRetryCount(ctx)     // attempts already elapsed
    maxRetry, _  := asynqpg.GetMaxRetry(ctx)       // total max retry count
    createdAt, _ := asynqpg.GetCreatedAt(ctx)       // task creation time

    // Or get all metadata at once:
    meta, ok := asynqpg.GetTaskMetadata(ctx)
    // meta.ID, meta.RetryCount, meta.MaxRetry, meta.CreatedAt
    // ...
}

For testing handlers, use asynqpg.WithTaskMetadata to create a context with metadata:

ctx := asynqpg.WithTaskMetadata(context.Background(), asynqpg.TaskMetadata{
    ID: 42, RetryCount: 0, MaxRetry: 3, CreatedAt: time.Now(),
})
err := handler.Handle(ctx, task)
Middleware

The consumer supports composable middleware for cross-cutting concerns. Middleware wraps task handlers using the func(TaskHandler) TaskHandler pattern, similar to net/http middleware.

Global middleware applies to all task types:

c, _ := consumer.New(config)

_ = c.Use(func(next consumer.TaskHandler) consumer.TaskHandler {
    return consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
        slog.Info("processing task", "type", task.Type, "id", task.ID)
        err := next.Handle(ctx, task)
        slog.Info("task done", "type", task.Type, "id", task.ID, "error", err)
        return err
    })
})

Per-task-type middleware applies only to a specific handler:

c.RegisterTaskHandler("email:send", emailHandler,
    consumer.WithMiddleware(rateLimitMiddleware),
    consumer.WithWorkersCount(5),
)

Execution order: global middleware (outermost, first registered runs first) wraps per-task middleware, which wraps the handler. A middleware can short-circuit by returning an error without calling next.Handle.
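
The ordering rule can be checked with a stripped-down model of the func(TaskHandler) TaskHandler pattern. The handler and middleware types, and the tag, chain, and run helpers, are hypothetical stand-ins written for this sketch; they are not the consumer package's types.

```go
package main

import (
	"fmt"
	"strings"
)

// handler and middleware are simplified stand-ins for consumer.TaskHandler
// and the func(TaskHandler) TaskHandler pattern.
type handler func(trace *[]string) error
type middleware func(next handler) handler

// tag returns a middleware that records when it is entered and left.
func tag(name string) middleware {
	return func(next handler) handler {
		return func(trace *[]string) error {
			*trace = append(*trace, name+":in")
			err := next(trace)
			*trace = append(*trace, name+":out")
			return err
		}
	}
}

// chain wraps h so that mws[0] is outermost and therefore runs first.
func chain(h handler, mws ...middleware) handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// run composes global middleware around per-task middleware around the handler
// and returns the recorded call order.
func run() string {
	var trace []string
	base := func(t *[]string) error {
		*t = append(*t, "handler")
		return nil
	}
	perTask := chain(base, tag("rateLimit"))               // per-task-type middleware
	full := chain(perTask, tag("logging"), tag("metrics")) // global, in registration order
	_ = full(&trace)
	return strings.Join(trace, " ")
}

func main() {
	fmt.Println(run())
}
```

Applying the middleware slice in reverse is what makes the first registered middleware the outermost wrapper.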

Lifecycle
c.Start()               // start processing
c.Stop()                 // graceful shutdown (uses configured ShutdownTimeout)
c.Shutdown(timeout)      // graceful shutdown with custom timeout

Client

Inspect and manage tasks:

cl, err := client.New(client.Config{Pool: db})

// Get a single task
info, err := cl.GetTask(ctx, taskID)

// List tasks with filtering
result, err := cl.ListTasks(ctx, client.NewListParams().
    States(asynqpg.TaskStatusFailed, asynqpg.TaskStatusPending).
    Types("email:send").
    Limit(50).
    OrderBy(client.OrderByCreatedAt, client.SortDesc),
)
// result.Tasks, result.Total

// Manage tasks
_, err = cl.CancelTask(ctx, id)   // pending/failed → cancelled
_, err = cl.RetryTask(ctx, id)    // failed/cancelled → pending
_, err = cl.DeleteTask(ctx, id)   // remove from database

All methods have *Tx variants for transactional use.
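
As a reading aid for the manual actions, the transitions drawn in the Task Lifecycle diagram can be written as a lookup table. This sketch only mirrors the diagram; the status type, the allowed map, and canApply are hypothetical, and how the client reports an action the diagram does not draw (for example cancelling a running task) is not specified here.

```go
package main

import "fmt"

// status mirrors the task statuses named in this document.
type status string

const (
	pending   status = "pending"
	running   status = "running"
	completed status = "completed"
	failed    status = "failed"
	cancelled status = "cancelled"
)

// allowed lists, per manual action, the statuses the lifecycle diagram
// draws an arrow from.
var allowed = map[string][]status{
	"cancel": {pending, failed},
	"retry":  {failed, cancelled},
	"delete": {pending, completed, failed, cancelled},
}

// canApply reports whether the diagram has an arrow for action from s.
func canApply(action string, s status) bool {
	for _, from := range allowed[action] {
		if from == s {
			return true
		}
	}
	return false
}

func main() {
	for _, s := range []status{pending, running, completed, failed, cancelled} {
		fmt.Printf("%-9s cancel=%-5v retry=%-5v delete=%v\n",
			s, canApply("cancel", s), canApply("retry", s), canApply("delete", s))
	}
}
```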

Web UI

Mount the dashboard as an HTTP handler:

handler, err := ui.NewHandler(ui.HandlerOpts{
    Pool:   db,
    Prefix: "/asynqpg",
})
http.Handle("/asynqpg/", handler)

The dashboard is a single-page React app embedded in the Go binary via //go:embed. It ships as four operator pages plus a profile/appearance page, with a command palette (⌘K), keyboard navigation (j/k to move, x to select), and light/dark themes that follow the system preference.

Overview

Cluster snapshot at a glance: KPIs by status, per-task-type breakdown, current leader, and lease TTL. Auto-refreshes every 5 seconds.

Tasks

Live task list with saved views (All, Pending, Running, Failed, Needs retry, Dead-letter), filters by type / status / idempotency-token presence, full-text search by id / type / token, bulk retry / cancel / delete, and pagination. Clicking a row opens an inline drawer with tabs for payload, attempt history with error traces, timing breakdown, related tasks, the raw DB row, and a curl snippet to reproduce the call.

Workers

Shows the currently elected leader (with lease remaining + auto-renew countdown) and a live snapshot of every task in running status across the cluster, including current attempt and elapsed time.

Maintenance

Status mix at a glance, the most recent failures, and a description of the leader-elected background jobs (Rescuer, Cleaner, Leader election) that run only on the elected leader.

Authentication

Basic Auth:

ui.HandlerOpts{
    BasicAuth: &ui.BasicAuth{Username: "admin", Password: "secret"},
}

OAuth providers (e.g., GitHub):

ui.HandlerOpts{
    AuthProviders: []ui.AuthProvider{githubProvider},
    SecureCookies: true,    // for HTTPS
    SessionMaxAge: 24*time.Hour,
}

Implement the ui.AuthProvider interface to add any OAuth/SSO provider. Multiple providers can be configured in parallel — the login screen shows one button per provider. See examples/demo/github_provider.go for a complete GitHub OAuth implementation.

Observability

All public components accept optional MeterProvider and TracerProvider. When nil, the global OpenTelemetry provider is used.

Metrics
| Metric | Type | Description |
| --- | --- | --- |
| asynqpg.tasks.enqueued | Counter | Tasks enqueued |
| asynqpg.tasks.processed | Counter | Tasks finished processing |
| asynqpg.tasks.errors | Counter | Processing or enqueue errors |
| asynqpg.task.duration | Histogram | Handler execution duration (seconds) |
| asynqpg.task.enqueue_duration | Histogram | Enqueue latency (seconds) |
| asynqpg.tasks.in_flight | UpDownCounter | Currently executing tasks |

All metrics are tagged with task_type and status attributes.

Tracing

Spans are created for all enqueue and processing operations with proper SpanKind (Producer/Consumer/Client).

Grafana Dashboard

A pre-built Grafana dashboard is included at deploy/grafana/dashboards/asynqpg-overview.json. It is provisioned automatically when you run make demo-up.

The dashboard refreshes every 10 seconds and includes:

  • Stat panels – total tasks enqueued, processed, errors, and currently in-flight
  • Enqueue Rate – tasks enqueued per second over time
  • Processing Rate by Status – throughput broken down by completed / failed / cancelled
  • Task Duration Quantiles – p50/p95/p99 handler execution latency
  • Enqueue Duration Quantiles – p50/p95/p99 enqueue latency
  • Error Rate by Type – error count per task type over time
  • Error Ratio – fraction of processed tasks that errored
  • In-Flight Over Time – concurrently executing tasks over time

All panels support a Task Type variable filter to drill down into a specific task type.

Observability Stack

The demo includes a full observability stack (Jaeger, Prometheus, Grafana, OTel Collector) in deploy/:

make demo-up   # start PostgreSQL + observability stack
  • Jaeger – http://localhost:16686
  • Prometheus – http://localhost:9090
  • Grafana – http://localhost:3000

Demo

An interactive TUI demo with producers, consumers, web UI, and observability.

Quick Start
make demo       # start infra, migrate, run demo
make demo-down  # stop all services

Or run steps individually:

make demo-up    # start PostgreSQL + observability stack
make migrate    # apply migrations
make demo-run   # run the demo app
CLI Flags

Pass flags via ARGS:

make demo-run ARGS="--tasks 50 --no-auto --log-level debug"
| Flag | Default | Description |
| --- | --- | --- |
| --tasks, -n | 100 | Number of initial tasks to seed |
| --no-auto | false | Disable automatic task generation |
| --no-logs | false | Hide the log viewport |
| --log-level, -l | info | Log level: debug, info, warn, error |
TUI Commands
| Command | Description |
| --- | --- |
| enqueue <type> [N] | Enqueue N tasks (types: email, notification, report) |
| auto on\|off | Toggle automatic task generation |
| stats | Show processing statistics |
| clear | Clear log viewport |
| help | Show available commands |
| quit | Graceful shutdown |

Scroll logs with mouse wheel, arrow keys, or PgUp/PgDown. Press Tab to switch focus between input and log viewport.

Authentication

The demo supports optional authentication for the web UI via environment variables. Create a .env file in examples/demo/ or pass variables directly:

Basic Auth:

BASIC_AUTH_USER=admin BASIC_AUTH_PASS=secret make demo-run

GitHub OAuth:

GITHUB_CLIENT_ID=your_id GITHUB_CLIENT_SECRET=your_secret make demo-run

Without these variables, the UI runs without authentication.

Performance

The library includes Go benchmarks covering SQL-level operations, batch completion, producer throughput, and consumer processing.

make bench   # Run benchmarks (uses testcontainers, requires Docker)

Testing

make test              # unit tests (go test -race -count=1 ./...)
make test-integration  # integration tests (uses testcontainers, no manual DB needed)
make test-all          # both
make bench             # benchmarks (integration, requires Docker)
make lint              # golangci-lint

Contributing

Contributions are welcome, including bug reports, feature requests, documentation improvements, and code changes. See CONTRIBUTING.md for local setup, contribution workflow, testing expectations, and pull request guidelines.

TODO

  • Move testutils to a separate module to avoid pulling testcontainers-go into consumer projects

Project Status

Under active development. The API may change before v1.0. Bug reports and feature requests are welcome.

License

MIT

Documentation

Constants

const (
	StatusCompleted = "completed"
	StatusFailed    = "failed"
	StatusRetried   = "retried"
	StatusSnoozed   = "snoozed"
)

Bounded attribute values for status.

const (
	ErrorTypeHandler = "handler_error"
	ErrorTypeDB      = "db_error"
)

Bounded attribute values for error_type.

Variables

var (
	AttrTaskType  = attribute.Key("task_type")
	AttrStatus    = attribute.Key("status")
	AttrErrorType = attribute.Key("error_type")
)

Attribute keys used across metrics and traces.

var ErrSkipRetry = errors.New("skip retry for the task")

ErrSkipRetry is a sentinel error that handlers can return to indicate the task should not be retried and should immediately be marked as failed. This is useful for non-retryable errors such as invalid payloads or business logic rejections.

Usage:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if invalidPayload(task.Payload) {
        return fmt.Errorf("bad payload: %w", asynqpg.ErrSkipRetry)
    }
    // ...
}

Functions

func GetCreatedAt

func GetCreatedAt(ctx context.Context) (time.Time, bool)

GetCreatedAt extracts the task creation time from the context. Returns (zero time, false) if the context does not contain task metadata.

func GetMaxRetry

func GetMaxRetry(ctx context.Context) (int, bool)

GetMaxRetry extracts the total max retry count from the context. Returns (0, false) if the context does not contain task metadata.

func GetRetryCount

func GetRetryCount(ctx context.Context) (int, bool)

GetRetryCount extracts the number of attempts already elapsed from the context. Returns (0, false) if the context does not contain task metadata.

func GetTaskID

func GetTaskID(ctx context.Context) (int64, bool)

GetTaskID extracts the task's database ID from the context. Returns (0, false) if the context does not contain task metadata.

Example
package main

import (
	"context"
	"fmt"

	"github.com/yakser/asynqpg"
)

func main() {
	ctx := asynqpg.WithTaskMetadata(context.Background(), asynqpg.TaskMetadata{
		ID: 99,
	})

	id, ok := asynqpg.GetTaskID(ctx)
	fmt.Println(ok, id)
}
Output:
true 99

func NewTracer

func NewTracer(tp trace.TracerProvider) trace.Tracer

NewTracer creates a tracer from the given TracerProvider. If tp is nil, the global OTel TracerProvider is used.

func TaskSnooze

func TaskSnooze(d time.Duration) error

TaskSnooze returns an error that reschedules the task after the given duration without counting it as an attempt. The task's attempts_left and attempts_elapsed remain unchanged. Panics if duration < 0.

Usage:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if !isReady() {
        return asynqpg.TaskSnooze(30 * time.Second)
    }
    // ...
}
Example
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	err := asynqpg.TaskSnooze(30 * time.Second)

	var snoozeErr *asynqpg.TaskSnoozeError
	fmt.Println(errors.As(err, &snoozeErr))
	fmt.Println(snoozeErr.Duration)
}
Output:
true
30s

func TaskSnoozeWithError

func TaskSnoozeWithError(d time.Duration) error

TaskSnoozeWithError returns an error that reschedules the task after the given duration, counting it as a failed attempt. The error message is stored and attempts_left is decremented. If no attempts are left, the task is failed instead of snoozed. Panics if duration < 0.

Usage:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if err := callExternalAPI(); err != nil {
        return fmt.Errorf("api unavailable: %w", asynqpg.TaskSnoozeWithError(1 * time.Minute))
    }
    // ...
}
Example
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	err := fmt.Errorf("api unavailable: %w", asynqpg.TaskSnoozeWithError(1*time.Minute))

	var snoozeErr *asynqpg.TaskSnoozeWithErrError
	fmt.Println(errors.As(err, &snoozeErr))
	fmt.Println(snoozeErr.Duration)
}
Output:
true
1m0s

func WithTaskMetadata

func WithTaskMetadata(ctx context.Context, meta TaskMetadata) context.Context

WithTaskMetadata returns a new context with task metadata injected. Used internally by the consumer when invoking handlers. Can also be used in tests to create contexts for handler testing.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	ctx := asynqpg.WithTaskMetadata(context.Background(), asynqpg.TaskMetadata{
		ID:         42,
		RetryCount: 0,
		MaxRetry:   3,
		CreatedAt:  time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC),
	})

	meta, ok := asynqpg.GetTaskMetadata(ctx)
	fmt.Println(ok, meta.ID)
}
Output:
true 42

Types

type ConstantRetryPolicy

type ConstantRetryPolicy struct {
	Delay time.Duration
}

ConstantRetryPolicy always returns the same delay. Useful for testing or specific use cases.

Example
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	policy := &asynqpg.ConstantRetryPolicy{Delay: 5 * time.Second}

	delay := policy.NextRetry(1)
	fmt.Println(delay)
}
Output:
5s

func (*ConstantRetryPolicy) NextRetry

func (p *ConstantRetryPolicy) NextRetry(_ int) time.Duration

NextRetry returns the constant delay.

type DefaultRetryPolicy

type DefaultRetryPolicy struct {
	// MaxRetryDelay caps the maximum delay between retries.
	// Default: 24 hours.
	MaxRetryDelay time.Duration
}

DefaultRetryPolicy implements exponential backoff with jitter. Formula: attempt^4 seconds with ±10% jitter. Examples: 1s, 16s, 81s, 256s, 625s, ...

Example
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	policy := &asynqpg.DefaultRetryPolicy{MaxRetryDelay: 24 * time.Hour}

	delay := policy.NextRetry(1)
	fmt.Println(delay > 0)
}
Output:
true

func (*DefaultRetryPolicy) NextRetry

func (p *DefaultRetryPolicy) NextRetry(attempt int) time.Duration

NextRetry calculates the next retry delay using exponential backoff.
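
The documented formula (attempt^4 seconds, ±10% jitter, capped at MaxRetryDelay) can be sketched as follows. This is not the library's source: nextRetry and midpointSeconds are hypothetical names, the jitter is passed in explicitly so the result is reproducible, and applying the cap after the jitter is an assumption.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// nextRetry returns attempt^4 seconds scaled by up to ±10%, capped at maxDelay.
// jitter is expected in [-1, 1]; 0 gives the midpoint of the jitter range.
func nextRetry(attempt int, maxDelay time.Duration, jitter float64) time.Duration {
	secs := math.Pow(float64(attempt), 4) * (1 + 0.10*jitter)
	// Compare in seconds first so a huge attempt cannot overflow time.Duration.
	if secs >= maxDelay.Seconds() {
		return maxDelay
	}
	return time.Duration(secs * float64(time.Second))
}

// midpointSeconds is the jitter-free delay in seconds with a 24h cap.
func midpointSeconds(attempt int) float64 {
	return nextRetry(attempt, 24*time.Hour, 0).Seconds()
}

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		jitter := rand.Float64()*2 - 1 // uniform in [-1, 1)
		fmt.Println(attempt,
			nextRetry(attempt, 24*time.Hour, 0),
			nextRetry(attempt, 24*time.Hour, jitter))
	}
}
```

With zero jitter the delays follow the documented sequence 1s, 16s, 81s, 256s, 625s. The 24h cap is first reached at attempt 18, since 17^4 = 83,521 seconds is below 86,400 and 18^4 = 104,976 is above it.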

type ErrorHandler

type ErrorHandler interface {
	HandleError(ctx context.Context, task *TaskInfo, err error)
}

ErrorHandler is called when a task fails permanently (exhausted all retries) or encounters an unrecoverable error (ErrSkipRetry, panic). Implementations can use this for alerting, dead letter queue routing, or external error tracking (e.g., Sentry, PagerDuty).

type ErrorHandlerFunc

type ErrorHandlerFunc func(ctx context.Context, task *TaskInfo, err error)

ErrorHandlerFunc is an adapter to allow ordinary functions to be used as ErrorHandler.

func (ErrorHandlerFunc) HandleError

func (f ErrorHandlerFunc) HandleError(ctx context.Context, task *TaskInfo, err error)

HandleError calls f(ctx, task, err).

type Metrics

type Metrics struct {
	TasksEnqueued   metric.Int64Counter
	TasksProcessed  metric.Int64Counter
	TasksErrors     metric.Int64Counter
	TaskDuration    metric.Float64Histogram
	EnqueueDuration metric.Float64Histogram
	TasksInFlight   metric.Int64UpDownCounter
}

Metrics holds all OpenTelemetry metric instruments for asynqpg. When no MeterProvider is configured, the global OTel MeterProvider is used; if none has been registered, all instruments are no-ops.

func NewMetrics

func NewMetrics(mp metric.MeterProvider) (*Metrics, error)

NewMetrics creates metric instruments from the given MeterProvider. If mp is nil, the global OTel MeterProvider is used.

type Pool

type Pool interface {
	Querier
	PingContext(ctx context.Context) error
}

Pool represents a database connection pool. *sqlx.DB satisfies this interface natively.

func WrapStdDB

func WrapStdDB(db *sql.DB, driverName string) Pool

WrapStdDB wraps a standard *sql.DB into a Pool. It uses sqlx internally for struct scanning (SelectContext, GetContext). The driverName parameter should match the driver used to open the database (e.g., "postgres", "pgx").

type Querier

type Querier interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	SelectContext(ctx context.Context, dest any, query string, args ...any) error
	GetContext(ctx context.Context, dest any, query string, args ...any) error
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

Querier can execute queries and scan results into structs. Both database connection pools and transactions satisfy this interface. *sqlx.DB and *sqlx.Tx implement it natively.

type RetryPolicy

type RetryPolicy interface {
	// NextRetry returns the duration to wait before the next retry attempt.
	// attempt is the number of the upcoming attempt (1-indexed).
	NextRetry(attempt int) time.Duration
}

RetryPolicy determines when a failed task should be retried.

type Task

type Task struct {
	Type             string
	Payload          []byte
	IdempotencyToken *string
	Delay            time.Duration
	MaxRetry         *int
	ProcessAt        time.Time
}

Task represents a unit of work to be enqueued. Use NewTask to create a task for enqueueing via Producer. Handlers receive *TaskInfo which contains runtime fields like ID, attempt info, etc.

func NewTask

func NewTask(taskType string, payload []byte, opts ...TaskOption) *Task

NewTask creates a new task with the given type and payload.

Example
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	task := asynqpg.NewTask("email:send", []byte(`{"to":"user@example.com"}`),
		asynqpg.WithMaxRetry(5),
		asynqpg.WithDelay(10*time.Second),
		asynqpg.WithIdempotencyToken("unique-token"),
	)

	fmt.Println(task.Type)
}
Output:
email:send
Example (ProcessAt)
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	task := asynqpg.NewTask("report:generate", []byte(`{"id":1}`))
	task.ProcessAt = time.Date(2026, 1, 1, 9, 0, 0, 0, time.UTC)

	fmt.Println(task.ProcessAt.Format(time.RFC3339))
}
Output:
2026-01-01T09:00:00Z

type TaskInfo

type TaskInfo struct {
	ID               int64
	Type             string
	Payload          []byte
	IdempotencyToken *string
	AttemptsLeft     int
	AttemptsElapsed  int
	CreatedAt        time.Time
	AttemptedAt      *time.Time
	Messages         []string
}

TaskInfo represents a task fetched from the database for processing. Handlers receive *TaskInfo with all runtime information. Unlike Task (used for enqueueing), TaskInfo includes database-assigned fields.

type TaskMetadata

type TaskMetadata struct {
	ID         int64
	RetryCount int
	MaxRetry   int
	CreatedAt  time.Time
}

TaskMetadata holds metadata about the task currently being processed. It is available inside handler contexts via GetTaskMetadata.

func GetTaskMetadata

func GetTaskMetadata(ctx context.Context) (TaskMetadata, bool)

GetTaskMetadata extracts the task metadata from the context. Returns the metadata and true if present, or zero value and false otherwise.

type TaskOption

type TaskOption func(*Task)

TaskOption configures a Task.

func WithDelay

func WithDelay(d time.Duration) TaskOption

WithDelay sets the delay before the task becomes available for processing.

func WithIdempotencyToken

func WithIdempotencyToken(token string) TaskOption

WithIdempotencyToken sets the idempotency token for the task.

func WithMaxRetry

func WithMaxRetry(n int) TaskOption

WithMaxRetry sets the maximum number of retries for the task.

type TaskSnoozeError

type TaskSnoozeError struct {
	Duration time.Duration
}

TaskSnoozeError is returned by TaskSnooze. Detected via errors.As. When a handler returns this error, the task is rescheduled after Duration without counting it as an attempt – attempts_left and attempts_elapsed remain unchanged.

func (*TaskSnoozeError) Error

func (e *TaskSnoozeError) Error() string

func (*TaskSnoozeError) Is

func (e *TaskSnoozeError) Is(target error) bool

type TaskSnoozeWithErrError

type TaskSnoozeWithErrError struct {
	Duration time.Duration
}

TaskSnoozeWithErrError is returned by TaskSnoozeWithError. Detected via errors.As. When a handler returns this error, the task is rescheduled after Duration, counting it as a failed attempt – attempts_left is decremented and the error message is stored.

func (*TaskSnoozeWithErrError) Error

func (e *TaskSnoozeWithErrError) Error() string

func (*TaskSnoozeWithErrError) Is

func (e *TaskSnoozeWithErrError) Is(target error) bool

type TaskStatus

type TaskStatus string

TaskStatus represents the current status of a task in the database.

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCancelled TaskStatus = "cancelled"
)

func (TaskStatus) IsFinalized

func (s TaskStatus) IsFinalized() bool

IsFinalized returns true if the task is in a terminal state.

type Tx added in v0.1.3

type Tx interface {
	Querier
	Commit() error
	Rollback() error
}

Tx represents a database transaction. *sqlx.Tx satisfies this interface natively; a plain *sql.Tx does not, because it lacks the SelectContext and GetContext methods that Querier requires.

Directories

Path Synopsis
internal
Package testutils provides test helpers for asynqpg integration tests.
ui module
