consumer

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
// ErrTaskHandlerAlreadyRegistered is the sentinel error describing a task
// handler that has already been registered. Compare against it with errors.Is.
var ErrTaskHandlerAlreadyRegistered = errors.New("task handler already registered")

Functions

This section is empty.

Types

type Config

// Config holds the settings passed to New when constructing a Consumer.
type Config struct {
	// Pool is the database handle the consumer works against (the package
	// examples pass a *sqlx.DB). Logger is the slog logger for the consumer.
	// NOTE(review): behavior when Logger is nil is not shown here — confirm.
	Pool   asynqpg.Pool
	Logger *slog.Logger

	// RetryPolicy determines retry delays for failed tasks.
	// If nil, DefaultRetryPolicy is used.
	RetryPolicy asynqpg.RetryPolicy

	// Timing knobs. NOTE(review): semantics below are inferred from the field
	// names — confirm against the implementation. ShutdownTimeout presumably
	// bounds graceful shutdown, FetchInterval presumably is the polling period
	// for new tasks, and JanitorInterval presumably is the period of
	// background upkeep work.
	ShutdownTimeout time.Duration
	FetchInterval   time.Duration
	JanitorInterval time.Duration
	// StuckThreshold is the duration after which a running task is considered stuck.
	// Used by the rescuer to detect and recover stuck tasks.
	StuckThreshold time.Duration

	// Defaults that presumably apply to task types registered without the
	// matching WithWorkersCount / WithMaxAttempts / WithTimeout option —
	// TODO confirm.
	DefaultWorkersCount int
	DefaultMaxAttempts  int
	DefaultTimeout      time.Duration

	// CancelCheckInterval is how often the consumer checks for tasks cancelled while running.
	// Default: 1s.
	CancelCheckInterval time.Duration

	// DisableMaintenance disables maintenance services (rescuer, cleaner).
	// By default, maintenance is enabled and the consumer participates in leader election.
	// Only the leader runs maintenance services.
	DisableMaintenance bool

	// ClientID is a unique identifier for this consumer instance.
	// Used for leader election. If empty, a UUID will be generated.
	ClientID string

	// Retention periods for task cleanup (used by the cleaner).
	CompletedRetention time.Duration // Default: 24h
	FailedRetention    time.Duration // Default: 7 days
	CancelledRetention time.Duration // Default: 24h

	// DisableBatchCompleter disables batching of task completions.
	// By default, completions are accumulated and flushed in batches for better throughput.
	DisableBatchCompleter bool

	// BatchCompleterConfig configures the batch completer.
	// Only used when DisableBatchCompleter is false.
	BatchCompleterConfig *completer.Config

	// ErrorHandler is called when a task fails permanently (all retries exhausted,
	// ErrSkipRetry, or panic). If nil, permanent failures are only logged.
	ErrorHandler asynqpg.ErrorHandler

	// MeterProvider for metrics. If nil, the global OTel MeterProvider is used.
	MeterProvider metric.MeterProvider
	// TracerProvider for tracing. If nil, the global OTel TracerProvider is used.
	TracerProvider trace.TracerProvider
}

type Consumer

// Consumer is the task consumer built by New. Handlers are attached with
// RegisterTaskHandler, global middleware with Use, and processing begins
// with Start.
type Consumer struct {
	// contains filtered or unexported fields
}

func New

func New(config Config) (*Consumer, error)
Example
package main

import (
	"log"
	"time"

	"github.com/jmoiron/sqlx"

	_ "github.com/lib/pq"
	"github.com/yakser/asynqpg/consumer"
)

// main connects to PostgreSQL and constructs a consumer that uses a 100ms
// fetch interval and a 30s shutdown timeout. Any failure aborts the program.
func main() {
	const dsn = "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable"

	pool, connErr := sqlx.Connect("postgres", dsn)
	if connErr != nil {
		log.Fatal(connErr)
	}

	cfg := consumer.Config{
		Pool:            pool,
		FetchInterval:   100 * time.Millisecond,
		ShutdownTimeout: 30 * time.Second,
	}

	worker, newErr := consumer.New(cfg)
	if newErr != nil {
		log.Fatal(newErr)
	}

	// The consumer is deliberately left unused: this example only shows
	// construction.
	_ = worker
}

func (*Consumer) RegisterTaskHandler

func (c *Consumer) RegisterTaskHandler(taskType string, handler TaskHandler, opts ...TaskTypeOption) error
Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/jmoiron/sqlx"

	_ "github.com/lib/pq"
	"github.com/yakser/asynqpg"
	"github.com/yakser/asynqpg/consumer"
)

// main connects to PostgreSQL, builds a consumer, and registers a handler for
// the "email:send" task type with 5 workers and a 30s timeout. Any failure
// aborts the program.
func main() {
	const dsn = "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable"

	pool, connErr := sqlx.Connect("postgres", dsn)
	if connErr != nil {
		log.Fatal(connErr)
	}

	worker, newErr := consumer.New(consumer.Config{Pool: pool})
	if newErr != nil {
		log.Fatal(newErr)
	}

	// The handler only prints the task's ID and type and reports success.
	printTask := consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
		fmt.Printf("Processing task %d: %s\n", task.ID, task.Type)
		return nil
	})

	regErr := worker.RegisterTaskHandler(
		"email:send",
		printTask,
		consumer.WithWorkersCount(5),
		consumer.WithTimeout(30*time.Second),
	)
	if regErr != nil {
		log.Fatal(regErr)
	}
}

func (*Consumer) Shutdown

func (c *Consumer) Shutdown(timeout time.Duration) error

func (*Consumer) Start

func (c *Consumer) Start() error

Start starts the consumer and begins processing tasks. If maintenance is enabled, it also starts leader election.

func (*Consumer) Stop

func (c *Consumer) Stop() error

func (*Consumer) Use

func (c *Consumer) Use(mws ...MiddlewareFunc) error

Use registers global middleware that wraps every task handler. Middleware is applied in registration order: the first registered middleware is the outermost. Use must be called before Start().

Example
package main

import (
	"context"
	"log"
	"log/slog"

	"github.com/jmoiron/sqlx"

	_ "github.com/lib/pq"
	"github.com/yakser/asynqpg"
	"github.com/yakser/asynqpg/consumer"
)

// main connects to PostgreSQL, builds a consumer, and installs a global
// logging middleware. The middleware logs before and after each handler
// invocation and passes the handler's error through unchanged. Any setup
// failure, including a failed Use call, aborts the program.
func main() {
	db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	c, err := consumer.New(consumer.Config{Pool: db})
	if err != nil {
		log.Fatal(err)
	}

	// Use returns an error, so surface it the same way as every other setup
	// step instead of discarding it.
	if err := c.Use(func(next consumer.TaskHandler) consumer.TaskHandler {
		return consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
			slog.Info("processing task", "type", task.Type, "id", task.ID)
			// Named handleErr so it does not shadow the setup err above.
			handleErr := next.Handle(ctx, task)
			slog.Info("task done", "type", task.Type, "id", task.ID, "error", handleErr)
			return handleErr
		})
	}); err != nil {
		log.Fatal(err)
	}
}

type MiddlewareFunc

// MiddlewareFunc wraps a TaskHandler with additional behavior and returns the
// wrapped handler. The first registered middleware is the outermost.
type MiddlewareFunc func(TaskHandler) TaskHandler

MiddlewareFunc wraps a TaskHandler with additional behavior. First registered middleware is the outermost (runs first on the way in, last on the way out).

type TaskHandler

// TaskHandler processes a single task. Implementations are registered per
// task type with Consumer.RegisterTaskHandler.
type TaskHandler interface {
	// Handle processes task under ctx and reports the outcome as an error.
	// NOTE(review): a non-nil error presumably marks the attempt as failed and
	// subject to the configured retry policy — confirm against the consumer.
	Handle(ctx context.Context, task *asynqpg.TaskInfo) error
}

type TaskHandlerFunc

// TaskHandlerFunc adapts an ordinary function to the TaskHandler interface,
// in the manner of http.HandlerFunc.
type TaskHandlerFunc func(ctx context.Context, task *asynqpg.TaskInfo) error

TaskHandlerFunc is an adapter that allows ordinary functions to be used as a TaskHandler. It is similar to http.HandlerFunc.

func (TaskHandlerFunc) Handle

func (f TaskHandlerFunc) Handle(ctx context.Context, task *asynqpg.TaskInfo) error

type TaskTypeOption

// TaskTypeOption is a function that receives a *TaskTypeOptions.
// Options are passed variadically to Consumer.RegisterTaskHandler.
type TaskTypeOption func(*TaskTypeOptions)

func WithMaxAttempts

func WithMaxAttempts(attempts int) TaskTypeOption

func WithMiddleware

func WithMiddleware(mws ...MiddlewareFunc) TaskTypeOption

WithMiddleware sets per-task-type middleware applied after global middleware.

func WithTimeout

func WithTimeout(timeout time.Duration) TaskTypeOption

func WithWorkersCount

func WithWorkersCount(count int) TaskTypeOption

type TaskTypeOptions

// TaskTypeOptions collects the per-task-type settings that TaskTypeOption
// functions operate on.
// NOTE(review): each field presumably corresponds to the like-named option
// constructor (WithWorkersCount, WithMaxAttempts, WithTimeout, WithMiddleware)
// — confirm against the implementation.
type TaskTypeOptions struct {
	WorkersCount int
	MaxAttempts  int
	Timeout      time.Duration
	Middleware   []MiddlewareFunc
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL