queue

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UnlimitedMaxSize = -1
)

Variables

View Source
var (
	DefaultPollInterval = 10 * time.Millisecond
	DefaultMaxRetries   = 10
	DefaultOptions      = Config{
		MaxSize: UnlimitedMaxSize,

		MaxHandleFailures: 10,

		PollInterval: DefaultPollInterval,
		MaxRetries:   DefaultMaxRetries,

		ConsumerCount: 1,

		Message: &JsonMessage{},

		MessageIDGenerator: message.GenerateRandomID,
	}
)
View Source
var (
	ErrQueueClosed    = errors.New("queue is closed")
	ErrQueueFull      = errors.New("queue is full")
	ErrQueueEmpty     = errors.New("queue is empty")
	ErrQueueRecovered = errors.New("queue recovered")
)

Errors

Functions

func WithConsumerCount added in v0.1.0

func WithConsumerCount(consumerCount int) func(*Config)

func WithMaxHandleFailures added in v0.1.0

func WithMaxHandleFailures(maxHandleFailures int) func(*Config)

func WithMaxRetries added in v0.1.0

func WithMaxRetries(maxRetries int) func(*Config)

func WithMaxSize added in v0.1.0

func WithMaxSize(maxSize int) func(*Config)

func WithMessage added in v0.1.0

func WithMessage(message Message) func(*Config)

func WithMessageIDGenerator added in v0.1.0

func WithMessageIDGenerator(generator message.MessageIDGenerator) func(*Config)

func WithPollInterval added in v0.1.0

func WithPollInterval(pollInterval time.Duration) func(*Config)

Types

type BaseQueue added in v0.0.2

type BaseQueue struct {
	// contains filtered or unexported fields
}

func NewBaseQueue added in v0.0.2

func NewBaseQueue(name string, options *Config) *BaseQueue

func (*BaseQueue) Close added in v0.0.2

func (q *BaseQueue) Close()

func (*BaseQueue) Dequeue added in v0.0.2

func (q *BaseQueue) Dequeue() (Message, error)

func (*BaseQueue) Enqueue added in v0.0.2

func (q *BaseQueue) Enqueue(data []byte) error

func (*BaseQueue) Kind added in v0.0.2

func (q *BaseQueue) Kind() Kind

func (*BaseQueue) MaxHandleFailures added in v0.1.0

func (q *BaseQueue) MaxHandleFailures() int

func (*BaseQueue) MaxSize added in v0.0.2

func (q *BaseQueue) MaxSize() int

func (*BaseQueue) Name added in v0.0.2

func (q *BaseQueue) Name() string

func (*BaseQueue) NewMessage added in v0.1.0

func (q *BaseQueue) NewMessage(data []byte) (Message, error)

func (*BaseQueue) Pack added in v0.1.0

func (q *BaseQueue) Pack(data []byte) ([]byte, error)

func (*BaseQueue) Subscribe added in v0.0.2

func (q *BaseQueue) Subscribe(cb Handler)

func (*BaseQueue) Unpack added in v0.1.0

func (q *BaseQueue) Unpack(data []byte) (Message, error)

func (*BaseQueue) ValidateQueueClosed added in v0.1.0

func (q *BaseQueue) ValidateQueueClosed() error

type Config added in v0.1.0

type Config struct {
	MaxSize int

	// Messages will be discarded after this many failures, or
	// pushed to DLQ if DLQ is supported
	MaxHandleFailures int

	PollInterval time.Duration

	// Used for internal retrying, not for message retrying
	MaxRetries int

	// Specify how many consumers are consuming the queue using `Subscribe`.
	// Be aware that too many consumers can cause order of messages to be changed.
	// If you want to ensure the order of messages, please use FIFO queue and set ConsumerCount to 1
	ConsumerCount int

	// Specify standard message
	// Use json message if nil
	Message            Message
	MessageIDGenerator message.MessageIDGenerator
}

Configuarable options here, but some implementations of queue may not support all options

type DLQ added in v0.1.0

type DLQ interface {
	Queue

	// Push `items` of messages to associated Queue
	Redrive(items int) error

	AssociatedQueue() Queue
}

type DLQer added in v0.1.0

type DLQer interface {
	DLQ() (DLQ, error)
}

type Factory

type Factory interface {
	// Create a new queue if name does not exist
	// If name already exists, return the existing queue
	GetOrCreate(name string, options ...Option) (Queue, error)
}

type Handler

type Handler func(msg Message) error

type JsonMessage added in v0.1.0

type JsonMessage struct {
	message.JsonMessage
}

func NewJsonMessage added in v0.1.0

func NewJsonMessage() *JsonMessage

func (*JsonMessage) AddRetryCount added in v0.1.0

func (m *JsonMessage) AddRetryCount()

func (*JsonMessage) CreatedAt added in v0.1.0

func (m *JsonMessage) CreatedAt() time.Time

func (*JsonMessage) RefreshRetryCount added in v0.1.0

func (m *JsonMessage) RefreshRetryCount()

func (*JsonMessage) RefreshUpdatedAt added in v0.1.0

func (m *JsonMessage) RefreshUpdatedAt()

func (*JsonMessage) RetryCount added in v0.1.0

func (m *JsonMessage) RetryCount() int

func (*JsonMessage) TotalRetryCount added in v0.1.0

func (m *JsonMessage) TotalRetryCount() int

func (*JsonMessage) UpdatedAt added in v0.1.0

func (m *JsonMessage) UpdatedAt() time.Time

type Kind

type Kind uint8
const (
	KindFIFO Kind = iota + 1
	KindStandard
)

type MemoryFactory

type MemoryFactory struct {
	// contains filtered or unexported fields
}

func NewMemoryFactory

func NewMemoryFactory() *MemoryFactory

func (*MemoryFactory) GetOrCreate

func (f *MemoryFactory) GetOrCreate(name string, options ...Option) (Queue, error)

type MemoryQueue

type MemoryQueue struct {
	*BaseQueue
	// contains filtered or unexported fields
}

func NewMemoryQueue

func NewMemoryQueue(name string, options *Config) *MemoryQueue

func (*MemoryQueue) Close

func (q *MemoryQueue) Close()

func (*MemoryQueue) Dequeue

func (q *MemoryQueue) Dequeue() (Message, error)

func (*MemoryQueue) Enqueue

func (q *MemoryQueue) Enqueue(data []byte) error

func (*MemoryQueue) Kind

func (q *MemoryQueue) Kind() Kind

func (*MemoryQueue) Name added in v0.0.2

func (q *MemoryQueue) Name() string

func (*MemoryQueue) Purge added in v0.1.0

func (q *MemoryQueue) Purge() error

func (*MemoryQueue) Recover

func (q *MemoryQueue) Recover(msg Message) error

func (*MemoryQueue) Subscribe

func (q *MemoryQueue) Subscribe(cb Handler)

type Message added in v0.1.0

type Message interface {
	message.Message

	RetryCount() int
	AddRetryCount()
	TotalRetryCount() int
	RefreshRetryCount()

	CreatedAt() time.Time
	UpdatedAt() time.Time

	RefreshUpdatedAt()
}

type Option added in v0.1.0

type Option func(*Config)

type Purgeable added in v0.1.0

type Purgeable interface {
	// Clean up the queue
	Purge() error
}

type Queue

type Queue interface {
	Kind() Kind

	Name() string

	// Reports max size of queue
	// -1 for unlimited
	MaxSize() int

	// Reports max handle failures
	// Messages will be discarded after this many failures, or
	// pushed to DLQ if DLQ is supported
	MaxHandleFailures() int

	// Push data to end of queue
	// Failed if queue is full or closed
	Enqueue([]byte) error

	// The implementation MUST set the retryCount of the message to 0 if its retryCount > MaxHandleFailures,
	// in the case, the message is from DLQ redriving.
	Dequeue() (Message, error)

	// Subscribe queue with message confirmation.
	// Once handler returns error, it'll automatically put message back to queue using `Recover` mechanism internally.
	Subscribe(h Handler)

	Close()
}

The interface of queue The implementation of queue should be thread-safe

type Recoverable added in v0.0.2

type Recoverable interface {

	// If the queue supports `visibility window` like AWS SQS, the message will be put back to queue atomically without calling `Recover`.
	// It's useful if the panic is from outside of the queue handler.
	// But it's recommended to use `Recover` if the panic is from inside the queue handler for retrying the message fast.
	Recover(Message) error
}

type RecoverableQueue

type RecoverableQueue interface {
	Queue

	Recoverable
}

type RedisQueue added in v0.0.2

type RedisQueue struct {
	*BaseQueue
	// contains filtered or unexported fields
}

func NewRedisQueue added in v0.0.2

func NewRedisQueue(conn rmq.Connection, name string, options *Config) (*RedisQueue, error)

func (*RedisQueue) Dequeue added in v0.0.2

func (q *RedisQueue) Dequeue() (Message, error)

func (*RedisQueue) Enqueue added in v0.0.2

func (q *RedisQueue) Enqueue(data []byte) error

func (*RedisQueue) MaxSize added in v0.0.2

func (q *RedisQueue) MaxSize() int

func (*RedisQueue) Recover added in v0.0.2

func (q *RedisQueue) Recover(msg Message) error

func (*RedisQueue) Subscribe added in v0.0.2

func (q *RedisQueue) Subscribe(cb Handler)

type RedisQueueFactory added in v0.0.2

type RedisQueueFactory struct {
	// contains filtered or unexported fields
}

func NewRedisQueueFactory added in v0.0.2

func NewRedisQueueFactory(redisClient redis.Cmdable) *RedisQueueFactory

func (*RedisQueueFactory) GetOrCreate added in v0.0.2

func (f *RedisQueueFactory) GetOrCreate(name string, options ...Option) (Queue, error)

type SafeQueue

type SafeQueue interface {
	Queue

	Recoverable
	IsRecoverable() bool

	Purgeable
	IsPurgeable() bool

	DLQer
	IsDLQSupported() bool
}

SafeQueue provides ability to put message back to queue when handler encounters panic and makes sure all function calls are safe. e.g, Returns ErrNotImplemented if calling Recover and it is not implemented

type SimpleQueue added in v0.1.0

type SimpleQueue struct {
	// contains filtered or unexported fields
}

func NewSafeQueue

func NewSafeQueue(queue Queue) (*SimpleQueue, error)

func (*SimpleQueue) Close added in v0.1.0

func (q *SimpleQueue) Close()

func (*SimpleQueue) DLQ added in v0.1.0

func (q *SimpleQueue) DLQ() (DLQ, error)

func (*SimpleQueue) Dequeue added in v0.1.0

func (q *SimpleQueue) Dequeue() (Message, error)

func (*SimpleQueue) Enqueue added in v0.1.0

func (q *SimpleQueue) Enqueue(data []byte) error

func (*SimpleQueue) IsDLQSupported added in v0.1.0

func (q *SimpleQueue) IsDLQSupported() bool

func (*SimpleQueue) IsPurgeable added in v0.1.0

func (q *SimpleQueue) IsPurgeable() bool

func (*SimpleQueue) IsRecoverable added in v0.1.0

func (q *SimpleQueue) IsRecoverable() bool

func (*SimpleQueue) Kind added in v0.1.0

func (q *SimpleQueue) Kind() Kind

func (*SimpleQueue) MaxHandleFailures added in v0.1.0

func (q *SimpleQueue) MaxHandleFailures() int

func (*SimpleQueue) MaxSize added in v0.1.0

func (q *SimpleQueue) MaxSize() int

func (*SimpleQueue) Name added in v0.1.0

func (q *SimpleQueue) Name() string

func (*SimpleQueue) Purge added in v0.1.0

func (q *SimpleQueue) Purge() error

func (*SimpleQueue) Recover added in v0.1.0

func (q *SimpleQueue) Recover(msg Message) error

func (*SimpleQueue) Subscribe added in v0.1.0

func (q *SimpleQueue) Subscribe(cb Handler)

func (*SimpleQueue) Unwrap added in v0.1.0

func (q *SimpleQueue) Unwrap() Queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL