queue

package
v1.35.8
Published: Feb 17, 2026 License: BSD-3-Clause Imports: 21 Imported by: 0

Documentation

Overview

Package queue implements a queue system for background operations using disk storage. It provides a DiskQueue that stores tasks in chunk files on disk, allowing large volumes of tasks to be handled efficiently without consuming excessive memory. The Scheduler manages multiple queues and dispatches task processing across a fixed number of workers.
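
A minimal wiring sketch of this flow, using the identifiers documented below unqualified (the package's import path is not shown on this page); the queue ID, directory, record encoding, and the myDecoder TaskDecoder are illustrative assumptions:

// Sketch: one scheduler, one disk-backed queue, one pushed record.
logger := logrus.New()

s := NewScheduler(SchedulerOptions{Logger: logger})
s.Start()
defer s.Close()

q, err := NewDiskQueue(DiskQueueOptions{
	ID:          "example-queue",      // illustrative ID
	Scheduler:   s,
	Dir:         "/var/lib/app/queue", // chunk files live here
	TaskDecoder: myDecoder{},          // application-defined decoder
	Logger:      logger,
})
if err != nil {
	// handle error
}
if err := q.Init(); err != nil { // assumption: prepares the on-disk state
	// handle error
}

// Push raw, application-encoded task records; the scheduler later
// dequeues them in batches, decodes them via TaskDecoder, and hands
// the resulting Tasks to its workers.
if err := q.Push(encodedTask); err != nil {
	// handle error
}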

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	Tasks      []Task
	Ctx        context.Context
	OnDone     func()
	OnCanceled func()
	// contains filtered or unexported fields
}

A Batch represents a group of tasks dequeued together. The Scheduler will call Done() when all tasks have been processed, or Cancel() if the batch processing was canceled. The Queue implementation can use the OnDone and OnCanceled callbacks to perform any necessary actions when the batch is done or canceled.
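
As a sketch of how a custom Queue implementation might use these callbacks, its DequeueBatch could return a Batch whose OnDone releases the chunk that was read and whose OnCanceled requeues it; myQueue, readNextChunk, releaseChunk, and requeueChunk are hypothetical:

// Sketch of a custom Queue's DequeueBatch using the callbacks.
func (q *myQueue) DequeueBatch() (*Batch, error) {
	tasks, chunk, err := q.readNextChunk() // hypothetical helper
	if err != nil {
		return nil, err
	}
	return &Batch{
		Tasks: tasks,
		Ctx:   context.Background(),
		// Run once every task in the batch has been processed.
		OnDone: func() { q.releaseChunk(chunk) },
		// Run if processing was canceled; keep the chunk for a later retry.
		OnCanceled: func() { q.requeueChunk(chunk) },
	}, nil
}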

func MergeBatches added in v1.30.0

func MergeBatches(batches ...*Batch) *Batch

MergeBatches merges multiple batches into a single batch. It will ignore nil batches. It will execute the OnDone and OnCanceled functions of all batches.
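
For example, a wrapper queue that drains two underlying queues in one go could merge their batches; combinedQueue and its hot/cold fields are hypothetical:

// Sketch: a wrapper queue that drains two underlying queues at once.
func (q *combinedQueue) DequeueBatch() (*Batch, error) {
	b1, err := q.hot.DequeueBatch()
	if err != nil {
		return nil, err
	}
	b2, err := q.cold.DequeueBatch()
	if err != nil {
		return nil, err
	}
	// Nil batches are ignored; completing (or canceling) the merged
	// batch runs the OnDone (or OnCanceled) callbacks of both inputs.
	return MergeBatches(b1, b2), nil
}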

func (*Batch) Cancel

func (b *Batch) Cancel()

Called by the worker if the batch processing was canceled. It will execute the OnCanceled callback if it is set.

func (*Batch) Done

func (b *Batch) Done()

Called by the worker when all tasks in the batch have been processed. It will execute the OnDone callback if it is set.

type BeforeScheduleHook

type BeforeScheduleHook interface {
	BeforeSchedule() bool
}

type DiskQueue

type DiskQueue struct {
	// Logger for the queue. Wrappers of this queue should use this logger.
	Logger logrus.FieldLogger
	// contains filtered or unexported fields
}

func NewDiskQueue

func NewDiskQueue(opt DiskQueueOptions) (*DiskQueue, error)

func (*DiskQueue) Close

func (q *DiskQueue) Close() error

Close the queue, prevent further pushes and unregister it from the scheduler.

func (*DiskQueue) DequeueBatch

func (q *DiskQueue) DequeueBatch() (batch *Batch, err error)

func (*DiskQueue) Drop

func (q *DiskQueue) Drop() error

func (*DiskQueue) Flush

func (q *DiskQueue) Flush() error

func (*DiskQueue) ForceSwitch added in v1.34.15

func (q *DiskQueue) ForceSwitch(ctx context.Context, basePath string) ([]string, error)

ForceSwitch forces the queue to switch to a new chunk file. It also returns the content of the directory before the switch. Important: the queue must be paused before calling this method.
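
A sketch of the pause/switch/resume sequence; whether an additional Wait is needed to drain in-flight batches is an assumption, not something stated here:

// snapshotSwitch is a sketch: pause the queue, switch to a fresh chunk
// file, and return the pre-switch directory listing.
func snapshotSwitch(ctx context.Context, q *DiskQueue, basePath string) ([]string, error) {
	q.Pause()
	q.Wait() // assumption: drain in-flight batches before switching
	defer q.Resume()

	// Returns the directory content as it was before the switch; new
	// pushes afterwards go into a fresh chunk file.
	return q.ForceSwitch(ctx, basePath)
}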

func (*DiskQueue) ID

func (q *DiskQueue) ID() string

func (*DiskQueue) Init

func (q *DiskQueue) Init() error

func (*DiskQueue) ListFiles added in v1.32.22

func (q *DiskQueue) ListFiles(ctx context.Context, basePath string) ([]string, error)

ListFiles returns a list of all chunk files in the queue directory. The returned paths are relative to the basePath. It is used for backup purposes and must be called only when the queue is not in use.
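
A backup sketch under those constraints; copyFile and backupDir are hypothetical:

// backupChunks copies every chunk file to backupDir. The queue must not
// be in use while this runs; copyFile is a hypothetical helper.
func backupChunks(ctx context.Context, q *DiskQueue, basePath, backupDir string) error {
	files, err := q.ListFiles(ctx, basePath)
	if err != nil {
		return err
	}
	for _, rel := range files {
		// Returned paths are relative to basePath.
		src := filepath.Join(basePath, rel)
		dst := filepath.Join(backupDir, rel)
		if err := copyFile(src, dst); err != nil {
			return err
		}
	}
	return nil
}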

func (*DiskQueue) Metrics

func (q *DiskQueue) Metrics() *Metrics

func (*DiskQueue) Pause

func (q *DiskQueue) Pause()

func (*DiskQueue) Push

func (q *DiskQueue) Push(record []byte) error

func (*DiskQueue) Resume

func (q *DiskQueue) Resume()

func (*DiskQueue) Scheduler

func (q *DiskQueue) Scheduler() *Scheduler

func (*DiskQueue) Size

func (q *DiskQueue) Size() int64

func (*DiskQueue) Wait

func (q *DiskQueue) Wait()

type DiskQueueOptions

type DiskQueueOptions struct {
	// Required
	ID          string
	Scheduler   *Scheduler
	Dir         string
	TaskDecoder TaskDecoder

	// Optional
	Logger           logrus.FieldLogger
	StaleTimeout     time.Duration
	InactivityPeriod time.Duration
	ChunkSize        uint64
	OnBatchProcessed func()
	Metrics          *Metrics
}

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(
	logger logrus.FieldLogger,
	prom *monitoring.PrometheusMetrics,
	labels prometheus.Labels,
) *Metrics

func (*Metrics) DiskUsage

func (m *Metrics) DiskUsage(size int64)

func (*Metrics) Paused

func (m *Metrics) Paused(id string)

func (*Metrics) Registered

func (m *Metrics) Registered(id string)

func (*Metrics) Resumed

func (m *Metrics) Resumed(id string)

func (*Metrics) Size

func (m *Metrics) Size(size uint64)

func (*Metrics) TasksProcessed

func (m *Metrics) TasksProcessed(start time.Time, count int)

func (*Metrics) Unregistered

func (m *Metrics) Unregistered(id string)

type Queue

type Queue interface {
	ID() string
	Size() int64
	DequeueBatch() (batch *Batch, err error)
	Metrics() *Metrics
}

A Queue represents anything that can be scheduled by the Scheduler. It must return its ID, size, and be able to dequeue a batch of tasks.
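
As an illustration of the contract, a minimal in-memory sketch might look as follows; how the Scheduler treats a nil batch or a nil *Metrics is not documented on this page, so both are assumptions:

// memQueue is a sketch of a minimal, slice-backed Queue.
type memQueue struct {
	id      string
	metrics *Metrics

	mu    sync.Mutex
	tasks []Task
}

func (q *memQueue) ID() string        { return q.id }
func (q *memQueue) Metrics() *Metrics { return q.metrics }

func (q *memQueue) Size() int64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	return int64(len(q.tasks))
}

func (q *memQueue) DequeueBatch() (*Batch, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.tasks) == 0 {
		return nil, nil // assumption: a nil batch means nothing to do
	}
	tasks := q.tasks
	q.tasks = nil
	return &Batch{Tasks: tasks, Ctx: context.Background()}, nil
}

Such a queue would then be handed to the scheduler with RegisterQueue.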

type Scheduler

type Scheduler struct {
	SchedulerOptions
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(opts SchedulerOptions) *Scheduler

func (*Scheduler) Close

func (s *Scheduler) Close() error

func (*Scheduler) IsQueuePaused added in v1.34.15

func (s *Scheduler) IsQueuePaused(id string) bool

IsQueuePaused returns true if the queue is paused.

func (*Scheduler) PauseQueue

func (s *Scheduler) PauseQueue(id string)

func (*Scheduler) RegisterQueue

func (s *Scheduler) RegisterQueue(q Queue)

func (*Scheduler) ResumeQueue

func (s *Scheduler) ResumeQueue(id string)

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(ctx context.Context)

Manually schedule the queues.

func (*Scheduler) Start

func (s *Scheduler) Start()

func (*Scheduler) UnregisterQueue

func (s *Scheduler) UnregisterQueue(id string)

func (*Scheduler) Wait

func (s *Scheduler) Wait(id string)

func (*Scheduler) WaitAll

func (s *Scheduler) WaitAll()

type SchedulerOptions

type SchedulerOptions struct {
	Logger logrus.FieldLogger
	// Number of workers to process tasks. Defaults to the number of CPUs - 1.
	Workers int
	// The interval at which the scheduler checks the queues for tasks. Defaults to 1 second.
	ScheduleInterval time.Duration
	// The interval between retries for failed tasks.
	RetryInterval time.Duration
	// Function to be called when the scheduler is closed
	OnClose func()
}
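
A configuration and shutdown sketch with non-default values; the numbers are illustrative, and the WaitAll semantics described in the comment are an assumption:

s := NewScheduler(SchedulerOptions{
	Logger:           logger,
	Workers:          4,                      // override the CPUs-1 default
	ScheduleInterval: 500 * time.Millisecond, // poll queues twice per second
	RetryInterval:    5 * time.Second,        // back-off between retries of failed tasks
	OnClose: func() {
		logger.Info("scheduler closed")
	},
})
s.Start()

// ... register queues, push work ...

s.WaitAll() // assumption: blocks until the registered queues are drained
_ = s.Close()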

type Task

type Task interface {
	// Op returns a uint8 representing the operation type of the task.
	// It is used for compressing tasks that implement the optional TaskGrouper interface.
	Op() uint8
	// Key returns a uint64 that can be used to influence task partitioning.
	// Tasks are assigned to workers based on the hash of their key, to ensure
	// that tasks with the same key are always processed by the same worker.
	// Implementations can also return a constant value to make sure all tasks are
	// processed by the same worker (e.g. for operations that must be serialized, like the HFresh Merge operation).
	Key() uint64
	// Execute is called by a worker to process the task.
	// If a task returns a transient error from Execute (e.g. canceled context, not enough memory, etc.),
	// it will be retried using an exponential backoff strategy.
	// Otherwise, the error will be considered permanent and the task will not be retried.
	Execute(ctx context.Context) error
}

A Task represents a unit of work to be processed by the workers.
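
A sketch of a Task implementation for a hypothetical per-object upsert; OpUpsert and the injected apply function are assumptions, not part of this package:

const OpUpsert uint8 = 1 // illustrative operation code

// upsertTask is a sketch of a Task implementation.
type upsertTask struct {
	docID uint64
	data  []byte
	// apply performs the actual write; injected by the application.
	apply func(ctx context.Context, docID uint64, data []byte) error
}

// Op identifies the kind of operation, e.g. for grouping (TaskGrouper).
func (t upsertTask) Op() uint8 { return OpUpsert }

// Key partitions work: all tasks for the same docID hash to the same
// worker, so updates to one object are never processed concurrently.
func (t upsertTask) Key() uint64 { return t.docID }

// Execute does the work. Transient failures (e.g. a canceled context)
// are retried with exponential backoff; other errors are permanent.
func (t upsertTask) Execute(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return t.apply(ctx, t.docID, t.data)
}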

type TaskDecoder

type TaskDecoder interface {
	DecodeTask([]byte) (Task, error)
}

type TaskGrouper

type TaskGrouper interface {
	NewGroup(op uint8, tasks ...Task) Task
}

TaskGrouper is an optional interface that can be implemented by a Task to provide custom grouping logic.
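
Continuing the upsertTask sketch above, a grouped task returned by NewGroup could simply run the grouped tasks sequentially; the scheduler's actual grouping and compression rules are not documented here, so this is only illustrative:

// groupedTask is a sketch of what NewGroup could return: one Task that
// runs several same-Op tasks in order.
type groupedTask struct {
	op    uint8
	tasks []Task
}

func (g groupedTask) Op() uint8 { return g.op }

func (g groupedTask) Key() uint64 {
	if len(g.tasks) == 0 {
		return 0
	}
	return g.tasks[0].Key()
}

func (g groupedTask) Execute(ctx context.Context) error {
	for _, t := range g.tasks {
		if err := t.Execute(ctx); err != nil {
			return err
		}
	}
	return nil
}

// The task type then only needs to build the wrapper:
func (t upsertTask) NewGroup(op uint8, tasks ...Task) Task {
	return groupedTask{op: op, tasks: tasks}
}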

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(logger logrus.FieldLogger) (*Worker, chan *Batch)

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)
