Documentation
¶
Index ¶
- Constants
- Variables
- func IsQueueTypeRegistered(queueType QueueType) booldeprecated
- func IsQueueTypeSupported(queueType QueueType) bool
- func RegisterQueueCreator[T any](queueType QueueType, creator QueueCreator[T])
- func RegisterQueueValidator(queueType QueueType, validator QueueConfigValidator)
- func ValidateUnifiedConfig(config UnifiedQueueConfig) errordeprecated
- func WithCallbackParallelExecution(enable bool) func(*Config)
- func WithCallbackTimeout(timeout time.Duration) func(*Config)
- func WithConsumerCount(consumerCount int) func(*Config)
- func WithMaxHandleFailures(maxHandleFailures int) func(*Config)
- func WithMaxRetries(maxRetries int) func(*Config)
- func WithMaxSize(maxSize int) func(*Config)
- func WithMessageIDGenerator(generator message.MessageIDGenerator) func(*Config)
- func WithPollInterval(pollInterval time.Duration) func(*Config)
- func WithRetryQueueCapacity(capacity int) func(*Config)
- func WithUnlimitedCapacity(capacity int) func(*Config)
- type BaseDLQ
- type BaseQueue
- func (q *BaseQueue[T]) AddInflight() bool
- func (q *BaseQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
- func (q *BaseQueue[T]) BEnqueue(ctx context.Context, data T) error
- func (q *BaseQueue[T]) Close()
- func (q *BaseQueue[T]) DLQ() (DLQ[T], error)
- func (q *BaseQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
- func (q *BaseQueue[T]) DoneInflight()
- func (q *BaseQueue[T]) Enqueue(ctx context.Context, data T) error
- func (q *BaseQueue[T]) ExitChannel() <-chan struct{}
- func (q *BaseQueue[T]) GetCallbacks() []Handler[T]
- func (q *BaseQueue[T]) GetConfig() *Config
- func (q *BaseQueue[T]) GetDeadletterQueueKey() string
- func (q *BaseQueue[T]) GetDeadletterQueueName() string
- func (q *BaseQueue[T]) GetLocker() locker.SyncLocker
- func (q *BaseQueue[T]) GetQueueKey() string
- func (q *BaseQueue[T]) GetRetryQueueKey() string
- func (q *BaseQueue[T]) GetRetryQueueName() string
- func (q *BaseQueue[T]) GracefulClose()
- func (q *BaseQueue[T]) HasCallbacks() bool
- func (q *BaseQueue[T]) IsClosed() bool
- func (q *BaseQueue[T]) IsClosing() bool
- func (q *BaseQueue[T]) Kind() Kind
- func (q *BaseQueue[T]) MaxHandleFailures() int
- func (q *BaseQueue[T]) MaxSize() int
- func (q *BaseQueue[T]) Name() string
- func (q *BaseQueue[T]) NewMessage(data T) (Message[T], error)
- func (q *BaseQueue[T]) Pack(data T) ([]byte, error)
- func (q *BaseQueue[T]) SetDLQ(dlq DLQ[T])
- func (q *BaseQueue[T]) ShouldSendToDLQ(ctx context.Context, msg Message[T]) (bool, error)
- func (q *BaseQueue[T]) Subscribe(ctx context.Context, cb Handler[T])
- func (q *BaseQueue[T]) TriggerCallbacks(ctx context.Context, msg Message[T])
- func (q *BaseQueue[T]) Unpack(data []byte) (Message[T], error)
- func (q *BaseQueue[T]) UnpackMessage(packedData []byte) (Message[T], error)
- func (q *BaseQueue[T]) ValidateQueueClosed() error
- type Config
- type DLQ
- type DLQer
- type Discoverable
- type Factory
- type Handler
- type JsonMessage
- func (m *JsonMessage[T]) AddRetryCount()
- func (m *JsonMessage[T]) CreatedAt() time.Time
- func (m *JsonMessage[T]) RefreshRetryCount()
- func (m *JsonMessage[T]) RefreshUpdatedAt()
- func (m *JsonMessage[T]) RetryCount() int
- func (m *JsonMessage[T]) TotalRetryCount() int
- func (m *JsonMessage[T]) UpdatedAt() time.Time
- type Kind
- type MemoryFactory
- type MemoryQueue
- func (q *MemoryQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
- func (q *MemoryQueue[T]) BEnqueue(ctx context.Context, data T) error
- func (q *MemoryQueue[T]) Close()
- func (q *MemoryQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
- func (q *MemoryQueue[T]) Enqueue(ctx context.Context, data T) error
- func (q *MemoryQueue[T]) EnqueueToRetryQueue(ctx context.Context, data T) error
- func (q *MemoryQueue[T]) Kind() Kind
- func (q *MemoryQueue[T]) Name() string
- func (q *MemoryQueue[T]) Purge(ctx context.Context) error
- func (q *MemoryQueue[T]) Recover(ctx context.Context, msg Message[T]) error
- func (q *MemoryQueue[T]) Stats() QueueStats
- type MemoryQueueConfig
- type MemoryQueueFactory
- type Message
- type MsgpackMessage
- func (m *MsgpackMessage[T]) AddRetryCount()
- func (m *MsgpackMessage[T]) CreatedAt() time.Time
- func (m *MsgpackMessage[T]) RefreshRetryCount()
- func (m *MsgpackMessage[T]) RefreshUpdatedAt()
- func (m *MsgpackMessage[T]) RetryCount() int
- func (m *MsgpackMessage[T]) TotalRetryCount() int
- func (m *MsgpackMessage[T]) UpdatedAt() time.Time
- type Option
- type Purgeable
- type Queue
- func GetOrCreate[T any](f *UnifiedFactory, name string, defaultMsg Message[T], options ...Option) (Queue[T], error)
- func MemoryGetOrCreate[T any](f *MemoryQueueFactory, name string, defaultMsg Message[T], options ...Option) (Queue[T], error)
- func RedisGetOrCreate[T any](f *RedisFactory, name string, defaultMsg Message[T], options ...Option) (Queue[T], error)
- type QueueConfigValidator
- type QueueCreator
- type QueueInfo
- type QueueStats
- type QueueType
- type Recoverable
- type RecoverableQueue
- type RedisFactory
- type RedisQueue
- func (q *RedisQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
- func (q *RedisQueue[T]) BEnqueue(ctx context.Context, data T) error
- func (q *RedisQueue[T]) Close()
- func (q *RedisQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
- func (q *RedisQueue[T]) Enqueue(ctx context.Context, data T) error
- func (q *RedisQueue[T]) EnqueueToRetryQueue(ctx context.Context, data T) error
- func (q *RedisQueue[T]) Purge(ctx context.Context) error
- func (q *RedisQueue[T]) Recover(ctx context.Context, msg Message[T]) error
- func (q *RedisQueue[T]) Stats() QueueStats
- type RedisQueueConfig
- type RedisQueueFactory
- type RetryQueueEnqueuer
- type SafeQueue
- func GetOrCreateSafe[T any](f *UnifiedFactory, name string, defaultMsg Message[T], options ...Option) (SafeQueue[T], error)
- func MemoryGetOrCreateSafe[T any](f *MemoryQueueFactory, name string, defaultMsg Message[T], options ...Option) (SafeQueue[T], error)
- func RedisGetOrCreateSafe[T any](f *RedisFactory, name string, defaultMsg Message[T], options ...Option) (SafeQueue[T], error)
- type SimpleQueue
- func (q *SimpleQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
- func (q *SimpleQueue[T]) BEnqueue(ctx context.Context, data T) error
- func (q *SimpleQueue[T]) Close()
- func (q *SimpleQueue[T]) DLQ() (DLQ[T], error)
- func (q *SimpleQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
- func (q *SimpleQueue[T]) Enqueue(ctx context.Context, data T) error
- func (q *SimpleQueue[T]) IsDLQSupported() bool
- func (q *SimpleQueue[T]) IsPurgeable() bool
- func (q *SimpleQueue[T]) IsRecoverable() bool
- func (q *SimpleQueue[T]) IsStatsProvider() bool
- func (q *SimpleQueue[T]) Kind() Kind
- func (q *SimpleQueue[T]) MaxHandleFailures() int
- func (q *SimpleQueue[T]) MaxSize() int
- func (q *SimpleQueue[T]) Name() string
- func (q *SimpleQueue[T]) Purge(ctx context.Context) error
- func (q *SimpleQueue[T]) Recover(ctx context.Context, msg Message[T]) error
- func (q *SimpleQueue[T]) Stats() QueueStats
- func (q *SimpleQueue[T]) Subscribe(ctx context.Context, cb Handler[T])
- func (q *SimpleQueue[T]) Unwrap() Queue[T]
- type SimpleQueueConfig
- type SimpleQueueOption
- type StatsProvider
- type TypedFactory
- type UnifiedFactory
- func (f *UnifiedFactory) CacheSize() int
- func (f *UnifiedFactory) ClearCache()
- func (f *UnifiedFactory) Config() UnifiedQueueConfig
- func (f *UnifiedFactory) DiscoverAllQueues(ctx context.Context, pattern string) ([]QueueInfo, error)
- func (f *UnifiedFactory) DiscoverQueues(ctx context.Context, pattern string) ([]QueueInfo, error)
- func (f *UnifiedFactory) RemoveFromCache(name string) bool
- type UnifiedQueueConfig
Constants ¶
const ( UnlimitedSize = -1 Namespace = "container::queue::" )
const ( QueueInfoTypeMain = "main" QueueInfoTypeRetry = "retry" QueueInfoTypeDLQ = "dlq" )
Queue type constants for QueueInfo.Type
Variables ¶
var ( DefaultPollInterval = 10 * time.Millisecond DefaultMaxRetries = 10 DefaultUnlimitedCapacity = 1000000 DefaultRetryQueueCapacity = 10000 // DefaultCallbackWaitInterval is the interval for checking if callbacks are registered. // Used by both MemoryQueue and RedisQueue in their run() loops. DefaultCallbackWaitInterval = 10 * time.Millisecond // DefaultBlockingTimeout is the timeout for blocking operations like BLPOP. // Used by RedisQueue for BDequeue and run() loops. DefaultBlockingTimeout = time.Second // DefaultNetworkRetryDelay is the delay before retrying after a network error. // Used by RedisQueue when encountering transient errors. DefaultNetworkRetryDelay = 100 * time.Millisecond DefaultOptions = Config{ LockerGenerator: locker.NewMemoryLockerGenerator(), MaxSize: UnlimitedSize, MaxHandleFailures: 10, PollInterval: DefaultPollInterval, MaxRetries: DefaultMaxRetries, ConsumerCount: 1, CallbackParallelExecution: false, CallbackTimeout: 0, MessageIDGenerator: message.GenerateRandomID, UnlimitedCapacity: DefaultUnlimitedCapacity, RetryQueueCapacity: DefaultRetryQueueCapacity, } )
var ( ErrQueueClosed = errors.New("queue is closed") ErrQueueFull = errors.New("queue is full") ErrQueueEmpty = errors.New("queue is empty") ErrQueueRecovered = errors.New("queue recovered") ErrInvalidData = errors.New("invalid data") )
Errors
Functions ¶
func IsQueueTypeRegistered
deprecated
added in
v0.12.0
func IsQueueTypeSupported ¶ added in v0.12.0
IsQueueTypeSupported checks if a queue type is supported (built-in or registered)
func RegisterQueueCreator ¶ added in v0.12.0
func RegisterQueueCreator[T any](queueType QueueType, creator QueueCreator[T])
RegisterQueueCreator registers a creator function for a custom queue type. Built-in types (memory, redis) don't need registration.
Note: Due to Go's generics limitations, creators are stored as any and type-asserted at runtime.
func RegisterQueueValidator ¶ added in v0.12.0
func RegisterQueueValidator(queueType QueueType, validator QueueConfigValidator)
RegisterQueueValidator registers a config validator for a custom queue type. Built-in types have built-in validation.
func ValidateUnifiedConfig
deprecated
added in
v0.12.0
func ValidateUnifiedConfig(config UnifiedQueueConfig) error
Deprecated: ValidateUnifiedConfig is deprecated, use NewUnifiedFactory which validates internally. Kept for backward compatibility.
func WithCallbackParallelExecution ¶ added in v0.9.6
func WithCallbackTimeout ¶ added in v0.9.6
func WithConsumerCount ¶ added in v0.1.0
func WithMaxHandleFailures ¶ added in v0.1.0
func WithMaxRetries ¶ added in v0.1.0
func WithMaxSize ¶ added in v0.1.0
func WithMessageIDGenerator ¶ added in v0.1.0
func WithMessageIDGenerator(generator message.MessageIDGenerator) func(*Config)
func WithPollInterval ¶ added in v0.1.0
func WithRetryQueueCapacity ¶ added in v0.12.0
func WithUnlimitedCapacity ¶ added in v0.12.0
Types ¶
type BaseDLQ ¶ added in v0.9.1
func (*BaseDLQ[T]) AssociatedQueue ¶ added in v0.9.1
type BaseQueue ¶ added in v0.0.2
type BaseQueue[T any] struct { // contains filtered or unexported fields }
func NewBaseQueue ¶ added in v0.0.2
func (*BaseQueue[T]) AddInflight ¶ added in v0.12.2
AddInflight increments the in-flight counter. Returns false if the queue is closing (to prevent WaitGroup race condition). Caller must check the return value and skip processing if false.
func (*BaseQueue[T]) DoneInflight ¶ added in v0.12.2
func (q *BaseQueue[T]) DoneInflight()
DoneInflight decrements the in-flight counter
func (*BaseQueue[T]) ExitChannel ¶ added in v0.12.0
func (q *BaseQueue[T]) ExitChannel() <-chan struct{}
func (*BaseQueue[T]) GetCallbacks ¶ added in v0.12.0
GetCallbacks returns the current callbacks slice. Lock-free read.
func (*BaseQueue[T]) GetDeadletterQueueKey ¶ added in v0.9.1
func (*BaseQueue[T]) GetDeadletterQueueName ¶ added in v0.9.1
func (*BaseQueue[T]) GetLocker ¶ added in v0.3.1
func (q *BaseQueue[T]) GetLocker() locker.SyncLocker
func (*BaseQueue[T]) GetQueueKey ¶ added in v0.9.0
func (*BaseQueue[T]) GetRetryQueueKey ¶ added in v0.12.0
func (*BaseQueue[T]) GetRetryQueueName ¶ added in v0.12.0
func (*BaseQueue[T]) GracefulClose ¶ added in v0.12.2
func (q *BaseQueue[T]) GracefulClose()
GracefulClose performs a graceful shutdown: 1. Marks as closing to prevent new enqueues 2. Waits for all in-flight messages to be processed 3. Closes the exit channel
func (*BaseQueue[T]) HasCallbacks ¶ added in v0.12.0
HasCallbacks returns true if there are registered callbacks. Lock-free.
func (*BaseQueue[T]) IsClosing ¶ added in v0.12.2
IsClosing returns true if the queue is in the process of closing
func (*BaseQueue[T]) MaxHandleFailures ¶ added in v0.1.0
func (*BaseQueue[T]) NewMessage ¶ added in v0.1.0
func (*BaseQueue[T]) ShouldSendToDLQ ¶ added in v0.12.2
ShouldSendToDLQ checks if the message should be sent to DLQ based on retry count. If yes, it sends the message to DLQ and records the metric. Returns true if the message was sent to DLQ, false otherwise.
func (*BaseQueue[T]) Subscribe ¶ added in v0.0.2
Subscribe adds a callback handler. Thread-safe using atomic operations.
func (*BaseQueue[T]) TriggerCallbacks ¶ added in v0.9.0
func (*BaseQueue[T]) UnpackMessage ¶ added in v0.12.2
UnpackMessage unpacks data and resets retry count if needed. This eliminates the duplicate unpackMessage logic from MemoryQueue and RedisQueue.
func (*BaseQueue[T]) ValidateQueueClosed ¶ added in v0.1.0
type Config ¶ added in v0.1.0
type Config struct {
LockerGenerator locker.SyncLockerGenerator
MaxSize int
// Messages will be discarded after this many failures, or
// pushed to DLQ if DLQ is supported
MaxHandleFailures int
// PollInterval is used by RedisQueue for BEnqueue polling when queue is full.
// MemoryQueue uses channel-based event-driven model and does not rely on polling.
PollInterval time.Duration
// Used for internal retrying, not for message retrying
MaxRetries int
// Specify how many consumers are consuming the queue using `Subscribe`.
// Be aware that too many consumers can cause order of messages to be changed.
// If you want to ensure the order of messages, please use FIFO queue and set ConsumerCount to 1
ConsumerCount int
// Enable parallel execution of callbacks. When true, all callbacks for a message
// will be executed concurrently in separate goroutines, improving throughput for
// slow handlers. When false (default), callbacks are executed sequentially.
CallbackParallelExecution bool
// Timeout for each callback execution. If set to 0 (default), no timeout is applied.
// When a callback exceeds this timeout, it will be cancelled and an error will be returned.
CallbackTimeout time.Duration
MessageIDGenerator message.MessageIDGenerator
// UnlimitedCapacity is the channel buffer size when MaxSize is UnlimitedSize.
// Only applies to MemoryQueue. Default is 1000000.
UnlimitedCapacity int
// RetryQueueCapacity is the buffer size for retry queue (used by Recover).
// Messages in retry queue are processed with higher priority than normal queue.
// Default is 10000.
RetryQueueCapacity int
}
Configurable options here, but some implementations of queue may not support all options
type Discoverable ¶ added in v0.12.0
type Discoverable interface {
// DiscoverQueues returns a list of all queue names in the backend.
// The pattern parameter supports glob-style matching (e.g., "*", "orders-*").
// An empty pattern matches all queues.
// Returns only main queue names (excludes retry and DLQ suffixes).
DiscoverQueues(ctx context.Context, pattern string) ([]QueueInfo, error)
// DiscoverAllQueues returns detailed information about all queues including retry and DLQ.
// This is useful for queue management services that need to monitor all queue types.
DiscoverAllQueues(ctx context.Context, pattern string) ([]QueueInfo, error)
}
Discoverable is an optional interface for backends that support queue discovery. This is useful for queue management services to discover existing queues in persistent backends like Redis. Backend factories (e.g., MemoryFactory, RedisQueueFactory) should implement this interface.
type Factory ¶
type Factory[T any] interface { // Create a new queue if name does not exist // If name already exists, return the existing queue GetOrCreate(name string, options ...Option) (Queue[T], error) // Same as GetOrCreate but returns SafeQueue GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error) }
type JsonMessage ¶ added in v0.1.0
type JsonMessage[T any] struct { message.JsonMessage[T] }
func NewJsonMessage ¶ added in v0.1.0
func NewJsonMessage[T any](data T) *JsonMessage[T]
func (*JsonMessage[T]) AddRetryCount ¶ added in v0.1.0
func (m *JsonMessage[T]) AddRetryCount()
func (*JsonMessage[T]) CreatedAt ¶ added in v0.1.0
func (m *JsonMessage[T]) CreatedAt() time.Time
func (*JsonMessage[T]) RefreshRetryCount ¶ added in v0.1.0
func (m *JsonMessage[T]) RefreshRetryCount()
func (*JsonMessage[T]) RefreshUpdatedAt ¶ added in v0.1.0
func (m *JsonMessage[T]) RefreshUpdatedAt()
func (*JsonMessage[T]) RetryCount ¶ added in v0.1.0
func (m *JsonMessage[T]) RetryCount() int
func (*JsonMessage[T]) TotalRetryCount ¶ added in v0.1.0
func (m *JsonMessage[T]) TotalRetryCount() int
func (*JsonMessage[T]) UpdatedAt ¶ added in v0.1.0
func (m *JsonMessage[T]) UpdatedAt() time.Time
type MemoryFactory ¶
type MemoryFactory[T any] struct { // contains filtered or unexported fields }
func NewMemoryFactory ¶
func NewMemoryFactory[T any](defaultMsg Message[T]) *MemoryFactory[T]
func (*MemoryFactory[T]) GetOrCreate ¶
func (f *MemoryFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)
func (*MemoryFactory[T]) GetOrCreateSafe ¶ added in v0.2.0
func (f *MemoryFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)
type MemoryQueue ¶
func NewMemoryQueue ¶
func (*MemoryQueue[T]) BDequeue ¶ added in v0.9.3
func (q *MemoryQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
func (*MemoryQueue[T]) BEnqueue ¶ added in v0.9.3
func (q *MemoryQueue[T]) BEnqueue(ctx context.Context, data T) error
func (*MemoryQueue[T]) Close ¶
func (q *MemoryQueue[T]) Close()
func (*MemoryQueue[T]) Dequeue ¶
func (q *MemoryQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
func (*MemoryQueue[T]) Enqueue ¶
func (q *MemoryQueue[T]) Enqueue(ctx context.Context, data T) error
func (*MemoryQueue[T]) EnqueueToRetryQueue ¶ added in v0.12.0
func (q *MemoryQueue[T]) EnqueueToRetryQueue(ctx context.Context, data T) error
EnqueueToRetryQueue adds a message directly to the retry queue. Used by DLQ.Redrive to ensure recovered messages are processed with priority.
func (*MemoryQueue[T]) Kind ¶
func (q *MemoryQueue[T]) Kind() Kind
func (*MemoryQueue[T]) Name ¶ added in v0.0.2
func (q *MemoryQueue[T]) Name() string
func (*MemoryQueue[T]) Purge ¶ added in v0.1.0
func (q *MemoryQueue[T]) Purge(ctx context.Context) error
func (*MemoryQueue[T]) Recover ¶
func (q *MemoryQueue[T]) Recover(ctx context.Context, msg Message[T]) error
func (*MemoryQueue[T]) Stats ¶ added in v0.12.0
func (q *MemoryQueue[T]) Stats() QueueStats
Stats returns the current queue statistics. Implements the StatsProvider interface.
type MemoryQueueConfig ¶ added in v0.12.0
type MemoryQueueConfig struct {
}
MemoryQueueConfig is the backend-specific config for memory queues Currently empty but reserved for future extensions
type MemoryQueueFactory ¶ added in v0.12.0
type MemoryQueueFactory struct {
// contains filtered or unexported fields
}
MemoryQueueFactory is a non-generic factory for creating memory-backed queues. Unlike the generic MemoryFactory[T], this factory doesn't require a type parameter at construction time. The type is specified when creating queues. This allows MemoryQueueFactory to implement the Discoverable interface directly.
func NewMemoryQueueFactory ¶ added in v0.12.0
func NewMemoryQueueFactory() (*MemoryQueueFactory, error)
NewMemoryQueueFactory creates a new non-generic memory factory. The factory can create queues of any message type using GetOrCreateSafe.
func (*MemoryQueueFactory) DiscoverAllQueues ¶ added in v0.12.0
func (f *MemoryQueueFactory) DiscoverAllQueues(ctx context.Context, pattern string) ([]QueueInfo, error)
DiscoverAllQueues returns cached queue information for memory backends. Memory queues don't persist retry/DLQ separately in cache, so this returns the same as DiscoverQueues.
func (*MemoryQueueFactory) DiscoverQueues ¶ added in v0.12.0
func (f *MemoryQueueFactory) DiscoverQueues(ctx context.Context, pattern string) ([]QueueInfo, error)
DiscoverQueues returns cached queue information for memory backends. The pattern parameter supports glob-style matching (e.g., "*", "orders-*"). An empty pattern matches all queues.
type MsgpackMessage ¶ added in v0.12.2
type MsgpackMessage[T any] struct { message.MsgpackMessage[T] }
func NewMsgpackMessage ¶ added in v0.12.2
func NewMsgpackMessage[T any](data T) *MsgpackMessage[T]
func (*MsgpackMessage[T]) AddRetryCount ¶ added in v0.12.2
func (m *MsgpackMessage[T]) AddRetryCount()
func (*MsgpackMessage[T]) CreatedAt ¶ added in v0.12.2
func (m *MsgpackMessage[T]) CreatedAt() time.Time
func (*MsgpackMessage[T]) RefreshRetryCount ¶ added in v0.12.2
func (m *MsgpackMessage[T]) RefreshRetryCount()
func (*MsgpackMessage[T]) RefreshUpdatedAt ¶ added in v0.12.2
func (m *MsgpackMessage[T]) RefreshUpdatedAt()
func (*MsgpackMessage[T]) RetryCount ¶ added in v0.12.2
func (m *MsgpackMessage[T]) RetryCount() int
func (*MsgpackMessage[T]) TotalRetryCount ¶ added in v0.12.2
func (m *MsgpackMessage[T]) TotalRetryCount() int
func (*MsgpackMessage[T]) UpdatedAt ¶ added in v0.12.2
func (m *MsgpackMessage[T]) UpdatedAt() time.Time
type Queue ¶
type Queue[T any] interface { Kind() Kind Name() string // Reports max size of queue // -1 for unlimited MaxSize() int // Reports max handle failures // Messages will be discarded after this many failures, or // pushed to DLQ if DLQ is supported MaxHandleFailures() int // Push data to end of queue // Failed if queue is full or closed Enqueue(ctx context.Context, data T) error // Same as Enqueue but block thread until queue is not full and message is enqueued BEnqueue(ctx context.Context, data T) error // The implementation MUST set the retryCount of the message to 0 if its retryCount > MaxHandleFailures, // in the case, the message is from DLQ redriving. Dequeue(context.Context) (Message[T], error) // Same as Dequeue but block thread until message is available BDequeue(context.Context) (Message[T], error) // Subscribe queue with message confirmation. // Once handler returns error, it'll automatically put message back to queue using `Recover` mechanism internally. Subscribe(ctx context.Context, h Handler[T]) Close() }
The interface of queue The implementation of queue should be thread-safe
func GetOrCreate ¶ added in v0.12.0
func GetOrCreate[T any](f *UnifiedFactory, name string, defaultMsg Message[T], options ...Option) (Queue[T], error)
GetOrCreate creates or returns a cached Queue with the given name and message type. This is a convenience wrapper around GetOrCreateSafe.
func MemoryGetOrCreate ¶ added in v0.12.0
func MemoryGetOrCreate[T any](f *MemoryQueueFactory, name string, defaultMsg Message[T], options ...Option) (Queue[T], error)
MemoryGetOrCreate creates or returns a cached Queue with the given name and message type. This is a convenience wrapper around MemoryGetOrCreateSafe.
func RedisGetOrCreate ¶ added in v0.12.0
func RedisGetOrCreate[T any](f *RedisFactory, name string, defaultMsg Message[T], options ...Option) (Queue[T], error)
RedisGetOrCreate creates or returns a cached Queue with the given name and message type. This is a convenience wrapper around RedisGetOrCreateSafe.
type QueueConfigValidator ¶ added in v0.12.0
type QueueConfigValidator interface {
Validate(rawConfig json.RawMessage) error
}
QueueConfigValidator validates backend-specific configuration
type QueueCreator ¶ added in v0.12.0
type QueueCreator[T any] func(ctx context.Context, name string, defaultMsg Message[T], rawConfig json.RawMessage, options ...Option) (Queue[T], error)
QueueCreator is a function that creates a queue from raw JSON config. Used for registering custom queue backend implementations.
type QueueInfo ¶ added in v0.12.0
type QueueInfo struct {
// Name is the queue name (without namespace prefix)
Name string
// Key is the full Redis key (with namespace prefix)
Key string
// Type indicates the queue type: "main", "retry", or "dlq"
Type string
// Depth is the current number of messages in the queue (optional, may be 0 if not fetched)
Depth int64
}
QueueInfo contains metadata about a discovered queue. This is useful for queue management services to discover and manage queues.
type QueueStats ¶ added in v0.12.0
type QueueStats struct {
// Depth is the current number of messages in the main queue
Depth int64
// RetryDepth is the current number of messages in the retry queue
RetryDepth int64
// Inflight is the number of messages currently being processed
Inflight int64
// ConsumerCount is the number of registered callbacks/consumers
ConsumerCount int
// Capacity is the maximum capacity of the queue (-1 for unlimited)
Capacity int
// RetryCapacity is the maximum capacity of the retry queue
RetryCapacity int
}
QueueStats provides queue statistics for monitoring and observability.
type QueueType ¶ added in v0.12.0
type QueueType string
QueueType represents the storage backend type
func GetRegisteredQueueTypes ¶ added in v0.12.0
func GetRegisteredQueueTypes() []QueueType
GetRegisteredQueueTypes returns all available queue types (built-in + registered)
type Recoverable ¶ added in v0.0.2
type Recoverable[T any] interface { // If the queue supports `visibility window` like AWS SQS, the message will be put back to queue atomically without calling `Recover`. // It's useful if the panic is from outside of the queue handler. // But it's recommended to use `Recover` if the panic is from inside the queue handler for retrying the message fast. Recover(context.Context, Message[T]) error }
type RecoverableQueue ¶
type RecoverableQueue[T any] interface { Queue[T] Recoverable[T] }
type RedisFactory ¶ added in v0.12.0
type RedisFactory struct {
// contains filtered or unexported fields
}
RedisFactory is a non-generic factory for creating Redis-backed queues. Unlike RedisQueueFactory[T], this factory doesn't require a type parameter at construction time. The type is specified when creating queues. This allows RedisFactory to implement the Discoverable interface directly.
func NewRedisFactory ¶ added in v0.12.0
func NewRedisFactory(redisClient redis.Cmdable) (*RedisFactory, error)
NewRedisFactory creates a new non-generic Redis factory. The factory can create queues of any message type using GetOrCreateSafe.
func (*RedisFactory) Client ¶ added in v0.12.0
func (f *RedisFactory) Client() redis.Cmdable
Client returns the underlying Redis client.
func (*RedisFactory) DiscoverAllQueues ¶ added in v0.12.0
DiscoverAllQueues returns detailed information about all queues including retry and DLQ.
func (*RedisFactory) DiscoverQueues ¶ added in v0.12.0
DiscoverQueues scans Redis for all queue keys and returns queue information. The pattern parameter supports glob-style matching (e.g., "*", "orders-*"). An empty pattern matches all queues. By default, returns only main queues (excludes retry and DLQ queues).
type RedisQueue ¶ added in v0.0.2
func NewRedisQueue ¶ added in v0.0.2
func (*RedisQueue[T]) BDequeue ¶ added in v0.9.3
func (q *RedisQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
func (*RedisQueue[T]) BEnqueue ¶ added in v0.9.3
func (q *RedisQueue[T]) BEnqueue(ctx context.Context, data T) error
func (*RedisQueue[T]) Close ¶ added in v0.12.0
func (q *RedisQueue[T]) Close()
func (*RedisQueue[T]) Dequeue ¶ added in v0.0.2
func (q *RedisQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
func (*RedisQueue[T]) Enqueue ¶ added in v0.0.2
func (q *RedisQueue[T]) Enqueue(ctx context.Context, data T) error
func (*RedisQueue[T]) EnqueueToRetryQueue ¶ added in v0.12.0
func (q *RedisQueue[T]) EnqueueToRetryQueue(ctx context.Context, data T) error
EnqueueToRetryQueue adds a message directly to the retry queue. Used by DLQ.Redrive to ensure recovered messages are processed with priority.
func (*RedisQueue[T]) Purge ¶ added in v0.9.1
func (q *RedisQueue[T]) Purge(ctx context.Context) error
func (*RedisQueue[T]) Recover ¶ added in v0.0.2
func (q *RedisQueue[T]) Recover(ctx context.Context, msg Message[T]) error
func (*RedisQueue[T]) Stats ¶ added in v0.12.0
func (q *RedisQueue[T]) Stats() QueueStats
Stats returns the current queue statistics. Implements the StatsProvider interface.
type RedisQueueConfig ¶ added in v0.12.0
type RedisQueueConfig struct {
// Addr is the Redis server address (e.g., "localhost:6379")
Addr string `json:"addr"`
// Password is the Redis password (optional)
Password string `json:"password,omitempty"`
// DB is the Redis database number (optional, default 0)
DB int `json:"db,omitempty"`
}
RedisQueueConfig is the backend-specific config for Redis queues
type RedisQueueFactory ¶ added in v0.0.2
type RedisQueueFactory[T any] struct { // contains filtered or unexported fields }
func NewRedisQueueFactory ¶ added in v0.0.2
func NewRedisQueueFactory[T any](redisClient redis.Cmdable, defaultMsg Message[T]) *RedisQueueFactory[T]
func (*RedisQueueFactory[T]) GetOrCreate ¶ added in v0.0.2
func (f *RedisQueueFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)
func (*RedisQueueFactory[T]) GetOrCreateSafe ¶ added in v0.2.0
func (f *RedisQueueFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)
type RetryQueueEnqueuer ¶ added in v0.12.0
RetryQueueEnqueuer is an optional interface for queues that support enqueueing directly to the retry queue for priority processing. Used by DLQ.Redrive to ensure recovered messages are processed first.
type SafeQueue ¶
type SafeQueue[T any] interface { Queue[T] Recoverable[T] IsRecoverable() bool Purgeable IsPurgeable() bool DLQer[T] IsDLQSupported() bool }
SafeQueue provides ability to put message back to queue when handler encounters panic and makes sure all function calls are safe. e.g, Returns ErrNotImplemented if calling Recover and it is not implemented
func GetOrCreateSafe ¶ added in v0.12.0
func GetOrCreateSafe[T any](f *UnifiedFactory, name string, defaultMsg Message[T], options ...Option) (SafeQueue[T], error)
GetOrCreateSafe creates or returns a cached SafeQueue with the given name and message type. This is a generic function that allows creating queues of different types from the same factory.
Example:
factory, _ := queue.NewUnifiedFactory(config)
bytesQueue, _ := queue.GetOrCreateSafe[[]byte](factory, "bytes-queue", queue.NewJsonMessage([]byte{}))
myTypeQueue, _ := queue.GetOrCreateSafe[MyType](factory, "mytype-queue", queue.NewJsonMessage(MyType{}))
func MemoryGetOrCreateSafe ¶ added in v0.12.0
func MemoryGetOrCreateSafe[T any](f *MemoryQueueFactory, name string, defaultMsg Message[T], options ...Option) (SafeQueue[T], error)
MemoryGetOrCreateSafe creates or returns a cached SafeQueue with the given name and message type. This is a generic function that allows creating queues of different types from the same factory.
Example:
factory, _ := queue.NewMemoryQueueFactory()
bytesQueue, _ := queue.MemoryGetOrCreateSafe[[]byte](factory, "bytes-queue", queue.NewJsonMessage([]byte{}))
myTypeQueue, _ := queue.MemoryGetOrCreateSafe[MyType](factory, "mytype-queue", queue.NewJsonMessage(MyType{}))
func RedisGetOrCreateSafe ¶ added in v0.12.0
func RedisGetOrCreateSafe[T any](f *RedisFactory, name string, defaultMsg Message[T], options ...Option) (SafeQueue[T], error)
GetOrCreateSafe creates or returns a cached SafeQueue with the given name and message type. This is a generic function that allows creating queues of different types from the same factory.
Example:
factory, _ := queue.NewRedisFactory(redisClient)
bytesQueue, _ := queue.RedisGetOrCreateSafe[[]byte](factory, "bytes-queue", queue.NewJsonMessage([]byte{}))
myTypeQueue, _ := queue.RedisGetOrCreateSafe[MyType](factory, "mytype-queue", queue.NewJsonMessage(MyType{}))
type SimpleQueue ¶ added in v0.1.0
type SimpleQueue[T any] struct { // contains filtered or unexported fields }
func NewSimpleQueue ¶ added in v0.2.0
func NewSimpleQueue[T any](queue Queue[T], opts ...SimpleQueueOption) (*SimpleQueue[T], error)
func (*SimpleQueue[T]) BDequeue ¶ added in v0.9.3
func (q *SimpleQueue[T]) BDequeue(ctx context.Context) (Message[T], error)
func (*SimpleQueue[T]) BEnqueue ¶ added in v0.9.3
func (q *SimpleQueue[T]) BEnqueue(ctx context.Context, data T) error
func (*SimpleQueue[T]) Close ¶ added in v0.1.0
func (q *SimpleQueue[T]) Close()
func (*SimpleQueue[T]) DLQ ¶ added in v0.1.0
func (q *SimpleQueue[T]) DLQ() (DLQ[T], error)
func (*SimpleQueue[T]) Dequeue ¶ added in v0.1.0
func (q *SimpleQueue[T]) Dequeue(ctx context.Context) (Message[T], error)
func (*SimpleQueue[T]) Enqueue ¶ added in v0.1.0
func (q *SimpleQueue[T]) Enqueue(ctx context.Context, data T) error
func (*SimpleQueue[T]) IsDLQSupported ¶ added in v0.1.0
func (q *SimpleQueue[T]) IsDLQSupported() bool
func (*SimpleQueue[T]) IsPurgeable ¶ added in v0.1.0
func (q *SimpleQueue[T]) IsPurgeable() bool
func (*SimpleQueue[T]) IsRecoverable ¶ added in v0.1.0
func (q *SimpleQueue[T]) IsRecoverable() bool
func (*SimpleQueue[T]) IsStatsProvider ¶ added in v0.12.0
func (q *SimpleQueue[T]) IsStatsProvider() bool
IsStatsProvider returns true if the underlying queue implements StatsProvider.
func (*SimpleQueue[T]) Kind ¶ added in v0.1.0
func (q *SimpleQueue[T]) Kind() Kind
func (*SimpleQueue[T]) MaxHandleFailures ¶ added in v0.1.0
func (q *SimpleQueue[T]) MaxHandleFailures() int
func (*SimpleQueue[T]) MaxSize ¶ added in v0.1.0
func (q *SimpleQueue[T]) MaxSize() int
func (*SimpleQueue[T]) Name ¶ added in v0.1.0
func (q *SimpleQueue[T]) Name() string
func (*SimpleQueue[T]) Purge ¶ added in v0.1.0
func (q *SimpleQueue[T]) Purge(ctx context.Context) error
func (*SimpleQueue[T]) Recover ¶ added in v0.1.0
func (q *SimpleQueue[T]) Recover(ctx context.Context, msg Message[T]) error
func (*SimpleQueue[T]) Stats ¶ added in v0.12.0
func (q *SimpleQueue[T]) Stats() QueueStats
Stats returns the current queue statistics if the underlying queue supports it. Returns zero-valued QueueStats if not supported.
func (*SimpleQueue[T]) Subscribe ¶ added in v0.1.0
func (q *SimpleQueue[T]) Subscribe(ctx context.Context, cb Handler[T])
func (*SimpleQueue[T]) Unwrap ¶ added in v0.1.0
func (q *SimpleQueue[T]) Unwrap() Queue[T]
type SimpleQueueConfig ¶ added in v0.9.6
SimpleQueueConfig holds configuration for SimpleQueue
type SimpleQueueOption ¶ added in v0.9.6
type SimpleQueueOption func(*SimpleQueueConfig)
SimpleQueueOption is a function option for configuring SimpleQueue
func WithLogger ¶ added in v0.9.6
func WithLogger(logger *zerolog.Logger) SimpleQueueOption
WithLogger sets the logger for SimpleQueue
type StatsProvider ¶ added in v0.12.0
type StatsProvider interface {
// Stats returns the current queue statistics
Stats() QueueStats
}
StatsProvider is an optional interface for queues that support statistics reporting. Used for monitoring and observability.
type TypedFactory ¶ added in v0.12.0
type TypedFactory[T any] struct { // contains filtered or unexported fields }
TypedFactory is a typed wrapper around UnifiedFactory for convenience. Use this when you only need queues of a single message type.
func NewTypedFactory ¶ added in v0.12.0
func NewTypedFactory[T any](config UnifiedQueueConfig, defaultMsg Message[T]) (*TypedFactory[T], error)
NewTypedFactory creates a typed factory wrapper. This is a convenience for when you only need queues of one message type.
Example:
factory, _ := queue.NewTypedFactory(config, queue.NewJsonMessage([]byte{}))
q, _ := factory.GetOrCreateSafe("my-queue")
func (*TypedFactory[T]) Config ¶ added in v0.12.0
func (f *TypedFactory[T]) Config() UnifiedQueueConfig
Config returns the factory's configuration
func (*TypedFactory[T]) Factory ¶ added in v0.12.0
func (f *TypedFactory[T]) Factory() *UnifiedFactory
Factory returns the underlying UnifiedFactory
func (*TypedFactory[T]) GetOrCreate ¶ added in v0.12.0
func (f *TypedFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)
GetOrCreate creates or returns a cached queue with the given name
func (*TypedFactory[T]) GetOrCreateSafe ¶ added in v0.12.0
func (f *TypedFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)
GetOrCreateSafe creates or returns a cached SafeQueue with the given name
type UnifiedFactory ¶ added in v0.12.0
type UnifiedFactory struct {
// contains filtered or unexported fields
}
UnifiedFactory creates queues based on configuration. It is type-agnostic and can create queues of any message type. Built-in types (memory, redis) are created directly without registration. Custom types use the registry.
func NewUnifiedFactory ¶ added in v0.12.0
func NewUnifiedFactory(config UnifiedQueueConfig) (*UnifiedFactory, error)
NewUnifiedFactory creates a new unified queue factory. No registration required for built-in types (memory, redis). The factory can create queues of any message type using GetOrCreateSafe.
func (*UnifiedFactory) CacheSize ¶ added in v0.12.1
func (f *UnifiedFactory) CacheSize() int
CacheSize returns the number of queues currently in the cache.
func (*UnifiedFactory) ClearCache ¶ added in v0.12.1
func (f *UnifiedFactory) ClearCache()
ClearCache removes all cached queues from the factory. This is primarily intended for testing scenarios where tests need to ensure isolation between test cases. After clearing the cache, subsequent GetOrCreateSafe calls will create new queue instances.
WARNING: This does not close the cached queues. Callers should ensure all queues are properly stopped/closed before clearing the cache.
func (*UnifiedFactory) Config ¶ added in v0.12.0
func (f *UnifiedFactory) Config() UnifiedQueueConfig
Config returns a copy of the factory's configuration. The returned config is safe to modify without affecting the factory.
func (*UnifiedFactory) DiscoverAllQueues ¶ added in v0.12.0
func (f *UnifiedFactory) DiscoverAllQueues(ctx context.Context, pattern string) ([]QueueInfo, error)
DiscoverAllQueues discovers all queues including retry and DLQ. For Redis backends, it returns all queue keys (main, retry, DLQ). For Memory backends, it returns the same as DiscoverQueues (memory doesn't persist retry/DLQ separately).
func (*UnifiedFactory) DiscoverQueues ¶ added in v0.12.0
DiscoverQueues discovers queues from the factory's backend. For Redis backends, it scans Redis keys with the queue namespace prefix. For Memory backends, it returns queues from the factory's cache. The pattern parameter supports glob-style matching (e.g., "*", "orders-*").
func (*UnifiedFactory) RemoveFromCache ¶ added in v0.12.1
func (f *UnifiedFactory) RemoveFromCache(name string) bool
RemoveFromCache removes a specific queue from the cache by name. Returns true if the queue was found and removed, false otherwise. This is useful for removing a single queue that has been closed.
WARNING: This does not close the queue. Callers should ensure the queue is properly stopped/closed before removing it from the cache.
type UnifiedQueueConfig ¶ added in v0.12.0
type UnifiedQueueConfig struct {
// Type determines which backend to use: "memory", "redis", etc.
Type QueueType `json:"type"`
// Common options that apply to all backends
MaxSize int `json:"max_size,omitempty"` // -1 for unlimited
MaxHandleFailures int `json:"max_handle_failures,omitempty"` // Max failures before DLQ
ConsumerCount int `json:"consumer_count,omitempty"` // Number of concurrent consumers
CallbackParallel bool `json:"callback_parallel,omitempty"` // Enable parallel callback execution
UnlimitedCapacity int `json:"unlimited_capacity,omitempty"` // Buffer size when unlimited (Memory only)
RetryQueueCapacity int `json:"retry_queue_capacity,omitempty"` // Retry queue buffer size
// BackendConfig contains backend-specific configuration as raw JSON
// Parsed according to Type:
// - "memory": MemoryQueueConfig (currently empty, reserved for future)
// - "redis": RedisQueueConfig
BackendConfig json.RawMessage `json:"backend_config,omitempty"`
// RedisClient allows using an existing Redis client instead of creating a new one.
// When set, BackendConfig is ignored for Redis queues.
// This field is not serializable to JSON.
RedisClient redis.Cmdable `json:"-"`
}
UnifiedQueueConfig is the top-level configuration for creating queues