Documentation ¶
Overview ¶
Package queue implements a disk-backed queue system for background operations. DiskQueue stores tasks in chunk files on disk, so large volumes of tasks can be handled without holding them all in memory. The Scheduler manages multiple queues and dispatches their tasks to a fixed number of workers.
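To make the idea of storing records in a chunk more concrete, here is a minimal sketch of length-prefixed record framing. The on-disk format used by DiskQueue is not described on this page, so the 4-byte little-endian length prefix and the names appendRecord, readRecords, and roundTrip are assumptions made purely for illustration. A bytes.Buffer stands in for a chunk file.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// appendRecord writes one record as a 4-byte little-endian length followed by
// the payload. This framing is hypothetical, not the package's actual format.
func appendRecord(w io.Writer, record []byte) error {
	var hdr [4]byte
	binary.LittleEndian.PutUint32(hdr[:], uint32(len(record)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(record)
	return err
}

// readRecords reads framed records until a clean end of input.
func readRecords(r io.Reader) ([][]byte, error) {
	var out [][]byte
	for {
		var hdr [4]byte
		if _, err := io.ReadFull(r, hdr[:]); err != nil {
			if err == io.EOF {
				return out, nil // no more records
			}
			return out, err // truncated header
		}
		rec := make([]byte, binary.LittleEndian.Uint32(hdr[:]))
		if _, err := io.ReadFull(r, rec); err != nil {
			return out, err // truncated payload
		}
		out = append(out, rec)
	}
}

// roundTrip writes the records into an in-memory "chunk" and reads them back.
func roundTrip(records ...string) ([]string, error) {
	var chunk bytes.Buffer
	for _, r := range records {
		if err := appendRecord(&chunk, []byte(r)); err != nil {
			return nil, err
		}
	}
	raw, err := readRecords(&chunk)
	if err != nil {
		return nil, err
	}
	out := make([]string, 0, len(raw))
	for _, r := range raw {
		out = append(out, string(r))
	}
	return out, nil
}

func main() {
	got, err := roundTrip("a", "bc")
	fmt.Println(got, err) // [a bc] <nil>
}
```

Because records are read sequentially and only one chunk needs to be open at a time, memory use stays proportional to a single record rather than to the whole backlog.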
Index ¶
- type Batch
- type BeforeScheduleHook
- type DiskQueue
- func (q *DiskQueue) Close() error
- func (q *DiskQueue) DequeueBatch() (batch *Batch, err error)
- func (q *DiskQueue) Drop() error
- func (q *DiskQueue) Flush() error
- func (q *DiskQueue) ForceSwitch(ctx context.Context, basePath string) ([]string, error)
- func (q *DiskQueue) ID() string
- func (q *DiskQueue) Init() error
- func (q *DiskQueue) ListFiles(ctx context.Context, basePath string) ([]string, error)
- func (q *DiskQueue) Metrics() *Metrics
- func (q *DiskQueue) Pause()
- func (q *DiskQueue) Push(record []byte) error
- func (q *DiskQueue) Resume()
- func (q *DiskQueue) Scheduler() *Scheduler
- func (q *DiskQueue) Size() int64
- func (q *DiskQueue) Wait()
- type DiskQueueOptions
- type Metrics
- type Queue
- type Scheduler
- func (s *Scheduler) Close() error
- func (s *Scheduler) IsQueuePaused(id string) bool
- func (s *Scheduler) PauseQueue(id string)
- func (s *Scheduler) RegisterQueue(q Queue)
- func (s *Scheduler) ResumeQueue(id string)
- func (s *Scheduler) Schedule(ctx context.Context)
- func (s *Scheduler) Start()
- func (s *Scheduler) UnregisterQueue(id string)
- func (s *Scheduler) Wait(id string)
- func (s *Scheduler) WaitAll()
- type SchedulerOptions
- type Task
- type TaskDecoder
- type TaskGrouper
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
	Tasks      []Task
	Ctx        context.Context
	OnDone     func()
	OnCanceled func()
	// contains filtered or unexported fields
}
A Batch represents a group of tasks dequeued together. The Scheduler will call Done() when all tasks have been processed, or Cancel() if the batch processing was canceled. The Queue implementation can use the OnDone and OnCanceled callbacks to perform any necessary actions when the batch is done or canceled.
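The callback mechanism can be sketched as follows. The batch type below is a hypothetical stand-in for Batch that models only the documented OnDone and OnCanceled fields; the assumption that Done and Cancel simply invoke the matching callback, and the chunkState bookkeeping, are illustrative and not taken from the package.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for Batch. Done and Cancel are assumed to
// fire the corresponding callback when it is set.
type batch struct {
	OnDone     func()
	OnCanceled func()
}

func (b *batch) Done() {
	if b.OnDone != nil {
		b.OnDone()
	}
}

func (b *batch) Cancel() {
	if b.OnCanceled != nil {
		b.OnCanceled()
	}
}

// chunkState records what a queue did with the chunk backing a batch.
type chunkState struct {
	removed  bool // chunk deleted after successful processing
	requeued bool // chunk made available again after cancellation
}

// newChunkBatch wires the callbacks a queue implementation might install.
func newChunkBatch(st *chunkState) *batch {
	return &batch{
		OnDone:     func() { st.removed = true },
		OnCanceled: func() { st.requeued = true },
	}
}

func main() {
	done, canceled := &chunkState{}, &chunkState{}
	newChunkBatch(done).Done()
	newChunkBatch(canceled).Cancel()
	fmt.Println(done.removed, done.requeued)         // true false
	fmt.Println(canceled.removed, canceled.requeued) // false true
}
```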
func MergeBatches ¶ added in v1.30.0
MergeBatches merges multiple batches into a single batch, ignoring nil batches. The merged batch executes the OnDone and OnCanceled functions of all input batches.
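The signature of MergeBatches is not shown on this page, so the following is only a sketch of the described behavior using a hypothetical local batch type (with strings standing in for tasks) and a hypothetical mergeBatches function. It skips nil inputs, concatenates tasks, and chains the callbacks.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for Batch; Tasks uses strings instead of
// the Task interface to keep the sketch short.
type batch struct {
	Tasks      []string
	OnDone     func()
	OnCanceled func()
}

// mergeBatches combines the non-nil batches into one. The merged callbacks
// call every input batch's callback in input order.
func mergeBatches(batches ...*batch) *batch {
	merged := &batch{}
	var onDone, onCanceled []func()
	for _, b := range batches {
		if b == nil {
			continue // nil batches are ignored
		}
		merged.Tasks = append(merged.Tasks, b.Tasks...)
		if b.OnDone != nil {
			onDone = append(onDone, b.OnDone)
		}
		if b.OnCanceled != nil {
			onCanceled = append(onCanceled, b.OnCanceled)
		}
	}
	merged.OnDone = func() {
		for _, fn := range onDone {
			fn()
		}
	}
	merged.OnCanceled = func() {
		for _, fn := range onCanceled {
			fn()
		}
	}
	return merged
}

func main() {
	doneCalls := 0
	a := &batch{Tasks: []string{"t1"}, OnDone: func() { doneCalls++ }}
	b := &batch{Tasks: []string{"t2", "t3"}, OnDone: func() { doneCalls++ }}
	m := mergeBatches(a, nil, b)
	m.OnDone()
	fmt.Println(len(m.Tasks), doneCalls) // 3 2
}
```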
type BeforeScheduleHook ¶
type BeforeScheduleHook interface {
	BeforeSchedule() bool
}
type DiskQueue ¶
type DiskQueue struct {
	// Logger for the queue. Wrappers of this queue should use this logger.
	Logger logrus.FieldLogger
	// contains filtered or unexported fields
}
func NewDiskQueue ¶
func NewDiskQueue(opt DiskQueueOptions) (*DiskQueue, error)
func (*DiskQueue) Close ¶
Close closes the queue, prevents further pushes, and unregisters it from the scheduler.
func (*DiskQueue) DequeueBatch ¶
func (*DiskQueue) ForceSwitch ¶ added in v1.34.15
ForceSwitch forces the queue to switch to a new chunk file and returns the contents of the directory as they were before the switch. Important: the queue must be paused before calling this method.
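One way to honor the pause requirement is to wrap the call in a small helper. The sketch below uses a local interface listing the Pause, Resume, and ForceSwitch signatures from the index; the helper switchWhilePaused, the fakeQueue test double, the example path, and the chunk file name are all hypothetical. Whether a caller should resume immediately afterwards, or wait for in-flight work first, depends on the use case and is not specified here.

```go
package main

import (
	"context"
	"fmt"
)

// pausable lists the DiskQueue methods used below, with the signatures shown
// in the index.
type pausable interface {
	Pause()
	Resume()
	ForceSwitch(ctx context.Context, basePath string) ([]string, error)
}

// switchWhilePaused pauses the queue, forces a chunk switch, and resumes the
// queue even if ForceSwitch returns an error.
func switchWhilePaused(ctx context.Context, q pausable, basePath string) ([]string, error) {
	q.Pause()
	defer q.Resume()
	return q.ForceSwitch(ctx, basePath)
}

// fakeQueue is a hypothetical test double that records the call order.
type fakeQueue struct{ calls []string }

func (f *fakeQueue) Pause()  { f.calls = append(f.calls, "pause") }
func (f *fakeQueue) Resume() { f.calls = append(f.calls, "resume") }

func (f *fakeQueue) ForceSwitch(_ context.Context, basePath string) ([]string, error) {
	f.calls = append(f.calls, "switch")
	return []string{basePath + "/chunk-0001"}, nil // made-up file name
}

// demoCalls runs the helper against the fake and returns the observed order.
func demoCalls() []string {
	f := &fakeQueue{}
	_, _ = switchWhilePaused(context.Background(), f, "/tmp/queue")
	return f.calls
}

func main() {
	fmt.Println(demoCalls()) // [pause switch resume]
}
```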
type DiskQueueOptions ¶
type Metrics ¶
type Metrics struct {
	// contains filtered or unexported fields
}
func NewMetrics ¶
func NewMetrics(logger logrus.FieldLogger, prom *monitoring.PrometheusMetrics, labels prometheus.Labels) *Metrics
func (*Metrics) Registered ¶
func (*Metrics) Unregistered ¶
type Queue ¶
type Queue interface {
	ID() string
	Size() int64
	DequeueBatch() (batch *Batch, err error)
	Metrics() *Metrics
}
A Queue represents anything that can be scheduled by the Scheduler. It must report its ID and size, be able to dequeue a batch of tasks, and expose its Metrics.
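As an illustration of what satisfying this interface involves, here is a minimal in-memory sketch. To stay self-contained it declares local copies of Task, Batch, Metrics, and Queue that mirror the documented shapes (Batch and Metrics are reduced to what the example needs). The memQueue and noopTask types, and the choice to return a nil batch when the queue is empty, are assumptions for illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local copies of the documented types, simplified for this sketch.
type Task interface {
	Op() uint8
	Key() uint64
	Execute(ctx context.Context) error
}

type Batch struct{ Tasks []Task }

type Metrics struct{}

type Queue interface {
	ID() string
	Size() int64
	DequeueBatch() (batch *Batch, err error)
	Metrics() *Metrics
}

// noopTask is a hypothetical task that does nothing.
type noopTask struct{ key uint64 }

func (t noopTask) Op() uint8                     { return 0 }
func (t noopTask) Key() uint64                   { return t.key }
func (t noopTask) Execute(context.Context) error { return nil }

// memQueue is a hypothetical in-memory queue guarded by a mutex.
type memQueue struct {
	mu    sync.Mutex
	id    string
	tasks []Task
}

// Compile-time check that memQueue satisfies the local Queue interface.
var _ Queue = (*memQueue)(nil)

func (q *memQueue) ID() string        { return q.id }
func (q *memQueue) Metrics() *Metrics { return &Metrics{} }

func (q *memQueue) Size() int64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	return int64(len(q.tasks))
}

// DequeueBatch hands out all pending tasks at once, or nil if there are none.
func (q *memQueue) DequeueBatch() (*Batch, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.tasks) == 0 {
		return nil, nil
	}
	b := &Batch{Tasks: q.tasks}
	q.tasks = nil
	return b, nil
}

// newDemoQueue returns a queue preloaded with n no-op tasks.
func newDemoQueue(n int) *memQueue {
	q := &memQueue{id: "demo"}
	for i := 0; i < n; i++ {
		q.tasks = append(q.tasks, noopTask{key: uint64(i)})
	}
	return q
}

func main() {
	q := newDemoQueue(3)
	b, _ := q.DequeueBatch()
	fmt.Println(q.ID(), len(b.Tasks), q.Size()) // demo 3 0
}
```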
type Scheduler ¶
type Scheduler struct {
	SchedulerOptions
	// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(opts SchedulerOptions) *Scheduler
func (*Scheduler) IsQueuePaused ¶ added in v1.34.15
IsQueuePaused returns true if the queue is paused.
func (*Scheduler) PauseQueue ¶
func (*Scheduler) RegisterQueue ¶
func (*Scheduler) ResumeQueue ¶
func (*Scheduler) UnregisterQueue ¶
type SchedulerOptions ¶
type SchedulerOptions struct {
	Logger logrus.FieldLogger
	// Number of workers to process tasks. Defaults to the number of CPUs - 1.
	Workers int
	// The interval at which the scheduler checks the queues for tasks. Defaults to 1 second.
	ScheduleInterval time.Duration
	// The interval between retries for failed tasks.
	RetryInterval time.Duration
	// Function to be called when the scheduler is closed
	OnClose func()
}
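The documented defaults can be sketched as a small normalization step. The options type and withDefaults function below are hypothetical; only the two defaults stated in the comments above (CPUs minus one, and one second) come from this page. Clamping Workers to at least 1 on a single-CPU machine is an assumption, and RetryInterval is left out because its default is not documented. A caller would typically pass runtime.NumCPU() as numCPU.

```go
package main

import (
	"fmt"
	"time"
)

// options is a hypothetical subset of SchedulerOptions.
type options struct {
	Workers          int
	ScheduleInterval time.Duration
}

// withDefaults fills in unset fields using the documented defaults.
func withDefaults(o options, numCPU int) options {
	if o.Workers <= 0 {
		o.Workers = numCPU - 1
		if o.Workers < 1 {
			o.Workers = 1 // assumption: never run with zero workers
		}
	}
	if o.ScheduleInterval <= 0 {
		o.ScheduleInterval = time.Second
	}
	return o
}

func main() {
	o := withDefaults(options{}, 8)
	fmt.Println(o.Workers, o.ScheduleInterval) // 7 1s

	o = withDefaults(options{Workers: 2}, 8)
	fmt.Println(o.Workers, o.ScheduleInterval) // 2 1s
}
```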
type Task ¶
type Task interface {
	// Op returns a uint8 representing the operation type of the task.
	// It is used for compressing tasks that implement the optional TaskGrouper interface.
	Op() uint8
	// Key returns a uint64 that can be used to influence task partitioning.
	// Tasks are assigned to workers based on the hash of their key, to ensure
	// that tasks with the same key are always processed by the same worker.
	// Implementations can also return a constant value to make sure all tasks are
	// processed by the same worker (e.g. for operations that must be serialized, like the HFresh Merge operation).
	Key() uint64
	// Execute is called by a worker to process the task.
	// If a task returns a transient error from Execute (e.g. canceled context, not enough memory, etc.),
	// it will be retried using an exponential backoff strategy.
	// Otherwise, the error will be considered permanent and the task will not be retried.
	Execute(ctx context.Context) error
}
A Task represents a unit of work to be processed by the workers.
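The two behaviors described in the Task comments, key-based worker assignment and retry with exponential backoff, can be sketched as below. This is not the scheduler's implementation: the FNV-1a hash, the errTransient sentinel used to mark retryable errors, the delay cap, and the function names workerIndex, executeWithRetry, and attemptsUntilSuccess are all assumptions chosen for illustration.

```go
package main

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/fnv"
	"time"
)

// errTransient marks errors worth retrying. Hypothetical: this page does not
// say how the package tells transient errors from permanent ones.
var errTransient = errors.New("transient failure")

// workerIndex maps a key to one of `workers` workers (workers must be > 0).
// Any stable hash gives the same-key-same-worker property; FNV-1a is assumed.
func workerIndex(key uint64, workers int) int {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], key)
	h := fnv.New64a()
	h.Write(buf[:]) // hash.Hash writes never return an error
	return int(h.Sum64() % uint64(workers))
}

// executeWithRetry calls execute until it succeeds, fails with a permanent
// error, or ctx is done. The delay doubles after each transient failure and
// is capped at maxDelay. It returns the number of attempts made.
func executeWithRetry(ctx context.Context, execute func(context.Context) error, base, maxDelay time.Duration) (int, error) {
	delay := base
	for attempts := 1; ; attempts++ {
		err := execute(ctx)
		if err == nil || !errors.Is(err, errTransient) {
			return attempts, err
		}
		select {
		case <-ctx.Done():
			return attempts, ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

// attemptsUntilSuccess runs a task that fails transiently `failures` times.
func attemptsUntilSuccess(failures int) (int, error) {
	calls := 0
	return executeWithRetry(context.Background(), func(context.Context) error {
		calls++
		if calls <= failures {
			return fmt.Errorf("attempt %d: %w", calls, errTransient)
		}
		return nil
	}, time.Millisecond, 8*time.Millisecond)
}

func main() {
	fmt.Println(workerIndex(42, 4) == workerIndex(42, 4)) // true
	n, err := attemptsUntilSuccess(2)
	fmt.Println(n, err) // 3 <nil>
}
```

Returning a constant from Key, as the comments suggest for serialized operations, makes workerIndex return the same worker for every task of that kind.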
type TaskDecoder ¶
type TaskGrouper ¶
TaskGrouper is an optional interface that can be implemented by a Task to provide custom grouping logic.