Documentation ¶
Index ¶
- Constants
- Variables
- type Option
- type Pool
- type PriorityQueue
- type Queue
- type ThroughputLatencyScaler
- func (s *ThroughputLatencyScaler) Decide(currentNumWorkers, queueLen, queueCap int) uint8
- func (s *ThroughputLatencyScaler) EpochDuration() time.Duration
- func (s *ThroughputLatencyScaler) Max() int
- func (s *ThroughputLatencyScaler) Min() int
- func (s *ThroughputLatencyScaler) String() string
- func (s *ThroughputLatencyScaler) Track(duration time.Duration)
Constants ¶
const (
	ShouldStay uint8 = iota
	ShouldScale
	ShouldShrink
)
Variables ¶
var ErrPriority = errors.New("invalid priority")
Functions ¶
This section is empty.
Types ¶
type Option ¶
func WithOnScale ¶
func WithOnShrink ¶
func WithOnStay ¶
func WithPriorityQueue ¶
func WithPriorityQueue[T any](pq *PriorityQueue) Option[T]
type Pool ¶
type Pool[T any] struct {
	// contains filtered or unexported fields
}
Pool is an auto-scaling pool primitive for concurrent message processing. It accepts a function that processes messages and dynamically scales the number of workers based on message-processing time. It also supports priority-based message processing.
func New ¶
func New[T any](
	scaler *ThroughputLatencyScaler,
	receiveFN func(T),
	capacity int,
	opts ...Option[T],
) *Pool[T]
New is the Pool constructor.
func (*Pool[T]) Push ¶
func (p *Pool[T]) Push(msg T)
Push adds a message directly to the pool's FIFO queue. It blocks if the queue is full.
func (*Pool[T]) PushPriority ¶
PushPriority adds a message to the priority queue, which is drained before the FIFO queue. Non-blocking, as the priority queue is backed by a linked list.
func (*Pool[T]) Scaler ¶
func (p *Pool[T]) Scaler() *ThroughputLatencyScaler
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue is a thread-safe queue of queues with a small, fixed number of priorities. Priority is an integer between 1 and the number of priorities (inclusive). Higher priority values are dequeued first.
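The "queue of queues" layout described above can be sketched as one FIFO list per priority level, with Pop scanning from the highest level down. This is a hypothetical illustration of the documented behavior (including the 1..N priority range and an error akin to ErrPriority), not the package's actual internals.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// miniPriorityQueue is a hypothetical sketch: one FIFO list per priority
// level, where levels[i] holds priority i+1 and higher priorities win.
type miniPriorityQueue struct {
	mu     sync.Mutex
	levels []*list.List
}

func newMiniPriorityQueue(priorities int) *miniPriorityQueue {
	q := &miniPriorityQueue{levels: make([]*list.List, priorities)}
	for i := range q.levels {
		q.levels[i] = list.New()
	}
	return q
}

// Push appends a value at the given priority (1..len(levels) inclusive).
func (q *miniPriorityQueue) Push(v any, priority int) error {
	if priority < 1 || priority > len(q.levels) {
		return fmt.Errorf("invalid priority %d", priority)
	}
	q.mu.Lock()
	q.levels[priority-1].PushBack(v)
	q.mu.Unlock()
	return nil
}

// Pop returns the oldest value from the highest non-empty priority level.
func (q *miniPriorityQueue) Pop() (any, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for i := len(q.levels) - 1; i >= 0; i-- {
		if e := q.levels[i].Front(); e != nil {
			q.levels[i].Remove(e)
			return e.Value, true
		}
	}
	return nil, false
}

func main() {
	q := newMiniPriorityQueue(3)
	q.Push("low", 1)
	q.Push("high", 3)
	q.Push("mid", 2)
	v, _ := q.Pop()
	fmt.Println(v) // prints high
}
```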
func NewPriorityQueue ¶
func NewPriorityQueue(priorities int) *PriorityQueue
func (*PriorityQueue) Pop ¶
func (q *PriorityQueue) Pop() (any, bool)
func (*PriorityQueue) WaitForValues ¶
func (q *PriorityQueue) WaitForValues() <-chan struct{}
WaitForValues returns a channel that receives a value when new items are ready to be popped from the priority queue.
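One common way to implement this kind of signal channel is a one-slot buffered channel that producers nudge with a non-blocking send, so repeated pushes coalesce into a single pending signal. The sketch below is an assumption about the pattern, not the package's code, and omits the locking a real thread-safe queue would need.

```go
package main

import "fmt"

// notifier sketches a WaitForValues-style signal: a one-slot channel that
// producers fill (non-blocking) and consumers receive from before popping.
type notifier struct {
	ready chan struct{}
	items []int
}

func newNotifier() *notifier {
	return &notifier{ready: make(chan struct{}, 1)}
}

func (n *notifier) push(v int) {
	n.items = append(n.items, v)
	select {
	case n.ready <- struct{}{}: // wake any waiter
	default: // a signal is already pending; coalesce
	}
}

// waitForValues returns the signal channel, mirroring the documented API shape.
func (n *notifier) waitForValues() <-chan struct{} { return n.ready }

func main() {
	n := newNotifier()
	n.push(42)
	<-n.waitForValues() // returns immediately: a value is ready
	fmt.Println(n.items[0]) // prints 42
}
```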
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is a simple thread-safe FIFO queue based on container/list. The implementation can be replaced with a more efficient one (ring buffer, MPMC, etc.) if needed.
type ThroughputLatencyScaler ¶
type ThroughputLatencyScaler struct {
// contains filtered or unexported fields
}
ThroughputLatencyScaler scales the number of workers based on throughput: the more messages are processed, the more workers are scaled up. But if the latency percentile is too high, the scaler shrinks the number of workers. It uses a combination of:
- EWMA (Exponentially Weighted Moving Average) of throughput
- a latency percentile
- queue length and capacity
func (*ThroughputLatencyScaler) Decide ¶
func (s *ThroughputLatencyScaler) Decide(currentNumWorkers, queueLen, queueCap int) uint8
Decide decides whether to scale up, scale down, or stay the same.
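A decision function in the spirit described above might weigh queue fill level against a latency target and the worker bounds. The heuristic and parameter names below are assumptions for illustration; only the ShouldStay/ShouldScale/ShouldShrink constants come from this package.

```go
package main

import "fmt"

// The decision constants, as declared in this package.
const (
	ShouldStay uint8 = iota
	ShouldScale
	ShouldShrink
)

// decide is a hypothetical heuristic: scale up when the queue is filling
// faster than workers drain it, shrink when latency is over target while
// the queue is nearly empty, otherwise stay. Thresholds are illustrative.
func decide(latencyP95, latencyTarget float64, numWorkers, min, max, queueLen, queueCap int) uint8 {
	fill := float64(queueLen) / float64(queueCap)
	switch {
	case fill > 0.5 && numWorkers < max:
		return ShouldScale
	case latencyP95 > latencyTarget && fill < 0.1 && numWorkers > min:
		return ShouldShrink
	default:
		return ShouldStay
	}
}

func main() {
	// Queue 60% full with headroom for more workers: scale up.
	fmt.Println(decide(10, 50, 2, 1, 8, 60, 100) == ShouldScale) // prints true
}
```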
func (*ThroughputLatencyScaler) EpochDuration ¶
func (s *ThroughputLatencyScaler) EpochDuration() time.Duration
func (*ThroughputLatencyScaler) Max ¶
func (s *ThroughputLatencyScaler) Max() int
func (*ThroughputLatencyScaler) Min ¶
func (s *ThroughputLatencyScaler) Min() int
func (*ThroughputLatencyScaler) String ¶
func (s *ThroughputLatencyScaler) String() string
func (*ThroughputLatencyScaler) Track ¶
func (s *ThroughputLatencyScaler) Track(duration time.Duration)
Track records the processing duration of a single message.