Documentation ¶
Index ¶
- Constants
- Variables
- func GetCreatedAt(ctx context.Context) (time.Time, bool)
- func GetMaxRetry(ctx context.Context) (int, bool)
- func GetRetryCount(ctx context.Context) (int, bool)
- func GetTaskID(ctx context.Context) (int64, bool)
- func NewTracer(tp trace.TracerProvider) trace.Tracer
- func TaskSnooze(d time.Duration) error
- func TaskSnoozeWithError(d time.Duration) error
- func WithTaskMetadata(ctx context.Context, meta TaskMetadata) context.Context
- type ConstantRetryPolicy
- type DefaultRetryPolicy
- type ErrorHandler
- type ErrorHandlerFunc
- type Metrics
- type Pool
- type Querier
- type RetryPolicy
- type Task
- type TaskInfo
- type TaskMetadata
- type TaskOption
- type TaskSnoozeError
- type TaskSnoozeWithErrError
- type TaskStatus
Constants ¶
const (
	StatusCompleted = "completed"
	StatusFailed    = "failed"
	StatusRetried   = "retried"
	StatusSnoozed   = "snoozed"
)
Bounded attribute values for status.
const (
	ErrorTypeHandler = "handler_error"
	ErrorTypeDB      = "db_error"
)
Bounded attribute values for error_type.
Variables ¶
var (
	AttrTaskType  = attribute.Key("task_type")
	AttrStatus    = attribute.Key("status")
	AttrErrorType = attribute.Key("error_type")
)
Attribute keys used across metrics and traces.
var ErrSkipRetry = errors.New("skip retry for the task")
ErrSkipRetry is a sentinel error that handlers can return to indicate the task should not be retried and should immediately be marked as failed. This is useful for non-retryable errors such as invalid payloads or business logic rejections.
Usage:
func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
	if invalidPayload(task.Payload) {
		return fmt.Errorf("bad payload: %w", asynqpg.ErrSkipRetry)
	}
	// ...
}
Functions ¶
func GetCreatedAt ¶
func GetCreatedAt(ctx context.Context) (time.Time, bool)
GetCreatedAt extracts the task creation time from the context. Returns (zero time, false) if the context does not contain task metadata.
func GetMaxRetry ¶
func GetMaxRetry(ctx context.Context) (int, bool)
GetMaxRetry extracts the maximum retry count from the context. Returns (0, false) if the context does not contain task metadata.
func GetRetryCount ¶
func GetRetryCount(ctx context.Context) (int, bool)
GetRetryCount extracts the number of attempts already elapsed from the context. Returns (0, false) if the context does not contain task metadata.
func GetTaskID ¶
func GetTaskID(ctx context.Context) (int64, bool)
GetTaskID extracts the task's database ID from the context. Returns (0, false) if the context does not contain task metadata.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/yakser/asynqpg"
)

func main() {
	ctx := asynqpg.WithTaskMetadata(context.Background(), asynqpg.TaskMetadata{
		ID: 99,
	})
	id, ok := asynqpg.GetTaskID(ctx)
	fmt.Println(ok, id)
}
Output:
true 99
func NewTracer ¶
func NewTracer(tp trace.TracerProvider) trace.Tracer
NewTracer creates a tracer from the given TracerProvider. If tp is nil, the global OTel TracerProvider is used.
func TaskSnooze ¶
func TaskSnooze(d time.Duration) error
TaskSnooze returns an error that reschedules the task after the given duration without counting it as an attempt. The task's attempts_left and attempts_elapsed remain unchanged. Panics if d < 0.
Usage:
func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
	if !isReady() {
		return asynqpg.TaskSnooze(30 * time.Second)
	}
	// ...
}
Example ¶
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	err := asynqpg.TaskSnooze(30 * time.Second)
	var snoozeErr *asynqpg.TaskSnoozeError
	fmt.Println(errors.As(err, &snoozeErr))
	fmt.Println(snoozeErr.Duration)
}
Output:
true
30s
func TaskSnoozeWithError ¶
func TaskSnoozeWithError(d time.Duration) error
TaskSnoozeWithError returns an error that reschedules the task after the given duration, counting it as a failed attempt. The error message is stored and attempts_left is decremented. If no attempts are left, the task is failed instead of snoozed. Panics if d < 0.
Usage:
func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
	if err := callExternalAPI(); err != nil {
		return fmt.Errorf("api unavailable: %w", asynqpg.TaskSnoozeWithError(1*time.Minute))
	}
	// ...
}
Example ¶
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	err := fmt.Errorf("api unavailable: %w", asynqpg.TaskSnoozeWithError(1*time.Minute))
	var snoozeErr *asynqpg.TaskSnoozeWithErrError
	fmt.Println(errors.As(err, &snoozeErr))
	fmt.Println(snoozeErr.Duration)
}
Output:
true
1m0s
func WithTaskMetadata ¶
func WithTaskMetadata(ctx context.Context, meta TaskMetadata) context.Context
WithTaskMetadata returns a new context with task metadata injected. Used internally by the consumer when invoking handlers. Can also be used in tests to create contexts for handler testing.
Example ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	ctx := asynqpg.WithTaskMetadata(context.Background(), asynqpg.TaskMetadata{
		ID:         42,
		RetryCount: 0,
		MaxRetry:   3,
		CreatedAt:  time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC),
	})
	meta, ok := asynqpg.GetTaskMetadata(ctx)
	fmt.Println(ok, meta.ID)
}
Output:
true 42
Types ¶
type ConstantRetryPolicy ¶
ConstantRetryPolicy always returns the same delay. Useful for testing or specific use cases.
Example ¶
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	policy := &asynqpg.ConstantRetryPolicy{Delay: 5 * time.Second}
	delay := policy.NextRetry(1)
	fmt.Println(delay)
}
Output:
5s
type DefaultRetryPolicy ¶
type DefaultRetryPolicy struct {
	// MaxRetryDelay caps the maximum delay between retries.
	// Default: 24 hours.
	MaxRetryDelay time.Duration
}
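DefaultRetryPolicy implements backoff with jitter. Formula: attempt^4 seconds with ±10% jitter, capped at MaxRetryDelay. Note that the delay grows as the fourth power of the attempt number (polynomial growth), even though it is described as exponential backoff. Examples before jitter: 1s, 16s, 81s, 256s, 625s, ...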
Example ¶
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	policy := &asynqpg.DefaultRetryPolicy{MaxRetryDelay: 24 * time.Hour}
	delay := policy.NextRetry(1)
	fmt.Println(delay > 0)
}
Output:
true
type ErrorHandler ¶
ErrorHandler is called when a task fails permanently (exhausted all retries) or encounters an unrecoverable error (ErrSkipRetry, panic). Implementations can use this for alerting, dead letter queue routing, or external error tracking (e.g., Sentry, PagerDuty).
type ErrorHandlerFunc ¶
ErrorHandlerFunc is an adapter to allow ordinary functions to be used as ErrorHandler.
func (ErrorHandlerFunc) HandleError ¶
func (f ErrorHandlerFunc) HandleError(ctx context.Context, task *TaskInfo, err error)
HandleError calls f(ctx, task, err).
type Metrics ¶
type Metrics struct {
	TasksEnqueued   metric.Int64Counter
	TasksProcessed  metric.Int64Counter
	TasksErrors     metric.Int64Counter
	TaskDuration    metric.Float64Histogram
	EnqueueDuration metric.Float64Histogram
	TasksInFlight   metric.Int64UpDownCounter
}
Metrics holds all OpenTelemetry metric instruments for asynqpg. When no MeterProvider is configured, all instruments are noop (zero overhead).
func NewMetrics ¶
func NewMetrics(mp metric.MeterProvider) (*Metrics, error)
NewMetrics creates metric instruments from the given MeterProvider. If mp is nil, the global OTel MeterProvider is used.
type Querier ¶
type Querier interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	SelectContext(ctx context.Context, dest any, query string, args ...any) error
	GetContext(ctx context.Context, dest any, query string, args ...any) error
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}
Querier can execute queries and scan results into structs. Both database connection pools and transactions satisfy this interface. *sqlx.DB and *sqlx.Tx implement it natively.
type RetryPolicy ¶
type RetryPolicy interface {
	// NextRetry returns the duration to wait before the next retry attempt.
	// attempt is the number of the upcoming attempt (1-indexed).
	NextRetry(attempt int) time.Duration
}
RetryPolicy determines when a failed task should be retried.
type Task ¶
type Task struct {
	Type             string
	Payload          []byte
	IdempotencyToken *string
	Delay            time.Duration
	MaxRetry         *int
	ProcessAt        time.Time
}
Task represents a unit of work to be enqueued. Use NewTask to create a task for enqueueing via Producer. Handlers receive *TaskInfo which contains runtime fields like ID, attempt info, etc.
func NewTask ¶
func NewTask(taskType string, payload []byte, opts ...TaskOption) *Task
NewTask creates a new task with the given type and payload.
Example ¶
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	task := asynqpg.NewTask("email:send", []byte(`{"to":"user@example.com"}`),
		asynqpg.WithMaxRetry(5),
		asynqpg.WithDelay(10*time.Second),
		asynqpg.WithIdempotencyToken("unique-token"),
	)
	fmt.Println(task.Type)
}
Output:
email:send
Example (ProcessAt) ¶
package main

import (
	"fmt"
	"time"

	"github.com/yakser/asynqpg"
)

func main() {
	task := asynqpg.NewTask("report:generate", []byte(`{"id":1}`))
	task.ProcessAt = time.Date(2026, 1, 1, 9, 0, 0, 0, time.UTC)
	fmt.Println(task.ProcessAt.Format(time.RFC3339))
}
Output:
2026-01-01T09:00:00Z
type TaskInfo ¶
type TaskInfo struct {
	ID               int64
	Type             string
	Payload          []byte
	IdempotencyToken *string
	AttemptsLeft     int
	AttemptsElapsed  int
	CreatedAt        time.Time
	AttemptedAt      *time.Time
	Messages         []string
}
TaskInfo represents a task fetched from the database for processing. Handlers receive *TaskInfo with all runtime information. Unlike Task (used for enqueueing), TaskInfo includes database-assigned fields.
type TaskMetadata ¶
TaskMetadata holds metadata about the task currently being processed. It is available inside handler contexts via GetTaskMetadata.
func GetTaskMetadata ¶
func GetTaskMetadata(ctx context.Context) (TaskMetadata, bool)
GetTaskMetadata extracts the task metadata from the context. Returns the metadata and true if present, or zero value and false otherwise.
type TaskOption ¶
type TaskOption func(*Task)
TaskOption configures a Task.
func WithDelay ¶
func WithDelay(d time.Duration) TaskOption
WithDelay sets the delay before the task becomes available for processing.
func WithIdempotencyToken ¶
func WithIdempotencyToken(token string) TaskOption
WithIdempotencyToken sets the idempotency token for the task.
func WithMaxRetry ¶
func WithMaxRetry(n int) TaskOption
WithMaxRetry sets the maximum number of retries for the task.
type TaskSnoozeError ¶
TaskSnoozeError is returned by TaskSnooze. Detected via errors.As. When a handler returns this error, the task is rescheduled after Duration without counting it as an attempt – attempts_left and attempts_elapsed remain unchanged.
func (*TaskSnoozeError) Error ¶
func (e *TaskSnoozeError) Error() string
func (*TaskSnoozeError) Is ¶
func (e *TaskSnoozeError) Is(target error) bool
type TaskSnoozeWithErrError ¶
TaskSnoozeWithErrError is returned by TaskSnoozeWithError. Detected via errors.As. When a handler returns this error, the task is rescheduled after Duration, counting it as a failed attempt – attempts_left is decremented and the error message is stored.
func (*TaskSnoozeWithErrError) Error ¶
func (e *TaskSnoozeWithErrError) Error() string
func (*TaskSnoozeWithErrError) Is ¶
func (e *TaskSnoozeWithErrError) Is(target error) bool
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the current status of a task in the database.
const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCancelled TaskStatus = "cancelled"
)
func (TaskStatus) IsFinalized ¶
func (s TaskStatus) IsFinalized() bool
IsFinalized returns true if the task is in a terminal state.
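The documentation does not list which statuses count as terminal. A plausible reading is that completed, failed, and cancelled are final while pending and running are not. The sketch below encodes that reading with local stand-in names. The set of terminal states is an assumption and should be checked against the package source.

```go
package main

import "fmt"

// taskStatus is a local stand-in for asynqpg.TaskStatus.
type taskStatus string

const (
	statusPending   taskStatus = "pending"
	statusRunning   taskStatus = "running"
	statusCompleted taskStatus = "completed"
	statusFailed    taskStatus = "failed"
	statusCancelled taskStatus = "cancelled"
)

// isFinalized reports whether s is terminal. Which states are terminal is an
// assumption of this sketch.
func (s taskStatus) isFinalized() bool {
	switch s {
	case statusCompleted, statusFailed, statusCancelled:
		return true
	}
	return false
}

func main() {
	for _, s := range []taskStatus{statusPending, statusRunning, statusCompleted, statusFailed, statusCancelled} {
		fmt.Println(s, s.isFinalized())
	}
}
```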








