queue

package
v0.0.17 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type QueueItem

type QueueItem struct {
	Task       *tasksv1.Task
	EnqueuedAt time.Time
	RetryCount int
	Handler    events.TaskHandlerFunc
	// contains filtered or unexported fields
}

type QueueMetrics

type QueueMetrics struct {
	ItemsEnqueued  atomic.Int64
	ItemsDequeued  atomic.Int64
	ItemsProcessed atomic.Int64
	ItemsFailed    atomic.Int64
	TotalRetries   atomic.Int64
	CurrentDepth   atomic.Int64
}

QueueMetrics tracks performance and operational metrics for the queue.

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

func NewTaskQueue

func NewTaskQueue(logger logger.Logger) *TaskQueue

NewTaskQueue creates a new unbounded task queue.

func (*TaskQueue) Close

func (q *TaskQueue) Close()

func (*TaskQueue) Dequeue

func (q *TaskQueue) Dequeue() (*QueueItem, error)

Dequeue retrieves and removes a task from the queue. It blocks until an item is available or the queue is closed.

func (*TaskQueue) Done

func (q *TaskQueue) Done(t *tasksv1.Task)

func (*TaskQueue) Enqueue

func (q *TaskQueue) Enqueue(ctx context.Context, item *QueueItem) error

Enqueue adds a task to the queue.

func (*TaskQueue) Len

func (q *TaskQueue) Len() int

Len returns the current number of items in the queue.

func (*TaskQueue) Pop

func (q *TaskQueue) Pop() any

func (*TaskQueue) Requeue

func (q *TaskQueue) Requeue(ctx context.Context, item *QueueItem) error

Requeue adds a task back to the queue with an incremented retry count. It is used when task processing fails and should be retried.

type WorkPool

type WorkPool struct {
	// contains filtered or unexported fields
}

func NewPool

func NewPool(queue *TaskQueue, opts ...WorkPoolOption) *WorkPool

func (*WorkPool) Start

func (w *WorkPool) Start(ctx context.Context)

func (*WorkPool) Stop

func (w *WorkPool) Stop()

type WorkPoolOption

type WorkPoolOption func(*WorkPool)

func WithBackoff

func WithBackoff(min, max time.Duration) WorkPoolOption

func WithLogger

func WithLogger(l logger.Logger) WorkPoolOption

func WithMaxRetries

func WithMaxRetries(n int) WorkPoolOption

func WithMaxWorkers

func WithMaxWorkers(n int) WorkPoolOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL