Documentation
¶
Overview ¶
Package ratelimit provides adaptive rate limiting using PID control. The PID controller uses proportional, integral, and derivative terms with a low-pass filter on the derivative to suppress high-frequency noise.
Index ¶
- Constants
- Variables
- func CalculateTargetMemoryMB(configuredMB int) (int, error)
- func DetectAvailableMemoryMB() uint64
- func DetectTotalMemoryMB() uint64
- func MonitorFromBadgerDB(db *badger.DB) loadmonitor.Monitor
- func MonitorFromNeo4jDriver(driver neo4j.DriverWithContext, querySem chan struct{}, maxConcurrency int) loadmonitor.Monitor
- type BadgerMonitor
- func (m *BadgerMonitor) ForceEmergencyMode(duration time.Duration)
- func (m *BadgerMonitor) GetEmergencyThreshold() float64
- func (m *BadgerMonitor) GetL0Stats() (tables int, score float64)
- func (m *BadgerMonitor) GetMetrics() loadmonitor.Metrics
- func (m *BadgerMonitor) IsCompacting() bool
- func (m *BadgerMonitor) RecordQueryLatency(latency time.Duration)
- func (m *BadgerMonitor) RecordWriteLatency(latency time.Duration)
- func (m *BadgerMonitor) SetEmergencyThreshold(threshold float64)
- func (m *BadgerMonitor) SetMemoryTarget(bytes uint64)
- func (m *BadgerMonitor) Start() <-chan struct{}
- func (m *BadgerMonitor) Stop()
- func (m *BadgerMonitor) TriggerCompaction() error
- type Config
- type Limiter
- func (l *Limiter) ComputeDelay(opType OperationType) time.Duration
- func (l *Limiter) GetStats() Stats
- func (l *Limiter) InEmergencyMode() bool
- func (l *Limiter) IsEnabled() bool
- func (l *Limiter) RecordLatency(opType OperationType, latency time.Duration)
- func (l *Limiter) Reset()
- func (l *Limiter) Start()
- func (l *Limiter) Stop()
- func (l *Limiter) Stopped() <-chan struct{}
- func (l *Limiter) UpdateConfig(config Config)
- func (l *Limiter) Wait(ctx context.Context, opType int) time.Duration
- type MemoryStats
- type Neo4jMonitor
- func (m *Neo4jMonitor) CheckConnectivity(ctx context.Context) error
- func (m *Neo4jMonitor) ForceEmergencyMode(duration time.Duration)
- func (m *Neo4jMonitor) GetConcurrencyStats() (reads, writes int32, semUsed int)
- func (m *Neo4jMonitor) GetEmergencyThreshold() float64
- func (m *Neo4jMonitor) GetMetrics() loadmonitor.Metrics
- func (m *Neo4jMonitor) GetThrottleMultiplier() float64
- func (m *Neo4jMonitor) IncrementActiveReads() func()
- func (m *Neo4jMonitor) IncrementActiveWrites() func()
- func (m *Neo4jMonitor) RecordQueryLatency(latency time.Duration)
- func (m *Neo4jMonitor) RecordWriteLatency(latency time.Duration)
- func (m *Neo4jMonitor) SetEmergencyThreshold(threshold float64)
- func (m *Neo4jMonitor) SetMemoryTarget(bytes uint64)
- func (m *Neo4jMonitor) Start() <-chan struct{}
- func (m *Neo4jMonitor) Stop()
- type OperationType
- type PIDController
- type PIDState
- type ProcessMemoryStats
- type Stats
Constants ¶
const AutoDetectMemoryFraction = 0.66
AutoDetectMemoryFraction is the fraction of available memory to use when auto-detecting.
const DefaultMaxMemoryMB = 1500
DefaultMaxMemoryMB is the default maximum memory target when auto-detecting. This caps the auto-detected value to ensure optimal performance.
const MinimumMemoryMB = 128
MinimumMemoryMB is the minimum memory required to run the relay with rate limiting.
ThrottleCheckInterval is how often to recheck memory and adjust throttling
Variables ¶
var ErrInsufficientMemory = errors.New("insufficient memory: relay requires at least 128MB of available memory")
ErrInsufficientMemory is returned when there isn't enough memory to run the relay.
Functions ¶
func CalculateTargetMemoryMB ¶ added in v0.35.1
CalculateTargetMemoryMB calculates the target memory limit based on configuration. If configuredMB is 0, it auto-detects based on available memory (66% of available, capped at 1.5GB). If configuredMB is non-zero, it validates that it's achievable. Returns an error if there isn't enough memory.
func DetectAvailableMemoryMB ¶ added in v0.35.1
func DetectAvailableMemoryMB() uint64
DetectAvailableMemoryMB returns the available system memory in megabytes. On Linux, this returns the actual available memory (free + cached). On other systems, it returns total memory minus the Go runtime's current usage.
func DetectTotalMemoryMB ¶ added in v0.35.1
func DetectTotalMemoryMB() uint64
DetectTotalMemoryMB returns the total system memory in megabytes.
func MonitorFromBadgerDB ¶
func MonitorFromBadgerDB(db *badger.DB) loadmonitor.Monitor
MonitorFromBadgerDB creates a BadgerMonitor from a Badger database. Exported for use when you need to create the monitor separately.
func MonitorFromNeo4jDriver ¶
func MonitorFromNeo4jDriver( driver neo4j.DriverWithContext, querySem chan struct{}, maxConcurrency int, ) loadmonitor.Monitor
MonitorFromNeo4jDriver creates a Neo4jMonitor from a Neo4j driver. Exported for use when you need to create the monitor separately.
Types ¶
type BadgerMonitor ¶
type BadgerMonitor struct {
// contains filtered or unexported fields
}
BadgerMonitor implements loadmonitor.Monitor for the Badger database. It collects metrics from Badger's LSM tree, caches, and actual process memory. It also implements CompactableMonitor and EmergencyModeMonitor interfaces.
func NewBadgerMonitor ¶
func NewBadgerMonitor(db *badger.DB, updateInterval time.Duration) *BadgerMonitor
NewBadgerMonitor creates a new Badger load monitor. The updateInterval controls how often metrics are collected (default 100ms).
func (*BadgerMonitor) ForceEmergencyMode ¶ added in v0.35.1
func (m *BadgerMonitor) ForceEmergencyMode(duration time.Duration)
ForceEmergencyMode manually triggers emergency mode for a duration.
func (*BadgerMonitor) GetEmergencyThreshold ¶ added in v0.35.1
func (m *BadgerMonitor) GetEmergencyThreshold() float64
GetEmergencyThreshold returns the current emergency threshold as a fraction.
func (*BadgerMonitor) GetL0Stats ¶
func (m *BadgerMonitor) GetL0Stats() (tables int, score float64)
GetL0Stats returns L0-specific statistics for debugging.
func (*BadgerMonitor) GetMetrics ¶
func (m *BadgerMonitor) GetMetrics() loadmonitor.Metrics
GetMetrics returns the current load metrics.
func (*BadgerMonitor) IsCompacting ¶ added in v0.35.1
func (m *BadgerMonitor) IsCompacting() bool
IsCompacting returns true if a compaction is currently in progress.
func (*BadgerMonitor) RecordQueryLatency ¶
func (m *BadgerMonitor) RecordQueryLatency(latency time.Duration)
RecordQueryLatency records a query latency sample using exponential moving average.
func (*BadgerMonitor) RecordWriteLatency ¶
func (m *BadgerMonitor) RecordWriteLatency(latency time.Duration)
RecordWriteLatency records a write latency sample using exponential moving average.
func (*BadgerMonitor) SetEmergencyThreshold ¶ added in v0.35.1
func (m *BadgerMonitor) SetEmergencyThreshold(threshold float64)
SetEmergencyThreshold sets the memory threshold above which emergency mode is triggered. threshold is a fraction, e.g., 1.5 = 150% of target memory.
func (*BadgerMonitor) SetMemoryTarget ¶
func (m *BadgerMonitor) SetMemoryTarget(bytes uint64)
SetMemoryTarget sets the target memory limit in bytes.
func (*BadgerMonitor) Start ¶
func (m *BadgerMonitor) Start() <-chan struct{}
Start begins background metric collection.
func (*BadgerMonitor) Stop ¶
func (m *BadgerMonitor) Stop()
Stop halts background metric collection.
func (*BadgerMonitor) TriggerCompaction ¶ added in v0.35.1
func (m *BadgerMonitor) TriggerCompaction() error
TriggerCompaction initiates a Badger Flatten operation to compact all levels. This should be called when memory pressure is high and the database needs to reclaim space. It runs synchronously and may take significant time.
type Config ¶
type Config struct {
// Enabled controls whether rate limiting is active.
Enabled bool
// TargetMemoryMB is the target memory limit in megabytes.
// Memory pressure is calculated relative to this target.
TargetMemoryMB int
// WriteSetpoint is the target process variable for writes (0.0-1.0).
// Default: 0.85 (throttle when load exceeds 85%)
WriteSetpoint float64
// ReadSetpoint is the target process variable for reads (0.0-1.0).
// Default: 0.90 (more tolerant for reads)
ReadSetpoint float64
// PID gains for writes
WriteKp float64
WriteKi float64
WriteKd float64
// PID gains for reads
ReadKp float64
ReadKi float64
ReadKd float64
// MaxWriteDelayMs is the maximum delay for write operations in milliseconds.
MaxWriteDelayMs int
// MaxReadDelayMs is the maximum delay for read operations in milliseconds.
MaxReadDelayMs int
// MetricUpdateInterval is how often to poll the load monitor.
MetricUpdateInterval time.Duration
// MemoryWeight is the weight given to memory pressure in process variable (0.0-1.0).
// The remaining weight is given to the load metric.
// Default: 0.7 (70% memory, 30% load)
MemoryWeight float64
// EmergencyThreshold is the memory pressure level (fraction of target) that triggers emergency mode.
// Default: 1.167 (116.7% = target + 1/6th)
// When exceeded, writes are aggressively throttled until memory drops below RecoveryThreshold.
EmergencyThreshold float64
// RecoveryThreshold is the memory pressure level below which we exit emergency mode.
// Default: 0.833 (83.3% = target - 1/6th)
// Hysteresis prevents rapid oscillation between normal and emergency modes.
RecoveryThreshold float64
// EmergencyMaxDelayMs is the maximum delay for writes during emergency mode.
// Default: 5000 (5 seconds) - much longer than normal MaxWriteDelayMs
EmergencyMaxDelayMs int
// CompactionCheckInterval controls how often to check if compaction should be triggered.
// Default: 10 seconds
CompactionCheckInterval time.Duration
}
Config holds configuration for the adaptive rate limiter.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a default configuration for the rate limiter.
func NewConfigFromValues ¶
func NewConfigFromValues( enabled bool, targetMB int, writeKp, writeKi, writeKd float64, readKp, readKi, readKd float64, maxWriteMs, maxReadMs int, writeTarget, readTarget float64, emergencyThreshold, recoveryThreshold float64, emergencyMaxMs int, ) Config
NewConfigFromValues creates a Config from individual configuration values. This is useful when loading configuration from environment variables.
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
Limiter implements adaptive rate limiting using PID control. It monitors database load metrics and computes appropriate delays to keep the system within its target operating range.
func NewBadgerLimiter ¶
NewBadgerLimiter creates a rate limiter configured for a Badger database. It automatically creates a BadgerMonitor for the provided database.
func NewDisabledLimiter ¶
func NewDisabledLimiter() *Limiter
NewDisabledLimiter creates a rate limiter that is disabled. This is useful when rate limiting is not configured.
func NewLimiter ¶
func NewLimiter(config Config, monitor loadmonitor.Monitor) *Limiter
NewLimiter creates a new adaptive rate limiter. If monitor is nil, the limiter will be disabled.
func NewNeo4jLimiter ¶
func NewNeo4jLimiter( config Config, driver neo4j.DriverWithContext, querySem chan struct{}, maxConcurrency int, ) *Limiter
NewNeo4jLimiter creates a rate limiter configured for a Neo4j database. It automatically creates a Neo4jMonitor for the provided driver. querySem should be the semaphore used to limit concurrent queries. maxConcurrency is typically 10 (matching the semaphore size).
func (*Limiter) ComputeDelay ¶
func (l *Limiter) ComputeDelay(opType OperationType) time.Duration
ComputeDelay calculates the recommended delay for an operation. This can be used to check the delay without actually waiting.
func (*Limiter) GetStats ¶
GetStats returns current rate limiter statistics.
func (*Limiter) InEmergencyMode ¶ added in v0.35.1
InEmergencyMode returns true if the limiter is currently in emergency mode.
func (*Limiter) IsEnabled ¶
IsEnabled returns whether rate limiting is active.
func (*Limiter) RecordLatency ¶
func (l *Limiter) RecordLatency(opType OperationType, latency time.Duration)
RecordLatency records an operation latency for the monitor.
func (*Limiter) Reset ¶
func (l *Limiter) Reset()
Reset clears all PID controller state and statistics.
func (*Limiter) Start ¶
func (l *Limiter) Start()
Start begins the rate limiter's background metric collection.
func (*Limiter) Stopped ¶
func (l *Limiter) Stopped() <-chan struct{}
Stopped returns a channel that closes when the limiter has stopped.
func (*Limiter) UpdateConfig ¶
UpdateConfig updates the rate limiter configuration. This is useful for dynamic tuning.
func (*Limiter) Wait ¶
Wait blocks until the rate limiter permits the operation to proceed. It returns the delay that was applied, or 0 if no delay was needed. If the context is cancelled, it returns immediately. opType accepts int for interface compatibility (0=Read, 1=Write)
type MemoryStats ¶ added in v0.35.1
type MemoryStats struct {
TotalMB uint64
AvailableMB uint64
TargetMB int
GoAllocatedMB uint64
GoSysMB uint64
}
GetMemoryStats returns current memory statistics for logging.
func GetMemoryStats ¶ added in v0.35.1
func GetMemoryStats(targetMB int) MemoryStats
GetMemoryStats returns current memory statistics.
type Neo4jMonitor ¶
type Neo4jMonitor struct {
// contains filtered or unexported fields
}
Neo4jMonitor implements loadmonitor.Monitor for Neo4j database. Since Neo4j driver doesn't expose detailed metrics, we track: - Memory pressure via actual RSS (not Go runtime) - Query concurrency via the semaphore - Latency via recording
This monitor implements aggressive memory-based limiting: When memory exceeds the target, it applies 50% more aggressive throttling. It rechecks every 10 seconds and doubles the throttling multiplier until memory returns under target.
func NewNeo4jMonitor ¶
func NewNeo4jMonitor( driver neo4j.DriverWithContext, querySem chan struct{}, maxConcurrency int, updateInterval time.Duration, ) *Neo4jMonitor
NewNeo4jMonitor creates a new Neo4j load monitor. The querySem should be the same semaphore used for limiting concurrent queries. maxConcurrency is the maximum concurrent query limit (typically 10).
func (*Neo4jMonitor) CheckConnectivity ¶
func (m *Neo4jMonitor) CheckConnectivity(ctx context.Context) error
CheckConnectivity performs a connectivity check to Neo4j. This can be used to verify the database is responsive.
func (*Neo4jMonitor) ForceEmergencyMode ¶ added in v0.35.1
func (m *Neo4jMonitor) ForceEmergencyMode(duration time.Duration)
ForceEmergencyMode manually triggers emergency mode for a duration.
func (*Neo4jMonitor) GetConcurrencyStats ¶
func (m *Neo4jMonitor) GetConcurrencyStats() (reads, writes int32, semUsed int)
GetConcurrencyStats returns current concurrency statistics for debugging.
func (*Neo4jMonitor) GetEmergencyThreshold ¶ added in v0.35.1
func (m *Neo4jMonitor) GetEmergencyThreshold() float64
GetEmergencyThreshold returns the current emergency threshold as a fraction.
func (*Neo4jMonitor) GetMetrics ¶
func (m *Neo4jMonitor) GetMetrics() loadmonitor.Metrics
GetMetrics returns the current load metrics.
func (*Neo4jMonitor) GetThrottleMultiplier ¶ added in v0.35.1
func (m *Neo4jMonitor) GetThrottleMultiplier() float64
GetThrottleMultiplier returns the current throttle multiplier. Returns a value >= 1.0, where 1.0 = no extra throttling, 1.5 = 50% more aggressive, etc.
func (*Neo4jMonitor) IncrementActiveReads ¶
func (m *Neo4jMonitor) IncrementActiveReads() func()
IncrementActiveReads tracks an active read operation. Call this when starting a read, and call the returned function when done.
func (*Neo4jMonitor) IncrementActiveWrites ¶
func (m *Neo4jMonitor) IncrementActiveWrites() func()
IncrementActiveWrites tracks an active write operation. Call this when starting a write, and call the returned function when done.
func (*Neo4jMonitor) RecordQueryLatency ¶
func (m *Neo4jMonitor) RecordQueryLatency(latency time.Duration)
RecordQueryLatency records a query latency sample using exponential moving average.
func (*Neo4jMonitor) RecordWriteLatency ¶
func (m *Neo4jMonitor) RecordWriteLatency(latency time.Duration)
RecordWriteLatency records a write latency sample using exponential moving average.
func (*Neo4jMonitor) SetEmergencyThreshold ¶ added in v0.35.1
func (m *Neo4jMonitor) SetEmergencyThreshold(threshold float64)
SetEmergencyThreshold sets the memory threshold above which emergency mode is triggered. threshold is a fraction, e.g., 1.0 = 100% of target memory.
func (*Neo4jMonitor) SetMemoryTarget ¶
func (m *Neo4jMonitor) SetMemoryTarget(bytes uint64)
SetMemoryTarget sets the target memory limit in bytes.
func (*Neo4jMonitor) Start ¶
func (m *Neo4jMonitor) Start() <-chan struct{}
Start begins background metric collection.
type OperationType ¶
type OperationType int
OperationType distinguishes between read and write operations for applying different rate limiting strategies.
const ( // Read operations (REQ queries) Read OperationType = iota // Write operations (EVENT saves, imports) Write )
func (OperationType) String ¶
func (o OperationType) String() string
String returns a human-readable name for the operation type.
type PIDController ¶
type PIDController struct {
// Gains
Kp float64 // Proportional gain
Ki float64 // Integral gain
Kd float64 // Derivative gain
// Setpoint is the target process variable value (e.g., 0.85 for 85% of target memory).
// The controller drives the process variable toward this setpoint.
Setpoint float64
// DerivativeFilterAlpha is the low-pass filter coefficient for the derivative term.
// Range: 0.0-1.0, where lower values provide stronger filtering.
// Recommended: 0.2 for strong filtering, 0.5 for moderate filtering.
DerivativeFilterAlpha float64
// Integral limits for anti-windup
IntegralMax float64
IntegralMin float64
// Output limits
OutputMin float64 // Minimum output (typically 0 = no delay)
OutputMax float64 // Maximum output (max delay in seconds)
// contains filtered or unexported fields
}
PIDController implements a PID controller with filtered derivative. It is designed for rate limiting database operations based on load metrics.
The controller computes a delay recommendation based on:
- Proportional (P): Immediate response to current error
- Integral (I): Accumulated error to eliminate steady-state offset
- Derivative (D): Rate of change prediction (filtered to reduce noise)
The filtered derivative uses a low-pass filter to attenuate high-frequency noise that would otherwise cause erratic control behavior.
func DefaultPIDControllerForReads ¶
func DefaultPIDControllerForReads() *PIDController
DefaultPIDControllerForReads creates a PID controller tuned for read operations. Reads should be more responsive but with less aggressive throttling.
func DefaultPIDControllerForWrites ¶
func DefaultPIDControllerForWrites() *PIDController
DefaultPIDControllerForWrites creates a PID controller tuned for write operations. Writes benefit from aggressive integral and moderate proportional response.
func NewPIDController ¶
func NewPIDController( kp, ki, kd float64, setpoint float64, derivativeFilterAlpha float64, integralMin, integralMax float64, outputMin, outputMax float64, ) *PIDController
NewPIDController creates a new PID controller with custom parameters.
func (*PIDController) GetState ¶
func (p *PIDController) GetState() (integral, prevError, prevFilteredError float64)
GetState returns the current internal state for monitoring/debugging.
func (*PIDController) Reset ¶
func (p *PIDController) Reset()
Reset clears the controller state, useful when conditions change significantly.
func (*PIDController) SetGains ¶
func (p *PIDController) SetGains(kp, ki, kd float64)
SetGains updates the PID gains.
func (*PIDController) SetSetpoint ¶
func (p *PIDController) SetSetpoint(setpoint float64)
SetSetpoint updates the target setpoint.
func (*PIDController) Update ¶
func (p *PIDController) Update(processVariable float64) float64
Update computes the PID output based on the current process variable. The process variable should be in the range [0.0, 1.0+] representing load level.
Returns the recommended delay in seconds. A value of 0 means no delay needed.
type PIDState ¶
PIDState contains the internal state of a PID controller.
type ProcessMemoryStats ¶ added in v0.35.1
type ProcessMemoryStats struct {
// VmRSS is the resident set size (total physical memory in use) in bytes
VmRSS uint64
// RssShmem is the shared memory portion of RSS in bytes
RssShmem uint64
// RssAnon is the anonymous (non-shared) memory in bytes
RssAnon uint64
// VmHWM is the peak RSS (high water mark) in bytes
VmHWM uint64
}
ProcessMemoryStats contains memory statistics for the current process. On Linux, these are read from /proc/self/status for accurate RSS values. On other platforms, these are approximated from Go runtime stats.
func ReadProcessMemoryStats ¶ added in v0.35.1
func ReadProcessMemoryStats() ProcessMemoryStats
ReadProcessMemoryStats reads memory statistics from /proc/self/status. This provides accurate RSS (Resident Set Size) information on Linux, including the breakdown between shared and anonymous memory.
func (ProcessMemoryStats) PhysicalMemoryBytes ¶ added in v0.35.1
func (p ProcessMemoryStats) PhysicalMemoryBytes() uint64
PhysicalMemoryBytes returns the actual physical memory usage (RSS - shared)
func (ProcessMemoryStats) PhysicalMemoryMB ¶ added in v0.35.1
func (p ProcessMemoryStats) PhysicalMemoryMB() uint64
PhysicalMemoryMB returns the actual physical memory usage in MB
Source Files
¶
- badger_monitor.go
- factory.go
- limiter.go
- memory.go
- memory_linux.go
- neo4j_monitor.go
- pid.go