processor

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package processor provides task queue processing functionality with configurable workers and retry logic.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTaskCancel is returned when a task is canceled during processing.
	ErrTaskCancel = fmt.Errorf("task canceled")
	// ErrTaskTimeout is returned when task processing exceeds the timeout limit.
	ErrTaskTimeout = fmt.Errorf("task processing timeout")
)
View Source
var FibonacciPeriods = []time.Duration{
	1 * time.Minute,
	2 * time.Minute,
	3 * time.Minute,
	5 * time.Minute,
	8 * time.Minute,
	13 * time.Minute,
	21 * time.Minute,
	34 * time.Minute,
	55 * time.Minute,
	89 * time.Minute,
}

FibonacciPeriods defines retry delays following the Fibonacci sequence.

View Source
var StrongPeriods = []time.Duration{
	1 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	30 * time.Minute,
	60 * time.Minute,
}

StrongPeriods defines a more aggressive retry delay schedule.

Functions

This section is empty.

Types

type GoqueProcessor

type GoqueProcessor struct {
	// contains filtered or unexported fields
}

GoqueProcessor manages task fetching, processing, and worker pool coordination.

func NewGoqueProcessor

func NewGoqueProcessor(
	taskRepo TaskStorage,
	taskType entity.TaskType,
	taskProcessor TaskProcessor,
	opts ...GoqueProcessorOpts,
) *GoqueProcessor

NewGoqueProcessor creates a new processor instance with the specified configuration.

func (*GoqueProcessor) Name

func (p *GoqueProcessor) Name() string

Name returns the processor's name based on its task type.

func (*GoqueProcessor) Run

func (p *GoqueProcessor) Run(ctx context.Context) error

Run starts the processor, fetching and processing tasks until the context is canceled.

func (*GoqueProcessor) Stop

func (p *GoqueProcessor) Stop()

Stop gracefully shuts down the processor by canceling its context.

type GoqueProcessorOpts

type GoqueProcessorOpts func(*GoqueProcessor)

GoqueProcessorOpts is a function type for configuring GoqueProcessor options.

func WithFetcherMaxTasks

func WithFetcherMaxTasks(maxTasks int64) GoqueProcessorOpts

WithFetcherMaxTasks sets the maximum number of tasks to fetch in each cycle.

func WithFetcherTick

func WithFetcherTick(tick time.Duration) GoqueProcessorOpts

WithFetcherTick sets the interval between task fetching cycles.

func WithHookAfterProcessing

func WithHookAfterProcessing(hooks ...HookAfterProcessing) GoqueProcessorOpts

WithHookAfterProcessing adds hooks to run after task processing completes.

func WithHookBeforeProcessing

func WithHookBeforeProcessing(hooks ...HookBeforeProcessing) GoqueProcessorOpts

WithHookBeforeProcessing adds hooks to run before task processing.

func WithMaxAttempts

func WithMaxAttempts(maxAttempts int32) GoqueProcessorOpts

WithMaxAttempts sets the maximum number of retry attempts for failed tasks.

func WithNextAttemptAtFunc

func WithNextAttemptAtFunc(nextAttemptAt NextAttemptAtFunc) GoqueProcessorOpts

WithNextAttemptAtFunc sets a custom function to calculate the next retry time.

func WithStaticNextAttemptAtFunc

func WithStaticNextAttemptAtFunc(period time.Duration) GoqueProcessorOpts

WithStaticNextAttemptAtFunc sets a fixed retry delay period.

func WithTaskTimeout

func WithTaskTimeout(taskTimeout time.Duration) GoqueProcessorOpts

WithTaskTimeout sets the maximum execution time for a single task.

func WithWorkersCount

func WithWorkersCount(count int) GoqueProcessorOpts

WithWorkersCount sets the number of concurrent workers for processing tasks.

type HookAfterProcessing

type HookAfterProcessing func(ctx context.Context, task *entity.Task, err error)

HookAfterProcessing defines a hook function called after task processing completes.

type HookBeforeProcessing

type HookBeforeProcessing func(ctx context.Context, task *entity.Task)

HookBeforeProcessing defines a hook function called before task processing begins.

type NextAttemptAtFunc

type NextAttemptAtFunc func(currentAttempt int32) time.Time

NextAttemptAtFunc is a function type that calculates the next retry time based on attempt number.

func RoundStepNextAttemptAtFunc

func RoundStepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc

RoundStepNextAttemptAtFunc round increase next attempt time

example

attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1m0s
attempt: 7, addDuration: 5m0s
attempt: 8, addDuration: 10m0s
attempt: 9, addDuration: 15m0s

func StaticNextAttemptAtFunc

func StaticNextAttemptAtFunc(period time.Duration) NextAttemptAtFunc

StaticNextAttemptAtFunc creates a function that returns a fixed delay for all retry attempts.

func StepNextAttemptAtFunc

func StepNextAttemptAtFunc(periods []time.Duration) NextAttemptAtFunc

StepNextAttemptAtFunc increase next attempt time

example

attempt: 0, addDuration: 1m0s
attempt: 1, addDuration: 5m0s
attempt: 2, addDuration: 10m0s
attempt: 3, addDuration: 15m0s
attempt: 4, addDuration: 30m0s
attempt: 5, addDuration: 1h0m0s
attempt: 6, addDuration: 1h0m0s
attempt: 8, addDuration: 1h0m0s
attempt: 9, addDuration: 1h0m0s

type TaskProcessor

type TaskProcessor interface {
	ProcessTask(ctx context.Context, payload string) error
}

TaskProcessor defines the interface for processing individual tasks.

type TaskProcessorFunc

type TaskProcessorFunc func(ctx context.Context, payload string) error

TaskProcessorFunc is a function type that implements the TaskProcessor interface.

func (TaskProcessorFunc) ProcessTask

func (f TaskProcessorFunc) ProcessTask(ctx context.Context, payload string) error

ProcessTask executes the task processing function.

type TaskStorage

type TaskStorage interface {
	GetTasksForProcessing(ctx context.Context, taskType string, maxTasks int64) ([]*entity.Task, error)
	UpdateTask(ctx context.Context, taskID uuid.UUID, task *entity.Task) error
}

TaskStorage defines the interface for task storage operations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL