Documentation ¶
Overview ¶
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
Index ¶
- Variables
- func LoggingAfterProcessing(ctx context.Context, task *entity.Task, err error)
- func LoggingBeforeProcessing(ctx context.Context, task *entity.Task)
- type GoqueProcessor
- type GoqueProcessorOpts
- func WithCleanerPeriod(period time.Duration) GoqueProcessorOpts
- func WithCleanerTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithCleanerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
- func WithHealerPeriod(period time.Duration) GoqueProcessorOpts
- func WithHealerTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithHealerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
- func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
- func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
- func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
- func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts
- func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithTaskProcessingMaxAttempts(maxAttempts int32) GoqueProcessorOpts
- func WithTaskProcessingNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
- func WithTaskProcessingTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithWorkersCount(count int) GoqueProcessorOpts
- func WithWorkersPanicHandler(handler func(ctx context.Context) func(any)) GoqueProcessorOpts
- type HookAfterProcessing
- type HookBeforeProcessing
- type NextAttemptAtFunc
- type TaskProcessor
- type TaskProcessorFunc
Constants ¶
This section is empty.
Variables ¶
var FibonacciPeriods = []time.Duration{
	1 * time.Minute,
	2 * time.Minute,
	3 * time.Minute,
	5 * time.Minute,
	8 * time.Minute,
	13 * time.Minute,
	21 * time.Minute,
	34 * time.Minute,
	55 * time.Minute,
	89 * time.Minute,
}
FibonacciPeriods defines retry delays following the Fibonacci sequence.
var StrongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}
StrongPeriods defines a shorter, six-step retry delay schedule that rises from 1 minute to 60 minutes.
Functions ¶
func LoggingAfterProcessing ¶
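func LoggingAfterProcessing(ctx context.Context, task *entity.Task, err error)
LoggingAfterProcessing writes the default log entry after a task has been processed.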
Types ¶
type GoqueProcessor ¶
type GoqueProcessor struct {
// contains filtered or unexported fields
}
GoqueProcessor manages task fetching, processing, and worker pool coordination.
func NewGoqueProcessor ¶
func NewGoqueProcessor(
	taskStorage storages.Task,
	taskType entity.TaskType,
	processor TaskProcessor,
	opts ...GoqueProcessorOpts,
) *GoqueProcessor
NewGoqueProcessor creates a new processor instance with the specified configuration.
func (*GoqueProcessor) Name ¶
func (p *GoqueProcessor) Name() string
Name returns the processor's name based on its task type.
func (*GoqueProcessor) Run ¶
func (p *GoqueProcessor) Run(ctx context.Context) error
Run starts the processor, fetching and processing tasks until the context is canceled.
func (*GoqueProcessor) Stop ¶
func (p *GoqueProcessor) Stop()
Stop gracefully shuts down the processor by canceling its context.
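The relationship between Run and Stop can be pictured with a small stand-in. This is a sketch, not the package's implementation: the runner type, its fields, the started channel, and the choice to return ctx.Err() from Run are all assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// runner is a hypothetical stand-in for GoqueProcessor.
type runner struct {
	mu      sync.Mutex
	cancel  context.CancelFunc
	started chan struct{}
}

func newRunner() *runner {
	return &runner{started: make(chan struct{})}
}

// Run derives a cancelable context, records its cancel function for Stop,
// and blocks until that context is done.
func (r *runner) Run(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	r.mu.Lock()
	r.cancel = cancel
	r.mu.Unlock()
	close(r.started)

	<-ctx.Done() // a real processor would fetch and process tasks here
	return ctx.Err()
}

// Stop cancels the context created by Run, if Run has started.
func (r *runner) Stop() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.cancel != nil {
		r.cancel()
	}
}

// runThenStop starts Run in a goroutine, stops it, and returns Run's error.
func runThenStop() error {
	r := newRunner()
	done := make(chan error, 1)
	go func() { done <- r.Run(context.Background()) }()

	<-r.started // wait until Run has registered its cancel function
	r.Stop()
	return <-done
}

func main() {
	fmt.Println(runThenStop())
}
```

In this sketch Stop is safe to call from another goroutine, and canceling the parent context passed to Run has the same effect as calling Stop.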
type GoqueProcessorOpts ¶
type GoqueProcessorOpts func(*GoqueProcessor)
GoqueProcessorOpts is a functional option: a function that configures a GoqueProcessor.
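Because GoqueProcessorOpts has the shape func(*GoqueProcessor), the With... constructors presumably follow the common functional options pattern. The sketch below shows that pattern with hypothetical stand-ins (processor, option, withWorkers, withTaskTimeout); the field names and default values are illustrative and are not the package's own.

```go
package main

import (
	"fmt"
	"time"
)

// processor is a hypothetical stand-in for GoqueProcessor.
type processor struct {
	workers     int
	taskTimeout time.Duration
}

// option mirrors the shape of GoqueProcessorOpts: a function that mutates
// the processor it is given.
type option func(*processor)

func withWorkers(n int) option {
	return func(p *processor) { p.workers = n }
}

func withTaskTimeout(d time.Duration) option {
	return func(p *processor) { p.taskTimeout = d }
}

// newProcessor sets illustrative defaults, then applies the options in order,
// so a later option overrides an earlier one.
func newProcessor(opts ...option) *processor {
	p := &processor{workers: 1, taskTimeout: 30 * time.Second}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newProcessor(withWorkers(4), withTaskTimeout(time.Minute))
	fmt.Println(p.workers, p.taskTimeout) // 4 1m0s
}
```

The pattern lets callers pass only the settings they want to change while the constructor keeps its defaults for everything else.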
func WithCleanerPeriod ¶
func WithCleanerPeriod(period time.Duration) GoqueProcessorOpts
WithCleanerPeriod sets the interval at which the cleaner processor runs.
func WithCleanerTimeout ¶
func WithCleanerTimeout(timeout time.Duration) GoqueProcessorOpts
WithCleanerTimeout sets the timeout duration for cleaner operations.
func WithCleanerUpdatedAtTimeAgo ¶
func WithCleanerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
WithCleanerUpdatedAtTimeAgo sets the time threshold for considering tasks as old enough to be cleaned.
func WithHealerPeriod ¶
func WithHealerPeriod(period time.Duration) GoqueProcessorOpts
WithHealerPeriod sets the interval at which the healer processor runs.
func WithHealerTimeout ¶
func WithHealerTimeout(timeout time.Duration) GoqueProcessorOpts
WithHealerTimeout sets the timeout duration for healer operations.
func WithHealerUpdatedAtTimeAgo ¶
func WithHealerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts
WithHealerUpdatedAtTimeAgo sets the time threshold for considering a task as stuck.
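An "updated at, time ago" threshold is usually an age check against the task's last update time. The sketch below shows that check in isolation. The function name isStale, the strict comparison, and the 15-minute threshold are assumptions; which task statuses the healer or cleaner actually inspects is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// isStale reports whether updatedAt lies further in the past than threshold,
// measured from now.
func isStale(updatedAt, now time.Time, threshold time.Duration) bool {
	return now.Sub(updatedAt) > threshold
}

// staleDemo checks a task updated 20 minutes ago and one updated 5 minutes
// ago against a 15-minute threshold.
func staleDemo() (old bool, recent bool) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	threshold := 15 * time.Minute
	old = isStale(now.Add(-20*time.Minute), now, threshold)
	recent = isStale(now.Add(-5*time.Minute), now, threshold)
	return old, recent
}

func main() {
	old, recent := staleDemo()
	fmt.Println(old, recent) // true false
}
```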
func WithHooksAfterProcessing ¶
func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
WithHooksAfterProcessing adds hooks to run after task processing completes.
func WithHooksBeforeProcessing ¶
func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
WithHooksBeforeProcessing adds hooks to run before task processing.
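The before and after hooks bracket the call to the task processor. The sketch below illustrates that ordering with hypothetical stand-ins: task replaces entity.Task, the hook types copy the parameter lists of LoggingBeforeProcessing and LoggingAfterProcessing, and processWithHooks is an assumed helper, not a function of this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for entity.Task.
type task struct{ ID int }

// The hook shapes mirror the LoggingBeforeProcessing and
// LoggingAfterProcessing signatures.
type hookBefore func(ctx context.Context, t *task)
type hookAfter func(ctx context.Context, t *task, err error)

// processWithHooks runs every before hook, then the processor, then every
// after hook with the processor's error.
func processWithHooks(
	ctx context.Context,
	t *task,
	process func(ctx context.Context, t *task) error,
	before []hookBefore,
	after []hookAfter,
) error {
	for _, h := range before {
		h(ctx, t)
	}
	err := process(ctx, t)
	for _, h := range after {
		h(ctx, t, err)
	}
	return err
}

// traceHooks runs one failing task and returns the order of events.
func traceHooks() []string {
	var events []string
	before := []hookBefore{func(_ context.Context, t *task) {
		events = append(events, fmt.Sprintf("before task %d", t.ID))
	}}
	after := []hookAfter{func(_ context.Context, t *task, err error) {
		events = append(events, fmt.Sprintf("after task %d: %v", t.ID, err))
	}}
	process := func(_ context.Context, t *task) error {
		events = append(events, fmt.Sprintf("process task %d", t.ID))
		return errors.New("boom")
	}
	_ = processWithHooks(context.Background(), &task{ID: 7}, process, before, after)
	return events
}

func main() {
	for _, e := range traceHooks() {
		fmt.Println(e)
	}
}
```

Passing the processing error to the after hooks is what lets a hook such as LoggingAfterProcessing report failures as well as successes.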
func WithTaskFetcherMaxTasks ¶
func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in each cycle.
func WithTaskFetcherTick ¶
func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts
WithTaskFetcherTick sets the interval between task fetching cycles.
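The fetcher settings describe a polling loop: wake up on every tick and ask for at most a fixed number of tasks. A minimal sketch of such a loop follows. The names fetchLoop and drainQueue, the fetch callback, and the in-memory counter standing in for task storage are assumptions for illustration only.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchLoop calls fetch on every tick, asking for at most maxTasks tasks,
// until ctx is canceled. It returns the total number of tasks fetched.
func fetchLoop(ctx context.Context, tick time.Duration, maxTasks int64, fetch func(limit int64) int64) int64 {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	var total int64
	for {
		select {
		case <-ctx.Done():
			return total
		case <-ticker.C:
			// Skip the fetch if cancellation raced with the tick.
			if ctx.Err() != nil {
				return total
			}
			total += fetch(maxTasks)
		}
	}
}

// drainQueue fetches 7 pending tasks in batches of at most 3 and cancels
// the loop once nothing is left.
func drainQueue() (int64, []int64) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pending := int64(7)
	var batches []int64
	fetch := func(limit int64) int64 {
		n := limit
		if pending < n {
			n = pending
		}
		pending -= n
		batches = append(batches, n)
		if pending == 0 {
			cancel()
		}
		return n
	}

	total := fetchLoop(ctx, time.Millisecond, 3, fetch)
	return total, batches
}

func main() {
	total, batches := drainQueue()
	fmt.Println(total, batches) // 7 [3 3 1]
}
```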
func WithTaskFetcherTimeout ¶
func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts
WithTaskFetcherTimeout sets the timeout for task fetching operations.
func WithTaskProcessingMaxAttempts ¶
func WithTaskProcessingMaxAttempts(maxAttempts int32) GoqueProcessorOpts
WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
func WithTaskProcessingNextAttemptAtFunc ¶
func WithTaskProcessingNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
func WithTaskProcessingTimeout ¶
func WithTaskProcessingTimeout(timeout time.Duration) GoqueProcessorOpts
WithTaskProcessingTimeout sets the maximum execution time for a single task.
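A per-task execution limit is commonly enforced by wrapping the task's context with context.WithTimeout. The sketch below shows that technique under the assumption that the processor honors context cancellation; processWithTimeout, slowTask, and timedOut are hypothetical names, and the package may enforce its timeout differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// processWithTimeout runs process with a context that expires after timeout.
func processWithTimeout(ctx context.Context, timeout time.Duration, process func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return process(ctx)
}

// slowTask pretends to work for one second but gives up when ctx is done.
func slowTask(ctx context.Context) error {
	select {
	case <-time.After(time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether slowTask is cut off by a 10ms timeout.
func timedOut() bool {
	err := processWithTimeout(context.Background(), 10*time.Millisecond, slowTask)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("timed out:", timedOut()) // timed out: true
}
```

A timeout of this kind only takes effect if the task code checks its context, so a processor that ignores ctx would keep running past the deadline.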
func WithWorkersCount ¶
func WithWorkersCount(count int) GoqueProcessorOpts
WithWorkersCount sets the number of concurrent workers for processing tasks.
func WithWorkersPanicHandler ¶
func WithWorkersPanicHandler(handler func(ctx context.Context) func(any)) GoqueProcessorOpts
WithWorkersPanicHandler sets a custom panic handler for worker pool panics.
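The handler type func(ctx context.Context) func(any) suggests a two-step call: the outer function receives the context and returns the function that receives the recovered value. The sketch below shows one way a worker could use such a handler. runGuarded and recoveredValue are hypothetical helpers, and how the package's worker pool invokes the handler is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// runGuarded runs job and, if it panics, passes the recovered value to the
// function returned by handler(ctx).
func runGuarded(ctx context.Context, handler func(ctx context.Context) func(any), job func()) {
	defer func() {
		if r := recover(); r != nil {
			handler(ctx)(r)
		}
	}()
	job()
}

// recoveredValue runs a panicking job and returns what the handler received.
func recoveredValue() any {
	var got any
	handler := func(ctx context.Context) func(any) {
		return func(p any) { got = p }
	}
	runGuarded(context.Background(), handler, func() { panic("worker failed") })
	return got
}

func main() {
	fmt.Println("recovered:", recoveredValue()) // recovered: worker failed
}
```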
type HookAfterProcessing ¶
HookAfterProcessing defines a hook function called after task processing completes.
type HookBeforeProcessing ¶
HookBeforeProcessing defines a hook function called before task processing begins.
type NextAttemptAtFunc ¶
NextAttemptAtFunc is a function type that calculates the next retry time based on attempt number.
func RoundStepNextAttemptAtFunc ¶
func RoundStepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
RoundStepNextAttemptAtFunc creates a function that steps through periods in order and wraps around to the first period once the list is exhausted.
Example (the delays match StrongPeriods):
attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1m0s
attempt: 7, addDuration: 5m0s
attempt: 8, addDuration: 10m0s
attempt: 9, addDuration: 15m0s
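The wrap-around in the example above (attempt 6 returns to 1m0s) is what indexing the list modulo its length produces. The sketch below reproduces the ten lines of that example. roundStepDelay is a hypothetical helper, not the package's function, and the real NextAttemptAtFunc signature is not shown in this documentation, so a plain attempt-to-delay function is assumed.

```go
package main

import (
	"fmt"
	"time"
)

// strongPeriods copies the StrongPeriods values listed in this package.
var strongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}

// roundStepDelay picks the delay for an attempt by wrapping the attempt
// number around the list of periods. Negative attempts are treated as 0.
func roundStepDelay(periods []time.Duration, attempt int) time.Duration {
	if len(periods) == 0 {
		return 0
	}
	if attempt < 0 {
		attempt = 0
	}
	return periods[attempt%len(periods)]
}

func main() {
	for attempt := 0; attempt < 10; attempt++ {
		fmt.Printf("attempt: %d, addDuration: %v\n", attempt, roundStepDelay(strongPeriods, attempt))
	}
}
```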
func StaticNextAttemptAtFunc ¶
func StaticNextAttemptAtFunc(period time.Duration) NextAttemptAtFunc
StaticNextAttemptAtFunc creates a function that returns a fixed delay for all retry attempts.
func StepNextAttemptAtFunc ¶
func StepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
StepNextAttemptAtFunc creates a function that steps through periods in order and keeps returning the last period once the list is exhausted.
Example (the delays match StrongPeriods):
attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1h0m0s
attempt: 7, addDuration: 1h0m0s
attempt: 8, addDuration: 1h0m0s
attempt: 9, addDuration: 1h0m0s
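The plateau in the example above (every attempt from 5 onward yields 1h0m0s) is what clamping the attempt number to the last index produces. The sketch below shows that clamping. stepDelay is a hypothetical helper, not the package's function, and a plain attempt-to-delay function is assumed because the NextAttemptAtFunc signature is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// strongPeriods copies the StrongPeriods values listed in this package.
var strongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}

// stepDelay picks the delay for an attempt and clamps the attempt number to
// the valid index range, so late attempts reuse the last period.
func stepDelay(periods []time.Duration, attempt int) time.Duration {
	if len(periods) == 0 {
		return 0
	}
	if attempt < 0 {
		attempt = 0
	}
	if attempt >= len(periods) {
		attempt = len(periods) - 1
	}
	return periods[attempt]
}

func main() {
	for attempt := 0; attempt < 10; attempt++ {
		fmt.Printf("attempt: %d, addDuration: %v\n", attempt, stepDelay(strongPeriods, attempt))
	}
}
```

Compared with the wrap-around variant, clamping never shortens the delay again, which suits tasks that should be retried less and less often.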
type TaskProcessor ¶
TaskProcessor defines the interface for processing individual tasks.
func NoopTaskProcessor ¶
func NoopTaskProcessor() TaskProcessor
NoopTaskProcessor returns a task processor that logs task information without performing any actual processing.
type TaskProcessorFunc ¶
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
func (TaskProcessorFunc) ProcessTask ¶
ProcessTask executes the task processing function.
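A function type that implements an interface by calling itself is the same adapter idea as http.HandlerFunc in the standard library. The sketch below shows the idea with hypothetical stand-ins. The ProcessTask parameter list used here (a context and a task pointer, returning an error) is an assumption, since this documentation does not show the method's signature.

```go
package main

import (
	"context"
	"fmt"
)

// task is a hypothetical stand-in for entity.Task.
type task struct{ ID int }

// taskProcessor stands in for TaskProcessor; the method signature is assumed.
type taskProcessor interface {
	ProcessTask(ctx context.Context, t *task) error
}

// taskProcessorFunc lets an ordinary function satisfy taskProcessor.
type taskProcessorFunc func(ctx context.Context, t *task) error

// ProcessTask calls the underlying function.
func (f taskProcessorFunc) ProcessTask(ctx context.Context, t *task) error {
	return f(ctx, t)
}

// validateID is a plain function wrapped as a processor.
var validateID = taskProcessorFunc(func(_ context.Context, t *task) error {
	if t.ID < 0 {
		return fmt.Errorf("invalid task id %d", t.ID)
	}
	return nil
})

// describe accepts any taskProcessor, including a wrapped function.
func describe(p taskProcessor, t *task) string {
	if err := p.ProcessTask(context.Background(), t); err != nil {
		return "failed: " + err.Error()
	}
	return "ok"
}

func main() {
	fmt.Println(describe(validateID, &task{ID: 1}))  // ok
	fmt.Println(describe(validateID, &task{ID: -1})) // failed: invalid task id -1
}
```

The adapter saves callers from declaring a struct type when a single function is all the processing logic they need.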