Versions in this module Expand all Collapse all v1 v1.0.0 Sep 8, 2025 Changes in this version + type BatchProcessor struct + func NewBatchProcessor(batchSize int, batchTimeout time.Duration) *BatchProcessor + func (bp *BatchProcessor) Stop() + func (bp *BatchProcessor) Submit(msg *Message) error + type CircuitBreaker struct + func NewCircuitBreaker(failureThreshold int, resetTimeout time.Duration) *CircuitBreaker + func (cb *CircuitBreaker) Execute(operation func() error) error + func (cb *CircuitBreaker) GetState() CircuitState + func (cb *CircuitBreaker) GetStats() map[string]interface{} + type CircuitState string + const CircuitClosed + const CircuitHalf + const CircuitOpen + type ConnectionPool struct + func NewConnectionPool(maxConnections int, dbPath string) (*ConnectionPool, error) + func (cp *ConnectionPool) Close() + func (cp *ConnectionPool) GetConnection() (*sql.DB, error) + func (cp *ConnectionPool) GetStats() map[string]interface{} + func (cp *ConnectionPool) ReturnConnection(conn *sql.DB) + type Consumer struct + Group string + LastDelivered string + Name string + PendingEntries map[string]*PendingEntry + type HealthMonitor struct + func NewHealthMonitor(checkInterval time.Duration) *HealthMonitor + func (hm *HealthMonitor) AddError(err string) + func (hm *HealthMonitor) AddWarning(warning string) + func (hm *HealthMonitor) GetHealth() *HealthStatus + func (hm *HealthMonitor) Stop() + type HealthStatus struct + Errors []string + Message string + Metrics map[string]int64 + Status string + Timestamp time.Time + Warnings []string + type MemoryManager struct + func NewMemoryManager(maxMemoryMB int, threshold float64) *MemoryManager + func (mm *MemoryManager) AllocateMemory(mb int) error + func (mm *MemoryManager) FreeMemory(mb int) + func (mm *MemoryManager) GetStats() map[string]interface{} + func (mm *MemoryManager) Stop() + type Message struct + Attempts int + Consumer string + Data interface{} + ExpiresAt *time.Time + ID string + MaxAttempts int + Priority int + RetryAt *time.Time + Status MessageStatus + Timestamp time.Time + Topic string + type MessageHeap []*Message + func (h *MessageHeap) Pop() interface{} + func (h *MessageHeap) Push(x interface{}) + func (h *MessageHeap) RemoveByID(id string) bool + func (h *MessageHeap) UpdatePriority(id string, newPriority int) bool + func (h MessageHeap) FindByID(id string) *Message + func (h MessageHeap) Len() int + func (h MessageHeap) Less(i, j int) bool + func (h MessageHeap) Peek() *Message + func (h MessageHeap) Swap(i, j int) + type MessageStatus string + const StatusCompleted + const StatusFailed + const StatusPending + const StatusProcessing + const StatusRetry + const StatusScheduled + type PendingEntry struct + Consumer string + DeliveryCount int + DeliveryTime time.Time + MessageID string + type Queue struct + Name string + func NewQueue(config *QueueConfig) (*Queue, error) + func (q *Queue) Acknowledge(messageID string) error + func (q *Queue) CleanupExpiredMessages() error + func (q *Queue) Consume(topic string, consumerName string) (*Message, error) + func (q *Queue) Flush() error + func (q *Queue) GetHealth() *HealthStatus + func (q *Queue) GetProductionStats() map[string]interface{} + func (q *Queue) GetStats() *QueueStats + func (q *Queue) GracefulShutdown(timeout time.Duration) error + func (q *Queue) Publish(topic string, data interface{}, priority int) error + func (q *Queue) Reject(messageID string, reason string) error + func (q *Queue) ScaleDown(reduceWorkers int) error + func (q *Queue) ScaleUp(additionalWorkers int) error + func (q *Queue) Stop() + type QueueConfig struct + AsyncOperations bool + BatchSize int + BufferSize int + CircuitBreaker bool + DLQCleanupInterval time.Duration + DLQEnableAutoPrune bool + DLQMaxAge time.Duration + DLQMaxSize int + DataDir string + DurabilityMode string + EnableAutoVacuum bool + EnableCompression bool + EnableFsync bool + EnableMetrics bool + FlushInterval time.Duration + FlushOnPublish bool + HealthCheck bool + MaxConnections int + MaxMemoryMB int + MaxRetries int + MessageTTL time.Duration + Name string + RetryBaseDelay time.Duration + RetryDelay time.Duration + RetryJitter bool + RetryMaxDelay time.Duration + RetryMultiplier float64 + VacuumInterval time.Duration + WorkerPoolSize int + WriteBufferSize int + func DefaultQueueConfig() *QueueConfig + type QueueStats struct + BufferSize int + CompletedCount int + ConsumerCount int + DLQCount int + DLQLastCleanup time.Time + DLQPrunedCount int + DLQTotalCount int + FailedCount int + LastFlush time.Time + LastMessage time.Time + Name string + PendingCount int + ProcessingCount int + RetryCount int + TotalMessages int + type WorkerPool struct + func NewWorkerPool(workerCount int) *WorkerPool + func (wp *WorkerPool) GetStats() map[string]interface{} + func (wp *WorkerPool) Stop() + func (wp *WorkerPool) Submit(msg *Message) error