Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
// ErrTaskHandlerAlreadyRegistered is the sentinel error for a task handler
// that has already been registered.
// NOTE(review): presumably returned by Consumer.RegisterTaskHandler when the
// same task type is registered twice — confirm against the implementation.
ErrTaskHandlerAlreadyRegistered = errors.New("task handler already registered")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶
// Config holds the settings passed to New when constructing a Consumer.
type Config struct {
// Pool is the asynqpg.Pool supplied to the consumer. The package examples
// pass the *sqlx.DB returned by sqlx.Connect.
Pool asynqpg.Pool
// Logger is the *slog.Logger supplied to the consumer.
// NOTE(review): nil handling is not visible here — confirm whether a default is used.
Logger *slog.Logger
// RetryPolicy determines retry delays for failed tasks.
// If nil, DefaultRetryPolicy is used.
RetryPolicy asynqpg.RetryPolicy
// NOTE(review): presumably bounds how long shutdown waits for running tasks — confirm.
ShutdownTimeout time.Duration
// NOTE(review): presumably the polling interval for fetching new tasks — confirm.
FetchInterval time.Duration
// NOTE(review): presumably the interval of the maintenance (janitor) loop — confirm.
JanitorInterval time.Duration
// StuckThreshold is the duration after which a running task is considered stuck.
// Used by the rescuer to detect and recover stuck tasks.
StuckThreshold time.Duration
// NOTE(review): presumably the fallback for task types registered without
// WithWorkersCount — confirm.
DefaultWorkersCount int
// NOTE(review): presumably the fallback for task types registered without
// WithMaxAttempts — confirm.
DefaultMaxAttempts int
// NOTE(review): presumably the fallback for task types registered without
// WithTimeout — confirm.
DefaultTimeout time.Duration
// CancelCheckInterval is how often the consumer checks for tasks cancelled while running.
// Default: 1s.
CancelCheckInterval time.Duration
// DisableMaintenance disables maintenance services (rescuer, cleaner).
// By default, maintenance is enabled and the consumer participates in leader election.
// Only the leader runs maintenance services.
DisableMaintenance bool
// ClientID is a unique identifier for this consumer instance.
// Used for leader election. If empty, a UUID will be generated.
ClientID string
// Retention periods for task cleanup (used by the cleaner).
CompletedRetention time.Duration // Default: 24h
FailedRetention time.Duration // Default: 7 days
CancelledRetention time.Duration // Default: 24h
// DisableBatchCompleter disables batching of task completions.
// By default, completions are accumulated and flushed in batches for better throughput.
DisableBatchCompleter bool
// BatchCompleterConfig configures the batch completer.
// Only used when batch completer is enabled (not disabled).
BatchCompleterConfig *completer.Config
// ErrorHandler is called when a task fails permanently (all retries exhausted,
// ErrSkipRetry, or panic). If nil, permanent failures are only logged.
ErrorHandler asynqpg.ErrorHandler
// MeterProvider for metrics. If nil, global OTel MeterProvider is used.
MeterProvider metric.MeterProvider
// TracerProvider for tracing. If nil, global OTel TracerProvider is used.
TracerProvider trace.TracerProvider
}
type Consumer ¶
// Consumer processes tasks with the handlers registered through
// RegisterTaskHandler. It is created by New and begins processing once Start
// is called.
type Consumer struct {
// contains filtered or unexported fields
}
func New ¶
Example ¶
package main
import (
"log"
"time"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/yakser/asynqpg/consumer"
)
// main connects to PostgreSQL and constructs a consumer with a custom fetch
// interval and shutdown timeout. Any setup error terminates the program.
func main() {
	const dsn = "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable"

	db, err := sqlx.Connect("postgres", dsn)
	if err != nil {
		log.Fatal(err)
	}

	cfg := consumer.Config{
		Pool:            db,
		FetchInterval:   100 * time.Millisecond,
		ShutdownTimeout: 30 * time.Second,
	}

	c, err := consumer.New(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// The example only demonstrates construction; the consumer is not started.
	_ = c
}
func (*Consumer) RegisterTaskHandler ¶
func (c *Consumer) RegisterTaskHandler(taskType string, handler TaskHandler, opts ...TaskTypeOption) error
Example ¶
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/yakser/asynqpg"
"github.com/yakser/asynqpg/consumer"
)
// main builds a consumer and registers a handler for the "email:send" task
// type with five workers and a 30-second timeout. Any setup or registration
// error terminates the program.
func main() {
	const dsn = "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable"

	db, err := sqlx.Connect("postgres", dsn)
	if err != nil {
		log.Fatal(err)
	}

	c, err := consumer.New(consumer.Config{Pool: db})
	if err != nil {
		log.Fatal(err)
	}

	// sendEmail prints the task's ID and type and reports success.
	sendEmail := consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
		fmt.Printf("Processing task %d: %s\n", task.ID, task.Type)
		return nil
	})

	err = c.RegisterTaskHandler(
		"email:send",
		sendEmail,
		consumer.WithWorkersCount(5),
		consumer.WithTimeout(30*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}
}
func (*Consumer) Start ¶
Start starts the consumer and begins processing tasks. If maintenance is enabled, it also starts leader election.
func (*Consumer) Use ¶
func (c *Consumer) Use(mws ...MiddlewareFunc) error
Use registers global middleware that wraps every task handler. Middleware is applied in registration order: first registered = outermost. Must be called before Start().
Example ¶
package main
import (
"context"
"log"
"log/slog"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/yakser/asynqpg"
"github.com/yakser/asynqpg/consumer"
)
// main builds a consumer and installs a global logging middleware that
// records each task before and after the wrapped handler runs.
//
// Use returns an error, so it is checked here like every other setup step
// rather than being discarded.
func main() {
	db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	c, err := consumer.New(consumer.Config{Pool: db})
	if err != nil {
		log.Fatal(err)
	}

	// logging wraps next so that every task is logged on entry and on exit;
	// the handler's error is passed through unchanged.
	logging := func(next consumer.TaskHandler) consumer.TaskHandler {
		return consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
			slog.Info("processing task", "type", task.Type, "id", task.ID)
			err := next.Handle(ctx, task)
			slog.Info("task done", "type", task.Type, "id", task.ID, "error", err)
			return err
		})
	}

	if err := c.Use(logging); err != nil {
		log.Fatal(err)
	}
}
type MiddlewareFunc ¶
// MiddlewareFunc wraps a TaskHandler with additional behavior: it receives the
// next handler and returns the handler that should run in its place.
type MiddlewareFunc func(TaskHandler) TaskHandler
MiddlewareFunc wraps a TaskHandler with additional behavior. First registered middleware is the outermost (runs first on the way in, last on the way out).
type TaskHandler ¶
type TaskHandlerFunc ¶
TaskHandlerFunc is an adapter to allow the use of ordinary functions as TaskHandler. Similar to http.HandlerFunc.
type TaskTypeOption ¶
// TaskTypeOption is a function that receives a *TaskTypeOptions. Values of
// this type are produced by WithMaxAttempts, WithMiddleware, WithTimeout and
// WithWorkersCount and are accepted by Consumer.RegisterTaskHandler.
type TaskTypeOption func(*TaskTypeOptions)
func WithMaxAttempts ¶
func WithMaxAttempts(attempts int) TaskTypeOption
func WithMiddleware ¶
func WithMiddleware(mws ...MiddlewareFunc) TaskTypeOption
WithMiddleware sets per-task-type middleware applied after global middleware.
func WithTimeout ¶
func WithTimeout(timeout time.Duration) TaskTypeOption
func WithWorkersCount ¶
func WithWorkersCount(count int) TaskTypeOption
type TaskTypeOptions ¶
// TaskTypeOptions is the set of per-task-type settings that a TaskTypeOption
// receives.
type TaskTypeOptions struct {
// NOTE(review): presumably set by WithWorkersCount — confirm.
WorkersCount int
// NOTE(review): presumably set by WithMaxAttempts — confirm.
MaxAttempts int
// NOTE(review): presumably set by WithTimeout — confirm.
Timeout time.Duration
// Middleware is the per-task-type middleware; WithMiddleware documents it as
// applied after global middleware.
Middleware []MiddlewareFunc
}
Click to show internal directories.
Click to hide internal directories.