Documentation ¶
Overview ¶
Package ratelimit provides adaptive rate limiting using PID control. The PID controller uses proportional, integral, and derivative terms with a low-pass filter on the derivative to suppress high-frequency noise.
Index ¶
- func MonitorFromBadgerDB(db *badger.DB) loadmonitor.Monitor
- func MonitorFromNeo4jDriver(driver neo4j.DriverWithContext, querySem chan struct{}, maxConcurrency int) loadmonitor.Monitor
- type BadgerMonitor
- func (m *BadgerMonitor) GetL0Stats() (tables int, score float64)
- func (m *BadgerMonitor) GetMetrics() loadmonitor.Metrics
- func (m *BadgerMonitor) RecordQueryLatency(latency time.Duration)
- func (m *BadgerMonitor) RecordWriteLatency(latency time.Duration)
- func (m *BadgerMonitor) SetMemoryTarget(bytes uint64)
- func (m *BadgerMonitor) Start() <-chan struct{}
- func (m *BadgerMonitor) Stop()
- type Config
- type Limiter
- func (l *Limiter) ComputeDelay(opType OperationType) time.Duration
- func (l *Limiter) GetStats() Stats
- func (l *Limiter) IsEnabled() bool
- func (l *Limiter) RecordLatency(opType OperationType, latency time.Duration)
- func (l *Limiter) Reset()
- func (l *Limiter) Start()
- func (l *Limiter) Stop()
- func (l *Limiter) Stopped() <-chan struct{}
- func (l *Limiter) UpdateConfig(config Config)
- func (l *Limiter) Wait(ctx context.Context, opType OperationType) time.Duration
- type Neo4jMonitor
- func (m *Neo4jMonitor) CheckConnectivity(ctx context.Context) error
- func (m *Neo4jMonitor) GetConcurrencyStats() (reads, writes int32, semUsed int)
- func (m *Neo4jMonitor) GetMetrics() loadmonitor.Metrics
- func (m *Neo4jMonitor) IncrementActiveReads() func()
- func (m *Neo4jMonitor) IncrementActiveWrites() func()
- func (m *Neo4jMonitor) RecordQueryLatency(latency time.Duration)
- func (m *Neo4jMonitor) RecordWriteLatency(latency time.Duration)
- func (m *Neo4jMonitor) SetMemoryTarget(bytes uint64)
- func (m *Neo4jMonitor) Start() <-chan struct{}
- func (m *Neo4jMonitor) Stop()
- type OperationType
- type PIDController
- type PIDState
- type Stats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MonitorFromBadgerDB ¶
func MonitorFromBadgerDB(db *badger.DB) loadmonitor.Monitor
MonitorFromBadgerDB creates a BadgerMonitor from a Badger database. Exported for use when you need to create the monitor separately.
func MonitorFromNeo4jDriver ¶
func MonitorFromNeo4jDriver(driver neo4j.DriverWithContext, querySem chan struct{}, maxConcurrency int) loadmonitor.Monitor
MonitorFromNeo4jDriver creates a Neo4jMonitor from a Neo4j driver. Exported for use when you need to create the monitor separately.
Types ¶
type BadgerMonitor ¶
type BadgerMonitor struct {
	// contains filtered or unexported fields
}
BadgerMonitor implements loadmonitor.Monitor for the Badger database. It collects metrics from Badger's LSM tree, caches, and Go runtime.
func NewBadgerMonitor ¶
func NewBadgerMonitor(db *badger.DB, updateInterval time.Duration) *BadgerMonitor
NewBadgerMonitor creates a new Badger load monitor. The updateInterval controls how often metrics are collected (default 100ms).
func (*BadgerMonitor) GetL0Stats ¶
func (m *BadgerMonitor) GetL0Stats() (tables int, score float64)
GetL0Stats returns L0-specific statistics for debugging.
func (*BadgerMonitor) GetMetrics ¶
func (m *BadgerMonitor) GetMetrics() loadmonitor.Metrics
GetMetrics returns the current load metrics.
func (*BadgerMonitor) RecordQueryLatency ¶
func (m *BadgerMonitor) RecordQueryLatency(latency time.Duration)
RecordQueryLatency records a query latency sample using exponential moving average.
func (*BadgerMonitor) RecordWriteLatency ¶
func (m *BadgerMonitor) RecordWriteLatency(latency time.Duration)
RecordWriteLatency records a write latency sample using exponential moving average.
func (*BadgerMonitor) SetMemoryTarget ¶
func (m *BadgerMonitor) SetMemoryTarget(bytes uint64)
SetMemoryTarget sets the target memory limit in bytes.
func (*BadgerMonitor) Start ¶
func (m *BadgerMonitor) Start() <-chan struct{}
Start begins background metric collection.
func (*BadgerMonitor) Stop ¶
func (m *BadgerMonitor) Stop()
Stop halts background metric collection.
type Config ¶
type Config struct {
	// Enabled controls whether rate limiting is active.
	Enabled bool
	// TargetMemoryMB is the target memory limit in megabytes.
	// Memory pressure is calculated relative to this target.
	TargetMemoryMB int
	// WriteSetpoint is the target process variable for writes (0.0-1.0).
	// Default: 0.85 (throttle when load exceeds 85%)
	WriteSetpoint float64
	// ReadSetpoint is the target process variable for reads (0.0-1.0).
	// Default: 0.90 (more tolerant for reads)
	ReadSetpoint float64
	// PID gains for writes
	WriteKp float64
	WriteKi float64
	WriteKd float64
	// PID gains for reads
	ReadKp float64
	ReadKi float64
	ReadKd float64
	// MaxWriteDelayMs is the maximum delay for write operations in milliseconds.
	MaxWriteDelayMs int
	// MaxReadDelayMs is the maximum delay for read operations in milliseconds.
	MaxReadDelayMs int
	// MetricUpdateInterval is how often to poll the load monitor.
	MetricUpdateInterval time.Duration
	// MemoryWeight is the weight given to memory pressure in process variable (0.0-1.0).
	// The remaining weight is given to the load metric.
	// Default: 0.7 (70% memory, 30% load)
	MemoryWeight float64
}
Config holds configuration for the adaptive rate limiter.
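The MemoryWeight comment describes a weighted blend of memory pressure and load. One plausible reading of that blend is sketched below. The function name `processVariable` and its parameters are hypothetical, and the limiter's actual formula may differ.

```go
package main

import "fmt"

// processVariable is a hypothetical illustration of the weighting described
// for MemoryWeight: memoryWeight goes to memory pressure and the remainder
// goes to the load metric. Inputs are assumed to be normalized load levels.
func processVariable(memoryPressure, load, memoryWeight float64) float64 {
	return memoryWeight*memoryPressure + (1-memoryWeight)*load
}

func main() {
	// With the documented default weight of 0.7, memory pressure dominates.
	fmt.Printf("pv=%.3f\n", processVariable(0.9, 0.4, 0.7))
}
```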
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a default configuration for the rate limiter.
func NewConfigFromValues ¶
func NewConfigFromValues(
	enabled bool,
	targetMB int,
	writeKp, writeKi, writeKd float64,
	readKp, readKi, readKd float64,
	maxWriteMs, maxReadMs int,
	writeTarget, readTarget float64,
) Config
NewConfigFromValues creates a Config from individual configuration values. This is useful when loading configuration from environment variables.
type Limiter ¶
type Limiter struct {
	// contains filtered or unexported fields
}
Limiter implements adaptive rate limiting using PID control. It monitors database load metrics and computes appropriate delays to keep the system within its target operating range.
func NewBadgerLimiter ¶
NewBadgerLimiter creates a rate limiter configured for a Badger database. It automatically creates a BadgerMonitor for the provided database.
func NewDisabledLimiter ¶
func NewDisabledLimiter() *Limiter
NewDisabledLimiter creates a rate limiter that is disabled. This is useful when rate limiting is not configured.
func NewLimiter ¶
func NewLimiter(config Config, monitor loadmonitor.Monitor) *Limiter
NewLimiter creates a new adaptive rate limiter. If monitor is nil, the limiter will be disabled.
func NewNeo4jLimiter ¶
func NewNeo4jLimiter(
	config Config,
	driver neo4j.DriverWithContext,
	querySem chan struct{},
	maxConcurrency int,
) *Limiter
NewNeo4jLimiter creates a rate limiter configured for a Neo4j database. It automatically creates a Neo4jMonitor for the provided driver. querySem should be the semaphore used to limit concurrent queries. maxConcurrency is typically 10 (matching the semaphore size).
func (*Limiter) ComputeDelay ¶
func (l *Limiter) ComputeDelay(opType OperationType) time.Duration
ComputeDelay calculates the recommended delay for an operation. This can be used to check the delay without actually waiting.
func (*Limiter) GetStats ¶
func (l *Limiter) GetStats() Stats
GetStats returns current rate limiter statistics.
func (*Limiter) IsEnabled ¶
func (l *Limiter) IsEnabled() bool
IsEnabled returns whether rate limiting is active.
func (*Limiter) RecordLatency ¶
func (l *Limiter) RecordLatency(opType OperationType, latency time.Duration)
RecordLatency records an operation latency for the monitor.
func (*Limiter) Reset ¶
func (l *Limiter) Reset()
Reset clears all PID controller state and statistics.
func (*Limiter) Start ¶
func (l *Limiter) Start()
Start begins the rate limiter's background metric collection.
func (*Limiter) Stopped ¶
func (l *Limiter) Stopped() <-chan struct{}
Stopped returns a channel that closes when the limiter has stopped.
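A channel that closes on shutdown is a common Go idiom for signalling completion to any number of waiters. The sketch below shows the general pattern. The `worker` type and its methods are hypothetical and do not reflect how Limiter is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical type exposing a close-on-stop channel.
type worker struct {
	stopOnce sync.Once
	stopped  chan struct{}
}

func newWorker() *worker {
	return &worker{stopped: make(chan struct{})}
}

// stop closes the channel exactly once, so repeated calls are safe.
func (w *worker) stop() {
	w.stopOnce.Do(func() { close(w.stopped) })
}

// stoppedCh returns a receive-only view of the channel.
func (w *worker) stoppedCh() <-chan struct{} {
	return w.stopped
}

func main() {
	w := newWorker()
	go w.stop()
	<-w.stoppedCh() // unblocks once stop has run
	fmt.Println("worker stopped")
}
```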
func (*Limiter) UpdateConfig ¶
func (l *Limiter) UpdateConfig(config Config)
UpdateConfig updates the rate limiter configuration. This is useful for dynamic tuning.
type Neo4jMonitor ¶
type Neo4jMonitor struct {
	// contains filtered or unexported fields
}
Neo4jMonitor implements loadmonitor.Monitor for the Neo4j database. Since the Neo4j driver doesn't expose detailed metrics, the monitor tracks:
- Memory pressure via the Go runtime
- Query concurrency via the semaphore
- Latency via recording
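The semaphore-based concurrency signal listed above can be illustrated with a buffered channel. This sketch assumes that a query acquires a slot by sending into the channel and releases it by receiving. The `semaphoreLoad` function is hypothetical, and the monitor's actual calculation is not shown in this documentation.

```go
package main

import "fmt"

// semaphoreLoad is a hypothetical helper that estimates utilization as the
// fraction of semaphore slots currently held. It assumes slots are acquired
// by sending into sem, so len(sem) is the number of in-flight queries.
func semaphoreLoad(sem chan struct{}, maxConcurrency int) float64 {
	if maxConcurrency <= 0 {
		return 0
	}
	return float64(len(sem)) / float64(maxConcurrency)
}

func main() {
	sem := make(chan struct{}, 10)
	for i := 0; i < 3; i++ {
		sem <- struct{}{} // acquire a slot
	}
	fmt.Printf("utilization: %.2f\n", semaphoreLoad(sem, 10))
}
```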
func NewNeo4jMonitor ¶
func NewNeo4jMonitor(
	driver neo4j.DriverWithContext,
	querySem chan struct{},
	maxConcurrency int,
	updateInterval time.Duration,
) *Neo4jMonitor
NewNeo4jMonitor creates a new Neo4j load monitor. The querySem should be the same semaphore used for limiting concurrent queries. maxConcurrency is the maximum concurrent query limit (typically 10).
func (*Neo4jMonitor) CheckConnectivity ¶
func (m *Neo4jMonitor) CheckConnectivity(ctx context.Context) error
CheckConnectivity performs a connectivity check to Neo4j. This can be used to verify the database is responsive.
func (*Neo4jMonitor) GetConcurrencyStats ¶
func (m *Neo4jMonitor) GetConcurrencyStats() (reads, writes int32, semUsed int)
GetConcurrencyStats returns current concurrency statistics for debugging.
func (*Neo4jMonitor) GetMetrics ¶
func (m *Neo4jMonitor) GetMetrics() loadmonitor.Metrics
GetMetrics returns the current load metrics.
func (*Neo4jMonitor) IncrementActiveReads ¶
func (m *Neo4jMonitor) IncrementActiveReads() func()
IncrementActiveReads tracks an active read operation. Call this when starting a read, and call the returned function when done.
func (*Neo4jMonitor) IncrementActiveWrites ¶
func (m *Neo4jMonitor) IncrementActiveWrites() func()
IncrementActiveWrites tracks an active write operation. Call this when starting a write, and call the returned function when done.
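The increment-and-return-a-release-function pattern used by IncrementActiveReads and IncrementActiveWrites pairs naturally with `defer`. The sketch below reproduces the pattern on a standalone counter. The `activeCounter` type is hypothetical and only mirrors the shape of the documented methods.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// activeCounter is a hypothetical stand-in for the monitor's read tracking.
type activeCounter struct {
	reads int32
}

// incrementActiveReads bumps the counter and returns a function that
// decrements it again when the operation finishes.
func (c *activeCounter) incrementActiveReads() func() {
	atomic.AddInt32(&c.reads, 1)
	return func() { atomic.AddInt32(&c.reads, -1) }
}

// active reports the number of reads currently in flight.
func (c *activeCounter) active() int32 {
	return atomic.LoadInt32(&c.reads)
}

func runQuery(c *activeCounter) {
	done := c.incrementActiveReads()
	defer done() // released even if the query path returns early
	fmt.Println("in flight during query:", c.active())
}

func main() {
	c := &activeCounter{}
	runQuery(c)
	fmt.Println("in flight after query:", c.active())
}
```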
func (*Neo4jMonitor) RecordQueryLatency ¶
func (m *Neo4jMonitor) RecordQueryLatency(latency time.Duration)
RecordQueryLatency records a query latency sample using exponential moving average.
func (*Neo4jMonitor) RecordWriteLatency ¶
func (m *Neo4jMonitor) RecordWriteLatency(latency time.Duration)
RecordWriteLatency records a write latency sample using exponential moving average.
func (*Neo4jMonitor) SetMemoryTarget ¶
func (m *Neo4jMonitor) SetMemoryTarget(bytes uint64)
SetMemoryTarget sets the target memory limit in bytes.
func (*Neo4jMonitor) Start ¶
func (m *Neo4jMonitor) Start() <-chan struct{}
Start begins background metric collection.
type OperationType ¶
type OperationType int
OperationType distinguishes between read and write operations for applying different rate limiting strategies.
const (
	// Read operations (REQ queries)
	Read OperationType = iota
	// Write operations (EVENT saves, imports)
	Write
)
func (OperationType) String ¶
func (o OperationType) String() string
String returns a human-readable name for the operation type.
type PIDController ¶
type PIDController struct {
	// Gains
	Kp float64 // Proportional gain
	Ki float64 // Integral gain
	Kd float64 // Derivative gain
	// Setpoint is the target process variable value (e.g., 0.85 for 85% of target memory).
	// The controller drives the process variable toward this setpoint.
	Setpoint float64
	// DerivativeFilterAlpha is the low-pass filter coefficient for the derivative term.
	// Range: 0.0-1.0, where lower values provide stronger filtering.
	// Recommended: 0.2 for strong filtering, 0.5 for moderate filtering.
	DerivativeFilterAlpha float64
	// Integral limits for anti-windup
	IntegralMax float64
	IntegralMin float64
	// Output limits
	OutputMin float64 // Minimum output (typically 0 = no delay)
	OutputMax float64 // Maximum output (max delay in seconds)
	// contains filtered or unexported fields
}
PIDController implements a PID controller with filtered derivative. It is designed for rate limiting database operations based on load metrics.
The controller computes a delay recommendation based on:
- Proportional (P): Immediate response to current error
- Integral (I): Accumulated error to eliminate steady-state offset
- Derivative (D): Rate of change prediction (filtered to reduce noise)
The filtered derivative uses a low-pass filter to attenuate high-frequency noise that would otherwise cause erratic control behavior.
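The three terms and the filtered derivative can be sketched in a few lines. This is a simplified standalone illustration under stated assumptions, not the package's PIDController: the `pidSketch` type is hypothetical, the error is taken as process variable minus setpoint so that overload yields a positive delay, the low-pass filter is applied to the error before differencing, and the elapsed time `dt` is passed in explicitly rather than measured internally.

```go
package main

import "fmt"

// pidSketch is a hypothetical, simplified PID controller.
type pidSketch struct {
	kp, ki, kd               float64
	setpoint                 float64
	alpha                    float64 // filter coefficient; lower means stronger filtering
	integralMin, integralMax float64
	outputMin, outputMax     float64

	integral          float64
	prevFilteredError float64
	initialized       bool
}

// clamp restricts v to the range [lo, hi].
func clamp(v, lo, hi float64) float64 {
	if v < lo {
		return lo
	}
	if v > hi {
		return hi
	}
	return v
}

// update returns a delay in seconds for process variable pv, given dt
// seconds since the previous call.
func (p *pidSketch) update(pv, dt float64) float64 {
	err := pv - p.setpoint // positive when load exceeds the setpoint

	// Integral term with anti-windup clamping.
	p.integral = clamp(p.integral+err*dt, p.integralMin, p.integralMax)

	// Low-pass filter the error, then difference the filtered values.
	filtered, derivative := err, 0.0
	if p.initialized {
		filtered = p.alpha*err + (1-p.alpha)*p.prevFilteredError
		if dt > 0 {
			derivative = (filtered - p.prevFilteredError) / dt
		}
	}
	p.prevFilteredError = filtered
	p.initialized = true

	out := p.kp*err + p.ki*p.integral + p.kd*derivative
	return clamp(out, p.outputMin, p.outputMax)
}

func main() {
	p := &pidSketch{kp: 0.5, ki: 0.1, kd: 0.05, setpoint: 0.85, alpha: 0.2,
		integralMin: -1, integralMax: 1, outputMin: 0, outputMax: 1}
	for _, pv := range []float64{0.80, 0.90, 0.95, 0.88} {
		fmt.Printf("pv=%.2f delay=%.4fs\n", pv, p.update(pv, 0.1))
	}
}
```

Filtering before differencing means a single noisy sample moves the derivative term by only a fraction `alpha` of what it otherwise would, which is the noise attenuation described above.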
func DefaultPIDControllerForReads ¶
func DefaultPIDControllerForReads() *PIDController
DefaultPIDControllerForReads creates a PID controller tuned for read operations. Reads are tuned to respond more quickly but throttle less aggressively.
func DefaultPIDControllerForWrites ¶
func DefaultPIDControllerForWrites() *PIDController
DefaultPIDControllerForWrites creates a PID controller tuned for write operations. Writes benefit from aggressive integral and moderate proportional response.
func NewPIDController ¶
func NewPIDController(
	kp, ki, kd float64,
	setpoint float64,
	derivativeFilterAlpha float64,
	integralMin, integralMax float64,
	outputMin, outputMax float64,
) *PIDController
NewPIDController creates a new PID controller with custom parameters.
func (*PIDController) GetState ¶
func (p *PIDController) GetState() (integral, prevError, prevFilteredError float64)
GetState returns the current internal state for monitoring/debugging.
func (*PIDController) Reset ¶
func (p *PIDController) Reset()
Reset clears the controller state, useful when conditions change significantly.
func (*PIDController) SetGains ¶
func (p *PIDController) SetGains(kp, ki, kd float64)
SetGains updates the PID gains.
func (*PIDController) SetSetpoint ¶
func (p *PIDController) SetSetpoint(setpoint float64)
SetSetpoint updates the target setpoint.
func (*PIDController) Update ¶
func (p *PIDController) Update(processVariable float64) float64
Update computes the PID output based on the current process variable. The process variable represents the load level; it is normally in the range 0.0 to 1.0 but may exceed 1.0.
Returns the recommended delay in seconds. A value of 0 means no delay needed.
type PIDState ¶
PIDState contains the internal state of a PID controller.
Source Files ¶
- badger_monitor.go
- factory.go
- limiter.go
- neo4j_monitor.go
- pid.go