ratelimit

package
v0.34.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2025 License: Unlicense Imports: 11 Imported by: 0

Documentation

Overview

Package ratelimit provides adaptive rate limiting using PID control. The PID controller uses proportional, integral, and derivative terms with a low-pass filter on the derivative to suppress high-frequency noise.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MonitorFromBadgerDB

func MonitorFromBadgerDB(db *badger.DB) loadmonitor.Monitor

MonitorFromBadgerDB creates a BadgerMonitor from a Badger database. Exported for use when you need to create the monitor separately.

func MonitorFromNeo4jDriver

func MonitorFromNeo4jDriver(
	driver neo4j.DriverWithContext,
	querySem chan struct{},
	maxConcurrency int,
) loadmonitor.Monitor

MonitorFromNeo4jDriver creates a Neo4jMonitor from a Neo4j driver. Exported for use when you need to create the monitor separately.

Types

type BadgerMonitor

type BadgerMonitor struct {
	// contains filtered or unexported fields
}

BadgerMonitor implements loadmonitor.Monitor for the Badger database. It collects metrics from Badger's LSM tree, caches, and Go runtime.

func NewBadgerMonitor

func NewBadgerMonitor(db *badger.DB, updateInterval time.Duration) *BadgerMonitor

NewBadgerMonitor creates a new Badger load monitor. The updateInterval controls how often metrics are collected (default 100ms).

func (*BadgerMonitor) GetL0Stats

func (m *BadgerMonitor) GetL0Stats() (tables int, score float64)

GetL0Stats returns L0-specific statistics for debugging.

func (*BadgerMonitor) GetMetrics

func (m *BadgerMonitor) GetMetrics() loadmonitor.Metrics

GetMetrics returns the current load metrics.

func (*BadgerMonitor) RecordQueryLatency

func (m *BadgerMonitor) RecordQueryLatency(latency time.Duration)

RecordQueryLatency records a query latency sample using exponential moving average.

func (*BadgerMonitor) RecordWriteLatency

func (m *BadgerMonitor) RecordWriteLatency(latency time.Duration)

RecordWriteLatency records a write latency sample using exponential moving average.

func (*BadgerMonitor) SetMemoryTarget

func (m *BadgerMonitor) SetMemoryTarget(bytes uint64)

SetMemoryTarget sets the target memory limit in bytes.

func (*BadgerMonitor) Start

func (m *BadgerMonitor) Start() <-chan struct{}

Start begins background metric collection.

func (*BadgerMonitor) Stop

func (m *BadgerMonitor) Stop()

Stop halts background metric collection.

type Config

type Config struct {
	// Enabled controls whether rate limiting is active.
	Enabled bool

	// TargetMemoryMB is the target memory limit in megabytes.
	// Memory pressure is calculated relative to this target.
	TargetMemoryMB int

	// WriteSetpoint is the target process variable for writes (0.0-1.0).
	// Default: 0.85 (throttle when load exceeds 85%)
	WriteSetpoint float64

	// ReadSetpoint is the target process variable for reads (0.0-1.0).
	// Default: 0.90 (more tolerant for reads)
	ReadSetpoint float64

	// PID gains for writes
	WriteKp float64
	WriteKi float64
	WriteKd float64

	// PID gains for reads
	ReadKp float64
	ReadKi float64
	ReadKd float64

	// MaxWriteDelayMs is the maximum delay for write operations in milliseconds.
	MaxWriteDelayMs int

	// MaxReadDelayMs is the maximum delay for read operations in milliseconds.
	MaxReadDelayMs int

	// MetricUpdateInterval is how often to poll the load monitor.
	MetricUpdateInterval time.Duration

	// MemoryWeight is the weight given to memory pressure in process variable (0.0-1.0).
	// The remaining weight is given to the load metric.
	// Default: 0.7 (70% memory, 30% load)
	MemoryWeight float64
}

Config holds configuration for the adaptive rate limiter.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a default configuration for the rate limiter.

func NewConfigFromValues

func NewConfigFromValues(
	enabled bool,
	targetMB int,
	writeKp, writeKi, writeKd float64,
	readKp, readKi, readKd float64,
	maxWriteMs, maxReadMs int,
	writeTarget, readTarget float64,
) Config

NewConfigFromValues creates a Config from individual configuration values. This is useful when loading configuration from environment variables.

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements adaptive rate limiting using PID control. It monitors database load metrics and computes appropriate delays to keep the system within its target operating range.

func NewBadgerLimiter

func NewBadgerLimiter(config Config, db *badger.DB) *Limiter

NewBadgerLimiter creates a rate limiter configured for a Badger database. It automatically creates a BadgerMonitor for the provided database.

func NewDisabledLimiter

func NewDisabledLimiter() *Limiter

NewDisabledLimiter creates a rate limiter that is disabled. This is useful when rate limiting is not configured.

func NewLimiter

func NewLimiter(config Config, monitor loadmonitor.Monitor) *Limiter

NewLimiter creates a new adaptive rate limiter. If monitor is nil, the limiter will be disabled.

func NewNeo4jLimiter

func NewNeo4jLimiter(
	config Config,
	driver neo4j.DriverWithContext,
	querySem chan struct{},
	maxConcurrency int,
) *Limiter

NewNeo4jLimiter creates a rate limiter configured for a Neo4j database. It automatically creates a Neo4jMonitor for the provided driver. querySem should be the semaphore used to limit concurrent queries. maxConcurrency is typically 10 (matching the semaphore size).

func (*Limiter) ComputeDelay

func (l *Limiter) ComputeDelay(opType OperationType) time.Duration

ComputeDelay calculates the recommended delay for an operation. This can be used to check the delay without actually waiting.

func (*Limiter) GetStats

func (l *Limiter) GetStats() Stats

GetStats returns current rate limiter statistics.

func (*Limiter) IsEnabled

func (l *Limiter) IsEnabled() bool

IsEnabled returns whether rate limiting is active.

func (*Limiter) RecordLatency

func (l *Limiter) RecordLatency(opType OperationType, latency time.Duration)

RecordLatency records an operation latency for the monitor.

func (*Limiter) Reset

func (l *Limiter) Reset()

Reset clears all PID controller state and statistics.

func (*Limiter) Start

func (l *Limiter) Start()

Start begins the rate limiter's background metric collection.

func (*Limiter) Stop

func (l *Limiter) Stop()

Stop halts the rate limiter.

func (*Limiter) Stopped

func (l *Limiter) Stopped() <-chan struct{}

Stopped returns a channel that closes when the limiter has stopped.

func (*Limiter) UpdateConfig

func (l *Limiter) UpdateConfig(config Config)

UpdateConfig updates the rate limiter configuration. This is useful for dynamic tuning.

func (*Limiter) Wait

func (l *Limiter) Wait(ctx context.Context, opType OperationType) time.Duration

Wait blocks until the rate limiter permits the operation to proceed. It returns the delay that was applied, or 0 if no delay was needed. If the context is cancelled, it returns immediately.

type Neo4jMonitor

type Neo4jMonitor struct {
	// contains filtered or unexported fields
}

Neo4jMonitor implements loadmonitor.Monitor for Neo4j database. Since Neo4j driver doesn't expose detailed metrics, we track: - Memory pressure via Go runtime - Query concurrency via the semaphore - Latency via recording

func NewNeo4jMonitor

func NewNeo4jMonitor(
	driver neo4j.DriverWithContext,
	querySem chan struct{},
	maxConcurrency int,
	updateInterval time.Duration,
) *Neo4jMonitor

NewNeo4jMonitor creates a new Neo4j load monitor. The querySem should be the same semaphore used for limiting concurrent queries. maxConcurrency is the maximum concurrent query limit (typically 10).

func (*Neo4jMonitor) CheckConnectivity

func (m *Neo4jMonitor) CheckConnectivity(ctx context.Context) error

CheckConnectivity performs a connectivity check to Neo4j. This can be used to verify the database is responsive.

func (*Neo4jMonitor) GetConcurrencyStats

func (m *Neo4jMonitor) GetConcurrencyStats() (reads, writes int32, semUsed int)

GetConcurrencyStats returns current concurrency statistics for debugging.

func (*Neo4jMonitor) GetMetrics

func (m *Neo4jMonitor) GetMetrics() loadmonitor.Metrics

GetMetrics returns the current load metrics.

func (*Neo4jMonitor) IncrementActiveReads

func (m *Neo4jMonitor) IncrementActiveReads() func()

IncrementActiveReads tracks an active read operation. Call this when starting a read, and call the returned function when done.

func (*Neo4jMonitor) IncrementActiveWrites

func (m *Neo4jMonitor) IncrementActiveWrites() func()

IncrementActiveWrites tracks an active write operation. Call this when starting a write, and call the returned function when done.

func (*Neo4jMonitor) RecordQueryLatency

func (m *Neo4jMonitor) RecordQueryLatency(latency time.Duration)

RecordQueryLatency records a query latency sample using exponential moving average.

func (*Neo4jMonitor) RecordWriteLatency

func (m *Neo4jMonitor) RecordWriteLatency(latency time.Duration)

RecordWriteLatency records a write latency sample using exponential moving average.

func (*Neo4jMonitor) SetMemoryTarget

func (m *Neo4jMonitor) SetMemoryTarget(bytes uint64)

SetMemoryTarget sets the target memory limit in bytes.

func (*Neo4jMonitor) Start

func (m *Neo4jMonitor) Start() <-chan struct{}

Start begins background metric collection.

func (*Neo4jMonitor) Stop

func (m *Neo4jMonitor) Stop()

Stop halts background metric collection.

type OperationType

type OperationType int

OperationType distinguishes between read and write operations for applying different rate limiting strategies.

const (
	// Read operations (REQ queries)
	Read OperationType = iota
	// Write operations (EVENT saves, imports)
	Write
)

func (OperationType) String

func (o OperationType) String() string

String returns a human-readable name for the operation type.

type PIDController

type PIDController struct {
	// Gains
	Kp float64 // Proportional gain
	Ki float64 // Integral gain
	Kd float64 // Derivative gain

	// Setpoint is the target process variable value (e.g., 0.85 for 85% of target memory).
	// The controller drives the process variable toward this setpoint.
	Setpoint float64

	// DerivativeFilterAlpha is the low-pass filter coefficient for the derivative term.
	// Range: 0.0-1.0, where lower values provide stronger filtering.
	// Recommended: 0.2 for strong filtering, 0.5 for moderate filtering.
	DerivativeFilterAlpha float64

	// Integral limits for anti-windup
	IntegralMax float64
	IntegralMin float64

	// Output limits
	OutputMin float64 // Minimum output (typically 0 = no delay)
	OutputMax float64 // Maximum output (max delay in seconds)
	// contains filtered or unexported fields
}

PIDController implements a PID controller with filtered derivative. It is designed for rate limiting database operations based on load metrics.

The controller computes a delay recommendation based on:

  • Proportional (P): Immediate response to current error
  • Integral (I): Accumulated error to eliminate steady-state offset
  • Derivative (D): Rate of change prediction (filtered to reduce noise)

The filtered derivative uses a low-pass filter to attenuate high-frequency noise that would otherwise cause erratic control behavior.

func DefaultPIDControllerForReads

func DefaultPIDControllerForReads() *PIDController

DefaultPIDControllerForReads creates a PID controller tuned for read operations. Reads should be more responsive but with less aggressive throttling.

func DefaultPIDControllerForWrites

func DefaultPIDControllerForWrites() *PIDController

DefaultPIDControllerForWrites creates a PID controller tuned for write operations. Writes benefit from aggressive integral and moderate proportional response.

func NewPIDController

func NewPIDController(
	kp, ki, kd float64,
	setpoint float64,
	derivativeFilterAlpha float64,
	integralMin, integralMax float64,
	outputMin, outputMax float64,
) *PIDController

NewPIDController creates a new PID controller with custom parameters.

func (*PIDController) GetState

func (p *PIDController) GetState() (integral, prevError, prevFilteredError float64)

GetState returns the current internal state for monitoring/debugging.

func (*PIDController) Reset

func (p *PIDController) Reset()

Reset clears the controller state, useful when conditions change significantly.

func (*PIDController) SetGains

func (p *PIDController) SetGains(kp, ki, kd float64)

SetGains updates the PID gains.

func (*PIDController) SetSetpoint

func (p *PIDController) SetSetpoint(setpoint float64)

SetSetpoint updates the target setpoint.

func (*PIDController) Update

func (p *PIDController) Update(processVariable float64) float64

Update computes the PID output based on the current process variable. The process variable should be in the range [0.0, 1.0+] representing load level.

Returns the recommended delay in seconds. A value of 0 means no delay needed.

type PIDState

type PIDState struct {
	Integral          float64
	PrevError         float64
	PrevFilteredError float64
}

PIDState contains the internal state of a PID controller.

type Stats

type Stats struct {
	WriteThrottles    int64
	ReadThrottles     int64
	TotalWriteDelayMs int64
	TotalReadDelayMs  int64
	CurrentMetrics    loadmonitor.Metrics
	WritePIDState     PIDState
	ReadPIDState      PIDState
}

Stats returns rate limiter statistics.

Source Files

  • badger_monitor.go
  • factory.go
  • limiter.go
  • neo4j_monitor.go
  • pid.go

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL