redistask

package
v0.0.50-alpha.103 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RoundRobin strategy = iota + 1
	Least
)

Variables

This section is empty.

Functions

func LeastTask

func LeastTask[T any, K comparable](ctx context.Context, m *QueueManager[T, K]) (K, bool)

LeastTask returns the key that has the fewest tasks.

func NewQueueManager

func NewQueueManager[T any, K comparable](
	ctx context.Context,
	client redis.UniversalClient,
	maxGlobal, maxProcessing, maxWaiting int,
	equalFunc func(a, b T) bool,
	opts ...Option[T, K],
) (task.QueueManager[T, K], error)

func RoundRobinTask

func RoundRobinTask[T any, K comparable](ctx context.Context, m *QueueManager[T, K]) (K, bool)

RoundRobinTask returns a key in round-robin fashion.

Types

type Option

type Option[T any, K comparable] func(*QueueManager[T, K])

func WithAfterProcessPushFunc

func WithAfterProcessPushFunc[T any, K comparable](fn func(key K, data T)) Option[T, K]

func WithEqualDataFunc

func WithEqualDataFunc[T any, K comparable](fn func(a, b T) bool) Option[T, K]

func WithMarshalFunc

func WithMarshalFunc[T any, K comparable](fn func(T) ([]byte, error)) Option[T, K]

func WithNamespace

func WithNamespace[T any, K comparable](namespace string) Option[T, K]

func WithStrategy

func WithStrategy[T any, K comparable](s strategy) Option[T, K]

func WithUnmarshalFunc

func WithUnmarshalFunc[T any, K comparable](fn func([]byte, *T) error) Option[T, K]

type QueueManager

type QueueManager[T any, K comparable] struct {
	// contains filtered or unexported fields
}

func (*QueueManager[T, K]) AddKey

func (m *QueueManager[T, K]) AddKey(ctx context.Context, key K) error

func (*QueueManager[T, K]) Close

func (m *QueueManager[T, K]) Close() error

func (*QueueManager[T, K]) Delete

func (m *QueueManager[T, K]) Delete(ctx context.Context, key K, data T) error

func (*QueueManager[T, K]) DeleteKey

func (m *QueueManager[T, K]) DeleteKey(ctx context.Context, key K) error

func (*QueueManager[T, K]) GetGlobalQueuePosition

func (m *QueueManager[T, K]) GetGlobalQueuePosition(ctx context.Context, data T) (int, error)

GetGlobalQueuePosition returns the 0-based position of data in the global queue, or -1 if data is not found.

func (*QueueManager[T, K]) GetProcessingQueueLengths

func (m *QueueManager[T, K]) GetProcessingQueueLengths(ctx context.Context) (map[K]int, error)

func (*QueueManager[T, K]) Insert

func (m *QueueManager[T, K]) Insert(ctx context.Context, data T) (K, error)

func (*QueueManager[T, K]) InsertByKey

func (m *QueueManager[T, K]) InsertByKey(ctx context.Context, key K, data T) error

func (*QueueManager[T, K]) TransformProcessingData

func (m *QueueManager[T, K]) TransformProcessingData(ctx context.Context, fromKey, toKey K, data T) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL