queue

package
v1.10.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2025 License: Apache-2.0 Imports: 19 Imported by: 23

Documentation

Overview

!DEPRECATED

Index

Constants

View Source
const MainQueueName = "main"

Variables

View Source
var (
	DefaultWaitLoopCheckInterval    = 125 * time.Millisecond
	DefaultDelayOnQueueIsEmpty      = 250 * time.Millisecond
	DefaultInitialDelayOnFailedTask = 5 * time.Second
	DefaultDelayOnRepeat            = 25 * time.Millisecond
)

Functions

This section is empty.

Types

type QueueStatusType added in v1.10.6

type QueueStatusType int

QueueStatusType represents the type of queue status

const (
	QueueStatusUnknown QueueStatusType = iota
	QueueStatusIdle
	QueueStatusNoHandlerSet
	QueueStatusRunningTask
	QueueStatusStop
	QueueStatusRepeatTask
	QueueStatusSleeping
	QueueStatusWaiting
)

func (QueueStatusType) String added in v1.10.6

func (qst QueueStatusType) String() string

String returns a human-readable representation of the queue status type

type TaskCounter added in v1.10.2

type TaskCounter struct {
	// contains filtered or unexported fields
}

func NewTaskCounter added in v1.10.2

func NewTaskCounter(name string, countableTypes map[task.TaskType]struct{}, metricStorage metric.Storage) *TaskCounter

func (*TaskCounter) Add added in v1.10.2

func (tc *TaskCounter) Add(task task.Task)

func (*TaskCounter) GetReachedCap added in v1.10.2

func (tc *TaskCounter) GetReachedCap() map[string]struct{}

func (*TaskCounter) IsAnyCapReached added in v1.10.2

func (tc *TaskCounter) IsAnyCapReached() bool

func (*TaskCounter) Remove added in v1.10.2

func (tc *TaskCounter) Remove(task task.Task)

func (*TaskCounter) ResetReachedCap added in v1.10.2

func (tc *TaskCounter) ResetReachedCap()

type TaskQueue

type TaskQueue struct {
	Name    string
	Handler func(ctx context.Context, t task.Task) TaskResult

	// Timing settings.
	WaitLoopCheckInterval time.Duration
	DelayOnQueueIsEmpty   time.Duration
	DelayOnRepeat         time.Duration
	ExponentialBackoffFn  func(failureCount int) time.Duration
	// contains filtered or unexported fields
}

func NewTasksQueue

func NewTasksQueue(metricStorage metric.Storage, opts ...TaskQueueOption) *TaskQueue

NewTasksQueue creates a new TaskQueue with the provided options

func (*TaskQueue) AddAfter

func (q *TaskQueue) AddAfter(id string, tasks ...task.Task)

AddAfter inserts a task after the task with specified id.

func (*TaskQueue) AddBefore

func (q *TaskQueue) AddBefore(id string, newTask task.Task)

AddBefore inserts a task before the task with specified id.

func (*TaskQueue) AddFirst

func (q *TaskQueue) AddFirst(tasks ...task.Task)

AddFirst adds new head element.

func (*TaskQueue) AddLast

func (q *TaskQueue) AddLast(tasks ...task.Task)

AddLast adds new tail element.

func (*TaskQueue) CancelTaskDelay added in v1.0.11

func (q *TaskQueue) CancelTaskDelay()

CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay.

func (*TaskQueue) DeleteFunc added in v1.10.6

func (q *TaskQueue) DeleteFunc(fn func(task.Task) bool)

DeleteFunc runs fn on every task and removes each task for which fn returns false.

func (*TaskQueue) Get

func (q *TaskQueue) Get(id string) task.Task

Get returns a task by id.

func (*TaskQueue) GetFirst

func (q *TaskQueue) GetFirst() task.Task

GetFirst returns a head element.

func (*TaskQueue) GetLast

func (q *TaskQueue) GetLast() task.Task

GetLast returns a tail element.

func (*TaskQueue) GetSnapshot added in v1.10.6

func (q *TaskQueue) GetSnapshot() []task.Task

GetSnapshot returns a copy of all tasks in the queue. This is useful for external iteration or processing without holding locks.

The returned slice is a snapshot at the time of the call and will not reflect subsequent changes to the queue.

func (*TaskQueue) GetStatus added in v1.6.0

func (q *TaskQueue) GetStatus() string

func (*TaskQueue) GetStatusType added in v1.10.6

func (q *TaskQueue) GetStatusType() QueueStatusType

func (*TaskQueue) IsEmpty

func (q *TaskQueue) IsEmpty() bool

func (*TaskQueue) IterateSnapshot added in v1.10.6

func (q *TaskQueue) IterateSnapshot(doFn func(task.Task))

IterateSnapshot creates a snapshot of all tasks and iterates over the copy. This is safer than Iterate() when you need to call queue methods inside the callback, as no locks are held during callback execution.

Note: The snapshot may become stale during iteration if tasks are added/removed by other goroutines or by the callback itself.

Use this method when:

  • You need to call queue methods inside the callback (Add, Length, Filter, etc.)
  • You need to process tasks asynchronously
  • Safety is more important than performance

Memory overhead: O(n) where n is the number of tasks in the queue.

func (*TaskQueue) Length

func (q *TaskQueue) Length() int

func (*TaskQueue) MeasureActionTime

func (q *TaskQueue) MeasureActionTime(action string) func()

MeasureActionTime is a helper to measure execution time of queue's actions

func (*TaskQueue) Remove

func (q *TaskQueue) Remove(id string) task.Task

Remove finds element by id and deletes it.

func (*TaskQueue) RemoveFirst

func (q *TaskQueue) RemoveFirst() task.Task

RemoveFirst deletes a head element, so head is moved.

func (*TaskQueue) RemoveLast

func (q *TaskQueue) RemoveLast() task.Task

RemoveLast deletes a tail element, so tail is moved.

func (*TaskQueue) SetStatus added in v1.6.0

func (q *TaskQueue) SetStatus(statusType QueueStatusType)

func (*TaskQueue) SetStatusText added in v1.10.6

func (q *TaskQueue) SetStatusText(text string)

func (*TaskQueue) Start

func (q *TaskQueue) Start(ctx context.Context)

func (*TaskQueue) Stop

func (q *TaskQueue) Stop()

func (*TaskQueue) String

func (q *TaskQueue) String() string

Dump tasks in queue to one line

type TaskQueueOption added in v1.10.2

type TaskQueueOption func(*TaskQueue)

TaskQueueOption defines a functional option for TaskQueue configuration

func WithCompactableTypes added in v1.10.1

func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption

WithCompactableTypes sets the compactable task types for the TaskQueue

func WithCompactionCallback added in v1.10.1

func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption

func WithContext added in v1.10.2

func WithContext(ctx context.Context) TaskQueueOption

WithContext sets the context for the TaskQueue

func WithHandler added in v1.10.2

func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption

WithHandler sets the task handler for the TaskQueue

func WithLogger added in v1.10.1

func WithLogger(logger *log.Logger) TaskQueueOption

func WithName added in v1.10.2

func WithName(name string) TaskQueueOption

WithName sets the name for the TaskQueue

type TaskQueueSet

type TaskQueueSet struct {
	MainName string

	Queues *queueStorage
	// contains filtered or unexported fields
}

TaskQueueSet is a manager for a set of named queues

func NewTaskQueueSet

func NewTaskQueueSet() *TaskQueueSet

func (*TaskQueueSet) Add

func (tqs *TaskQueueSet) Add(queue *TaskQueue)

Add register a new queue for TaskQueueSet.

func (*TaskQueueSet) DoWithLock

func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))

func (*TaskQueueSet) GetByName

func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue

func (*TaskQueueSet) GetMain

func (tqs *TaskQueueSet) GetMain() *TaskQueue

func (*TaskQueueSet) Iterate

func (tqs *TaskQueueSet) Iterate(ctx context.Context, doFn func(ctx context.Context, queue *TaskQueue))

Iterate run doFn for every task.

func (*TaskQueueSet) NewNamedQueue

func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...TaskQueueOption)

func (*TaskQueueSet) Remove

func (tqs *TaskQueueSet) Remove(name string)

func (*TaskQueueSet) Start

func (tqs *TaskQueueSet) Start(ctx context.Context)

func (*TaskQueueSet) StartMain

func (tqs *TaskQueueSet) StartMain(ctx context.Context)

func (*TaskQueueSet) Stop

func (tqs *TaskQueueSet) Stop()

func (*TaskQueueSet) WaitStopWithTimeout

func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)

func (*TaskQueueSet) WithContext

func (tqs *TaskQueueSet) WithContext(ctx context.Context)

func (*TaskQueueSet) WithMainName

func (tqs *TaskQueueSet) WithMainName(name string)

func (*TaskQueueSet) WithMetricStorage

func (tqs *TaskQueueSet) WithMetricStorage(mstor metric.Storage) *TaskQueueSet

type TaskQueueSlice added in v1.10.0

type TaskQueueSlice struct {
	CompactableTypes map[task.TaskType]struct{}

	Name    string
	Handler func(ctx context.Context, t task.Task) TaskResult
	Status  string

	// Callback for task compaction events
	CompactionCallback func(compactedTasks []task.Task, targetTask task.Task)

	// Timing settings.
	WaitLoopCheckInterval time.Duration
	DelayOnQueueIsEmpty   time.Duration
	DelayOnRepeat         time.Duration
	ExponentialBackoffFn  func(failureCount int) time.Duration
	// contains filtered or unexported fields
}

func NewTasksQueueSlice added in v1.10.0

func NewTasksQueueSlice() *TaskQueueSlice

func (*TaskQueueSlice) AddAfter added in v1.10.0

func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task)

AddAfter inserts a task after the task with specified id.

func (*TaskQueueSlice) AddBefore added in v1.10.0

func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task)

AddBefore inserts a task before the task with specified id.

func (*TaskQueueSlice) AddFirst added in v1.10.0

func (q *TaskQueueSlice) AddFirst(tasks ...task.Task)

AddFirst adds new head element.

func (*TaskQueueSlice) AddLast added in v1.10.0

func (q *TaskQueueSlice) AddLast(tasks ...task.Task)

AddLast adds new tail element.

func (*TaskQueueSlice) CancelTaskDelay added in v1.10.0

func (q *TaskQueueSlice) CancelTaskDelay()

CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay.

func (*TaskQueueSlice) Filter added in v1.10.0

func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool)

Filter run filterFn on every task and remove each with false result.

func (*TaskQueueSlice) Get added in v1.10.0

func (q *TaskQueueSlice) Get(id string) task.Task

Get returns a task by id.

func (*TaskQueueSlice) GetFirst added in v1.10.0

func (q *TaskQueueSlice) GetFirst() task.Task

GetFirst returns a head element.

func (*TaskQueueSlice) GetLast added in v1.10.0

func (q *TaskQueueSlice) GetLast() task.Task

GetLast returns a tail element.

func (*TaskQueueSlice) GetStatus added in v1.10.0

func (q *TaskQueueSlice) GetStatus() string

func (*TaskQueueSlice) IsEmpty added in v1.10.0

func (q *TaskQueueSlice) IsEmpty() bool

func (*TaskQueueSlice) Iterate added in v1.10.0

func (q *TaskQueueSlice) Iterate(doFn func(task.Task))

Iterate run doFn for every task.

func (*TaskQueueSlice) Length added in v1.10.0

func (q *TaskQueueSlice) Length() int

func (*TaskQueueSlice) MeasureActionTime added in v1.10.0

func (q *TaskQueueSlice) MeasureActionTime(action string) func()

MeasureActionTime is a helper to measure execution time of queue's actions

func (*TaskQueueSlice) Remove added in v1.10.0

func (q *TaskQueueSlice) Remove(id string) task.Task

Remove finds element by id and deletes it.

func (*TaskQueueSlice) RemoveFirst added in v1.10.0

func (q *TaskQueueSlice) RemoveFirst() task.Task

RemoveFirst deletes a head element, so head is moved.

func (*TaskQueueSlice) RemoveLast added in v1.10.0

func (q *TaskQueueSlice) RemoveLast() task.Task

RemoveLast deletes a tail element, so tail is moved.

func (*TaskQueueSlice) SetDebug added in v1.10.0

func (q *TaskQueueSlice) SetDebug(debug bool)

func (*TaskQueueSlice) SetStatus added in v1.10.0

func (q *TaskQueueSlice) SetStatus(status string)

func (*TaskQueueSlice) Start added in v1.10.0

func (q *TaskQueueSlice) Start(ctx context.Context)

func (*TaskQueueSlice) Stop added in v1.10.0

func (q *TaskQueueSlice) Stop()

func (*TaskQueueSlice) String added in v1.10.0

func (q *TaskQueueSlice) String() string

Dump tasks in queue to one line

func (*TaskQueueSlice) WithCompactableTypes added in v1.10.0

func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice

func (*TaskQueueSlice) WithCompactionCallback added in v1.10.0

func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice

func (*TaskQueueSlice) WithContext added in v1.10.0

func (q *TaskQueueSlice) WithContext(ctx context.Context)

func (*TaskQueueSlice) WithHandler added in v1.10.0

func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice

func (*TaskQueueSlice) WithLogger added in v1.10.0

func (q *TaskQueueSlice) WithLogger(logger *log.Logger)

func (*TaskQueueSlice) WithMetricStorage added in v1.10.0

func (q *TaskQueueSlice) WithMetricStorage(mstor metric.Storage) *TaskQueueSlice

func (*TaskQueueSlice) WithName added in v1.10.0

func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice

type TaskQueueStatus added in v1.10.6

type TaskQueueStatus struct {
	// contains filtered or unexported fields
}

TaskQueueStatus encapsulates queue status with thread-safe access

func NewTaskQueueStatus added in v1.10.6

func NewTaskQueueStatus() *TaskQueueStatus

NewTaskQueueStatus creates a new TaskQueueStatus

func (*TaskQueueStatus) Get added in v1.10.6

func (tqs *TaskQueueStatus) Get() string

Get returns the current status as a string

func (*TaskQueueStatus) GetType added in v1.10.6

func (tqs *TaskQueueStatus) GetType() QueueStatusType

GetType returns the current status type

func (*TaskQueueStatus) Restore added in v1.10.6

func (tqs *TaskQueueStatus) Restore(statusType QueueStatusType, customText string)

Restore sets both the status type and custom text atomically

func (*TaskQueueStatus) Set added in v1.10.6

func (tqs *TaskQueueStatus) Set(statusType QueueStatusType)

Set sets the status to a specific type and clears custom text

func (*TaskQueueStatus) SetText added in v1.10.6

func (tqs *TaskQueueStatus) SetText(text string)

SetText sets a custom status text

func (*TaskQueueStatus) SetWithText added in v1.10.6

func (tqs *TaskQueueStatus) SetWithText(statusType QueueStatusType, text string)

SetWithText sets the status type with custom text

func (*TaskQueueStatus) Snapshot added in v1.10.6

func (tqs *TaskQueueStatus) Snapshot() (QueueStatusType, string)

Snapshot returns both the status type and custom text atomically

type TaskResult

type TaskResult struct {
	Status TaskStatus

	DelayBeforeNextTask time.Duration

	AfterHandle func()
	// contains filtered or unexported fields
}

func (*TaskResult) AddAfterTasks added in v1.10.1

func (res *TaskResult) AddAfterTasks(t ...task.Task)

func (*TaskResult) AddHeadTasks added in v1.10.1

func (res *TaskResult) AddHeadTasks(t ...task.Task)

func (*TaskResult) AddTailTasks added in v1.10.1

func (res *TaskResult) AddTailTasks(t ...task.Task)

func (*TaskResult) GetAfterTasks added in v1.10.1

func (res *TaskResult) GetAfterTasks() []task.Task

func (*TaskResult) GetHeadTasks added in v1.10.1

func (res *TaskResult) GetHeadTasks() []task.Task

func (*TaskResult) GetTailTasks added in v1.10.1

func (res *TaskResult) GetTailTasks() []task.Task

type TaskStatus added in v1.0.11

type TaskStatus string
const (
	Success TaskStatus = "Success"
	Fail    TaskStatus = "Fail"
	Repeat  TaskStatus = "Repeat"
	Keep    TaskStatus = "Keep"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL