Documentation
Overview ¶
Package processor provides task queue processing functionality with configurable workers and retry logic.
Index ¶
- Variables
- func LoggingAfterProcessing(ctx context.Context, task *entity.Task, err error)
- func LoggingBeforeProcessing(ctx context.Context, task *entity.Task)
- type GoqueProcessor
- type GoqueProcessorOpts
- func GetGoqueProcessorOpts(opts []commonopts.InternalProcessorOpt) []GoqueProcessorOpts
- func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
- func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
- func WithMaxAttempts(maxAttempts int32) GoqueProcessorOpts
- func WithNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
- func WithReplaceHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
- func WithReplaceHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
- func WithStaticNextAttemptAtFunc(period time.Duration) GoqueProcessorOpts
- func WithTaskFetcher(fetcher TaskFetcher) GoqueProcessorOpts
- func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
- func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts
- func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithTaskTimeout(timeout time.Duration) GoqueProcessorOpts
- func WithWorkersCount(count int) GoqueProcessorOpts
- type HookAfterProcessing
- type HookBeforeProcessing
- type NextAttemptAtFunc
- type TaskFetcher
- type TaskFetcherFunc
- type TaskProcessor
- type TaskProcessorFunc
- type TaskStorage
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrTaskCancel is returned when a task is canceled during processing.
	ErrTaskCancel = fmt.Errorf("task canceled")
	// ErrTaskTimeout is returned when task processing exceeds the timeout limit.
	ErrTaskTimeout = fmt.Errorf("task processing timeout")
)
var FibonacciPeriods = []time.Duration{
	1 * time.Minute,
	2 * time.Minute,
	3 * time.Minute,
	5 * time.Minute,
	8 * time.Minute,
	13 * time.Minute,
	21 * time.Minute,
	34 * time.Minute,
	55 * time.Minute,
	89 * time.Minute,
}
FibonacciPeriods defines retry delays following the Fibonacci sequence.
var StrongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}
StrongPeriods defines a more aggressive retry delay schedule.
Functions ¶
func LoggingAfterProcessing ¶ added in v0.0.2
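func LoggingAfterProcessing(ctx context.Context, task *entity.Task, err error)
LoggingAfterProcessing is the default logging function called after a task is processed.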
Types ¶
type GoqueProcessor ¶
type GoqueProcessor struct {
// contains filtered or unexported fields
}
GoqueProcessor manages task fetching, processing, and worker pool coordination.
func NewGoqueProcessor ¶
func NewGoqueProcessor(
	taskStorage TaskStorage,
	taskType entity.TaskType,
	processor TaskProcessor,
	opts ...GoqueProcessorOpts,
) *GoqueProcessor
NewGoqueProcessor creates a new processor instance with the specified configuration.
func (*GoqueProcessor) Name ¶
func (p *GoqueProcessor) Name() string
Name returns the processor's name based on its task type.
func (*GoqueProcessor) Run ¶
func (p *GoqueProcessor) Run(ctx context.Context) error
Run starts the processor, fetching and processing tasks until the context is canceled.
func (*GoqueProcessor) Stop ¶
func (p *GoqueProcessor) Stop()
Stop gracefully shuts down the processor by canceling its context.
func (*GoqueProcessor) Tune ¶ added in v0.0.2
func (p *GoqueProcessor) Tune(opts []GoqueProcessorOpts)
Tune reconfigures the processor with new options.
type GoqueProcessorOpts ¶
type GoqueProcessorOpts func(*GoqueProcessor)
GoqueProcessorOpts is a function type for configuring GoqueProcessor options.
func GetGoqueProcessorOpts ¶ added in v0.0.2
func GetGoqueProcessorOpts(opts []commonopts.InternalProcessorOpt) []GoqueProcessorOpts
GetGoqueProcessorOpts extracts processor-specific options from a list of internal processor options.
func WithHooksAfterProcessing ¶ added in v0.0.2
func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
WithHooksAfterProcessing adds hooks to run after task processing completes.
func WithHooksBeforeProcessing ¶ added in v0.0.2
func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
WithHooksBeforeProcessing adds hooks to run before task processing.
func WithMaxAttempts ¶
func WithMaxAttempts(maxAttempts int32) GoqueProcessorOpts
WithMaxAttempts sets the maximum number of retry attempts for failed tasks.
func WithNextAttemptAtFunc ¶
func WithNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts
WithNextAttemptAtFunc sets a custom function to calculate the next retry time.
func WithReplaceHooksAfterProcessing ¶ added in v0.0.2
func WithReplaceHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts
WithReplaceHooksAfterProcessing replaces all existing after-processing hooks with the given ones.
func WithReplaceHooksBeforeProcessing ¶ added in v0.0.2
func WithReplaceHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts
WithReplaceHooksBeforeProcessing replaces all existing before-processing hooks with the given ones.
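The difference between the "add" and "replace" hook options can be sketched as append versus overwrite. This is a minimal illustration under assumptions: `hook`, `processor`, `withHooks`, `withReplaceHooks`, and `apply` are hypothetical local names, and the simplified hook signature is not the library's.

```go
package main

import "fmt"

// hook is a hypothetical simplification of HookBeforeProcessing.
type hook func(taskID int64)

// processor is a stand-in holding only the hook list.
type processor struct {
	before []hook
}

type option func(*processor)

// withHooks appends to whatever hooks are already configured.
func withHooks(hooks ...hook) option {
	return func(p *processor) { p.before = append(p.before, hooks...) }
}

// withReplaceHooks discards the configured hooks and keeps only the
// given ones (copied so the caller's slice is not aliased).
func withReplaceHooks(hooks ...hook) option {
	return func(p *processor) { p.before = append([]hook(nil), hooks...) }
}

// apply runs each option against p and returns p for convenience.
func apply(p *processor, opts ...option) *processor {
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	logHook := func(id int64) { fmt.Println("before task", id) }

	added := apply(&processor{before: []hook{logHook}}, withHooks(logHook))
	replaced := apply(&processor{before: []hook{logHook}}, withReplaceHooks(logHook))

	fmt.Println(len(added.before), len(replaced.before))
}
```

Starting from one configured hook, the "add" variant ends with two hooks and the "replace" variant with one.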
func WithStaticNextAttemptAtFunc ¶
func WithStaticNextAttemptAtFunc(period time.Duration) GoqueProcessorOpts
WithStaticNextAttemptAtFunc sets a fixed retry delay period.
func WithTaskFetcher ¶ added in v0.0.2
func WithTaskFetcher(fetcher TaskFetcher) GoqueProcessorOpts
WithTaskFetcher replaces the default task fetcher with a custom implementation.
func WithTaskFetcherMaxTasks ¶ added in v0.0.2
func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts
WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in each cycle.
func WithTaskFetcherTick ¶ added in v0.0.2
func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts
WithTaskFetcherTick sets the interval between task fetching cycles.
func WithTaskFetcherTimeout ¶ added in v0.0.2
func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts
WithTaskFetcherTimeout sets the timeout for task fetching operations.
func WithTaskTimeout ¶
func WithTaskTimeout(timeout time.Duration) GoqueProcessorOpts
WithTaskTimeout sets the maximum execution time for a single task.
func WithWorkersCount ¶
func WithWorkersCount(count int) GoqueProcessorOpts
WithWorkersCount sets the number of concurrent workers for processing tasks.
func (GoqueProcessorOpts) OptionType ¶ added in v0.0.2
func (o GoqueProcessorOpts) OptionType() string
OptionType returns the option type identifier for processor options.
type HookAfterProcessing ¶
HookAfterProcessing defines a hook function called after task processing completes.
type HookBeforeProcessing ¶
HookBeforeProcessing defines a hook function called before task processing begins.
type NextAttemptAtFunc ¶
NextAttemptAtFunc is a function type that calculates the next retry time based on the attempt number.
func RoundStepNextAttemptAtFunc ¶
func RoundStepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
RoundStepNextAttemptAtFunc returns a function that steps through periods in order and wraps around to the first period once the list is exhausted.
Example:
attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1m0s
attempt: 7, addDuration: 5m0s
attempt: 8, addDuration: 10m0s
attempt: 9, addDuration: 15m0s
func StaticNextAttemptAtFunc ¶
func StaticNextAttemptAtFunc(period time.Duration) NextAttemptAtFunc
StaticNextAttemptAtFunc creates a function that returns a fixed delay for all retry attempts.
func StepNextAttemptAtFunc ¶
func StepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc
StepNextAttemptAtFunc returns a function that steps through periods in order and keeps using the last period once the list is exhausted.
Example:
attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1h0m0s
attempt: 7, addDuration: 1h0m0s
attempt: 8, addDuration: 1h0m0s
attempt: 9, addDuration: 1h0m0s
type TaskFetcher ¶ added in v0.0.2
TaskFetcher defines the interface for fetching tasks for processing.
type TaskFetcherFunc ¶ added in v0.0.2
TaskFetcherFunc is a function type that implements the TaskFetcher interface.
func (TaskFetcherFunc) FetchTasks ¶ added in v0.0.2
FetchTasks executes the task fetching function.
type TaskProcessor ¶
TaskProcessor defines the interface for processing individual tasks.
func NoopTaskProcessor ¶ added in v0.0.2
func NoopTaskProcessor() TaskProcessor
NoopTaskProcessor returns a task processor that logs task information without performing any actual processing.
type TaskProcessorFunc ¶
TaskProcessorFunc is a function type that implements the TaskProcessor interface.
func (TaskProcessorFunc) ProcessTask ¶
ProcessTask executes the task processing function.