Versions in this module (v1): v1.999.0 (Jan 31, 2020); v1.8.0 (Jan 31, 2020). Changes in this version: + func IsErrInvalidConfiguration(err error) bool + func RegisteredTypesAsString() []string + type ChannelQueue struct + func (c *ChannelQueue) Name() string + func (c *ChannelQueue) Push(data Data) error + func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) + type ChannelQueueConfiguration struct + Name string + Workers int + type Data interface + type DummyQueue struct + func (b *DummyQueue) Flush(time.Duration) error + func (b *DummyQueue) FlushWithContext(context.Context) error + func (b *DummyQueue) IsEmpty() bool + func (b *DummyQueue) Push(Data) error + func (b *DummyQueue) Run(_, _ func(context.Context, func())) + type ErrInvalidConfiguration struct + func (err ErrInvalidConfiguration) Error() string + type Flushable interface + Flush func(time.Duration) error + FlushWithContext func(ctx context.Context) error + IsEmpty func() bool + type HandlerFunc func(...Data) + type LevelQueue struct + func (l *LevelQueue) IsEmpty() bool + func (l *LevelQueue) Name() string + func (l *LevelQueue) Push(data Data) error + func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) + func (l *LevelQueue) Shutdown() + func (l *LevelQueue) Terminate() + type LevelQueueConfiguration struct + DataDir string + Name string + Workers int + type ManagedPool interface + AddWorkers func(number int, timeout time.Duration) context.CancelFunc + BlockTimeout func() time.Duration + BoostTimeout func() time.Duration + BoostWorkers func() int + MaxNumberOfWorkers func() int + NumberOfWorkers func() int + SetMaxNumberOfWorkers func(int) + SetPoolSettings func(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) + type ManagedQueue struct + Configuration interface{} + ExemplarType string + Managed interface{} + Name string + PoolWorkers map[int64]*PoolWorkers + QID int64 + Type Type + func (q *ManagedQueue) 
AddWorkers(number int, timeout time.Duration) context.CancelFunc + func (q *ManagedQueue) BlockTimeout() time.Duration + func (q *ManagedQueue) BoostTimeout() time.Duration + func (q *ManagedQueue) BoostWorkers() int + func (q *ManagedQueue) CancelWorkers(pid int64) + func (q *ManagedQueue) Flush(timeout time.Duration) error + func (q *ManagedQueue) IsEmpty() bool + func (q *ManagedQueue) MaxNumberOfWorkers() int + func (q *ManagedQueue) NumberOfWorkers() int + func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, ...) int64 + func (q *ManagedQueue) RemoveWorkers(pid int64) + func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) + func (q *ManagedQueue) Workers() []*PoolWorkers + type ManagedQueueList []*ManagedQueue + func (l ManagedQueueList) Len() int + func (l ManagedQueueList) Less(i, j int) bool + func (l ManagedQueueList) Swap(i, j int) + type Manager struct + Queues map[int64]*ManagedQueue + func GetManager() *Manager + func (m *Manager) Add(managed interface{}, t Type, configuration, exemplar interface{}) int64 + func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error + func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue + func (m *Manager) ManagedQueues() []*ManagedQueue + func (m *Manager) Remove(qid int64) + type Named interface + Name func() string + type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error) + type PersistableChannelQueue struct + func (p *PersistableChannelQueue) Flush(timeout time.Duration) error + func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error + func (p *PersistableChannelQueue) IsEmpty() bool + func (p *PersistableChannelQueue) Name() string + func (p *PersistableChannelQueue) Push(data Data) error + func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) + func (p *PersistableChannelQueue) 
Shutdown() + func (p *PersistableChannelQueue) Terminate() + type PersistableChannelQueueConfiguration struct + BatchLength int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int + DataDir string + MaxAttempts int + MaxWorkers int + Name string + QueueLength int + Timeout time.Duration + Workers int + type PoolWorkers struct + Cancel context.CancelFunc + HasTimeout bool + IsFlusher bool + PID int64 + Start time.Time + Timeout time.Time + Workers int + type PoolWorkersList []*PoolWorkers + func (l PoolWorkersList) Len() int + func (l PoolWorkersList) Less(i, j int) bool + func (l PoolWorkersList) Swap(i, j int) + type Queue interface + Push func(Data) error + Run func(atShutdown, atTerminate func(context.Context, func())) + func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue + func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) + func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) + func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) + func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) + func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) + func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) + func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) + type RedisQueue struct + func (r *RedisQueue) IsEmpty() bool + func (r *RedisQueue) Name() string + func (r *RedisQueue) Push(data Data) error + func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) + func (r *RedisQueue) Shutdown() + func (r *RedisQueue) Terminate() + type RedisQueueConfiguration struct + Addresses string + DBIndex int + Name string + Network string + Password string + QueueName string + Workers int + type Shutdownable interface + Shutdown func() + Terminate func() + type Type string + const 
ChannelQueueType + const DummyQueueType + const LevelQueueType + const PersistableChannelQueueType + const RedisQueueType + const WrappedQueueType + func RegisteredTypes() []Type + type WorkerPool struct + func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool + func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc + func (p *WorkerPool) BlockTimeout() time.Duration + func (p *WorkerPool) BoostTimeout() time.Duration + func (p *WorkerPool) BoostWorkers() int + func (p *WorkerPool) CleanUp(ctx context.Context) + func (p *WorkerPool) Flush(timeout time.Duration) error + func (p *WorkerPool) FlushWithContext(ctx context.Context) error + func (p *WorkerPool) IsEmpty() bool + func (p *WorkerPool) MaxNumberOfWorkers() int + func (p *WorkerPool) NumberOfWorkers() int + func (p *WorkerPool) Push(data Data) + func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) + func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) + func (p *WorkerPool) Wait() + type WorkerPoolConfiguration struct + BatchLength int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int + MaxWorkers int + QueueLength int + type WrappedQueue struct + func (q *WrappedQueue) Flush(timeout time.Duration) error + func (q *WrappedQueue) FlushWithContext(ctx context.Context) error + func (q *WrappedQueue) IsEmpty() bool + func (q *WrappedQueue) Name() string + func (q *WrappedQueue) Push(data Data) error + func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) + func (q *WrappedQueue) Shutdown() + func (q *WrappedQueue) Terminate() + type WrappedQueueConfiguration struct + Config interface{} + MaxAttempts int + Name string + QueueLength int + Timeout time.Duration + Underlying Type