Documentation
¶
Overview ¶
Package processor provides task queue processing functionality with configurable workers and retry logic.
Index ¶
- Variables
- type GoqueProcessor
- type GoqueProcessorOpts
- func WithFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
- func WithFetcherTick(tick time.Duration) GoqueProcessorOpts
- func WithHookAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
- func WithHookBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
- func WithMaxAttempts(maxAttempts int32) GoqueProcessorOpts
- func WithNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
- func WithStaticNextAttemptAtFunc(period time.Duration) GoqueProcessorOpts
- func WithTaskTimeout(taskTimeout time.Duration) GoqueProcessorOpts
- func WithWorkersCount(count int) GoqueProcessorOpts
- type HookAfterProcessing
- type HookBeforeProcessing
- type NextAttemptAtFunc
- type TaskProcessor
- type TaskProcessorFunc
- type TaskStorage
Constants ¶
This section is empty.
Variables ¶
var ( // ErrTaskCancel is returned when a task is canceled during processing. ErrTaskCancel = fmt.Errorf("task canceled") // ErrTaskTimeout is returned when task processing exceeds the timeout limit. ErrTaskTimeout = fmt.Errorf("task processing timeout") )
var FibonacciPeriods = []time.Duration{ 1 * time.Minute, 2 * time.Minute, 3 * time.Minute, 5 * time.Minute, 8 * time.Minute, 13 * time.Minute, 21 * time.Minute, 34 * time.Minute, 55 * time.Minute, 89 * time.Minute, }
FibonacciPeriods defines retry delays following the Fibonacci sequence.
var StrongPeriods = []time.Duration{ 1 * time.Minute, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute, 30 * time.Minute, 60 * time.Minute, }
StrongPeriods defines a more aggressive retry delay schedule.
Functions ¶
This section is empty.
Types ¶
type GoqueProcessor ¶
type GoqueProcessor struct {
// contains filtered or unexported fields
}
GoqueProcessor manages task fetching, processing, and worker pool coordination.
func NewGoqueProcessor ¶
func NewGoqueProcessor( taskRepo TaskStorage, taskType entity.TaskType, taskProcessor TaskProcessor, opts ...GoqueProcessorOpts, ) *GoqueProcessor
NewGoqueProcessor creates a new processor instance with the specified configuration.
func (*GoqueProcessor) Name ¶
func (p *GoqueProcessor) Name() string
Name returns the processor's name based on its task type.
func (*GoqueProcessor) Run ¶
func (p *GoqueProcessor) Run(ctx context.Context) error
Run starts the processor, fetching and processing tasks until the context is canceled.
func (*GoqueProcessor) Stop ¶
func (p *GoqueProcessor) Stop()
Stop gracefully shuts down the processor by canceling its context.
type GoqueProcessorOpts ¶
type GoqueProcessorOpts func(*GoqueProcessor)
GoqueProcessorOpts is a function type for configuring GoqueProcessor options.
func WithFetcherMaxTasks ¶
func WithFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
WithFetcherMaxTasks sets the maximum number of tasks to fetch in each cycle.
func WithFetcherTick ¶
func WithFetcherTick(tick time.Duration) GoqueProcessorOpts
WithFetcherTick sets the interval between task fetching cycles.
func WithHookAfterProcessing ¶
func WithHookAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
WithHookAfterProcessing adds hooks to run after task processing completes.
func WithHookBeforeProcessing ¶
func WithHookBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
WithHookBeforeProcessing adds hooks to run before task processing.
func WithMaxAttempts ¶
func WithMaxAttempts(maxAttempts int32) GoqueProcessorOpts
WithMaxAttempts sets the maximum number of retry attempts for failed tasks.
func WithNextAttemptAtFunc ¶
func WithNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
WithNextAttemptAtFunc sets a custom function to calculate the next retry time.
func WithStaticNextAttemptAtFunc ¶
func WithStaticNextAttemptAtFunc(period time.Duration) GoqueProcessorOpts
WithStaticNextAttemptAtFunc sets a fixed retry delay period.
func WithTaskTimeout ¶
func WithTaskTimeout(taskTimeout time.Duration) GoqueProcessorOpts
WithTaskTimeout sets the maximum execution time for a single task.
func WithWorkersCount ¶
func WithWorkersCount(count int) GoqueProcessorOpts
WithWorkersCount sets the number of concurrent workers for processing tasks.
type HookAfterProcessing ¶
HookAfterProcessing defines a hook function called after task processing completes.
type HookBeforeProcessing ¶
HookBeforeProcessing defines a hook function called before task processing begins.
type NextAttemptAtFunc ¶
NextAttemptAtFunc is a function type that calculates the next retry time based on attempt number.
func RoundStepNextAttemptAtFunc ¶
func RoundStepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
RoundStepNextAttemptAtFunc round increase next attempt time
example
attempt: 0, addDuration: 1m0s attempt: 1, addDuration: 5m0s attempt: 2, addDuration: 10m0s attempt: 3, addDuration: 15m0s attempt: 4, addDuration: 30m0s attempt: 5, addDuration: 1h0m0s attempt: 6, addDuration: 1m0s attempt: 7, addDuration: 5m0s attempt: 8, addDuration: 10m0s attempt: 9, addDuration: 15m0s
func StaticNextAttemptAtFunc ¶
func StaticNextAttemptAtFunc(period time.Duration) NextAttemptAtFunc
StaticNextAttemptAtFunc creates a function that returns a fixed delay for all retry attempts.
func StepNextAttemptAtFunc ¶
func StepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
StepNextAttemptAtFunc increase next attempt time
example
attempt: 0, addDuration: 1m0s attempt: 1, addDuration: 5m0s attempt: 2, addDuration: 10m0s attempt: 3, addDuration: 15m0s attempt: 4, addDuration: 30m0s attempt: 5, addDuration: 1h0m0s attempt: 6, addDuration: 1h0m0s attempt: 8, addDuration: 1h0m0s attempt: 9, addDuration: 1h0m0s
type TaskProcessor ¶
TaskProcessor defines the interface for processing individual tasks.
type TaskProcessorFunc ¶
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
func (TaskProcessorFunc) ProcessTask ¶
func (f TaskProcessorFunc) ProcessTask(ctx context.Context, payload string) error
ProcessTask executes the task processing function.