internalprocessors

package
v0.0.5 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 28, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.

Constants

This section is empty.

Variables

var (
	// ErrTaskIsFrozen is returned when a task cannot be cured because it is frozen.
	ErrTaskIsFrozen = errors.New("task is frozen")
)

Functions

This section is empty.

Types

type CleanerTaskStorage

type CleanerTaskStorage interface {
	DeleteTasks(ctx context.Context, taskType entity.TaskType, statuses []entity.TaskStatus, updatedAtTimeAgo time.Duration) ([]*entity.Task, error)
}

CleanerTaskStorage defines the storage interface required for the cleaner processor to delete old tasks.

type HealerTaskStorage

type HealerTaskStorage interface {
	CureTasks(ctx context.Context, taskType entity.TaskType, unhealthStatuses []entity.TaskStatus, updatedAtTimeAgo time.Duration, comment string) ([]*entity.Task, error)
}

HealerTaskStorage defines the storage interface required for the healer processor to cure stuck tasks.

type QueueCleaner

type QueueCleaner struct {
	// contains filtered or unexported fields
}

QueueCleaner removes old completed, canceled, or failed tasks from the queue.

func NewQueueCleaner

func NewQueueCleaner(taskStorage CleanerTaskStorage, taskType string) *QueueCleaner

NewQueueCleaner creates a new queue cleaner with the specified storage and task type.

func (*QueueCleaner) CleanTasksQueue

func (q *QueueCleaner) CleanTasksQueue(ctx context.Context) error

CleanTasksQueue removes tasks with done, canceled, or attempts_left status whose last update is older than the threshold set by SetUpdatedAtTimeAgo.

func (QueueCleaner) Name

func (p QueueCleaner) Name() string

Name returns the processor's name based on its task type.

func (QueueCleaner) Run

func (p QueueCleaner) Run(ctx context.Context, parentProcessor string)

Run starts the processor, fetching and processing tasks until the context is canceled.

func (QueueCleaner) SetProcessPeriod

func (p QueueCleaner) SetProcessPeriod(processPeriod time.Duration)

func (QueueCleaner) SetProcessTimeout

func (p QueueCleaner) SetProcessTimeout(timeout time.Duration)

func (*QueueCleaner) SetUpdatedAtTimeAgo

func (q *QueueCleaner) SetUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration)

SetUpdatedAtTimeAgo sets the time threshold for considering tasks as old enough to be cleaned.

func (QueueCleaner) Stop

func (p QueueCleaner) Stop()

Stop gracefully shuts down the processor by canceling its context.

type QueueHealer

type QueueHealer struct {
	// contains filtered or unexported fields
}

QueueHealer identifies and fixes tasks that have been stuck in pending status for too long.

func NewQueueHealer

func NewQueueHealer(taskStorage HealerTaskStorage, taskType entity.TaskType) *QueueHealer

NewQueueHealer creates a new queue healer with the specified storage and task type.

func (*QueueHealer) CureTasks

func (q *QueueHealer) CureTasks(ctx context.Context) error

CureTasks marks tasks as errored when they have stayed in pending status longer than the threshold set by SetUpdatedAtTimeAgo.

func (QueueHealer) Name

func (p QueueHealer) Name() string

Name returns the processor's name based on its task type.

func (QueueHealer) Run

func (p QueueHealer) Run(ctx context.Context, parentProcessor string)

Run starts the processor, fetching and processing tasks until the context is canceled.

func (QueueHealer) SetProcessPeriod

func (p QueueHealer) SetProcessPeriod(processPeriod time.Duration)

func (QueueHealer) SetProcessTimeout

func (p QueueHealer) SetProcessTimeout(timeout time.Duration)

func (*QueueHealer) SetUpdatedAtTimeAgo

func (q *QueueHealer) SetUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration)

SetUpdatedAtTimeAgo sets the time threshold for considering a task as stuck.

func (QueueHealer) Stop

func (p QueueHealer) Stop()

Stop gracefully shuts down the processor by canceling its context.
