processor

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package processor provides task queue processing functionality with configurable workers and retry logic.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTaskCancel is returned when a task is canceled during processing.
	ErrTaskCancel = fmt.Errorf("task canceled")
	// ErrTaskTimeout is returned when task processing exceeds the timeout limit.
	ErrTaskTimeout = fmt.Errorf("task processing timeout")
)
View Source
var FibonacciPeriods = []time.Duration{
	1 * time.Minute,
	2 * time.Minute,
	3 * time.Minute,
	5 * time.Minute,
	8 * time.Minute,
	13 * time.Minute,
	21 * time.Minute,
	34 * time.Minute,
	55 * time.Minute,
	89 * time.Minute,
}

FibonacciPeriods defines retry delays following the Fibonacci sequence.

View Source
var StrongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}

StrongPeriods defines a more aggressive retry delay schedule.

Functions

func LoggingAfterProcessing added in v0.0.2

func LoggingAfterProcessing(ctx context.Context, task *entity.Task, err error)

LoggingAfterProcessing default log after processing the task.

func LoggingBeforeProcessing added in v0.0.2

func LoggingBeforeProcessing(ctx context.Context, task *entity.Task)

LoggingBeforeProcessing default log before processing the task.

Types

type GoqueProcessor

type GoqueProcessor struct {
	// contains filtered or unexported fields
}

GoqueProcessor manages task fetching, processing, and worker pool coordination.

func NewGoqueProcessor

func NewGoqueProcessor(
	taskStorage TaskStorage,
	taskType entity.TaskType,
	processor TaskProcessor,
	opts ...GoqueProcessorOpts,
) *GoqueProcessor

NewGoqueProcessor creates a new processor instance with the specified configuration.

func (*GoqueProcessor) Name

func (p *GoqueProcessor) Name() string

Name returns the processor's name based on its task type.

func (*GoqueProcessor) Run

func (p *GoqueProcessor) Run(ctx context.Context) error

Run starts the processor, fetching and processing tasks until the context is canceled.

func (*GoqueProcessor) Stop

func (p *GoqueProcessor) Stop()

Stop gracefully shuts down the processor by canceling its context.

func (*GoqueProcessor) Tune added in v0.0.2

func (p *GoqueProcessor) Tune(opts []GoqueProcessorOpts)

Tune reconfigures the processor with new options.

type GoqueProcessorOpts

type GoqueProcessorOpts func(*GoqueProcessor)

GoqueProcessorOpts is a function type for configuring GoqueProcessor options.

func GetGoqueProcessorOpts added in v0.0.2

func GetGoqueProcessorOpts(opts []commonopts.InternalProcessorOpt) []GoqueProcessorOpts

GetGoqueProcessorOpts extracts processor-specific options from a list of internal processor options.

func WithHooksAfterProcessing added in v0.0.2

func WithHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts

WithHooksAfterProcessing adds hooks to run after task processing completes.

func WithHooksBeforeProcessing added in v0.0.2

func WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts

WithHooksBeforeProcessing adds hooks to run before task processing.

func WithMaxAttempts

func WithMaxAttempts(maxAttempts int32) GoqueProcessorOpts

WithMaxAttempts sets the maximum number of retry attempts for failed tasks.

func WithNextAttemptAtFunc

func WithNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts

WithNextAttemptAtFunc sets a custom function to calculate the next retry time.

func WithReplaceHooksAfterProcessing added in v0.0.2

func WithReplaceHooksAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts

WithReplaceHooksAfterProcessing replaces all hooks with new ones.

func WithReplaceHooksBeforeProcessing added in v0.0.2

func WithReplaceHooksBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts

WithReplaceHooksBeforeProcessing replaces all hooks with new ones.

func WithStaticNextAttemptAtFunc

func WithStaticNextAttemptAtFunc(period time.Duration) GoqueProcessorOpts

WithStaticNextAttemptAtFunc sets a fixed retry delay period.

func WithTaskFetcher added in v0.0.2

func WithTaskFetcher(fetcher TaskFetcher) GoqueProcessorOpts

WithTaskFetcher replaces the default task fetcher with a custom implementation.

func WithTaskFetcherMaxTasks added in v0.0.2

func WithTaskFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts

WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in each cycle.

func WithTaskFetcherTick added in v0.0.2

func WithTaskFetcherTick(tick time.Duration) GoqueProcessorOpts

WithTaskFetcherTick sets the interval between task fetching cycles.

func WithTaskFetcherTimeout added in v0.0.2

func WithTaskFetcherTimeout(timeout time.Duration) GoqueProcessorOpts

WithTaskFetcherTimeout sets the timeout for task fetching operations.

func WithTaskTimeout

func WithTaskTimeout(timeout time.Duration) GoqueProcessorOpts

WithTaskTimeout sets the maximum execution time for a single task.

func WithWorkersCount

func WithWorkersCount(count int) GoqueProcessorOpts

WithWorkersCount sets the number of concurrent workers for processing tasks.

func (GoqueProcessorOpts) OptionType added in v0.0.2

func (o GoqueProcessorOpts) OptionType() string

OptionType returns the option type identifier for processor options.

type HookAfterProcessing

type HookAfterProcessing func(ctx context.Context, task *entity.Task, err error)

HookAfterProcessing defines a hook function called after task processing completes.

type HookBeforeProcessing

type HookBeforeProcessing func(ctx context.Context, task *entity.Task)

HookBeforeProcessing defines a hook function called before task processing begins.

type NextAttemptAtFunc

type NextAttemptAtFunc func(currentAttempt int32) time.Time

NextAttemptAtFunc is a function type that calculates the next retry time based on attempt number.

func RoundStepNextAttemptAtFunc

func RoundStepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc

RoundStepNextAttemptAtFunc round increase next attempt time

example

attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1m0s
attempt: 7, addDuration: 5m0s
attempt: 8, addDuration: 10m0s
attempt: 9, addDuration: 15m0s

func StaticNextAttemptAtFunc

func StaticNextAttemptAtFunc(period time.Duration) NextAttemptAtFunc

StaticNextAttemptAtFunc creates a function that returns a fixed delay for all retry attempts.

func StepNextAttemptAtFunc

func StepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc

StepNextAttemptAtFunc increase next attempt time

example

attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1h0m0s
attempt: 8, addDuration: 1h0m0s
attempt: 9, addDuration: 1h0m0s

type TaskFetcher added in v0.0.2

type TaskFetcher interface {
	FetchTasks(ctx context.Context) ([]*entity.Task, error)
}

TaskFetcher defines the interface for fetching tasks for processing.

type TaskFetcherFunc added in v0.0.2

type TaskFetcherFunc func(ctx context.Context) ([]*entity.Task, error)

TaskFetcherFunc is a function type that implements the TaskFetcher interface.

func (TaskFetcherFunc) FetchTasks added in v0.0.2

func (f TaskFetcherFunc) FetchTasks(ctx context.Context) ([]*entity.Task, error)

FetchTasks executes the task fetching function.

type TaskProcessor

type TaskProcessor interface {
	ProcessTask(ctx context.Context, task *entity.Task) error
}

TaskProcessor defines the interface for processing individual tasks.

func NoopTaskProcessor added in v0.0.2

func NoopTaskProcessor() TaskProcessor

NoopTaskProcessor returns a task processor that logs task information without performing any actual processing.

type TaskProcessorFunc

type TaskProcessorFunc func(ctx context.Context, task *entity.Task) error

TaskProcessorFunc is a function type that implements the TaskProcessor interface.

func (TaskProcessorFunc) ProcessTask

func (f TaskProcessorFunc) ProcessTask(ctx context.Context, task *entity.Task) error

ProcessTask executes the task processing function.

type TaskStorage

type TaskStorage interface {
	GetTasksForProcessing(ctx context.Context, taskType entity.TaskType, maxTasks int64) ([]*entity.Task, error)
	UpdateTask(ctx context.Context, taskID uuid.UUID, task *entity.Task) error
}

TaskStorage defines the interface for task storage operations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL