Documentation
¶
Index ¶
- Constants
- func LeastTask[T any, K comparable](ctx context.Context, m *QueueManager[T, K]) (K, bool)
- func NewQueueManager[T any, K comparable](ctx context.Context, client redis.UniversalClient, ...) (task.QueueManager[T, K], error)
- func RoundRobinTask[T any, K comparable](ctx context.Context, m *QueueManager[T, K]) (K, bool)
- type Option
- func WithAfterProcessPushFunc[T any, K comparable](fn func(key K, data T)) Option[T, K]
- func WithEqualDataFunc[T any, K comparable](fn func(a, b T) bool) Option[T, K]
- func WithMarshalFunc[T any, K comparable](fn func(T) ([]byte, error)) Option[T, K]
- func WithNamespace[T any, K comparable](namespace string) Option[T, K]
- func WithStrategy[T any, K comparable](s strategy) Option[T, K]
- func WithUnmarshalFunc[T any, K comparable](fn func([]byte, *T) error) Option[T, K]
- type QueueManager
- func (m *QueueManager[T, K]) AddKey(ctx context.Context, key K) error
- func (m *QueueManager[T, K]) Close() error
- func (m *QueueManager[T, K]) Delete(ctx context.Context, key K, data T) error
- func (m *QueueManager[T, K]) DeleteKey(ctx context.Context, key K) error
- func (m *QueueManager[T, K]) GetGlobalQueuePosition(ctx context.Context, data T) (int, error)
- func (m *QueueManager[T, K]) GetProcessingQueueLengths(ctx context.Context) (map[K]int, error)
- func (m *QueueManager[T, K]) Insert(ctx context.Context, data T) (K, error)
- func (m *QueueManager[T, K]) InsertByKey(ctx context.Context, key K, data T) error
- func (m *QueueManager[T, K]) TransformProcessingData(ctx context.Context, fromKey, toKey K, data T) error
Constants ¶
View Source
const ( RoundRobin strategy = iota + 1 Least )
Variables ¶
This section is empty.
Functions ¶
func LeastTask ¶
func LeastTask[T any, K comparable](ctx context.Context, m *QueueManager[T, K]) (K, bool)
LeastTask : return key which has the least tasks
func NewQueueManager ¶
func NewQueueManager[T any, K comparable]( ctx context.Context, client redis.UniversalClient, maxGlobal, maxProcessing, maxWaiting int, equalFunc func(a, b T) bool, opts ...Option[T, K], ) (task.QueueManager[T, K], error)
func RoundRobinTask ¶
func RoundRobinTask[T any, K comparable](ctx context.Context, m *QueueManager[T, K]) (K, bool)
RoundRobinTask : return key in round-robin fashion
Types ¶
type Option ¶
type Option[T any, K comparable] func(*QueueManager[T, K])
func WithAfterProcessPushFunc ¶
func WithAfterProcessPushFunc[T any, K comparable](fn func(key K, data T)) Option[T, K]
func WithEqualDataFunc ¶
func WithEqualDataFunc[T any, K comparable](fn func(a, b T) bool) Option[T, K]
func WithMarshalFunc ¶
func WithMarshalFunc[T any, K comparable](fn func(T) ([]byte, error)) Option[T, K]
func WithNamespace ¶
func WithNamespace[T any, K comparable](namespace string) Option[T, K]
func WithStrategy ¶
func WithStrategy[T any, K comparable](s strategy) Option[T, K]
func WithUnmarshalFunc ¶
func WithUnmarshalFunc[T any, K comparable](fn func([]byte, *T) error) Option[T, K]
type QueueManager ¶
type QueueManager[T any, K comparable] struct { // contains filtered or unexported fields }
func (*QueueManager[T, K]) AddKey ¶
func (m *QueueManager[T, K]) AddKey(ctx context.Context, key K) error
func (*QueueManager[T, K]) Close ¶
func (m *QueueManager[T, K]) Close() error
func (*QueueManager[T, K]) Delete ¶
func (m *QueueManager[T, K]) Delete(ctx context.Context, key K, data T) error
func (*QueueManager[T, K]) DeleteKey ¶
func (m *QueueManager[T, K]) DeleteKey(ctx context.Context, key K) error
func (*QueueManager[T, K]) GetGlobalQueuePosition ¶
func (m *QueueManager[T, K]) GetGlobalQueuePosition(ctx context.Context, data T) (int, error)
GetGlobalQueuePosition returns the position of data in the global queue (0-based, -1 if not found)
func (*QueueManager[T, K]) GetProcessingQueueLengths ¶
func (m *QueueManager[T, K]) GetProcessingQueueLengths(ctx context.Context) (map[K]int, error)
func (*QueueManager[T, K]) Insert ¶
func (m *QueueManager[T, K]) Insert(ctx context.Context, data T) (K, error)
func (*QueueManager[T, K]) InsertByKey ¶
func (m *QueueManager[T, K]) InsertByKey(ctx context.Context, key K, data T) error
func (*QueueManager[T, K]) TransformProcessingData ¶
func (m *QueueManager[T, K]) TransformProcessingData(ctx context.Context, fromKey, toKey K, data T) error
Click to show internal directories.
Click to hide internal directories.