internalprocessors

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CleanerTaskStorage

type CleanerTaskStorage interface {
	DeleteTasks(ctx context.Context, taskType entity.TaskType, statuses []entity.TaskStatus, updatedAtTimeAgo time.Duration) ([]*entity.Task, error)
}

CleanerTaskStorage defines the storage interface required for the cleaner processor to delete old tasks.

type HealerTaskStorage

type HealerTaskStorage interface {
	CureTasks(ctx context.Context, taskType entity.TaskType, unhealthStatuses []entity.TaskStatus, updatedAtTimeAgo time.Duration, comment string) ([]*entity.Task, error)
}

HealerTaskStorage defines the storage interface required for the healer processor to cure stuck tasks.

type QueueCleaner

type QueueCleaner struct {
	// contains filtered or unexported fields
}

QueueCleaner removes old completed, canceled, or failed tasks from the queue.

func NewQueueCleaner

func NewQueueCleaner(taskStorage CleanerTaskStorage, taskType entity.TaskType) *QueueCleaner

NewQueueCleaner creates a new queue cleaner with the specified storage and options.

func (*QueueCleaner) CleanTasksQueue

func (q *QueueCleaner) CleanTasksQueue(ctx context.Context, taskType entity.TaskType) ([]*entity.Task, error)

CleanTasksQueue removes old tasks with done, canceled, or attempts_left status from the queue.

func (QueueCleaner) Run

func (p QueueCleaner) Run(ctx context.Context)

Run starts the processor, fetching and processing tasks until the context is canceled.

func (QueueCleaner) SetProcessPeriod

func (p QueueCleaner) SetProcessPeriod(processPeriod time.Duration)

func (QueueCleaner) SetProcessTimeout

func (p QueueCleaner) SetProcessTimeout(timeout time.Duration)

func (*QueueCleaner) SetUpdatedAtTimeAgo

func (q *QueueCleaner) SetUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration)

SetUpdatedAtTimeAgo sets the time threshold for considering tasks as old enough to be cleaned.

func (QueueCleaner) Stop

func (p QueueCleaner) Stop()

Stop gracefully shuts down the processor by canceling its context.

type QueueHealer

type QueueHealer struct {
	// contains filtered or unexported fields
}

QueueHealer identifies and fixes tasks that have been stuck in pending status for too long.

func NewQueueHealer

func NewQueueHealer(taskStorage HealerTaskStorage, taskType entity.TaskType) *QueueHealer

NewQueueHealer creates a new queue healer with the specified storage and options.

func (*QueueHealer) CureTasks

func (q *QueueHealer) CureTasks(ctx context.Context, taskType entity.TaskType) ([]*entity.Task, error)

CureTasks marks stuck tasks in pending status as errored based on the configured time threshold.

func (QueueHealer) Run

func (p QueueHealer) Run(ctx context.Context)

Run starts the processor, fetching and processing tasks until the context is canceled.

func (QueueHealer) SetProcessPeriod

func (p QueueHealer) SetProcessPeriod(processPeriod time.Duration)

func (QueueHealer) SetProcessTimeout

func (p QueueHealer) SetProcessTimeout(timeout time.Duration)

func (*QueueHealer) SetUpdatedAtTimeAgo

func (q *QueueHealer) SetUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration)

SetUpdatedAtTimeAgo sets the time threshold for considering a task as stuck.

func (QueueHealer) Stop

func (p QueueHealer) Stop()

Stop gracefully shuts down the processor by canceling its context.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL