Documentation
¶
Overview ¶
Package queue provides a job queue manager using Asynq.
Index ¶
- Constants
- type Config
- func DefaultConfig() Config
- type Manager
- func (m *Manager) ArchiveTask(queue string, taskID string) error
- func (m *Manager) CancelTask(taskID string) error
- func (m *Manager) Client() *asynq.Client
- func (m *Manager) DeleteTask(queue string, taskID string) error
- func (m *Manager) EnqueueIn(ctx context.Context, task *Task, delay time.Duration) (*asynq.TaskInfo, error)
- func (m *Manager) EnqueueRecurringTask(task *Task, cronSpec string, entryID string) (string, error)
- func (m *Manager) EnqueueTask(ctx context.Context, task *Task) (*asynq.TaskInfo, error)
- func (m *Manager) GetQueueInfo(queue string) (*asynq.QueueInfo, error)
- func (m *Manager) GetTaskInfo(queue string, taskID string) (*asynq.TaskInfo, error)
- func (m *Manager) Inspector() *asynq.Inspector
- func (m *Manager) IsRunning() bool
- func (m *Manager) ListQueues() ([]string, error)
- func (m *Manager) RegisterHandler(taskType string, handler asynq.HandlerFunc)
- func (m *Manager) RegisterHandlerFunc(taskType string, handler func(context.Context, *asynq.Task) error)
- func (m *Manager) ScheduleTask(ctx context.Context, task *Task, processAt time.Time) (*asynq.TaskInfo, error)
- func (m *Manager) Scheduler() *asynq.Scheduler
- func (m *Manager) SetMux(mux *asynq.ServeMux)
- func (m *Manager) Start() error
- func (m *Manager) Stop() error
- func (m *Manager) UnregisterRecurringTask(entryID string) error
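- type RetryPolicy
- func DefaultRetryPolicy() RetryPolicy
- func (p RetryPolicy) CalculateDelay(attempt int) time.Duration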
- type Task
- func (t *Task) WithDeadline(deadline time.Time) *Task
- func (t *Task) WithGroup(group string) *Task
- func (t *Task) WithMaxRetry(maxRetry int) *Task
- func (t *Task) WithQueue(queue string) *Task
- func (t *Task) WithRetention(retention time.Duration) *Task
- func (t *Task) WithTimeout(timeout time.Duration) *Task
- func (t *Task) WithUnique(key string, ttl time.Duration) *Task
Constants ¶
const (
	QueueCritical = "critical"
	QueueDefault  = "default"
	QueueLow      = "low"
)
Predefined queue names for the critical, default, and low priority tiers.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Redis configuration
RedisAddr string
RedisPassword string
RedisDB int
// Server configuration
Concurrency int
Queues map[string]int // queue name -> priority
// Retry configuration
MaxRetry int
RetryDelayFunc func(n int, err error, task Task) time.Duration
// Shutdown configuration
ShutdownTimeout time.Duration
}
Config holds queue configuration.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the Asynq client and server.
func NewManager ¶
NewManager creates a new queue manager.
func (*Manager) ArchiveTask ¶
ArchiveTask archives a task.
func (*Manager) CancelTask ¶
CancelTask cancels a pending task.
func (*Manager) DeleteTask ¶
DeleteTask deletes a task from a queue.
func (*Manager) EnqueueIn ¶
func (m *Manager) EnqueueIn(ctx context.Context, task *Task, delay time.Duration) (*asynq.TaskInfo, error)
EnqueueIn enqueues a task to be processed after a delay.
func (*Manager) EnqueueRecurringTask ¶
EnqueueRecurringTask registers a recurring task with a cron expression.
func (*Manager) EnqueueTask ¶
EnqueueTask enqueues a task for immediate processing.
func (*Manager) GetQueueInfo ¶
GetQueueInfo retrieves information about a queue.
func (*Manager) GetTaskInfo ¶
GetTaskInfo retrieves information about a task.
func (*Manager) ListQueues ¶
ListQueues returns all queue names.
func (*Manager) RegisterHandler ¶
func (m *Manager) RegisterHandler(taskType string, handler asynq.HandlerFunc)
RegisterHandler registers a task handler for the given task type.
func (*Manager) RegisterHandlerFunc ¶
func (m *Manager) RegisterHandlerFunc(taskType string, handler func(context.Context, *asynq.Task) error)
RegisterHandlerFunc registers a handler function for the given task type.
func (*Manager) ScheduleTask ¶
func (m *Manager) ScheduleTask(ctx context.Context, task *Task, processAt time.Time) (*asynq.TaskInfo, error)
ScheduleTask schedules a task for future execution.
func (*Manager) UnregisterRecurringTask ¶
UnregisterRecurringTask removes a recurring task.
type RetryPolicy ¶
type RetryPolicy struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
RetryOnError func(err error) bool
}
RetryPolicy defines retry behavior for tasks.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() RetryPolicy
DefaultRetryPolicy returns a default retry policy with exponential backoff.
func (RetryPolicy) CalculateDelay ¶
func (p RetryPolicy) CalculateDelay(attempt int) time.Duration
CalculateDelay returns the delay to wait before the given retry attempt.
type Task ¶
type Task struct {
// Type is the task type identifier.
Type string
// Payload is the task payload data.
Payload json.RawMessage
// Queue is the queue name (defaults to "default").
Queue string
// MaxRetry is the maximum number of retries (defaults to 3).
MaxRetry int
// Timeout is the task execution timeout.
Timeout time.Duration
// Deadline is the absolute time by which the task must be processed.
Deadline time.Time
// Retention is how long to keep the completed task.
Retention time.Duration
// UniqueKey prevents duplicate tasks with the same key.
UniqueKey string
// UniqueTTL is how long to enforce uniqueness.
UniqueTTL time.Duration
// Group is used for task grouping.
Group string
}
Task represents a task to be enqueued.
func (*Task) WithDeadline ¶
WithDeadline sets the deadline for the task.
func (*Task) WithMaxRetry ¶
WithMaxRetry sets the max retry count for the task.
func (*Task) WithRetention ¶
WithRetention sets the retention period for the task.
func (*Task) WithTimeout ¶
WithTimeout sets the timeout for the task.