Documentation ¶
Overview ¶
Package reputation provides endpoint reputation tracking and scoring.
The reputation system tracks the reliability and performance of endpoints based on their responses to requests. Endpoints accumulate a reputation score that influences their selection probability for future requests.
Key concepts:
- Score: A numeric value (0-100) representing endpoint reliability
- Signal: An event that affects an endpoint's score (success, error, timeout, etc.)
- Storage: Backend for persisting scores (memory, Redis, etc.)
Index ¶
- Constants
- Variables
- func GetScoreImpact(t SignalType) float64
- type Cleaner
- type Config
- type EndpointKey
- type RedisConfig
- type ReputationService
- type Score
- type Signal
- func NewCriticalErrorSignal(reason string, latency time.Duration) Signal
- func NewFatalErrorSignal(reason string) Signal
- func NewMajorErrorSignal(reason string, latency time.Duration) Signal
- func NewMinorErrorSignal(reason string) Signal
- func NewRecoverySuccessSignal(latency time.Duration) Signal
- func NewSuccessSignal(latency time.Duration) Signal
- type SignalType
- type Storage
- type SyncConfig
Constants ¶
const (
	// MinScore is the lowest possible reputation score.
	MinScore float64 = 0
	// MaxScore is the highest possible reputation score.
	MaxScore float64 = 100
	// InitialScore is the starting score for new endpoints.
	// Endpoints start with a moderately high score to give them a fair chance.
	InitialScore float64 = 80
	// DefaultMinThreshold is the minimum score required for endpoint selection.
	// Endpoints below this threshold are excluded from selection.
	DefaultMinThreshold float64 = 30
)
Score constants define the bounds and defaults for reputation scores.
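The bounds and the selection threshold can be illustrated with a minimal sketch. The helpers `applyImpact` and `selectable` are hypothetical names introduced here, not part of the package; the sketch assumes scores are clamped to [MinScore, MaxScore] after each update.

```go
package main

import "fmt"

// Constants mirror the values documented above.
const (
	MinScore            float64 = 0
	MaxScore            float64 = 100
	InitialScore        float64 = 80
	DefaultMinThreshold float64 = 30
)

// applyImpact is a hypothetical helper: it adds delta to score and
// clamps the result to the range [MinScore, MaxScore].
func applyImpact(score, delta float64) float64 {
	next := score + delta
	if next < MinScore {
		return MinScore
	}
	if next > MaxScore {
		return MaxScore
	}
	return next
}

// selectable is a hypothetical helper: it reports whether a score
// meets the minimum threshold for endpoint selection.
func selectable(score, minThreshold float64) bool {
	return score >= minThreshold
}

func main() {
	up := applyImpact(InitialScore, 15)
	fmt.Println(up, selectable(up, DefaultMinThreshold))

	down := applyImpact(InitialScore, -60)
	fmt.Println(down, selectable(down, DefaultMinThreshold))
}
```

In this sketch a score exactly equal to the threshold still qualifies, which matches the ">= minThreshold" wording used for FilterByScore.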
const (
	// DefaultRecoveryTimeout is the duration after which low-scoring endpoints
	// are reset to initial score if they have no signals.
	DefaultRecoveryTimeout = 5 * time.Minute
	// DefaultRefreshInterval is how often to refresh from backend storage.
	DefaultRefreshInterval = 5 * time.Second
	// DefaultWriteBufferSize is the max pending writes before blocking.
	DefaultWriteBufferSize = 1000
	// DefaultFlushInterval is how often to flush buffered writes.
	DefaultFlushInterval = 100 * time.Millisecond
)
Defaults for endpoint recovery and for SyncConfig.
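The recovery rule can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: `maybeRecover` is a hypothetical helper, "low-scoring" is taken to mean "below the minimum threshold", and LastUpdated is used as a proxy for the time of the last signal.

```go
package main

import (
	"fmt"
	"time"
)

const (
	InitialScore           float64 = 80
	DefaultMinThreshold    float64 = 30
	DefaultRecoveryTimeout         = 5 * time.Minute
)

// Score is a trimmed-down copy of the documented Score type.
type Score struct {
	Value       float64
	LastUpdated time.Time
}

// maybeRecover is a hypothetical helper. It resets a score to InitialScore
// when the score is below threshold (assumed meaning of "low-scoring") and
// has not been updated for at least timeout. Otherwise it returns s unchanged.
func maybeRecover(s Score, now time.Time, threshold float64, timeout time.Duration) Score {
	if s.Value < threshold && now.Sub(s.LastUpdated) >= timeout {
		return Score{Value: InitialScore, LastUpdated: now}
	}
	return s
}

func main() {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	stale := Score{Value: 10, LastUpdated: now.Add(-10 * time.Minute)}
	fresh := Score{Value: 10, LastUpdated: now.Add(-1 * time.Minute)}

	fmt.Println(maybeRecover(stale, now, DefaultMinThreshold, DefaultRecoveryTimeout).Value)
	fmt.Println(maybeRecover(fresh, now, DefaultMinThreshold, DefaultRecoveryTimeout).Value)
}
```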
Variables ¶
var (
	// ErrNotFound is returned when an endpoint's score is not found.
	ErrNotFound = errors.New("endpoint score not found")
	// ErrStorageClosed is returned when operations are attempted on a closed storage.
	ErrStorageClosed = errors.New("storage is closed")
)
Common errors returned by storage implementations.
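A caller will typically want to distinguish "no score yet" from a real failure. The sketch below shows one way to do that with `errors.Is`. The functions `lookup` and `scoreOrDefault` are hypothetical, the string key and float64 score are simplifications, and falling back to a default for unknown endpoints is an assumption about caller policy, not documented package behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors mirror the documented variables.
var (
	ErrNotFound      = errors.New("endpoint score not found")
	ErrStorageClosed = errors.New("storage is closed")
)

// lookup is a hypothetical stand-in for a storage Get call.
func lookup(scores map[string]float64, key string, closed bool) (float64, error) {
	if closed {
		return 0, ErrStorageClosed
	}
	v, ok := scores[key]
	if !ok {
		// Wrapping keeps the sentinel detectable via errors.Is.
		return 0, fmt.Errorf("get %q: %w", key, ErrNotFound)
	}
	return v, nil
}

// scoreOrDefault treats a missing score as a new endpoint and returns def.
// Any other error is passed through to the caller.
func scoreOrDefault(scores map[string]float64, key string, closed bool, def float64) (float64, error) {
	v, err := lookup(scores, key, closed)
	if errors.Is(err, ErrNotFound) {
		return def, nil
	}
	return v, err
}

func main() {
	scores := map[string]float64{"eth:node-1": 42}
	fmt.Println(scoreOrDefault(scores, "eth:node-1", false, 80))
	fmt.Println(scoreOrDefault(scores, "eth:node-2", false, 80))
	fmt.Println(scoreOrDefault(scores, "eth:node-1", true, 80))
}
```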
Functions ¶
func GetScoreImpact ¶
func GetScoreImpact(t SignalType) float64
GetScoreImpact returns the default score impact for a signal type. It returns 0 if the signal type is not recognized.
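One plausible shape for such a lookup is a map from signal type to impact, where an unknown type falls through to the zero value. In the sketch below, `scoreImpact` and `impacts` are hypothetical names. Only the +1 (success) and +15 (recovery success) values come from this documentation; the negative value is a placeholder chosen for illustration.

```go
package main

import "fmt"

// SignalType mirrors the documented string-based type.
type SignalType string

const (
	SignalTypeSuccess         SignalType = "success"
	SignalTypeMajorError      SignalType = "major_error"
	SignalTypeRecoverySuccess SignalType = "recovery_success"
)

// impacts maps signal types to score deltas.
var impacts = map[SignalType]float64{
	SignalTypeSuccess:         1,   // documented: regular success is +1
	SignalTypeRecoverySuccess: 15,  // documented: recovery success is +15
	SignalTypeMajorError:      -10, // placeholder, NOT the package's real value
}

// scoreImpact is a hypothetical equivalent of GetScoreImpact.
// A missing map key yields the zero value, so unknown types return 0.
func scoreImpact(t SignalType) float64 {
	return impacts[t]
}

func main() {
	fmt.Println(scoreImpact(SignalTypeSuccess))
	fmt.Println(scoreImpact(SignalTypeRecoverySuccess))
	fmt.Println(scoreImpact("not_a_signal"))
}
```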
Types ¶
type Cleaner ¶
type Cleaner interface {
// Cleanup removes expired entries from storage.
// This is called periodically by the service to prevent memory bloat.
Cleanup()
}
Cleaner is an optional interface that storage backends can implement to support periodic cleanup of expired entries.
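Optional interfaces like this are usually detected with a type assertion. The sketch below shows that pattern; `memStore`, `plainStore`, and `runCleanup` are hypothetical names, and how the service actually schedules cleanup is not shown here.

```go
package main

import "fmt"

// Cleaner mirrors the documented optional interface.
type Cleaner interface {
	Cleanup()
}

// memStore is a hypothetical backend that implements Cleaner.
type memStore struct {
	cleaned int // number of times Cleanup has run
}

func (m *memStore) Cleanup() { m.cleaned++ }

// plainStore is a hypothetical backend that does not implement Cleaner.
type plainStore struct{}

// runCleanup calls Cleanup only when the backend supports it and
// reports whether the call happened.
func runCleanup(backend interface{}) bool {
	c, ok := backend.(Cleaner)
	if !ok {
		return false
	}
	c.Cleanup()
	return true
}

func main() {
	m := &memStore{}
	fmt.Println(runCleanup(m), m.cleaned)
	fmt.Println(runCleanup(plainStore{}))
}
```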
type Config ¶
type Config struct {
// Enabled determines if reputation tracking is active.
// When false, all ReputationService methods are no-ops.
Enabled bool `yaml:"enabled"`
// InitialScore is the starting score for new endpoints.
// Default: 80
InitialScore float64 `yaml:"initial_score"`
// MinThreshold is the minimum score for endpoint selection.
// Default: 30
MinThreshold float64 `yaml:"min_threshold"`
// RecoveryTimeout is the duration after which a low-scoring endpoint
// with no signals is reset to InitialScore. This allows endpoints
// to recover from temporary failures (crashes, network issues, etc.).
// Default: 5m
RecoveryTimeout time.Duration `yaml:"recovery_timeout"`
// StorageType specifies the storage backend ("memory" or "redis").
// Default: "memory"
StorageType string `yaml:"storage_type"`
// SyncConfig configures background synchronization behavior.
SyncConfig SyncConfig `yaml:"sync_config"`
// Redis holds Redis-specific configuration (only used when StorageType is "redis").
Redis *RedisConfig `yaml:"redis,omitempty"`
}
Config holds configuration for the reputation system.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults.
func (*Config) HydrateDefaults ¶
func (c *Config) HydrateDefaults()
HydrateDefaults fills in zero values with defaults.
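A minimal sketch of the "fill zero values" pattern, using a trimmed-down copy of Config and the defaults listed in its field comments. This is an assumption about how hydration might work rather than the package's code. One consequence of the pattern as sketched is that an explicitly configured zero cannot be told apart from an unset field.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a trimmed-down copy of the documented struct.
type Config struct {
	InitialScore    float64
	MinThreshold    float64
	RecoveryTimeout time.Duration
	StorageType     string
}

// HydrateDefaults replaces zero-valued fields with the documented defaults
// and leaves explicitly set fields untouched.
func (c *Config) HydrateDefaults() {
	if c.InitialScore == 0 {
		c.InitialScore = 80
	}
	if c.MinThreshold == 0 {
		c.MinThreshold = 30
	}
	if c.RecoveryTimeout == 0 {
		c.RecoveryTimeout = 5 * time.Minute
	}
	if c.StorageType == "" {
		c.StorageType = "memory"
	}
}

func main() {
	cfg := Config{MinThreshold: 50} // only one field set by the user
	cfg.HydrateDefaults()
	fmt.Printf("%+v\n", cfg)
}
```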
type EndpointKey ¶
type EndpointKey struct {
ServiceID protocol.ServiceID
EndpointAddr protocol.EndpointAddr
}
EndpointKey uniquely identifies an endpoint for reputation tracking by combining its service ID and endpoint address.
func NewEndpointKey ¶
func NewEndpointKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey
NewEndpointKey creates a new EndpointKey from service ID and endpoint address.
func (EndpointKey) String ¶
func (k EndpointKey) String() string
String returns a string representation of the endpoint key.
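Because EndpointKey appears as a map key elsewhere in this package (for example in GetScores), it has to be a comparable struct. The sketch below illustrates that property. The local `ServiceID` and `EndpointAddr` types stand in for the `protocol` package types and are assumed to be string-based, and the "service:address" format returned by String is an assumption made for the example only.

```go
package main

import "fmt"

// ServiceID and EndpointAddr are stand-ins for protocol.ServiceID and
// protocol.EndpointAddr (assumed here to be string-based).
type ServiceID string
type EndpointAddr string

// EndpointKey mirrors the documented struct.
type EndpointKey struct {
	ServiceID    ServiceID
	EndpointAddr EndpointAddr
}

func NewEndpointKey(serviceID ServiceID, endpointAddr EndpointAddr) EndpointKey {
	return EndpointKey{ServiceID: serviceID, EndpointAddr: endpointAddr}
}

// String uses an assumed "service:address" format; the real format may differ.
func (k EndpointKey) String() string {
	return string(k.ServiceID) + ":" + string(k.EndpointAddr)
}

func main() {
	scores := map[EndpointKey]float64{}
	scores[NewEndpointKey("eth", "node-1")] = 80
	scores[NewEndpointKey("sol", "node-1")] = 55 // same address, different service

	fmt.Println(len(scores), NewEndpointKey("eth", "node-1"))
}
```

The same address under two services produces two distinct keys, so each service tracks the endpoint's reputation separately.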
type RedisConfig ¶
type RedisConfig struct {
// Address is the Redis server address (host:port).
Address string `yaml:"address"`
// Password for Redis authentication (optional).
Password string `yaml:"password"`
// DB is the Redis database number.
DB int `yaml:"db"`
// KeyPrefix is prepended to all keys stored in Redis.
// Default: "path:reputation:"
KeyPrefix string `yaml:"key_prefix"`
// PoolSize is the maximum number of socket connections.
// Default: 10
PoolSize int `yaml:"pool_size"`
// DialTimeout is the timeout for establishing new connections.
// Default: 5s
DialTimeout time.Duration `yaml:"dial_timeout"`
// ReadTimeout is the timeout for socket reads.
// Default: 3s
ReadTimeout time.Duration `yaml:"read_timeout"`
// WriteTimeout is the timeout for socket writes.
// Default: 3s
WriteTimeout time.Duration `yaml:"write_timeout"`
}
RedisConfig holds Redis-specific configuration.
func DefaultRedisConfig ¶
func DefaultRedisConfig() RedisConfig
DefaultRedisConfig returns a RedisConfig with sensible defaults.
func (*RedisConfig) HydrateDefaults ¶
func (c *RedisConfig) HydrateDefaults()
HydrateDefaults fills in zero values with defaults.
type ReputationService ¶
type ReputationService interface {
// RecordSignal records a signal event for an endpoint.
// Updates local cache immediately and queues async write to backend storage.
// This method is non-blocking - it returns immediately after updating local cache.
RecordSignal(ctx context.Context, key EndpointKey, signal Signal) error
// GetScore retrieves the current reputation score for an endpoint.
// Always reads from local cache for minimal latency (<1μs).
// Returns the score or an error if the endpoint is not found.
GetScore(ctx context.Context, key EndpointKey) (Score, error)
// GetScores retrieves reputation scores for multiple endpoints.
// Always reads from local cache for minimal latency.
// Returns a map of endpoint keys to scores. Missing endpoints are omitted.
GetScores(ctx context.Context, keys []EndpointKey) (map[EndpointKey]Score, error)
// FilterByScore filters endpoints that meet the minimum score threshold.
// Always reads from local cache for minimal latency.
// Returns only endpoints with scores >= minThreshold.
FilterByScore(ctx context.Context, keys []EndpointKey, minThreshold float64) ([]EndpointKey, error)
// ResetScore resets an endpoint's score to the initial value.
// Used for administrative purposes or testing.
ResetScore(ctx context.Context, key EndpointKey) error
// Start begins background sync processes (e.g., Redis pub/sub, periodic refresh).
// Should be called once during initialization.
Start(ctx context.Context) error
// Stop gracefully shuts down background processes and flushes pending writes.
Stop() error
}
ReputationService provides endpoint reputation tracking and querying. Implementations must be safe for concurrent use.
Performance design:
- All reads (GetScore, FilterByScore) are served from local in-memory cache (<1μs)
- Writes (RecordSignal) update local cache immediately + async sync to backend
- Background refresh keeps local cache in sync with shared backend (Redis)
This ensures the hot path (request handling) is never blocked by storage latency.
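The read and write paths described above can be sketched roughly as follows. This is a simplified illustration, not the package's implementation: `cachedService`, `record`, `score`, and `flush` are hypothetical names, keys are plain strings, a signal is reduced to a numeric delta, and a Go map stands in for the shared backend.

```go
package main

import (
	"fmt"
	"sync"
)

// write is one pending backend update.
type write struct {
	key   string
	value float64
}

// cachedService keeps scores in a local map and queues backend writes.
type cachedService struct {
	mu      sync.RWMutex
	cache   map[string]float64
	pending chan write // buffered queue drained by a background flusher
}

func newCachedService(bufSize int) *cachedService {
	return &cachedService{
		cache:   make(map[string]float64),
		pending: make(chan write, bufSize),
	}
}

// record updates the local cache first, then queues the backend write.
// The send blocks only when the buffer is already full.
func (s *cachedService) record(key string, delta float64) {
	s.mu.Lock()
	v, ok := s.cache[key]
	if !ok {
		v = 80 // new endpoints start at the documented InitialScore
	}
	v += delta
	if v > 100 {
		v = 100
	}
	if v < 0 {
		v = 0
	}
	s.cache[key] = v
	s.mu.Unlock()

	s.pending <- write{key: key, value: v}
}

// score reads only the local cache and never touches the backend.
func (s *cachedService) score(key string) (float64, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.cache[key]
	return v, ok
}

// flush drains the currently queued writes into backend and returns the count.
func (s *cachedService) flush(backend map[string]float64) int {
	n := 0
	for {
		select {
		case w := <-s.pending:
			backend[w.key] = w.value
			n++
		default:
			return n
		}
	}
}

func main() {
	svc := newCachedService(8)
	backend := map[string]float64{}

	svc.record("eth:node-1", -10)
	svc.record("eth:node-1", 1)

	v, _ := svc.score("eth:node-1")
	fmt.Println("cache:", v, "backend entries before flush:", len(backend))
	fmt.Println("flushed:", svc.flush(backend), "backend:", backend["eth:node-1"])
}
```

The point of the split is that `score` only takes a read lock on an in-memory map, while the slower backend write happens later in `flush`.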
func NewService ¶
func NewService(config Config, store Storage) ReputationService
NewService creates a new ReputationService with the given configuration. The service must be started with Start() before use.
type Score ¶
type Score struct {
// Value is the current score (0-100).
// Higher scores indicate more reliable endpoints.
Value float64
// LastUpdated is when the score was last modified.
LastUpdated time.Time
// SuccessCount is the total number of successful requests.
SuccessCount int64
// ErrorCount is the total number of failed requests.
ErrorCount int64
}
Score represents an endpoint's reputation score at a point in time.
type Signal ¶
type Signal struct {
// Type categorizes the signal's severity.
Type SignalType
// Timestamp when the signal was generated.
Timestamp time.Time
// Latency of the request that generated this signal (if applicable).
Latency time.Duration
// Reason provides additional context for the signal.
Reason string
// Metadata holds additional signal-specific data.
Metadata map[string]string
}
Signal represents an event that affects an endpoint's reputation.
func NewCriticalErrorSignal ¶
func NewCriticalErrorSignal(reason string, latency time.Duration) Signal
NewCriticalErrorSignal creates a signal for a critical error (HTTP 5xx).
func NewFatalErrorSignal ¶
func NewFatalErrorSignal(reason string) Signal
NewFatalErrorSignal creates a signal for a fatal error. This replaces the earlier concept of "permanent sanctions": the penalty is severe, but endpoints can still recover.
func NewMajorErrorSignal ¶
func NewMajorErrorSignal(reason string, latency time.Duration) Signal
NewMajorErrorSignal creates a signal for a major error (timeout, connection issues).
func NewMinorErrorSignal ¶
func NewMinorErrorSignal(reason string) Signal
NewMinorErrorSignal creates a signal for a minor error.
func NewRecoverySuccessSignal ¶
func NewRecoverySuccessSignal(latency time.Duration) Signal
NewRecoverySuccessSignal creates a signal for a successful request from a low-scoring endpoint. It has a higher impact (+15) than a regular success (+1) so that endpoints can recover faster once they prove they are healthy. Intended for use by the Probation system (PR 7) and Health checks (PR 9).
func NewSuccessSignal ¶
func NewSuccessSignal(latency time.Duration) Signal
NewSuccessSignal creates a signal for a successful request.
func (Signal) GetDefaultImpact ¶
GetDefaultImpact returns the default score impact for this signal.
func (Signal) IsNegative ¶
IsNegative returns true if this signal has a negative impact on score.
func (Signal) IsPositive ¶
IsPositive returns true if this signal has a positive impact on score.
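To make the relationship between constructors, impact, and the two predicates concrete, here is a minimal sketch using local copies of the types. It assumes constructors stamp the signal with the current time and that the predicates simply check the sign of the default impact. The unexported `impact` method is a hypothetical stand-in for GetDefaultImpact, and the -1 value for minor errors is a placeholder, not the package's real value.

```go
package main

import (
	"fmt"
	"time"
)

type SignalType string

const (
	SignalTypeSuccess    SignalType = "success"
	SignalTypeMinorError SignalType = "minor_error"
)

// Signal is a trimmed-down copy of the documented struct.
type Signal struct {
	Type      SignalType
	Timestamp time.Time
	Latency   time.Duration
	Reason    string
}

// NewSuccessSignal sketches the documented constructor (timestamping assumed).
func NewSuccessSignal(latency time.Duration) Signal {
	return Signal{Type: SignalTypeSuccess, Timestamp: time.Now(), Latency: latency}
}

// NewMinorErrorSignal sketches the documented constructor (timestamping assumed).
func NewMinorErrorSignal(reason string) Signal {
	return Signal{Type: SignalTypeMinorError, Timestamp: time.Now(), Reason: reason}
}

// impact is a hypothetical stand-in for GetDefaultImpact.
func (s Signal) impact() float64 {
	switch s.Type {
	case SignalTypeSuccess:
		return 1 // documented: regular success is +1
	case SignalTypeMinorError:
		return -1 // placeholder, NOT the package's real value
	}
	return 0 // unrecognized types have no impact
}

// IsPositive and IsNegative are assumed to check the sign of the impact.
func (s Signal) IsPositive() bool { return s.impact() > 0 }
func (s Signal) IsNegative() bool { return s.impact() < 0 }

func main() {
	ok := NewSuccessSignal(120 * time.Millisecond)
	bad := NewMinorErrorSignal("validation failed")
	fmt.Println(ok.Type, ok.IsPositive(), ok.IsNegative())
	fmt.Println(bad.Type, bad.IsPositive(), bad.IsNegative())
}
```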
type SignalType ¶
type SignalType string
SignalType categorizes signals by their impact on reputation.
const (
	// SignalTypeSuccess indicates a successful request/response.
	SignalTypeSuccess SignalType = "success"
	// SignalTypeMinorError indicates a minor error (validation issues, unknown errors).
	SignalTypeMinorError SignalType = "minor_error"
	// SignalTypeMajorError indicates a major error (timeout, connection issues).
	SignalTypeMajorError SignalType = "major_error"
	// SignalTypeCriticalError indicates a critical error (HTTP 5xx, transport errors).
	SignalTypeCriticalError SignalType = "critical_error"
	// SignalTypeFatalError indicates a fatal error (service misconfiguration).
	// Was previously "permanent sanction" - now just a severe score penalty.
	SignalTypeFatalError SignalType = "fatal_error"
	// SignalTypeRecoverySuccess indicates a successful request from a low-scoring endpoint.
	// This signal has higher impact (+15) than regular success (+1) to allow endpoints
	// to recover faster when they prove they're healthy again.
	// Intended for use by:
	// - Probation system (PR 7): when sampling traffic to low-scoring endpoints
	// - Health checks (PR 9): when probing excluded endpoints
	SignalTypeRecoverySuccess SignalType = "recovery_success"
)
type Storage ¶
type Storage interface {
// Get retrieves the score for an endpoint.
// Returns ErrNotFound if the endpoint has no stored score.
Get(ctx context.Context, key EndpointKey) (Score, error)
// GetMultiple retrieves scores for multiple endpoints.
// Returns a map containing only the endpoints that were found.
// Missing endpoints are omitted from the result (no error).
GetMultiple(ctx context.Context, keys []EndpointKey) (map[EndpointKey]Score, error)
// Set stores or updates the score for an endpoint.
Set(ctx context.Context, key EndpointKey, score Score) error
// SetMultiple stores or updates scores for multiple endpoints.
SetMultiple(ctx context.Context, scores map[EndpointKey]Score) error
// Delete removes the score for an endpoint.
// Returns nil if the endpoint doesn't exist.
Delete(ctx context.Context, key EndpointKey) error
// List returns all stored endpoint keys for a service.
// If serviceID is empty, returns all endpoint keys.
List(ctx context.Context, serviceID string) ([]EndpointKey, error)
// Close releases any resources held by the storage.
Close() error
}
Storage defines the interface for reputation score persistence. Implementations must be safe for concurrent use.
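As an illustration of the contract, the sketch below implements a subset of the interface in memory. It is deliberately simplified and is not the package's memory backend: `memoryStorage` is a hypothetical type, keys are plain strings, scores are bare float64 values, the context parameter is omitted, and only Get, Set, Delete, and Close are covered.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	ErrNotFound      = errors.New("endpoint score not found")
	ErrStorageClosed = errors.New("storage is closed")
)

// memoryStorage is a hypothetical in-memory backend guarded by a RWMutex
// so that it is safe for concurrent use.
type memoryStorage struct {
	mu     sync.RWMutex
	scores map[string]float64
	closed bool
}

func newMemoryStorage() *memoryStorage {
	return &memoryStorage{scores: make(map[string]float64)}
}

// Get returns ErrNotFound when no score is stored for key.
func (m *memoryStorage) Get(key string) (float64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if m.closed {
		return 0, ErrStorageClosed
	}
	v, ok := m.scores[key]
	if !ok {
		return 0, ErrNotFound
	}
	return v, nil
}

// Set stores or overwrites the score for key.
func (m *memoryStorage) Set(key string, value float64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return ErrStorageClosed
	}
	m.scores[key] = value
	return nil
}

// Delete removes key and returns nil even when key does not exist.
func (m *memoryStorage) Delete(key string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return ErrStorageClosed
	}
	delete(m.scores, key)
	return nil
}

// Close marks the storage as closed; later operations fail.
func (m *memoryStorage) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
	return nil
}

func main() {
	s := newMemoryStorage()
	_, err := s.Get("eth:node-1")
	fmt.Println(err)
	_ = s.Set("eth:node-1", 75)
	fmt.Println(s.Get("eth:node-1"))
	_ = s.Close()
	fmt.Println(s.Set("eth:node-1", 10))
}
```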
type SyncConfig ¶
type SyncConfig struct {
// RefreshInterval is how often to refresh scores from the backend storage.
// Only applies when using shared storage (Redis).
// Default: 5s
RefreshInterval time.Duration `yaml:"refresh_interval"`
// WriteBufferSize is the max number of pending writes to buffer before blocking.
// Default: 1000
WriteBufferSize int `yaml:"write_buffer_size"`
// FlushInterval is how often to flush buffered writes to backend storage.
// Default: 100ms
FlushInterval time.Duration `yaml:"flush_interval"`
}
SyncConfig configures background synchronization for the reputation system.
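How WriteBufferSize and FlushInterval might interact can be sketched with a buffered channel and a ticker. Everything below is an assumption made for illustration: `flusher`, `startFlusher`, `stop`, and `defaultSyncConfig` are hypothetical names, pending writes are reduced to strings, and the "backend" is an in-memory slice. The final drain on shutdown mirrors the documented behavior of Stop, which flushes pending writes.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// SyncConfig is a trimmed-down copy of the documented struct.
type SyncConfig struct {
	WriteBufferSize int
	FlushInterval   time.Duration
}

// defaultSyncConfig returns the documented defaults for the two fields used here.
func defaultSyncConfig() SyncConfig {
	return SyncConfig{WriteBufferSize: 1000, FlushInterval: 100 * time.Millisecond}
}

// flusher buffers writes and moves them to flushed on every tick.
type flusher struct {
	pending chan string // capacity is WriteBufferSize; sends block when full
	mu      sync.Mutex
	flushed []string // stands in for the backend storage
	cancel  context.CancelFunc
	done    chan struct{}
}

func startFlusher(cfg SyncConfig) *flusher {
	ctx, cancel := context.WithCancel(context.Background())
	f := &flusher{
		pending: make(chan string, cfg.WriteBufferSize),
		cancel:  cancel,
		done:    make(chan struct{}),
	}
	go f.run(ctx, cfg.FlushInterval)
	return f
}

// run flushes on each tick and once more when the context is cancelled.
func (f *flusher) run(ctx context.Context, interval time.Duration) {
	defer close(f.done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			f.drain()
		case <-ctx.Done():
			f.drain() // final flush so queued writes are not lost
			return
		}
	}
}

// drain moves everything currently queued into flushed without blocking.
func (f *flusher) drain() {
	f.mu.Lock()
	defer f.mu.Unlock()
	for {
		select {
		case w := <-f.pending:
			f.flushed = append(f.flushed, w)
		default:
			return
		}
	}
}

// stop shuts the loop down, waits for the final flush, and returns a copy
// of everything that was flushed.
func (f *flusher) stop() []string {
	f.cancel()
	<-f.done
	f.mu.Lock()
	defer f.mu.Unlock()
	return append([]string(nil), f.flushed...)
}

func main() {
	f := startFlusher(defaultSyncConfig())
	f.pending <- "eth:node-1"
	f.pending <- "eth:node-2"
	fmt.Println(f.stop())
}
```

A shorter FlushInterval narrows the window in which the backend lags behind the local cache, at the cost of more frequent backend round trips.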
func DefaultSyncConfig ¶
func DefaultSyncConfig() SyncConfig
DefaultSyncConfig returns a SyncConfig with sensible defaults.
func (*SyncConfig) HydrateDefaults ¶
func (s *SyncConfig) HydrateDefaults()
HydrateDefaults fills in zero values with defaults.