queue

package
v0.1.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidTask      = errors.New("invalid task")
	ErrQueueFull        = errors.New("task queue is full")
	ErrTaskTimeout      = errors.New("task execution timeout")
	ErrDuplicateTask    = errors.New("duplicate task ID")
	ErrTaskExists       = errors.New("task already exists")
	ErrTemporaryFailure = errors.New("temporary failure")
)

RetryableErrors are errors that can be retried

Functions

This section is empty.

Types

type Config

type Config struct {
	// Worker pool configuration
	Workers     int
	QueueSize   int
	TaskTimeout time.Duration

	// Queue specific configuration
	MaxRetries int
	RetryDelay time.Duration

	// Priority configuration
	MaxPriority int // Maximum priority level (0 to MaxPriority)

	// Monitoring configuration
	MetricsWindow time.Duration
}

Config represents queue configuration

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns default configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type Metrics

type Metrics struct {
	EnqueueCount   atomic.Int64
	DequeueCount   atomic.Int64
	ProcessCount   atomic.Int64
	SuccessCount   atomic.Int64
	FailureCount   atomic.Int64
	RetryCount     atomic.Int64
	CancelCount    atomic.Int64
	ProcessingTime atomic.Int64 // nanoseconds
	WaitTime       atomic.Int64 // nanoseconds
	QueueLength    atomic.Int64
	ActiveTasks    atomic.Int64
}

Metrics represents queue metrics

type Option

type Option func(*TaskQueue)

Option defines the function type for queue options

func WithConfig

func WithConfig(cfg *Config) Option

WithConfig sets the queue configuration

func WithProcessor

func WithProcessor(processor TaskProcessor) Option

WithProcessor sets a custom task processor

type PriorityLevel

type PriorityLevel struct {
	// contains filtered or unexported fields
}

PriorityLevel represents task priority level

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue implements an optimized priority queue

func NewPriorityQueue

func NewPriorityQueue(capacity int, maxPriority int) *PriorityQueue

NewPriorityQueue creates a new priority queue

Usage:

// Initialize a priority queue with capacity 1000 and maximum priority 10 pq := NewPriorityQueue(1000, 10)

// Add tasks to the queue

task1 := &QueuedTask{
    ID:       "task1",
    Priority: 5,
    Data:     "data1",
}

pq.Push(task1)

task2 := &QueuedTask{
    ID:       "task2",
    Priority: 8,
    Data:     "data2",
}

pq.Push(task2)

// Retrieve the highest priority task highestTask := pq.Pop() // Returns task2

// Update the priority of a specific task pq.UpdatePriority("task1", 9)

// Retrieve a batch of tasks (up to 5) tasks := pq.PopBatch(5)

// Retrieve queue metrics metrics := pq.GetMetrics()

func (*PriorityQueue) Cancel

func (pq *PriorityQueue) Cancel(taskID string) bool

Cancel cancels and removes a task by ID

func (*PriorityQueue) Clear

func (pq *PriorityQueue) Clear()

Clear removes all tasks

func (*PriorityQueue) GetMetrics

func (pq *PriorityQueue) GetMetrics() map[string]any

GetMetrics returns queue metrics

func (*PriorityQueue) GetTask

func (pq *PriorityQueue) GetTask(taskID string) *QueuedTask

GetTask returns a task by ID without removing it

func (*PriorityQueue) Len

func (pq *PriorityQueue) Len() int

Len returns the current number of tasks

func (*PriorityQueue) LenAtPriority

func (pq *PriorityQueue) LenAtPriority(priority int) int

LenAtPriority returns number of tasks at given priority level

func (*PriorityQueue) Peek

func (pq *PriorityQueue) Peek() *QueuedTask

Peek returns highest priority task without removing it

func (*PriorityQueue) PeekBatch

func (pq *PriorityQueue) PeekBatch(n int) []*QueuedTask

PeekBatch returns up to n highest priority tasks without removing them

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() *QueuedTask

Pop removes and returns the highest priority task

func (*PriorityQueue) PopBatch

func (pq *PriorityQueue) PopBatch(n int) []*QueuedTask

PopBatch removes and returns up to n highest priority tasks

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(task *QueuedTask) error

Push pushes a task to priority queue

func (*PriorityQueue) UpdatePriority

func (pq *PriorityQueue) UpdatePriority(taskID string, newPriority int) bool

UpdatePriority updates task priority

type QueuedTask

type QueuedTask struct {
	ID        string
	Type      string
	Priority  int
	Data      any
	TriggerAt time.Time

	// Retry configuration
	RetryCount    int
	MaxRetries    int
	RetryDelay    time.Duration
	LastRetryTime time.Time

	// Runtime status
	Status    TaskStatus
	LastError error
	Context   context.Context
	Cancel    context.CancelFunc
}

QueuedTask represents a task in queue

type RetryableError

type RetryableError interface {
	error
	Temporary() bool // Returns true if the error is temporary and can be retried
}

RetryableError represents an error that can be retried

type TaskProcessor

type TaskProcessor interface {
	Process(task *QueuedTask) error
}

TaskProcessor represents the actual task execution logic

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

TaskQueue represents a unified task queue

func NewTaskQueue

func NewTaskQueue(ctx context.Context, opts ...Option) *TaskQueue

NewTaskQueue creates a new task queue

Usage:

// Define a custom task
type CustomTask struct {
    Name string
    Data interface{}
}

// Implement TaskProcessor for CustomTask
type CustomTaskProcessor struct{}

func (p *CustomTaskProcessor) Process(task *queue.QueuedTask) error {
    // Convert task data to CustomTask
    customTask, ok := task.Data.(*CustomTask)
    if !ok {
        return fmt.Errorf("invalid task data type")
    }

    // Process the task
    fmt.Printf("Processing task: %s with data: %v", customTask.Name, customTask.Data)
    time.Sleep(time.Second) // Simulate task execution
    return nil
}

// Create queue with default processor (handles func() and func() error)
q1 := NewTaskQueue(context.Background())

// Add a simple task using default processor
err := q1.Push(&QueuedTask{
    ID: "func-task",
    Data: func() error {
        // Do some work
        return nil
    },
})

// Configuration for the task queue
cfg := &Config{
    Workers:     10,
    QueueSize:   1000,
    TaskTimeout: time.Minute,
    MaxPriority: 5,
    MaxRetries:  3,
    RetryDelay:  time.Second * 5,
}

// Create queue with custom processor and configuration
processor := &CustomTaskProcessor{}
q2 := NewTaskQueue(context.Background(),
    WithProcessor(processor),
    WithConfig(cfg),
)

// Start the task queue
q2.Start()

// Example tasks
normalTask := &QueuedTask{
    ID:   "task-1",
    Type: "normal",
    Data: &CustomTask{
        Name: "Normal Task",
        Data: "some data",
    },
}

priorityTask := &QueuedTask{
    ID:       "task-2",
    Type:     "priority",
    Priority: 5,
    Data: &CustomTask{
        Name: "Priority Task",
        Data: "priority data",
    },
}

timerTask := &QueuedTask{
    ID:        "task-3",
    Type:      "timer",
    TriggerAt: time.Now().Add(time.Minute),
    Data: &CustomTask{
        Name: "Timer Task",
        Data: "timer data",
    },
}

// Add tasks to the queue
if err := q2.Push(normalTask); err != nil {
    fmt.Printf("Failed to push normal task: %v", err)
}

if err := q2.Push(priorityTask); err != nil {
    fmt.Printf("Failed to push priority task: %v", err)
}

if err := q2.Push(timerTask); err != nil {
    fmt.Printf("Failed to push timer task: %v", err)
}

// Retrieve queue metrics
metrics := q2.GetMetrics()

// Stop the task queue with a timeout
stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
q2.Stop(stopCtx)

func (*TaskQueue) Cancel

func (q *TaskQueue) Cancel(taskID string) bool

Cancel cancels a task by ID

func (*TaskQueue) GetMetrics

func (q *TaskQueue) GetMetrics() map[string]int64

GetMetrics returns current queue metrics

func (*TaskQueue) IsBusy

func (q *TaskQueue) IsBusy() bool

IsBusy returns whether the queue is currently busy

func (*TaskQueue) IsEmpty

func (q *TaskQueue) IsEmpty() bool

IsEmpty returns whether the queue is empty

func (*TaskQueue) Process

func (q *TaskQueue) Process(task any) error

Process implements TaskProcessor interface for worker pool

func (*TaskQueue) Push

func (q *TaskQueue) Push(task *QueuedTask) error

Push pushes a task to queue

func (*TaskQueue) PushBatch

func (q *TaskQueue) PushBatch(tasks []*QueuedTask) []error

PushBatch pushes multiple tasks to queue

func (*TaskQueue) Start

func (q *TaskQueue) Start()

Start starts the queue

func (*TaskQueue) Stop

func (q *TaskQueue) Stop(ctx context.Context) error

Stop stops the queue

type TaskStatus

type TaskStatus string

TaskStatus represents the status of a task

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCanceled  TaskStatus = "canceled"
	TaskStatusRetrying  TaskStatus = "retrying"
)

type TimerMetrics

type TimerMetrics struct {
	EnqueueCount atomic.Int64
	DequeueCount atomic.Int64
	CancelCount  atomic.Int64
	TimeoutCount atomic.Int64
	OverdueCount atomic.Int64
}

TimerMetrics tracks operational metrics for the timer queue

type TimerQueue

type TimerQueue struct {
	// contains filtered or unexported fields
}

TimerQueue implements an optimized timer queue using min heap

func NewTimerQueue

func NewTimerQueue(capacity int) *TimerQueue

NewTimerQueue creates a new timer queue with the specified capacity

Usage:

// Initialize a timer queue with capacity 1000 tq := NewTimerQueue(1000)

// Add a timer task

task := &QueuedTask{
    ID:        "timer-1",
    TriggerAt: time.Now().Add(time.Minute),
    Data:      "timer data",
}

tq.Push(task)

// Retrieve the duration until the next task is due nextDue := tq.NextDue() fmt.Printf("Next task due in: %v\n", nextDue)

func (*TimerQueue) Cancel

func (tq *TimerQueue) Cancel(taskID string) bool

Cancel removes a task by ID and returns true if successful

func (*TimerQueue) Clear

func (tq *TimerQueue) Clear()

Clear removes all tasks from the queue

func (*TimerQueue) DueTasks

func (tq *TimerQueue) DueTasks() []*QueuedTask

DueTasks returns all tasks that are due for execution

func (*TimerQueue) GetMetrics

func (tq *TimerQueue) GetMetrics() map[string]int64

GetMetrics returns current queue metrics

func (*TimerQueue) GetTask

func (tq *TimerQueue) GetTask(taskID string) *QueuedTask

GetTask returns a task by ID without removing it

func (*TimerQueue) Len

func (tq *TimerQueue) Len() int

Len returns the current number of tasks in queue

func (*TimerQueue) NextDue

func (tq *TimerQueue) NextDue() time.Duration

NextDue returns the duration until the next task is due Returns -1 if queue is empty

func (*TimerQueue) Peek

func (tq *TimerQueue) Peek() *QueuedTask

Peek returns the next due task without removing it

func (*TimerQueue) Pop

func (tq *TimerQueue) Pop() *QueuedTask

Pop removes and returns the earliest due task

func (*TimerQueue) Push

func (tq *TimerQueue) Push(task *QueuedTask) error

Push adds a task to the timer queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL