memory

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package memory provides an in-memory priority queue with worker pools, circuit breaker, and backpressure handling.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBackpressureManager

func NewBackpressureManager(maxQueueSize int, maxMemoryMB int64) *backpressureManager

NewBackpressureManager creates a new public instance (backward compatible export)

func NewCircuitBreaker

func NewCircuitBreaker(threshold float64, minSamples int, recoveryTime time.Duration) *circuitBreaker

NewCircuitBreaker creates a new public circuit breaker instance (backward compatible export)

func NewPriorityRouter

func NewPriorityRouter(config QueueConfig) *priorityRouter

NewPriorityRouter creates a new public priority router with given config (backward compatible export)

Types

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

MemoryQueue implements port.QueuePort with in-memory storage Uses three priority-based channels for job routing

func NewMemoryQueue

func NewMemoryQueue() *MemoryQueue

NewMemoryQueue creates a new in-memory queue with default configuration

func NewMemoryQueueWithConfig

func NewMemoryQueueWithConfig(config shared.QueueConfig) *MemoryQueue

NewMemoryQueueWithConfig creates a new in-memory queue with custom configuration

func (*MemoryQueue) Dequeue

func (q *MemoryQueue) Dequeue(ctx context.Context, priority entity.Priority) (*entity.Job, error)

Dequeue retrieves and removes the next job from specified priority queue Returns port.ErrQueueEmpty if no jobs are available within timeout

func (*MemoryQueue) Enqueue

func (q *MemoryQueue) Enqueue(ctx context.Context, job *entity.Job) error

Enqueue adds a job to the appropriate priority queue Returns error if context is cancelled or backpressure is engaged

func (*MemoryQueue) GetDepth

func (q *MemoryQueue) GetDepth(priority entity.Priority) int

GetDepth returns the current queue length for a specific priority

func (*MemoryQueue) GetStats

func (q *MemoryQueue) GetStats() *port.QueueStats

GetStats returns a copy of current queue statistics

func (*MemoryQueue) GetWorkerCount

func (q *MemoryQueue) GetWorkerCount() int

GetWorkerCount returns the current number of active workers

func (*MemoryQueue) IsCircuitOpen

func (q *MemoryQueue) IsCircuitOpen() bool

IsCircuitOpen checks if circuit breaker is open

func (*MemoryQueue) Start

func (q *MemoryQueue) Start(ctx context.Context) error

Start begins processing jobs with worker pool Initializes workers and starts monitoring/scaling routines

func (*MemoryQueue) Stop

func (q *MemoryQueue) Stop(ctx context.Context) error

Stop gracefully shuts down the queue and all workers Waits for in-flight jobs to complete within timeout

type QueueConfig

type QueueConfig = shared.QueueConfig

QueueConfig is a type alias for backward compatibility

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL