queue

package
v0.2.3 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 12, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package queue provides the core job queue functionality, including job dispatching, worker management, and queue lifecycle control. It supports multiple backend drivers and provides health monitoring and statistics collection.

Types

type Queue

type Queue struct {
	// ShutdownTimeout is the maximum duration to wait for graceful shutdown
	ShutdownTimeout time.Duration
	// contains filtered or unexported fields
}

Queue represents a job queue instance that manages job dispatching and processing. It coordinates the interaction between job storage, dispatchers, and workers while providing monitoring and statistics collection capabilities.

func NewQueue

func NewQueue(queueName string, cfg config.Config, shutdownTimeout time.Duration) (*Queue, error)

NewQueue creates a new Queue instance with the specified configuration. It initializes the appropriate storage backend, dispatcher, and worker components. The shutdownTimeout parameter controls how long to wait for graceful shutdown.

Returns an error if the configuration is invalid or if backend initialization fails.

func (*Queue) Dispatch

func (q *Queue) Dispatch(job job.Job) error

Dispatch adds a new job to the queue for processing. The job will be stored in the backend and picked up by available workers.

func (*Queue) DispatchBatch

func (q *Queue) DispatchBatch(jobs []job.Job) error

DispatchBatch adds multiple jobs to the queue for processing. The jobs will be stored in the backend and picked up by available workers.
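For large job sets, callers may want to bound the size of each DispatchBatch call. The sketch below is one way to do that, not part of this package: dispatchInChunks and recorder are hypothetical names, and the type parameter J stands in for job.Job so the example compiles without the module. Whether DispatchBatch itself is atomic is not documented here, so the helper only reports how many jobs were handed over before the first error.

```go
package main

import (
	"errors"
	"fmt"
)

// batchDispatcher mirrors the documented DispatchBatch signature. J stands in
// for job.Job, so *Queue should satisfy batchDispatcher[job.Job].
type batchDispatcher[J any] interface {
	DispatchBatch(jobs []J) error
}

// dispatchInChunks (hypothetical helper) sends jobs in slices of at most size
// elements. It stops at the first error and returns the number of jobs that
// were passed to successful DispatchBatch calls.
func dispatchInChunks[J any](d batchDispatcher[J], jobs []J, size int) (int, error) {
	if size <= 0 {
		return 0, errors.New("chunk size must be positive")
	}
	sent := 0
	for start := 0; start < len(jobs); start += size {
		end := start + size
		if end > len(jobs) {
			end = len(jobs)
		}
		if err := d.DispatchBatch(jobs[start:end]); err != nil {
			return sent, fmt.Errorf("dispatching jobs %d..%d: %w", start, end-1, err)
		}
		sent = end
	}
	return sent, nil
}

// recorder is a test double that records the size of each batch it receives.
type recorder struct{ sizes []int }

func (r *recorder) DispatchBatch(jobs []string) error {
	r.sizes = append(r.sizes, len(jobs))
	return nil
}

func main() {
	r := &recorder{}
	n, err := dispatchInChunks[string](r, []string{"a", "b", "c", "d", "e"}, 2)
	fmt.Println(n, err, r.sizes) // 5 <nil> [2 2 1]
}
```

With a real queue, the call would presumably look like dispatchInChunks[job.Job](q, jobs, 100), where the chunk size is a tuning choice.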

func (*Queue) DispatchBatchWithDelay

func (q *Queue) DispatchBatchWithDelay(jobs []job.Job, delay time.Duration) error

DispatchBatchWithDelay adds multiple jobs to the queue for processing after a delay.

func (*Queue) DispatchWithDelay

func (q *Queue) DispatchWithDelay(job job.Job, delay time.Duration) error

DispatchWithDelay adds a new job to the queue for processing after a delay.
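DispatchWithDelay takes a relative delay, so scheduling a job for an absolute time means converting that time first. The following is a minimal sketch under assumptions: delayUntil, dispatchAt, and fakeQueue are hypothetical names, J stands in for job.Job, and clamping past times to a zero delay is a choice made here because the package's handling of zero or negative delays is not documented.

```go
package main

import (
	"fmt"
	"time"
)

// delayDispatcher mirrors the documented DispatchWithDelay signature, with J
// standing in for job.Job.
type delayDispatcher[J any] interface {
	DispatchWithDelay(job J, delay time.Duration) error
}

// delayUntil converts an absolute run time into a relative delay. Times in
// the past are clamped to zero (an assumption of this sketch).
func delayUntil(now, runAt time.Time) time.Duration {
	if d := runAt.Sub(now); d > 0 {
		return d
	}
	return 0
}

// dispatchAt (hypothetical helper) schedules job to run at runAt.
func dispatchAt[J any](d delayDispatcher[J], job J, now, runAt time.Time) error {
	return d.DispatchWithDelay(job, delayUntil(now, runAt))
}

// fakeQueue is a test double that remembers the last delay it was given.
type fakeQueue struct{ lastDelay time.Duration }

func (f *fakeQueue) DispatchWithDelay(job string, delay time.Duration) error {
	f.lastDelay = delay
	return nil
}

// exampleDelay schedules a job offsetSeconds after a fixed reference time and
// returns the delay the fake queue received.
func exampleDelay(offsetSeconds int) time.Duration {
	now := time.Date(2026, 1, 12, 9, 0, 0, 0, time.UTC)
	q := &fakeQueue{}
	runAt := now.Add(time.Duration(offsetSeconds) * time.Second)
	if err := dispatchAt[string](q, "send-report", now, runAt); err != nil {
		return -1
	}
	return q.lastDelay
}

func main() {
	fmt.Println(exampleDelay(90))  // 1m30s
	fmt.Println(exampleDelay(-30)) // 0s
}
```

Passing now explicitly, rather than calling time.Now inside the helper, keeps the conversion deterministic and easy to test.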

func (*Queue) IsHealthy

func (q *Queue) IsHealthy() bool

IsHealthy checks if the queue and its backend storage are functioning properly. For Redis-backed queues, this includes checking the Redis connection health.

func (*Queue) IsOverloaded

func (q *Queue) IsOverloaded() bool

IsOverloaded checks if the queue is currently experiencing high load based on configured thresholds. Returns false if statistics collection is disabled.
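IsHealthy and IsOverloaded can back a simple HTTP probe. The sketch below is illustrative only: healthReporter, probeStatus, healthHandler, and fakeQueue are hypothetical names, and the mapping to 503 and 429 is one possible policy, not something this package prescribes. The interface lists only the two documented methods, so a *Queue should satisfy it.

```go
package main

import (
	"fmt"
	"net/http"
)

// healthReporter lists the two documented probe methods.
type healthReporter interface {
	IsHealthy() bool
	IsOverloaded() bool
}

// probeStatus maps the probes to an HTTP status code (a hypothetical policy):
// an unhealthy queue wins over an overloaded one.
func probeStatus(q healthReporter) int {
	switch {
	case !q.IsHealthy():
		return http.StatusServiceUnavailable
	case q.IsOverloaded():
		return http.StatusTooManyRequests
	default:
		return http.StatusOK
	}
}

// healthHandler exposes probeStatus as an http.Handler.
func healthHandler(q healthReporter) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(probeStatus(q))
	})
}

// fakeQueue is a test double with fixed probe results.
type fakeQueue struct{ healthy, overloaded bool }

func (f fakeQueue) IsHealthy() bool    { return f.healthy }
func (f fakeQueue) IsOverloaded() bool { return f.overloaded }

func main() {
	fmt.Println(probeStatus(fakeQueue{healthy: true}))                   // 200
	fmt.Println(probeStatus(fakeQueue{healthy: true, overloaded: true})) // 429
	fmt.Println(probeStatus(fakeQueue{healthy: false}))                  // 503
}
```

Because IsOverloaded returns false when statistics collection is disabled, the 429 branch never fires in that configuration.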

func (*Queue) Shutdown

func (q *Queue) Shutdown(ctx context.Context) error

Shutdown gracefully stops the queue, waiting up to the configured shutdown timeout (ShutdownTimeout) for in-progress jobs to complete. It cancels the queue context and stops all workers.

func (*Queue) StartWorkers

func (q *Queue) StartWorkers(ctx context.Context, count int) error

StartWorkers launches the specified number of worker goroutines to process jobs. The workers will continue running until the context is cancelled.
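StartWorkers and Shutdown together suggest a start, wait, and stop lifecycle. The sketch below shows one way to wire them, under stated assumptions: runUntilDone, fakeQueue, and demo are hypothetical names, StartWorkers is assumed to return once the goroutines are launched rather than block, and how the context passed to Shutdown interacts with ShutdownTimeout is not documented, so the helper simply gives Shutdown its own deadline.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// lifecycle lists the two documented methods with their documented signatures.
type lifecycle interface {
	StartWorkers(ctx context.Context, count int) error
	Shutdown(ctx context.Context) error
}

// runUntilDone (hypothetical helper) starts workers, blocks until ctx is
// cancelled, then calls Shutdown with a fresh context bounded by grace.
func runUntilDone(ctx context.Context, q lifecycle, workers int, grace time.Duration) error {
	if err := q.StartWorkers(ctx, workers); err != nil {
		return fmt.Errorf("starting workers: %w", err)
	}
	<-ctx.Done()
	// ctx is already cancelled at this point, so Shutdown gets its own deadline.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	return q.Shutdown(shutdownCtx)
}

// fakeQueue is a test double that records the order of lifecycle calls.
type fakeQueue struct{ events []string }

func (f *fakeQueue) StartWorkers(_ context.Context, count int) error {
	f.events = append(f.events, fmt.Sprintf("start:%d", count))
	return nil
}

func (f *fakeQueue) Shutdown(ctx context.Context) error {
	f.events = append(f.events, "shutdown")
	return ctx.Err()
}

// demo uses a short timeout as a stand-in for an OS signal or other trigger.
func demo() ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	q := &fakeQueue{}
	err := runUntilDone(ctx, q, 4, time.Second)
	return q.events, err
}

func main() {
	events, err := demo()
	fmt.Println(events, err) // [start:4 shutdown] <nil>
}
```

In a real program the parent context would more likely come from signal.NotifyContext, so that an interrupt triggers the shutdown path.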

func (*Queue) Stats

func (q *Queue) Stats() stats.QueueStats

Stats returns current queue statistics including health status and performance metrics. If statistics collection is disabled, only basic health information is returned.
