queue

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package queue provides a job queue manager using Asynq.

Index

Constants

View Source
const (
	QueueCritical = "critical"
	QueueDefault  = "default"
	QueueLow      = "low"
)

Queue priority constants.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Redis configuration
	RedisAddr     string
	RedisPassword string
	RedisDB       int

	// Server configuration
	Concurrency int
	Queues      map[string]int // queue name -> priority

	// Retry configuration
	MaxRetry       int
	RetryDelayFunc func(n int, err error, task Task) time.Duration

	// Shutdown configuration
	ShutdownTimeout time.Duration
}

Config holds queue configuration.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the Asynq client and server.

func NewManager

func NewManager(cfg Config) (*Manager, error)

NewManager creates a new queue manager.

func (*Manager) ArchiveTask

func (m *Manager) ArchiveTask(queue string, taskID string) error

ArchiveTask archives a task.

func (*Manager) CancelTask

func (m *Manager) CancelTask(taskID string) error

CancelTask cancels a pending task.

func (*Manager) Client

func (m *Manager) Client() *asynq.Client

Client returns the underlying Asynq client.

func (*Manager) DeleteTask

func (m *Manager) DeleteTask(queue string, taskID string) error

DeleteTask deletes a task from a queue.

func (*Manager) EnqueueIn

func (m *Manager) EnqueueIn(ctx context.Context, task *Task, delay time.Duration) (*asynq.TaskInfo, error)

EnqueueIn enqueues a task to be processed after a delay.

func (*Manager) EnqueueRecurringTask

func (m *Manager) EnqueueRecurringTask(task *Task, cronSpec string, entryID string) (string, error)

EnqueueRecurringTask registers a recurring task with a cron expression.

func (*Manager) EnqueueTask

func (m *Manager) EnqueueTask(ctx context.Context, task *Task) (*asynq.TaskInfo, error)

EnqueueTask enqueues a task for immediate processing.

func (*Manager) GetQueueInfo

func (m *Manager) GetQueueInfo(queue string) (*asynq.QueueInfo, error)

GetQueueInfo retrieves information about a queue.

func (*Manager) GetTaskInfo

func (m *Manager) GetTaskInfo(queue string, taskID string) (*asynq.TaskInfo, error)

GetTaskInfo retrieves information about a task.

func (*Manager) Inspector

func (m *Manager) Inspector() *asynq.Inspector

Inspector returns the Asynq inspector for queue introspection.

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning returns whether the manager is running.

func (*Manager) ListQueues

func (m *Manager) ListQueues() ([]string, error)

ListQueues returns all queue names.

func (*Manager) RegisterHandler

func (m *Manager) RegisterHandler(taskType string, handler asynq.HandlerFunc)

RegisterHandler registers a task handler for the given task type.

func (*Manager) RegisterHandlerFunc

func (m *Manager) RegisterHandlerFunc(taskType string, handler func(context.Context, *asynq.Task) error)

RegisterHandlerFunc registers a handler function for the given task type.

func (*Manager) ScheduleTask

func (m *Manager) ScheduleTask(ctx context.Context, task *Task, processAt time.Time) (*asynq.TaskInfo, error)

ScheduleTask schedules a task for future execution.

func (*Manager) Scheduler

func (m *Manager) Scheduler() *asynq.Scheduler

Scheduler returns the Asynq scheduler for cron jobs.

func (*Manager) SetMux

func (m *Manager) SetMux(mux *asynq.ServeMux)

SetMux sets the handler mux directly.

func (*Manager) Start

func (m *Manager) Start() error

Start starts the queue server and scheduler.

func (*Manager) Stop

func (m *Manager) Stop() error

Stop gracefully stops the queue server and scheduler.

func (*Manager) UnregisterRecurringTask

func (m *Manager) UnregisterRecurringTask(entryID string) error

UnregisterRecurringTask removes a recurring task.

type RetryPolicy

type RetryPolicy struct {
	MaxRetries   int
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
	RetryOnError func(err error) bool
}

RetryPolicy defines retry behavior for tasks.

func DefaultRetryPolicy

func DefaultRetryPolicy() RetryPolicy

DefaultRetryPolicy returns a default retry policy with exponential backoff.

func (RetryPolicy) CalculateDelay

func (p RetryPolicy) CalculateDelay(attempt int) time.Duration

CalculateDelay calculates the delay for the nth retry attempt.

type Task

type Task struct {
	// Type is the task type identifier.
	Type string

	// Payload is the task payload data.
	Payload json.RawMessage

	// Queue is the queue name (defaults to "default").
	Queue string

	// MaxRetry is the maximum number of retries (defaults to 3).
	MaxRetry int

	// Timeout is the task execution timeout.
	Timeout time.Duration

	// Deadline is the absolute time by which the task must be processed.
	Deadline time.Time

	// Retention is how long to keep the completed task.
	Retention time.Duration

	// UniqueKey prevents duplicate tasks with the same key.
	UniqueKey string

	// UniqueTTL is how long to enforce uniqueness.
	UniqueTTL time.Duration

	// Group is used for task grouping.
	Group string
}

Task represents a task to be enqueued.

func NewTask

func NewTask(taskType string, payload any) (*Task, error)

NewTask creates a new task with the given type and payload.

func (*Task) WithDeadline

func (t *Task) WithDeadline(deadline time.Time) *Task

WithDeadline sets the deadline for the task.

func (*Task) WithGroup

func (t *Task) WithGroup(group string) *Task

WithGroup sets the group for the task.

func (*Task) WithMaxRetry

func (t *Task) WithMaxRetry(maxRetry int) *Task

WithMaxRetry sets the max retry count for the task.

func (*Task) WithQueue

func (t *Task) WithQueue(queue string) *Task

WithQueue sets the queue for the task.

func (*Task) WithRetention

func (t *Task) WithRetention(retention time.Duration) *Task

WithRetention sets the retention period for the task.

func (*Task) WithTimeout

func (t *Task) WithTimeout(timeout time.Duration) *Task

WithTimeout sets the timeout for the task.

func (*Task) WithUnique

func (t *Task) WithUnique(key string, ttl time.Duration) *Task

WithUnique sets uniqueness constraints for the task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL