queueprocessor

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.

Index

Constants

This section is empty.

Variables

View Source
var FibonacciPeriods = []time.Duration{
	1 * time.Minute,
	2 * time.Minute,
	3 * time.Minute,
	5 * time.Minute,
	8 * time.Minute,
	13 * time.Minute,
	21 * time.Minute,
	34 * time.Minute,
	55 * time.Minute,
	89 * time.Minute,
}

FibonacciPeriods defines retry delays following the Fibonacci sequence.

View Source
var StrongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}

StrongPeriods defines a more aggressive retry delay schedule.

Functions

func LoggingAfterProcessing

func LoggingAfterProcessing(ctx context.Context, task *entity.Task, err error)

LoggingAfterProcessing default log after processing the task.

func LoggingBeforeProcessing

func LoggingBeforeProcessing(ctx context.Context, task *entity.Task)

LoggingBeforeProcessing default log before processing the task.

Types

type GoqueProcessor

type GoqueProcessor struct {
	// contains filtered or unexported fields
}

GoqueProcessor manages task fetching, processing, and worker pool coordination.

func NewGoqueProcessor

func NewGoqueProcessor(
	taskStorage storages.Task,
	taskType entity.TaskType,
	processor TaskProcessor,
	opts ...GoqueProcessorOpts,
) *GoqueProcessor

NewGoqueProcessor creates a new processor instance with the specified configuration.

func (*GoqueProcessor) Name

func (p *GoqueProcessor) Name() string

Name returns the processor's name based on its task type.

func (*GoqueProcessor) Run

func (p *GoqueProcessor) Run(ctx context.Context) error

Run starts the processor, fetching and processing tasks until the context is canceled.

func (*GoqueProcessor) Stop

func (p *GoqueProcessor) Stop()

Stop gracefully shuts down the processor by canceling its context.

type GoqueProcessorOpts

type GoqueProcessorOpts func(*GoqueProcessor)

GoqueProcessorOpts is a function type for configuring GoqueProcessor options.

func WithCleanerPeriod

func WithCleanerPeriod(period time.Duration) GoqueProcessorOpts

WithCleanerPeriod sets the interval at which the cleaner processor runs.

func WithCleanerTimeout

func WithCleanerTimeout(timeout time.Duration) GoqueProcessorOpts

WithCleanerTimeout sets the timeout duration for cleaner operations.

func WithCleanerUpdatedAtTimeAgo

func WithCleanerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts

WithCleanerUpdatedAtTimeAgo sets the time threshold for considering tasks as old enough to be cleaned.

func WithHealerPeriod

func WithHealerPeriod(period time.Duration) GoqueProcessorOpts

WithHealerPeriod sets the interval at which the healer processor runs.

func WithHealerTimeout

func WithHealerTimeout(timeout time.Duration) GoqueProcessorOpts

WithHealerTimeout sets the timeout duration for healer operations.

func WithHealerUpdatedAtTimeAgo

func WithHealerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) GoqueProcessorOpts

WithHealerUpdatedAtTimeAgo sets the time threshold for considering a task as stuck.

func WithHooksAfterProcessing

func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts

WithHooksAfterProcessing adds hooks to run after task processing completes.

func WithHooksBeforeProcessing

func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts

WithHooksBeforeProcessing adds hooks to run before task processing.

func WithTaskFetcherMaxTasks

func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts

WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in each cycle.

func WithTaskFetcherTick

func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts

WithTaskFetcherTick sets the interval between task fetching cycles.

func WithTaskFetcherTimeout

func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts

WithTaskFetcherTimeout sets the timeout for task fetching operations.

func WithTaskProcessingMaxAttempts

func WithTaskProcessingMaxAttempts(maxAttempts int32) GoqueProcessorOpts

WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.

func WithTaskProcessingNextAttemptAtFunc

func WithTaskProcessingNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts

WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.

func WithTaskProcessingTimeout

func WithTaskProcessingTimeout(timeout time.Duration) GoqueProcessorOpts

WithTaskProcessingTimeout sets the maximum execution time for a single task.

func WithWorkersCount

func WithWorkersCount(count int) GoqueProcessorOpts

WithWorkersCount sets the number of concurrent workers for processing tasks.

func WithWorkersPanicHandler

func WithWorkersPanicHandler(handler func(ctx context.Context) func(any)) GoqueProcessorOpts

WithWorkersPanicHandler sets a custom panic handler for worker pool panics.

type HookAfterProcessing

type HookAfterProcessing func(ctx context.Context, task *entity.Task, err error)

HookAfterProcessing defines a hook function called after task processing completes.

type HookBeforeProcessing

type HookBeforeProcessing func(ctx context.Context, task *entity.Task)

HookBeforeProcessing defines a hook function called before task processing begins.

type NextAttemptAtFunc

type NextAttemptAtFunc func(currentAttempt int32) time.Time

NextAttemptAtFunc is a function type that calculates the next retry time based on attempt number.

func RoundStepNextAttemptAtFunc

func RoundStepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc

RoundStepNextAttemptAtFunc round increase next attempt time

example

attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1m0s
attempt: 7, addDuration: 5m0s
attempt: 8, addDuration: 10m0s
attempt: 9, addDuration: 15m0s

func StaticNextAttemptAtFunc

func StaticNextAttemptAtFunc(period time.Duration) NextAttemptAtFunc

StaticNextAttemptAtFunc creates a function that returns a fixed delay for all retry attempts.

func StepNextAttemptAtFunc

func StepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc

StepNextAttemptAtFunc increase next attempt time

example

attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1h0m0s
attempt: 8, addDuration: 1h0m0s
attempt: 9, addDuration: 1h0m0s

type TaskProcessor

type TaskProcessor interface {
	ProcessTask(ctx context.Context, task *entity.Task) error
}

TaskProcessor defines the interface for processing individual tasks.

func NoopTaskProcessor

func NoopTaskProcessor() TaskProcessor

NoopTaskProcessor returns a task processor that logs task information without performing any actual processing.

type TaskProcessorFunc

type TaskProcessorFunc func(ctx context.Context, task *entity.Task) error

TaskProcessorFunc is a function type that implements the TaskProcessor interface.

func (TaskProcessorFunc) ProcessTask

func (f TaskProcessorFunc) ProcessTask(ctx context.Context, task *entity.Task) error

ProcessTask executes the task processing function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL