Documentation
¶
Overview ¶
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.
Index ¶
- type CleanerTaskStorage
- type HealerTaskStorage
- type QueueCleaner
- func (q *QueueCleaner) CleanTasksQueue(ctx context.Context, taskType entity.TaskType) ([]*entity.Task, error)
- func (p QueueCleaner) Run(ctx context.Context)
- func (p QueueCleaner) SetProcessPeriod(processPeriod time.Duration)
- func (p QueueCleaner) SetProcessTimeout(timeout time.Duration)
- func (q *QueueCleaner) SetUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration)
- func (p QueueCleaner) Stop()
- type QueueHealer
- func (q *QueueHealer) CureTasks(ctx context.Context, taskType entity.TaskType) ([]*entity.Task, error)
- func (p QueueHealer) Run(ctx context.Context)
- func (p QueueHealer) SetProcessPeriod(processPeriod time.Duration)
- func (p QueueHealer) SetProcessTimeout(timeout time.Duration)
- func (q *QueueHealer) SetUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration)
- func (p QueueHealer) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CleanerTaskStorage ¶
type CleanerTaskStorage interface {
DeleteTasks(ctx context.Context, taskType entity.TaskType, statuses []entity.TaskStatus, updatedAtTimeAgo time.Duration) ([]*entity.Task, error)
}
CleanerTaskStorage defines the storage interface required for the cleaner processor to delete old tasks.
type HealerTaskStorage ¶
type HealerTaskStorage interface {
CureTasks(ctx context.Context, taskType entity.TaskType, unhealthStatuses []entity.TaskStatus, updatedAtTimeAgo time.Duration, comment string) ([]*entity.Task, error)
}
HealerTaskStorage defines the storage interface required for the healer processor to cure stuck tasks.
type QueueCleaner ¶
type QueueCleaner struct {
// contains filtered or unexported fields
}
QueueCleaner removes old completed, canceled, or failed tasks from the queue.
func NewQueueCleaner ¶
func NewQueueCleaner(taskStorage CleanerTaskStorage, taskType entity.TaskType) *QueueCleaner
NewQueueCleaner creates a new queue cleaner with the specified storage and options.
func (*QueueCleaner) CleanTasksQueue ¶
func (q *QueueCleaner) CleanTasksQueue(ctx context.Context, taskType entity.TaskType) ([]*entity.Task, error)
CleanTasksQueue removes old tasks with done, canceled, or attempts_left status from the queue.
func (QueueCleaner) Run ¶
Run starts the processor, fetching and processing tasks until the context is canceled.
func (QueueCleaner) SetProcessPeriod ¶
func (QueueCleaner) SetProcessTimeout ¶
func (*QueueCleaner) SetUpdatedAtTimeAgo ¶
func (q *QueueCleaner) SetUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration)
SetUpdatedAtTimeAgo sets the time threshold for considering tasks as old enough to be cleaned.
type QueueHealer ¶
type QueueHealer struct {
// contains filtered or unexported fields
}
QueueHealer identifies and fixes tasks that have been stuck in pending status for too long.
func NewQueueHealer ¶
func NewQueueHealer(taskStorage HealerTaskStorage, taskType entity.TaskType) *QueueHealer
NewQueueHealer creates a new queue healer with the specified storage and options.
func (*QueueHealer) CureTasks ¶
func (q *QueueHealer) CureTasks(ctx context.Context, taskType entity.TaskType) ([]*entity.Task, error)
CureTasks marks stuck tasks in pending status as errored based on the configured time threshold.
func (QueueHealer) Run ¶
Run starts the processor, fetching and processing tasks until the context is canceled.
func (QueueHealer) SetProcessPeriod ¶
func (QueueHealer) SetProcessTimeout ¶
func (*QueueHealer) SetUpdatedAtTimeAgo ¶
func (q *QueueHealer) SetUpdatedAtTimeAgo(updatedAtTimeAgo time.Duration)
SetUpdatedAtTimeAgo sets the time threshold for considering a task as stuck.