Documentation
¶
Overview ¶
!DEPRECATED
Index ¶
- Constants
- Variables
- type QueueStatusType
- type TaskCounter
- type TaskQueue
- func (q *TaskQueue) AddAfter(id string, tasks ...task.Task)
- func (q *TaskQueue) AddBefore(id string, newTask task.Task)
- func (q *TaskQueue) AddFirst(tasks ...task.Task)
- func (q *TaskQueue) AddLast(tasks ...task.Task)
- func (q *TaskQueue) CancelTaskDelay()
- func (q *TaskQueue) DeleteFunc(fn func(task.Task) bool)
- func (q *TaskQueue) Get(id string) task.Task
- func (q *TaskQueue) GetFirst() task.Task
- func (q *TaskQueue) GetLast() task.Task
- func (q *TaskQueue) GetSnapshot() []task.Task
- func (q *TaskQueue) GetStatus() string
- func (q *TaskQueue) GetStatusType() QueueStatusType
- func (q *TaskQueue) IsEmpty() bool
- func (q *TaskQueue) IterateSnapshot(doFn func(task.Task))
- func (q *TaskQueue) Length() int
- func (q *TaskQueue) MeasureActionTime(action string) func()
- func (q *TaskQueue) Remove(id string) task.Task
- func (q *TaskQueue) RemoveFirst() task.Task
- func (q *TaskQueue) RemoveLast() task.Task
- func (q *TaskQueue) SetStatus(statusType QueueStatusType)
- func (q *TaskQueue) SetStatusText(text string)
- func (q *TaskQueue) Start(ctx context.Context)
- func (q *TaskQueue) Stop()
- func (q *TaskQueue) String() string
- type TaskQueueOption
- func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption
- func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption
- func WithContext(ctx context.Context) TaskQueueOption
- func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption
- func WithLogger(logger *log.Logger) TaskQueueOption
- func WithName(name string) TaskQueueOption
- type TaskQueueSet
- func (tqs *TaskQueueSet) Add(queue *TaskQueue)
- func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))
- func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
- func (tqs *TaskQueueSet) GetMain() *TaskQueue
- func (tqs *TaskQueueSet) Iterate(ctx context.Context, doFn func(ctx context.Context, queue *TaskQueue))
- func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, ...)
- func (tqs *TaskQueueSet) Remove(name string)
- func (tqs *TaskQueueSet) Start(ctx context.Context)
- func (tqs *TaskQueueSet) StartMain(ctx context.Context)
- func (tqs *TaskQueueSet) Stop()
- func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
- func (tqs *TaskQueueSet) WithContext(ctx context.Context)
- func (tqs *TaskQueueSet) WithMainName(name string)
- func (tqs *TaskQueueSet) WithMetricStorage(mstor metric.Storage) *TaskQueueSet
- type TaskQueueSlice
- func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task)
- func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task)
- func (q *TaskQueueSlice) AddFirst(tasks ...task.Task)
- func (q *TaskQueueSlice) AddLast(tasks ...task.Task)
- func (q *TaskQueueSlice) CancelTaskDelay()
- func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool)
- func (q *TaskQueueSlice) Get(id string) task.Task
- func (q *TaskQueueSlice) GetFirst() task.Task
- func (q *TaskQueueSlice) GetLast() task.Task
- func (q *TaskQueueSlice) GetStatus() string
- func (q *TaskQueueSlice) IsEmpty() bool
- func (q *TaskQueueSlice) Iterate(doFn func(task.Task))
- func (q *TaskQueueSlice) Length() int
- func (q *TaskQueueSlice) MeasureActionTime(action string) func()
- func (q *TaskQueueSlice) Remove(id string) task.Task
- func (q *TaskQueueSlice) RemoveFirst() task.Task
- func (q *TaskQueueSlice) RemoveLast() task.Task
- func (q *TaskQueueSlice) SetDebug(debug bool)
- func (q *TaskQueueSlice) SetStatus(status string)
- func (q *TaskQueueSlice) Start(ctx context.Context)
- func (q *TaskQueueSlice) Stop()
- func (q *TaskQueueSlice) String() string
- func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice
- func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice
- func (q *TaskQueueSlice) WithContext(ctx context.Context)
- func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice
- func (q *TaskQueueSlice) WithLogger(logger *log.Logger)
- func (q *TaskQueueSlice) WithMetricStorage(mstor metric.Storage) *TaskQueueSlice
- func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice
- type TaskQueueStatus
- func (tqs *TaskQueueStatus) Get() string
- func (tqs *TaskQueueStatus) GetType() QueueStatusType
- func (tqs *TaskQueueStatus) Restore(statusType QueueStatusType, customText string)
- func (tqs *TaskQueueStatus) Set(statusType QueueStatusType)
- func (tqs *TaskQueueStatus) SetText(text string)
- func (tqs *TaskQueueStatus) SetWithText(statusType QueueStatusType, text string)
- func (tqs *TaskQueueStatus) Snapshot() (QueueStatusType, string)
- type TaskResult
- func (res *TaskResult) AddAfterTasks(t ...task.Task)
- func (res *TaskResult) AddHeadTasks(t ...task.Task)
- func (res *TaskResult) AddTailTasks(t ...task.Task)
- func (res *TaskResult) GetAfterTasks() []task.Task
- func (res *TaskResult) GetHeadTasks() []task.Task
- func (res *TaskResult) GetTailTasks() []task.Task
- type TaskStatus
Constants ¶
const MainQueueName = "main"
Variables ¶
var ( DefaultWaitLoopCheckInterval = 125 * time.Millisecond DefaultDelayOnQueueIsEmpty = 250 * time.Millisecond DefaultInitialDelayOnFailedTask = 5 * time.Second DefaultDelayOnRepeat = 25 * time.Millisecond )
Functions ¶
This section is empty.
Types ¶
type QueueStatusType ¶ added in v1.10.6
type QueueStatusType int
QueueStatusType represents the type of queue status
const ( QueueStatusUnknown QueueStatusType = iota QueueStatusIdle QueueStatusNoHandlerSet QueueStatusRunningTask QueueStatusStop QueueStatusRepeatTask QueueStatusSleeping QueueStatusWaiting )
func (QueueStatusType) String ¶ added in v1.10.6
func (qst QueueStatusType) String() string
String returns a human-readable representation of the queue status type
type TaskCounter ¶ added in v1.10.2
type TaskCounter struct {
// contains filtered or unexported fields
}
func NewTaskCounter ¶ added in v1.10.2
func (*TaskCounter) Add ¶ added in v1.10.2
func (tc *TaskCounter) Add(task task.Task)
func (*TaskCounter) GetReachedCap ¶ added in v1.10.2
func (tc *TaskCounter) GetReachedCap() map[string]struct{}
func (*TaskCounter) IsAnyCapReached ¶ added in v1.10.2
func (tc *TaskCounter) IsAnyCapReached() bool
func (*TaskCounter) Remove ¶ added in v1.10.2
func (tc *TaskCounter) Remove(task task.Task)
func (*TaskCounter) ResetReachedCap ¶ added in v1.10.2
func (tc *TaskCounter) ResetReachedCap()
type TaskQueue ¶
type TaskQueue struct {
Name string
Handler func(ctx context.Context, t task.Task) TaskResult
// Timing settings.
WaitLoopCheckInterval time.Duration
DelayOnQueueIsEmpty time.Duration
DelayOnRepeat time.Duration
ExponentialBackoffFn func(failureCount int) time.Duration
// contains filtered or unexported fields
}
func NewTasksQueue ¶
func NewTasksQueue(metricStorage metric.Storage, opts ...TaskQueueOption) *TaskQueue
NewTasksQueue creates a new TaskQueue with the provided options
func (*TaskQueue) CancelTaskDelay ¶ added in v1.0.11
func (q *TaskQueue) CancelTaskDelay()
CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay.
func (*TaskQueue) DeleteFunc ¶ added in v1.10.6
DeleteFunc runs fn on every task and removes each task for which fn returns false.
func (*TaskQueue) GetSnapshot ¶ added in v1.10.6
GetSnapshot returns a copy of all tasks in the queue. This is useful for external iteration or processing without holding locks.
The returned slice is a snapshot at the time of the call and will not reflect subsequent changes to the queue.
func (*TaskQueue) GetStatusType ¶ added in v1.10.6
func (q *TaskQueue) GetStatusType() QueueStatusType
func (*TaskQueue) IterateSnapshot ¶ added in v1.10.6
IterateSnapshot creates a snapshot of all tasks and iterates over the copy. This is safer than Iterate() when you need to call queue methods inside the callback, as no locks are held during callback execution.
Note: The snapshot may become stale during iteration if tasks are added/removed by other goroutines or by the callback itself.
Use this method when:
- You need to call queue methods inside the callback (Add, Length, Filter, etc.)
- You need to process tasks asynchronously
- Safety is more important than performance
Memory overhead: O(n) where n is the number of tasks in the queue.
func (*TaskQueue) MeasureActionTime ¶
MeasureActionTime is a helper to measure execution time of queue's actions
func (*TaskQueue) RemoveFirst ¶
RemoveFirst deletes a head element, so head is moved.
func (*TaskQueue) RemoveLast ¶
RemoveLast deletes a tail element, so tail is moved.
func (*TaskQueue) SetStatus ¶ added in v1.6.0
func (q *TaskQueue) SetStatus(statusType QueueStatusType)
func (*TaskQueue) SetStatusText ¶ added in v1.10.6
type TaskQueueOption ¶ added in v1.10.2
type TaskQueueOption func(*TaskQueue)
TaskQueueOption defines a functional option for TaskQueue configuration
func WithCompactableTypes ¶ added in v1.10.1
func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption
WithCompactableTypes sets the compactable task types for the TaskQueue
func WithCompactionCallback ¶ added in v1.10.1
func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption
func WithContext ¶ added in v1.10.2
func WithContext(ctx context.Context) TaskQueueOption
WithContext sets the context for the TaskQueue
func WithHandler ¶ added in v1.10.2
func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption
WithHandler sets the task handler for the TaskQueue
func WithLogger ¶ added in v1.10.1
func WithLogger(logger *log.Logger) TaskQueueOption
func WithName ¶ added in v1.10.2
func WithName(name string) TaskQueueOption
WithName sets the name for the TaskQueue
type TaskQueueSet ¶
type TaskQueueSet struct {
MainName string
Queues *queueStorage
// contains filtered or unexported fields
}
TaskQueueSet is a manager for a set of named queues
func NewTaskQueueSet ¶
func NewTaskQueueSet() *TaskQueueSet
func (*TaskQueueSet) Add ¶
func (tqs *TaskQueueSet) Add(queue *TaskQueue)
Add register a new queue for TaskQueueSet.
func (*TaskQueueSet) DoWithLock ¶
func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))
func (*TaskQueueSet) GetByName ¶
func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
func (*TaskQueueSet) GetMain ¶
func (tqs *TaskQueueSet) GetMain() *TaskQueue
func (*TaskQueueSet) Iterate ¶
func (tqs *TaskQueueSet) Iterate(ctx context.Context, doFn func(ctx context.Context, queue *TaskQueue))
Iterate run doFn for every task.
func (*TaskQueueSet) NewNamedQueue ¶
func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...TaskQueueOption)
func (*TaskQueueSet) Remove ¶
func (tqs *TaskQueueSet) Remove(name string)
func (*TaskQueueSet) Start ¶
func (tqs *TaskQueueSet) Start(ctx context.Context)
func (*TaskQueueSet) StartMain ¶
func (tqs *TaskQueueSet) StartMain(ctx context.Context)
func (*TaskQueueSet) Stop ¶
func (tqs *TaskQueueSet) Stop()
func (*TaskQueueSet) WaitStopWithTimeout ¶
func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
func (*TaskQueueSet) WithContext ¶
func (tqs *TaskQueueSet) WithContext(ctx context.Context)
func (*TaskQueueSet) WithMainName ¶
func (tqs *TaskQueueSet) WithMainName(name string)
func (*TaskQueueSet) WithMetricStorage ¶
func (tqs *TaskQueueSet) WithMetricStorage(mstor metric.Storage) *TaskQueueSet
type TaskQueueSlice ¶ added in v1.10.0
type TaskQueueSlice struct {
CompactableTypes map[task.TaskType]struct{}
Name string
Handler func(ctx context.Context, t task.Task) TaskResult
Status string
// Callback for task compaction events
CompactionCallback func(compactedTasks []task.Task, targetTask task.Task)
// Timing settings.
WaitLoopCheckInterval time.Duration
DelayOnQueueIsEmpty time.Duration
DelayOnRepeat time.Duration
ExponentialBackoffFn func(failureCount int) time.Duration
// contains filtered or unexported fields
}
func NewTasksQueueSlice ¶ added in v1.10.0
func NewTasksQueueSlice() *TaskQueueSlice
func (*TaskQueueSlice) AddAfter ¶ added in v1.10.0
func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task)
AddAfter inserts a task after the task with specified id.
func (*TaskQueueSlice) AddBefore ¶ added in v1.10.0
func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task)
AddBefore inserts a task before the task with specified id.
func (*TaskQueueSlice) AddFirst ¶ added in v1.10.0
func (q *TaskQueueSlice) AddFirst(tasks ...task.Task)
AddFirst adds new head element.
func (*TaskQueueSlice) AddLast ¶ added in v1.10.0
func (q *TaskQueueSlice) AddLast(tasks ...task.Task)
AddLast adds new tail element.
func (*TaskQueueSlice) CancelTaskDelay ¶ added in v1.10.0
func (q *TaskQueueSlice) CancelTaskDelay()
CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay.
func (*TaskQueueSlice) Filter ¶ added in v1.10.0
func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool)
Filter run filterFn on every task and remove each with false result.
func (*TaskQueueSlice) Get ¶ added in v1.10.0
func (q *TaskQueueSlice) Get(id string) task.Task
Get returns a task by id.
func (*TaskQueueSlice) GetFirst ¶ added in v1.10.0
func (q *TaskQueueSlice) GetFirst() task.Task
GetFirst returns a head element.
func (*TaskQueueSlice) GetLast ¶ added in v1.10.0
func (q *TaskQueueSlice) GetLast() task.Task
GetLast returns a tail element.
func (*TaskQueueSlice) GetStatus ¶ added in v1.10.0
func (q *TaskQueueSlice) GetStatus() string
func (*TaskQueueSlice) IsEmpty ¶ added in v1.10.0
func (q *TaskQueueSlice) IsEmpty() bool
func (*TaskQueueSlice) Iterate ¶ added in v1.10.0
func (q *TaskQueueSlice) Iterate(doFn func(task.Task))
Iterate run doFn for every task.
func (*TaskQueueSlice) Length ¶ added in v1.10.0
func (q *TaskQueueSlice) Length() int
func (*TaskQueueSlice) MeasureActionTime ¶ added in v1.10.0
func (q *TaskQueueSlice) MeasureActionTime(action string) func()
MeasureActionTime is a helper to measure execution time of queue's actions
func (*TaskQueueSlice) Remove ¶ added in v1.10.0
func (q *TaskQueueSlice) Remove(id string) task.Task
Remove finds element by id and deletes it.
func (*TaskQueueSlice) RemoveFirst ¶ added in v1.10.0
func (q *TaskQueueSlice) RemoveFirst() task.Task
RemoveFirst deletes a head element, so head is moved.
func (*TaskQueueSlice) RemoveLast ¶ added in v1.10.0
func (q *TaskQueueSlice) RemoveLast() task.Task
RemoveLast deletes a tail element, so tail is moved.
func (*TaskQueueSlice) SetDebug ¶ added in v1.10.0
func (q *TaskQueueSlice) SetDebug(debug bool)
func (*TaskQueueSlice) SetStatus ¶ added in v1.10.0
func (q *TaskQueueSlice) SetStatus(status string)
func (*TaskQueueSlice) Start ¶ added in v1.10.0
func (q *TaskQueueSlice) Start(ctx context.Context)
func (*TaskQueueSlice) Stop ¶ added in v1.10.0
func (q *TaskQueueSlice) Stop()
func (*TaskQueueSlice) String ¶ added in v1.10.0
func (q *TaskQueueSlice) String() string
Dump tasks in queue to one line
func (*TaskQueueSlice) WithCompactableTypes ¶ added in v1.10.0
func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice
func (*TaskQueueSlice) WithCompactionCallback ¶ added in v1.10.0
func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice
func (*TaskQueueSlice) WithContext ¶ added in v1.10.0
func (q *TaskQueueSlice) WithContext(ctx context.Context)
func (*TaskQueueSlice) WithHandler ¶ added in v1.10.0
func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice
func (*TaskQueueSlice) WithLogger ¶ added in v1.10.0
func (q *TaskQueueSlice) WithLogger(logger *log.Logger)
func (*TaskQueueSlice) WithMetricStorage ¶ added in v1.10.0
func (q *TaskQueueSlice) WithMetricStorage(mstor metric.Storage) *TaskQueueSlice
func (*TaskQueueSlice) WithName ¶ added in v1.10.0
func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice
type TaskQueueStatus ¶ added in v1.10.6
type TaskQueueStatus struct {
// contains filtered or unexported fields
}
TaskQueueStatus encapsulates queue status with thread-safe access
func NewTaskQueueStatus ¶ added in v1.10.6
func NewTaskQueueStatus() *TaskQueueStatus
NewTaskQueueStatus creates a new TaskQueueStatus
func (*TaskQueueStatus) Get ¶ added in v1.10.6
func (tqs *TaskQueueStatus) Get() string
Get returns the current status as a string
func (*TaskQueueStatus) GetType ¶ added in v1.10.6
func (tqs *TaskQueueStatus) GetType() QueueStatusType
GetType returns the current status type
func (*TaskQueueStatus) Restore ¶ added in v1.10.6
func (tqs *TaskQueueStatus) Restore(statusType QueueStatusType, customText string)
Restore sets both the status type and custom text atomically
func (*TaskQueueStatus) Set ¶ added in v1.10.6
func (tqs *TaskQueueStatus) Set(statusType QueueStatusType)
Set sets the status to a specific type and clears custom text
func (*TaskQueueStatus) SetText ¶ added in v1.10.6
func (tqs *TaskQueueStatus) SetText(text string)
SetText sets a custom status text
func (*TaskQueueStatus) SetWithText ¶ added in v1.10.6
func (tqs *TaskQueueStatus) SetWithText(statusType QueueStatusType, text string)
SetWithText sets the status type with custom text
func (*TaskQueueStatus) Snapshot ¶ added in v1.10.6
func (tqs *TaskQueueStatus) Snapshot() (QueueStatusType, string)
Snapshot returns both the status type and custom text atomically
type TaskResult ¶
type TaskResult struct {
Status TaskStatus
DelayBeforeNextTask time.Duration
AfterHandle func()
// contains filtered or unexported fields
}
func (*TaskResult) AddAfterTasks ¶ added in v1.10.1
func (res *TaskResult) AddAfterTasks(t ...task.Task)
func (*TaskResult) AddHeadTasks ¶ added in v1.10.1
func (res *TaskResult) AddHeadTasks(t ...task.Task)
func (*TaskResult) AddTailTasks ¶ added in v1.10.1
func (res *TaskResult) AddTailTasks(t ...task.Task)
func (*TaskResult) GetAfterTasks ¶ added in v1.10.1
func (res *TaskResult) GetAfterTasks() []task.Task
func (*TaskResult) GetHeadTasks ¶ added in v1.10.1
func (res *TaskResult) GetHeadTasks() []task.Task
func (*TaskResult) GetTailTasks ¶ added in v1.10.1
func (res *TaskResult) GetTailTasks() []task.Task
type TaskStatus ¶ added in v1.0.11
type TaskStatus string
const ( Success TaskStatus = "Success" Fail TaskStatus = "Fail" Repeat TaskStatus = "Repeat" Keep TaskStatus = "Keep" )