asynqpg

package module v0.1.2
Published: Mar 19, 2026 License: MIT Imports: 12 Imported by: 0

README

Distributed task queue for Go, backed by PostgreSQL.

Features

  • PostgreSQL-native – no Redis or external broker required
  • Concurrent processing – SELECT ... FOR NO KEY UPDATE SKIP LOCKED for safe multi-consumer operation
  • Configurable retry – exponential backoff (default) or constant delay, per-task max retries
  • SkipRetry – sentinel error to immediately fail non-retryable tasks
  • Snooze – reschedule tasks with TaskSnooze (free) or TaskSnoozeWithError (counts as attempt)
  • Context utilities – extract task ID, retry count, and max retry from handler context
  • Delayed tasks – schedule tasks to run at a future time
  • Idempotent enqueue – deduplicate tasks via idempotency tokens
  • Batch enqueue – insert thousands of tasks efficiently with auto-chunking
  • Transactional enqueue – enqueue tasks within your existing database transaction
  • Per-type worker pools – independent concurrency and timeout settings per task type
  • Leader election – PostgreSQL advisory locks for single-leader maintenance
  • Automatic maintenance – rescues stuck tasks, cleans up old completed/failed tasks
  • Batch completion – batches DB writes for high-throughput workloads
  • Web dashboard – React SPA with task inspection, filtering, retry/cancel/delete
  • Auth – BasicAuth or OAuth (GitHub, etc.) for the web UI
  • OpenTelemetry – built-in metrics and distributed tracing

Installation

go get github.com/yakser/asynqpg

Requires Go 1.25+ and PostgreSQL 14+.

Database Setup

Apply migrations to create the asynqpg_tasks table:

make up         # start PostgreSQL in Docker
make migrate    # apply migrations

Or use testcontainers-go in tests – no manual setup needed.

Quick Start

Producer
package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "github.com/yakser/asynqpg"
    "github.com/yakser/asynqpg/producer"
)

func main() {
    db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }

    p, err := producer.New(producer.Config{Pool: db})
    if err != nil {
        log.Fatal(err)
    }

    payload, _ := json.Marshal(map[string]string{"to": "user@example.com", "subject": "Hello"})

    _, err = p.Enqueue(context.Background(), asynqpg.NewTask("email:send", payload,
        asynqpg.WithMaxRetry(5),
        asynqpg.WithDelay(10*time.Second),
    ))
    if err != nil {
        log.Fatal(err)
    }
}
Consumer
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "github.com/yakser/asynqpg"
    "github.com/yakser/asynqpg/consumer"
)

func main() {
    db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }

    c, err := consumer.New(consumer.Config{Pool: db})
    if err != nil {
        log.Fatal(err)
    }

    if err := c.RegisterTaskHandler("email:send",
        consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
            fmt.Printf("Processing task %d: %s\n", task.ID, task.Type)
            // process task...
            return nil
        }),
        consumer.WithWorkersCount(5),
        consumer.WithTimeout(30*time.Second),
    ); err != nil {
        log.Fatal(err)
    }

    if err := c.Start(); err != nil {
        log.Fatal(err)
    }

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh

    if err := c.Stop(); err != nil {
        log.Printf("shutdown error: %v", err)
    }
}

Architecture

Task Lifecycle
flowchart LR
    %% Styles
    classDef pending   fill:#f59e0b,stroke:#92400e,color:#000
    classDef running   fill:#3b82f6,stroke:#1e40af,color:#fff
    classDef completed fill:#22c55e,stroke:#166534,color:#000
    classDef failed    fill:#ef4444,stroke:#991b1b,color:#fff
    classDef cancelled fill:#6b7280,stroke:#374151,color:#fff
    classDef edge      fill:#f3f4f6,stroke:#d1d5db,color:#374151

    %% States
    P["Pending"]:::pending
    T["Running"]:::running

    subgraph Final
        C["Completed"]:::completed
        F["Failed"]:::failed
        Ca["Cancelled"]:::cancelled
    end

    %% Entry points
    Enqueue(["Enqueue"]):::edge --> P
    EnqueueDelayed(["Enqueue (with delay)"]):::edge --> P

    %% Normal processing
    P -- "fetched by consumer" --> T
    T -- "handler success" --> C
    T -- "handler error, attempts_left > 0" --> P
    T -- "handler error, attempts_left = 0" --> F

    %% Rescuer (stuck tasks)
    T -- "stuck, attempts_left > 0 (Rescuer)" --> P
    T -- "stuck, attempts_left = 0 (Rescuer)" --> F

    %% Manual actions via client API
    F  -- "manual retry" --> P
    Ca -- "manual retry" --> P
    P  -- "manual cancel" --> Ca
    F  -- "manual cancel" --> Ca

    %% Cleaner (maintenance, leader-only)
    C  -. "auto-delete (Cleaner)" .-> Deleted(["Deleted"]):::edge
    F  -. "auto-delete (Cleaner)" .-> Deleted
    Ca -. "auto-delete (Cleaner)" .-> Deleted

    %% Manual delete via client API
    P  -- "manual delete" --> Deleted
    C  -- "manual delete" --> Deleted
    F  -- "manual delete" --> Deleted
    Ca -- "manual delete" --> Deleted

Tasks are fetched using SELECT ... FOR NO KEY UPDATE SKIP LOCKED, enabling safe concurrent processing across multiple consumers. The blocked_till column serves as both a delay mechanism (delayed enqueue, retry backoff) and a distributed lock expiry – a task re-appears for fetching only after blocked_till passes.

Packages
Package     Purpose
producer/   Enqueue tasks: Enqueue, EnqueueTx, EnqueueMany, EnqueueManyTx
consumer/   Fetch and process tasks with configurable worker pools
client/     Task inspection and management (get, list, cancel, retry, delete)
ui/         HTTP handler serving REST API + embedded React SPA

Producer

Create a producer with producer.New:

p, err := producer.New(producer.Config{
    Pool:            db,              // required: *sqlx.DB
    Logger:          slog.Default(),  // optional
    DefaultMaxRetry: 3,               // optional, default: 3
    MeterProvider:   mp,              // optional: OTel metrics
    TracerProvider:  tp,              // optional: OTel traces
})
Enqueue Methods
// Single task
id, err := p.Enqueue(ctx, task)

// Within an existing transaction (atomic with your business logic)
id, err = p.EnqueueTx(ctx, tx, task)

// Batch enqueue (auto-chunks at 5000, skips duplicates by idempotency token)
ids, err := p.EnqueueMany(ctx, tasks)

// Batch within a transaction
ids, err = p.EnqueueManyTx(ctx, tx, tasks)
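The auto-chunking behavior of EnqueueMany can be sketched as a simple slice-splitting loop. This is an illustrative stand-in, not the library's actual implementation; `chunk` is a hypothetical helper:

```go
package main

import "fmt"

// chunk splits items into batches of at most n elements, mirroring the
// auto-chunking described for EnqueueMany (batch size 5000).
// Illustrative sketch only; the library's internals may differ.
func chunk[T any](items []T, n int) [][]T {
	var batches [][]T
	for len(items) > n {
		batches = append(batches, items[:n])
		items = items[n:]
	}
	if len(items) > 0 {
		batches = append(batches, items)
	}
	return batches
}

func main() {
	tasks := make([]int, 12000) // stand-in for a slice of *asynqpg.Task
	batches := chunk(tasks, 5000)
	fmt.Println(len(batches), len(batches[0]), len(batches[2]))
	// 3 5000 2000
}
```

Each batch becomes one multi-row INSERT, which is why enqueueing thousands of tasks stays efficient.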
Task Options
asynqpg.NewTask("type", payload,
    asynqpg.WithMaxRetry(5),                       // max retry attempts
    asynqpg.WithDelay(10*time.Second),              // delay before first processing
    asynqpg.WithIdempotencyToken("unique-token"),   // deduplicate enqueues
)

// Schedule a task for a specific time
task := asynqpg.NewTask("report:generate", payload)
task.ProcessAt = time.Date(2026, 1, 1, 9, 0, 0, 0, time.UTC)

Consumer

Create a consumer with consumer.New:

c, err := consumer.New(consumer.Config{
    Pool:              db,               // required
    ClientID:          "worker-1",       // optional, for leader election
    RetryPolicy:       retryPolicy,      // optional, default: exponential backoff
    FetchInterval:     100*time.Millisecond,
    ShutdownTimeout:   30*time.Second,
    // Retention for completed/failed/cancelled tasks
    CompletedRetention: 24*time.Hour,
    FailedRetention:    7*24*time.Hour,
    CancelledRetention: 24*time.Hour,

    // DisableMaintenance: true,    // set to disable rescuer + cleaner
    // DisableBatchCompleter: true, // set to disable batch completions
})
Handler Registration
err := c.RegisterTaskHandler("email:send", handler,
    consumer.WithWorkersCount(10),          // goroutines for this task type
    consumer.WithMaxAttempts(5),            // override default
    consumer.WithTimeout(30*time.Second),   // per-task execution timeout
)

Implement consumer.TaskHandler or use the TaskHandlerFunc adapter:

type TaskHandler interface {
    Handle(ctx context.Context, task *asynqpg.TaskInfo) error
}
Retry Policies

Exponential backoff (default) – attempt^4 seconds with 10% jitter, capped at 24h:

&asynqpg.DefaultRetryPolicy{MaxRetryDelay: 24 * time.Hour}

Constant delay:

&asynqpg.ConstantRetryPolicy{Delay: 5 * time.Second}
SkipRetry

If a handler encounters a permanent error that should not be retried (e.g., invalid payload, business logic rejection), it can return asynqpg.ErrSkipRetry to immediately fail the task, skipping all remaining retry attempts:

func (h *EmailHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    var payload EmailPayload
    if err := json.Unmarshal(task.Payload, &payload); err != nil {
        // Invalid payload – retrying won't help
        return fmt.Errorf("bad payload: %w", asynqpg.ErrSkipRetry)
    }
    // process task...
    return nil
}

SkipRetry works with errors.Is, so it can be wrapped with additional context via fmt.Errorf("...: %w", asynqpg.ErrSkipRetry).

Snooze

Sometimes a handler needs to defer processing without counting it as a failure. TaskSnooze reschedules the task after a given duration without counting it as an attempt – attempts_left and attempts_elapsed remain unchanged:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if !isExternalServiceReady() {
        // Try again in 30 seconds, doesn't count as a failed attempt
        return asynqpg.TaskSnooze(30 * time.Second)
    }
    // process task...
    return nil
}

If you want to reschedule and count it as a failed attempt (with error message stored), use TaskSnoozeWithError:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if err := callExternalAPI(); err != nil {
        // Retry in 1 minute, counts as an attempt, error message is stored
        return fmt.Errorf("api unavailable: %w", asynqpg.TaskSnoozeWithError(1 * time.Minute))
    }
    // process task...
    return nil
}

Key differences:

                       TaskSnooze                      TaskSnoozeWithError
Counts as attempt      No                              Yes
Stores error message   No                              Yes
Respects max retries   No (unlimited snoozes)          Yes (fails when exhausted)
Use case               External dependency not ready   Transient error with custom delay

Both work with errors.As and can be wrapped with fmt.Errorf. Panics on negative duration; zero duration makes the task immediately available.

Task vs TaskInfo

The library uses two distinct structs to separate concerns:

  • Task (root package) -- the input struct for enqueueing. Contains only fields you set when creating a task: Type, Payload, Delay, MaxRetry, IdempotencyToken.
  • TaskInfo (root package) -- the runtime struct passed to handlers. Contains all database-assigned fields needed during processing:
func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    task.ID               // database ID
    task.Type             // task type
    task.Payload          // task payload
    task.AttemptsLeft     // remaining retry attempts
    task.AttemptsElapsed  // number of attempts already made
    task.CreatedAt        // when the task was first enqueued
    task.Messages         // error messages from previous failed attempts
    task.AttemptedAt      // when the current processing attempt started
    // ...
}

The client package has its own client.TaskInfo -- a full database read model returned by client.GetTask / client.ListTasks for inspection and management. It includes additional fields like Status, BlockedTill, UpdatedAt, and FinalizedAt.

Context Utilities

Task metadata is also available via context helpers, useful in middleware and utilities:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    id, _        := asynqpg.GetTaskID(ctx)         // database ID
    retry, _     := asynqpg.GetRetryCount(ctx)     // attempts already elapsed
    max, _       := asynqpg.GetMaxRetry(ctx)       // total max retry count
    createdAt, _ := asynqpg.GetCreatedAt(ctx)       // task creation time

    // Or get all metadata at once:
    meta, ok := asynqpg.GetTaskMetadata(ctx)
    // meta.ID, meta.RetryCount, meta.MaxRetry, meta.CreatedAt
    // ...
    return nil
}

For testing handlers, use asynqpg.WithTaskMetadata to create a context with metadata:

ctx := asynqpg.WithTaskMetadata(context.Background(), asynqpg.TaskMetadata{
    ID: 42, RetryCount: 0, MaxRetry: 3, CreatedAt: time.Now(),
})
err := handler.Handle(ctx, task)
Middleware

The consumer supports composable middleware for cross-cutting concerns. Middleware wraps task handlers using the func(TaskHandler) TaskHandler pattern, similar to net/http middleware.

Global middleware applies to all task types:

c, _ := consumer.New(config)

_ = c.Use(func(next consumer.TaskHandler) consumer.TaskHandler {
    return consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
        slog.Info("processing task", "type", task.Type, "id", task.ID)
        err := next.Handle(ctx, task)
        slog.Info("task done", "type", task.Type, "id", task.ID, "error", err)
        return err
    })
})

Per-task-type middleware applies only to a specific handler:

c.RegisterTaskHandler("email:send", emailHandler,
    consumer.WithMiddleware(rateLimitMiddleware),
    consumer.WithWorkersCount(5),
)

Execution order: global middleware (outermost, first registered runs first) wraps per-task middleware, which wraps the handler. A middleware can short-circuit by returning an error without calling next.Handle.

Lifecycle
c.Start()            // start processing
c.Stop()             // graceful shutdown (uses configured ShutdownTimeout)
c.Shutdown(timeout)  // graceful shutdown with custom timeout

Client

Inspect and manage tasks:

cl, err := client.New(client.Config{Pool: db})

// Get a single task
info, err := cl.GetTask(ctx, taskID)

// List tasks with filtering
result, err := cl.ListTasks(ctx, client.NewListParams().
    States(asynqpg.TaskStatusFailed, asynqpg.TaskStatusPending).
    Types("email:send").
    Limit(50).
    OrderBy(client.OrderByCreatedAt, client.SortDesc),
)
// result.Tasks, result.Total

// Manage tasks
_, err = cl.CancelTask(ctx, id)   // pending/failed → cancelled
_, err = cl.RetryTask(ctx, id)    // failed/cancelled → pending
_, err = cl.DeleteTask(ctx, id)   // remove from database

All methods have *Tx variants for transactional use.

Web UI

Mount the dashboard as an HTTP handler:

handler, err := ui.NewHandler(ui.HandlerOpts{
    Pool:   db,
    Prefix: "/asynqpg",
})
http.Handle("/asynqpg/", handler)

Dashboard

Screenshots: tasks list, task detail, Dead Letter Queue.

Authentication

Basic Auth:

ui.HandlerOpts{
    BasicAuth: &ui.BasicAuth{Username: "admin", Password: "secret"},
}

OAuth providers (e.g., GitHub):

ui.HandlerOpts{
    AuthProviders: []ui.AuthProvider{githubProvider},
    SecureCookies: true,    // for HTTPS
    SessionMaxAge: 24*time.Hour,
}

Implement the ui.AuthProvider interface to add any OAuth/SSO provider. See examples/demo/github_provider.go for a complete GitHub OAuth implementation.

Screenshots: GitHub OAuth login, user profile.

Observability

All public components accept optional MeterProvider and TracerProvider. When nil, the global OpenTelemetry provider is used.

Metrics
Metric                          Type            Description
asynqpg.tasks.enqueued          Counter         Tasks enqueued
asynqpg.tasks.processed         Counter         Tasks finished processing
asynqpg.tasks.errors            Counter         Processing or enqueue errors
asynqpg.task.duration           Histogram       Handler execution duration (seconds)
asynqpg.task.enqueue_duration   Histogram       Enqueue latency (seconds)
asynqpg.tasks.in_flight         UpDownCounter   Currently executing tasks

All metrics are tagged with task_type and status attributes.

Tracing

Spans are created for all enqueue and processing operations with proper SpanKind (Producer/Consumer/Client).

Grafana Dashboard

A pre-built Grafana dashboard is included at deploy/grafana/dashboards/asynqpg-overview.json. It is provisioned automatically when you run make demo-up.

The dashboard refreshes every 10 seconds and includes:

  • Stat panels – total tasks enqueued, processed, errors, and currently in-flight
  • Enqueue Rate – tasks enqueued per second over time
  • Processing Rate by Status – throughput broken down by completed / failed / cancelled
  • Task Duration Quantiles – p50/p95/p99 handler execution latency
  • Enqueue Duration Quantiles – p50/p95/p99 enqueue latency
  • Error Rate by Type – error count per task type over time
  • Error Ratio – fraction of processed tasks that errored
  • In-Flight Over Time – concurrently executing tasks over time

All panels support a Task Type variable filter to drill down into a specific task type.

Observability Stack

The demo includes a full observability stack (Jaeger, Prometheus, Grafana, OTel Collector) in deploy/:

make demo-up   # start PostgreSQL + observability stack
  • Jaeger – http://localhost:16686
  • Prometheus – http://localhost:9090
  • Grafana – http://localhost:3000

Screenshots: Grafana dashboard, Jaeger traces, Jaeger error trace.

Demo

Run the full demo with producers, consumers, web UI, and observability:

make demo       # start everything
make demo-down  # stop all services

Performance

The library includes Go benchmarks covering SQL-level operations, batch completion, producer throughput, and consumer processing.

make bench   # Run benchmarks (uses testcontainers, requires Docker)

Testing

make test              # unit tests (go test -race -count=1 ./...)
make test-integration  # integration tests (uses testcontainers, no manual DB needed)
make test-all          # both
make bench             # benchmarks (integration, requires Docker)
make lint              # golangci-lint

Contributing

Contributions are welcome, including bug reports, feature requests, documentation improvements, and code changes. See CONTRIBUTING.md for local setup, contribution workflow, testing expectations, and pull request guidelines.

TODO

  • Move testutils to a separate module to avoid pulling testcontainers-go into consumer projects

Project Status

Under active development. The API may change before v1.0. Bug reports and feature requests are welcome.

License

MIT

Documentation

Constants

const (
	StatusCompleted = "completed"
	StatusFailed    = "failed"
	StatusRetried   = "retried"
	StatusSnoozed   = "snoozed"
)

Bounded attribute values for status.

const (
	ErrorTypeHandler = "handler_error"
	ErrorTypeDB      = "db_error"
)

Bounded attribute values for error_type.

Variables

var (
	AttrTaskType  = attribute.Key("task_type")
	AttrStatus    = attribute.Key("status")
	AttrErrorType = attribute.Key("error_type")
)

Attribute keys used across metrics and traces.

var ErrSkipRetry = errors.New("skip retry for the task")

ErrSkipRetry is a sentinel error that handlers can return to indicate the task should not be retried and should immediately be marked as failed. This is useful for non-retryable errors such as invalid payloads or business logic rejections.

Usage:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if invalidPayload(task.Payload) {
        return fmt.Errorf("bad payload: %w", asynqpg.ErrSkipRetry)
    }
    // ...
}

Functions

func GetCreatedAt

func GetCreatedAt(ctx context.Context) (time.Time, bool)

GetCreatedAt extracts the task creation time from the context. Returns (zero time, false) if the context does not contain task metadata.

func GetMaxRetry

func GetMaxRetry(ctx context.Context) (int, bool)

GetMaxRetry extracts the total max retry count from the context. Returns (0, false) if the context does not contain task metadata.

func GetRetryCount

func GetRetryCount(ctx context.Context) (int, bool)

GetRetryCount extracts the number of attempts already elapsed from the context. Returns (0, false) if the context does not contain task metadata.

func GetTaskID

func GetTaskID(ctx context.Context) (int64, bool)

GetTaskID extracts the task's database ID from the context. Returns (0, false) if the context does not contain task metadata.

Example
package main

import (
	"context"
	"fmt"

	"github.com/yakser/asynqpg"
)

func main() {
	ctx := asynqpg.WithTaskMetadata(context.Background(), asynqpg.TaskMetadata{
		ID: 99,
	})

	id, ok := asynqpg.GetTaskID(ctx)
	fmt.Println(ok, id)
}
Output:

true 99

func NewTracer

func NewTracer(tp trace.TracerProvider) trace.Tracer

NewTracer creates a tracer from the given TracerProvider. If tp is nil, the global OTel TracerProvider is used.

func TaskSnooze

func TaskSnooze(d time.Duration) error

TaskSnooze returns an error that reschedules the task after the given duration without counting it as an attempt. The task's attempts_left and attempts_elapsed remain unchanged. Panics if duration < 0.

Usage:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if !isReady() {
        return asynqpg.TaskSnooze(30 * time.Second)
    }
    // ...
}
Example
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	err := asynqpg.TaskSnooze(30 * time.Second)

	var snoozeErr *asynqpg.TaskSnoozeError
	fmt.Println(errors.As(err, &snoozeErr))
	fmt.Println(snoozeErr.Duration)
}
Output:

true
30s

func TaskSnoozeWithError

func TaskSnoozeWithError(d time.Duration) error

TaskSnoozeWithError returns an error that reschedules the task after the given duration, counting it as a failed attempt. The error message is stored and attempts_left is decremented. If no attempts are left, the task is failed instead of snoozed. Panics if duration < 0.

Usage:

func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
    if err := callExternalAPI(); err != nil {
        return fmt.Errorf("api unavailable: %w", asynqpg.TaskSnoozeWithError(1 * time.Minute))
    }
    // ...
}
Example
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	err := fmt.Errorf("api unavailable: %w", asynqpg.TaskSnoozeWithError(1*time.Minute))

	var snoozeErr *asynqpg.TaskSnoozeWithErrError
	fmt.Println(errors.As(err, &snoozeErr))
	fmt.Println(snoozeErr.Duration)
}
Output:

true
1m0s

func WithTaskMetadata

func WithTaskMetadata(ctx context.Context, meta TaskMetadata) context.Context

WithTaskMetadata returns a new context with task metadata injected. Used internally by the consumer when invoking handlers. Can also be used in tests to create contexts for handler testing.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	ctx := asynqpg.WithTaskMetadata(context.Background(), asynqpg.TaskMetadata{
		ID:         42,
		RetryCount: 0,
		MaxRetry:   3,
		CreatedAt:  time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC),
	})

	meta, ok := asynqpg.GetTaskMetadata(ctx)
	fmt.Println(ok, meta.ID)
}
Output:

true 42

Types

type ConstantRetryPolicy

type ConstantRetryPolicy struct {
	Delay time.Duration
}

ConstantRetryPolicy always returns the same delay. Useful for testing or specific use cases.

Example
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	policy := &asynqpg.ConstantRetryPolicy{Delay: 5 * time.Second}

	delay := policy.NextRetry(1)
	fmt.Println(delay)
}
Output:

5s

func (*ConstantRetryPolicy) NextRetry

func (p *ConstantRetryPolicy) NextRetry(_ int) time.Duration

NextRetry returns the constant delay.

type DefaultRetryPolicy

type DefaultRetryPolicy struct {
	// MaxRetryDelay caps the maximum delay between retries.
	// Default: 24 hours.
	MaxRetryDelay time.Duration
}

DefaultRetryPolicy implements exponential backoff with jitter. Formula: attempt^4 seconds with ±10% jitter. Examples: 1s, 16s, 81s, 256s, 625s, ...

Example
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	policy := &asynqpg.DefaultRetryPolicy{MaxRetryDelay: 24 * time.Hour}

	delay := policy.NextRetry(1)
	fmt.Println(delay > 0)
}
Output:

true

func (*DefaultRetryPolicy) NextRetry

func (p *DefaultRetryPolicy) NextRetry(attempt int) time.Duration

NextRetry calculates the next retry delay using exponential backoff.

type ErrorHandler

type ErrorHandler interface {
	HandleError(ctx context.Context, task *TaskInfo, err error)
}

ErrorHandler is called when a task fails permanently (exhausted all retries) or encounters an unrecoverable error (ErrSkipRetry, panic). Implementations can use this for alerting, dead letter queue routing, or external error tracking (e.g., Sentry, PagerDuty).

type ErrorHandlerFunc

type ErrorHandlerFunc func(ctx context.Context, task *TaskInfo, err error)

ErrorHandlerFunc is an adapter to allow ordinary functions to be used as ErrorHandler.

func (ErrorHandlerFunc) HandleError

func (f ErrorHandlerFunc) HandleError(ctx context.Context, task *TaskInfo, err error)

HandleError calls f(ctx, task, err).

type Metrics

type Metrics struct {
	TasksEnqueued   metric.Int64Counter
	TasksProcessed  metric.Int64Counter
	TasksErrors     metric.Int64Counter
	TaskDuration    metric.Float64Histogram
	EnqueueDuration metric.Float64Histogram
	TasksInFlight   metric.Int64UpDownCounter
}

Metrics holds all OpenTelemetry metric instruments for asynqpg. When no MeterProvider is configured, all instruments are noop (zero overhead).

func NewMetrics

func NewMetrics(mp metric.MeterProvider) (*Metrics, error)

NewMetrics creates metric instruments from the given MeterProvider. If mp is nil, the global OTel MeterProvider is used.

type Pool

type Pool interface {
	Querier
	PingContext(ctx context.Context) error
}

Pool represents a database connection pool. *sqlx.DB satisfies this interface natively.

func WrapStdDB

func WrapStdDB(db *sql.DB, driverName string) Pool

WrapStdDB wraps a standard *sql.DB into a Pool. It uses sqlx internally for struct scanning (SelectContext, GetContext). The driverName parameter should match the driver used to open the database (e.g., "postgres", "pgx").

type Querier

type Querier interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	SelectContext(ctx context.Context, dest any, query string, args ...any) error
	GetContext(ctx context.Context, dest any, query string, args ...any) error
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

Querier can execute queries and scan results into structs. Both database connection pools and transactions satisfy this interface. *sqlx.DB and *sqlx.Tx implement it natively.

type RetryPolicy

type RetryPolicy interface {
	// NextRetry returns the duration to wait before the next retry attempt.
	// attempt is the number of the upcoming attempt (1-indexed).
	NextRetry(attempt int) time.Duration
}

RetryPolicy determines when a failed task should be retried.

type Task

type Task struct {
	Type             string
	Payload          []byte
	IdempotencyToken *string
	Delay            time.Duration
	MaxRetry         *int
	ProcessAt        time.Time
}

Task represents a unit of work to be enqueued. Use NewTask to create a task for enqueueing via Producer. Handlers receive *TaskInfo which contains runtime fields like ID, attempt info, etc.

func NewTask

func NewTask(taskType string, payload []byte, opts ...TaskOption) *Task

NewTask creates a new task with the given type and payload.

Example
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	task := asynqpg.NewTask("email:send", []byte(`{"to":"user@example.com"}`),
		asynqpg.WithMaxRetry(5),
		asynqpg.WithDelay(10*time.Second),
		asynqpg.WithIdempotencyToken("unique-token"),
	)

	fmt.Println(task.Type)
}
Output:

email:send
Example (ProcessAt)
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	task := asynqpg.NewTask("report:generate", []byte(`{"id":1}`))
	task.ProcessAt = time.Date(2026, 1, 1, 9, 0, 0, 0, time.UTC)

	fmt.Println(task.ProcessAt.Format(time.RFC3339))
}
Output:

2026-01-01T09:00:00Z

type TaskInfo

type TaskInfo struct {
	ID               int64
	Type             string
	Payload          []byte
	IdempotencyToken *string
	AttemptsLeft     int
	AttemptsElapsed  int
	CreatedAt        time.Time
	AttemptedAt      *time.Time
	Messages         []string
}

TaskInfo represents a task fetched from the database for processing. Handlers receive *TaskInfo with all runtime information. Unlike Task (used for enqueueing), TaskInfo includes database-assigned fields.

type TaskMetadata

type TaskMetadata struct {
	ID         int64
	RetryCount int
	MaxRetry   int
	CreatedAt  time.Time
}

TaskMetadata holds metadata about the task currently being processed. It is available inside handler contexts via GetTaskMetadata.

func GetTaskMetadata

func GetTaskMetadata(ctx context.Context) (TaskMetadata, bool)

GetTaskMetadata extracts the task metadata from the context. Returns the metadata and true if present, or zero value and false otherwise.

type TaskOption

type TaskOption func(*Task)

TaskOption configures a Task.

func WithDelay

func WithDelay(d time.Duration) TaskOption

WithDelay sets the delay before the task becomes available for processing.

func WithIdempotencyToken

func WithIdempotencyToken(token string) TaskOption

WithIdempotencyToken sets the idempotency token for the task.

func WithMaxRetry

func WithMaxRetry(n int) TaskOption

WithMaxRetry sets the maximum number of retries for the task.

type TaskSnoozeError

type TaskSnoozeError struct {
	Duration time.Duration
}

TaskSnoozeError is returned by TaskSnooze. Detected via errors.As. When a handler returns this error, the task is rescheduled after Duration without counting it as an attempt – attempts_left and attempts_elapsed remain unchanged.

func (*TaskSnoozeError) Error

func (e *TaskSnoozeError) Error() string

func (*TaskSnoozeError) Is

func (e *TaskSnoozeError) Is(target error) bool

type TaskSnoozeWithErrError

type TaskSnoozeWithErrError struct {
	Duration time.Duration
}

TaskSnoozeWithErrError is returned by TaskSnoozeWithError. Detected via errors.As. When a handler returns this error, the task is rescheduled after Duration, counting it as a failed attempt – attempts_left is decremented and the error message is stored.

func (*TaskSnoozeWithErrError) Error

func (e *TaskSnoozeWithErrError) Error() string

func (*TaskSnoozeWithErrError) Is

func (e *TaskSnoozeWithErrError) Is(target error) bool

type TaskStatus

type TaskStatus string

TaskStatus represents the current status of a task in the database.

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCancelled TaskStatus = "cancelled"
)

func (TaskStatus) IsFinalized

func (s TaskStatus) IsFinalized() bool

IsFinalized returns true if the task is in a terminal state.

Directories

Path       Synopsis
internal   Package testutils provides test helpers for asynqpg integration tests.
ui         (module)
