Documentation
¶
Overview ¶
!DEPRECATED
Index ¶
- Constants
- Variables
- type QueueOpts
- type TaskQueue
- func (q *TaskQueue) AddAfter(id string, newTask task.Task)
- func (q *TaskQueue) AddBefore(id string, newTask task.Task)
- func (q *TaskQueue) AddFirst(t task.Task)
- func (q *TaskQueue) AddLast(t task.Task)
- func (q *TaskQueue) CancelTaskDelay()
- func (q *TaskQueue) Filter(filterFn func(task.Task) bool)
- func (q *TaskQueue) Get(id string) task.Task
- func (q *TaskQueue) GetFirst() task.Task
- func (q *TaskQueue) GetLast() task.Task
- func (q *TaskQueue) GetStatus() string
- func (q *TaskQueue) IsEmpty() bool
- func (q *TaskQueue) Iterate(doFn func(task.Task))
- func (q *TaskQueue) Length() int
- func (q *TaskQueue) MeasureActionTime(action string) func()
- func (q *TaskQueue) Remove(id string) task.Task
- func (q *TaskQueue) RemoveFirst() task.Task
- func (q *TaskQueue) RemoveLast() task.Task
- func (q *TaskQueue) SetStatus(status string)
- func (q *TaskQueue) Start(ctx context.Context)
- func (q *TaskQueue) Stop()
- func (q *TaskQueue) String() string
- func (q *TaskQueue) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueue
- func (q *TaskQueue) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueue
- func (q *TaskQueue) WithContext(ctx context.Context)
- func (q *TaskQueue) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueue
- func (q *TaskQueue) WithLogger(logger *log.Logger) *TaskQueue
- func (q *TaskQueue) WithMetricStorage(mstor metric.Storage) *TaskQueue
- func (q *TaskQueue) WithName(name string) *TaskQueue
- type TaskQueueSet
- func (tqs *TaskQueueSet) Add(queue *TaskQueue)
- func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))
- func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
- func (tqs *TaskQueueSet) GetMain() *TaskQueue
- func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue))
- func (tqs *TaskQueueSet) NewNamedQueue(name string, opts QueueOpts)
- func (tqs *TaskQueueSet) Remove(name string)
- func (tqs *TaskQueueSet) Start(ctx context.Context)
- func (tqs *TaskQueueSet) StartMain(ctx context.Context)
- func (tqs *TaskQueueSet) Stop()
- func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
- func (tqs *TaskQueueSet) WithContext(ctx context.Context)
- func (tqs *TaskQueueSet) WithMainName(name string)
- func (tqs *TaskQueueSet) WithMetricStorage(mstor metric.Storage) *TaskQueueSet
- type TaskQueueSlice
- func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task)
- func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task)
- func (q *TaskQueueSlice) AddFirst(t task.Task)
- func (q *TaskQueueSlice) AddLast(t task.Task)
- func (q *TaskQueueSlice) CancelTaskDelay()
- func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool)
- func (q *TaskQueueSlice) Get(id string) task.Task
- func (q *TaskQueueSlice) GetFirst() task.Task
- func (q *TaskQueueSlice) GetLast() task.Task
- func (q *TaskQueueSlice) GetStatus() string
- func (q *TaskQueueSlice) IsEmpty() bool
- func (q *TaskQueueSlice) Iterate(doFn func(task.Task))
- func (q *TaskQueueSlice) Length() int
- func (q *TaskQueueSlice) MeasureActionTime(action string) func()
- func (q *TaskQueueSlice) Remove(id string) task.Task
- func (q *TaskQueueSlice) RemoveFirst() task.Task
- func (q *TaskQueueSlice) RemoveLast() task.Task
- func (q *TaskQueueSlice) SetDebug(debug bool)
- func (q *TaskQueueSlice) SetStatus(status string)
- func (q *TaskQueueSlice) Start(ctx context.Context)
- func (q *TaskQueueSlice) Stop()
- func (q *TaskQueueSlice) String() string
- func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice
- func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice
- func (q *TaskQueueSlice) WithContext(ctx context.Context)
- func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice
- func (q *TaskQueueSlice) WithLogger(logger *log.Logger)
- func (q *TaskQueueSlice) WithMetricStorage(mstor metric.Storage) *TaskQueueSlice
- func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice
- type TaskResult
- type TaskStatus
Constants ¶
const MainQueueName = "main"
Variables ¶
var ( DefaultWaitLoopCheckInterval = 125 * time.Millisecond DefaultDelayOnQueueIsEmpty = 250 * time.Millisecond DefaultInitialDelayOnFailedTask = 5 * time.Second DefaultDelayOnRepeat = 25 * time.Millisecond )
Functions ¶
This section is empty.
Types ¶
type TaskQueue ¶
type TaskQueue struct {
Name string
Handler func(ctx context.Context, t task.Task) TaskResult
Status string
// Timing settings.
WaitLoopCheckInterval time.Duration
DelayOnQueueIsEmpty time.Duration
DelayOnRepeat time.Duration
ExponentialBackoffFn func(failureCount int) time.Duration
// Compaction
CompactionCallback func(compactedTasks []task.Task, targetTask task.Task)
// contains filtered or unexported fields
}
func NewTasksQueue ¶
func NewTasksQueue() *TaskQueue
func (*TaskQueue) CancelTaskDelay ¶ added in v1.0.11
func (q *TaskQueue) CancelTaskDelay()
CancelTaskDelay breaks the wait loop. It is useful for interrupting a possibly long sleep delay.
func (*TaskQueue) MeasureActionTime ¶
MeasureActionTime is a helper to measure the execution time of the queue's actions.
func (*TaskQueue) RemoveFirst ¶
RemoveFirst deletes the head element, so the head is moved.
func (*TaskQueue) RemoveLast ¶
RemoveLast deletes the tail element, so the tail is moved.
func (*TaskQueue) WithCompactableTypes ¶ added in v1.10.0
func (*TaskQueue) WithCompactionCallback ¶ added in v1.10.0
func (*TaskQueue) WithContext ¶
func (*TaskQueue) WithHandler ¶
func (*TaskQueue) WithLogger ¶ added in v1.10.0
func (*TaskQueue) WithMetricStorage ¶
type TaskQueueSet ¶
type TaskQueueSet struct {
MainName string
Queues map[string]*TaskQueue
// contains filtered or unexported fields
}
TaskQueueSet is a manager for a set of named queues.
func NewTaskQueueSet ¶
func NewTaskQueueSet() *TaskQueueSet
func (*TaskQueueSet) Add ¶
func (tqs *TaskQueueSet) Add(queue *TaskQueue)
func (*TaskQueueSet) DoWithLock ¶
func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))
taskQueueSet.DoWithLock(func(tqs *TaskQueueSet){
tqs.GetMain().Pop()
})
func (*TaskQueueSet) GetByName ¶
func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
func (*TaskQueueSet) GetMain ¶
func (tqs *TaskQueueSet) GetMain() *TaskQueue
func (*TaskQueueSet) Iterate ¶
func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue))
Iterate runs doFn for every queue in the set.
func (*TaskQueueSet) NewNamedQueue ¶
func (tqs *TaskQueueSet) NewNamedQueue(name string, opts QueueOpts)
func (*TaskQueueSet) Remove ¶
func (tqs *TaskQueueSet) Remove(name string)
func (*TaskQueueSet) Start ¶
func (tqs *TaskQueueSet) Start(ctx context.Context)
func (*TaskQueueSet) StartMain ¶
func (tqs *TaskQueueSet) StartMain(ctx context.Context)
func (*TaskQueueSet) Stop ¶
func (tqs *TaskQueueSet) Stop()
func (*TaskQueueSet) WaitStopWithTimeout ¶
func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
func (*TaskQueueSet) WithContext ¶
func (tqs *TaskQueueSet) WithContext(ctx context.Context)
func (*TaskQueueSet) WithMainName ¶
func (tqs *TaskQueueSet) WithMainName(name string)
func (*TaskQueueSet) WithMetricStorage ¶
func (tqs *TaskQueueSet) WithMetricStorage(mstor metric.Storage) *TaskQueueSet
type TaskQueueSlice ¶ added in v1.10.0
type TaskQueueSlice struct {
CompactableTypes map[task.TaskType]struct{}
Name string
Handler func(ctx context.Context, t task.Task) TaskResult
Status string
// Callback for task compaction events
CompactionCallback func(compactedTasks []task.Task, targetTask task.Task)
// Timing settings.
WaitLoopCheckInterval time.Duration
DelayOnQueueIsEmpty time.Duration
DelayOnRepeat time.Duration
ExponentialBackoffFn func(failureCount int) time.Duration
// contains filtered or unexported fields
}
func NewTasksQueueSlice ¶ added in v1.10.0
func NewTasksQueueSlice() *TaskQueueSlice
func (*TaskQueueSlice) AddAfter ¶ added in v1.10.0
func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task)
AddAfter inserts a task after the task with the specified id.
func (*TaskQueueSlice) AddBefore ¶ added in v1.10.0
func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task)
AddBefore inserts a task before the task with the specified id.
func (*TaskQueueSlice) AddFirst ¶ added in v1.10.0
func (q *TaskQueueSlice) AddFirst(t task.Task)
AddFirst adds a new head element.
func (*TaskQueueSlice) AddLast ¶ added in v1.10.0
func (q *TaskQueueSlice) AddLast(t task.Task)
AddLast adds a new tail element.
func (*TaskQueueSlice) CancelTaskDelay ¶ added in v1.10.0
func (q *TaskQueueSlice) CancelTaskDelay()
CancelTaskDelay breaks the wait loop. It is useful for interrupting a possibly long sleep delay.
func (*TaskQueueSlice) Filter ¶ added in v1.10.0
func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool)
Filter runs filterFn on every task and removes each task for which it returns false.
func (*TaskQueueSlice) Get ¶ added in v1.10.0
func (q *TaskQueueSlice) Get(id string) task.Task
Get returns a task by id.
func (*TaskQueueSlice) GetFirst ¶ added in v1.10.0
func (q *TaskQueueSlice) GetFirst() task.Task
GetFirst returns the head element.
func (*TaskQueueSlice) GetLast ¶ added in v1.10.0
func (q *TaskQueueSlice) GetLast() task.Task
GetLast returns the tail element.
func (*TaskQueueSlice) GetStatus ¶ added in v1.10.0
func (q *TaskQueueSlice) GetStatus() string
func (*TaskQueueSlice) IsEmpty ¶ added in v1.10.0
func (q *TaskQueueSlice) IsEmpty() bool
func (*TaskQueueSlice) Iterate ¶ added in v1.10.0
func (q *TaskQueueSlice) Iterate(doFn func(task.Task))
Iterate runs doFn for every task.
func (*TaskQueueSlice) Length ¶ added in v1.10.0
func (q *TaskQueueSlice) Length() int
func (*TaskQueueSlice) MeasureActionTime ¶ added in v1.10.0
func (q *TaskQueueSlice) MeasureActionTime(action string) func()
MeasureActionTime is a helper to measure the execution time of the queue's actions.
func (*TaskQueueSlice) Remove ¶ added in v1.10.0
func (q *TaskQueueSlice) Remove(id string) task.Task
Remove finds the element by id and deletes it.
func (*TaskQueueSlice) RemoveFirst ¶ added in v1.10.0
func (q *TaskQueueSlice) RemoveFirst() task.Task
RemoveFirst deletes the head element, so the head is moved.
func (*TaskQueueSlice) RemoveLast ¶ added in v1.10.0
func (q *TaskQueueSlice) RemoveLast() task.Task
RemoveLast deletes the tail element, so the tail is moved.
func (*TaskQueueSlice) SetDebug ¶ added in v1.10.0
func (q *TaskQueueSlice) SetDebug(debug bool)
func (*TaskQueueSlice) SetStatus ¶ added in v1.10.0
func (q *TaskQueueSlice) SetStatus(status string)
func (*TaskQueueSlice) Start ¶ added in v1.10.0
func (q *TaskQueueSlice) Start(ctx context.Context)
func (*TaskQueueSlice) Stop ¶ added in v1.10.0
func (q *TaskQueueSlice) Stop()
func (*TaskQueueSlice) String ¶ added in v1.10.0
func (q *TaskQueueSlice) String() string
String dumps the tasks in the queue to one line.
func (*TaskQueueSlice) WithCompactableTypes ¶ added in v1.10.0
func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice
func (*TaskQueueSlice) WithCompactionCallback ¶ added in v1.10.0
func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice
func (*TaskQueueSlice) WithContext ¶ added in v1.10.0
func (q *TaskQueueSlice) WithContext(ctx context.Context)
func (*TaskQueueSlice) WithHandler ¶ added in v1.10.0
func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice
func (*TaskQueueSlice) WithLogger ¶ added in v1.10.0
func (q *TaskQueueSlice) WithLogger(logger *log.Logger)
func (*TaskQueueSlice) WithMetricStorage ¶ added in v1.10.0
func (q *TaskQueueSlice) WithMetricStorage(mstor metric.Storage) *TaskQueueSlice
func (*TaskQueueSlice) WithName ¶ added in v1.10.0
func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice
type TaskResult ¶
type TaskStatus ¶ added in v1.0.11
type TaskStatus string
const ( Success TaskStatus = "Success" Fail TaskStatus = "Fail" Repeat TaskStatus = "Repeat" Keep TaskStatus = "Keep" )