Documentation ¶
Index ¶
- Constants
- func LeastTask[T any, K comparable](tm *QueueManager[T, K]) (K, bool)
- func NewQueueManager[T any, K comparable](maxGlobal, maxProcessing, maxWaiting int, equalFunc func(a, b T) bool, ...) task.QueueManager[T, K]
- func RoundRobinTask[T any, K comparable](tm *QueueManager[T, K]) (K, bool)
- type Options
- type Queue
- type QueueManager
- func (tm *QueueManager[T, K]) AddKey(ctx context.Context, key K) error
- func (tm *QueueManager[T, K]) Delete(ctx context.Context, key K, data T) error
- func (tm *QueueManager[T, K]) DeleteKey(ctx context.Context, key K) error
- func (tm *QueueManager[T, K]) GetGlobalQueuePosition(ctx context.Context, data T) (int, error)
- func (tm *QueueManager[T, K]) GetProcessingQueueLengths(ctx context.Context) (map[K]int, error)
- func (tm *QueueManager[T, K]) Insert(ctx context.Context, data T) error
- func (tm *QueueManager[T, K]) InsertByKey(ctx context.Context, key K, data T) error
- func (tm *QueueManager[T, K]) TransformProcessingData(ctx context.Context, fromKey, toKey K, data T) error
Constants ¶
const (
	RoundRobin strategy = iota + 1
	Least
)
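Because the block starts at `iota + 1`, the zero value of the strategy type matches neither constant. The sketch below redeclares a local `strategy` type purely for illustration (the package's own type is unexported, and its underlying type is assumed to be an integer here) to show the resulting values.

```go
package main

import "fmt"

// strategy is redeclared locally for illustration only; the underlying
// type of the package's unexported strategy type is an assumption.
type strategy int

const (
	RoundRobin strategy = iota + 1 // 1
	Least                          // 2
)

func main() {
	var unset strategy // zero value: equal to neither constant
	fmt.Println(RoundRobin, Least, unset)
}
```

Starting at 1 is a common way to let code distinguish "no strategy chosen" from a deliberate choice; whether this package relies on that distinction is not stated in the listing.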
Variables ¶
This section is empty.
Functions ¶
func LeastTask ¶
func LeastTask[T any, K comparable](tm *QueueManager[T, K]) (K, bool)
LeastTask returns the key that has the fewest tasks.
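As a rough illustration of least-loaded selection, the sketch below picks the key with the smallest task count. `leastLoaded`, the `order` slice, and the `counts` map are hypothetical stand-ins; the listing does not say which queue lengths LeastTask compares or how it breaks ties.

```go
package main

import "fmt"

// leastLoaded is a hypothetical helper, not the package's LeastTask.
// It scans keys in the given order and returns the one with the smallest
// count; ties go to the earliest key. The bool is false when there are no keys.
func leastLoaded[K comparable](order []K, counts map[K]int) (K, bool) {
	var best K
	if len(order) == 0 {
		return best, false
	}
	best = order[0]
	for _, k := range order[1:] {
		if counts[k] < counts[best] {
			best = k
		}
	}
	return best, true
}

func main() {
	counts := map[string]int{"a": 3, "b": 1, "c": 2}
	k, ok := leastLoaded([]string{"a", "b", "c"}, counts)
	fmt.Println(k, ok) // b true
}
```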
func NewQueueManager ¶
func NewQueueManager[T any, K comparable](
	maxGlobal, maxProcessing, maxWaiting int,
	equalFunc func(a, b T) bool,
	opts ...Options[T, K],
) task.QueueManager[T, K]
func RoundRobinTask ¶
func RoundRobinTask[T any, K comparable](tm *QueueManager[T, K]) (K, bool)
RoundRobinTask returns the next key in round-robin order.
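Round-robin selection can be sketched with an ordered key list and a cursor that wraps around. The `rrPicker` type and its fields are hypothetical; how the package tracks its position between calls is not shown in the listing.

```go
package main

import "fmt"

// rrPicker is a hypothetical stand-in that cycles through keys in order.
type rrPicker[K comparable] struct {
	keys []K
	next int // index of the key to hand out on the next call
}

// pick returns the current key and advances the cursor, wrapping at the end.
// The bool is false when there are no keys.
func (p *rrPicker[K]) pick() (K, bool) {
	var zero K
	if len(p.keys) == 0 {
		return zero, false
	}
	k := p.keys[p.next%len(p.keys)]
	p.next = (p.next + 1) % len(p.keys)
	return k, true
}

func main() {
	p := &rrPicker[string]{keys: []string{"a", "b", "c"}}
	for i := 0; i < 4; i++ {
		k, _ := p.pick()
		fmt.Println(k) // a, b, c, then a again
	}
}
```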
Types ¶
type Options ¶
type Options[T any, K comparable] func(*QueueManager[T, K])
func WithAfterProcessPushFunc ¶
func WithAfterProcessPushFunc[T any, K comparable](fs ...func(key K, data T)) Options[T, K]
func WithStrategy ¶
func WithStrategy[T any, K comparable](s strategy) Options[T, K]
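Options follows the functional-options pattern: each option is a function that mutates the manager during construction. The sketch below reproduces that pattern with local, hypothetical types (`manager`, `option`, `withStrategy`, `withAfterPush`, `newManager`); the field names and the default strategy are assumptions, not details taken from the package.

```go
package main

import "fmt"

type strategy int

const (
	RoundRobin strategy = iota + 1
	Least
)

// manager is a hypothetical stand-in for QueueManager; its fields are assumptions.
type manager[T any, K comparable] struct {
	strat     strategy
	afterPush []func(key K, data T)
}

// option has the same shape as Options[T, K] in the listing.
type option[T any, K comparable] func(*manager[T, K])

func withStrategy[T any, K comparable](s strategy) option[T, K] {
	return func(m *manager[T, K]) { m.strat = s }
}

func withAfterPush[T any, K comparable](fs ...func(key K, data T)) option[T, K] {
	return func(m *manager[T, K]) { m.afterPush = append(m.afterPush, fs...) }
}

// newManager applies the options in order; the RoundRobin default is an assumption.
func newManager[T any, K comparable](opts ...option[T, K]) *manager[T, K] {
	m := &manager[T, K]{strat: RoundRobin}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	m := newManager[string, int](
		withStrategy[string, int](Least),
		withAfterPush[string, int](func(key int, data string) {
			fmt.Println("pushed", data, "to", key)
		}),
	)
	fmt.Println(m.strat == Least, len(m.afterPush)) // true 1
}
```

Note that the type parameters of a function shaped like WithStrategy appear only in its result, so Go cannot infer them from the argument; callers would likely need to instantiate it explicitly, as in `withStrategy[string, int](Least)` above.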
type Queue ¶
type Queue[T any] struct {
	// contains filtered or unexported fields
}
Queue pops data from its waiting queue. If the waiting queue is empty, it pops data from the global queue (held by the QueueManager) instead, and then pushes the data onto its processing queue.
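The refill order described above (waiting queue first, global queue as a fallback) can be sketched as follows. `keyQueue` and `refill` are hypothetical names, and plain slices stand in for whatever queue structure the package uses internally; locking and capacity limits are left out.

```go
package main

import "fmt"

// keyQueue is a hypothetical stand-in for one per-key Queue.
type keyQueue[T any] struct {
	waiting    []T
	processing []T
}

// refill moves one item into processing: first from the key's own waiting
// queue, and only if that is empty from the shared global queue.
// It reports whether an item was moved.
func refill[T any](q *keyQueue[T], global *[]T) bool {
	var src *[]T
	switch {
	case len(q.waiting) > 0:
		src = &q.waiting
	case len(*global) > 0:
		src = global
	default:
		return false // nothing to move
	}
	item := (*src)[0]
	*src = (*src)[1:]
	q.processing = append(q.processing, item)
	return true
}

func main() {
	global := []string{"g1"}
	q := &keyQueue[string]{waiting: []string{"w1"}}
	refill(q, &global) // takes w1 from the waiting queue
	refill(q, &global) // waiting is empty, so takes g1 from the global queue
	fmt.Println(q.processing) // [w1 g1]
}
```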
type QueueManager ¶
type QueueManager[T any, K comparable] struct {
	// contains filtered or unexported fields
}
func (*QueueManager[T, K]) AddKey ¶
func (tm *QueueManager[T, K]) AddKey(ctx context.Context, key K) error
func (*QueueManager[T, K]) Delete ¶
func (tm *QueueManager[T, K]) Delete(ctx context.Context, key K, data T) error
Delete removes data from the queues of the given key. If the data is removed from the processing queue, the key's task queue refills the slot: it pops data from its waiting queue or, if that is empty, from the global queue, and then pushes that data onto the processing queue.
func (*QueueManager[T, K]) DeleteKey ¶
func (tm *QueueManager[T, K]) DeleteKey(ctx context.Context, key K) error
DeleteKey removes the task queue for key and updates orderedKeys.
func (*QueueManager[T, K]) GetGlobalQueuePosition ¶
func (tm *QueueManager[T, K]) GetGlobalQueuePosition(ctx context.Context, data T) (int, error)
GetGlobalQueuePosition returns the position of data in the global queue (0-based, -1 if not found).
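Since T is constrained only by `any`, values cannot be compared with `==`, which is presumably why NewQueueManager takes an equalFunc. The sketch below shows a 0-based lookup that returns -1 when nothing matches; `position`, `job`, and `sameID` are hypothetical, and it is an assumption that the package performs the lookup with equalFunc in this way.

```go
package main

import "fmt"

// job is a hypothetical payload type.
type job struct {
	ID   int
	Note string
}

// position is a hypothetical helper: it returns the 0-based index of the
// first item that equal reports as matching data, or -1 if there is none.
func position[T any](queue []T, data T, equal func(a, b T) bool) int {
	for i, item := range queue {
		if equal(item, data) {
			return i
		}
	}
	return -1
}

func main() {
	// Two jobs are treated as the same when their IDs match; Note is ignored.
	sameID := func(a, b job) bool { return a.ID == b.ID }
	global := []job{{ID: 7, Note: "first"}, {ID: 9, Note: "second"}}
	fmt.Println(position(global, job{ID: 9}, sameID))  // 1
	fmt.Println(position(global, job{ID: 42}, sameID)) // -1
}
```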
func (*QueueManager[T, K]) GetProcessingQueueLengths ¶
func (tm *QueueManager[T, K]) GetProcessingQueueLengths(ctx context.Context) (map[K]int, error)
func (*QueueManager[T, K]) Insert ¶
func (tm *QueueManager[T, K]) Insert(ctx context.Context, data T) error
func (*QueueManager[T, K]) InsertByKey ¶
func (tm *QueueManager[T, K]) InsertByKey(ctx context.Context, key K, data T) error
func (*QueueManager[T, K]) TransformProcessingData ¶
func (tm *QueueManager[T, K]) TransformProcessingData(ctx context.Context, fromKey, toKey K, data T) error