Documentation
¶
Index ¶
- Constants
- Variables
- func WithConsumerCount(consumerCount int) func(*Config)
- func WithMaxHandleFailures(maxHandleFailures int) func(*Config)
- func WithMaxRetries(maxRetries int) func(*Config)
- func WithMaxSize(maxSize int) func(*Config)
- func WithMessage(message Message) func(*Config)
- func WithMessageIDGenerator(generator message.MessageIDGenerator) func(*Config)
- func WithPollInterval(pollInterval time.Duration) func(*Config)
- type BaseQueue
- func (q *BaseQueue) Close()
- func (q *BaseQueue) Dequeue() (Message, error)
- func (q *BaseQueue) Enqueue(data []byte) error
- func (q *BaseQueue) Kind() Kind
- func (q *BaseQueue) MaxHandleFailures() int
- func (q *BaseQueue) MaxSize() int
- func (q *BaseQueue) Name() string
- func (q *BaseQueue) NewMessage(data []byte) (Message, error)
- func (q *BaseQueue) Pack(data []byte) ([]byte, error)
- func (q *BaseQueue) Subscribe(cb Handler)
- func (q *BaseQueue) Unpack(data []byte) (Message, error)
- func (q *BaseQueue) ValidateQueueClosed() error
- type Config
- type DLQ
- type DLQer
- type Factory
- type Handler
- type JsonMessage
- type Kind
- type MemoryFactory
- type MemoryQueue
- func (q *MemoryQueue) Close()
- func (q *MemoryQueue) Dequeue() (Message, error)
- func (q *MemoryQueue) Enqueue(data []byte) error
- func (q *MemoryQueue) Kind() Kind
- func (q *MemoryQueue) Name() string
- func (q *MemoryQueue) Purge() error
- func (q *MemoryQueue) Recover(msg Message) error
- func (q *MemoryQueue) Subscribe(cb Handler)
- type Message
- type Option
- type Purgeable
- type Queue
- type Recoverable
- type RecoverableQueue
- type RedisQueue
- type RedisQueueFactory
- type SafeQueue
- type SimpleQueue
- func (q *SimpleQueue) Close()
- func (q *SimpleQueue) DLQ() (DLQ, error)
- func (q *SimpleQueue) Dequeue() (Message, error)
- func (q *SimpleQueue) Enqueue(data []byte) error
- func (q *SimpleQueue) IsDLQSupported() bool
- func (q *SimpleQueue) IsPurgeable() bool
- func (q *SimpleQueue) IsRecoverable() bool
- func (q *SimpleQueue) Kind() Kind
- func (q *SimpleQueue) MaxHandleFailures() int
- func (q *SimpleQueue) MaxSize() int
- func (q *SimpleQueue) Name() string
- func (q *SimpleQueue) Purge() error
- func (q *SimpleQueue) Recover(msg Message) error
- func (q *SimpleQueue) Subscribe(cb Handler)
- func (q *SimpleQueue) Unwrap() Queue
Constants ¶
View Source
const (
UnlimitedMaxSize = -1
)
Variables ¶
View Source
var ( DefaultPollInterval = 10 * time.Millisecond DefaultMaxRetries = 10 DefaultOptions = Config{ MaxSize: UnlimitedMaxSize, MaxHandleFailures: 10, PollInterval: DefaultPollInterval, MaxRetries: DefaultMaxRetries, ConsumerCount: 1, Message: &JsonMessage{}, MessageIDGenerator: message.GenerateRandomID, } )
View Source
var ( ErrQueueClosed = errors.New("queue is closed") ErrQueueFull = errors.New("queue is full") ErrQueueEmpty = errors.New("queue is empty") ErrQueueRecovered = errors.New("queue recovered") )
Errors
Functions ¶
func WithConsumerCount ¶ added in v0.1.0
func WithMaxHandleFailures ¶ added in v0.1.0
func WithMaxRetries ¶ added in v0.1.0
func WithMaxSize ¶ added in v0.1.0
func WithMessage ¶ added in v0.1.0
func WithMessageIDGenerator ¶ added in v0.1.0
func WithMessageIDGenerator(generator message.MessageIDGenerator) func(*Config)
func WithPollInterval ¶ added in v0.1.0
Types ¶
type BaseQueue ¶ added in v0.0.2
type BaseQueue struct {
// contains filtered or unexported fields
}
func NewBaseQueue ¶ added in v0.0.2
func (*BaseQueue) MaxHandleFailures ¶ added in v0.1.0
func (*BaseQueue) NewMessage ¶ added in v0.1.0
func (*BaseQueue) ValidateQueueClosed ¶ added in v0.1.0
type Config ¶ added in v0.1.0
type Config struct {
MaxSize int
// Messages will be discarded after this many failures, or
// pushed to DLQ if DLQ is supported
MaxHandleFailures int
PollInterval time.Duration
// Used for internal retrying, not for message retrying
MaxRetries int
// Specify how many consumers are consuming the queue using `Subscribe`.
// Be aware that too many consumers can cause order of messages to be changed.
// If you want to ensure the order of messages, please use FIFO queue and set ConsumerCount to 1
ConsumerCount int
// Specify standard message
// Use json message if nil
Message Message
MessageIDGenerator message.MessageIDGenerator
}
Configuarable options here, but some implementations of queue may not support all options
type JsonMessage ¶ added in v0.1.0
type JsonMessage struct {
message.JsonMessage
}
func NewJsonMessage ¶ added in v0.1.0
func NewJsonMessage() *JsonMessage
func (*JsonMessage) AddRetryCount ¶ added in v0.1.0
func (m *JsonMessage) AddRetryCount()
func (*JsonMessage) CreatedAt ¶ added in v0.1.0
func (m *JsonMessage) CreatedAt() time.Time
func (*JsonMessage) RefreshRetryCount ¶ added in v0.1.0
func (m *JsonMessage) RefreshRetryCount()
func (*JsonMessage) RefreshUpdatedAt ¶ added in v0.1.0
func (m *JsonMessage) RefreshUpdatedAt()
func (*JsonMessage) RetryCount ¶ added in v0.1.0
func (m *JsonMessage) RetryCount() int
func (*JsonMessage) TotalRetryCount ¶ added in v0.1.0
func (m *JsonMessage) TotalRetryCount() int
func (*JsonMessage) UpdatedAt ¶ added in v0.1.0
func (m *JsonMessage) UpdatedAt() time.Time
type MemoryFactory ¶
type MemoryFactory struct {
// contains filtered or unexported fields
}
func NewMemoryFactory ¶
func NewMemoryFactory() *MemoryFactory
func (*MemoryFactory) GetOrCreate ¶
func (f *MemoryFactory) GetOrCreate(name string, options ...Option) (Queue, error)
type MemoryQueue ¶
type MemoryQueue struct {
*BaseQueue
// contains filtered or unexported fields
}
func NewMemoryQueue ¶
func NewMemoryQueue(name string, options *Config) *MemoryQueue
func (*MemoryQueue) Close ¶
func (q *MemoryQueue) Close()
func (*MemoryQueue) Dequeue ¶
func (q *MemoryQueue) Dequeue() (Message, error)
func (*MemoryQueue) Enqueue ¶
func (q *MemoryQueue) Enqueue(data []byte) error
func (*MemoryQueue) Kind ¶
func (q *MemoryQueue) Kind() Kind
func (*MemoryQueue) Name ¶ added in v0.0.2
func (q *MemoryQueue) Name() string
func (*MemoryQueue) Purge ¶ added in v0.1.0
func (q *MemoryQueue) Purge() error
func (*MemoryQueue) Recover ¶
func (q *MemoryQueue) Recover(msg Message) error
func (*MemoryQueue) Subscribe ¶
func (q *MemoryQueue) Subscribe(cb Handler)
type Queue ¶
type Queue interface {
Kind() Kind
Name() string
// Reports max size of queue
// -1 for unlimited
MaxSize() int
// Reports max handle failures
// Messages will be discarded after this many failures, or
// pushed to DLQ if DLQ is supported
MaxHandleFailures() int
// Push data to end of queue
// Failed if queue is full or closed
Enqueue([]byte) error
// The implementation MUST set the retryCount of the message to 0 if its retryCount > MaxHandleFailures,
// in the case, the message is from DLQ redriving.
Dequeue() (Message, error)
// Subscribe queue with message confirmation.
// Once handler returns error, it'll automatically put message back to queue using `Recover` mechanism internally.
Subscribe(h Handler)
Close()
}
The interface of queue The implementation of queue should be thread-safe
type Recoverable ¶ added in v0.0.2
type Recoverable interface {
// If the queue supports `visibility window` like AWS SQS, the message will be put back to queue atomically without calling `Recover`.
// It's useful if the panic is from outside of the queue handler.
// But it's recommended to use `Recover` if the panic is from inside the queue handler for retrying the message fast.
Recover(Message) error
}
type RecoverableQueue ¶
type RecoverableQueue interface {
Queue
Recoverable
}
type RedisQueue ¶ added in v0.0.2
type RedisQueue struct {
*BaseQueue
// contains filtered or unexported fields
}
func NewRedisQueue ¶ added in v0.0.2
func NewRedisQueue(conn rmq.Connection, name string, options *Config) (*RedisQueue, error)
func (*RedisQueue) Dequeue ¶ added in v0.0.2
func (q *RedisQueue) Dequeue() (Message, error)
func (*RedisQueue) Enqueue ¶ added in v0.0.2
func (q *RedisQueue) Enqueue(data []byte) error
func (*RedisQueue) MaxSize ¶ added in v0.0.2
func (q *RedisQueue) MaxSize() int
func (*RedisQueue) Recover ¶ added in v0.0.2
func (q *RedisQueue) Recover(msg Message) error
func (*RedisQueue) Subscribe ¶ added in v0.0.2
func (q *RedisQueue) Subscribe(cb Handler)
type RedisQueueFactory ¶ added in v0.0.2
type RedisQueueFactory struct {
// contains filtered or unexported fields
}
func NewRedisQueueFactory ¶ added in v0.0.2
func NewRedisQueueFactory(redisClient redis.Cmdable) *RedisQueueFactory
func (*RedisQueueFactory) GetOrCreate ¶ added in v0.0.2
func (f *RedisQueueFactory) GetOrCreate(name string, options ...Option) (Queue, error)
type SafeQueue ¶
type SafeQueue interface {
Queue
Recoverable
IsRecoverable() bool
Purgeable
IsPurgeable() bool
DLQer
IsDLQSupported() bool
}
SafeQueue provides ability to put message back to queue when handler encounters panic and makes sure all function calls are safe. e.g, Returns ErrNotImplemented if calling Recover and it is not implemented
type SimpleQueue ¶ added in v0.1.0
type SimpleQueue struct {
// contains filtered or unexported fields
}
func NewSafeQueue ¶
func NewSafeQueue(queue Queue) (*SimpleQueue, error)
func (*SimpleQueue) Close ¶ added in v0.1.0
func (q *SimpleQueue) Close()
func (*SimpleQueue) DLQ ¶ added in v0.1.0
func (q *SimpleQueue) DLQ() (DLQ, error)
func (*SimpleQueue) Dequeue ¶ added in v0.1.0
func (q *SimpleQueue) Dequeue() (Message, error)
func (*SimpleQueue) Enqueue ¶ added in v0.1.0
func (q *SimpleQueue) Enqueue(data []byte) error
func (*SimpleQueue) IsDLQSupported ¶ added in v0.1.0
func (q *SimpleQueue) IsDLQSupported() bool
func (*SimpleQueue) IsPurgeable ¶ added in v0.1.0
func (q *SimpleQueue) IsPurgeable() bool
func (*SimpleQueue) IsRecoverable ¶ added in v0.1.0
func (q *SimpleQueue) IsRecoverable() bool
func (*SimpleQueue) Kind ¶ added in v0.1.0
func (q *SimpleQueue) Kind() Kind
func (*SimpleQueue) MaxHandleFailures ¶ added in v0.1.0
func (q *SimpleQueue) MaxHandleFailures() int
func (*SimpleQueue) MaxSize ¶ added in v0.1.0
func (q *SimpleQueue) MaxSize() int
func (*SimpleQueue) Name ¶ added in v0.1.0
func (q *SimpleQueue) Name() string
func (*SimpleQueue) Purge ¶ added in v0.1.0
func (q *SimpleQueue) Purge() error
func (*SimpleQueue) Recover ¶ added in v0.1.0
func (q *SimpleQueue) Recover(msg Message) error
func (*SimpleQueue) Subscribe ¶ added in v0.1.0
func (q *SimpleQueue) Subscribe(cb Handler)
func (*SimpleQueue) Unwrap ¶ added in v0.1.0
func (q *SimpleQueue) Unwrap() Queue
Click to show internal directories.
Click to hide internal directories.