Documentation
¶
Index ¶
- Constants
- Variables
- type TaskCounter
- type TaskQueue
- func (q *TaskQueue) AddAfter(id string, tasks ...task.Task)
- func (q *TaskQueue) AddBefore(id string, newTask task.Task)
- func (q *TaskQueue) AddFirst(tasks ...task.Task)
- func (q *TaskQueue) AddLast(tasks ...task.Task)
- func (q *TaskQueue) CancelTaskDelay()
- func (q *TaskQueue) Filter(filterFn func(task.Task) bool)
- func (q *TaskQueue) Get(id string) task.Task
- func (q *TaskQueue) GetFirst() task.Task
- func (q *TaskQueue) GetLast() task.Task
- func (q *TaskQueue) GetStatus() string
- func (q *TaskQueue) IsEmpty() bool
- func (q *TaskQueue) Iterate(doFn func(task.Task))
- func (q *TaskQueue) Length() int
- func (q *TaskQueue) MeasureActionTime(action string) func()
- func (q *TaskQueue) Remove(id string) task.Task
- func (q *TaskQueue) RemoveFirst() task.Task
- func (q *TaskQueue) RemoveLast() task.Task
- func (q *TaskQueue) SetStatus(status string)
- func (q *TaskQueue) Start(ctx context.Context)
- func (q *TaskQueue) Stop()
- func (q *TaskQueue) String() string
- type TaskQueueOption
- func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption
- func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption
- func WithContext(ctx context.Context) TaskQueueOption
- func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption
- func WithLogger(logger *log.Logger) TaskQueueOption
- func WithName(name string) TaskQueueOption
- type TaskQueueSet
- func (tqs *TaskQueueSet) Add(queue *TaskQueue)
- func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))
- func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
- func (tqs *TaskQueueSet) GetMain() *TaskQueue
- func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue))
- func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, ...)
- func (tqs *TaskQueueSet) Remove(name string)
- func (tqs *TaskQueueSet) Start(ctx context.Context)
- func (tqs *TaskQueueSet) StartMain(ctx context.Context)
- func (tqs *TaskQueueSet) Stop()
- func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
- func (tqs *TaskQueueSet) WithContext(ctx context.Context)
- func (tqs *TaskQueueSet) WithMainName(name string)
- func (tqs *TaskQueueSet) WithMetricStorage(mstor metricsstorage.Storage) *TaskQueueSet
- type TaskResult
- func (res *TaskResult) AddAfterTasks(t ...task.Task)
- func (res *TaskResult) AddHeadTasks(t ...task.Task)
- func (res *TaskResult) AddTailTasks(t ...task.Task)
- func (res *TaskResult) GetAfterTasks() []task.Task
- func (res *TaskResult) GetHeadTasks() []task.Task
- func (res *TaskResult) GetTailTasks() []task.Task
- type TaskStatus
Constants ¶
const MainQueueName = "main"
Variables ¶
var ( DefaultWaitLoopCheckInterval = 125 * time.Millisecond DefaultDelayOnQueueIsEmpty = 250 * time.Millisecond DefaultInitialDelayOnFailedTask = 5 * time.Second DefaultDelayOnRepeat = 25 * time.Millisecond )
Functions ¶
This section is empty.
Types ¶
type TaskCounter ¶ added in v1.10.2
type TaskCounter struct {
// contains filtered or unexported fields
}
func NewTaskCounter ¶ added in v1.10.2
func NewTaskCounter(name string, countableTypes map[task.TaskType]struct{}, metricStorage metricsstorage.Storage) *TaskCounter
func (*TaskCounter) Add ¶ added in v1.10.2
func (tc *TaskCounter) Add(task task.Task)
func (*TaskCounter) GetReachedCap ¶ added in v1.10.2
func (tc *TaskCounter) GetReachedCap() map[string]struct{}
func (*TaskCounter) IsAnyCapReached ¶ added in v1.10.2
func (tc *TaskCounter) IsAnyCapReached() bool
func (*TaskCounter) Remove ¶ added in v1.10.2
func (tc *TaskCounter) Remove(task task.Task)
func (*TaskCounter) ResetReachedCap ¶ added in v1.10.2
func (tc *TaskCounter) ResetReachedCap()
type TaskQueue ¶
type TaskQueue struct {
Name string
Handler func(ctx context.Context, t task.Task) TaskResult
Status string
// Timing settings.
WaitLoopCheckInterval time.Duration
DelayOnQueueIsEmpty time.Duration
DelayOnRepeat time.Duration
ExponentialBackoffFn func(failureCount int) time.Duration
// contains filtered or unexported fields
}
func NewTasksQueue ¶
func NewTasksQueue(metricStorage metricsstorage.Storage, opts ...TaskQueueOption) *TaskQueue
NewTasksQueue creates a new TaskQueue with the provided options
func (*TaskQueue) CancelTaskDelay ¶ added in v1.0.11
func (q *TaskQueue) CancelTaskDelay()
CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay.
func (*TaskQueue) MeasureActionTime ¶
MeasureActionTime is a helper to measure execution time of queue's actions
func (*TaskQueue) RemoveFirst ¶
RemoveFirst deletes a head element, so head is moved.
func (*TaskQueue) RemoveLast ¶
RemoveLast deletes a tail element, so tail is moved.
type TaskQueueOption ¶ added in v1.10.2
type TaskQueueOption func(*TaskQueue)
TaskQueueOption defines a functional option for TaskQueue configuration
func WithCompactableTypes ¶ added in v1.10.1
func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption
WithCompactableTypes sets the compactable task types for the TaskQueue
func WithCompactionCallback ¶ added in v1.10.1
func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption
func WithContext ¶ added in v1.10.2
func WithContext(ctx context.Context) TaskQueueOption
WithContext sets the context for the TaskQueue
func WithHandler ¶ added in v1.10.2
func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption
WithHandler sets the task handler for the TaskQueue
func WithLogger ¶ added in v1.10.1
func WithLogger(logger *log.Logger) TaskQueueOption
func WithName ¶ added in v1.10.2
func WithName(name string) TaskQueueOption
WithName sets the name for the TaskQueue
type TaskQueueSet ¶
type TaskQueueSet struct {
MainName string
Queues map[string]*TaskQueue
// contains filtered or unexported fields
}
TaskQueueSet is a manager for a set of named queues
func NewTaskQueueSet ¶
func NewTaskQueueSet() *TaskQueueSet
func (*TaskQueueSet) Add ¶
func (tqs *TaskQueueSet) Add(queue *TaskQueue)
func (*TaskQueueSet) DoWithLock ¶
func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))
taskQueueSet.DoWithLock(func(tqs *TaskQueueSet){
tqs.GetMain().Pop()
})
func (*TaskQueueSet) GetByName ¶
func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
func (*TaskQueueSet) GetMain ¶
func (tqs *TaskQueueSet) GetMain() *TaskQueue
func (*TaskQueueSet) Iterate ¶
func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue))
Iterate run doFn for every task.
func (*TaskQueueSet) NewNamedQueue ¶
func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...TaskQueueOption)
func (*TaskQueueSet) Remove ¶
func (tqs *TaskQueueSet) Remove(name string)
func (*TaskQueueSet) Start ¶
func (tqs *TaskQueueSet) Start(ctx context.Context)
func (*TaskQueueSet) StartMain ¶
func (tqs *TaskQueueSet) StartMain(ctx context.Context)
func (*TaskQueueSet) Stop ¶
func (tqs *TaskQueueSet) Stop()
func (*TaskQueueSet) WaitStopWithTimeout ¶
func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
func (*TaskQueueSet) WithContext ¶
func (tqs *TaskQueueSet) WithContext(ctx context.Context)
func (*TaskQueueSet) WithMainName ¶
func (tqs *TaskQueueSet) WithMainName(name string)
func (*TaskQueueSet) WithMetricStorage ¶
func (tqs *TaskQueueSet) WithMetricStorage(mstor metricsstorage.Storage) *TaskQueueSet
type TaskResult ¶
type TaskResult struct {
Status TaskStatus
DelayBeforeNextTask time.Duration
AfterHandle func()
// contains filtered or unexported fields
}
func (*TaskResult) AddAfterTasks ¶ added in v1.10.1
func (res *TaskResult) AddAfterTasks(t ...task.Task)
func (*TaskResult) AddHeadTasks ¶ added in v1.10.1
func (res *TaskResult) AddHeadTasks(t ...task.Task)
func (*TaskResult) AddTailTasks ¶ added in v1.10.1
func (res *TaskResult) AddTailTasks(t ...task.Task)
func (*TaskResult) GetAfterTasks ¶ added in v1.10.1
func (res *TaskResult) GetAfterTasks() []task.Task
func (*TaskResult) GetHeadTasks ¶ added in v1.10.1
func (res *TaskResult) GetHeadTasks() []task.Task
func (*TaskResult) GetTailTasks ¶ added in v1.10.1
func (res *TaskResult) GetTailTasks() []task.Task
type TaskStatus ¶ added in v1.0.11
type TaskStatus string
const ( Success TaskStatus = "Success" Fail TaskStatus = "Fail" Repeat TaskStatus = "Repeat" Keep TaskStatus = "Keep" )