Versions in this module

v0

v0.1.0 (Apr 10, 2025)

Changes in this version:

+ var ErrDuplicateTask = errors.New("duplicate task ID")
+ var ErrInvalidTask = errors.New("invalid task")
+ var ErrQueueFull = errors.New("task queue is full")
+ var ErrTaskExists = errors.New("task already exists")
+ var ErrTaskTimeout = errors.New("task execution timeout")
+ var ErrTemporaryFailure = errors.New("temporary failure")
+ var RetryableErrors = []error
+ type Config struct
    + MaxPriority int
    + MaxRetries int
    + MetricsWindow time.Duration
    + QueueSize int
    + RetryDelay time.Duration
    + TaskTimeout time.Duration
    + Workers int
    + func DefaultConfig() *Config
    + func (c *Config) Validate() error
+ type Metrics struct
    + ActiveTasks atomic.Int64
    + CancelCount atomic.Int64
    + DequeueCount atomic.Int64
    + EnqueueCount atomic.Int64
    + FailureCount atomic.Int64
    + ProcessCount atomic.Int64
    + ProcessingTime atomic.Int64
    + QueueLength atomic.Int64
    + RetryCount atomic.Int64
    + SuccessCount atomic.Int64
    + WaitTime atomic.Int64
+ type Option func(*TaskQueue)
    + func WithConfig(cfg *Config) Option
    + func WithProcessor(processor TaskProcessor) Option
+ type PriorityLevel struct
+ type PriorityQueue struct
    + func NewPriorityQueue(capacity int, maxPriority int) *PriorityQueue
    + func (pq *PriorityQueue) Cancel(taskID string) bool
    + func (pq *PriorityQueue) Clear()
    + func (pq *PriorityQueue) GetMetrics() map[string]any
    + func (pq *PriorityQueue) GetTask(taskID string) *QueuedTask
    + func (pq *PriorityQueue) Len() int
    + func (pq *PriorityQueue) LenAtPriority(priority int) int
    + func (pq *PriorityQueue) Peek() *QueuedTask
    + func (pq *PriorityQueue) PeekBatch(n int) []*QueuedTask
    + func (pq *PriorityQueue) Pop() *QueuedTask
    + func (pq *PriorityQueue) PopBatch(n int) []*QueuedTask
    + func (pq *PriorityQueue) Push(task *QueuedTask) error
    + func (pq *PriorityQueue) UpdatePriority(taskID string, newPriority int) bool
+ type QueuedTask struct
    + Cancel context.CancelFunc
    + Context context.Context
    + Data any
    + ID string
    + LastError error
    + LastRetryTime time.Time
    + MaxRetries int
    + Priority int
    + RetryCount int
    + RetryDelay time.Duration
    + Status TaskStatus
    + TriggerAt time.Time
    + Type string
+ type RetryableError interface
    + Temporary func() bool
+ type TaskProcessor interface
    + Process func(task *QueuedTask) error
+ type TaskQueue struct
    + func NewTaskQueue(ctx context.Context, opts ...Option) *TaskQueue
    + func (q *TaskQueue) Cancel(taskID string) bool
    + func (q *TaskQueue) GetMetrics() map[string]int64
    + func (q *TaskQueue) IsBusy() bool
    + func (q *TaskQueue) IsEmpty() bool
    + func (q *TaskQueue) Process(task any) error
    + func (q *TaskQueue) Push(task *QueuedTask) error
    + func (q *TaskQueue) PushBatch(tasks []*QueuedTask) []error
    + func (q *TaskQueue) Start()
    + func (q *TaskQueue) Stop(ctx context.Context) error
+ type TaskStatus string
    + const TaskStatusCanceled
    + const TaskStatusCompleted
    + const TaskStatusFailed
    + const TaskStatusPending
    + const TaskStatusRetrying
    + const TaskStatusRunning
+ type TimerMetrics struct
    + CancelCount atomic.Int64
    + DequeueCount atomic.Int64
    + EnqueueCount atomic.Int64
    + OverdueCount atomic.Int64
    + TimeoutCount atomic.Int64
+ type TimerQueue struct
    + func NewTimerQueue(capacity int) *TimerQueue
    + func (tq *TimerQueue) Cancel(taskID string) bool
    + func (tq *TimerQueue) Clear()
    + func (tq *TimerQueue) DueTasks() []*QueuedTask
    + func (tq *TimerQueue) GetMetrics() map[string]int64
    + func (tq *TimerQueue) GetTask(taskID string) *QueuedTask
    + func (tq *TimerQueue) Len() int
    + func (tq *TimerQueue) NextDue() time.Duration
    + func (tq *TimerQueue) Peek() *QueuedTask
    + func (tq *TimerQueue) Pop() *QueuedTask
    + func (tq *TimerQueue) Push(task *QueuedTask) error