queue

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// UnlimitedMaxSize is the MaxSize value (-1) that means the queue has no
	// size limit. It is the MaxSize used by DefaultOptions.
	UnlimitedMaxSize = -1
)

Variables

View Source
var (
	// DefaultPollInterval is the PollInterval used by DefaultOptions (10ms).
	// NOTE(review): presumably the delay between polls for new messages while
	// subscribed — confirm against the implementation.
	DefaultPollInterval = 10 * time.Millisecond
	// DefaultMaxRetries is the MaxRetries used by DefaultOptions.
	// NOTE(review): presumably the number of redelivery attempts after a
	// handler error — confirm against the implementation.
	DefaultMaxRetries   = 10
	// DefaultOptions is the baseline QueueOptions value: unlimited size,
	// the default poll interval and retry count, and a single consumer.
	DefaultOptions      = QueueOptions{
		MaxSize:      UnlimitedMaxSize,
		PollInterval: DefaultPollInterval,
		MaxRetries:   DefaultMaxRetries,

		ConsumerCount: 1,
	}
)
View Source
// Sentinel errors reported by queue operations.
var (
	// ErrQueueClosed indicates the queue is closed; Enqueue is documented to
	// fail on a closed queue.
	ErrQueueClosed    = errors.New("queue is closed")
	// ErrQueueFull indicates the queue is full; Enqueue is documented to fail
	// on a full queue.
	ErrQueueFull      = errors.New("queue is full")
	// ErrQueueEmpty indicates the queue is empty; Dequeue is documented to
	// fail on an empty queue.
	ErrQueueEmpty     = errors.New("queue is empty")
	// ErrQueueRecovered carries the message "queue recovered".
	// NOTE(review): presumably signals that a message was put back via
	// Recover — confirm where this is returned.
	ErrQueueRecovered = errors.New("queue recovered")
)

Errors

Functions

This section is empty.

Types

type BaseQueue added in v0.0.2

type BaseQueue struct {
	// contains filtered or unexported fields
}

func NewBaseQueue added in v0.0.2

func NewBaseQueue(name string, options *QueueOptions) *BaseQueue

func (*BaseQueue) Close added in v0.0.2

func (q *BaseQueue) Close()

func (*BaseQueue) Dequeue added in v0.0.2

func (q *BaseQueue) Dequeue() ([]byte, error)

func (*BaseQueue) Enqueue added in v0.0.2

func (q *BaseQueue) Enqueue(data []byte) error

func (*BaseQueue) Kind added in v0.0.2

func (q *BaseQueue) Kind() Kind

func (*BaseQueue) MaxSize added in v0.0.2

func (q *BaseQueue) MaxSize() int

func (*BaseQueue) Name added in v0.0.2

func (q *BaseQueue) Name() string

func (*BaseQueue) Subscribe added in v0.0.2

func (q *BaseQueue) Subscribe(cb Handler)

type Factory

// Factory hands out queues by name.
type Factory interface {
	// GetOrCreate returns the existing queue registered under name, or
	// creates a new one if name does not exist yet.
	//
	// options are functions that receive a *QueueOptions to modify.
	// NOTE(review): presumably they are applied on top of DefaultOptions and
	// only when the queue is first created — confirm per implementation.
	GetOrCreate(name string, options ...func(*QueueOptions)) (Queue, error)
}

type Handler

// Handler is the callback type accepted by Queue.Subscribe. It receives a
// message payload; per the Subscribe contract, returning a non-nil error
// causes the message to be put back to the queue.
type Handler func([]byte) error

type Kind

// Kind identifies the kind of a queue, as reported by Queue.Kind.
type Kind uint8
const (
	// KindFIFO (value 1) marks a FIFO queue. The QueueOptions documentation
	// recommends a FIFO queue with ConsumerCount 1 when message order matters.
	KindFIFO Kind = iota + 1
	// KindStandard (value 2) marks a standard queue.
	// NOTE(review): presumably offers no strict ordering guarantee — confirm.
	KindStandard
)

type MemoryFactory

type MemoryFactory struct {
	// contains filtered or unexported fields
}

func NewMemoryFactory

func NewMemoryFactory() *MemoryFactory

func (*MemoryFactory) GetOrCreate

func (f *MemoryFactory) GetOrCreate(name string, options ...func(*QueueOptions)) (Queue, error)

type MemoryQueue

type MemoryQueue struct {
	*BaseQueue
	// contains filtered or unexported fields
}

func NewMemoryQueue

func NewMemoryQueue(name string, options *QueueOptions) *MemoryQueue

func (*MemoryQueue) Close

func (q *MemoryQueue) Close()

func (*MemoryQueue) Dequeue

func (q *MemoryQueue) Dequeue() ([]byte, error)

func (*MemoryQueue) Enqueue

func (q *MemoryQueue) Enqueue(data []byte) error

func (*MemoryQueue) Kind

func (q *MemoryQueue) Kind() Kind

func (*MemoryQueue) Name added in v0.0.2

func (q *MemoryQueue) Name() string

func (*MemoryQueue) Recover

func (q *MemoryQueue) Recover(b []byte) error

func (*MemoryQueue) Subscribe

func (q *MemoryQueue) Subscribe(cb Handler)

type Queue

// Queue is the interface of a queue of byte-slice messages.
// Implementations should be thread-safe.
type Queue interface {
	// Kind reports the kind of the queue (see Kind).
	Kind() Kind

	// Name reports the name of the queue.
	Name() string

	// MaxSize reports the maximum size of the queue,
	// or -1 (UnlimitedMaxSize) if it is unlimited.
	MaxSize() int

	// Enqueue pushes data to the end of the queue.
	// It fails if the queue is full or closed.
	Enqueue([]byte) error

	// Dequeue pops data from the beginning of the queue without message
	// confirmation. It fails if the queue is empty.
	Dequeue() ([]byte, error)

	// Subscribe consumes the queue with message confirmation.
	// If the handler returns an error, the message is automatically put back
	// to the queue using the `Recover` mechanism internally.
	Subscribe(h Handler)

	// Close closes the queue. Enqueue is documented to fail once the queue
	// is closed.
	Close()
}

Queue is the interface of a queue. Implementations of Queue should be thread-safe.

type QueueOptions added in v0.0.2

// QueueOptions holds the configurable options of a queue. Some queue
// implementations may not support every option.
type QueueOptions struct {
	// MaxSize is the maximum size of the queue; UnlimitedMaxSize (-1) means
	// no limit.
	MaxSize      int
	// PollInterval defaults to DefaultPollInterval.
	// NOTE(review): presumably the delay between polls for new messages by
	// subscribers — confirm against the implementation.
	PollInterval time.Duration
	// MaxRetries defaults to DefaultMaxRetries.
	// NOTE(review): presumably the number of redelivery attempts for a
	// message whose handler returned an error — confirm.
	MaxRetries   int

	// ConsumerCount specifies how many consumers consume the queue when
	// using `Subscribe`.
	// Be aware that too many consumers can change the order of messages.
	// To preserve message order, use a FIFO queue and set ConsumerCount to 1.
	ConsumerCount int
}

Configurable options for a queue. Note that some queue implementations may not support all options.

type Recoverable added in v0.0.2

// Recoverable is implemented by queues that can take a message back after it
// has been handed out.
type Recoverable interface {

	// Recover puts a message back to the queue. The Queue.Subscribe
	// documentation names this as the mechanism used when a handler returns
	// an error.
	// NOTE(review): the original comment here is truncated ("... or just does
	// not implement it"); presumably a queue that cannot recover messages
	// returns an error or simply does not implement this interface — confirm.
	Recover([]byte) error
}

type RecoverableQueue

// RecoverableQueue is a Queue that also implements Recoverable, i.e. it
// combines the method sets of both interfaces.
type RecoverableQueue interface {
	Queue

	Recoverable
}

type RedisQueue added in v0.0.2

type RedisQueue struct {
	*BaseQueue
	// contains filtered or unexported fields
}

func NewRedisQueue added in v0.0.2

func NewRedisQueue(conn rmq.Connection, name string, options *QueueOptions) (*RedisQueue, error)

func (*RedisQueue) Dequeue added in v0.0.2

func (q *RedisQueue) Dequeue() ([]byte, error)

func (*RedisQueue) Enqueue added in v0.0.2

func (q *RedisQueue) Enqueue(data []byte) error

func (*RedisQueue) MaxSize added in v0.0.2

func (q *RedisQueue) MaxSize() int

func (*RedisQueue) Recover added in v0.0.2

func (q *RedisQueue) Recover(b []byte) error

func (*RedisQueue) Subscribe added in v0.0.2

func (q *RedisQueue) Subscribe(cb Handler)

type RedisQueueFactory added in v0.0.2

type RedisQueueFactory struct {
	// contains filtered or unexported fields
}

func NewRedisQueueFactory added in v0.0.2

func NewRedisQueueFactory(redisClient redis.Cmdable) *RedisQueueFactory

func (*RedisQueueFactory) GetOrCreate added in v0.0.2

func (f *RedisQueueFactory) GetOrCreate(name string, options ...func(*QueueOptions)) (Queue, error)

type SafeQueue

type SafeQueue struct {
	// contains filtered or unexported fields
}

func NewSafeQueue

func NewSafeQueue(queue Queue) (*SafeQueue, error)

func (*SafeQueue) Close added in v0.0.2

func (q *SafeQueue) Close()

func (*SafeQueue) Dequeue

func (q *SafeQueue) Dequeue() ([]byte, error)

func (*SafeQueue) Enqueue

func (q *SafeQueue) Enqueue(data []byte) error

func (*SafeQueue) IsRecoverable

func (q *SafeQueue) IsRecoverable() bool

func (*SafeQueue) Kind

func (q *SafeQueue) Kind() Kind

func (*SafeQueue) MaxSize

func (q *SafeQueue) MaxSize() int

func (*SafeQueue) Name added in v0.0.2

func (q *SafeQueue) Name() string

func (*SafeQueue) Recover

func (q *SafeQueue) Recover(b []byte) error

func (*SafeQueue) Subscribe

func (q *SafeQueue) Subscribe(cb Handler)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL