Documentation
¶
Overview ¶
Package goque provides a distributed task queue manager with support for multiple processors and automatic task healing and cleaning.
Package goque provides a robust, SQL-backed task queue system for Go applications.
Index ¶
- Constants
- Variables
- func SetMetricsServiceName(name string)
- func SetTracerProvider(tp trace.TracerProvider)
- type Goque
- type Metadata
- type PeriodicJob
- type PeriodicJobFactory
- type PeriodicJobOpts
- type PeriodicSchedule
- type PeriodicSchedulerFunc
- type ProcessorOpts
- type Task
- type TaskFilter
- type TaskProcessor
- type TaskProcessorFunc
- type TaskQueueManager
- type TaskStatus
- type TaskStorage
- type TaskType
Constants ¶
const ( TaskStatusNew = entity.TaskStatusNew // Task is ready to be picked up TaskStatusPending = entity.TaskStatusPending // Task is scheduled for future processing TaskStatusProcessing = entity.TaskStatusProcessing // Task is currently being processed TaskStatusDone = entity.TaskStatusDone // Task completed successfully TaskStatusCanceled = entity.TaskStatusCanceled // Task was manually canceled TaskStatusError = entity.TaskStatusError // Task failed but has retry attempts remaining TaskStatusAttemptsLeft = entity.TaskStatusAttemptsLeft // Task failed and exhausted all retries )
Task status constants define the possible states a task can be in.
Variables ¶
var ( // WithValue adds a single key-value pair to the context for task metadata tracking. WithValue = goquectx.WithValue // WithValues adds multiple key-value pairs to the context for task metadata tracking. WithValues = goquectx.WithValues // ValueByKey retrieves stored value from the context by key. ValueByKey = goquectx.ValueByKey // Values retrieves all stored metadata values from the context. Values = goquectx.Values )
Context value functions for storing and retrieving task metadata.
var ( NoTaskPayload = entity.NoTaskPayload // NewTask creates a new task with the specified type and payload. NewTask = entity.NewTask // NewTaskWithExternalID creates a new task with an external identifier for idempotency. NewTaskWithExternalID = entity.NewTaskWithExternalID )
Task creation functions for adding new tasks to the queue.
var ( // NewPeriodicJob creates a periodic job from a schedule and task factory. NewPeriodicJob = periodicprocessor.NewJob // NewCronJob creates a periodic job from a standard 5-field cron spec. NewCronJob = periodicprocessor.NewCronJob // CronSchedule creates a schedule from a standard 5-field cron spec. CronSchedule = periodicprocessor.CronSchedule // WithPeriodicJobRunOnStart makes a periodic job enqueue one task when the scheduler starts. WithPeriodicJobRunOnStart = periodicprocessor.WithRunOnStart )
var ( // WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in a single batch. WithTaskFetcherMaxTasks = queueprocessor.WithTaskFetcherMaxTasks // WithTaskFetcherTick sets the interval between task fetch attempts. WithTaskFetcherTick = queueprocessor.WithTaskFetcherTick // WithTaskFetcherTimeout sets the timeout for fetching tasks from storage. WithTaskFetcherTimeout = queueprocessor.WithTaskFetcherTimeout )
Task fetcher configuration options.
var ( // WithWorkersCount sets the number of concurrent workers for processing tasks. WithWorkersCount = queueprocessor.WithWorkersCount // WithWorkersPanicHandler sets a custom panic handler for worker goroutines. WithWorkersPanicHandler = queueprocessor.WithWorkersPanicHandler // WithTaskProcessingTimeout sets the timeout for processing a single task. WithTaskProcessingTimeout = queueprocessor.WithTaskProcessingTimeout // WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks. WithTaskProcessingMaxAttempts = queueprocessor.WithTaskProcessingMaxAttempts // WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time. WithTaskProcessingNextAttemptAtFunc = queueprocessor.WithTaskProcessingNextAttemptAtFunc )
Worker and task processing configuration options.
var ( // WithHooksBeforeProcessing sets hooks to execute before processing each task. WithHooksBeforeProcessing = queueprocessor.WithHooksBeforeProcessing // WithHooksAfterProcessing sets hooks to execute after processing each task. WithHooksAfterProcessing = queueprocessor.WithHooksAfterProcessing )
Hook configuration options for task processing.
var ( // WithCleanerUpdatedAtTimeAgo sets the age threshold for tasks to be cleaned. WithCleanerUpdatedAtTimeAgo = queueprocessor.WithCleanerUpdatedAtTimeAgo // WithCleanerTimeout sets the timeout for the cleaner operation. WithCleanerTimeout = queueprocessor.WithCleanerTimeout // WithCleanerPeriod sets the interval between cleaner runs. WithCleanerPeriod = queueprocessor.WithCleanerPeriod )
Cleaner configuration options for removing old tasks.
var ( // WithHealerUpdatedAtTimeAgo sets the age threshold for tasks to be healed. WithHealerUpdatedAtTimeAgo = queueprocessor.WithHealerUpdatedAtTimeAgo // WithHealerTimeout sets the timeout for the healer operation. WithHealerTimeout = queueprocessor.WithHealerTimeout // WithHealerPeriod sets the interval between healer runs. WithHealerPeriod = queueprocessor.WithHealerPeriod )
Healer configuration options for fixing stuck tasks.
var NoopTaskProcessor = queueprocessor.NoopTaskProcessor
NoopTaskProcessor is a no-op task processor that does nothing and returns nil.
Functions ¶
func SetMetricsServiceName ¶ added in v0.2.0
func SetMetricsServiceName(name string)
SetMetricsServiceName sets the service name label for Prometheus metrics. This should be called once during application initialization, before starting the queue manager.
Example:
goque.SetMetricsServiceName("my-service")
func SetTracerProvider ¶ added in v0.4.0
func SetTracerProvider(tp trace.TracerProvider)
SetTracerProvider configures the TracerProvider used by Goque for distributed tracing.
This function must be called BEFORE creating any Goque instances or TaskQueueManager instances, as they capture the tracer at creation time.
By default, Goque uses a noop tracer (zero overhead). Call this function to enable OpenTelemetry distributed tracing in production environments.
Example usage:
import (
"github.com/ruko1202/goque"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// Initialize TracerProvider
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.01)), // 1% sampling
sdktrace.WithBatcher(exporter),
)
defer tracerProvider.Shutdown(ctx)
// Configure Goque (BEFORE creating instances)
goque.SetTracerProvider(tracerProvider)
// Now create Goque instances
goq := goque.NewGoque(taskStorage)
Types ¶
type Goque ¶
type Goque struct {
// contains filtered or unexported fields
}
Goque is the main task queue manager that coordinates multiple task processors.
func NewGoque ¶
func NewGoque(taskStorage TaskStorage) *Goque
NewGoque creates a new Goque instance with the specified task storage.
func (*Goque) RegisterPeriodicJob ¶ added in v0.5.0
func (g *Goque) RegisterPeriodicJob(job *PeriodicJob)
RegisterPeriodicJob registers a periodic job processor. Should be called before Run.
func (*Goque) RegisterProcessor ¶
func (g *Goque) RegisterProcessor( processorType string, taskProcessor TaskProcessor, opts ...ProcessorOpts, )
RegisterProcessor registers a new task processor for a specific task type. Should be call before Run.
type Metadata ¶ added in v0.3.0
Metadata represents arbitrary key-value data associated with a task for tracking and context.
type PeriodicJob ¶ added in v0.5.0
type PeriodicJob = periodicprocessor.Job
PeriodicJob describes a producer that periodically inserts regular queue tasks.
type PeriodicJobFactory ¶ added in v0.5.0
type PeriodicJobFactory = periodicprocessor.TaskFactory
PeriodicJobFactory creates a task for a scheduled run.
type PeriodicJobOpts ¶ added in v0.5.0
type PeriodicJobOpts = periodicprocessor.JobOptions
PeriodicJobOpts configures a periodic job.
type PeriodicSchedule ¶ added in v0.5.0
type PeriodicSchedule = periodicprocessor.Scheduler
PeriodicSchedule calculates the next run time for a periodic job.
type PeriodicSchedulerFunc ¶ added in v0.5.0
type PeriodicSchedulerFunc = periodicprocessor.SchedulerFunc
PeriodicSchedulerFunc wrap PeriodicSchedule.
type ProcessorOpts ¶ added in v0.5.0
type ProcessorOpts = queueprocessor.GoqueProcessorOpts
ProcessorOpts is a function type for configuring GoqueProcessor options.
type TaskFilter ¶ added in v0.1.1
type TaskFilter = dbentity.GetTasksFilter
TaskFilter represents filtering criteria for querying tasks from the queue.
type TaskProcessor ¶ added in v0.5.0
type TaskProcessor = queueprocessor.TaskProcessor
TaskProcessor defines the interface for processing individual tasks.
type TaskProcessorFunc ¶ added in v0.0.4
type TaskProcessorFunc = queueprocessor.TaskProcessorFunc
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
type TaskQueueManager ¶ added in v0.1.1
type TaskQueueManager interface {
AsyncAddTaskToQueue(ctx context.Context, task *Task)
AddTaskToQueue(ctx context.Context, task *Task) error
GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error)
GetTasks(ctx context.Context, filter *TaskFilter, limit int64) ([]*Task, error)
ResetAttempts(ctx context.Context, taskID uuid.UUID) error
CancelTask(ctx context.Context, taskID uuid.UUID) error
}
TaskQueueManager provides operations for managing tasks in the queue. It offers both synchronous and asynchronous methods for adding tasks, as well as querying and managing existing tasks.
func NewTaskQueueManager ¶ added in v0.1.1
func NewTaskQueueManager(taskStorage TaskStorage) TaskQueueManager
NewTaskQueueManager creates a new TaskQueueManager instance with the specified task storage.
type TaskStatus ¶ added in v0.0.4
type TaskStatus = entity.TaskStatus
TaskStatus represents the current status of a task in its lifecycle.
type TaskStorage ¶
TaskStorage defines the interface for task persistence operations.
func NewStorage ¶ added in v0.0.4
func NewStorage(db *sqlx.DB) (TaskStorage, error)
NewStorage creates a new task storage instance based on the database driver. Supports PostgreSQL and MySQL databases.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
internal
|
|
|
entity
Package entity contains domain entities for the task queue system.
|
Package entity contains domain entities for the task queue system. |
|
metrics
Package metrics provides Prometheus instrumentation for Goque task queue operations.
|
Package metrics provides Prometheus instrumentation for Goque task queue operations. |
|
pkg/generated/mocks/mock_periodicprocessor
Package mock_periodicprocessor is a generated GoMock package.
|
Package mock_periodicprocessor is a generated GoMock package. |
|
pkg/generated/mocks/mock_storages
Package mock_storages is a generated GoMock package.
|
Package mock_storages is a generated GoMock package. |
|
processors/internalprocessors
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.
|
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations. |
|
processors/periodicprocessor
Package periodicprocessor provides cron-based periodic job scheduling.
|
Package periodicprocessor provides cron-based periodic job scheduling. |
|
processors/queueprocessor
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
|
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic. |
|
queuemanager
Package queuemanager provides high-level task queue management operations.
|
Package queuemanager provides high-level task queue management operations. |
|
storages
Package storages provides interfaces and implementations for task storage backends.
|
Package storages provides interfaces and implementations for task storage backends. |
|
storages/dbentity
Package dbentity provides common database entities and filters for task storage implementations.
|
Package dbentity provides common database entities and filters for task storage implementations. |
|
storages/dbutils
Package dbutils provides common database utilities for task storage implementations.
|
Package dbutils provides common database utilities for task storage implementations. |
|
storages/mysql/task
Package mysqltask provides MySQL storage operations for task management in the queue system.
|
Package mysqltask provides MySQL storage operations for task management in the queue system. |
|
storages/pg/task
Package task provides storage operations for task management in the queue system.
|
Package task provides storage operations for task management in the queue system. |
|
storages/sqlite
Package sqlite provides SQLite storage operations for task management in the queue system.
|
Package sqlite provides SQLite storage operations for task management in the queue system. |
|
utils/goquectx
Package goquectx provides utilities for managing task metadata within context values.
|
Package goquectx provides utilities for managing task metadata within context values. |
|
utils/xcollections
Package xcollections provides thread-safe collection types.
|
Package xcollections provides thread-safe collection types. |
|
utils/xpool
Package xpool provides a buffer pool for efficient memory reuse.
|
Package xpool provides a buffer pool for efficient memory reuse. |
|
utils/xtime
Package xtime provides time-related utility functions.
|
Package xtime provides time-related utility functions. |
|
utils/xtracer
Package xtracer provides OpenTelemetry tracing utilities for Goque.
|
Package xtracer provides OpenTelemetry tracing utilities for Goque. |
|
pkg
|
|
|
goquestorage
Package goquestorage provides task storage implementations for different database backends.
|
Package goquestorage provides task storage implementations for different database backends. |
|
scripts
|
|
|
dbmodels
command
|
|
|
test
|
|
|
testutils
Package testutils provides testing utilities for the goque project.
|
Package testutils provides testing utilities for the goque project. |