Documentation ¶
Overview ¶
Package goque provides a distributed task queue manager with support for multiple processors and automatic task healing and cleaning.
Package goque provides a robust, SQL-backed task queue system for Go applications.
Index ¶
- Constants
- Variables
- func SetMetricsServiceName(name string)
- func SetTracerProvider(tp trace.TracerProvider)
- type Goque
- type Metadata
- type PeriodicJob
- type PeriodicJobFactory
- type PeriodicJobOpts
- type PeriodicSchedule
- type PeriodicSchedulerFunc
- type ProcessorOpts
- type Task
- type TaskFilter
- type TaskProcessor
- type TaskProcessorFunc
- type TaskQueueManager
- type TaskStatus
- type TaskStorage
- type TaskType
- type TypedTask
- type TypedTaskProcessor
- type TypedTaskProcessorFunc
- type TypedTaskProcessorOpt
Constants ¶
const (
	TaskStatusNew          = entity.TaskStatusNew          // Task is ready to be picked up
	TaskStatusPending      = entity.TaskStatusPending      // Task is scheduled for future processing
	TaskStatusProcessing   = entity.TaskStatusProcessing   // Task is currently being processed
	TaskStatusDone         = entity.TaskStatusDone         // Task completed successfully
	TaskStatusCanceled     = entity.TaskStatusCanceled     // Task was manually canceled
	TaskStatusError        = entity.TaskStatusError        // Task failed but has retry attempts remaining
	TaskStatusAttemptsLeft = entity.TaskStatusAttemptsLeft // Task failed and exhausted all retries
)
Task status constants define the possible states a task can be in.
Variables ¶
var (
	// WithValue adds a single key-value pair to the context for task metadata tracking.
	WithValue = goquectx.WithValue
	// WithValues adds multiple key-value pairs to the context for task metadata tracking.
	WithValues = goquectx.WithValues
	// ValueByKey retrieves stored value from the context by key.
	ValueByKey = goquectx.ValueByKey
	// Values retrieves all stored metadata values from the context.
	Values = goquectx.Values
)
Context value functions for storing and retrieving task metadata.
var (
	// NoTaskPayload represents an empty task payload.
	NoTaskPayload = entity.NoTaskPayload
	// NewTask creates a new task with the specified type and payload.
	NewTask = entity.NewTask
	// NewTaskWithExternalID creates a new task with an external identifier for idempotency.
	NewTaskWithExternalID = entity.NewTaskWithExternalID
)
Task creation functions for adding new tasks to the queue.
var (
	// ErrDuplicateTask is returned when attempting to insert a task with a duplicate external ID.
	ErrDuplicateTask = entity.ErrDuplicateTask
	// ErrInvalidPayloadFormat is returned when the task payload is not valid JSON.
	ErrInvalidPayloadFormat = entity.ErrInvalidPayloadFormat
	// ErrPayloadMarshal is returned when a typed task payload cannot be marshaled to JSON.
	ErrPayloadMarshal = entity.ErrPayloadMarshal
	// ErrPayloadUnmarshal is returned when a typed task payload cannot be unmarshaled from JSON.
	ErrPayloadUnmarshal = entity.ErrPayloadUnmarshal
	// ErrTaskCancel is returned when a task is canceled during processing.
	ErrTaskCancel = entity.ErrTaskCancel
	// ErrTaskTimeout is returned when task processing exceeds the timeout limit.
	ErrTaskTimeout = entity.ErrTaskTimeout
)
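The interplay between an external ID and a duplicate-task error can be illustrated without the library. The sketch below is stdlib-only and uses hypothetical stand-ins (memQueue, errDuplicateTask, enqueueOnce); it is not goque's implementation, only one way a caller might treat a duplicate as a successful, already-applied enqueue.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDuplicateTask is a hypothetical stand-in for a duplicate-task sentinel error.
var errDuplicateTask = errors.New("duplicate task")

// memQueue is a hypothetical in-memory queue keyed by external ID.
type memQueue struct {
	mu    sync.Mutex
	byExt map[string]string // external ID -> payload
}

func newMemQueue() *memQueue {
	return &memQueue{byExt: make(map[string]string)}
}

// add stores the payload once per external ID and rejects repeats.
func (q *memQueue) add(externalID, payload string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.byExt[externalID]; ok {
		return fmt.Errorf("external id %q: %w", externalID, errDuplicateTask)
	}
	q.byExt[externalID] = payload
	return nil
}

// enqueueOnce treats a duplicate as success, so retrying the same request is safe.
func enqueueOnce(q *memQueue, externalID, payload string) error {
	err := q.add(externalID, payload)
	if errors.Is(err, errDuplicateTask) {
		return nil
	}
	return err
}

func main() {
	q := newMemQueue()
	fmt.Println(enqueueOnce(q, "order-42", `{"n":1}`)) // first insert
	fmt.Println(enqueueOnce(q, "order-42", `{"n":1}`)) // retry of the same request
	fmt.Println("stored tasks:", len(q.byExt))
}
```

Matching the sentinel with errors.Is keeps the check working even when the storage layer wraps the error with extra context.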
var (
	// NewPeriodicJob creates a periodic job from a schedule and task factory.
	NewPeriodicJob = periodicprocessor.NewJob
	// NewCronJob creates a periodic job from a standard 5-field cron spec.
	NewCronJob = periodicprocessor.NewCronJob
	// CronSchedule creates a schedule from a standard 5-field cron spec.
	CronSchedule = periodicprocessor.CronSchedule
	// WithPeriodicJobRunOnStart makes a periodic job enqueue one task when the scheduler starts.
	WithPeriodicJobRunOnStart = periodicprocessor.WithRunOnStart
)
var (
	// WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in a single batch.
	WithTaskFetcherMaxTasks = queueprocessor.WithTaskFetcherMaxTasks
	// WithTaskFetcherTick sets the interval between task fetch attempts.
	WithTaskFetcherTick = queueprocessor.WithTaskFetcherTick
	// WithTaskFetcherTimeout sets the timeout for fetching tasks from storage.
	WithTaskFetcherTimeout = queueprocessor.WithTaskFetcherTimeout
)
Task fetcher configuration options.
var (
	// WithWorkersCount sets the number of concurrent workers for processing tasks.
	WithWorkersCount = queueprocessor.WithWorkersCount
	// WithWorkersPanicHandler sets a custom panic handler for worker goroutines.
	WithWorkersPanicHandler = queueprocessor.WithWorkersPanicHandler
	// WithTaskProcessingTimeout sets the timeout for processing a single task.
	WithTaskProcessingTimeout = queueprocessor.WithTaskProcessingTimeout
	// WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
	WithTaskProcessingMaxAttempts = queueprocessor.WithTaskProcessingMaxAttempts
	// WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
	WithTaskProcessingNextAttemptAtFunc = queueprocessor.WithTaskProcessingNextAttemptAtFunc
)
Worker and task processing configuration options.
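A custom next-retry-time function typically encodes a backoff policy. The following stdlib-only sketch shows capped exponential backoff; the names delayFor and nextAttemptAt and their signatures are assumptions made for illustration, and the callback signature that WithTaskProcessingNextAttemptAtFunc actually expects is not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// delayFor returns the wait before the given attempt (1-based): base for the
// first retry, doubling on each further attempt, never more than maxDelay.
func delayFor(attempt int, base, maxDelay time.Duration) time.Duration {
	delay := base
	for i := 1; i < attempt && delay < maxDelay; i++ {
		delay *= 2
	}
	if delay > maxDelay {
		delay = maxDelay
	}
	return delay
}

// nextAttemptAt is a hypothetical policy function: it maps the current time
// and the attempt number to the time of the next retry.
func nextAttemptAt(now time.Time, attempt int) time.Time {
	return now.Add(delayFor(attempt, time.Second, 10*time.Second))
}

func main() {
	now := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	for attempt := 1; attempt <= 5; attempt++ {
		wait := nextAttemptAt(now, attempt).Sub(now)
		fmt.Printf("attempt %d: retry in %s\n", attempt, wait)
	}
}
```

Capping the delay keeps a task that fails many times from being pushed arbitrarily far into the future.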
var (
	// WithHooksBeforeProcessing sets hooks to execute before processing each task.
	WithHooksBeforeProcessing = queueprocessor.WithHooksBeforeProcessing
	// WithHooksAfterProcessing sets hooks to execute after processing each task.
	WithHooksAfterProcessing = queueprocessor.WithHooksAfterProcessing
)
Hook configuration options for task processing.
var (
	// WithCleanerUpdatedAtTimeAgo sets the age threshold for tasks to be cleaned.
	WithCleanerUpdatedAtTimeAgo = queueprocessor.WithCleanerUpdatedAtTimeAgo
	// WithCleanerTimeout sets the timeout for the cleaner operation.
	WithCleanerTimeout = queueprocessor.WithCleanerTimeout
	// WithCleanerPeriod sets the interval between cleaner runs.
	WithCleanerPeriod = queueprocessor.WithCleanerPeriod
)
Cleaner configuration options for removing old tasks.
var (
	// WithHealerUpdatedAtTimeAgo sets the age threshold for tasks to be healed.
	WithHealerUpdatedAtTimeAgo = queueprocessor.WithHealerUpdatedAtTimeAgo
	// WithHealerTimeout sets the timeout for the healer operation.
	WithHealerTimeout = queueprocessor.WithHealerTimeout
	// WithHealerPeriod sets the interval between healer runs.
	WithHealerPeriod = queueprocessor.WithHealerPeriod
)
Healer configuration options for fixing stuck tasks.
var NoopTaskProcessor = queueprocessor.NoopTaskProcessor
NoopTaskProcessor is a no-op task processor that does nothing and returns nil.
var TxFromContext = dbtx.TxFromContext
TxFromContext returns the *sqlx.Tx previously attached with WithTx, plus a boolean indicating whether one was present. Useful from custom TaskProcessor implementations that need to enroll their own DB writes in the same tx as goque's enqueue.
var WithTx = dbtx.WithTx
WithTx returns a context that carries tx so that storage operations run inside it instead of the storage's own *sqlx.DB. This enables the transactional-outbox pattern: open a tx, write your domain rows, enqueue a task via goque, then commit — all atomically. If the tx rolls back, the enqueue rolls back with it.
Calls that do not have a tx attached behave exactly as before and commit to the underlying database immediately.
The caller owns the lifecycle of tx (Begin, Commit, Rollback). Goque only writes through it.
If tx is nil, WithTx returns ctx unchanged and logs a WARN — storing a nil tx silently would degrade outbox guarantees to at-least-once with a window. To deliberately detach a tx from ctx use WithoutTx.
Scope: tx-aware methods (participate in your tx)
- AddTaskToQueue
- GetTask, GetTasks
- UpdateTask
- CancelTask
- DeleteTasks, CureTasks (PostgreSQL only; on MySQL/SQLite these run in their own internally-managed tx for batched updates)
Scope: NOT tx-aware (always run in their own internal tx)
- GetTasksForProcessing (uses FOR UPDATE SKIP LOCKED — must not be entangled with caller's outbox tx)
- ResetAttempts (read+write loop with its own boundaries)
- DeleteTasks, CureTasks on MySQL/SQLite (see above)
Do NOT use WithTx with AsyncAddTaskToQueue: the async enqueue runs in a goroutine that the caller does not wait on, so it races against the caller's Commit/Rollback and will either lose the write or panic on a closed tx. The async path detects this defensively, strips the tx, and logs a WARN — but the resulting enqueue is no longer atomic with your domain write. Stick to the synchronous AddTaskToQueue.
var WithoutTx = dbtx.WithoutTx
WithoutTx returns a context with any *sqlx.Tx attached via WithTx removed. Use it when handing ctx to a code path that must NOT enroll in the caller's tx — for example, a goroutine that outlives the caller's Commit/Rollback (see AsyncAddTaskToQueue, which calls this defensively).
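The three helpers WithTx, TxFromContext and WithoutTx follow the familiar Go pattern of carrying a value in a context. The sketch below reproduces the described semantics with the standard library only. fakeTx, txKey, executor and the lowercase helper names are hypothetical stand-ins, so this is an illustration of the pattern rather than the dbtx implementation.

```go
package main

import (
	"context"
	"fmt"
)

// fakeTx is a hypothetical stand-in for *sqlx.Tx.
type fakeTx struct{ name string }

// txKey is an unexported key type, so other packages cannot collide with it.
type txKey struct{}

// withTx attaches tx to ctx; a nil tx leaves ctx unchanged.
func withTx(ctx context.Context, tx *fakeTx) context.Context {
	if tx == nil {
		return ctx
	}
	return context.WithValue(ctx, txKey{}, tx)
}

// txFromContext returns the attached tx and whether one was present.
func txFromContext(ctx context.Context) (*fakeTx, bool) {
	tx, _ := ctx.Value(txKey{}).(*fakeTx)
	return tx, tx != nil
}

// withoutTx shadows any attached tx with a nil one, which txFromContext
// reports as absent.
func withoutTx(ctx context.Context) context.Context {
	return context.WithValue(ctx, txKey{}, (*fakeTx)(nil))
}

// executor names where a storage write would go for the given ctx.
func executor(ctx context.Context) string {
	if tx, ok := txFromContext(ctx); ok {
		return "tx:" + tx.name
	}
	return "db"
}

// demo runs the four interesting cases and returns the chosen executors.
func demo() []string {
	ctx := context.Background()
	inTx := withTx(ctx, &fakeTx{name: "outbox"})
	return []string{
		executor(ctx),               // no tx attached
		executor(inTx),              // tx attached
		executor(withoutTx(inTx)),   // tx deliberately detached
		executor(withTx(ctx, nil)),  // nil tx is ignored
	}
}

func main() {
	fmt.Println(demo())
}
```

Because contexts are immutable, detaching a tx creates a derived context and leaves the caller's original context, and its tx, untouched.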
Functions ¶
func SetMetricsServiceName ¶ added in v0.2.0
func SetMetricsServiceName(name string)
SetMetricsServiceName sets the service name label for Prometheus metrics. This should be called once during application initialization, before starting the queue manager.
Example:
goque.SetMetricsServiceName("my-service")
func SetTracerProvider ¶ added in v0.4.0
func SetTracerProvider(tp trace.TracerProvider)
SetTracerProvider configures the TracerProvider used by Goque for distributed tracing.
This function must be called BEFORE creating any Goque instances or TaskQueueManager instances, as they capture the tracer at creation time.
By default, Goque uses a noop tracer (zero overhead). Call this function to enable OpenTelemetry distributed tracing in production environments.
Example usage:
import (
	"github.com/ruko1202/goque"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// Initialize TracerProvider
tracerProvider := sdktrace.NewTracerProvider(
	sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.01)), // 1% sampling
	sdktrace.WithBatcher(exporter),
)
defer tracerProvider.Shutdown(ctx)

// Configure Goque (BEFORE creating instances)
goque.SetTracerProvider(tracerProvider)

// Now create Goque instances
goq := goque.NewGoque(taskStorage)
Types ¶
type Goque ¶
type Goque struct {
// contains filtered or unexported fields
}
Goque is the main task queue manager that coordinates multiple task processors.
func NewGoque ¶
func NewGoque(taskStorage TaskStorage) *Goque
NewGoque creates a new Goque instance with the specified task storage.
func (*Goque) RegisterPeriodicJob ¶ added in v0.5.0
func (g *Goque) RegisterPeriodicJob(job *PeriodicJob)
RegisterPeriodicJob registers a periodic job processor. Should be called before Run.
func (*Goque) RegisterProcessor ¶
func (g *Goque) RegisterProcessor(
	processorType string,
	taskProcessor TaskProcessor,
	opts ...ProcessorOpts,
)
RegisterProcessor registers a new task processor for a specific task type. Should be called before Run.
func (*Goque) Stop ¶
func (g *Goque) Stop()
Stop gracefully shuts down all registered processors and waits for them to finish.
Order matters: periodic processors are stopped first (no more new tasks dispatched), then queue processors drain in-flight work, then any in-flight AsyncAddTaskToQueue goroutines are drained. The last step is critical for callers that close the underlying *sqlx.DB after Stop() returns — without it a late async write hits a closed connection pool.
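The reason the final drain step matters can be shown with a small stdlib-only model. In the sketch below, pool and manager are hypothetical stand-ins for a connection pool and a queue manager; the point is only the ordering: wait for background writes with a sync.WaitGroup first, close the pool second.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for a database connection pool.
type pool struct {
	mu       sync.Mutex
	closed   bool
	rows     int // successful writes
	rejected int // writes that arrived after close
}

// insert records one write, or fails if the pool is already closed.
func (p *pool) insert() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		p.rejected++
		return errors.New("pool is closed")
	}
	p.rows++
	return nil
}

func (p *pool) close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
}

// manager tracks background enqueues so that shutdown can wait for them.
type manager struct {
	db *pool
	wg sync.WaitGroup
}

// asyncAdd starts the write in a goroutine and returns immediately.
func (m *manager) asyncAdd() {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		_ = m.db.insert() // a real implementation would log the error
	}()
}

// waitAsync blocks until every goroutine started by asyncAdd has returned.
func (m *manager) waitAsync() { m.wg.Wait() }

// run enqueues n writes, drains them, and only then closes the pool.
func run(n int) (rows, rejected int) {
	db := &pool{}
	m := &manager{db: db}
	for i := 0; i < n; i++ {
		m.asyncAdd()
	}
	m.waitAsync()
	db.close()
	return db.rows, db.rejected
}

func main() {
	rows, rejected := run(100)
	fmt.Println("rows:", rows, "rejected:", rejected)
}
```

If close were called before waitAsync, some of the goroutines could observe a closed pool and their writes would be lost, which is the failure mode the shutdown order is designed to prevent.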
type Metadata ¶ added in v0.3.0
Metadata represents arbitrary key-value data associated with a task for tracking and context.
type PeriodicJob ¶ added in v0.5.0
type PeriodicJob = periodicprocessor.Job
PeriodicJob describes a producer that periodically inserts regular queue tasks.
type PeriodicJobFactory ¶ added in v0.5.0
type PeriodicJobFactory = periodicprocessor.TaskFactory
PeriodicJobFactory creates a task for a scheduled run.
type PeriodicJobOpts ¶ added in v0.5.0
type PeriodicJobOpts = periodicprocessor.JobOptions
PeriodicJobOpts configures a periodic job.
type PeriodicSchedule ¶ added in v0.5.0
type PeriodicSchedule = periodicprocessor.Scheduler
PeriodicSchedule calculates the next run time for a periodic job.
type PeriodicSchedulerFunc ¶ added in v0.5.0
type PeriodicSchedulerFunc = periodicprocessor.SchedulerFunc
PeriodicSchedulerFunc wraps PeriodicSchedule.
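Types named with a Func suffix in Go are commonly function adapters, in the style of http.HandlerFunc: a function type with a method that lets a plain function satisfy an interface. The sketch below shows that idiom for a schedule. The scheduler interface, its Next method and the every helper are assumptions made for illustration; the method set of the periodicprocessor types is not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// scheduler is a hypothetical interface: given the current time, it returns
// the time of the next run.
type scheduler interface {
	Next(now time.Time) time.Time
}

// schedulerFunc lets a plain function satisfy scheduler.
type schedulerFunc func(now time.Time) time.Time

// Next calls the underlying function.
func (f schedulerFunc) Next(now time.Time) time.Time { return f(now) }

// every returns a schedule that fires on fixed, clock-aligned intervals,
// always strictly after now.
func every(d time.Duration) scheduler {
	return schedulerFunc(func(now time.Time) time.Time {
		return now.Truncate(d).Add(d)
	})
}

// demoNext computes the next quarter-hour run after 10:07 UTC.
func demoNext() string {
	now := time.Date(2024, 1, 1, 10, 7, 0, 0, time.UTC)
	return every(15 * time.Minute).Next(now).Format("15:04")
}

func main() {
	fmt.Println("next run at", demoNext())
}
```

The adapter keeps simple schedules down to a single closure while still allowing richer implementations, such as a cron parser, behind the same interface.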
type ProcessorOpts ¶ added in v0.5.0
type ProcessorOpts = queueprocessor.GoqueProcessorOpts
ProcessorOpts is a function type for configuring GoqueProcessor options.
type Task ¶ added in v0.0.4
Task represents a unit of work to be processed by the queue system.
func NewTaskWithPayload ¶ added in v0.6.0
NewTaskWithPayload creates a new task with a typed payload marshaled as JSON.
type TaskFilter ¶ added in v0.1.1
type TaskFilter = dbentity.GetTasksFilter
TaskFilter represents filtering criteria for querying tasks from the queue.
type TaskProcessor ¶ added in v0.5.0
type TaskProcessor = queueprocessor.TaskProcessor
TaskProcessor defines the interface for processing individual tasks.
func NewTypedTaskProcessor ¶ added in v0.6.0
func NewTypedTaskProcessor[T any](processor TypedTaskProcessor[T], opts ...TypedTaskProcessorOpt[T]) TaskProcessor
NewTypedTaskProcessor wraps a typed task processor for use with RegisterProcessor.
type TaskProcessorFunc ¶ added in v0.0.4
type TaskProcessorFunc = queueprocessor.TaskProcessorFunc
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
type TaskQueueManager ¶ added in v0.1.1
type TaskQueueManager interface {
// AsyncAddTaskToQueue enqueues task in a background goroutine
// and returns immediately. Errors are logged, not returned. If
// ctx carries a *sqlx.Tx (via WithTx) it is stripped before
// dispatch — the async goroutine outlives the caller's
// Commit/Rollback, so enrolling it in the caller's tx would
// race the close. For outbox semantics use AddTaskToQueue.
AsyncAddTaskToQueue(ctx context.Context, task *Task)
// AddTaskToQueue enqueues task synchronously and returns any
// storage error. Honors a *sqlx.Tx attached to ctx via WithTx
// (transactional outbox): the insert participates in the
// caller's tx and is rolled back if the caller rolls back.
AddTaskToQueue(ctx context.Context, task *Task) error
// GetTask returns the task with the given ID or an error if it
// is not found. Honors a tx attached to ctx via WithTx — read
// goes through the caller's tx if present.
GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error)
// GetTasks returns tasks matching filter up to limit. Honors a
// tx attached to ctx via WithTx.
GetTasks(ctx context.Context, filter *TaskFilter, limit int64) ([]*Task, error)
// ResetAttempts clears the retry counter and sets the task back
// to status=new so it can be picked up again. Runs in its own
// internal tx and therefore ignores any tx in ctx.
ResetAttempts(ctx context.Context, taskID uuid.UUID) error
// CancelTask moves a non-terminal task to status=canceled.
// No-op if the task is already in a terminal state. Honors a
// tx attached to ctx via WithTx: both the read and the write
// participate in the caller's tx, so a rollback unwinds the
// cancel.
CancelTask(ctx context.Context, taskID uuid.UUID) error
// WaitAsyncEnqueues blocks until every in-flight goroutine
// spawned by AsyncAddTaskToQueue has returned. Called
// automatically by Goque.Stop(); direct users of
// TaskQueueManager must call it before closing the underlying
// *sqlx.DB to avoid "sql: database is closed" errors from late
// async writes.
WaitAsyncEnqueues()
}
TaskQueueManager provides operations for managing tasks in the queue. It offers both synchronous and asynchronous methods for adding tasks, as well as querying and managing existing tasks.
func NewTaskQueueManager ¶ added in v0.1.1
func NewTaskQueueManager(taskStorage TaskStorage) TaskQueueManager
NewTaskQueueManager creates a new TaskQueueManager instance with the specified task storage.
type TaskStatus ¶ added in v0.0.4
type TaskStatus = entity.TaskStatus
TaskStatus represents the current status of a task in its lifecycle.
type TaskStorage ¶
TaskStorage defines the interface for task persistence operations.
func NewStorage ¶ added in v0.0.4
func NewStorage(db *sqlx.DB) (TaskStorage, error)
NewStorage creates a new task storage instance based on the database driver. Supports PostgreSQL, MySQL and SQLite.
PostgreSQL accepts three driver names: PgDriver (legacy lib/pq, caller imports lib/pq themselves), PgxDriver (pgx/v5, registered by goquestorage), and PgxV5Driver (alternative name pgx registers itself under). All three resolve to the same PG-backed storage.
type TypedTask ¶ added in v0.6.0
TypedTask represents a task with a payload decoded into the expected Go type.
type TypedTaskProcessor ¶ added in v0.6.0
type TypedTaskProcessor[T any] = queueprocessor.TypedTaskProcessor[T]
TypedTaskProcessor defines the interface for processing typed task payloads.
type TypedTaskProcessorFunc ¶ added in v0.6.0
type TypedTaskProcessorFunc[T any] = queueprocessor.TypedTaskProcessorFunc[T]
TypedTaskProcessorFunc is a function type that implements the TypedTaskProcessor interface.
type TypedTaskProcessorOpt ¶ added in v0.6.0
type TypedTaskProcessorOpt[T any] = queueprocessor.GoqueTypedProcessorOpts[T]
TypedTaskProcessorOpt configures a typed task processor adapter.
func WithCancelTaskWhenPayloadDecodeError ¶ added in v0.6.0
func WithCancelTaskWhenPayloadDecodeError[T any]() TypedTaskProcessorOpt[T]
WithCancelTaskWhenPayloadDecodeError cancels typed tasks when payload decoding fails instead of retrying them.
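The idea behind a typed processor adapter, and behind cancelling on decode errors, can be sketched with generics and encoding/json. Everything named here (typed, rawProcessor, errCancel, outcome, email) is hypothetical and stdlib-only; it shows the general technique, not the queueprocessor code. The reasoning is that a payload that fails to decode will fail the same way on every retry, so cancelling it avoids wasted attempts.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errCancel is a hypothetical sentinel meaning "do not retry this task".
var errCancel = errors.New("task canceled")

// rawProcessor handles an undecoded JSON payload.
type rawProcessor func(payload []byte) error

// typed adapts a processor of T into a rawProcessor by decoding JSON first.
// When cancelOnDecodeError is set, a malformed payload yields errCancel so
// the task is not retried.
func typed[T any](process func(T) error, cancelOnDecodeError bool) rawProcessor {
	return func(payload []byte) error {
		var v T
		if err := json.Unmarshal(payload, &v); err != nil {
			if cancelOnDecodeError {
				return fmt.Errorf("%w: decode payload: %v", errCancel, err)
			}
			return fmt.Errorf("decode payload: %w", err)
		}
		return process(v)
	}
}

// email is an example payload type.
type email struct {
	To string `json:"to"`
}

// outcome runs one payload through the adapter and classifies the result.
func outcome(payload string, cancelOnDecodeError bool) string {
	p := typed(func(e email) error { return nil }, cancelOnDecodeError)
	err := p([]byte(payload))
	switch {
	case err == nil:
		return "done"
	case errors.Is(err, errCancel):
		return "canceled"
	default:
		return "retry"
	}
}

func main() {
	fmt.Println(outcome(`{"to":"a@example.com"}`, false))
	fmt.Println(outcome(`{`, false))
	fmt.Println(outcome(`{`, true))
}
```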
Directories ¶
| Path | Synopsis |
|---|---|
| internal/entity | Package entity contains domain entities for the task queue system. |
| internal/metrics | Package metrics provides Prometheus instrumentation for Goque task queue operations. |
| internal/pkg/generated/mocks/mock_periodicprocessor | Package mock_periodicprocessor is a generated GoMock package. |
| internal/pkg/generated/mocks/mock_storages | Package mock_storages is a generated GoMock package. |
| internal/processors/internalprocessors | Package internalprocessors provides internal task processors for queue management including cleaning and healing operations. |
| internal/processors/periodicprocessor | Package periodicprocessor provides cron-based periodic job scheduling. |
| internal/processors/queueprocessor | Package queueprocessor provides task queue processing functionality with configurable workers and retry logic. |
| internal/queuemanager | Package queuemanager provides high-level task queue management operations. |
| internal/storages | Package storages provides interfaces and implementations for task storage backends. |
| internal/storages/dbentity | Package dbentity provides common database entities and filters for task storage implementations. |
| internal/storages/dbtx | Package dbtx provides a thin transaction-aware database executor for storage backends. |
| internal/storages/dbutils | Package dbutils provides common database utilities for task storage implementations. |
| internal/storages/mysql/task | Package mysqltask provides MySQL storage operations for task management in the queue system. |
| internal/storages/pg/task | Package task provides storage operations for task management in the queue system. |
| internal/storages/sqlite | Package sqlite provides SQLite storage operations for task management in the queue system. |
| internal/utils/goquectx | Package goquectx provides utilities for managing task metadata within context values. |
| internal/utils/xcollections | Package xcollections provides thread-safe collection types. |
| internal/utils/xpool | Package xpool provides a buffer pool for efficient memory reuse. |
| internal/utils/xtime | Package xtime provides time-related utility functions. |
| internal/utils/xtracer | Package xtracer provides OpenTelemetry tracing utilities for Goque. |
| pkg/goquestorage | Package goquestorage provides task storage implementations for different database backends. |
| scripts/dbmodels | (command) |
| test/testutils | Package testutils provides testing utilities for the goque project. |