Documentation ¶
Overview ¶
Package goque provides a distributed task queue manager for Go applications, with support for multiple processors and automatic task healing and cleaning. Tasks are persisted in a SQL database; NewStorage supports PostgreSQL and MySQL.
Constants ¶
const (
	TaskStatusNew          = entity.TaskStatusNew          // Task is ready to be picked up
	TaskStatusPending      = entity.TaskStatusPending      // Task is scheduled for future processing
	TaskStatusProcessing   = entity.TaskStatusProcessing   // Task is currently being processed
	TaskStatusDone         = entity.TaskStatusDone         // Task completed successfully
	TaskStatusCanceled     = entity.TaskStatusCanceled     // Task was manually canceled
	TaskStatusError        = entity.TaskStatusError        // Task failed but has retry attempts remaining
	TaskStatusAttemptsLeft = entity.TaskStatusAttemptsLeft // Task failed and exhausted all retries
)
Task status constants define the possible states a task can be in.
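To make the lifecycle concrete, here is a minimal, self-contained sketch. It does not import goque: the `TaskStatus` type and its string values are local stand-ins (assumptions), and `isTerminal` is a hypothetical helper, not part of the package. It encodes one reading of the comments above: done, canceled, and attempts-exhausted tasks are not processed again, while an errored task still has retries left.

```go
package main

import "fmt"

// TaskStatus is a local stand-in for goque.TaskStatus. The string values are
// assumptions made for this sketch, not the package's real values.
type TaskStatus string

const (
	StatusNew          TaskStatus = "new"
	StatusPending      TaskStatus = "pending"
	StatusProcessing   TaskStatus = "processing"
	StatusDone         TaskStatus = "done"
	StatusCanceled     TaskStatus = "canceled"
	StatusError        TaskStatus = "error"
	StatusAttemptsLeft TaskStatus = "attempts_left"
)

// isTerminal reports whether a task in status s is finished for good.
// Hypothetical helper: it treats done, canceled, and attempts-exhausted
// as final, and everything else as still in flight or retryable.
func isTerminal(s TaskStatus) bool {
	switch s {
	case StatusDone, StatusCanceled, StatusAttemptsLeft:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []TaskStatus{StatusNew, StatusError, StatusDone, StatusAttemptsLeft} {
		fmt.Printf("%s terminal=%v\n", s, isTerminal(s))
	}
}
```

A helper like this can be useful when deciding whether to keep polling a task or stop waiting for it.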
Variables ¶
var (
	// NewTask creates a new task with the specified type and payload.
	NewTask = entity.NewTask
	// NewTaskWithExternalID creates a new task with an external identifier for idempotency.
	NewTaskWithExternalID = entity.NewTaskWithExternalID
)
Task creation functions for adding new tasks to the queue.
var (
	// WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in a single batch.
	WithTaskFetcherMaxTasks = queueprocessor.WithTaskFetcherMaxTasks
	// WithTaskFetcherTick sets the interval between task fetch attempts.
	WithTaskFetcherTick = queueprocessor.WithTaskFetcherTick
	// WithTaskFetcherTimeout sets the timeout for fetching tasks from storage.
	WithTaskFetcherTimeout = queueprocessor.WithTaskFetcherTimeout
)
Task fetcher configuration options.
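These options follow the functional-options pattern: each `With...` value produces an option that RegisterProcessor accepts variadically. The sketch below shows how such options are typically applied over defaults. It is self-contained and does not import goque; `fetcherConfig`, `Option`, the shortened option names, the parameter types, and the default values are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// fetcherConfig is a hypothetical stand-in for the fetcher settings.
type fetcherConfig struct {
	maxTasks int
	tick     time.Duration
	timeout  time.Duration
}

// Option mutates a fetcherConfig; a stand-in for the package's option type.
type Option func(*fetcherConfig)

// WithMaxTasks sets how many tasks are fetched per batch.
func WithMaxTasks(n int) Option { return func(c *fetcherConfig) { c.maxTasks = n } }

// WithTick sets the interval between fetch attempts.
func WithTick(d time.Duration) Option { return func(c *fetcherConfig) { c.tick = d } }

// WithTimeout sets the timeout for a single fetch.
func WithTimeout(d time.Duration) Option { return func(c *fetcherConfig) { c.timeout = d } }

// newFetcherConfig starts from arbitrary defaults (assumed, not goque's)
// and applies the options in the order given, so later options win.
func newFetcherConfig(opts ...Option) fetcherConfig {
	cfg := fetcherConfig{maxTasks: 10, tick: time.Second, timeout: 5 * time.Second}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := newFetcherConfig(WithMaxTasks(50), WithTick(200*time.Millisecond))
	fmt.Printf("maxTasks=%d tick=%s timeout=%s\n", cfg.maxTasks, cfg.tick, cfg.timeout)
}
```

Options that are not passed keep their defaults, which is why callers only need to name the settings they want to change.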
var (
	// WithWorkersCount sets the number of concurrent workers for processing tasks.
	WithWorkersCount = queueprocessor.WithWorkersCount
	// WithWorkersPanicHandler sets a custom panic handler for worker goroutines.
	WithWorkersPanicHandler = queueprocessor.WithWorkersPanicHandler
	// WithTaskProcessingTimeout sets the timeout for processing a single task.
	WithTaskProcessingTimeout = queueprocessor.WithTaskProcessingTimeout
	// WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
	WithTaskProcessingMaxAttempts = queueprocessor.WithTaskProcessingMaxAttempts
	// WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
	WithTaskProcessingNextAttemptAtFunc = queueprocessor.WithTaskProcessingNextAttemptAtFunc
)
Worker and task processing configuration options.
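A custom next-attempt function is usually where a retry backoff policy lives. The following is a sketch of one possible policy, capped exponential backoff, written as plain Go. The function names and signatures here are hypothetical: the signature that WithTaskProcessingNextAttemptAtFunc actually expects is not shown on this page, so the logic would need adapting to it.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before retrying the given attempt (1-based).
// The delay starts at base, doubles per attempt, and never exceeds maxDelay.
func backoffDelay(attempt int, base, maxDelay time.Duration) time.Duration {
	delay := base
	for i := 1; i < attempt && delay < maxDelay; i++ {
		delay *= 2
	}
	if delay > maxDelay {
		delay = maxDelay
	}
	return delay
}

// nextAttemptAt turns the delay into an absolute retry time.
// The base and cap used here are arbitrary example values.
func nextAttemptAt(now time.Time, attempt int) time.Time {
	return now.Add(backoffDelay(attempt, time.Second, 30*time.Second))
}

func main() {
	now := time.Now()
	for attempt := 1; attempt <= 7; attempt++ {
		fmt.Printf("attempt %d: retry in %s\n", attempt, nextAttemptAt(now, attempt).Sub(now))
	}
}
```

Capping the delay keeps a task that fails many times from being pushed arbitrarily far into the future.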
var (
	// WithHooksBeforeProcessing sets hooks to execute before processing each task.
	WithHooksBeforeProcessing = queueprocessor.WithHooksBeforeProcessing
	// WithHooksAfterProcessing sets hooks to execute after processing each task.
	WithHooksAfterProcessing = queueprocessor.WithHooksAfterProcessing
)
Hook configuration options for task processing.
var (
	// WithCleanerUpdatedAtTimeAgo sets the age threshold for tasks to be cleaned.
	WithCleanerUpdatedAtTimeAgo = queueprocessor.WithCleanerUpdatedAtTimeAgo
	// WithCleanerTimeout sets the timeout for the cleaner operation.
	WithCleanerTimeout = queueprocessor.WithCleanerTimeout
	// WithCleanerPeriod sets the interval between cleaner runs.
	WithCleanerPeriod = queueprocessor.WithCleanerPeriod
)
Cleaner configuration options for removing old tasks.
var (
	// WithHealerUpdatedAtTimeAgo sets the age threshold for tasks to be healed.
	WithHealerUpdatedAtTimeAgo = queueprocessor.WithHealerUpdatedAtTimeAgo
	// WithHealerTimeout sets the timeout for the healer operation.
	WithHealerTimeout = queueprocessor.WithHealerTimeout
	// WithHealerPeriod sets the interval between healer runs.
	WithHealerPeriod = queueprocessor.WithHealerPeriod
)
Healer configuration options for fixing stuck tasks.
var NoopTaskProcessor = queueprocessor.NoopTaskProcessor
NoopTaskProcessor is a no-op task processor that does nothing and returns nil.
Functions ¶
This section is empty.
Types ¶
type Goque ¶
type Goque struct {
// contains filtered or unexported fields
}
Goque is the main task queue manager that coordinates multiple task processors.
func NewGoque ¶
func NewGoque(taskStorage TaskStorage) *Goque
NewGoque creates a new Goque instance with the specified task storage.
func (*Goque) RegisterProcessor ¶
func (g *Goque) RegisterProcessor(
	processorType string,
	taskProcessor queueprocessor.TaskProcessor,
	opts ...queueprocessor.GoqueProcessorOpts,
)
RegisterProcessor registers a new task processor for a specific task type. It should be called before Run.
type TaskProcessorFunc ¶ added in v0.0.4
type TaskProcessorFunc = queueprocessor.TaskProcessorFunc
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
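This is the same adapter idea as `http.HandlerFunc` in the standard library: a function type gets a method so that an ordinary function can be passed wherever the interface is expected. The sketch below illustrates the pattern with local stand-in types. The `Task` fields, the `ProcessTask` method name, and its signature are assumptions for illustration and may differ from the real queueprocessor definitions.

```go
package main

import (
	"context"
	"fmt"
)

// Task is a hypothetical, simplified task for this sketch.
type Task struct {
	Type    string
	Payload string
}

// TaskProcessor is a stand-in interface; the method name is an assumption.
type TaskProcessor interface {
	ProcessTask(ctx context.Context, task *Task) error
}

// TaskProcessorFunc adapts a plain function to the TaskProcessor interface.
type TaskProcessorFunc func(ctx context.Context, task *Task) error

// ProcessTask calls the underlying function.
func (f TaskProcessorFunc) ProcessTask(ctx context.Context, task *Task) error {
	return f(ctx, task)
}

// recordTypes returns a processor that appends each task's type to seen.
func recordTypes(seen *[]string) TaskProcessor {
	return TaskProcessorFunc(func(_ context.Context, task *Task) error {
		*seen = append(*seen, task.Type)
		return nil
	})
}

// process runs a single task through a processor with a background context.
func process(p TaskProcessor, task *Task) error {
	return p.ProcessTask(context.Background(), task)
}

func main() {
	var seen []string
	p := recordTypes(&seen)
	if err := process(p, &Task{Type: "send_email", Payload: `{"to":"a@example.com"}`}); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(seen)
}
```

The benefit is that small processors can be written inline as closures instead of requiring a named struct type for each task type.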
type TaskPusher ¶ added in v0.0.4
type TaskPusher struct {
// contains filtered or unexported fields
}
TaskPusher provides functionality for adding tasks to the queue storage.
func NewTaskPusher ¶ added in v0.0.4
func NewTaskPusher(taskStorage TaskStorage) *TaskPusher
NewTaskPusher creates a new TaskPusher instance with the specified task storage.
func (*TaskPusher) AddTaskToQueue ¶ added in v0.0.4
AddTaskToQueue adds a task to the queue and returns an error if the operation fails.
func (*TaskPusher) AsyncAddTaskToQueue ¶ added in v0.0.4
func (q *TaskPusher) AsyncAddTaskToQueue(ctx context.Context, task *entity.Task)
AsyncAddTaskToQueue adds a task to the queue asynchronously without waiting for completion.
type TaskStatus ¶ added in v0.0.4
type TaskStatus = entity.TaskStatus
TaskStatus represents the current status of a task in its lifecycle.
type TaskStorage ¶
TaskStorage defines the interface for task persistence operations.
func NewStorage ¶ added in v0.0.4
func NewStorage(db *sqlx.DB) (TaskStorage, error)
NewStorage creates a new task storage instance based on the database driver. Supports PostgreSQL and MySQL databases.
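Selecting a backend "based on the database driver" usually means switching on the driver name reported by the connection (for `*sqlx.DB`, the `DriverName` method). The sketch below shows that dispatch in isolation. It is not goque's implementation: `backendFor`, the sentinel error, and the set of accepted driver names are assumptions, and the real function may recognise different names.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnsupportedDriver is a hypothetical sentinel error for this sketch.
var errUnsupportedDriver = errors.New("unsupported database driver")

// backendFor maps a database/sql driver name to a storage backend label.
// The driver names listed here are assumptions made for illustration.
func backendFor(driverName string) (string, error) {
	switch driverName {
	case "postgres", "pgx":
		return "postgresql", nil
	case "mysql":
		return "mysql", nil
	default:
		return "", fmt.Errorf("%w: %q", errUnsupportedDriver, driverName)
	}
}

func main() {
	for _, name := range []string{"pgx", "mysql", "sqlserver"} {
		backend, err := backendFor(name)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(name, "->", backend)
	}
}
```

Returning an error for unknown drivers, rather than falling back to a default backend, matches the `(TaskStorage, error)` shape of NewStorage and surfaces misconfiguration at startup.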
Directories ¶
| Path | Synopsis |
|---|---|
| internal | |
| internal/entity | Package entity contains domain entities for the task queue system. |
| internal/pkg/generated/mocks/mock_storages | Package mock_storages is a generated GoMock package. |
| internal/processors/internalprocessors | Package internalprocessors provides internal task processors for queue management including cleaning and healing operations. |
| internal/processors/queueprocessor | Package queueprocessor provides task queue processing functionality with configurable workers and retry logic. |
| internal/storages | Package storages provides interfaces and implementations for task storage backends. |
| internal/storages/dbentity | Package dbentity provides common database entities and filters for task storage implementations. |
| internal/storages/dbutils | Package dbutils provides common database utilities for task storage implementations. |
| internal/storages/mysql/task | Package mysqltask provides MySQL storage operations for task management in the queue system. |
| internal/storages/pg/task | Package task provides storage operations for task management in the queue system. |
| internal/storages/sqlite | Package sqlite provides SQLite storage operations for task management in the queue system. |
| internal/utils/xtime | Package xtime provides time-related utility functions. |
| pkg | |
| pkg/goquestorage | Package goquestorage provides task storage implementations for different database backends. |
| scripts | |
| scripts/dbmodels (command) | |
| test | |
| test/testutils | Package testutils provides testing utilities for the goque project. |