queue

package
Version: v1.34.16
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 | License: BSD-3-Clause | Imports: 20 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	Tasks []Task
	Ctx   context.Context
	// contains filtered or unexported fields
}

func MergeBatches added in v1.30.0

func MergeBatches(batches ...*Batch) *Batch

MergeBatches merges multiple batches into a single batch. It will ignore nil batches. It will execute the onDone and onCanceled functions of all batches.

func (*Batch) Cancel

func (b *Batch) Cancel()

func (*Batch) Done

func (b *Batch) Done()

type BeforeScheduleHook

// BeforeScheduleHook is a single-method interface exposing BeforeSchedule.
//
// NOTE(review): presumably implemented by queues and invoked by the Scheduler
// ahead of a scheduling pass; what the bool result means (e.g. "skip this
// queue") is not visible from this declaration — confirm against the
// scheduler implementation.
type BeforeScheduleHook interface {
	BeforeSchedule() bool
}

type DiskQueue

type DiskQueue struct {
	// Logger for the queue. Wrappers of this queue should use this logger.
	Logger logrus.FieldLogger
	// contains filtered or unexported fields
}

func NewDiskQueue

func NewDiskQueue(opt DiskQueueOptions) (*DiskQueue, error)

func (*DiskQueue) Close

func (q *DiskQueue) Close() error

Close closes the queue, prevents further pushes, and unregisters it from the scheduler.

func (*DiskQueue) DequeueBatch

func (q *DiskQueue) DequeueBatch() (batch *Batch, err error)

func (*DiskQueue) Drop

func (q *DiskQueue) Drop() error

func (*DiskQueue) Flush

func (q *DiskQueue) Flush() error

func (*DiskQueue) ForceSwitch added in v1.34.15

func (q *DiskQueue) ForceSwitch(ctx context.Context, basePath string) ([]string, error)

ForceSwitch forces the queue to switch to a new chunk file. It also returns the contents of the directory as they were before the switch. Important: the queue must be paused before calling this method.

func (*DiskQueue) ID

func (q *DiskQueue) ID() string

func (*DiskQueue) Init

func (q *DiskQueue) Init() error

func (*DiskQueue) ListFiles added in v1.32.22

func (q *DiskQueue) ListFiles(ctx context.Context, basePath string) ([]string, error)

ListFiles returns a list of all chunk files in the queue directory. The returned paths are relative to the basePath. It is used for backup purposes and must be called only when the queue is not in use.

func (*DiskQueue) Metrics

func (q *DiskQueue) Metrics() *Metrics

func (*DiskQueue) Pause

func (q *DiskQueue) Pause()

func (*DiskQueue) Push

func (q *DiskQueue) Push(record []byte) error

func (*DiskQueue) Resume

func (q *DiskQueue) Resume()

func (*DiskQueue) Scheduler

func (q *DiskQueue) Scheduler() *Scheduler

func (*DiskQueue) Size

func (q *DiskQueue) Size() int64

func (*DiskQueue) Wait

func (q *DiskQueue) Wait()

type DiskQueueOptions

// DiskQueueOptions is the configuration accepted by NewDiskQueue. Its fields
// are split into a group marked required and a group marked optional.
//
// NOTE(review): the defaults applied to unset optional fields, and the
// validation performed on the required ones, are not visible from this
// declaration — confirm in NewDiskQueue.
type DiskQueueOptions struct {
	// Required
	ID          string
	Scheduler   *Scheduler
	Dir         string
	TaskDecoder TaskDecoder

	// Optional
	Logger           logrus.FieldLogger
	StaleTimeout     time.Duration
	InactivityPeriod time.Duration
	ChunkSize        uint64
	OnBatchProcessed func()
	Metrics          *Metrics
}

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(
	logger logrus.FieldLogger,
	prom *monitoring.PrometheusMetrics,
	labels prometheus.Labels,
) *Metrics

func (*Metrics) DiskUsage

func (m *Metrics) DiskUsage(size int64)

func (*Metrics) Paused

func (m *Metrics) Paused(id string)

func (*Metrics) Registered

func (m *Metrics) Registered(id string)

func (*Metrics) Resumed

func (m *Metrics) Resumed(id string)

func (*Metrics) Size

func (m *Metrics) Size(size uint64)

func (*Metrics) TasksProcessed

func (m *Metrics) TasksProcessed(start time.Time, count int)

func (*Metrics) Unregistered

func (m *Metrics) Unregistered(id string)

type Queue

// Queue is the interface accepted by Scheduler.RegisterQueue. DiskQueue
// declares all four of these methods with matching signatures.
type Queue interface {
	// ID returns the queue's identifier.
	// NOTE(review): presumably the same id that Scheduler.PauseQueue,
	// ResumeQueue, UnregisterQueue and Wait take — confirm.
	ID() string
	// Size returns the queue size as an int64.
	// NOTE(review): the unit (tasks, records or bytes) is not visible here.
	Size() int64
	// DequeueBatch returns a batch of tasks, or an error.
	DequeueBatch() (batch *Batch, err error)
	// Metrics returns the Metrics associated with the queue.
	Metrics() *Metrics
}

type Scheduler

type Scheduler struct {
	SchedulerOptions
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(opts SchedulerOptions) *Scheduler

func (*Scheduler) Close

func (s *Scheduler) Close() error

func (*Scheduler) IsQueuePaused added in v1.34.15

func (s *Scheduler) IsQueuePaused(id string) bool

IsQueuePaused returns true if the queue is paused.

func (*Scheduler) PauseQueue

func (s *Scheduler) PauseQueue(id string)

func (*Scheduler) RegisterQueue

func (s *Scheduler) RegisterQueue(q Queue)

func (*Scheduler) ResumeQueue

func (s *Scheduler) ResumeQueue(id string)

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(ctx context.Context)

Schedule manually triggers scheduling of the queues.

func (*Scheduler) Start

func (s *Scheduler) Start()

func (*Scheduler) UnregisterQueue

func (s *Scheduler) UnregisterQueue(id string)

func (*Scheduler) Wait

func (s *Scheduler) Wait(id string)

func (*Scheduler) WaitAll

func (s *Scheduler) WaitAll()

type SchedulerOptions

// SchedulerOptions configures a Scheduler. It is the argument to NewScheduler
// and is embedded in the Scheduler struct, so its fields are accessible
// directly on a Scheduler value.
type SchedulerOptions struct {
	// Logger used by the scheduler.
	Logger logrus.FieldLogger
	// Number of workers to process tasks. Defaults to the number of CPUs - 1.
	Workers int
	// The interval at which the scheduler checks the queues for tasks. Defaults to 1 second.
	ScheduleInterval time.Duration
	// The interval between retries for failed tasks.
	// NOTE(review): presumably forwarded to NewWorker's retryInterval
	// parameter; no default is documented here — confirm.
	RetryInterval time.Duration
	// Function to be called when the scheduler is closed
	OnClose func()
}

type Task

// Task is a unit of work. Tasks are carried in Batch.Tasks, produced by
// TaskDecoder.DecodeTask, and combined by TaskGrouper.NewGroup.
type Task interface {
	// Op returns a uint8 value for the task.
	// NOTE(review): presumably an operation code; TaskGrouper.NewGroup takes
	// an op of the same type — confirm its meaning with implementers.
	Op() uint8
	// Key returns a uint64 value for the task.
	// NOTE(review): what the key identifies is not visible here.
	Key() uint64
	// Execute runs the task under ctx and reports failure through the
	// returned error.
	Execute(ctx context.Context) error
}

type TaskDecoder

// TaskDecoder turns a byte slice into a Task, or returns an error. It is one
// of the fields marked required in DiskQueueOptions.
//
// NOTE(review): presumably the input is a record previously handed to
// DiskQueue.Push — confirm against the DiskQueue implementation.
type TaskDecoder interface {
	DecodeTask([]byte) (Task, error)
}

type TaskGrouper

// TaskGrouper builds a single Task from an op value and zero or more tasks.
//
// NOTE(review): presumably used to fold several tasks that share the same Op
// into one task; who calls NewGroup is not visible here — confirm.
type TaskGrouper interface {
	NewGroup(op uint8, tasks ...Task) Task
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(logger logrus.FieldLogger, retryInterval time.Duration) (*Worker, chan *Batch)

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL