autopool

package
v0.39.0-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ShouldStay uint8 = iota
	ShouldScale
	ShouldShrink
)

Variables

View Source
var ErrPriority = errors.New("invalid priority")

Functions

This section is empty.

Types

type Option

type Option[T any] func(*Pool[T])

func WithLogger

func WithLogger[T any](logger log.Logger) Option[T]

func WithOnScale

func WithOnScale[T any](onScale func()) Option[T]

func WithOnShrink

func WithOnShrink[T any](onShrink func()) Option[T]

func WithOnStay

func WithOnStay[T any](onStay func()) Option[T]

func WithPriorityQueue

func WithPriorityQueue[T any](pq *PriorityQueue) Option[T]

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

Pool primitive auto-scaling pool for concurrent message processing. It accepts a function to process messages and scales the number of workers dynamically based on the message processing time. It also supports priority-based message processing.

func New

func New[T any](
	scaler *ThroughputLatencyScaler,
	receiveFN func(T),
	capacity int,
	opts ...Option[T],
) *Pool[T]

New Pool constructor.

func (*Pool[T]) Cap

func (p *Pool[T]) Cap() int

func (*Pool[T]) Len

func (p *Pool[T]) Len() int

func (*Pool[T]) Push

func (p *Pool[T]) Push(msg T)

Push adds a message directly to the pool FIFO queue. Blocks if the queue is full.

func (*Pool[T]) PushPriority

func (p *Pool[T]) PushPriority(msg T, priority int) error

PushPriority adds a message to the priority queue first. Non-blocking as priority queue is a linked-list

func (*Pool[T]) Scaler

func (p *Pool[T]) Scaler() *ThroughputLatencyScaler

func (*Pool[T]) Start

func (p *Pool[T]) Start()

func (*Pool[T]) Stop

func (p *Pool[T]) Stop()

Stop stops the pool and all workers safe to call multiple times

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue is a thread-safe queue of queues with a limited small number of priorities. Priority is an integer between 1 and the number of priorities (inclusive). Higher priority values are dequeued first.

func NewPriorityQueue

func NewPriorityQueue(priorities int) *PriorityQueue

func (*PriorityQueue) Pop

func (q *PriorityQueue) Pop() (any, bool)

func (*PriorityQueue) Push

func (q *PriorityQueue) Push(value any, priority int) error

func (*PriorityQueue) WaitForValues

func (q *PriorityQueue) WaitForValues() <-chan struct{}

WaitForValues returns a channel that will have a value in it when new values are ready to be popped from the priority queue.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Simple thread-safe FIFO queue based on container/list. We can replace the implementation with a more efficient one if needed (ring buffer, MPMC, etc.)

func NewQueue

func NewQueue() *Queue

New Queue constructor.

func (*Queue) Len

func (q *Queue) Len() int

func (*Queue) Pop

func (q *Queue) Pop() (any, bool)

func (*Queue) Push

func (q *Queue) Push(value any)

type ThroughputLatencyScaler

type ThroughputLatencyScaler struct {
	// contains filtered or unexported fields
}

ThroughputLatencyScaler is a scaler that scales the number of workers based on throughput The more messages are processed, the more workers are scaled up. But if latency percentile is too high, the scaler will shrink the number of workers. It uses a combination of: - EWMA (Exponentially Weighted Moving Average) of throughput - Percentile of latency - Queue length & capacity

func NewThroughputLatencyScaler

func NewThroughputLatencyScaler(
	min, max int,
	thresholdPercentile float64,
	thresholdLatency time.Duration,
	epochDuration time.Duration,
	logger log.Logger,
) *ThroughputLatencyScaler

func (*ThroughputLatencyScaler) Decide

func (s *ThroughputLatencyScaler) Decide(currentNumWorkers, queueLen, queueCap int) uint8

Decide decides whether to scale up, scale down, or stay the same

func (*ThroughputLatencyScaler) EpochDuration

func (s *ThroughputLatencyScaler) EpochDuration() time.Duration

func (*ThroughputLatencyScaler) Max

func (s *ThroughputLatencyScaler) Max() int

func (*ThroughputLatencyScaler) Min

func (s *ThroughputLatencyScaler) Min() int

func (*ThroughputLatencyScaler) String

func (s *ThroughputLatencyScaler) String() string

func (*ThroughputLatencyScaler) Track

func (s *ThroughputLatencyScaler) Track(duration time.Duration)

Track tracks the duration of a message processing

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL