Versions in this module Expand all Collapse all v1 v1.4.2 Feb 15, 2026 Changes in this version + const MetadataFileName + const MetadataSize + const MetadataVersion + func CalculateBackoff(retryCount int, baseDelay, maxBackoff time.Duration) time.Duration + type BatchEnqueueOptions struct + Compression format.CompressionType + Headers map[string]string + Payload []byte + Priority uint8 + TTL time.Duration + type DedupTracker struct + func NewDedupTracker(statePath string, maxSize int) *DedupTracker + func (dt *DedupTracker) ActiveCount() int + func (dt *DedupTracker) Check(dedupID string, window time.Duration) (uint64, bool) + func (dt *DedupTracker) CleanExpired() int + func (dt *DedupTracker) Close() error + func (dt *DedupTracker) Count() int + func (dt *DedupTracker) EnsureStateDir() error + func (dt *DedupTracker) Load() error + func (dt *DedupTracker) Persist() error + func (dt *DedupTracker) Track(dedupID string, msgID uint64, offset uint64, window time.Duration) error + type Message struct + ExpiresAt int64 + Headers map[string]string + ID uint64 + Offset uint64 + Payload []byte + Priority uint8 + Timestamp int64 + type Metadata struct + NextMsgID uint64 + ReadMsgID uint64 + Version uint32 + func OpenMetadata(dir string, autoSync bool) (*Metadata, error) + func (m *Metadata) Close() error + func (m *Metadata) GetState() (nextMsgID, readMsgID uint64) + func (m *Metadata) SetNextMsgID(id uint64) error + func (m *Metadata) SetReadMsgID(id uint64) error + func (m *Metadata) Sync() error + func (m *Metadata) UpdateState(nextMsgID, readMsgID uint64) error + type MetricsCollector interface + RecordCompaction func(segmentsRemoved int, bytesFreed int64, duration time.Duration) + RecordCompactionError func() + RecordDequeue func(payloadSize int, duration time.Duration) + RecordDequeueBatch func(count, totalPayloadSize int, duration time.Duration) + RecordDequeueError func() + RecordEnqueue func(payloadSize int, duration time.Duration) + RecordEnqueueBatch func(count, totalPayloadSize int, duration time.Duration) + RecordEnqueueError func() + RecordSeek func() + UpdateQueueState func(pending, segments, nextMsgID, readMsgID uint64) + type Options struct + AutoSync bool + CompactionInterval time.Duration + CompressionLevel int + DLQMaxAge time.Duration + DLQMaxSize int64 + DLQPath string + DefaultCompression format.CompressionType + DefaultDeduplicationWindow time.Duration + EnablePriorities bool + Logger logging.Logger + MaxDeduplicationEntries int + MaxMessageSize int64 + MaxRetries int + MetricsCollector MetricsCollector + MinCompressionSize int + MinFreeDiskSpace int64 + PriorityStarvationWindow time.Duration + SegmentOptions *segment.ManagerOptions + SyncInterval time.Duration + func DefaultOptions(dir string) *Options + func (o *Options) Validate() error + type Queue struct + func Open(dir string, opts *Options) (*Queue, error) + func (q *Queue) Ack(msgID uint64) error + func (q *Queue) Close() error + func (q *Queue) Compact() (*segment.CompactionResult, error) + func (q *Queue) Dequeue() (*Message, error) + func (q *Queue) DequeueBatch(maxMessages int) ([]*Message, error) + func (q *Queue) Enqueue(payload []byte) (uint64, error) + func (q *Queue) EnqueueBatch(payloads [][]byte) ([]uint64, error) + func (q *Queue) EnqueueBatchWithOptions(messages []BatchEnqueueOptions) ([]uint64, error) + func (q *Queue) EnqueueWithAllOptions(payload []byte, priority uint8, ttl time.Duration, headers map[string]string) (uint64, error) + func (q *Queue) EnqueueWithCompression(payload []byte, compression format.CompressionType) (uint64, error) + func (q *Queue) EnqueueWithDedup(payload []byte, dedupID string, window time.Duration) (uint64, bool, error) + func (q *Queue) EnqueueWithHeaders(payload []byte, headers map[string]string) (uint64, error) + func (q *Queue) EnqueueWithOptions(payload []byte, ttl time.Duration, headers map[string]string) (uint64, error) + func (q *Queue) EnqueueWithPriority(payload []byte, priority uint8) (uint64, error) + func (q *Queue) EnqueueWithTTL(payload []byte, ttl time.Duration) (uint64, error) + func (q *Queue) GetDLQ() *Queue + func (q *Queue) GetRetryInfo(msgID uint64) *RetryInfo + func (q *Queue) IsClosed() bool + func (q *Queue) Nack(msgID uint64, reason string) error + func (q *Queue) RequeueFromDLQ(dlqMsgID uint64) error + func (q *Queue) SeekToMessageID(msgID uint64) error + func (q *Queue) SeekToTimestamp(timestamp int64) error + func (q *Queue) Stats() *Stats + func (q *Queue) Stream(ctx context.Context, handler StreamHandler) error + func (q *Queue) Sync() error + type RetryInfo struct + FailureReason string + LastFailure time.Time + MessageID uint64 + RetryCount int + type RetryTracker struct + func NewRetryTracker(path string, maxRetries int) (*RetryTracker, error) + func (rt *RetryTracker) Ack(msgID uint64) error + func (rt *RetryTracker) Clear() error + func (rt *RetryTracker) Close() error + func (rt *RetryTracker) Count() int + func (rt *RetryTracker) GetInfo(msgID uint64) *RetryInfo + func (rt *RetryTracker) Nack(msgID uint64, reason string) (bool, error) + type Stats struct + DLQMessages uint64 + DLQPendingMessages uint64 + DedupTrackedEntries int + NextMessageID uint64 + PendingMessages uint64 + ReadMessageID uint64 + RetryTrackedMessages int + SegmentCount int + TotalMessages uint64 + type StreamHandler func(*Message) error