Documentation ¶
Overview ¶
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
Index ¶
- Variables
- func LoggingAfterProcessing(ctx context.Context, task *entity.Task, err error)
- func LoggingBeforeProcessing(ctx context.Context, task *entity.Task)
- type GoqueProcessor
- type GoqueProcessorOpts
- func WithCleanerPeriod(period time.Duration) GoqueProcessorOpts
- func WithCleanerTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithCleanerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
- func WithDisableVerboseLogging() GoqueProcessorOpts
- func WithHealerPeriod(period time.Duration) GoqueProcessorOpts
- func WithHealerTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithHealerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
- func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
- func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
- func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
- func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts
- func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithTaskProcessingMaxAttempts(maxAttempts int32) GoqueProcessorOpts
- func WithTaskProcessingNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
- func WithTaskProcessingTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithWorkersCount(count int) GoqueProcessorOpts
- func WithWorkersPanicHandler(handler func(ctx context.Context) func(any)) GoqueProcessorOpts
- type GoqueTypedProcessor
- type GoqueTypedProcessorOpts
- type HookAfterProcessing
- type HookBeforeProcessing
- type NextAttemptAtFunc
- type TaskProcessor
- type TaskProcessorFunc
- type TypedTaskProcessor
- type TypedTaskProcessorFunc
Constants ¶
This section is empty.
Variables ¶
var FibonacciPeriods = []time.Duration{
	1 * time.Minute,
	2 * time.Minute,
	3 * time.Minute,
	5 * time.Minute,
	8 * time.Minute,
	13 * time.Minute,
	21 * time.Minute,
	34 * time.Minute,
	55 * time.Minute,
	89 * time.Minute,
}
FibonacciPeriods defines retry delays following the Fibonacci sequence.
var StrongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}
StrongPeriods defines a more aggressive retry delay schedule.
Functions ¶
func LoggingAfterProcessing ¶
LoggingAfterProcessing is the default hook that logs the result after a task has been processed.
Types ¶
type GoqueProcessor ¶
type GoqueProcessor struct {
// contains filtered or unexported fields
}
GoqueProcessor manages task fetching, processing, and worker pool coordination.
func NewGoqueProcessor ¶
func NewGoqueProcessor(
	taskStorage storages.Task,
	taskType entity.TaskType,
	processor TaskProcessor,
	opts ...GoqueProcessorOpts,
) *GoqueProcessor
NewGoqueProcessor creates a new processor instance with the specified configuration.
func (*GoqueProcessor) Name ¶
func (p *GoqueProcessor) Name() string
Name returns the processor's name based on its task type.
func (*GoqueProcessor) Run ¶
func (p *GoqueProcessor) Run(ctx context.Context) error
Run starts the processor, fetching and processing tasks until the context is canceled.
func (*GoqueProcessor) Stop ¶
func (p *GoqueProcessor) Stop()
Stop gracefully shuts down the processor by canceling its context.
type GoqueProcessorOpts ¶
type GoqueProcessorOpts func(*GoqueProcessor)
GoqueProcessorOpts is a function type for configuring GoqueProcessor options.
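The option functions listed below follow the functional options pattern. The sketch below illustrates that pattern in isolation; `processor`, `option`, `withWorkers`, `withFetcherTick`, and the default values are hypothetical stand-ins, not the package's actual fields or defaults.

```go
package main

import (
	"fmt"
	"time"
)

// processor is a hypothetical stand-in for GoqueProcessor; the real
// struct's fields are unexported and are not shown in this documentation.
type processor struct {
	workers     int
	fetcherTick time.Duration
}

// option has the same shape as GoqueProcessorOpts: a function that
// mutates the processor being built.
type option func(*processor)

func withWorkers(n int) option {
	return func(p *processor) { p.workers = n }
}

func withFetcherTick(d time.Duration) option {
	return func(p *processor) { p.fetcherTick = d }
}

// newProcessor sets illustrative defaults and then applies the options in
// order, so a later option overrides an earlier one.
func newProcessor(opts ...option) *processor {
	p := &processor{workers: 1, fetcherTick: time.Second}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newProcessor(withWorkers(4), withFetcherTick(500*time.Millisecond))
	fmt.Println(p.workers, p.fetcherTick)
}
```

Because every option is an ordinary function value, callers can build a slice of options conditionally and pass it with `opts...`.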
func WithCleanerPeriod ¶
func WithCleanerPeriod(period time.Duration) GoqueProcessorOpts
WithCleanerPeriod sets the interval at which the cleaner processor runs.
func WithCleanerTimeout ¶
func WithCleanerTimeout(timeout time.Duration) GoqueProcessorOpts
WithCleanerTimeout sets the timeout duration for cleaner operations.
func WithCleanerUpdatedAtTimeAgo ¶
func WithCleanerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
WithCleanerUpdatedAtTimeAgo sets the time threshold for considering tasks as old enough to be cleaned.
func WithDisableVerboseLogging ¶ added in v0.4.1
func WithDisableVerboseLogging() GoqueProcessorOpts
WithDisableVerboseLogging disables default verbose logging hooks (LoggingBeforeProcessing, LoggingAfterProcessing). Use this option in production to reduce memory allocations from logging. Custom logging hooks added via WithHooksBeforeProcessing/WithHooksAfterProcessing will still execute.
func WithHealerPeriod ¶
func WithHealerPeriod(period time.Duration) GoqueProcessorOpts
WithHealerPeriod sets the interval at which the healer processor runs.
func WithHealerTimeout ¶
func WithHealerTimeout(timeout time.Duration) GoqueProcessorOpts
WithHealerTimeout sets the timeout duration for healer operations.
func WithHealerUpdatedAtTimeAgo ¶
func WithHealerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
WithHealerUpdatedAtTimeAgo sets the time threshold for considering a task as stuck.
func WithHooksAfterProcessing ¶
func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
WithHooksAfterProcessing adds hooks to run after task processing completes.
func WithHooksBeforeProcessing ¶
func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
WithHooksBeforeProcessing adds hooks to run before task processing.
func WithTaskFetcherMaxTasks ¶
func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in each cycle.
func WithTaskFetcherTick ¶
func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts
WithTaskFetcherTick sets the interval between task fetching cycles.
func WithTaskFetcherTimeout ¶
func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts
WithTaskFetcherTimeout sets the timeout for task fetching operations.
func WithTaskProcessingMaxAttempts ¶
func WithTaskProcessingMaxAttempts(maxAttempts int32) GoqueProcessorOpts
WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
func WithTaskProcessingNextAttemptAtFunc ¶
func WithTaskProcessingNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
func WithTaskProcessingTimeout ¶
func WithTaskProcessingTimeout(timeout time.Duration) GoqueProcessorOpts
WithTaskProcessingTimeout sets the maximum execution time for a single task.
func WithWorkersCount ¶
func WithWorkersCount(count int) GoqueProcessorOpts
WithWorkersCount sets the number of concurrent workers for processing tasks.
func WithWorkersPanicHandler ¶
func WithWorkersPanicHandler(handler func(ctx context.Context) func(any)) GoqueProcessorOpts
WithWorkersPanicHandler sets a custom panic handler for worker pool panics.
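The handler takes a context and returns the function that receives the recovered panic value. As a rough illustration of how a handler of that shape could be invoked, the sketch below wraps a job in a deferred `recover`; `runGuarded` and `demo` are hypothetical helpers, and the real worker pool may call the handler differently.

```go
package main

import (
	"context"
	"fmt"
)

// panicHandler has the same shape as the argument of WithWorkersPanicHandler.
type panicHandler func(ctx context.Context) func(any)

// runGuarded is a hypothetical worker wrapper: it runs job and, if job
// panics, passes the recovered value to the handler instead of crashing.
func runGuarded(ctx context.Context, handler panicHandler, job func()) {
	defer func() {
		if r := recover(); r != nil {
			handler(ctx)(r)
		}
	}()
	job()
}

// demo runs one panicking job and returns the values the handler received.
func demo() []any {
	var got []any
	handler := func(_ context.Context) func(any) {
		return func(v any) { got = append(got, v) }
	}
	runGuarded(context.Background(), handler, func() { panic("boom") })
	return got
}

func main() {
	fmt.Println(demo())
}
```

Returning a closure lets the handler capture request-scoped values from the context, such as a logger, before it sees the panic value.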
type GoqueTypedProcessor ¶ added in v0.6.0
type GoqueTypedProcessor[T any] struct {
	// contains filtered or unexported fields
}
GoqueTypedProcessor adapts a typed payload processor to the generic task processor interface.
func (*GoqueTypedProcessor[T]) ProcessTask ¶ added in v0.6.0
ProcessTask decodes the task payload before delegating to the wrapped typed processor.
type GoqueTypedProcessorOpts ¶ added in v0.6.0
type GoqueTypedProcessorOpts[T any] func(*GoqueTypedProcessor[T])
GoqueTypedProcessorOpts configures a typed task processor adapter.
func WithCancelTaskWhenPayloadDecodeError ¶ added in v0.6.0
func WithCancelTaskWhenPayloadDecodeError[T any]() GoqueTypedProcessorOpts[T]
WithCancelTaskWhenPayloadDecodeError cancels typed tasks when payload decoding fails instead of retrying them.
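To make the decode-then-delegate behavior concrete, here is a minimal sketch of a typed adapter. It assumes JSON payloads, and `processTyped`, `errCancelTask`, and `emailPayload` are hypothetical names; the package's real encoding and cancellation mechanism are not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errCancelTask is a hypothetical sentinel meaning "do not retry this task".
var errCancelTask = errors.New("cancel task")

// emailPayload is an illustrative payload type.
type emailPayload struct {
	To string `json:"to"`
}

// processTyped decodes raw into T and calls handle. JSON decoding is an
// assumption. cancelOnDecodeError mimics the choice the option makes:
// report a decode failure as a cancellation or as an ordinary error.
func processTyped[T any](raw []byte, cancelOnDecodeError bool, handle func(T) error) error {
	var payload T
	if err := json.Unmarshal(raw, &payload); err != nil {
		if cancelOnDecodeError {
			return fmt.Errorf("%w: decode payload: %v", errCancelTask, err)
		}
		return fmt.Errorf("decode payload: %w", err)
	}
	return handle(payload)
}

// cancelsOnBadPayload reports whether processing raw ends in cancellation.
func cancelsOnBadPayload(raw string) bool {
	err := processTyped([]byte(raw), true, func(emailPayload) error { return nil })
	return errors.Is(err, errCancelTask)
}

func main() {
	fmt.Println(cancelsOnBadPayload(`{"to":"a@example.com"}`))
	fmt.Println(cancelsOnBadPayload(`not json`))
}
```

Cancelling on a decode error is useful because a malformed payload fails the same way on every retry.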
type HookAfterProcessing ¶
HookAfterProcessing defines a hook function called after task processing completes.
type HookBeforeProcessing ¶
HookBeforeProcessing defines a hook function called before task processing begins.
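The sketch below shows one plausible way before and after hooks wrap a processing call. The hook shapes follow the signatures of LoggingBeforeProcessing and LoggingAfterProcessing from the index; `task`, `processWithHooks`, and the exact call order are assumptions made for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for entity.Task.
type task struct{ ID string }

// Hook shapes modeled on LoggingBeforeProcessing and LoggingAfterProcessing.
type hookBefore func(ctx context.Context, t *task)
type hookAfter func(ctx context.Context, t *task, err error)

// processWithHooks runs every before hook, then the task, then every
// after hook, passing the processing error to the after hooks.
func processWithHooks(
	ctx context.Context,
	t *task,
	before []hookBefore,
	after []hookAfter,
	process func(context.Context, *task) error,
) error {
	for _, h := range before {
		h(ctx, t)
	}
	err := process(ctx, t)
	for _, h := range after {
		h(ctx, t, err)
	}
	return err
}

// demo records the order in which the hooks and the processor run.
func demo() []string {
	var events []string
	before := []hookBefore{func(_ context.Context, t *task) {
		events = append(events, "before "+t.ID)
	}}
	after := []hookAfter{func(_ context.Context, t *task, err error) {
		events = append(events, fmt.Sprintf("after %s: %v", t.ID, err))
	}}
	process := func(_ context.Context, t *task) error {
		events = append(events, "process "+t.ID)
		return errors.New("failed")
	}
	_ = processWithHooks(context.Background(), &task{ID: "t1"}, before, after, process)
	return events
}

func main() {
	for _, e := range demo() {
		fmt.Println(e)
	}
}
```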
type NextAttemptAtFunc ¶
NextAttemptAtFunc is a function type that calculates the next retry time based on attempt number.
func RoundStepNextAttemptAtFunc ¶
func RoundStepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
RoundStepNextAttemptAtFunc returns a function that steps through periods on each attempt and wraps around to the first period after the last one.
Example with periods of 1m, 5m, 10m, 15m, 30m, and 1h:
attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1m0s
attempt: 7, addDuration: 5m0s
attempt: 8, addDuration: 10m0s
attempt: 9, addDuration: 15m0s
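The wrap-around schedule in this example can be reproduced with a modulo over the period list. This is a minimal sketch, not the package's implementation: `roundStepDelay` is a hypothetical helper that returns only the added duration, and `strongPeriods` copies the StrongPeriods values.

```go
package main

import (
	"fmt"
	"time"
)

// strongPeriods copies the StrongPeriods values from this package.
var strongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}

// roundStepDelay returns the delay for an attempt, wrapping around to the
// start of periods once the list is exhausted. It returns 0 for an empty
// list or a negative attempt.
func roundStepDelay(periods []time.Duration, attempt int) time.Duration {
	if len(periods) == 0 || attempt < 0 {
		return 0
	}
	return periods[attempt%len(periods)]
}

func main() {
	for attempt := 0; attempt < 10; attempt++ {
		fmt.Printf("attempt: %d, addDuration: %s\n", attempt, roundStepDelay(strongPeriods, attempt))
	}
}
```

A next attempt time would then be the current time plus the returned duration.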
func StaticNextAttemptAtFunc ¶
func StaticNextAttemptAtFunc(period time.Duration) NextAttemptAtFunc
StaticNextAttemptAtFunc creates a function that returns a fixed delay for all retry attempts.
func StepNextAttemptAtFunc ¶
func StepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
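StepNextAttemptAtFunc returns a function that steps through periods on each attempt and keeps using the last period once the list is exhausted.
Example with periods of 1m, 5m, 10m, 15m, 30m, and 1h:
attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1h0m0s
attempt: 7, addDuration: 1h0m0s
attempt: 8, addDuration: 1h0m0s
attempt: 9, addDuration: 1h0m0s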
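The schedule in this example amounts to clamping the attempt number to the last index of the period list. The sketch below assumes that reading; `stepDelay` is a hypothetical helper that returns only the added duration, and `strongPeriods` copies the StrongPeriods values.

```go
package main

import (
	"fmt"
	"time"
)

// strongPeriods copies the StrongPeriods values from this package.
var strongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}

// stepDelay returns the delay for an attempt, staying on the last period
// once the list is exhausted. It returns 0 for an empty list or a
// negative attempt.
func stepDelay(periods []time.Duration, attempt int) time.Duration {
	if len(periods) == 0 || attempt < 0 {
		return 0
	}
	if attempt >= len(periods) {
		attempt = len(periods) - 1
	}
	return periods[attempt]
}

func main() {
	for attempt := 0; attempt < 10; attempt++ {
		fmt.Printf("attempt: %d, addDuration: %s\n", attempt, stepDelay(strongPeriods, attempt))
	}
}
```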
type TaskProcessor ¶
TaskProcessor defines the interface for processing individual tasks.
func NewTypedTaskProcessor ¶ added in v0.6.0
func NewTypedTaskProcessor[T any](processor TypedTaskProcessor[T], opts ...GoqueTypedProcessorOpts[T]) TaskProcessor
NewTypedTaskProcessor wraps a typed task processor for use with RegisterProcessor.
func NoopTaskProcessor ¶
func NoopTaskProcessor() TaskProcessor
NoopTaskProcessor returns a task processor that logs task information without performing any actual processing.
type TaskProcessorFunc ¶
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
func (TaskProcessorFunc) ProcessTask ¶
ProcessTask executes the task processing function.
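A function type that implements an interface is the same adapter idea as `http.HandlerFunc` in the standard library. The sketch below shows the pattern with local stand-ins; `task`, `taskProcessor`, `taskProcessorFunc`, `run`, and `failIfEmpty` are hypothetical, and the method signature is assumed to mirror the typed ProcessTask shown in this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for entity.Task.
type task struct{ ID string }

// taskProcessor mirrors the assumed shape of the TaskProcessor interface.
type taskProcessor interface {
	ProcessTask(ctx context.Context, t *task) error
}

// taskProcessorFunc lets a plain function satisfy taskProcessor.
type taskProcessorFunc func(ctx context.Context, t *task) error

// ProcessTask calls the underlying function.
func (f taskProcessorFunc) ProcessTask(ctx context.Context, t *task) error {
	return f(ctx, t)
}

// failIfEmpty is an example processor written as a function literal.
var failIfEmpty = taskProcessorFunc(func(_ context.Context, t *task) error {
	if t.ID == "" {
		return errors.New("empty task ID")
	}
	return nil
})

// run accepts anything that implements taskProcessor.
func run(p taskProcessor, id string) error {
	return p.ProcessTask(context.Background(), &task{ID: id})
}

func main() {
	fmt.Println(run(failIfEmpty, "task-1"))
	fmt.Println(run(failIfEmpty, ""))
}
```

The adapter avoids declaring a struct type when the processor has no state of its own.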
type TypedTaskProcessor ¶ added in v0.6.0
type TypedTaskProcessor[T any] interface {
	ProcessTask(ctx context.Context, task *entity.TypedTask[T]) error
}
TypedTaskProcessor defines the interface for processing typed task payloads.
func NoopTypedTaskProcessor ¶ added in v0.6.0
func NoopTypedTaskProcessor[T any]() TypedTaskProcessor[T]
NoopTypedTaskProcessor returns a typed task processor that logs task information without performing any actual processing.
type TypedTaskProcessorFunc ¶ added in v0.6.0
TypedTaskProcessorFunc is a function type that implements the TypedTaskProcessor interface.
func (TypedTaskProcessorFunc[T]) ProcessTask ¶ added in v0.6.0
func (f TypedTaskProcessorFunc[T]) ProcessTask(ctx context.Context, task *entity.TypedTask[T]) error
ProcessTask executes the typed task processing function.