Documentation
¶
Index ¶
- Variables
- type Config
- type Metrics
- type Option
- type PriorityLevel
- type PriorityQueue
- func (pq *PriorityQueue) Cancel(taskID string) bool
- func (pq *PriorityQueue) Clear()
- func (pq *PriorityQueue) GetMetrics() map[string]any
- func (pq *PriorityQueue) GetTask(taskID string) *QueuedTask
- func (pq *PriorityQueue) Len() int
- func (pq *PriorityQueue) LenAtPriority(priority int) int
- func (pq *PriorityQueue) Peek() *QueuedTask
- func (pq *PriorityQueue) PeekBatch(n int) []*QueuedTask
- func (pq *PriorityQueue) Pop() *QueuedTask
- func (pq *PriorityQueue) PopBatch(n int) []*QueuedTask
- func (pq *PriorityQueue) Push(task *QueuedTask) error
- func (pq *PriorityQueue) UpdatePriority(taskID string, newPriority int) bool
- type QueuedTask
- type RetryableError
- type TaskProcessor
- type TaskQueue
- func (q *TaskQueue) Cancel(taskID string) bool
- func (q *TaskQueue) GetMetrics() map[string]int64
- func (q *TaskQueue) IsBusy() bool
- func (q *TaskQueue) IsEmpty() bool
- func (q *TaskQueue) Process(task any) error
- func (q *TaskQueue) Push(task *QueuedTask) error
- func (q *TaskQueue) PushBatch(tasks []*QueuedTask) []error
- func (q *TaskQueue) Start()
- func (q *TaskQueue) Stop(ctx context.Context) error
- type TaskStatus
- type TimerMetrics
- type TimerQueue
- func (tq *TimerQueue) Cancel(taskID string) bool
- func (tq *TimerQueue) Clear()
- func (tq *TimerQueue) DueTasks() []*QueuedTask
- func (tq *TimerQueue) GetMetrics() map[string]int64
- func (tq *TimerQueue) GetTask(taskID string) *QueuedTask
- func (tq *TimerQueue) Len() int
- func (tq *TimerQueue) NextDue() time.Duration
- func (tq *TimerQueue) Peek() *QueuedTask
- func (tq *TimerQueue) Pop() *QueuedTask
- func (tq *TimerQueue) Push(task *QueuedTask) error
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidTask = errors.New("invalid task") ErrQueueFull = errors.New("task queue is full") ErrTaskTimeout = errors.New("task execution timeout") ErrDuplicateTask = errors.New("duplicate task ID") ErrTaskExists = errors.New("task already exists") ErrTemporaryFailure = errors.New("temporary failure") )
var RetryableErrors = []error{ context.DeadlineExceeded, ErrTaskTimeout, ErrTemporaryFailure, }
RetryableErrors are errors that can be retried
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Worker pool configuration
Workers int
QueueSize int
TaskTimeout time.Duration
// Queue specific configuration
MaxRetries int
RetryDelay time.Duration
// Priority configuration
MaxPriority int // Maximum priority level (0 to MaxPriority)
// Monitoring configuration
MetricsWindow time.Duration
}
Config represents queue configuration
type Metrics ¶
type Metrics struct {
EnqueueCount atomic.Int64
DequeueCount atomic.Int64
ProcessCount atomic.Int64
SuccessCount atomic.Int64
FailureCount atomic.Int64
RetryCount atomic.Int64
CancelCount atomic.Int64
ProcessingTime atomic.Int64 // nanoseconds
WaitTime atomic.Int64 // nanoseconds
QueueLength atomic.Int64
ActiveTasks atomic.Int64
}
Metrics represents queue metrics
type Option ¶
type Option func(*TaskQueue)
Option defines the function type for queue options
func WithProcessor ¶
func WithProcessor(processor TaskProcessor) Option
WithProcessor sets a custom task processor
type PriorityLevel ¶
type PriorityLevel struct {
// contains filtered or unexported fields
}
PriorityLevel represents task priority level
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue implements an optimized priority queue
func NewPriorityQueue ¶
func NewPriorityQueue(capacity int, maxPriority int) *PriorityQueue
NewPriorityQueue creates a new priority queue
Usage:
// Initialize a priority queue with capacity 1000 and maximum priority 10 pq := NewPriorityQueue(1000, 10)
// Add tasks to the queue
task1 := &QueuedTask{
ID: "task1",
Priority: 5,
Data: "data1",
}
pq.Push(task1)
task2 := &QueuedTask{
ID: "task2",
Priority: 8,
Data: "data2",
}
pq.Push(task2)
// Retrieve the highest priority task highestTask := pq.Pop() // Returns task2
// Update the priority of a specific task pq.UpdatePriority("task1", 9)
// Retrieve a batch of tasks (up to 5) tasks := pq.PopBatch(5)
// Retrieve queue metrics metrics := pq.GetMetrics()
func (*PriorityQueue) Cancel ¶
func (pq *PriorityQueue) Cancel(taskID string) bool
Cancel cancels and removes a task by ID
func (*PriorityQueue) GetMetrics ¶
func (pq *PriorityQueue) GetMetrics() map[string]any
GetMetrics returns queue metrics
func (*PriorityQueue) GetTask ¶
func (pq *PriorityQueue) GetTask(taskID string) *QueuedTask
GetTask returns a task by ID without removing it
func (*PriorityQueue) Len ¶
func (pq *PriorityQueue) Len() int
Len returns the current number of tasks
func (*PriorityQueue) LenAtPriority ¶
func (pq *PriorityQueue) LenAtPriority(priority int) int
LenAtPriority returns number of tasks at given priority level
func (*PriorityQueue) Peek ¶
func (pq *PriorityQueue) Peek() *QueuedTask
Peek returns highest priority task without removing it
func (*PriorityQueue) PeekBatch ¶
func (pq *PriorityQueue) PeekBatch(n int) []*QueuedTask
PeekBatch returns up to n highest priority tasks without removing them
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() *QueuedTask
Pop removes and returns the highest priority task
func (*PriorityQueue) PopBatch ¶
func (pq *PriorityQueue) PopBatch(n int) []*QueuedTask
PopBatch removes and returns up to n highest priority tasks
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(task *QueuedTask) error
Push pushes a task to priority queue
func (*PriorityQueue) UpdatePriority ¶
func (pq *PriorityQueue) UpdatePriority(taskID string, newPriority int) bool
UpdatePriority updates task priority
type QueuedTask ¶
type QueuedTask struct {
ID string
Type string
Priority int
Data any
TriggerAt time.Time
// Retry configuration
RetryCount int
MaxRetries int
RetryDelay time.Duration
LastRetryTime time.Time
// Runtime status
Status TaskStatus
LastError error
Context context.Context
Cancel context.CancelFunc
}
QueuedTask represents a task in queue
type RetryableError ¶
type RetryableError interface {
error
Temporary() bool // Returns true if the error is temporary and can be retried
}
RetryableError represents an error that can be retried
type TaskProcessor ¶
type TaskProcessor interface {
Process(task *QueuedTask) error
}
TaskProcessor represents the actual task execution logic
type TaskQueue ¶
type TaskQueue struct {
// contains filtered or unexported fields
}
TaskQueue represents a unified task queue
func NewTaskQueue ¶
NewTaskQueue creates a new task queue
Usage:
// Define a custom task
type CustomTask struct {
Name string
Data interface{}
}
// Implement TaskProcessor for CustomTask
type CustomTaskProcessor struct{}
func (p *CustomTaskProcessor) Process(task *queue.QueuedTask) error {
// Convert task data to CustomTask
customTask, ok := task.Data.(*CustomTask)
if !ok {
return fmt.Errorf("invalid task data type")
}
// Process the task
fmt.Printf("Processing task: %s with data: %v", customTask.Name, customTask.Data)
time.Sleep(time.Second) // Simulate task execution
return nil
}
// Create queue with default processor (handles func() and func() error)
q1 := NewTaskQueue(context.Background())
// Add a simple task using default processor
err := q1.Push(&QueuedTask{
ID: "func-task",
Data: func() error {
// Do some work
return nil
},
})
// Configuration for the task queue
cfg := &Config{
Workers: 10,
QueueSize: 1000,
TaskTimeout: time.Minute,
MaxPriority: 5,
MaxRetries: 3,
RetryDelay: time.Second * 5,
}
// Create queue with custom processor and configuration
processor := &CustomTaskProcessor{}
q2 := NewTaskQueue(context.Background(),
WithProcessor(processor),
WithConfig(cfg),
)
// Start the task queue
q2.Start()
// Example tasks
normalTask := &QueuedTask{
ID: "task-1",
Type: "normal",
Data: &CustomTask{
Name: "Normal Task",
Data: "some data",
},
}
priorityTask := &QueuedTask{
ID: "task-2",
Type: "priority",
Priority: 5,
Data: &CustomTask{
Name: "Priority Task",
Data: "priority data",
},
}
timerTask := &QueuedTask{
ID: "task-3",
Type: "timer",
TriggerAt: time.Now().Add(time.Minute),
Data: &CustomTask{
Name: "Timer Task",
Data: "timer data",
},
}
// Add tasks to the queue
if err := q2.Push(normalTask); err != nil {
fmt.Printf("Failed to push normal task: %v", err)
}
if err := q2.Push(priorityTask); err != nil {
fmt.Printf("Failed to push priority task: %v", err)
}
if err := q2.Push(timerTask); err != nil {
fmt.Printf("Failed to push timer task: %v", err)
}
// Retrieve queue metrics
metrics := q2.GetMetrics()
// Stop the task queue with a timeout
stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
q2.Stop(stopCtx)
func (*TaskQueue) GetMetrics ¶
GetMetrics returns current queue metrics
func (*TaskQueue) Push ¶
func (q *TaskQueue) Push(task *QueuedTask) error
Push pushes a task to queue
func (*TaskQueue) PushBatch ¶
func (q *TaskQueue) PushBatch(tasks []*QueuedTask) []error
PushBatch pushes multiple tasks to queue
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the status of a task
const ( TaskStatusPending TaskStatus = "pending" TaskStatusRunning TaskStatus = "running" TaskStatusCompleted TaskStatus = "completed" TaskStatusFailed TaskStatus = "failed" TaskStatusCanceled TaskStatus = "canceled" TaskStatusRetrying TaskStatus = "retrying" )
type TimerMetrics ¶
type TimerMetrics struct {
EnqueueCount atomic.Int64
DequeueCount atomic.Int64
CancelCount atomic.Int64
TimeoutCount atomic.Int64
OverdueCount atomic.Int64
}
TimerMetrics tracks operational metrics for the timer queue
type TimerQueue ¶
type TimerQueue struct {
// contains filtered or unexported fields
}
TimerQueue implements an optimized timer queue using min heap
func NewTimerQueue ¶
func NewTimerQueue(capacity int) *TimerQueue
NewTimerQueue creates a new timer queue with the specified capacity
Usage:
// Initialize a timer queue with capacity 1000 tq := NewTimerQueue(1000)
// Add a timer task
task := &QueuedTask{
ID: "timer-1",
TriggerAt: time.Now().Add(time.Minute),
Data: "timer data",
}
tq.Push(task)
// Retrieve the duration until the next task is due nextDue := tq.NextDue() fmt.Printf("Next task due in: %v\n", nextDue)
func (*TimerQueue) Cancel ¶
func (tq *TimerQueue) Cancel(taskID string) bool
Cancel removes a task by ID and returns true if successful
func (*TimerQueue) DueTasks ¶
func (tq *TimerQueue) DueTasks() []*QueuedTask
DueTasks returns all tasks that are due for execution
func (*TimerQueue) GetMetrics ¶
func (tq *TimerQueue) GetMetrics() map[string]int64
GetMetrics returns current queue metrics
func (*TimerQueue) GetTask ¶
func (tq *TimerQueue) GetTask(taskID string) *QueuedTask
GetTask returns a task by ID without removing it
func (*TimerQueue) Len ¶
func (tq *TimerQueue) Len() int
Len returns the current number of tasks in queue
func (*TimerQueue) NextDue ¶
func (tq *TimerQueue) NextDue() time.Duration
NextDue returns the duration until the next task is due Returns -1 if queue is empty
func (*TimerQueue) Peek ¶
func (tq *TimerQueue) Peek() *QueuedTask
Peek returns the next due task without removing it
func (*TimerQueue) Pop ¶
func (tq *TimerQueue) Pop() *QueuedTask
Pop removes and returns the earliest due task
func (*TimerQueue) Push ¶
func (tq *TimerQueue) Push(task *QueuedTask) error
Push adds a task to the timer queue