Documentation
Overview
Package goque provides a SQL-backed distributed task queue for Go applications, with support for multiple processors and automatic healing and cleaning of tasks.
Constants
const (
	TaskStatusNew          = entity.TaskStatusNew          // Task is ready to be picked up
	TaskStatusPending      = entity.TaskStatusPending      // Task is scheduled for future processing
	TaskStatusProcessing   = entity.TaskStatusProcessing   // Task is currently being processed
	TaskStatusDone         = entity.TaskStatusDone         // Task completed successfully
	TaskStatusCanceled     = entity.TaskStatusCanceled     // Task was manually canceled
	TaskStatusError        = entity.TaskStatusError        // Task failed but has retry attempts remaining
	TaskStatusAttemptsLeft = entity.TaskStatusAttemptsLeft // Task failed and exhausted all retries
)
Task status constants define the possible states a task can be in.
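To make the lifecycle concrete, the following is a minimal sketch that classifies statuses as terminal or not. The `Status` type, its string values, and the `IsTerminal` helper are hypothetical stand-ins for illustration; only the meaning of each state is taken from the comments above.

```go
package main

import "fmt"

// Status is a hypothetical stand-in for goque.TaskStatus.
// The string values are assumptions made for this sketch.
type Status string

const (
	StatusNew          Status = "new"
	StatusPending      Status = "pending"
	StatusProcessing   Status = "processing"
	StatusDone         Status = "done"
	StatusCanceled     Status = "canceled"
	StatusError        Status = "error"
	StatusAttemptsLeft Status = "attempts_left"
)

// IsTerminal reports whether a task in status s will not be picked up
// again on its own: it is done, canceled, or has exhausted its retries.
func IsTerminal(s Status) bool {
	switch s {
	case StatusDone, StatusCanceled, StatusAttemptsLeft:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []Status{StatusNew, StatusError, StatusDone, StatusAttemptsLeft} {
		fmt.Printf("%s terminal=%v\n", s, IsTerminal(s))
	}
}
```

Note that a task in the error state is not terminal in this sketch, because it still has retry attempts remaining.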
Variables
var (
	// WithValue adds a single key-value pair to the context for task metadata tracking.
	WithValue = goquectx.WithValue
	// WithValues adds multiple key-value pairs to the context for task metadata tracking.
	WithValues = goquectx.WithValues
	// ValueByKey retrieves stored value from the context by key.
	ValueByKey = goquectx.ValueByKey
	// Values retrieves all stored metadata values from the context.
	Values = goquectx.Values
)
Context value functions for storing and retrieving task metadata.
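The signatures of these functions are not shown on this page, so the sketch below does not call them. It only illustrates, with the standard `context` package, how metadata can be carried in a context. The names `withMeta`, `metaFrom`, and `demoMeta` are hypothetical, and the string-to-string map is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// metaKey is an unexported key type, so values stored under it cannot
// collide with context keys defined by other packages.
type metaKey struct{}

// withMeta returns a child context whose metadata contains everything
// already stored in ctx plus the given key-value pair. The parent map is
// copied, never mutated, so sibling contexts do not affect each other.
func withMeta(ctx context.Context, key, value string) context.Context {
	parent := metaFrom(ctx)
	merged := make(map[string]string, len(parent)+1)
	for k, v := range parent {
		merged[k] = v
	}
	merged[key] = value
	return context.WithValue(ctx, metaKey{}, merged)
}

// metaFrom returns the metadata stored in ctx, or nil if there is none.
func metaFrom(ctx context.Context) map[string]string {
	m, _ := ctx.Value(metaKey{}).(map[string]string)
	return m
}

// demoMeta builds a context with two entries and returns the stored metadata.
func demoMeta() map[string]string {
	ctx := withMeta(context.Background(), "tenant", "acme")
	ctx = withMeta(ctx, "request_id", "r-1")
	return metaFrom(ctx)
}

func main() {
	fmt.Println(demoMeta())
}
```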
var (
	// NewTask creates a new task with the specified type and payload.
	NewTask = entity.NewTask
	// NewTaskWithExternalID creates a new task with an external identifier for idempotency.
	NewTaskWithExternalID = entity.NewTaskWithExternalID
)
Task creation functions for adding new tasks to the queue.
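The idea behind an external identifier can be sketched as follows: adding a task whose external ID has already been seen is a no-op. This is an in-memory illustration only. The `task` and `memoryQueue` types are hypothetical, and how the library actually enforces idempotency (for example through a unique constraint in the database) is not stated here.

```go
package main

import "fmt"

// task is a hypothetical, simplified task record.
type task struct {
	Type       string
	Payload    string
	ExternalID string
}

// memoryQueue is an in-memory stand-in for a task storage.
type memoryQueue struct {
	tasks      []task
	byExternal map[string]struct{}
}

func newMemoryQueue() *memoryQueue {
	return &memoryQueue{byExternal: make(map[string]struct{})}
}

// add stores t unless a task with the same non-empty external ID was
// stored before. It reports whether the task was stored.
func (q *memoryQueue) add(t task) bool {
	if t.ExternalID != "" {
		if _, seen := q.byExternal[t.ExternalID]; seen {
			return false
		}
		q.byExternal[t.ExternalID] = struct{}{}
	}
	q.tasks = append(q.tasks, t)
	return true
}

func main() {
	q := newMemoryQueue()
	t := task{Type: "send_email", Payload: `{"to":"a@example.com"}`, ExternalID: "order-42"}
	fmt.Println(q.add(t)) // first submission is stored
	fmt.Println(q.add(t)) // duplicate external ID is ignored
	fmt.Println(len(q.tasks))
}
```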
var (
	// WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in a single batch.
	WithTaskFetcherMaxTasks = queueprocessor.WithTaskFetcherMaxTasks
	// WithTaskFetcherTick sets the interval between task fetch attempts.
	WithTaskFetcherTick = queueprocessor.WithTaskFetcherTick
	// WithTaskFetcherTimeout sets the timeout for fetching tasks from storage.
	WithTaskFetcherTimeout = queueprocessor.WithTaskFetcherTimeout
)
Task fetcher configuration options.
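The `With...` names suggest the functional options pattern. The sketch below shows that pattern in isolation. The `fetcherConfig` struct, the option constructors, and the default values are all hypothetical and are not the library's own types or defaults.

```go
package main

import (
	"fmt"
	"time"
)

// fetcherConfig is a hypothetical configuration struct.
type fetcherConfig struct {
	MaxTasks int
	Tick     time.Duration
	Timeout  time.Duration
}

// fetcherOption mutates a fetcherConfig; each constructor below returns one.
type fetcherOption func(*fetcherConfig)

func withMaxTasks(n int) fetcherOption {
	return func(c *fetcherConfig) { c.MaxTasks = n }
}

func withTick(d time.Duration) fetcherOption {
	return func(c *fetcherConfig) { c.Tick = d }
}

func withTimeout(d time.Duration) fetcherOption {
	return func(c *fetcherConfig) { c.Timeout = d }
}

// newFetcherConfig starts from assumed defaults and applies the options
// in order, so later options override earlier ones.
func newFetcherConfig(opts ...fetcherOption) fetcherConfig {
	cfg := fetcherConfig{MaxTasks: 10, Tick: time.Second, Timeout: 5 * time.Second}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := newFetcherConfig(withMaxTasks(50), withTick(250*time.Millisecond))
	fmt.Printf("%+v\n", cfg)
}
```

Options that are not passed keep their defaults, which is why the timeout in this example stays at the assumed five seconds.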
var (
	// WithWorkersCount sets the number of concurrent workers for processing tasks.
	WithWorkersCount = queueprocessor.WithWorkersCount
	// WithWorkersPanicHandler sets a custom panic handler for worker goroutines.
	WithWorkersPanicHandler = queueprocessor.WithWorkersPanicHandler
	// WithTaskProcessingTimeout sets the timeout for processing a single task.
	WithTaskProcessingTimeout = queueprocessor.WithTaskProcessingTimeout
	// WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
	WithTaskProcessingMaxAttempts = queueprocessor.WithTaskProcessingMaxAttempts
	// WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
	WithTaskProcessingNextAttemptAtFunc = queueprocessor.WithTaskProcessingNextAttemptAtFunc
)
Worker and task processing configuration options.
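A custom next-attempt function typically implements some form of backoff. The sketch below computes a capped exponential backoff. The function names, the base delay, the cap, and the signature are assumptions; the signature that WithTaskProcessingNextAttemptAtFunc expects is not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed backoff parameters for this sketch.
const (
	baseDelay = time.Second
	maxDelay  = time.Minute
)

// backoffDelay returns baseDelay * 2^(attempt-1), capped at maxDelay.
// Attempts below 1 are treated as the first attempt.
func backoffDelay(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	delay := baseDelay
	for i := 1; i < attempt; i++ {
		delay *= 2
		if delay >= maxDelay {
			return maxDelay // stop early, which also avoids overflow
		}
	}
	return delay
}

// nextAttemptAt returns the time at which the given attempt may run.
func nextAttemptAt(now time.Time, attempt int) time.Time {
	return now.Add(backoffDelay(attempt))
}

func main() {
	start := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	for attempt := 1; attempt <= 7; attempt++ {
		fmt.Printf("attempt %d: wait %s, run at %s\n",
			attempt, backoffDelay(attempt), nextAttemptAt(start, attempt).Format(time.TimeOnly))
	}
}
```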
var (
	// WithHooksBeforeProcessing sets hooks to execute before processing each task.
	WithHooksBeforeProcessing = queueprocessor.WithHooksBeforeProcessing
	// WithHooksAfterProcessing sets hooks to execute after processing each task.
	WithHooksAfterProcessing = queueprocessor.WithHooksAfterProcessing
)
Hook configuration options for task processing.
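As a rough illustration of how before and after hooks wrap a processor, consider the sketch below. The `hook` and `processor` types are hypothetical, and running the after hooks even when processing fails is an assumption of this sketch, not documented library behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// hook and processor are hypothetical signatures for this sketch.
type hook func(ctx context.Context, taskID string)
type processor func(ctx context.Context, taskID string) error

// processWithHooks runs the before hooks, then the processor, then the
// after hooks, and returns the processor's error.
func processWithHooks(ctx context.Context, taskID string, before []hook, proc processor, after []hook) error {
	for _, h := range before {
		h(ctx, taskID)
	}
	err := proc(ctx, taskID)
	for _, h := range after {
		h(ctx, taskID)
	}
	return err
}

// demoHooks records the order in which the callbacks run.
func demoHooks() ([]string, error) {
	var calls []string
	record := func(name string) hook {
		return func(context.Context, string) { calls = append(calls, name) }
	}
	failing := func(context.Context, string) error {
		calls = append(calls, "process")
		return errors.New("boom")
	}
	err := processWithHooks(context.Background(), "task-1",
		[]hook{record("before")}, failing, []hook{record("after")})
	return calls, err
}

func main() {
	calls, err := demoHooks()
	fmt.Println(calls, err)
}
```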
var (
	// WithCleanerUpdatedAtTimeAgo sets the age threshold for tasks to be cleaned.
	WithCleanerUpdatedAtTimeAgo = queueprocessor.WithCleanerUpdatedAtTimeAgo
	// WithCleanerTimeout sets the timeout for the cleaner operation.
	WithCleanerTimeout = queueprocessor.WithCleanerTimeout
	// WithCleanerPeriod sets the interval between cleaner runs.
	WithCleanerPeriod = queueprocessor.WithCleanerPeriod
)
Cleaner configuration options for removing old tasks.
var (
	// WithHealerUpdatedAtTimeAgo sets the age threshold for tasks to be healed.
	WithHealerUpdatedAtTimeAgo = queueprocessor.WithHealerUpdatedAtTimeAgo
	// WithHealerTimeout sets the timeout for the healer operation.
	WithHealerTimeout = queueprocessor.WithHealerTimeout
	// WithHealerPeriod sets the interval between healer runs.
	WithHealerPeriod = queueprocessor.WithHealerPeriod
)
Healer configuration options for fixing stuck tasks.
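Both the cleaner and the healer select tasks by an "updated at, time ago" threshold. The sketch below shows that selection for the healer case, under the assumption that a stuck task is one still marked as processing whose last update is older than the threshold. The `trackedTask` type and the status strings are hypothetical, and what the healer does with the selected tasks is not covered here.

```go
package main

import (
	"fmt"
	"time"
)

// trackedTask is a hypothetical, simplified task record.
type trackedTask struct {
	ID        string
	Status    string
	UpdatedAt time.Time
}

// staleTasks returns the tasks in the given status whose UpdatedAt is
// older than now minus timeAgo.
func staleTasks(tasks []trackedTask, status string, now time.Time, timeAgo time.Duration) []trackedTask {
	cutoff := now.Add(-timeAgo)
	var out []trackedTask
	for _, t := range tasks {
		if t.Status == status && t.UpdatedAt.Before(cutoff) {
			out = append(out, t)
		}
	}
	return out
}

// demoStale returns the IDs of tasks stuck in processing for over an hour.
func demoStale() []string {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	tasks := []trackedTask{
		{ID: "a", Status: "processing", UpdatedAt: now.Add(-2 * time.Hour)},   // stuck
		{ID: "b", Status: "processing", UpdatedAt: now.Add(-1 * time.Minute)}, // still fresh
		{ID: "c", Status: "done", UpdatedAt: now.Add(-3 * time.Hour)},         // old, but not processing
	}
	var ids []string
	for _, t := range staleTasks(tasks, "processing", now, time.Hour) {
		ids = append(ids, t.ID)
	}
	return ids
}

func main() {
	fmt.Println(demoStale())
}
```

A cleaner could reuse the same selection with a finished status and a longer threshold, then delete the result instead of repairing it.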
var NoopTaskProcessor = queueprocessor.NoopTaskProcessor
NoopTaskProcessor is a no-op task processor that does nothing and returns nil.
Functions
func SetMetricsServiceName (added in v0.2.0)
func SetMetricsServiceName(name string)
SetMetricsServiceName sets the service name label for Prometheus metrics. This should be called once during application initialization, before starting the queue manager.
Example:
goque.SetMetricsServiceName("my-service")
func SetTracerProvider (added in v0.4.0)
func SetTracerProvider(tp trace.TracerProvider)
SetTracerProvider configures the TracerProvider used by Goque for distributed tracing.
This function must be called BEFORE creating any Goque instances or TaskQueueManager instances, as they capture the tracer at creation time.
By default, Goque uses a noop tracer (zero overhead). Call this function to enable OpenTelemetry distributed tracing in production environments.
Example usage:
import (
	"github.com/ruko1202/goque"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// ctx, exporter, and taskStorage are assumed to be defined by the caller.

// Initialize TracerProvider
tracerProvider := sdktrace.NewTracerProvider(
	sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.01)), // 1% sampling
	sdktrace.WithBatcher(exporter),
)
defer tracerProvider.Shutdown(ctx)

// Configure Goque (BEFORE creating instances)
goque.SetTracerProvider(tracerProvider)

// Now create Goque instances
goq := goque.NewGoque(taskStorage)
Types
type Goque
type Goque struct {
// contains filtered or unexported fields
}
Goque is the main task queue manager that coordinates multiple task processors.
func NewGoque
func NewGoque(taskStorage TaskStorage) *Goque
NewGoque creates a new Goque instance with the specified task storage.
func (*Goque) RegisterProcessor
func (g *Goque) RegisterProcessor(
	processorType string,
	taskProcessor queueprocessor.TaskProcessor,
	opts ...queueprocessor.GoqueProcessorOpts,
)
RegisterProcessor registers a new task processor for a specific task type. It should be called before Run.
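The register-then-run ordering can be illustrated with a small stand-alone registry that maps task types to handlers. Everything in this sketch is hypothetical, including the `handler` signature and the choice to reject registrations after the registry has started; the library's actual behavior for late registration is not documented here.

```go
package main

import "fmt"

// handler is a hypothetical processor signature for this sketch.
type handler func(payload string) error

// registry maps task types to handlers and refuses changes once running.
type registry struct {
	handlers map[string]handler
	running  bool
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]handler)}
}

// register adds a handler for taskType. It fails if the registry is
// already running or the type is already taken.
func (r *registry) register(taskType string, h handler) error {
	if r.running {
		return fmt.Errorf("cannot register %q: registry is already running", taskType)
	}
	if _, exists := r.handlers[taskType]; exists {
		return fmt.Errorf("handler for %q is already registered", taskType)
	}
	r.handlers[taskType] = h
	return nil
}

// run marks the registry as started.
func (r *registry) run() { r.running = true }

// dispatch routes a payload to the handler registered for taskType.
func (r *registry) dispatch(taskType, payload string) error {
	h, ok := r.handlers[taskType]
	if !ok {
		return fmt.Errorf("no handler registered for %q", taskType)
	}
	return h(payload)
}

func main() {
	r := newRegistry()
	_ = r.register("send_email", func(payload string) error {
		fmt.Println("sending email:", payload)
		return nil
	})
	r.run()
	fmt.Println(r.dispatch("send_email", "hello"))
	fmt.Println(r.dispatch("resize_image", "cat.png"))
	fmt.Println(r.register("send_sms", func(string) error { return nil }))
}
```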
type Metadata (added in v0.3.0)
Metadata represents arbitrary key-value data associated with a task for tracking and context.
type TaskFilter (added in v0.1.1)
type TaskFilter = dbentity.GetTasksFilter
TaskFilter represents filtering criteria for querying tasks from the queue.
type TaskProcessorFunc (added in v0.0.4)
type TaskProcessorFunc = queueprocessor.TaskProcessorFunc
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
type TaskQueueManager (added in v0.1.1)
type TaskQueueManager interface {
AsyncAddTaskToQueue(ctx context.Context, task *Task)
AddTaskToQueue(ctx context.Context, task *Task) error
GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error)
GetTasks(ctx context.Context, filter *TaskFilter, limit int64) ([]*Task, error)
ResetAttempts(ctx context.Context, taskID uuid.UUID) error
}
TaskQueueManager provides operations for managing tasks in the queue. It offers both synchronous and asynchronous methods for adding tasks, as well as querying and managing existing tasks.
func NewTaskQueueManager (added in v0.1.1)
func NewTaskQueueManager(taskStorage TaskStorage) TaskQueueManager
NewTaskQueueManager creates a new TaskQueueManager instance with the specified task storage.
type TaskStatus (added in v0.0.4)
type TaskStatus = entity.TaskStatus
TaskStatus represents the current status of a task in its lifecycle.
type TaskStorage
TaskStorage defines the interface for task persistence operations.
func NewStorage (added in v0.0.4)
func NewStorage(db *sqlx.DB) (TaskStorage, error)
NewStorage creates a new task storage instance based on the database driver. Supports PostgreSQL and MySQL databases.
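Selecting a backend by driver can be sketched as a switch over the driver name, which `*sqlx.DB` exposes through its `DriverName` method. The `backend` type and `backendFor` function are hypothetical, and the driver names matched below ("postgres", "pgx", "mysql") are assumptions about which registered names are in use; how NewStorage inspects the driver internally is not shown on this page.

```go
package main

import "fmt"

// backend is a hypothetical identifier for a storage implementation.
type backend string

const (
	backendPostgres backend = "postgres"
	backendMySQL    backend = "mysql"
)

// backendFor maps a database/sql driver name to a storage backend and
// returns an error for drivers this sketch does not know.
func backendFor(driverName string) (backend, error) {
	switch driverName {
	case "postgres", "pgx":
		return backendPostgres, nil
	case "mysql":
		return backendMySQL, nil
	default:
		return "", fmt.Errorf("unsupported database driver %q", driverName)
	}
}

func main() {
	for _, name := range []string{"pgx", "mysql", "oracle"} {
		b, err := backendFor(name)
		fmt.Printf("driver=%s backend=%q err=%v\n", name, b, err)
	}
}
```

Returning an error for unknown drivers mirrors the `(TaskStorage, error)` return shape of NewStorage.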
Directories
| Path | Synopsis |
|---|---|
| internal/entity | Package entity contains domain entities for the task queue system. |
| internal/metrics | Package metrics provides Prometheus instrumentation for Goque task queue operations. |
| internal/pkg/generated/mocks/mock_storages | Package mock_storages is a generated GoMock package. |
| internal/processors/internalprocessors | Package internalprocessors provides internal task processors for queue management including cleaning and healing operations. |
| internal/processors/queueprocessor | Package queueprocessor provides task queue processing functionality with configurable workers and retry logic. |
| internal/queuemanager | Package queuemanager provides high-level task queue management operations. |
| internal/storages | Package storages provides interfaces and implementations for task storage backends. |
| internal/storages/dbentity | Package dbentity provides common database entities and filters for task storage implementations. |
| internal/storages/dbutils | Package dbutils provides common database utilities for task storage implementations. |
| internal/storages/mysql/task | Package mysqltask provides MySQL storage operations for task management in the queue system. |
| internal/storages/pg/task | Package task provides storage operations for task management in the queue system. |
| internal/storages/sqlite | Package sqlite provides SQLite storage operations for task management in the queue system. |
| internal/utils/goquectx | Package goquectx provides utilities for managing task metadata within context values. |
| internal/utils/xcollections | Package xcollections provides thread-safe collection types. |
| internal/utils/xpool | Package xpool provides a buffer pool for efficient memory reuse. |
| internal/utils/xtime | Package xtime provides time-related utility functions. |
| internal/utils/xtracer | Package xtracer provides OpenTelemetry tracing utilities for Goque. |
| pkg/goquestorage | Package goquestorage provides task storage implementations for different database backends. |
| scripts/dbmodels | (command) |
| test/testutils | Package testutils provides testing utilities for the goque project. |