Documentation
¶
Index ¶
- Constants
- Variables
- type QueueStatusType
- type TaskCounter
- type TaskQueue
- func (q *TaskQueue) AddAfter(id string, tasks ...task.Task)
- func (q *TaskQueue) AddBefore(id string, newTask task.Task)
- func (q *TaskQueue) AddFirst(tasks ...task.Task)
- func (q *TaskQueue) AddLast(tasks ...task.Task)
- func (q *TaskQueue) CancelTaskDelay()
- func (q *TaskQueue) DeleteFunc(fn func(task.Task) bool)
- func (q *TaskQueue) Get(id string) task.Task
- func (q *TaskQueue) GetFirst() task.Task
- func (q *TaskQueue) GetLast() task.Task
- func (q *TaskQueue) GetSnapshot() []task.Task
- func (q *TaskQueue) GetStatus() string
- func (q *TaskQueue) GetStatusType() QueueStatusType
- func (q *TaskQueue) IterateSnapshot(doFn func(task.Task))
- func (q *TaskQueue) Length() int
- func (q *TaskQueue) MeasureActionTime(action string) func()
- func (q *TaskQueue) Remove(id string) task.Task
- func (q *TaskQueue) RemoveFirst() task.Task
- func (q *TaskQueue) RemoveLast() task.Task
- func (q *TaskQueue) SetStatus(statusType QueueStatusType)
- func (q *TaskQueue) SetStatusText(text string)
- func (q *TaskQueue) Start(ctx context.Context)
- func (q *TaskQueue) Stop()
- func (q *TaskQueue) String() string
- type TaskQueueOption
- func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption
- func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption
- func WithContext(ctx context.Context) TaskQueueOption
- func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption
- func WithLogger(logger *log.Logger) TaskQueueOption
- type TaskQueueSet
- func (tqs *TaskQueueSet) Add(queue *TaskQueue)
- func (tqs *TaskQueueSet) AddTailTasks(tasks ...task.Task)
- func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
- func (tqs *TaskQueueSet) GetMain() *TaskQueue
- func (tqs *TaskQueueSet) GetSnapshot() []*TaskQueue
- func (tqs *TaskQueueSet) IterateSnapshot(ctx context.Context, doFn func(ctx context.Context, queue *TaskQueue))
- func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, ...)
- func (tqs *TaskQueueSet) Remove(name string)
- func (tqs *TaskQueueSet) Start(ctx context.Context)
- func (tqs *TaskQueueSet) StartMain(ctx context.Context)
- func (tqs *TaskQueueSet) Stop()
- func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
- func (tqs *TaskQueueSet) WithContext(ctx context.Context)
- func (tqs *TaskQueueSet) WithLogger(logger *log.Logger) *TaskQueueSet
- func (tqs *TaskQueueSet) WithMainName(name string)
- func (tqs *TaskQueueSet) WithMetricStorage(mstor metricsstorage.Storage) *TaskQueueSet
- type TaskQueueStatus
- func (tqs *TaskQueueStatus) Get() string
- func (tqs *TaskQueueStatus) GetType() QueueStatusType
- func (tqs *TaskQueueStatus) Restore(statusType QueueStatusType, customText string)
- func (tqs *TaskQueueStatus) Set(statusType QueueStatusType)
- func (tqs *TaskQueueStatus) SetText(text string)
- func (tqs *TaskQueueStatus) SetWithText(statusType QueueStatusType, text string)
- func (tqs *TaskQueueStatus) Snapshot() (QueueStatusType, string)
- type TaskResult
- func (res *TaskResult) AddAfterTasks(t ...task.Task)
- func (res *TaskResult) AddHeadTasks(t ...task.Task)
- func (res *TaskResult) AddTailTasks(t ...task.Task)
- func (res *TaskResult) GetAfterTasks() []task.Task
- func (res *TaskResult) GetHeadTasks() []task.Task
- func (res *TaskResult) GetTailTasks() []task.Task
- type TaskStatus
- type TaskStorage
- func (ts *TaskStorage) AddAfter(id string, tasks ...task.Task)
- func (ts *TaskStorage) AddBefore(id string, newTask task.Task)
- func (ts *TaskStorage) AddFirst(tasks ...task.Task)
- func (ts *TaskStorage) AddLast(tasks ...task.Task)
- func (ts *TaskStorage) DeleteFunc(fn func(task.Task) bool)
- func (ts *TaskStorage) Get(id string) task.Task
- func (ts *TaskStorage) GetFirst() task.Task
- func (ts *TaskStorage) GetLast() task.Task
- func (ts *TaskStorage) GetSnapshot() []task.Task
- func (ts *TaskStorage) Iterate(fn func(*list.Element[task.Task]))
- func (ts *TaskStorage) Length() int
- func (ts *TaskStorage) ProcessResult(taskRes TaskResult, t task.Task)
- func (ts *TaskStorage) Remove(id string) task.Task
- func (ts *TaskStorage) RemoveElement(element *list.Element[task.Task]) task.Task
- func (ts *TaskStorage) RemoveFirst() task.Task
- func (ts *TaskStorage) RemoveLast() task.Task
Constants ¶
const MainQueueName = "main"
Variables ¶
var ( DefaultWaitLoopCheckInterval = 125 * time.Millisecond DefaultDelayOnQueueIsEmpty = 250 * time.Millisecond DefaultInitialDelayOnFailedTask = 5 * time.Second DefaultDelayOnRepeat = 25 * time.Millisecond )
Functions ¶
This section is empty.
Types ¶
type QueueStatusType ¶ added in v1.10.5
type QueueStatusType int
QueueStatusType represents the type of queue status
const ( QueueStatusUnknown QueueStatusType = iota QueueStatusIdle QueueStatusNoHandlerSet QueueStatusRunningTask QueueStatusStop QueueStatusRepeatTask QueueStatusSleeping QueueStatusWaiting )
func (QueueStatusType) String ¶ added in v1.10.5
func (qst QueueStatusType) String() string
String returns a human-readable representation of the queue status type
type TaskCounter ¶ added in v1.10.2
type TaskCounter struct {
// contains filtered or unexported fields
}
func NewTaskCounter ¶ added in v1.10.2
func NewTaskCounter(name string, countableTypes map[task.TaskType]struct{}, metricStorage metricsstorage.Storage) *TaskCounter
func (*TaskCounter) Add ¶ added in v1.10.2
func (tc *TaskCounter) Add(task task.Task)
func (*TaskCounter) GetReachedCap ¶ added in v1.10.2
func (tc *TaskCounter) GetReachedCap() map[string]struct{}
func (*TaskCounter) IsAnyCapReached ¶ added in v1.10.2
func (tc *TaskCounter) IsAnyCapReached() bool
func (*TaskCounter) Remove ¶ added in v1.10.2
func (tc *TaskCounter) Remove(task task.Task)
func (*TaskCounter) ResetReachedCap ¶ added in v1.10.2
func (tc *TaskCounter) ResetReachedCap()
type TaskQueue ¶
type TaskQueue struct {
Name string
Handler func(ctx context.Context, t task.Task) TaskResult
// Timing settings.
WaitLoopCheckInterval time.Duration
DelayOnQueueIsEmpty time.Duration
DelayOnRepeat time.Duration
ExponentialBackoffFn func(failureCount int) time.Duration
// contains filtered or unexported fields
}
func NewTasksQueue ¶
func NewTasksQueue(name string, metricStorage metricsstorage.Storage, opts ...TaskQueueOption) *TaskQueue
NewTasksQueue creates a new TaskQueue with the provided options
func (*TaskQueue) AddLast ¶
AddLast adds a new tail element. It implements the merging logic for HookRun tasks by scanning the whole queue.
func (*TaskQueue) CancelTaskDelay ¶ added in v1.0.11
func (q *TaskQueue) CancelTaskDelay()
CancelTaskDelay breaks the wait loop. It is useful for interrupting a potentially long sleep delay.
func (*TaskQueue) DeleteFunc ¶ added in v1.10.5
DeleteFunc runs fn on every task and removes each task for which fn returns false.
func (*TaskQueue) GetSnapshot ¶ added in v1.10.5
GetSnapshot returns a copy of all tasks in the queue. This is useful for external iteration or processing without holding locks.
The returned slice is a snapshot at the time of the call and will not reflect subsequent changes to the queue.
func (*TaskQueue) GetStatusType ¶ added in v1.10.5
func (q *TaskQueue) GetStatusType() QueueStatusType
func (*TaskQueue) IterateSnapshot ¶ added in v1.10.5
IterateSnapshot creates a snapshot of all tasks and iterates over the copy. This is safer than Iterate() when you need to call queue methods inside the callback, as no locks are held during callback execution.
Note: The snapshot may become stale during iteration if tasks are added/removed by other goroutines or by the callback itself.
Use this method when:
- You need to call queue methods inside the callback (AddLast, Length, DeleteFunc, etc.)
- You need to process tasks asynchronously
- Safety is more important than performance
Memory overhead: O(n) where n is the number of tasks in the queue.
func (*TaskQueue) MeasureActionTime ¶
MeasureActionTime is a helper that measures the execution time of the queue's actions.
func (*TaskQueue) RemoveFirst ¶
RemoveFirst deletes the head element, so the head moves to the next element.
func (*TaskQueue) RemoveLast ¶
RemoveLast deletes the tail element, so the tail moves to the previous element.
func (*TaskQueue) SetStatus ¶ added in v1.6.0
func (q *TaskQueue) SetStatus(statusType QueueStatusType)
func (*TaskQueue) SetStatusText ¶ added in v1.10.5
type TaskQueueOption ¶ added in v1.10.2
type TaskQueueOption func(*TaskQueue)
TaskQueueOption defines a functional option for TaskQueue configuration
func WithCompactableTypes ¶ added in v1.10.1
func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption
WithCompactableTypes sets the compactable task types for the TaskQueue
func WithCompactionCallback ¶ added in v1.10.1
func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption
func WithContext ¶ added in v1.10.2
func WithContext(ctx context.Context) TaskQueueOption
WithContext sets the context for the TaskQueue
func WithHandler ¶ added in v1.10.2
func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption
WithHandler sets the task handler for the TaskQueue
func WithLogger ¶ added in v1.10.1
func WithLogger(logger *log.Logger) TaskQueueOption
type TaskQueueSet ¶
type TaskQueueSet struct {
MainName string
Queues *queueStorage
// contains filtered or unexported fields
}
TaskQueueSet is a manager for a set of named queues
func NewTaskQueueSet ¶
func NewTaskQueueSet() *TaskQueueSet
func (*TaskQueueSet) Add ¶
func (tqs *TaskQueueSet) Add(queue *TaskQueue)
Add registers a new queue in the TaskQueueSet.
func (*TaskQueueSet) AddTailTasks ¶ added in v1.10.7
func (tqs *TaskQueueSet) AddTailTasks(tasks ...task.Task)
func (*TaskQueueSet) GetByName ¶
func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
func (*TaskQueueSet) GetMain ¶
func (tqs *TaskQueueSet) GetMain() *TaskQueue
func (*TaskQueueSet) GetSnapshot ¶ added in v1.10.7
func (tqs *TaskQueueSet) GetSnapshot() []*TaskQueue
GetSnapshot returns a snapshot of all queues at the time of the call. This is useful for iteration when you need to call methods on the queues that might acquire locks, preventing deadlocks.
The returned slice is a snapshot and will not reflect subsequent changes. The main queue (tqs.MainName) is always placed first in the list.
func (*TaskQueueSet) IterateSnapshot ¶ added in v1.10.7
func (tqs *TaskQueueSet) IterateSnapshot(ctx context.Context, doFn func(ctx context.Context, queue *TaskQueue))
IterateSnapshot creates a snapshot of all queues and iterates over the copy. This is safer than Iterate() when you need to call queue methods inside the callback, as no locks are held during callback execution.
Note: The snapshot may become stale during iteration if queues are added/removed by other goroutines or by the callback itself.
Use this method when:
- You need to call queue methods inside the callback (Start, Stop, etc.)
- You need to process queues asynchronously
- Safety is more important than performance
Memory overhead: O(n) where n is the number of queues.
func (*TaskQueueSet) NewNamedQueue ¶
func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...TaskQueueOption)
func (*TaskQueueSet) Remove ¶
func (tqs *TaskQueueSet) Remove(name string)
func (*TaskQueueSet) Start ¶
func (tqs *TaskQueueSet) Start(ctx context.Context)
func (*TaskQueueSet) StartMain ¶
func (tqs *TaskQueueSet) StartMain(ctx context.Context)
func (*TaskQueueSet) Stop ¶
func (tqs *TaskQueueSet) Stop()
func (*TaskQueueSet) WaitStopWithTimeout ¶
func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
func (*TaskQueueSet) WithContext ¶
func (tqs *TaskQueueSet) WithContext(ctx context.Context)
func (*TaskQueueSet) WithLogger ¶ added in v1.10.7
func (tqs *TaskQueueSet) WithLogger(logger *log.Logger) *TaskQueueSet
func (*TaskQueueSet) WithMainName ¶
func (tqs *TaskQueueSet) WithMainName(name string)
func (*TaskQueueSet) WithMetricStorage ¶
func (tqs *TaskQueueSet) WithMetricStorage(mstor metricsstorage.Storage) *TaskQueueSet
type TaskQueueStatus ¶ added in v1.10.5
type TaskQueueStatus struct {
// contains filtered or unexported fields
}
TaskQueueStatus encapsulates queue status with thread-safe access
func NewTaskQueueStatus ¶ added in v1.10.5
func NewTaskQueueStatus() *TaskQueueStatus
NewTaskQueueStatus creates a new TaskQueueStatus
func (*TaskQueueStatus) Get ¶ added in v1.10.5
func (tqs *TaskQueueStatus) Get() string
Get returns the current status as a string
func (*TaskQueueStatus) GetType ¶ added in v1.10.5
func (tqs *TaskQueueStatus) GetType() QueueStatusType
GetType returns the current status type
func (*TaskQueueStatus) Restore ¶ added in v1.10.5
func (tqs *TaskQueueStatus) Restore(statusType QueueStatusType, customText string)
Restore sets both the status type and custom text atomically
func (*TaskQueueStatus) Set ¶ added in v1.10.5
func (tqs *TaskQueueStatus) Set(statusType QueueStatusType)
Set sets the status to a specific type and clears custom text
func (*TaskQueueStatus) SetText ¶ added in v1.10.5
func (tqs *TaskQueueStatus) SetText(text string)
SetText sets a custom status text
func (*TaskQueueStatus) SetWithText ¶ added in v1.10.5
func (tqs *TaskQueueStatus) SetWithText(statusType QueueStatusType, text string)
SetWithText sets the status type with custom text
func (*TaskQueueStatus) Snapshot ¶ added in v1.10.5
func (tqs *TaskQueueStatus) Snapshot() (QueueStatusType, string)
Snapshot returns both the status type and custom text atomically
type TaskResult ¶
type TaskResult struct {
Status TaskStatus
DelayBeforeNextTask time.Duration
AfterHandle func()
// contains filtered or unexported fields
}
func (*TaskResult) AddAfterTasks ¶ added in v1.10.1
func (res *TaskResult) AddAfterTasks(t ...task.Task)
func (*TaskResult) AddHeadTasks ¶ added in v1.10.1
func (res *TaskResult) AddHeadTasks(t ...task.Task)
func (*TaskResult) AddTailTasks ¶ added in v1.10.1
func (res *TaskResult) AddTailTasks(t ...task.Task)
func (*TaskResult) GetAfterTasks ¶ added in v1.10.1
func (res *TaskResult) GetAfterTasks() []task.Task
func (*TaskResult) GetHeadTasks ¶ added in v1.10.1
func (res *TaskResult) GetHeadTasks() []task.Task
func (*TaskResult) GetTailTasks ¶ added in v1.10.1
func (res *TaskResult) GetTailTasks() []task.Task
type TaskStatus ¶ added in v1.0.11
type TaskStatus string
const ( Success TaskStatus = "Success" Fail TaskStatus = "Fail" Repeat TaskStatus = "Repeat" Keep TaskStatus = "Keep" )
type TaskStorage ¶ added in v1.10.7
type TaskStorage struct {
// contains filtered or unexported fields
}
TaskStorage encapsulates the queue data structures with their own mutex
func (*TaskStorage) AddAfter ¶ added in v1.10.7
func (ts *TaskStorage) AddAfter(id string, tasks ...task.Task)
AddAfter inserts tasks after the task with the specified ID
func (*TaskStorage) AddBefore ¶ added in v1.10.7
func (ts *TaskStorage) AddBefore(id string, newTask task.Task)
AddBefore inserts a task before the task with the specified ID
func (*TaskStorage) AddFirst ¶ added in v1.10.7
func (ts *TaskStorage) AddFirst(tasks ...task.Task)
AddFirst adds tasks to the front of the queue
func (*TaskStorage) AddLast ¶ added in v1.10.7
func (ts *TaskStorage) AddLast(tasks ...task.Task)
AddLast adds tasks to the back of the queue
func (*TaskStorage) DeleteFunc ¶ added in v1.10.7
func (ts *TaskStorage) DeleteFunc(fn func(task.Task) bool)
DeleteFunc runs fn on every task and removes each task for which fn returns false
func (*TaskStorage) Get ¶ added in v1.10.7
func (ts *TaskStorage) Get(id string) task.Task
Get returns a task by ID
func (*TaskStorage) GetFirst ¶ added in v1.10.7
func (ts *TaskStorage) GetFirst() task.Task
GetFirst returns the first task without removing it
func (*TaskStorage) GetLast ¶ added in v1.10.7
func (ts *TaskStorage) GetLast() task.Task
GetLast returns the last task in the queue
func (*TaskStorage) GetSnapshot ¶ added in v1.10.7
func (ts *TaskStorage) GetSnapshot() []task.Task
GetSnapshot returns a copy of all tasks in the queue
func (*TaskStorage) Iterate ¶ added in v1.10.7
func (ts *TaskStorage) Iterate(fn func(*list.Element[task.Task]))
Iterate iterates over all tasks in the queue with a callback
func (*TaskStorage) Length ¶ added in v1.10.7
func (ts *TaskStorage) Length() int
Length returns the number of items in the queue
func (*TaskStorage) ProcessResult ¶ added in v1.10.7
func (ts *TaskStorage) ProcessResult(taskRes TaskResult, t task.Task)
func (*TaskStorage) Remove ¶ added in v1.10.7
func (ts *TaskStorage) Remove(id string) task.Task
Remove removes and returns a task by ID
func (*TaskStorage) RemoveElement ¶ added in v1.10.7
RemoveElement removes an element from the list and updates the index
func (*TaskStorage) RemoveFirst ¶ added in v1.10.7
func (ts *TaskStorage) RemoveFirst() task.Task
RemoveFirst removes and returns the first task
func (*TaskStorage) RemoveLast ¶ added in v1.10.7
func (ts *TaskStorage) RemoveLast() task.Task
RemoveLast removes and returns the last task