Documentation ¶
Overview ¶
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
Index ¶
- Variables
- func LoggingAfterProcessing(ctx context.Context, task *entity.Task, err error)
- func LoggingBeforeProcessing(ctx context.Context, task *entity.Task)
- type GoqueProcessor
- type GoqueProcessorOpts
- func WithCleanerPeriod(period time.Duration) GoqueProcessorOpts
- func WithCleanerTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithCleanerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
- func WithDisableVerboseLogging() GoqueProcessorOpts
- func WithHealerPeriod(period time.Duration) GoqueProcessorOpts
- func WithHealerTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithHealerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
- func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
- func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
- func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
- func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts
- func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithTaskProcessingMaxAttempts(maxAttempts int32) GoqueProcessorOpts
- func WithTaskProcessingNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
- func WithTaskProcessingTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithWorkersCount(count int) GoqueProcessorOpts
- func WithWorkersPanicHandler(handler func(ctx context.Context) func(any)) GoqueProcessorOpts
- type HookAfterProcessing
- type HookBeforeProcessing
- type NextAttemptAtFunc
- type TaskProcessor
- type TaskProcessorFunc
Constants ¶
This section is empty.
Variables ¶
var FibonacciPeriods = []time.Duration{
	1 * time.Minute,
	2 * time.Minute,
	3 * time.Minute,
	5 * time.Minute,
	8 * time.Minute,
	13 * time.Minute,
	21 * time.Minute,
	34 * time.Minute,
	55 * time.Minute,
	89 * time.Minute,
}
FibonacciPeriods defines retry delays following the Fibonacci sequence.
var StrongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}
StrongPeriods defines a more aggressive retry delay schedule.
Functions ¶
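func LoggingAfterProcessing ¶
func LoggingAfterProcessing(ctx context.Context, task *entity.Task, err error)
LoggingAfterProcessing is the default hook that logs after a task has been processed.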
Types ¶
type GoqueProcessor ¶
type GoqueProcessor struct {
// contains filtered or unexported fields
}
GoqueProcessor manages task fetching, processing, and worker pool coordination.
func NewGoqueProcessor ¶
func NewGoqueProcessor(
	taskStorage storages.Task,
	taskType entity.TaskType,
	processor TaskProcessor,
	opts ...GoqueProcessorOpts,
) *GoqueProcessor
NewGoqueProcessor creates a new processor instance with the specified configuration.
func (*GoqueProcessor) Name ¶
func (p *GoqueProcessor) Name() string
Name returns the processor's name based on its task type.
func (*GoqueProcessor) Run ¶
func (p *GoqueProcessor) Run(ctx context.Context) error
Run starts the processor, fetching and processing tasks until the context is canceled.
func (*GoqueProcessor) Stop ¶
func (p *GoqueProcessor) Stop()
Stop gracefully shuts down the processor by canceling its context.
type GoqueProcessorOpts ¶
type GoqueProcessorOpts func(*GoqueProcessor)
GoqueProcessorOpts is a function type for configuring GoqueProcessor options.
func WithCleanerPeriod ¶
func WithCleanerPeriod(period time.Duration) GoqueProcessorOpts
WithCleanerPeriod sets the interval at which the cleaner processor runs.
func WithCleanerTimeout ¶
func WithCleanerTimeout(timeout time.Duration) GoqueProcessorOpts
WithCleanerTimeout sets the timeout duration for cleaner operations.
func WithCleanerUpdatedAtTimeAgo ¶
func WithCleanerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
WithCleanerUpdatedAtTimeAgo sets the time threshold for considering tasks as old enough to be cleaned.
func WithDisableVerboseLogging ¶ added in v0.4.1
func WithDisableVerboseLogging() GoqueProcessorOpts
WithDisableVerboseLogging disables default verbose logging hooks (LoggingBeforeProcessing, LoggingAfterProcessing). Use this option in production to reduce memory allocations from logging. Custom logging hooks added via WithHooksBeforeProcessing/WithHooksAfterProcessing will still execute.
func WithHealerPeriod ¶
func WithHealerPeriod(period time.Duration) GoqueProcessorOpts
WithHealerPeriod sets the interval at which the healer processor runs.
func WithHealerTimeout ¶
func WithHealerTimeout(timeout time.Duration) GoqueProcessorOpts
WithHealerTimeout sets the timeout duration for healer operations.
func WithHealerUpdatedAtTimeAgo ¶
func WithHealerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
WithHealerUpdatedAtTimeAgo sets the time threshold for considering a task as stuck.
func WithHooksAfterProcessing ¶
func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
WithHooksAfterProcessing adds hooks to run after task processing completes.
func WithHooksBeforeProcessing ¶
func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
WithHooksBeforeProcessing adds hooks to run before task processing.
func WithTaskFetcherMaxTasks ¶
func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in each cycle.
func WithTaskFetcherTick ¶
func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts
WithTaskFetcherTick sets the interval between task fetching cycles.
func WithTaskFetcherTimeout ¶
func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts
WithTaskFetcherTimeout sets the timeout for task fetching operations.
func WithTaskProcessingMaxAttempts ¶
func WithTaskProcessingMaxAttempts(maxAttempts int32) GoqueProcessorOpts
WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
func WithTaskProcessingNextAttemptAtFunc ¶
func WithTaskProcessingNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
func WithTaskProcessingTimeout ¶
func WithTaskProcessingTimeout(timeout time.Duration) GoqueProcessorOpts
WithTaskProcessingTimeout sets the maximum execution time for a single task.
func WithWorkersCount ¶
func WithWorkersCount(count int) GoqueProcessorOpts
WithWorkersCount sets the number of concurrent workers for processing tasks.
func WithWorkersPanicHandler ¶
func WithWorkersPanicHandler(handler func(ctx context.Context) func(any)) GoqueProcessorOpts
WithWorkersPanicHandler sets a custom panic handler for worker pool panics.
type HookAfterProcessing ¶
HookAfterProcessing defines a hook function called after task processing completes.
type HookBeforeProcessing ¶
HookBeforeProcessing defines a hook function called before task processing begins.
type NextAttemptAtFunc ¶
NextAttemptAtFunc is a function type that calculates the next retry time based on attempt number.
func RoundStepNextAttemptAtFunc ¶
func RoundStepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
RoundStepNextAttemptAtFunc steps through periods as the attempt number grows and wraps around to the first period once the list is exhausted.
Example:
attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1m0s
attempt: 7, addDuration: 5m0s
attempt: 8, addDuration: 10m0s
attempt: 9, addDuration: 15m0s
func StaticNextAttemptAtFunc ¶
func StaticNextAttemptAtFunc(period time.Duration) NextAttemptAtFunc
StaticNextAttemptAtFunc creates a function that returns a fixed delay for all retry attempts.
func StepNextAttemptAtFunc ¶
func StepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
StepNextAttemptAtFunc steps through periods as the attempt number grows and keeps using the last period once the list is exhausted.
Example:
attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1h0m0s
attempt: 7, addDuration: 1h0m0s
attempt: 8, addDuration: 1h0m0s
attempt: 9, addDuration: 1h0m0s
type TaskProcessor ¶
TaskProcessor defines the interface for processing individual tasks.
func NoopTaskProcessor ¶
func NoopTaskProcessor() TaskProcessor
NoopTaskProcessor returns a task processor that logs task information without performing any actual processing.
type TaskProcessorFunc ¶
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
func (TaskProcessorFunc) ProcessTask ¶
ProcessTask executes the task processing function.