queue

package v0.12.3
Published: Mar 24, 2026 License: MIT Imports: 17 Imported by: 0

README

Queue Package

A generic, thread-safe queue implementation for Go with support for multiple storage backends, automatic retry mechanisms, dead-letter queues (DLQ), and Prometheus metrics.

Features

  • Generic Type Support: Works with any data type using Go generics
  • Multiple Backends: Memory and Redis implementations with unified factory
  • Zero Registration: Built-in types work out of the box, no registration required
  • Automatic Retry: Failed messages are automatically retried with configurable limits
  • Dead-Letter Queue (DLQ): Messages exceeding retry limits are moved to DLQ for manual inspection
  • Priority Retry Queue: Retry messages are processed with higher priority than new messages
  • Blocking Operations: Support for blocking enqueue/dequeue with context cancellation
  • Parallel Callbacks: Optional parallel execution of message handlers
  • Prometheus Metrics: Built-in observability with queue depth, latency, and error metrics
  • Thread-Safe: All operations are thread-safe

Installation

go get github.com/ivanzzeth/go-universal-data-containers

Quick Start

The unified factory lets you switch between backends through configuration. No registration is required for the built-in types (Memory, Redis). The factory is type-agnostic: you can create queues of different message types from a single factory:

package main

import (
    "context"
    "fmt"

    "github.com/ivanzzeth/go-universal-data-containers/queue"
)

func main() {
    // Configuration can come from JSON file or environment
    config := queue.UnifiedQueueConfig{
        Type:              queue.QueueTypeMemory, // or queue.QueueTypeRedis
        MaxSize:           1000,
        MaxHandleFailures: 3,
        ConsumerCount:     2,
    }

    // Create factory - type-agnostic, can create queues of any type
    factory, err := queue.NewUnifiedFactory(config)
    if err != nil {
        panic(err)
    }

    // Create queue with specific type using generic function
    q, err := queue.GetOrCreateSafe[[]byte](factory, "my-queue", queue.NewJsonMessage([]byte{}))
    if err != nil {
        panic(err)
    }
    defer q.Close()

    ctx := context.Background()

    // Enqueue message
    err = q.Enqueue(ctx, []byte("hello world"))
    if err != nil {
        panic(err)
    }

    // Dequeue message
    msg, err := q.Dequeue(ctx)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Received: %s\n", msg.Data())
}

Using Custom Message Types

The factory supports any message type through Go generics. You can even create queues of different types from the same factory:

type MyMessage struct {
    ID   string `json:"id"`
    Data string `json:"data"`
}

config := queue.UnifiedQueueConfig{
    Type:    queue.QueueTypeMemory,
    MaxSize: 1000,
}

// Create a single factory
factory, _ := queue.NewUnifiedFactory(config)

// Create queues of different types from the same factory
bytesQueue, _ := queue.GetOrCreateSafe[[]byte](factory, "bytes-queue", queue.NewJsonMessage([]byte{}))
myTypeQueue, _ := queue.GetOrCreateSafe[MyMessage](factory, "mytype-queue", queue.NewJsonMessage(MyMessage{}))

// Enqueue and dequeue with your custom type
myTypeQueue.Enqueue(ctx, MyMessage{ID: "123", Data: "test"})
msg, _ := myTypeQueue.Dequeue(ctx)
fmt.Println(msg.Data().ID) // "123"

Using TypedFactory (Convenience Wrapper)

If you only need queues of a single message type, use TypedFactory for a more concise API:

// Create a typed factory for []byte
factory, _ := queue.NewTypedFactory(config, queue.NewJsonMessage([]byte{}))

// No need to specify type or defaultMsg when creating queues
q, _ := factory.GetOrCreateSafe("my-queue")
q.Enqueue(ctx, []byte("hello"))

Using Redis Backend

// Redis configuration
backendConfig, _ := json.Marshal(queue.RedisQueueConfig{
    Addr:     "localhost:6379",
    Password: "",
    DB:       0,
})

config := queue.UnifiedQueueConfig{
    Type:              queue.QueueTypeRedis,
    MaxSize:           10000,
    MaxHandleFailures: 5,
    BackendConfig:     backendConfig,
}

factory, _ := queue.NewUnifiedFactory(config)
q, _ := queue.GetOrCreateSafe[[]byte](factory, "redis-queue", queue.NewJsonMessage([]byte{}))

Using Existing Redis Client

import "github.com/redis/go-redis/v9"

// Create your own Redis client
rdb := redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
})

// Pass client directly via config - no registration needed!
config := queue.UnifiedQueueConfig{
    Type:        queue.QueueTypeRedis,
    MaxSize:     10000,
    RedisClient: rdb, // Use existing client
}

factory, _ := queue.NewUnifiedFactory(config)
q, _ := queue.GetOrCreateSafe[[]byte](factory, "my-queue", queue.NewJsonMessage([]byte{}))

Using Non-Generic Factories (Multiple Types + Discovery)

For scenarios requiring multiple message types or queue discovery without UnifiedFactory, use the non-generic factories:

import "github.com/redis/go-redis/v9"

// Redis non-generic factory
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
redisFactory, _ := queue.NewRedisFactory(rdb)

// Create queues of different types from the same factory
bytesQueue, _ := queue.RedisGetOrCreateSafe[[]byte](redisFactory, "bytes-queue", queue.NewJsonMessage([]byte{}))
ordersQueue, _ := queue.RedisGetOrCreateSafe[Order](redisFactory, "orders-queue", queue.NewJsonMessage(Order{}))

// Memory non-generic factory
memFactory, _ := queue.NewMemoryQueueFactory()
q1, _ := queue.MemoryGetOrCreateSafe[[]byte](memFactory, "queue1", queue.NewJsonMessage([]byte{}))
q2, _ := queue.MemoryGetOrCreateSafe[MyType](memFactory, "queue2", queue.NewJsonMessage(MyType{}))

// Both factories implement the Discoverable interface
redisQueues, _ := redisFactory.DiscoverQueues(ctx, "orders-*")
memQueues, _ := memFactory.DiscoverQueues(ctx, "")

Factory Types Summary

| Factory | Type Parameter | Discoverable | Use Case |
|---------|----------------|--------------|----------|
| MemoryFactory[T] | At construction | No | Single type, simple API |
| RedisQueueFactory[T] | At construction | No | Single type, simple API |
| MemoryQueueFactory | At queue creation | Yes | Multiple types, discovery |
| RedisFactory | At queue creation | Yes | Multiple types, discovery |
| UnifiedFactory | At queue creation | Yes | Config-driven, backend switching |
| TypedFactory[T] | At construction | Yes (via inner) | Convenience wrapper |

JSON Configuration

Configuration can be loaded from JSON:

{
    "type": "redis",
    "max_size": 10000,
    "max_handle_failures": 5,
    "consumer_count": 4,
    "callback_parallel": true,
    "backend_config": {
        "addr": "localhost:6379",
        "password": "",
        "db": 0
    }
}

var config queue.UnifiedQueueConfig
if err := json.Unmarshal(configJSON, &config); err != nil {
    panic(err)
}
factory, _ := queue.NewUnifiedFactory(config)
q, _ := queue.GetOrCreateSafe[[]byte](factory, "my-queue", queue.NewJsonMessage([]byte{}))

Custom Queue Backend

For custom queue backend implementations (e.g., Kafka, SQS), use the registry:

func init() {
    queue.RegisterQueueCreator(MyQueueType, myQueueCreator)
    queue.RegisterQueueValidator(MyQueueType, &MyConfigValidator{})
}
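The registry behind RegisterQueueCreator follows the usual map-plus-lock pattern. The sketch below illustrates that pattern in isolation; the type and function names here are illustrative and do not reflect the package's actual QueueCreator signature:

```go
package main

import (
	"fmt"
	"sync"
)

type queueType string

// creators maps a backend name to a constructor. Access is guarded
// because init-time registration and later lookups may race.
var (
	mu       sync.RWMutex
	creators = map[queueType]func(name string) (any, error){}
)

func registerCreator(t queueType, c func(name string) (any, error)) {
	mu.Lock()
	defer mu.Unlock()
	creators[t] = c
}

func isSupported(t queueType) bool {
	mu.RLock()
	defer mu.RUnlock()
	_, ok := creators[t]
	return ok
}

func main() {
	registerCreator("kafka", func(name string) (any, error) {
		return "kafka queue: " + name, nil
	})
	fmt.Println(isSupported("kafka")) // true
	fmt.Println(isSupported("sqs"))   // false
}
```

Built-in types skip this lookup entirely, which is why memory and redis work without registration.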

Queue Discovery

For queue management services, you can discover existing queues in persistent backends (Redis):

// Discover all queues
queues, err := factory.DiscoverQueues(ctx, "")
for _, q := range queues {
    fmt.Printf("Queue: %s, Depth: %d\n", q.Name, q.Depth)
}

// Discover with pattern matching
ordersQueues, _ := factory.DiscoverQueues(ctx, "orders-*")

// Discover all queues including retry and DLQ
allQueues, _ := factory.DiscoverAllQueues(ctx, "")
for _, q := range allQueues {
    fmt.Printf("Queue: %s, Type: %s, Depth: %d\n", q.Name, q.Type, q.Depth)
}

Pattern matching supports:

  • * - match all
  • prefix* - match queues starting with prefix
  • *suffix - match queues ending with suffix
  • *contains* - match queues containing substring

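The four pattern forms above can be captured by a small matcher. The following is a sketch of the matching semantics only; the `match` helper is illustrative and not part of the package API:

```go
package main

import (
	"fmt"
	"strings"
)

// match implements the four documented pattern forms:
// "" or "*" match everything; "prefix*", "*suffix", and
// "*contains*" match by prefix, suffix, and substring.
func match(pattern, name string) bool {
	switch {
	case pattern == "" || pattern == "*":
		return true
	case strings.HasPrefix(pattern, "*") && strings.HasSuffix(pattern, "*"):
		return strings.Contains(name, strings.Trim(pattern, "*"))
	case strings.HasSuffix(pattern, "*"):
		return strings.HasPrefix(name, strings.TrimSuffix(pattern, "*"))
	case strings.HasPrefix(pattern, "*"):
		return strings.HasSuffix(name, strings.TrimPrefix(pattern, "*"))
	default:
		return pattern == name // literal patterns match exactly
	}
}

func main() {
	fmt.Println(match("orders-*", "orders-2024")) // true
	fmt.Println(match("*-dlq", "orders-dlq"))     // true
	fmt.Println(match("*pay*", "fast-payments"))  // true
}
```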
Subscribe Pattern

For continuous message processing, use the Subscribe pattern:

ctx := context.Background()

q.Subscribe(ctx, func(ctx context.Context, msg queue.Message[[]byte]) error {
    // Process message
    data := msg.Data()

    // Return nil on success
    // Return error to trigger automatic retry
    if err := processMessage(data); err != nil {
        return err // Message will be retried
    }

    return nil
})

// Keep the application running
select {}

Important: When the handler returns an error, the message is automatically placed in the retry queue and will be reprocessed. Messages exceeding MaxHandleFailures are moved to the DLQ.

Dead-Letter Queue (DLQ)

Access and manage failed messages:

// Get DLQ
dlq, err := q.DLQ()
if err != nil {
    // DLQ not supported or error
}

// Manually dequeue from DLQ for inspection
msg, err := dlq.Dequeue(ctx)

// Redrive messages back to the main queue
// This moves `n` messages from DLQ to the retry queue for reprocessing
err = dlq.Redrive(ctx, 10) // Redrive 10 messages

Configuration Options

UnifiedQueueConfig
| Field | Type | Description |
|-------|------|-------------|
| Type | QueueType | Backend type: "memory" or "redis" |
| MaxSize | int | Maximum queue size (-1 for unlimited) |
| MaxHandleFailures | int | Max retries before moving to DLQ |
| ConsumerCount | int | Number of concurrent consumers |
| CallbackParallel | bool | Enable parallel callback execution |
| UnlimitedCapacity | int | Buffer size when MaxSize is unlimited (Memory only) |
| RetryQueueCapacity | int | Retry queue buffer size |
| BackendConfig | json.RawMessage | Backend-specific configuration |
| RedisClient | redis.Cmdable | Existing Redis client (optional, not serializable) |

RedisQueueConfig

| Field | Type | Description |
|-------|------|-------------|
| Addr | string | Redis server address (e.g., "localhost:6379") |
| Password | string | Redis password (optional) |
| DB | int | Redis database number (default: 0) |

Functional Options

You can also pass options when creating queues:

q, _ := factory.GetOrCreateSafe("my-queue",
    queue.WithMaxSize(5000),
    queue.WithMaxHandleFailures(10),
    queue.WithConsumerCount(4),
    queue.WithCallbackParallelExecution(true),
    queue.WithCallbackTimeout(30*time.Second),
    queue.WithRetryQueueCapacity(20000),
)

Blocking Operations

For producer/consumer patterns with backpressure:

// Blocking enqueue - waits until space is available
err := q.BEnqueue(ctx, data)

// Blocking dequeue - waits until message is available
msg, err := q.BDequeue(ctx)

Both operations respect context cancellation:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

msg, err := q.BDequeue(ctx)
if errors.Is(err, context.DeadlineExceeded) {
    // Timed out waiting for a message
}

Prometheus Metrics

The package exports the following Prometheus metrics:

| Metric | Type | Description |
|--------|------|-------------|
| queue_depth | Gauge | Current number of messages (labels: queue_name, queue_type) |
| queue_capacity | Gauge | Maximum queue capacity (labels: queue_name, queue_type) |
| queue_inflight | Gauge | Messages currently being processed |
| queue_consumers_active | Gauge | Number of active consumers |
| queue_enqueue_total | Counter | Total enqueue operations |
| queue_enqueue_error_total | Counter | Failed enqueue operations |
| queue_dequeue_total | Counter | Total dequeue operations |
| queue_dequeue_error_total | Counter | Failed dequeue operations |
| queue_enqueue_duration_seconds | Histogram | Enqueue operation latency |
| queue_dequeue_duration_seconds | Histogram | Dequeue operation latency |
| queue_handle_duration_seconds | Histogram | Message handler execution time |
| queue_message_age_seconds | Histogram | Time from message creation to processing |
| queue_dlq_messages_total | Counter | Messages moved to DLQ |
| queue_redrive_total | Counter | DLQ redrive operations |
| queue_redrive_successful_total | Counter | Successful redrive operations |
| queue_redrive_error_total | Counter | Failed redrive operations |

Architecture

                    +-------------------+
                    |  UnifiedFactory   |
                    +-------------------+
                            |
              +-------------+-------------+
              |                           |
    +------------------+        +------------------+
    |   MemoryQueue    |        |    RedisQueue    |
    +------------------+        +------------------+
              |                           |
              +-------------+-------------+
                            |
                    +-------------------+
                    |   SimpleQueue     |
                    |   (SafeQueue)     |
                    +-------------------+
                            |
                    +-------------------+
                    |   Metrics Layer   |
                    +-------------------+

Queue Flow

+--------+     +-----------+     +-------------+
| Client | --> | MainQueue | --> | RetryQueue  | (higher priority)
+--------+     +-----------+     +-------------+
                    |                   |
                    +-------------------+
                            |
                    +---------------+
                    |   Consumer    |
                    +---------------+
                            |
                    +---------------+
                    |   Handler     | --> success: done
                    +---------------+ --> failure: RetryQueue
                            |              (up to MaxHandleFailures)
                            v
                    +---------------+
                    |     DLQ       | --> manual inspection
                    +---------------+     or Redrive
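The failure path in the diagram reduces to a single decision on the message's retry count. A self-contained sketch of that routing rule follows; the names here are illustrative and not the package's internals:

```go
package main

import "fmt"

type route int

const (
	routeRetry route = iota // requeue with higher priority
	routeDLQ                // park for manual inspection
)

// routeFailure decides where a failed message goes: back to the
// retry queue until it has failed maxHandleFailures times, then
// to the dead-letter queue.
func routeFailure(retryCount, maxHandleFailures int) route {
	if retryCount >= maxHandleFailures {
		return routeDLQ
	}
	return routeRetry
}

func main() {
	fmt.Println(routeFailure(1, 3) == routeRetry) // true: still under the limit
	fmt.Println(routeFailure(3, 3) == routeDLQ)   // true: limit reached
}
```

Redrive reverses the last step: it moves messages from the DLQ back into the retry queue, where Dequeue resets their retry count.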

Error Handling

| Error | Description |
|-------|-------------|
| ErrQueueClosed | Queue has been closed |
| ErrQueueFull | Queue is at capacity (non-blocking operations) |
| ErrQueueEmpty | No messages available (non-blocking operations) |
| ErrInvalidData | Invalid message data |

Thread Safety

All queue operations are thread-safe. Multiple goroutines can safely:

  • Enqueue messages concurrently
  • Subscribe with multiple consumers
  • Access DLQ operations

Best Practices

  1. Always handle errors: Check return values, especially for DLQ operations
  2. Use Subscribe for consumers: It handles retry logic automatically
  3. Set appropriate MaxHandleFailures: Balance between retry attempts and moving to DLQ
  4. Monitor DLQ: Regularly check and process DLQ messages
  5. Use context cancellation: Enables graceful shutdown
  6. Close queues on shutdown: Call q.Close() to drain in-flight messages

License

MIT License

Documentation

Index

Constants

const (
	UnlimitedSize = -1
	Namespace     = "container::queue::"
)
const (
	QueueInfoTypeMain  = "main"
	QueueInfoTypeRetry = "retry"
	QueueInfoTypeDLQ   = "dlq"
)

Queue type constants for QueueInfo.Type

Variables

var (
	DefaultPollInterval       = 10 * time.Millisecond
	DefaultMaxRetries         = 10
	DefaultUnlimitedCapacity  = 1000000
	DefaultRetryQueueCapacity = 10000

	// DefaultCallbackWaitInterval is the interval for checking if callbacks are registered.
	// Used by both MemoryQueue and RedisQueue in their run() loops.
	DefaultCallbackWaitInterval = 10 * time.Millisecond

	// DefaultBlockingTimeout is the timeout for blocking operations like BLPOP.
	// Used by RedisQueue for BDequeue and run() loops.
	DefaultBlockingTimeout = time.Second

	// DefaultNetworkRetryDelay is the delay before retrying after a network error.
	// Used by RedisQueue when encountering transient errors.
	DefaultNetworkRetryDelay = 100 * time.Millisecond
	DefaultOptions           = Config{
		LockerGenerator: locker.NewMemoryLockerGenerator(),

		MaxSize: UnlimitedSize,

		MaxHandleFailures: 10,

		PollInterval: DefaultPollInterval,
		MaxRetries:   DefaultMaxRetries,

		ConsumerCount: 1,

		CallbackParallelExecution: false,
		CallbackTimeout:           0,

		MessageIDGenerator: message.GenerateRandomID,

		UnlimitedCapacity: DefaultUnlimitedCapacity,

		RetryQueueCapacity: DefaultRetryQueueCapacity,
	}
)
var (
	ErrQueueClosed    = errors.New("queue is closed")
	ErrQueueFull      = errors.New("queue is full")
	ErrQueueEmpty     = errors.New("queue is empty")
	ErrQueueRecovered = errors.New("queue recovered")
	ErrInvalidData    = errors.New("invalid data")
)

Errors

Functions

func IsQueueTypeRegistered deprecated added in v0.12.0

func IsQueueTypeRegistered(queueType QueueType) bool

Deprecated: IsQueueTypeRegistered is deprecated, use IsQueueTypeSupported instead. Kept for backward compatibility.

func IsQueueTypeSupported added in v0.12.0

func IsQueueTypeSupported(queueType QueueType) bool

IsQueueTypeSupported checks if a queue type is supported (built-in or registered)

func RegisterQueueCreator added in v0.12.0

func RegisterQueueCreator[T any](queueType QueueType, creator QueueCreator[T])

RegisterQueueCreator registers a creator function for a custom queue type. Built-in types (memory, redis) don't need registration.

Note: Due to Go's generics limitations, creators are stored as any and type-asserted at runtime.

func RegisterQueueValidator added in v0.12.0

func RegisterQueueValidator(queueType QueueType, validator QueueConfigValidator)

RegisterQueueValidator registers a config validator for a custom queue type. Built-in types have built-in validation.

func ValidateUnifiedConfig deprecated added in v0.12.0

func ValidateUnifiedConfig(config UnifiedQueueConfig) error

Deprecated: ValidateUnifiedConfig is deprecated, use NewUnifiedFactory which validates internally. Kept for backward compatibility.

func WithCallbackParallelExecution added in v0.9.6

func WithCallbackParallelExecution(enable bool) func(*Config)

func WithCallbackTimeout added in v0.9.6

func WithCallbackTimeout(timeout time.Duration) func(*Config)

func WithConsumerCount added in v0.1.0

func WithConsumerCount(consumerCount int) func(*Config)

func WithMaxHandleFailures added in v0.1.0

func WithMaxHandleFailures(maxHandleFailures int) func(*Config)

func WithMaxRetries added in v0.1.0

func WithMaxRetries(maxRetries int) func(*Config)

func WithMaxSize added in v0.1.0

func WithMaxSize(maxSize int) func(*Config)

func WithMessageIDGenerator added in v0.1.0

func WithMessageIDGenerator(generator message.MessageIDGenerator) func(*Config)

func WithPollInterval added in v0.1.0

func WithPollInterval(pollInterval time.Duration) func(*Config)

func WithRetryQueueCapacity added in v0.12.0

func WithRetryQueueCapacity(capacity int) func(*Config)

func WithUnlimitedCapacity added in v0.12.0

func WithUnlimitedCapacity(capacity int) func(*Config)

Types

type BaseDLQ added in v0.9.1

type BaseDLQ[T any] struct {
	Queue[T]
	// contains filtered or unexported fields
}

func (*BaseDLQ[T]) AssociatedQueue added in v0.9.1

func (q *BaseDLQ[T]) AssociatedQueue() Queue[T]

func (*BaseDLQ[T]) Redrive added in v0.9.1

func (q *BaseDLQ[T]) Redrive(ctx context.Context, items int) error

type BaseQueue added in v0.0.2

type BaseQueue[T any] struct {
	// contains filtered or unexported fields
}

func NewBaseQueue added in v0.0.2

func NewBaseQueue[T any](name string, defaultMsg Message[T], options ...Option) (*BaseQueue[T], error)

func (*BaseQueue[T]) AddInflight added in v0.12.2

func (q *BaseQueue[T]) AddInflight() bool

AddInflight increments the in-flight counter. Returns false if the queue is closing (to prevent WaitGroup race condition). Caller must check the return value and skip processing if false.

func (*BaseQueue[T]) BDequeue added in v0.9.3

func (q *BaseQueue[T]) BDequeue(ctx context.Context) (Message[T], error)

func (*BaseQueue[T]) BEnqueue added in v0.9.3

func (q *BaseQueue[T]) BEnqueue(ctx context.Context, data T) error

func (*BaseQueue[T]) Close added in v0.0.2

func (q *BaseQueue[T]) Close()

func (*BaseQueue[T]) DLQ added in v0.9.1

func (q *BaseQueue[T]) DLQ() (DLQ[T], error)

func (*BaseQueue[T]) Dequeue added in v0.0.2

func (q *BaseQueue[T]) Dequeue(ctx context.Context) (Message[T], error)

func (*BaseQueue[T]) DoneInflight added in v0.12.2

func (q *BaseQueue[T]) DoneInflight()

DoneInflight decrements the in-flight counter

func (*BaseQueue[T]) Enqueue added in v0.0.2

func (q *BaseQueue[T]) Enqueue(ctx context.Context, data T) error

func (*BaseQueue[T]) ExitChannel added in v0.12.0

func (q *BaseQueue[T]) ExitChannel() <-chan struct{}

func (*BaseQueue[T]) GetCallbacks added in v0.12.0

func (q *BaseQueue[T]) GetCallbacks() []Handler[T]

GetCallbacks returns the current callbacks slice. Lock-free read.

func (*BaseQueue[T]) GetConfig added in v0.9.0

func (q *BaseQueue[T]) GetConfig() *Config

func (*BaseQueue[T]) GetDeadletterQueueKey added in v0.9.1

func (q *BaseQueue[T]) GetDeadletterQueueKey() string

func (*BaseQueue[T]) GetDeadletterQueueName added in v0.9.1

func (q *BaseQueue[T]) GetDeadletterQueueName() string

func (*BaseQueue[T]) GetLocker added in v0.3.1

func (q *BaseQueue[T]) GetLocker() locker.SyncLocker

func (*BaseQueue[T]) GetQueueKey added in v0.9.0

func (q *BaseQueue[T]) GetQueueKey() string

func (*BaseQueue[T]) GetRetryQueueKey added in v0.12.0

func (q *BaseQueue[T]) GetRetryQueueKey() string

func (*BaseQueue[T]) GetRetryQueueName added in v0.12.0

func (q *BaseQueue[T]) GetRetryQueueName() string

func (*BaseQueue[T]) GracefulClose added in v0.12.2

func (q *BaseQueue[T]) GracefulClose()

GracefulClose performs a graceful shutdown:
 1. Marks the queue as closing to prevent new enqueues
 2. Waits for all in-flight messages to be processed
 3. Closes the exit channel

func (*BaseQueue[T]) HasCallbacks added in v0.12.0

func (q *BaseQueue[T]) HasCallbacks() bool

HasCallbacks returns true if there are registered callbacks. Lock-free.

func (*BaseQueue[T]) IsClosed added in v0.12.0

func (q *BaseQueue[T]) IsClosed() bool

func (*BaseQueue[T]) IsClosing added in v0.12.2

func (q *BaseQueue[T]) IsClosing() bool

IsClosing returns true if the queue is in the process of closing

func (*BaseQueue[T]) Kind added in v0.0.2

func (q *BaseQueue[T]) Kind() Kind

func (*BaseQueue[T]) MaxHandleFailures added in v0.1.0

func (q *BaseQueue[T]) MaxHandleFailures() int

func (*BaseQueue[T]) MaxSize added in v0.0.2

func (q *BaseQueue[T]) MaxSize() int

func (*BaseQueue[T]) Name added in v0.0.2

func (q *BaseQueue[T]) Name() string

func (*BaseQueue[T]) NewMessage added in v0.1.0

func (q *BaseQueue[T]) NewMessage(data T) (Message[T], error)

func (*BaseQueue[T]) Pack added in v0.1.0

func (q *BaseQueue[T]) Pack(data T) ([]byte, error)

func (*BaseQueue[T]) SetDLQ added in v0.9.1

func (q *BaseQueue[T]) SetDLQ(dlq DLQ[T])

func (*BaseQueue[T]) ShouldSendToDLQ added in v0.12.2

func (q *BaseQueue[T]) ShouldSendToDLQ(ctx context.Context, msg Message[T]) (bool, error)

ShouldSendToDLQ checks if the message should be sent to DLQ based on retry count. If yes, it sends the message to DLQ and records the metric. Returns true if the message was sent to DLQ, false otherwise.

func (*BaseQueue[T]) Subscribe added in v0.0.2

func (q *BaseQueue[T]) Subscribe(ctx context.Context, cb Handler[T])

Subscribe adds a callback handler. Thread-safe using atomic operations.

func (*BaseQueue[T]) TriggerCallbacks added in v0.9.0

func (q *BaseQueue[T]) TriggerCallbacks(ctx context.Context, msg Message[T])

func (*BaseQueue[T]) Unpack added in v0.1.0

func (q *BaseQueue[T]) Unpack(data []byte) (Message[T], error)

func (*BaseQueue[T]) UnpackMessage added in v0.12.2

func (q *BaseQueue[T]) UnpackMessage(packedData []byte) (Message[T], error)

UnpackMessage unpacks data and resets retry count if needed. This eliminates the duplicate unpackMessage logic from MemoryQueue and RedisQueue.

func (*BaseQueue[T]) ValidateQueueClosed added in v0.1.0

func (q *BaseQueue[T]) ValidateQueueClosed() error

type Config added in v0.1.0

type Config struct {
	LockerGenerator locker.SyncLockerGenerator

	MaxSize int

	// Messages will be discarded after this many failures, or
	// pushed to DLQ if DLQ is supported
	MaxHandleFailures int

	// PollInterval is used by RedisQueue for BEnqueue polling when queue is full.
	// MemoryQueue uses channel-based event-driven model and does not rely on polling.
	PollInterval time.Duration

	// Used for internal retrying, not for message retrying
	MaxRetries int

	// Specify how many consumers are consuming the queue using `Subscribe`.
	// Be aware that too many consumers can cause order of messages to be changed.
	// If you want to ensure the order of messages, please use FIFO queue and set ConsumerCount to 1
	ConsumerCount int

	// Enable parallel execution of callbacks. When true, all callbacks for a message
	// will be executed concurrently in separate goroutines, improving throughput for
	// slow handlers. When false (default), callbacks are executed sequentially.
	CallbackParallelExecution bool

	// Timeout for each callback execution. If set to 0 (default), no timeout is applied.
	// When a callback exceeds this timeout, it will be cancelled and an error will be returned.
	CallbackTimeout time.Duration

	MessageIDGenerator message.MessageIDGenerator

	// UnlimitedCapacity is the channel buffer size when MaxSize is UnlimitedSize.
	// Only applies to MemoryQueue. Default is 1000000.
	UnlimitedCapacity int

	// RetryQueueCapacity is the buffer size for retry queue (used by Recover).
	// Messages in retry queue are processed with higher priority than normal queue.
	// Default is 10000.
	RetryQueueCapacity int
}

Config holds the configurable options. Some queue implementations may not support all of them.

type DLQ added in v0.1.0

type DLQ[T any] interface {
	Queue[T]

	// Push `items` of messages to associated Queue's retry queue
	// Messages will be processed with higher priority than normal queue
	Redrive(ctx context.Context, items int) error

	AssociatedQueue() Queue[T]
}

type DLQer added in v0.1.0

type DLQer[T any] interface {
	DLQ() (DLQ[T], error)
}

type Discoverable added in v0.12.0

type Discoverable interface {
	// DiscoverQueues returns a list of all queue names in the backend.
	// The pattern parameter supports glob-style matching (e.g., "*", "orders-*").
	// An empty pattern matches all queues.
	// Returns only main queue names (excludes retry and DLQ suffixes).
	DiscoverQueues(ctx context.Context, pattern string) ([]QueueInfo, error)

	// DiscoverAllQueues returns detailed information about all queues including retry and DLQ.
	// This is useful for queue management services that need to monitor all queue types.
	DiscoverAllQueues(ctx context.Context, pattern string) ([]QueueInfo, error)
}

Discoverable is an optional interface for backends that support queue discovery. This is useful for queue management services to discover existing queues in persistent backends like Redis. Backend factories (e.g., MemoryFactory, RedisQueueFactory) should implement this interface.

type Factory

type Factory[T any] interface {
	// Create a new queue if name does not exist
	// If name already exists, return the existing queue
	GetOrCreate(name string, options ...Option) (Queue[T], error)

	// Same as GetOrCreate but returns SafeQueue
	GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)
}

type Handler

type Handler[T any] func(ctx context.Context, msg Message[T]) error

type JsonMessage added in v0.1.0

type JsonMessage[T any] struct {
	message.JsonMessage[T]
}

func NewJsonMessage added in v0.1.0

func NewJsonMessage[T any](data T) *JsonMessage[T]

func (*JsonMessage[T]) AddRetryCount added in v0.1.0

func (m *JsonMessage[T]) AddRetryCount()

func (*JsonMessage[T]) CreatedAt added in v0.1.0

func (m *JsonMessage[T]) CreatedAt() time.Time

func (*JsonMessage[T]) RefreshRetryCount added in v0.1.0

func (m *JsonMessage[T]) RefreshRetryCount()

func (*JsonMessage[T]) RefreshUpdatedAt added in v0.1.0

func (m *JsonMessage[T]) RefreshUpdatedAt()

func (*JsonMessage[T]) RetryCount added in v0.1.0

func (m *JsonMessage[T]) RetryCount() int

func (*JsonMessage[T]) TotalRetryCount added in v0.1.0

func (m *JsonMessage[T]) TotalRetryCount() int

func (*JsonMessage[T]) UpdatedAt added in v0.1.0

func (m *JsonMessage[T]) UpdatedAt() time.Time

type Kind

type Kind uint8
const (
	KindFIFO Kind = iota + 1
	KindStandard
)

type MemoryFactory

type MemoryFactory[T any] struct {
	// contains filtered or unexported fields
}

func NewMemoryFactory

func NewMemoryFactory[T any](defaultMsg Message[T]) *MemoryFactory[T]

func (*MemoryFactory[T]) GetOrCreate

func (f *MemoryFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)

func (*MemoryFactory[T]) GetOrCreateSafe added in v0.2.0

func (f *MemoryFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)

type MemoryQueue

type MemoryQueue[T any] struct {
	*BaseQueue[T]
	// contains filtered or unexported fields
}

func NewMemoryQueue

func NewMemoryQueue[T any](name string, defaultMsg Message[T], options ...Option) (*MemoryQueue[T], error)

func (*MemoryQueue[T]) BDequeue added in v0.9.3

func (q *MemoryQueue[T]) BDequeue(ctx context.Context) (Message[T], error)

func (*MemoryQueue[T]) BEnqueue added in v0.9.3

func (q *MemoryQueue[T]) BEnqueue(ctx context.Context, data T) error

func (*MemoryQueue[T]) Close

func (q *MemoryQueue[T]) Close()

func (*MemoryQueue[T]) Dequeue

func (q *MemoryQueue[T]) Dequeue(ctx context.Context) (Message[T], error)

func (*MemoryQueue[T]) Enqueue

func (q *MemoryQueue[T]) Enqueue(ctx context.Context, data T) error

func (*MemoryQueue[T]) EnqueueToRetryQueue added in v0.12.0

func (q *MemoryQueue[T]) EnqueueToRetryQueue(ctx context.Context, data T) error

EnqueueToRetryQueue adds a message directly to the retry queue. Used by DLQ.Redrive to ensure recovered messages are processed with priority.

func (*MemoryQueue[T]) Kind

func (q *MemoryQueue[T]) Kind() Kind

func (*MemoryQueue[T]) Name added in v0.0.2

func (q *MemoryQueue[T]) Name() string

func (*MemoryQueue[T]) Purge added in v0.1.0

func (q *MemoryQueue[T]) Purge(ctx context.Context) error

func (*MemoryQueue[T]) Recover

func (q *MemoryQueue[T]) Recover(ctx context.Context, msg Message[T]) error

func (*MemoryQueue[T]) Stats added in v0.12.0

func (q *MemoryQueue[T]) Stats() QueueStats

Stats returns the current queue statistics. Implements the StatsProvider interface.

type MemoryQueueConfig added in v0.12.0

type MemoryQueueConfig struct {
}

MemoryQueueConfig is the backend-specific config for memory queues. Currently empty but reserved for future extensions.

type MemoryQueueFactory added in v0.12.0

type MemoryQueueFactory struct {
	// contains filtered or unexported fields
}

MemoryQueueFactory is a non-generic factory for creating memory-backed queues. Unlike the generic MemoryFactory[T], this factory doesn't require a type parameter at construction time. The type is specified when creating queues. This allows MemoryQueueFactory to implement the Discoverable interface directly.

func NewMemoryQueueFactory added in v0.12.0

func NewMemoryQueueFactory() (*MemoryQueueFactory, error)

NewMemoryQueueFactory creates a new non-generic memory factory. The factory can create queues of any message type using GetOrCreateSafe.

func (*MemoryQueueFactory) DiscoverAllQueues added in v0.12.0

func (f *MemoryQueueFactory) DiscoverAllQueues(ctx context.Context, pattern string) ([]QueueInfo, error)

DiscoverAllQueues returns cached queue information for memory backends. Memory queues don't persist retry/DLQ separately in cache, so this returns the same as DiscoverQueues.

func (*MemoryQueueFactory) DiscoverQueues added in v0.12.0

func (f *MemoryQueueFactory) DiscoverQueues(ctx context.Context, pattern string) ([]QueueInfo, error)

DiscoverQueues returns cached queue information for memory backends. The pattern parameter supports glob-style matching (e.g., "*", "orders-*"). An empty pattern matches all queues.

type Message added in v0.1.0

type Message[T any] interface {
	message.Message[T]

	RetryCount() int
	AddRetryCount()
	TotalRetryCount() int
	RefreshRetryCount()

	CreatedAt() time.Time
	UpdatedAt() time.Time

	RefreshUpdatedAt()
}

type MsgpackMessage added in v0.12.2

type MsgpackMessage[T any] struct {
	message.MsgpackMessage[T]
}

func NewMsgpackMessage added in v0.12.2

func NewMsgpackMessage[T any](data T) *MsgpackMessage[T]

func (*MsgpackMessage[T]) AddRetryCount added in v0.12.2

func (m *MsgpackMessage[T]) AddRetryCount()

func (*MsgpackMessage[T]) CreatedAt added in v0.12.2

func (m *MsgpackMessage[T]) CreatedAt() time.Time

func (*MsgpackMessage[T]) RefreshRetryCount added in v0.12.2

func (m *MsgpackMessage[T]) RefreshRetryCount()

func (*MsgpackMessage[T]) RefreshUpdatedAt added in v0.12.2

func (m *MsgpackMessage[T]) RefreshUpdatedAt()

func (*MsgpackMessage[T]) RetryCount added in v0.12.2

func (m *MsgpackMessage[T]) RetryCount() int

func (*MsgpackMessage[T]) TotalRetryCount added in v0.12.2

func (m *MsgpackMessage[T]) TotalRetryCount() int

func (*MsgpackMessage[T]) UpdatedAt added in v0.12.2

func (m *MsgpackMessage[T]) UpdatedAt() time.Time

type Option added in v0.1.0

type Option func(*Config)

type Purgeable added in v0.1.0

type Purgeable interface {
	// Clean up the queue
	Purge(context.Context) error
}

type Queue

type Queue[T any] interface {
	Kind() Kind

	Name() string

	// Reports max size of queue
	// -1 for unlimited
	MaxSize() int

	// Reports max handle failures
	// Messages will be discarded after this many failures, or
	// pushed to DLQ if DLQ is supported
	MaxHandleFailures() int

	// Push data to the end of the queue.
	// Fails if the queue is full or closed.
	Enqueue(ctx context.Context, data T) error

	// Same as Enqueue but blocks until the queue has room and the message is enqueued
	BEnqueue(ctx context.Context, data T) error

	// The implementation MUST reset the message's retryCount to 0 if retryCount > MaxHandleFailures;
	// in that case, the message comes from DLQ redriving.
	Dequeue(context.Context) (Message[T], error)

	// Same as Dequeue but blocks until a message is available
	BDequeue(context.Context) (Message[T], error)

	// Subscribe to the queue with message confirmation.
	// If the handler returns an error, the message is automatically put back on the queue via the `Recover` mechanism.
	Subscribe(ctx context.Context, h Handler[T])

	Close()
}

Queue is the core queue interface. Implementations must be thread-safe.

func GetOrCreate added in v0.12.0

func GetOrCreate[T any](f *UnifiedFactory, name string, defaultMsg Message[T], options ...Option) (Queue[T], error)

GetOrCreate creates or returns a cached Queue with the given name and message type. This is a convenience wrapper around GetOrCreateSafe.

func MemoryGetOrCreate added in v0.12.0

func MemoryGetOrCreate[T any](f *MemoryQueueFactory, name string, defaultMsg Message[T], options ...Option) (Queue[T], error)

MemoryGetOrCreate creates or returns a cached Queue with the given name and message type. This is a convenience wrapper around MemoryGetOrCreateSafe.

func RedisGetOrCreate added in v0.12.0

func RedisGetOrCreate[T any](f *RedisFactory, name string, defaultMsg Message[T], options ...Option) (Queue[T], error)

RedisGetOrCreate creates or returns a cached Queue with the given name and message type. This is a convenience wrapper around RedisGetOrCreateSafe.

type QueueConfigValidator added in v0.12.0

type QueueConfigValidator interface {
	Validate(rawConfig json.RawMessage) error
}

QueueConfigValidator validates backend-specific configuration

type QueueCreator added in v0.12.0

type QueueCreator[T any] func(ctx context.Context, name string, defaultMsg Message[T], rawConfig json.RawMessage, options ...Option) (Queue[T], error)

QueueCreator is a function that creates a queue from raw JSON config. Used for registering custom queue backend implementations.

type QueueInfo added in v0.12.0

type QueueInfo struct {
	// Name is the queue name (without namespace prefix)
	Name string
	// Key is the full Redis key (with namespace prefix)
	Key string
	// Type indicates the queue type: "main", "retry", or "dlq"
	Type string
	// Depth is the current number of messages in the queue (optional, may be 0 if not fetched)
	Depth int64
}

QueueInfo contains metadata about a discovered queue. This is useful for queue management services to discover and manage queues.

type QueueStats added in v0.12.0

type QueueStats struct {
	// Depth is the current number of messages in the main queue
	Depth int64
	// RetryDepth is the current number of messages in the retry queue
	RetryDepth int64
	// Inflight is the number of messages currently being processed
	Inflight int64
	// ConsumerCount is the number of registered callbacks/consumers
	ConsumerCount int
	// Capacity is the maximum capacity of the queue (-1 for unlimited)
	Capacity int
	// RetryCapacity is the maximum capacity of the retry queue
	RetryCapacity int
}

QueueStats provides queue statistics for monitoring and observability.

type QueueType added in v0.12.0

type QueueType string

QueueType represents the storage backend type

const (
	QueueTypeMemory QueueType = "memory"
	QueueTypeRedis  QueueType = "redis"
)

func GetRegisteredQueueTypes added in v0.12.0

func GetRegisteredQueueTypes() []QueueType

GetRegisteredQueueTypes returns all available queue types (built-in + registered)

type Recoverable added in v0.0.2

type Recoverable[T any] interface {

	// If the queue supports a `visibility window` (like AWS SQS), the message is put back on the queue atomically without calling `Recover`.
	// That is useful when the panic originates outside the queue handler.
	// If the panic comes from inside the handler, prefer calling `Recover` so the message is retried quickly.
	Recover(context.Context, Message[T]) error
}

type RecoverableQueue

type RecoverableQueue[T any] interface {
	Queue[T]

	Recoverable[T]
}

type RedisFactory added in v0.12.0

type RedisFactory struct {
	// contains filtered or unexported fields
}

RedisFactory is a non-generic factory for creating Redis-backed queues. Unlike RedisQueueFactory[T], this factory doesn't require a type parameter at construction time. The type is specified when creating queues. This allows RedisFactory to implement the Discoverable interface directly.

func NewRedisFactory added in v0.12.0

func NewRedisFactory(redisClient redis.Cmdable) (*RedisFactory, error)

NewRedisFactory creates a new non-generic Redis factory. The factory can create queues of any message type using RedisGetOrCreateSafe.

func (*RedisFactory) Client added in v0.12.0

func (f *RedisFactory) Client() redis.Cmdable

Client returns the underlying Redis client.

func (*RedisFactory) DiscoverAllQueues added in v0.12.0

func (f *RedisFactory) DiscoverAllQueues(ctx context.Context, pattern string) ([]QueueInfo, error)

DiscoverAllQueues returns detailed information about all queues including retry and DLQ.

func (*RedisFactory) DiscoverQueues added in v0.12.0

func (f *RedisFactory) DiscoverQueues(ctx context.Context, pattern string) ([]QueueInfo, error)

DiscoverQueues scans Redis for all queue keys and returns queue information. The pattern parameter supports glob-style matching (e.g., "*", "orders-*"). An empty pattern matches all queues. By default, returns only main queues (excludes retry and DLQ queues).

type RedisQueue added in v0.0.2

type RedisQueue[T any] struct {
	*BaseQueue[T]
	// contains filtered or unexported fields
}

func NewRedisQueue added in v0.0.2

func NewRedisQueue[T any](redisClient redis.Cmdable, name string, defaultMsg Message[T], options ...Option) (*RedisQueue[T], error)

func (*RedisQueue[T]) BDequeue added in v0.9.3

func (q *RedisQueue[T]) BDequeue(ctx context.Context) (Message[T], error)

func (*RedisQueue[T]) BEnqueue added in v0.9.3

func (q *RedisQueue[T]) BEnqueue(ctx context.Context, data T) error

func (*RedisQueue[T]) Close added in v0.12.0

func (q *RedisQueue[T]) Close()

func (*RedisQueue[T]) Dequeue added in v0.0.2

func (q *RedisQueue[T]) Dequeue(ctx context.Context) (Message[T], error)

func (*RedisQueue[T]) Enqueue added in v0.0.2

func (q *RedisQueue[T]) Enqueue(ctx context.Context, data T) error

func (*RedisQueue[T]) EnqueueToRetryQueue added in v0.12.0

func (q *RedisQueue[T]) EnqueueToRetryQueue(ctx context.Context, data T) error

EnqueueToRetryQueue adds a message directly to the retry queue. Used by DLQ.Redrive to ensure recovered messages are processed with priority.

func (*RedisQueue[T]) Purge added in v0.9.1

func (q *RedisQueue[T]) Purge(ctx context.Context) error

func (*RedisQueue[T]) Recover added in v0.0.2

func (q *RedisQueue[T]) Recover(ctx context.Context, msg Message[T]) error

func (*RedisQueue[T]) Stats added in v0.12.0

func (q *RedisQueue[T]) Stats() QueueStats

Stats returns the current queue statistics. Implements the StatsProvider interface.

type RedisQueueConfig added in v0.12.0

type RedisQueueConfig struct {
	// Addr is the Redis server address (e.g., "localhost:6379")
	Addr string `json:"addr"`
	// Password is the Redis password (optional)
	Password string `json:"password,omitempty"`
	// DB is the Redis database number (optional, default 0)
	DB int `json:"db,omitempty"`
}

RedisQueueConfig is the backend-specific config for Redis queues

type RedisQueueFactory added in v0.0.2

type RedisQueueFactory[T any] struct {
	// contains filtered or unexported fields
}

func NewRedisQueueFactory added in v0.0.2

func NewRedisQueueFactory[T any](redisClient redis.Cmdable, defaultMsg Message[T]) *RedisQueueFactory[T]

func (*RedisQueueFactory[T]) GetOrCreate added in v0.0.2

func (f *RedisQueueFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)

func (*RedisQueueFactory[T]) GetOrCreateSafe added in v0.2.0

func (f *RedisQueueFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)

type RetryQueueEnqueuer added in v0.12.0

type RetryQueueEnqueuer[T any] interface {
	EnqueueToRetryQueue(ctx context.Context, data T) error
}

RetryQueueEnqueuer is an optional interface for queues that support enqueueing directly to the retry queue for priority processing. Used by DLQ.Redrive to ensure recovered messages are processed first.

type SafeQueue

type SafeQueue[T any] interface {
	Queue[T]

	Recoverable[T]
	IsRecoverable() bool

	Purgeable
	IsPurgeable() bool

	DLQer[T]
	IsDLQSupported() bool
}

SafeQueue provides the ability to put a message back on the queue when a handler panics, and guarantees that all calls are safe: for example, calling Recover returns ErrNotImplemented if it is not implemented.

func GetOrCreateSafe added in v0.12.0

func GetOrCreateSafe[T any](f *UnifiedFactory, name string, defaultMsg Message[T], options ...Option) (SafeQueue[T], error)

GetOrCreateSafe creates or returns a cached SafeQueue with the given name and message type. This is a generic function that allows creating queues of different types from the same factory.

Example:

factory, _ := queue.NewUnifiedFactory(config)
bytesQueue, _ := queue.GetOrCreateSafe[[]byte](factory, "bytes-queue", queue.NewJsonMessage([]byte{}))
myTypeQueue, _ := queue.GetOrCreateSafe[MyType](factory, "mytype-queue", queue.NewJsonMessage(MyType{}))

func MemoryGetOrCreateSafe added in v0.12.0

func MemoryGetOrCreateSafe[T any](f *MemoryQueueFactory, name string, defaultMsg Message[T], options ...Option) (SafeQueue[T], error)

MemoryGetOrCreateSafe creates or returns a cached SafeQueue with the given name and message type. This is a generic function that allows creating queues of different types from the same factory.

Example:

factory, _ := queue.NewMemoryQueueFactory()
bytesQueue, _ := queue.MemoryGetOrCreateSafe[[]byte](factory, "bytes-queue", queue.NewJsonMessage([]byte{}))
myTypeQueue, _ := queue.MemoryGetOrCreateSafe[MyType](factory, "mytype-queue", queue.NewJsonMessage(MyType{}))

func RedisGetOrCreateSafe added in v0.12.0

func RedisGetOrCreateSafe[T any](f *RedisFactory, name string, defaultMsg Message[T], options ...Option) (SafeQueue[T], error)

RedisGetOrCreateSafe creates or returns a cached SafeQueue with the given name and message type. This is a generic function that allows creating queues of different types from the same factory.

Example:

factory, _ := queue.NewRedisFactory(redisClient)
bytesQueue, _ := queue.RedisGetOrCreateSafe[[]byte](factory, "bytes-queue", queue.NewJsonMessage([]byte{}))
myTypeQueue, _ := queue.RedisGetOrCreateSafe[MyType](factory, "mytype-queue", queue.NewJsonMessage(MyType{}))

type SimpleQueue added in v0.1.0

type SimpleQueue[T any] struct {
	// contains filtered or unexported fields
}

func NewSimpleQueue added in v0.2.0

func NewSimpleQueue[T any](queue Queue[T], opts ...SimpleQueueOption) (*SimpleQueue[T], error)

func (*SimpleQueue[T]) BDequeue added in v0.9.3

func (q *SimpleQueue[T]) BDequeue(ctx context.Context) (Message[T], error)

func (*SimpleQueue[T]) BEnqueue added in v0.9.3

func (q *SimpleQueue[T]) BEnqueue(ctx context.Context, data T) error

func (*SimpleQueue[T]) Close added in v0.1.0

func (q *SimpleQueue[T]) Close()

func (*SimpleQueue[T]) DLQ added in v0.1.0

func (q *SimpleQueue[T]) DLQ() (DLQ[T], error)

func (*SimpleQueue[T]) Dequeue added in v0.1.0

func (q *SimpleQueue[T]) Dequeue(ctx context.Context) (Message[T], error)

func (*SimpleQueue[T]) Enqueue added in v0.1.0

func (q *SimpleQueue[T]) Enqueue(ctx context.Context, data T) error

func (*SimpleQueue[T]) IsDLQSupported added in v0.1.0

func (q *SimpleQueue[T]) IsDLQSupported() bool

func (*SimpleQueue[T]) IsPurgeable added in v0.1.0

func (q *SimpleQueue[T]) IsPurgeable() bool

func (*SimpleQueue[T]) IsRecoverable added in v0.1.0

func (q *SimpleQueue[T]) IsRecoverable() bool

func (*SimpleQueue[T]) IsStatsProvider added in v0.12.0

func (q *SimpleQueue[T]) IsStatsProvider() bool

IsStatsProvider returns true if the underlying queue implements StatsProvider.

func (*SimpleQueue[T]) Kind added in v0.1.0

func (q *SimpleQueue[T]) Kind() Kind

func (*SimpleQueue[T]) MaxHandleFailures added in v0.1.0

func (q *SimpleQueue[T]) MaxHandleFailures() int

func (*SimpleQueue[T]) MaxSize added in v0.1.0

func (q *SimpleQueue[T]) MaxSize() int

func (*SimpleQueue[T]) Name added in v0.1.0

func (q *SimpleQueue[T]) Name() string

func (*SimpleQueue[T]) Purge added in v0.1.0

func (q *SimpleQueue[T]) Purge(ctx context.Context) error

func (*SimpleQueue[T]) Recover added in v0.1.0

func (q *SimpleQueue[T]) Recover(ctx context.Context, msg Message[T]) error

func (*SimpleQueue[T]) Stats added in v0.12.0

func (q *SimpleQueue[T]) Stats() QueueStats

Stats returns the current queue statistics if the underlying queue supports it. Returns zero-valued QueueStats if not supported.

func (*SimpleQueue[T]) Subscribe added in v0.1.0

func (q *SimpleQueue[T]) Subscribe(ctx context.Context, cb Handler[T])

func (*SimpleQueue[T]) Unwrap added in v0.1.0

func (q *SimpleQueue[T]) Unwrap() Queue[T]

type SimpleQueueConfig added in v0.9.6

type SimpleQueueConfig struct {
	Logger *zerolog.Logger
}

SimpleQueueConfig holds configuration for SimpleQueue

type SimpleQueueOption added in v0.9.6

type SimpleQueueOption func(*SimpleQueueConfig)

SimpleQueueOption is a function option for configuring SimpleQueue

func WithLogger added in v0.9.6

func WithLogger(logger *zerolog.Logger) SimpleQueueOption

WithLogger sets the logger for SimpleQueue

type StatsProvider added in v0.12.0

type StatsProvider interface {
	// Stats returns the current queue statistics
	Stats() QueueStats
}

StatsProvider is an optional interface for queues that support statistics reporting. Used for monitoring and observability.

type TypedFactory added in v0.12.0

type TypedFactory[T any] struct {
	// contains filtered or unexported fields
}

TypedFactory is a typed wrapper around UnifiedFactory for convenience. Use this when you only need queues of a single message type.

func NewTypedFactory added in v0.12.0

func NewTypedFactory[T any](config UnifiedQueueConfig, defaultMsg Message[T]) (*TypedFactory[T], error)

NewTypedFactory creates a typed factory wrapper. This is a convenience for when you only need queues of one message type.

Example:

factory, _ := queue.NewTypedFactory(config, queue.NewJsonMessage([]byte{}))
q, _ := factory.GetOrCreateSafe("my-queue")

func (*TypedFactory[T]) Config added in v0.12.0

func (f *TypedFactory[T]) Config() UnifiedQueueConfig

Config returns the factory's configuration

func (*TypedFactory[T]) Factory added in v0.12.0

func (f *TypedFactory[T]) Factory() *UnifiedFactory

Factory returns the underlying UnifiedFactory

func (*TypedFactory[T]) GetOrCreate added in v0.12.0

func (f *TypedFactory[T]) GetOrCreate(name string, options ...Option) (Queue[T], error)

GetOrCreate creates or returns a cached queue with the given name

func (*TypedFactory[T]) GetOrCreateSafe added in v0.12.0

func (f *TypedFactory[T]) GetOrCreateSafe(name string, options ...Option) (SafeQueue[T], error)

GetOrCreateSafe creates or returns a cached SafeQueue with the given name

type UnifiedFactory added in v0.12.0

type UnifiedFactory struct {
	// contains filtered or unexported fields
}

UnifiedFactory creates queues based on configuration. It is type-agnostic and can create queues of any message type. Built-in types (memory, redis) are created directly without registration. Custom types use the registry.

func NewUnifiedFactory added in v0.12.0

func NewUnifiedFactory(config UnifiedQueueConfig) (*UnifiedFactory, error)

NewUnifiedFactory creates a new unified queue factory. No registration required for built-in types (memory, redis). The factory can create queues of any message type using GetOrCreateSafe.

func (*UnifiedFactory) CacheSize added in v0.12.1

func (f *UnifiedFactory) CacheSize() int

CacheSize returns the number of queues currently in the cache.

func (*UnifiedFactory) ClearCache added in v0.12.1

func (f *UnifiedFactory) ClearCache()

ClearCache removes all cached queues from the factory. This is primarily intended for testing scenarios where tests need to ensure isolation between test cases. After clearing the cache, subsequent GetOrCreateSafe calls will create new queue instances.

WARNING: This does not close the cached queues. Callers should ensure all queues are properly stopped/closed before clearing the cache.

func (*UnifiedFactory) Config added in v0.12.0

func (f *UnifiedFactory) Config() UnifiedQueueConfig

Config returns a copy of the factory's configuration. The returned config is safe to modify without affecting the factory.

func (*UnifiedFactory) DiscoverAllQueues added in v0.12.0

func (f *UnifiedFactory) DiscoverAllQueues(ctx context.Context, pattern string) ([]QueueInfo, error)

DiscoverAllQueues discovers all queues including retry and DLQ. For Redis backends, it returns all queue keys (main, retry, DLQ). For Memory backends, it returns the same as DiscoverQueues (memory doesn't persist retry/DLQ separately).

func (*UnifiedFactory) DiscoverQueues added in v0.12.0

func (f *UnifiedFactory) DiscoverQueues(ctx context.Context, pattern string) ([]QueueInfo, error)

DiscoverQueues discovers queues from the factory's backend. For Redis backends, it scans Redis keys with the queue namespace prefix. For Memory backends, it returns queues from the factory's cache. The pattern parameter supports glob-style matching (e.g., "*", "orders-*").

func (*UnifiedFactory) RemoveFromCache added in v0.12.1

func (f *UnifiedFactory) RemoveFromCache(name string) bool

RemoveFromCache removes a specific queue from the cache by name. Returns true if the queue was found and removed, false otherwise. This is useful for removing a single queue that has been closed.

WARNING: This does not close the queue. Callers should ensure the queue is properly stopped/closed before removing it from the cache.

type UnifiedQueueConfig added in v0.12.0

type UnifiedQueueConfig struct {
	// Type determines which backend to use: "memory", "redis", etc.
	Type QueueType `json:"type"`

	// Common options that apply to all backends
	MaxSize            int  `json:"max_size,omitempty"`             // -1 for unlimited
	MaxHandleFailures  int  `json:"max_handle_failures,omitempty"`  // Max failures before DLQ
	ConsumerCount      int  `json:"consumer_count,omitempty"`       // Number of concurrent consumers
	CallbackParallel   bool `json:"callback_parallel,omitempty"`    // Enable parallel callback execution
	UnlimitedCapacity  int  `json:"unlimited_capacity,omitempty"`   // Buffer size when unlimited (Memory only)
	RetryQueueCapacity int  `json:"retry_queue_capacity,omitempty"` // Retry queue buffer size

	// BackendConfig contains backend-specific configuration as raw JSON
	// Parsed according to Type:
	// - "memory": MemoryQueueConfig (currently empty, reserved for future)
	// - "redis": RedisQueueConfig
	BackendConfig json.RawMessage `json:"backend_config,omitempty"`

	// RedisClient allows using an existing Redis client instead of creating a new one.
	// When set, BackendConfig is ignored for Redis queues.
	// This field is not serializable to JSON.
	RedisClient redis.Cmdable `json:"-"`
}

UnifiedQueueConfig is the top-level configuration for creating queues
