Documentation ¶
Overview ¶
Package goque provides a robust, SQL-backed distributed task queue manager for Go applications, with support for multiple processors and automatic task healing and cleaning.
Constants ¶
const (
	TaskStatusNew          = entity.TaskStatusNew          // Task is ready to be picked up
	TaskStatusPending      = entity.TaskStatusPending      // Task is scheduled for future processing
	TaskStatusProcessing   = entity.TaskStatusProcessing   // Task is currently being processed
	TaskStatusDone         = entity.TaskStatusDone         // Task completed successfully
	TaskStatusCanceled     = entity.TaskStatusCanceled     // Task was manually canceled
	TaskStatusError        = entity.TaskStatusError        // Task failed but has retry attempts remaining
	TaskStatusAttemptsLeft = entity.TaskStatusAttemptsLeft // Task failed and exhausted all retries
)
Task status constants define the possible states a task can be in.
Variables ¶
var (
	// WithValue adds a single key-value pair to the context for task metadata tracking.
	WithValue = goquectx.WithValue
	// WithValues adds multiple key-value pairs to the context for task metadata tracking.
	WithValues = goquectx.WithValues
	// ValueByKey retrieves stored value from the context by key.
	ValueByKey = goquectx.ValueByKey
	// Values retrieves all stored metadata values from the context.
	Values = goquectx.Values
)
Context value functions for storing and retrieving task metadata.
var (
	// NewTask creates a new task with the specified type and payload.
	NewTask = entity.NewTask
	// NewTaskWithExternalID creates a new task with an external identifier for idempotency.
	NewTaskWithExternalID = entity.NewTaskWithExternalID
)
Task creation functions for adding new tasks to the queue.
var (
	// WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in a single batch.
	WithTaskFetcherMaxTasks = queueprocessor.WithTaskFetcherMaxTasks
	// WithTaskFetcherTick sets the interval between task fetch attempts.
	WithTaskFetcherTick = queueprocessor.WithTaskFetcherTick
	// WithTaskFetcherTimeout sets the timeout for fetching tasks from storage.
	WithTaskFetcherTimeout = queueprocessor.WithTaskFetcherTimeout
)
Task fetcher configuration options.
var (
	// WithWorkersCount sets the number of concurrent workers for processing tasks.
	WithWorkersCount = queueprocessor.WithWorkersCount
	// WithWorkersPanicHandler sets a custom panic handler for worker goroutines.
	WithWorkersPanicHandler = queueprocessor.WithWorkersPanicHandler
	// WithTaskProcessingTimeout sets the timeout for processing a single task.
	WithTaskProcessingTimeout = queueprocessor.WithTaskProcessingTimeout
	// WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
	WithTaskProcessingMaxAttempts = queueprocessor.WithTaskProcessingMaxAttempts
	// WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
	WithTaskProcessingNextAttemptAtFunc = queueprocessor.WithTaskProcessingNextAttemptAtFunc
)
Worker and task processing configuration options.
var (
	// WithHooksBeforeProcessing sets hooks to execute before processing each task.
	WithHooksBeforeProcessing = queueprocessor.WithHooksBeforeProcessing
	// WithHooksAfterProcessing sets hooks to execute after processing each task.
	WithHooksAfterProcessing = queueprocessor.WithHooksAfterProcessing
)
Hook configuration options for task processing.
var (
	// WithCleanerUpdatedAtTimeAgo sets the age threshold for tasks to be cleaned.
	WithCleanerUpdatedAtTimeAgo = queueprocessor.WithCleanerUpdatedAtTimeAgo
	// WithCleanerTimeout sets the timeout for the cleaner operation.
	WithCleanerTimeout = queueprocessor.WithCleanerTimeout
	// WithCleanerPeriod sets the interval between cleaner runs.
	WithCleanerPeriod = queueprocessor.WithCleanerPeriod
)
Cleaner configuration options for removing old tasks.
var (
	// WithHealerUpdatedAtTimeAgo sets the age threshold for tasks to be healed.
	WithHealerUpdatedAtTimeAgo = queueprocessor.WithHealerUpdatedAtTimeAgo
	// WithHealerTimeout sets the timeout for the healer operation.
	WithHealerTimeout = queueprocessor.WithHealerTimeout
	// WithHealerPeriod sets the interval between healer runs.
	WithHealerPeriod = queueprocessor.WithHealerPeriod
)
Healer configuration options for fixing stuck tasks.
var NoopTaskProcessor = queueprocessor.NoopTaskProcessor
NoopTaskProcessor is a no-op task processor that does nothing and returns nil.
Functions ¶
func SetMetricsServiceName ¶ added in v0.2.0
func SetMetricsServiceName(name string)
SetMetricsServiceName sets the service name label for Prometheus metrics. This should be called once during application initialization, before starting the queue manager.
Example:
goque.SetMetricsServiceName("my-service")
Types ¶
type Goque ¶
type Goque struct {
// contains filtered or unexported fields
}
Goque is the main task queue manager that coordinates multiple task processors.
func NewGoque ¶
func NewGoque(taskStorage TaskStorage) *Goque
NewGoque creates a new Goque instance with the specified task storage.
func (*Goque) RegisterProcessor ¶
func (g *Goque) RegisterProcessor(
	processorType string,
	taskProcessor queueprocessor.TaskProcessor,
	opts ...queueprocessor.GoqueProcessorOpts,
)
RegisterProcessor registers a new task processor for a specific task type. It should be called before Run.
type Metadata ¶ added in v0.3.0
Metadata represents arbitrary key-value data associated with a task for tracking and context.
type TaskFilter ¶ added in v0.1.1
type TaskFilter = dbentity.GetTasksFilter
TaskFilter represents filtering criteria for querying tasks from the queue.
type TaskProcessorFunc ¶ added in v0.0.4
type TaskProcessorFunc = queueprocessor.TaskProcessorFunc
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
type TaskQueueManager ¶ added in v0.1.1
type TaskQueueManager interface {
	AsyncAddTaskToQueue(ctx context.Context, task *Task)
	AddTaskToQueue(ctx context.Context, task *Task) error
	GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error)
	GetTasks(ctx context.Context, filter *TaskFilter, limit int64) ([]*Task, error)
	ResetAttempts(ctx context.Context, taskID uuid.UUID) error
}
TaskQueueManager provides operations for managing tasks in the queue. It offers both synchronous and asynchronous methods for adding tasks, as well as querying and managing existing tasks.
func NewTaskQueueManager ¶ added in v0.1.1
func NewTaskQueueManager(taskStorage TaskStorage) TaskQueueManager
NewTaskQueueManager creates a new TaskQueueManager instance with the specified task storage.
type TaskStatus ¶ added in v0.0.4
type TaskStatus = entity.TaskStatus
TaskStatus represents the current status of a task in its lifecycle.
type TaskStorage ¶
TaskStorage defines the interface for task persistence operations.
func NewStorage ¶ added in v0.0.4
func NewStorage(db *sqlx.DB) (TaskStorage, error)
NewStorage creates a new task storage instance based on the database driver. Supports PostgreSQL and MySQL databases.
Directories ¶
| Path | Synopsis |
|---|---|
| internal/entity | Package entity contains domain entities for the task queue system. |
| internal/metrics | Package metrics provides Prometheus instrumentation for Goque task queue operations. |
| internal/pkg/generated/mocks/mock_storages | Package mock_storages is a generated GoMock package. |
| internal/processors/internalprocessors | Package internalprocessors provides internal task processors for queue management including cleaning and healing operations. |
| internal/processors/queueprocessor | Package queueprocessor provides task queue processing functionality with configurable workers and retry logic. |
| internal/queuemanager | Package queuemanager provides high-level task queue management operations. |
| internal/storages | Package storages provides interfaces and implementations for task storage backends. |
| internal/storages/dbentity | Package dbentity provides common database entities and filters for task storage implementations. |
| internal/storages/dbutils | Package dbutils provides common database utilities for task storage implementations. |
| internal/storages/mysql/task | Package mysqltask provides MySQL storage operations for task management in the queue system. |
| internal/storages/pg/task | Package task provides storage operations for task management in the queue system. |
| internal/storages/sqlite | Package sqlite provides SQLite storage operations for task management in the queue system. |
| internal/utils/goquectx | Package goquectx provides utilities for managing task metadata within context values. |
| internal/utils/xtime | Package xtime provides time-related utility functions. |
| pkg/goquestorage | Package goquestorage provides task storage implementations for different database backends. |
| scripts/dbmodels (command) | |
| test/testutils | Package testutils provides testing utilities for the goque project. |