Documentation
¶
Index ¶
- Constants
- Variables
- type BaseQueue
- type Factory
- type Handler
- type Kind
- type MemoryFactory
- type MemoryQueue
- type Queue
- type QueueOptions
- type Recoverable
- type RecoverableQueue
- type RedisQueue
- type RedisQueueFactory
- type SafeQueue
- func (q *SafeQueue) Close()
- func (q *SafeQueue) Dequeue() ([]byte, error)
- func (q *SafeQueue) Enqueue(data []byte) error
- func (q *SafeQueue) IsRecoverable() bool
- func (q *SafeQueue) Kind() Kind
- func (q *SafeQueue) MaxSize() int
- func (q *SafeQueue) Name() string
- func (q *SafeQueue) Recover(b []byte) error
- func (q *SafeQueue) Subscribe(cb Handler)
Constants ¶
View Source
const (
UnlimitedMaxSize = -1
)
Variables ¶
View Source
var (
DefaultPollInterval = 10 * time.Millisecond
DefaultMaxRetries = 10
DefaultOptions = QueueOptions{
MaxSize: UnlimitedMaxSize,
PollInterval: DefaultPollInterval,
MaxRetries: DefaultMaxRetries,
ConsumerCount: 1,
}
)
View Source
var (
ErrQueueClosed = errors.New("queue is closed")
ErrQueueFull = errors.New("queue is full")
ErrQueueEmpty = errors.New("queue is empty")
ErrQueueRecovered = errors.New("queue recovered")
)
Errors
Functions ¶
This section is empty.
Types ¶
type BaseQueue ¶ added in v0.0.2
type BaseQueue struct {
// contains filtered or unexported fields
}
func NewBaseQueue ¶ added in v0.0.2
func NewBaseQueue(name string, options *QueueOptions) *BaseQueue
type Factory ¶
type Factory interface {
// Create a new queue if name does not exist
// If name already exists, return the existing queue
GetOrCreate(name string, options ...func(*QueueOptions)) (Queue, error)
}
type MemoryFactory ¶
type MemoryFactory struct {
// contains filtered or unexported fields
}
func NewMemoryFactory ¶
func NewMemoryFactory() *MemoryFactory
func (*MemoryFactory) GetOrCreate ¶
func (f *MemoryFactory) GetOrCreate(name string, options ...func(*QueueOptions)) (Queue, error)
type MemoryQueue ¶
type MemoryQueue struct {
*BaseQueue
// contains filtered or unexported fields
}
func NewMemoryQueue ¶
func NewMemoryQueue(name string, options *QueueOptions) *MemoryQueue
func (*MemoryQueue) Close ¶
func (q *MemoryQueue) Close()
func (*MemoryQueue) Dequeue ¶
func (q *MemoryQueue) Dequeue() ([]byte, error)
func (*MemoryQueue) Enqueue ¶
func (q *MemoryQueue) Enqueue(data []byte) error
func (*MemoryQueue) Kind ¶
func (q *MemoryQueue) Kind() Kind
func (*MemoryQueue) Name ¶ added in v0.0.2
func (q *MemoryQueue) Name() string
func (*MemoryQueue) Recover ¶
func (q *MemoryQueue) Recover(b []byte) error
func (*MemoryQueue) Subscribe ¶
func (q *MemoryQueue) Subscribe(cb Handler)
type Queue ¶
type Queue interface {
Kind() Kind
Name() string
// Reports max size of queue
// -1 for unlimited
MaxSize() int
// Push data to the end of the queue.
// Fails if the queue is full or closed.
Enqueue([]byte) error
// Pop data from the beginning of the queue without message confirmation.
// Fails if the queue is empty.
Dequeue() ([]byte, error)
// Subscribe to the queue with message confirmation.
// If the handler returns an error, the message is automatically put back on the queue using the `Recover` mechanism internally.
Subscribe(h Handler)
Close()
}
The interface of a queue. Implementations of the queue should be thread-safe.
type QueueOptions ¶ added in v0.0.2
type QueueOptions struct {
MaxSize int
PollInterval time.Duration
MaxRetries int
// Specifies how many consumers consume the queue via `Subscribe`.
// Be aware that too many consumers can cause the order of messages to change.
// If you want to ensure the order of messages, use a FIFO queue and set ConsumerCount to 1.
ConsumerCount int
}
Configurable options are listed here, but some queue implementations may not support all of them.
type Recoverable ¶ added in v0.0.2
type RecoverableQueue ¶
type RecoverableQueue interface {
Queue
Recoverable
}
type RedisQueue ¶ added in v0.0.2
type RedisQueue struct {
*BaseQueue
// contains filtered or unexported fields
}
func NewRedisQueue ¶ added in v0.0.2
func NewRedisQueue(conn rmq.Connection, name string, options *QueueOptions) (*RedisQueue, error)
func (*RedisQueue) Dequeue ¶ added in v0.0.2
func (q *RedisQueue) Dequeue() ([]byte, error)
func (*RedisQueue) Enqueue ¶ added in v0.0.2
func (q *RedisQueue) Enqueue(data []byte) error
func (*RedisQueue) MaxSize ¶ added in v0.0.2
func (q *RedisQueue) MaxSize() int
func (*RedisQueue) Recover ¶ added in v0.0.2
func (q *RedisQueue) Recover(b []byte) error
func (*RedisQueue) Subscribe ¶ added in v0.0.2
func (q *RedisQueue) Subscribe(cb Handler)
type RedisQueueFactory ¶ added in v0.0.2
type RedisQueueFactory struct {
// contains filtered or unexported fields
}
func NewRedisQueueFactory ¶ added in v0.0.2
func NewRedisQueueFactory(redisClient redis.Cmdable) *RedisQueueFactory
func (*RedisQueueFactory) GetOrCreate ¶ added in v0.0.2
func (f *RedisQueueFactory) GetOrCreate(name string, options ...func(*QueueOptions)) (Queue, error)
type SafeQueue ¶
type SafeQueue struct {
// contains filtered or unexported fields
}
func NewSafeQueue ¶
func (*SafeQueue) IsRecoverable ¶
Click to show internal directories.
Click to hide internal directories.