ratelimit

package
v0.37.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 25, 2025 License: Unlicense Imports: 18 Imported by: 0

Documentation

Overview

Package ratelimit provides adaptive rate limiting using PID control. The PID controller uses proportional, integral, and derivative terms with a low-pass filter on the derivative to suppress high-frequency noise.

Index

Constants

View Source
const AutoDetectMemoryFraction = 0.66

AutoDetectMemoryFraction is the fraction of available memory to use when auto-detecting.

View Source
const DefaultMaxMemoryMB = 1500

DefaultMaxMemoryMB is the default maximum memory target when auto-detecting. This caps the auto-detected value to ensure optimal performance.

View Source
const MinimumMemoryMB = 500

MinimumMemoryMB is the minimum memory required to run the relay with rate limiting.

View Source
const ThrottleCheckInterval = 10 * time.Second

ThrottleCheckInterval is how often to recheck memory and adjust throttling

Variables

View Source
var ErrInsufficientMemory = errors.New("insufficient memory: relay requires at least 500MB of available memory")

ErrInsufficientMemory is returned when there isn't enough memory to run the relay.

Functions

func CalculateTargetMemoryMB added in v0.35.1

func CalculateTargetMemoryMB(configuredMB int) (int, error)

CalculateTargetMemoryMB calculates the target memory limit based on configuration. If configuredMB is 0, it auto-detects based on available memory (66% of available, capped at 1.5GB). If configuredMB is non-zero, it validates that it's achievable. Returns an error if there isn't enough memory.

func DetectAvailableMemoryMB added in v0.35.1

func DetectAvailableMemoryMB() uint64

DetectAvailableMemoryMB returns the available system memory in megabytes. On Linux, this returns the actual available memory (free + cached). On other systems, it returns total memory minus the Go runtime's current usage.

func DetectTotalMemoryMB added in v0.35.1

func DetectTotalMemoryMB() uint64

DetectTotalMemoryMB returns the total system memory in megabytes.

func MonitorFromBadgerDB

func MonitorFromBadgerDB(db *badger.DB) loadmonitor.Monitor

MonitorFromBadgerDB creates a BadgerMonitor from a Badger database. Exported for use when you need to create the monitor separately.

func MonitorFromNeo4jDriver

func MonitorFromNeo4jDriver(
	driver neo4j.DriverWithContext,
	querySem chan struct{},
	maxConcurrency int,
) loadmonitor.Monitor

MonitorFromNeo4jDriver creates a Neo4jMonitor from a Neo4j driver. Exported for use when you need to create the monitor separately.

Types

type BadgerMonitor

type BadgerMonitor struct {
	// contains filtered or unexported fields
}

BadgerMonitor implements loadmonitor.Monitor for the Badger database. It collects metrics from Badger's LSM tree, caches, and actual process memory. It also implements CompactableMonitor and EmergencyModeMonitor interfaces.

func NewBadgerMonitor

func NewBadgerMonitor(db *badger.DB, updateInterval time.Duration) *BadgerMonitor

NewBadgerMonitor creates a new Badger load monitor. The updateInterval controls how often metrics are collected (default 100ms).

func (*BadgerMonitor) ForceEmergencyMode added in v0.35.1

func (m *BadgerMonitor) ForceEmergencyMode(duration time.Duration)

ForceEmergencyMode manually triggers emergency mode for a duration.

func (*BadgerMonitor) GetEmergencyThreshold added in v0.35.1

func (m *BadgerMonitor) GetEmergencyThreshold() float64

GetEmergencyThreshold returns the current emergency threshold as a fraction.

func (*BadgerMonitor) GetL0Stats

func (m *BadgerMonitor) GetL0Stats() (tables int, score float64)

GetL0Stats returns L0-specific statistics for debugging.

func (*BadgerMonitor) GetMetrics

func (m *BadgerMonitor) GetMetrics() loadmonitor.Metrics

GetMetrics returns the current load metrics.

func (*BadgerMonitor) IsCompacting added in v0.35.1

func (m *BadgerMonitor) IsCompacting() bool

IsCompacting returns true if a compaction is currently in progress.

func (*BadgerMonitor) RecordQueryLatency

func (m *BadgerMonitor) RecordQueryLatency(latency time.Duration)

RecordQueryLatency records a query latency sample using exponential moving average.

func (*BadgerMonitor) RecordWriteLatency

func (m *BadgerMonitor) RecordWriteLatency(latency time.Duration)

RecordWriteLatency records a write latency sample using exponential moving average.

func (*BadgerMonitor) SetEmergencyThreshold added in v0.35.1

func (m *BadgerMonitor) SetEmergencyThreshold(threshold float64)

SetEmergencyThreshold sets the memory threshold above which emergency mode is triggered. threshold is a fraction, e.g., 1.5 = 150% of target memory.

func (*BadgerMonitor) SetMemoryTarget

func (m *BadgerMonitor) SetMemoryTarget(bytes uint64)

SetMemoryTarget sets the target memory limit in bytes.

func (*BadgerMonitor) Start

func (m *BadgerMonitor) Start() <-chan struct{}

Start begins background metric collection.

func (*BadgerMonitor) Stop

func (m *BadgerMonitor) Stop()

Stop halts background metric collection.

func (*BadgerMonitor) TriggerCompaction added in v0.35.1

func (m *BadgerMonitor) TriggerCompaction() error

TriggerCompaction initiates a Badger Flatten operation to compact all levels. This should be called when memory pressure is high and the database needs to reclaim space. It runs synchronously and may take significant time.

type Config

type Config struct {
	// Enabled controls whether rate limiting is active.
	Enabled bool

	// TargetMemoryMB is the target memory limit in megabytes.
	// Memory pressure is calculated relative to this target.
	TargetMemoryMB int

	// WriteSetpoint is the target process variable for writes (0.0-1.0).
	// Default: 0.85 (throttle when load exceeds 85%)
	WriteSetpoint float64

	// ReadSetpoint is the target process variable for reads (0.0-1.0).
	// Default: 0.90 (more tolerant for reads)
	ReadSetpoint float64

	// PID gains for writes
	WriteKp float64
	WriteKi float64
	WriteKd float64

	// PID gains for reads
	ReadKp float64
	ReadKi float64
	ReadKd float64

	// MaxWriteDelayMs is the maximum delay for write operations in milliseconds.
	MaxWriteDelayMs int

	// MaxReadDelayMs is the maximum delay for read operations in milliseconds.
	MaxReadDelayMs int

	// MetricUpdateInterval is how often to poll the load monitor.
	MetricUpdateInterval time.Duration

	// MemoryWeight is the weight given to memory pressure in process variable (0.0-1.0).
	// The remaining weight is given to the load metric.
	// Default: 0.7 (70% memory, 30% load)
	MemoryWeight float64

	// EmergencyThreshold is the memory pressure level (fraction of target) that triggers emergency mode.
	// Default: 1.167 (116.7% = target + 1/6th)
	// When exceeded, writes are aggressively throttled until memory drops below RecoveryThreshold.
	EmergencyThreshold float64

	// RecoveryThreshold is the memory pressure level below which we exit emergency mode.
	// Default: 0.833 (83.3% = target - 1/6th)
	// Hysteresis prevents rapid oscillation between normal and emergency modes.
	RecoveryThreshold float64

	// EmergencyMaxDelayMs is the maximum delay for writes during emergency mode.
	// Default: 5000 (5 seconds) - much longer than normal MaxWriteDelayMs
	EmergencyMaxDelayMs int

	// CompactionCheckInterval controls how often to check if compaction should be triggered.
	// Default: 10 seconds
	CompactionCheckInterval time.Duration
}

Config holds configuration for the adaptive rate limiter.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a default configuration for the rate limiter.

func NewConfigFromValues

func NewConfigFromValues(
	enabled bool,
	targetMB int,
	writeKp, writeKi, writeKd float64,
	readKp, readKi, readKd float64,
	maxWriteMs, maxReadMs int,
	writeTarget, readTarget float64,
	emergencyThreshold, recoveryThreshold float64,
	emergencyMaxMs int,
) Config

NewConfigFromValues creates a Config from individual configuration values. This is useful when loading configuration from environment variables.

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements adaptive rate limiting using PID control. It monitors database load metrics and computes appropriate delays to keep the system within its target operating range.

func NewBadgerLimiter

func NewBadgerLimiter(config Config, db *badger.DB) *Limiter

NewBadgerLimiter creates a rate limiter configured for a Badger database. It automatically creates a BadgerMonitor for the provided database.

func NewDisabledLimiter

func NewDisabledLimiter() *Limiter

NewDisabledLimiter creates a rate limiter that is disabled. This is useful when rate limiting is not configured.

func NewLimiter

func NewLimiter(config Config, monitor loadmonitor.Monitor) *Limiter

NewLimiter creates a new adaptive rate limiter. If monitor is nil, the limiter will be disabled.

func NewNeo4jLimiter

func NewNeo4jLimiter(
	config Config,
	driver neo4j.DriverWithContext,
	querySem chan struct{},
	maxConcurrency int,
) *Limiter

NewNeo4jLimiter creates a rate limiter configured for a Neo4j database. It automatically creates a Neo4jMonitor for the provided driver. querySem should be the semaphore used to limit concurrent queries. maxConcurrency is typically 10 (matching the semaphore size).

func (*Limiter) ComputeDelay

func (l *Limiter) ComputeDelay(opType OperationType) time.Duration

ComputeDelay calculates the recommended delay for an operation. This can be used to check the delay without actually waiting.

func (*Limiter) GetStats

func (l *Limiter) GetStats() Stats

GetStats returns current rate limiter statistics.

func (*Limiter) InEmergencyMode added in v0.35.1

func (l *Limiter) InEmergencyMode() bool

InEmergencyMode returns true if the limiter is currently in emergency mode.

func (*Limiter) IsEnabled

func (l *Limiter) IsEnabled() bool

IsEnabled returns whether rate limiting is active.

func (*Limiter) RecordLatency

func (l *Limiter) RecordLatency(opType OperationType, latency time.Duration)

RecordLatency records an operation latency for the monitor.

func (*Limiter) Reset

func (l *Limiter) Reset()

Reset clears all PID controller state and statistics.

func (*Limiter) Start

func (l *Limiter) Start()

Start begins the rate limiter's background metric collection.

func (*Limiter) Stop

func (l *Limiter) Stop()

Stop halts the rate limiter.

func (*Limiter) Stopped

func (l *Limiter) Stopped() <-chan struct{}

Stopped returns a channel that closes when the limiter has stopped.

func (*Limiter) UpdateConfig

func (l *Limiter) UpdateConfig(config Config)

UpdateConfig updates the rate limiter configuration. This is useful for dynamic tuning.

func (*Limiter) Wait

func (l *Limiter) Wait(ctx context.Context, opType int) time.Duration

Wait blocks until the rate limiter permits the operation to proceed. It returns the delay that was applied, or 0 if no delay was needed. If the context is cancelled, it returns immediately. opType accepts int for interface compatibility (0=Read, 1=Write)

type MemoryStats added in v0.35.1

type MemoryStats struct {
	TotalMB       uint64
	AvailableMB   uint64
	TargetMB      int
	GoAllocatedMB uint64
	GoSysMB       uint64
}

GetMemoryStats returns current memory statistics for logging.

func GetMemoryStats added in v0.35.1

func GetMemoryStats(targetMB int) MemoryStats

GetMemoryStats returns current memory statistics.

type Neo4jMonitor

type Neo4jMonitor struct {
	// contains filtered or unexported fields
}

Neo4jMonitor implements loadmonitor.Monitor for Neo4j database. Since Neo4j driver doesn't expose detailed metrics, we track: - Memory pressure via actual RSS (not Go runtime) - Query concurrency via the semaphore - Latency via recording

This monitor implements aggressive memory-based limiting: When memory exceeds the target, it applies 50% more aggressive throttling. It rechecks every 10 seconds and doubles the throttling multiplier until memory returns under target.

func NewNeo4jMonitor

func NewNeo4jMonitor(
	driver neo4j.DriverWithContext,
	querySem chan struct{},
	maxConcurrency int,
	updateInterval time.Duration,
) *Neo4jMonitor

NewNeo4jMonitor creates a new Neo4j load monitor. The querySem should be the same semaphore used for limiting concurrent queries. maxConcurrency is the maximum concurrent query limit (typically 10).

func (*Neo4jMonitor) CheckConnectivity

func (m *Neo4jMonitor) CheckConnectivity(ctx context.Context) error

CheckConnectivity performs a connectivity check to Neo4j. This can be used to verify the database is responsive.

func (*Neo4jMonitor) ForceEmergencyMode added in v0.35.1

func (m *Neo4jMonitor) ForceEmergencyMode(duration time.Duration)

ForceEmergencyMode manually triggers emergency mode for a duration.

func (*Neo4jMonitor) GetConcurrencyStats

func (m *Neo4jMonitor) GetConcurrencyStats() (reads, writes int32, semUsed int)

GetConcurrencyStats returns current concurrency statistics for debugging.

func (*Neo4jMonitor) GetEmergencyThreshold added in v0.35.1

func (m *Neo4jMonitor) GetEmergencyThreshold() float64

GetEmergencyThreshold returns the current emergency threshold as a fraction.

func (*Neo4jMonitor) GetMetrics

func (m *Neo4jMonitor) GetMetrics() loadmonitor.Metrics

GetMetrics returns the current load metrics.

func (*Neo4jMonitor) GetThrottleMultiplier added in v0.35.1

func (m *Neo4jMonitor) GetThrottleMultiplier() float64

GetThrottleMultiplier returns the current throttle multiplier. Returns a value >= 1.0, where 1.0 = no extra throttling, 1.5 = 50% more aggressive, etc.

func (*Neo4jMonitor) IncrementActiveReads

func (m *Neo4jMonitor) IncrementActiveReads() func()

IncrementActiveReads tracks an active read operation. Call this when starting a read, and call the returned function when done.

func (*Neo4jMonitor) IncrementActiveWrites

func (m *Neo4jMonitor) IncrementActiveWrites() func()

IncrementActiveWrites tracks an active write operation. Call this when starting a write, and call the returned function when done.

func (*Neo4jMonitor) RecordQueryLatency

func (m *Neo4jMonitor) RecordQueryLatency(latency time.Duration)

RecordQueryLatency records a query latency sample using exponential moving average.

func (*Neo4jMonitor) RecordWriteLatency

func (m *Neo4jMonitor) RecordWriteLatency(latency time.Duration)

RecordWriteLatency records a write latency sample using exponential moving average.

func (*Neo4jMonitor) SetEmergencyThreshold added in v0.35.1

func (m *Neo4jMonitor) SetEmergencyThreshold(threshold float64)

SetEmergencyThreshold sets the memory threshold above which emergency mode is triggered. threshold is a fraction, e.g., 1.0 = 100% of target memory.

func (*Neo4jMonitor) SetMemoryTarget

func (m *Neo4jMonitor) SetMemoryTarget(bytes uint64)

SetMemoryTarget sets the target memory limit in bytes.

func (*Neo4jMonitor) Start

func (m *Neo4jMonitor) Start() <-chan struct{}

Start begins background metric collection.

func (*Neo4jMonitor) Stop

func (m *Neo4jMonitor) Stop()

Stop halts background metric collection.

type OperationType

type OperationType int

OperationType distinguishes between read and write operations for applying different rate limiting strategies.

const (
	// Read operations (REQ queries)
	Read OperationType = iota
	// Write operations (EVENT saves, imports)
	Write
)

func (OperationType) String

func (o OperationType) String() string

String returns a human-readable name for the operation type.

type PIDController

type PIDController struct {
	// Gains
	Kp float64 // Proportional gain
	Ki float64 // Integral gain
	Kd float64 // Derivative gain

	// Setpoint is the target process variable value (e.g., 0.85 for 85% of target memory).
	// The controller drives the process variable toward this setpoint.
	Setpoint float64

	// DerivativeFilterAlpha is the low-pass filter coefficient for the derivative term.
	// Range: 0.0-1.0, where lower values provide stronger filtering.
	// Recommended: 0.2 for strong filtering, 0.5 for moderate filtering.
	DerivativeFilterAlpha float64

	// Integral limits for anti-windup
	IntegralMax float64
	IntegralMin float64

	// Output limits
	OutputMin float64 // Minimum output (typically 0 = no delay)
	OutputMax float64 // Maximum output (max delay in seconds)
	// contains filtered or unexported fields
}

PIDController implements a PID controller with filtered derivative. It is designed for rate limiting database operations based on load metrics.

The controller computes a delay recommendation based on:

  • Proportional (P): Immediate response to current error
  • Integral (I): Accumulated error to eliminate steady-state offset
  • Derivative (D): Rate of change prediction (filtered to reduce noise)

The filtered derivative uses a low-pass filter to attenuate high-frequency noise that would otherwise cause erratic control behavior.

func DefaultPIDControllerForReads

func DefaultPIDControllerForReads() *PIDController

DefaultPIDControllerForReads creates a PID controller tuned for read operations. Reads should be more responsive but with less aggressive throttling.

func DefaultPIDControllerForWrites

func DefaultPIDControllerForWrites() *PIDController

DefaultPIDControllerForWrites creates a PID controller tuned for write operations. Writes benefit from aggressive integral and moderate proportional response.

func NewPIDController

func NewPIDController(
	kp, ki, kd float64,
	setpoint float64,
	derivativeFilterAlpha float64,
	integralMin, integralMax float64,
	outputMin, outputMax float64,
) *PIDController

NewPIDController creates a new PID controller with custom parameters.

func (*PIDController) GetState

func (p *PIDController) GetState() (integral, prevError, prevFilteredError float64)

GetState returns the current internal state for monitoring/debugging.

func (*PIDController) Reset

func (p *PIDController) Reset()

Reset clears the controller state, useful when conditions change significantly.

func (*PIDController) SetGains

func (p *PIDController) SetGains(kp, ki, kd float64)

SetGains updates the PID gains.

func (*PIDController) SetSetpoint

func (p *PIDController) SetSetpoint(setpoint float64)

SetSetpoint updates the target setpoint.

func (*PIDController) Update

func (p *PIDController) Update(processVariable float64) float64

Update computes the PID output based on the current process variable. The process variable should be in the range [0.0, 1.0+] representing load level.

Returns the recommended delay in seconds. A value of 0 means no delay needed.

type PIDState

type PIDState struct {
	Integral          float64
	PrevError         float64
	PrevFilteredError float64
}

PIDState contains the internal state of a PID controller.

type ProcessMemoryStats added in v0.35.1

type ProcessMemoryStats struct {
	// VmRSS is the resident set size (total physical memory in use) in bytes
	VmRSS uint64
	// RssShmem is the shared memory portion of RSS in bytes
	RssShmem uint64
	// RssAnon is the anonymous (non-shared) memory in bytes
	RssAnon uint64
	// VmHWM is the peak RSS (high water mark) in bytes
	VmHWM uint64
}

ProcessMemoryStats contains memory statistics for the current process. On Linux, these are read from /proc/self/status for accurate RSS values. On other platforms, these are approximated from Go runtime stats.

func ReadProcessMemoryStats added in v0.35.1

func ReadProcessMemoryStats() ProcessMemoryStats

ReadProcessMemoryStats reads memory statistics from /proc/self/status. This provides accurate RSS (Resident Set Size) information on Linux, including the breakdown between shared and anonymous memory.

func (ProcessMemoryStats) PhysicalMemoryBytes added in v0.35.1

func (p ProcessMemoryStats) PhysicalMemoryBytes() uint64

PhysicalMemoryBytes returns the actual physical memory usage (RSS - shared)

func (ProcessMemoryStats) PhysicalMemoryMB added in v0.35.1

func (p ProcessMemoryStats) PhysicalMemoryMB() uint64

PhysicalMemoryMB returns the actual physical memory usage in MB

type Stats

type Stats struct {
	WriteThrottles    int64
	ReadThrottles     int64
	TotalWriteDelayMs int64
	TotalReadDelayMs  int64
	EmergencyEvents   int64
	InEmergencyMode   bool
	CurrentMetrics    loadmonitor.Metrics
	WritePIDState     PIDState
	ReadPIDState      PIDState
}

Stats returns rate limiter statistics.

Source Files

  • badger_monitor.go
  • factory.go
  • limiter.go
  • memory.go
  • memory_linux.go
  • neo4j_monitor.go
  • pid.go

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL