Documentation
¶
Overview ¶
Package ledgerq provides a persistent, disk-backed message queue.
LedgerQ is designed for local message persistence with FIFO guarantees, automatic segment rotation, and efficient replay capabilities.
Example usage:
q, err := ledgerq.Open("/path/to/queue", nil)
if err != nil {
log.Fatal(err)
}
defer q.Close()
// Enqueue a message
offset, err := q.Enqueue([]byte("Hello, World!"))
if err != nil {
log.Fatal(err)
}
// Dequeue a message
msg, err := q.Dequeue()
if err != nil {
log.Fatal(err)
}
fmt.Printf("Message: %s\n", msg.Payload)
Index ¶
- Constants
- func CalculateBackoff(retryCount int, baseDelay, maxBackoff time.Duration) time.Duration
- func NewMetricsCollector(queueName string) *metrics.Collector
- type BatchEnqueueOptions
- type CompactionResult
- type CompressionType
- type EnqueueOptions
- type LogField
- type Logger
- type Message
- type MetricsCollector
- type MetricsSnapshot
- type Options
- type Priority
- type Queue
- func (q *Queue) Ack(msgID uint64) error
- func (q *Queue) Close() error
- func (q *Queue) Compact() (*CompactionResult, error)
- func (q *Queue) Dequeue() (*Message, error)
- func (q *Queue) DequeueBatch(maxMessages int) ([]*Message, error)
- func (q *Queue) Enqueue(payload []byte) (uint64, error)
- func (q *Queue) EnqueueBatch(payloads [][]byte) ([]uint64, error)
- func (q *Queue) EnqueueBatchWithOptions(messages []BatchEnqueueOptions) ([]uint64, error)
- func (q *Queue) EnqueueWithAllOptions(payload []byte, opts EnqueueOptions) (uint64, error)
- func (q *Queue) EnqueueWithCompression(payload []byte, compression CompressionType) (uint64, error)
- func (q *Queue) EnqueueWithDedup(payload []byte, dedupID string, window time.Duration) (uint64, bool, error)
- func (q *Queue) EnqueueWithHeaders(payload []byte, headers map[string]string) (uint64, error)
- func (q *Queue) EnqueueWithOptions(payload []byte, ttl time.Duration, headers map[string]string) (uint64, error)
- func (q *Queue) EnqueueWithPriority(payload []byte, priority Priority) (uint64, error)
- func (q *Queue) EnqueueWithTTL(payload []byte, ttl time.Duration) (uint64, error)
- func (q *Queue) GetDLQ() *Queue
- func (q *Queue) GetRetryInfo(msgID uint64) *RetryInfo
- func (q *Queue) Nack(msgID uint64, reason string) error
- func (q *Queue) RequeueFromDLQ(dlqMsgID uint64) error
- func (q *Queue) SeekToMessageID(msgID uint64) error
- func (q *Queue) SeekToTimestamp(timestamp int64) error
- func (q *Queue) Stats() *Stats
- func (q *Queue) Stream(ctx context.Context, handler StreamHandler) error
- func (q *Queue) Sync() error
- type RetentionPolicy
- type RetryInfo
- type RotationPolicy
- type Stats
- type StreamHandler
Constants ¶
const Version = "1.4.1"
Version is the current version of LedgerQ. This is the single source of truth for the application version.
Variables ¶
This section is empty.
Functions ¶
func CalculateBackoff ¶
CalculateBackoff calculates exponential backoff duration based on retry count (v1.2.0+). This is a helper function for implementing retry logic with the DLQ system. The backoff duration increases exponentially: baseDelay * 2^retryCount, capped at maxBackoff.
Parameters:
- retryCount: Number of previous retry attempts (typically from RetryInfo.RetryCount)
- baseDelay: Base delay for the first retry (e.g., 1 second)
- maxBackoff: Maximum backoff duration to prevent excessively long waits
Example usage:
msg, _ := q.Dequeue()
if info := q.GetRetryInfo(msg.ID); info != nil && info.RetryCount > 0 {
backoff := CalculateBackoff(info.RetryCount, time.Second, 5*time.Minute)
time.Sleep(backoff)
}
Returns the calculated backoff duration, always between baseDelay and maxBackoff.
func NewMetricsCollector ¶
NewMetricsCollector creates a new metrics collector for a queue. The queue name is used to identify metrics from this specific queue.
Types ¶
type BatchEnqueueOptions ¶
type BatchEnqueueOptions struct {
// Payload is the message data
Payload []byte
// Priority is the message priority level
// Default: PriorityLow (0)
Priority Priority
// TTL is the time-to-live duration for the message
// Set to 0 for no expiration
// Default: 0 (no expiration)
TTL time.Duration
// Headers contains key-value metadata for the message
// Default: nil
Headers map[string]string
// Compression is the compression type for the message (v1.3.0+)
// Set to CompressionNone to use queue's DefaultCompression
// Default: CompressionNone (uses queue default)
Compression CompressionType
}
BatchEnqueueOptions contains options for enqueuing a single message in a batch operation (v1.1.0+; the Compression field was added in v1.3.0).
type CompactionResult ¶
type CompactionResult struct {
// SegmentsRemoved is the number of segments removed
SegmentsRemoved int
// BytesFreed is the total bytes freed
BytesFreed int64
}
CompactionResult contains the result of a compaction operation.
type CompressionType ¶
type CompressionType uint8
CompressionType represents the compression algorithm used for message payloads.
const (
	// CompressionNone indicates no compression (default).
	CompressionNone CompressionType = 0
	// CompressionGzip indicates GZIP compression using stdlib compress/gzip.
	CompressionGzip CompressionType = 1
)
type EnqueueOptions ¶
type EnqueueOptions struct {
// Priority is the message priority level (v1.1.0+)
// Default: PriorityLow
Priority Priority
// TTL is the time-to-live duration for the message
// Set to 0 for no expiration
// Default: 0 (no expiration)
TTL time.Duration
// Headers contains key-value metadata for the message
// Default: nil
Headers map[string]string
// Compression is the compression type for the message (v1.3.0+)
// Set to CompressionNone to use queue's DefaultCompression
// Default: CompressionNone (uses queue default)
Compression CompressionType
}
EnqueueOptions contains options for enqueuing a message with all features.
type LogField ¶
type LogField struct {
Key string
Value interface{}
}
LogField represents a structured log field.
type Logger ¶
type Logger interface {
Debug(msg string, fields ...LogField)
Info(msg string, fields ...LogField)
Warn(msg string, fields ...LogField)
Error(msg string, fields ...LogField)
}
Logger interface for pluggable logging.
type Message ¶
type Message struct {
// ID is the unique message identifier
ID uint64
// Offset is the file offset where the message is stored
Offset uint64
// Payload is the message data
Payload []byte
// Timestamp is when the message was enqueued (Unix nanoseconds)
Timestamp int64
// ExpiresAt is when the message expires (Unix nanoseconds), 0 if no TTL
ExpiresAt int64
// Priority is the message priority level (v1.1.0+)
Priority Priority
// Headers contains key-value metadata for the message
Headers map[string]string
}
Message represents a dequeued message.
type MetricsCollector ¶
type MetricsCollector interface {
RecordEnqueue(payloadSize int, duration time.Duration)
RecordDequeue(payloadSize int, duration time.Duration)
RecordEnqueueBatch(count, totalPayloadSize int, duration time.Duration)
RecordDequeueBatch(count, totalPayloadSize int, duration time.Duration)
RecordEnqueueError()
RecordDequeueError()
RecordSeek()
RecordCompaction(segmentsRemoved int, bytesFreed int64, duration time.Duration)
RecordCompactionError()
UpdateQueueState(pending, segments, nextMsgID, readMsgID uint64)
}
MetricsCollector defines the interface for recording queue metrics.
type MetricsSnapshot ¶
MetricsSnapshot is a point-in-time view of queue metrics.
func GetMetricsSnapshot ¶
func GetMetricsSnapshot(collector MetricsCollector) *MetricsSnapshot
GetMetricsSnapshot returns a snapshot of current metrics from a collector.
type Options ¶
type Options struct {
// AutoSync enables automatic syncing after each write
// Default: false
AutoSync bool
// SyncInterval for periodic syncing (if AutoSync is false)
// Default: 1 second
SyncInterval time.Duration
// CompactionInterval for automatic background compaction
// Set to 0 to disable automatic compaction (default)
CompactionInterval time.Duration
// MaxSegmentSize is the maximum size of a segment file in bytes
// Default: 1GB
MaxSegmentSize uint64
// MaxSegmentMessages is the maximum number of messages per segment
// Default: unlimited (0)
MaxSegmentMessages uint64
// RotationPolicy determines when to rotate segments
// Options: RotateBySize, RotateByCount, RotateByBoth
// Default: RotateBySize
RotationPolicy RotationPolicy
// RetentionPolicy configures segment retention and cleanup
// Default: nil (no automatic cleanup)
RetentionPolicy *RetentionPolicy
// EnablePriorities enables priority queue mode (v1.1.0+)
// When disabled, all messages are treated as PriorityLow (FIFO behavior)
// Default: false
EnablePriorities bool
// PriorityStarvationWindow prevents low-priority message starvation (v1.1.0+)
// Low-priority messages waiting longer than this duration will be promoted
// Set to 0 to disable starvation prevention
// Default: 30 seconds
PriorityStarvationWindow time.Duration
// DLQPath is the path to the dead letter queue directory (v1.2.0+)
// If empty, DLQ is disabled. Messages that fail processing after MaxRetries
// will be moved to this separate queue for inspection and potential reprocessing.
// Default: "" (disabled)
DLQPath string
// MaxRetries is the maximum number of delivery attempts before moving to DLQ (v1.2.0+)
// Set to 0 for unlimited retries (messages never move to DLQ).
// Only effective when DLQPath is configured.
// Default: 3
MaxRetries int
// MaxMessageSize is the maximum size in bytes for a single message payload (v1.2.0+)
// Messages larger than this will be rejected during enqueue.
// Set to 0 for unlimited message size (not recommended for production).
// Default: 10 MB
MaxMessageSize int64
// MinFreeDiskSpace is the minimum required free disk space in bytes (v1.2.0+)
// Enqueue operations will fail if available disk space falls below this threshold.
// Set to 0 to disable disk space checking (not recommended for production).
// Default: 100 MB
MinFreeDiskSpace int64
// DLQMaxAge is the maximum age for messages in the DLQ (v1.2.0+)
// Messages older than this duration will be removed during compaction.
// Set to 0 to keep DLQ messages indefinitely.
// Default: 0 (no age-based cleanup)
DLQMaxAge time.Duration
// DLQMaxSize is the maximum total size in bytes for the DLQ (v1.2.0+)
// When DLQ exceeds this size, oldest messages will be removed during compaction.
// Set to 0 for unlimited DLQ size.
// Default: 0 (no size limit)
DLQMaxSize int64
// DefaultCompression is the compression type used when not explicitly specified (v1.3.0+)
// Set to CompressionNone to disable compression by default
// Default: CompressionNone (no compression)
DefaultCompression CompressionType
// CompressionLevel is the compression level for algorithms that support it (v1.3.0+)
// For GZIP: 1 (fastest) to 9 (best compression), 0 = default (6)
// Higher values = better compression but slower
// Default: 0 (use algorithm default)
CompressionLevel int
// MinCompressionSize is the minimum payload size to compress (v1.3.0+)
// Messages smaller than this are not compressed even if compression is requested
// This avoids the CPU overhead when compression doesn't help much
// Default: 1024 bytes (1KB)
MinCompressionSize int
// DefaultDeduplicationWindow is the default time window for dedup tracking (v1.4.0+)
// Set to 0 to disable deduplication by default
// Default: 0 (disabled)
DefaultDeduplicationWindow time.Duration
// MaxDeduplicationEntries is the maximum number of dedup entries to track (v1.4.0+)
// Prevents unbounded memory growth
// Default: 100,000 entries (~6.4 MB)
MaxDeduplicationEntries int
// Logger for structured logging (nil = no logging)
// Default: no logging
Logger Logger
// MetricsCollector for collecting queue metrics (nil = no metrics)
// Default: no metrics
MetricsCollector MetricsCollector
}
Options configures queue behavior.
func DefaultOptions ¶
DefaultOptions returns sensible defaults for queue configuration.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue represents a persistent message queue.
func Open ¶
Open opens or creates a queue at the specified directory. If opts is nil, default options are used.
func (*Queue) Ack ¶
Ack acknowledges successful processing of a message (v1.2.0+). When DLQ is enabled, this removes the message from retry tracking. If DLQ is not configured, this method is a no-op.
This should be called after successfully processing a dequeued message to indicate that the message does not need to be retried.
Example usage:
msg, err := q.Dequeue()
if err != nil {
log.Fatal(err)
}
// Process the message
if err := processMessage(msg); err != nil {
// Processing failed - report failure
q.Nack(msg.ID, err.Error())
} else {
// Processing succeeded - acknowledge
q.Ack(msg.ID)
}
func (*Queue) Compact ¶
func (q *Queue) Compact() (*CompactionResult, error)
Compact manually triggers compaction of old segments based on the retention policy. Returns a CompactionResult reporting the number of segments removed and the bytes freed.
func (*Queue) Dequeue ¶
Dequeue retrieves the next message from the queue. Returns an error if no messages are available. Automatically skips expired messages with TTL. When EnablePriorities is true, returns the highest priority message first.
func (*Queue) DequeueBatch ¶
DequeueBatch retrieves up to maxMessages from the queue in a single operation. Returns fewer messages if the queue has fewer than maxMessages available. Automatically skips expired messages with TTL.
Note: DequeueBatch always returns messages in FIFO order (by message ID), even when EnablePriorities is true. For priority-aware consumption, use Dequeue() in a loop or the Stream() API. This is a performance trade-off: batch dequeue optimizes for sequential I/O rather than priority ordering.
func (*Queue) Enqueue ¶
Enqueue appends a message to the queue. Returns the offset where the message was written.
func (*Queue) EnqueueBatch ¶
EnqueueBatch appends multiple messages to the queue in a single operation. This is more efficient than calling Enqueue() multiple times. Returns the offsets where the messages were written. Note: All messages in the batch have default priority (PriorityLow). Use EnqueueBatchWithOptions for priority support.
func (*Queue) EnqueueBatchWithOptions ¶
func (q *Queue) EnqueueBatchWithOptions(messages []BatchEnqueueOptions) ([]uint64, error)
EnqueueBatchWithOptions appends multiple messages with individual options to the queue (v1.1.0+). This is more efficient than calling EnqueueWithAllOptions() multiple times as it performs a single fsync for all messages. Each message can have its own priority, TTL, headers, and compression setting (see BatchEnqueueOptions). Returns the offsets where the messages were written.
func (*Queue) EnqueueWithAllOptions ¶
func (q *Queue) EnqueueWithAllOptions(payload []byte, opts EnqueueOptions) (uint64, error)
EnqueueWithAllOptions appends a message with priority, TTL, headers, and compression (v1.1.0+; compression support was added in v1.3.0). This is the most flexible enqueue method, combining all available features. Returns the offset where the message was written.
func (*Queue) EnqueueWithCompression ¶
func (q *Queue) EnqueueWithCompression(payload []byte, compression CompressionType) (uint64, error)
EnqueueWithCompression appends a message with explicit compression (v1.3.0+). This allows overriding the queue's DefaultCompression setting for individual messages. Returns the offset where the message was written.
func (*Queue) EnqueueWithDedup ¶
func (q *Queue) EnqueueWithDedup(payload []byte, dedupID string, window time.Duration) (uint64, bool, error)
EnqueueWithDedup appends a message with deduplication tracking (v1.4.0+). If a message with the same dedupID was enqueued within the deduplication window, no duplicate is written; the offset of the original message is returned and isDuplicate is true. Returns (offset, isDuplicate, error).
Parameters:
- dedupID: A unique identifier for this message (e.g., order ID, request ID)
- window: How long to track this message for deduplication (0 = use queue default)
Example:
offset, isDup, err := q.EnqueueWithDedup(payload, "order-12345", 5*time.Minute)
if err != nil {
log.Fatal(err)
}
if isDup {
log.Printf("Duplicate message detected, original at offset %d", offset)
} else {
log.Printf("New message enqueued at offset %d", offset)
}
func (*Queue) EnqueueWithHeaders ¶
EnqueueWithHeaders appends a message to the queue with key-value metadata headers. Headers can be used for routing, tracing, content-type indication, or message classification. Returns the offset where the message was written.
func (*Queue) EnqueueWithOptions ¶
func (q *Queue) EnqueueWithOptions(payload []byte, ttl time.Duration, headers map[string]string) (uint64, error)
EnqueueWithOptions appends a message with both TTL and headers. This allows combining multiple features in a single enqueue operation. Returns the offset where the message was written.
func (*Queue) EnqueueWithPriority ¶
EnqueueWithPriority appends a message with a specific priority level (v1.1.0+). Priority determines the order in which messages are dequeued when EnablePriorities is true. When EnablePriorities is false, priority is ignored and FIFO order is maintained. Returns the offset where the message was written.
func (*Queue) EnqueueWithTTL ¶
EnqueueWithTTL appends a message to the queue with a time-to-live duration. The message will expire after the specified TTL and will be skipped during dequeue. Returns the offset where the message was written.
func (*Queue) GetDLQ ¶
GetDLQ returns the dead letter queue for inspection (v1.2.0+). Returns nil if DLQ is not configured. The returned queue can be used to inspect or dequeue messages from the DLQ.
Example usage:
dlq := q.GetDLQ()
if dlq != nil {
stats := dlq.Stats()
fmt.Printf("DLQ has %d pending messages\n", stats.PendingMessages)
// Inspect DLQ messages
msg, err := dlq.Dequeue()
if err == nil {
fmt.Printf("Failed message: %s\n", msg.Payload)
fmt.Printf("Failure reason: %s\n", msg.Headers["dlq.failure_reason"])
}
}
func (*Queue) GetRetryInfo ¶
GetRetryInfo returns retry information for a message (v1.2.0+). Returns nil if DLQ is not configured or if the message has no retry tracking. This is useful for implementing custom retry logic and backoff strategies.
The returned RetryInfo contains:
- MessageID: The message ID being tracked
- RetryCount: Number of times Nack() has been called for this message
- LastFailure: Timestamp of the most recent Nack() call
- FailureReason: Reason string from the most recent Nack() call
Example usage:
msg, _ := q.Dequeue()
info := q.GetRetryInfo(msg.ID)
if info != nil && info.RetryCount > 0 {
// Calculate exponential backoff based on retry count
backoff := CalculateBackoff(info.RetryCount, time.Second, 5*time.Minute)
log.Printf("Message has failed %d times, waiting %v before retry",
info.RetryCount, backoff)
time.Sleep(backoff)
}
// Process the message
if err := processMessage(msg.Payload); err != nil {
q.Nack(msg.ID, err.Error())
} else {
q.Ack(msg.ID)
}
func (*Queue) Nack ¶
Nack reports a message processing failure (v1.2.0+). When DLQ is enabled, this increments the retry count and potentially moves the message to the dead letter queue if max retries are exceeded. If DLQ is not configured, this method is a no-op.
The reason parameter should describe why the message processing failed. This reason is stored with the retry metadata for debugging purposes.
When a message exceeds the configured MaxRetries, it is automatically moved to the DLQ with metadata headers containing:
- dlq.original_msg_id: Original message ID from the main queue
- dlq.retry_count: Number of failed attempts
- dlq.failure_reason: Last failure reason provided to Nack()
- dlq.last_failure: Timestamp of the last failure
func (*Queue) RequeueFromDLQ ¶
RequeueFromDLQ moves a message from the DLQ back to the main queue (v1.2.0+). The message ID should be from the DLQ (not the original message ID). Returns an error if DLQ is not configured or the message is not found.
The message is enqueued to the main queue with its original payload and headers, but DLQ-specific metadata headers are removed.
Example usage:
dlq := q.GetDLQ()
if dlq != nil {
msg, err := dlq.Dequeue()
if err == nil {
// Requeue the message back to main queue
if err := q.RequeueFromDLQ(msg.ID); err != nil {
log.Printf("Failed to requeue: %v", err)
}
}
}
func (*Queue) SeekToMessageID ¶
SeekToMessageID sets the read position to a specific message ID. Subsequent Dequeue() calls will start reading from this message.
func (*Queue) SeekToTimestamp ¶
SeekToTimestamp sets the read position to the first message at or after the given timestamp. The timestamp should be in Unix nanoseconds.
func (*Queue) Stream ¶
func (q *Queue) Stream(ctx context.Context, handler StreamHandler) error
Stream continuously reads messages from the queue and calls the handler for each message. Streaming continues until the context is cancelled or an error occurs; when the queue is empty, Stream keeps polling for new messages rather than returning.
The Stream method polls for new messages with a configurable interval (100ms by default). When a message is available, it's immediately passed to the handler. If no messages are available, Stream waits briefly before checking again.
Context cancellation will gracefully stop streaming and return context.Canceled. Handler errors will stop streaming and return the handler error.
Example usage:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := q.Stream(ctx, func(msg *Message) error {
fmt.Printf("Received: %s\n", msg.Payload)
return nil
})
type RetentionPolicy ¶
type RetentionPolicy struct {
// MaxAge is the maximum age of segments to keep
MaxAge time.Duration
// MaxSize is the maximum total size of all segments
MaxSize uint64
// MaxSegments is the maximum number of segments to keep
MaxSegments int
// MinSegments is the minimum number of segments to always keep
MinSegments int
}
RetentionPolicy configures segment retention.
type RetryInfo ¶
type RetryInfo struct {
// MessageID is the message ID being tracked
MessageID uint64
// RetryCount is the number of times Nack() has been called for this message
RetryCount int
// LastFailure is the timestamp of the most recent Nack() call
LastFailure time.Time
// FailureReason is the reason string from the most recent Nack() call
FailureReason string
}
RetryInfo contains retry metadata for a message (v1.2.0+).
type RotationPolicy ¶
type RotationPolicy int
RotationPolicy determines when segment rotation occurs.
const (
	// RotateBySize rotates when segment size exceeds MaxSegmentSize
	RotateBySize RotationPolicy = iota
	// RotateByCount rotates when message count exceeds MaxSegmentMessages
	RotateByCount
	// RotateByBoth rotates when either size or count limit is reached
	RotateByBoth
)
type Stats ¶
type Stats struct {
// TotalMessages is the total number of messages ever enqueued
TotalMessages uint64
// PendingMessages is the number of unread messages
PendingMessages uint64
// NextMessageID is the ID that will be assigned to the next enqueued message
NextMessageID uint64
// ReadMessageID is the ID of the next message to be dequeued
ReadMessageID uint64
// SegmentCount is the number of segments
SegmentCount int
// DLQMessages is the total number of messages in the DLQ
DLQMessages uint64
// DLQPendingMessages is the number of unprocessed messages in the DLQ
DLQPendingMessages uint64
// RetryTrackedMessages is the number of messages currently being tracked for retries
RetryTrackedMessages int
// DedupTrackedEntries is the number of active dedup entries (v1.4.0+)
DedupTrackedEntries int
}
Stats contains queue statistics.
type StreamHandler ¶
StreamHandler is called for each message in the stream. Return an error to stop streaming.