internalprocessors

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package internalprocessors provides internal task processors for queue management and maintenance.

Package internalprocessors provides internal task processors for queue management and maintenance.

Package internalprocessors provides internal task processors for queue management and maintenance.

Package internalprocessors provides internal task processors for queue management and maintenance.

Index

Constants

View Source
const (
	// CleanerProcessorName is the identifier for the queue cleaner processor.
	CleanerProcessorName = "task cleaner[internal processor]"
	// DefaultCleanerTimeout is the default timeout for cleaner operations.
	DefaultCleanerTimeout = 30 * time.Second
)
View Source
const (
	// Healer is a fake task type used internally by the healer.
	Healer = "task healer[internal processor]"
	// DefaultHealerTimeout is the default timeout for healer operations.
	DefaultHealerTimeout = 30 * time.Second
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Processor

type Processor interface {
	Tune(opts []commonopts.InternalProcessorOpt)
}

Processor defines the interface for internal processors that can be tuned with options.

type QueueCleaner

type QueueCleaner struct {
	// contains filtered or unexported fields
}

QueueCleaner removes old completed, canceled, or failed tasks from the queue.

func NewQueueCleaner

func NewQueueCleaner(taskStorage *taskstorage.Storage, opts ...QueueCleanerOpts) *QueueCleaner

NewQueueCleaner creates a new queue cleaner with the specified storage and options.

func (*QueueCleaner) CleanTasksQueue

func (q *QueueCleaner) CleanTasksQueue(ctx context.Context) ([]*entity.Task, error)

CleanTasksQueue removes old tasks with done, canceled, or attempts_left status from the queue.

func (*QueueCleaner) Tune

Tune reconfigures the cleaner with new options.

type QueueCleanerOpts

type QueueCleanerOpts func(*QueueCleaner)

QueueCleanerOpts is a function type for configuring QueueCleaner options.

func GetCleanerOpts

func GetCleanerOpts(opts []commonopts.InternalProcessorOpt) []QueueCleanerOpts

GetCleanerOpts extracts cleaner-specific options from a list of internal processor options.

func WithCleanerUpdatedAtTimeAgo

func WithCleanerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) QueueCleanerOpts

WithCleanerUpdatedAtTimeAgo sets the time threshold for considering tasks as old enough to be cleaned.

func (QueueCleanerOpts) OptionType

func (o QueueCleanerOpts) OptionType() string

OptionType returns the option type identifier for cleaner options.

type QueueHealer

type QueueHealer struct {
	// contains filtered or unexported fields
}

QueueHealer identifies and fixes tasks that have been stuck in pending status for too long.

func NewQueueHealer

func NewQueueHealer(taskStorage *taskstorage.Storage, opts ...QueueHealerOpts) *QueueHealer

NewQueueHealer creates a new queue healer with the specified storage and options.

func (*QueueHealer) CureTasks

func (q *QueueHealer) CureTasks(ctx context.Context) ([]*entity.Task, error)

CureTasks marks stuck tasks in pending status as errored based on the configured time threshold.

func (*QueueHealer) Tune

Tune reconfigures the healer with new options.

type QueueHealerOpts

type QueueHealerOpts func(*QueueHealer)

QueueHealerOpts is a function type for configuring QueueHealer options.

func GetHealerOpts

func GetHealerOpts(opts []commonopts.InternalProcessorOpt) []QueueHealerOpts

GetHealerOpts extracts healer-specific options from a list of internal processor options.

func WithHealerMaxTasks

func WithHealerMaxTasks(maxTasks int64) QueueHealerOpts

WithHealerMaxTasks sets the maximum number of tasks the healer will process in one batch.

func WithHealerUpdatedAtTimeAgo

func WithHealerUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration) QueueHealerOpts

WithHealerUpdatedAtTimeAgo sets the time threshold for considering a task as stuck.

func (QueueHealerOpts) OptionType

func (o QueueHealerOpts) OptionType() string

OptionType returns the option type identifier for healer options.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL