Documentation ¶
Overview ¶
Package reputation provides endpoint reputation tracking and scoring.
The reputation system tracks the reliability and performance of endpoints based on their responses to requests. Endpoints accumulate a reputation score that influences their selection probability for future requests.
Key concepts:
- Score: A numeric value (0-100) representing endpoint reliability
- Signal: An event that affects an endpoint's score (success, error, timeout, etc.)
- Storage: Backend for persisting scores (memory, Redis, etc.)
Index ¶
- Constants
- Variables
- func GetScoreImpact(t SignalType) float64
- type Cleaner
- type Config
- type DomainKeyBuilder
- type EndpointKey
- type EndpointKeyBuilder
- type KeyBuilder
- type RedisConfig
- type ReputationService
- type Score
- type ServiceConfig
- type Signal
- func NewCriticalErrorSignal(reason string, latency time.Duration) Signal
- func NewFatalErrorSignal(reason string) Signal
- func NewMajorErrorSignal(reason string, latency time.Duration) Signal
- func NewMinorErrorSignal(reason string) Signal
- func NewRecoverySuccessSignal(latency time.Duration) Signal
- func NewSuccessSignal(latency time.Duration) Signal
- type SignalType
- type Storage
- type SupplierKeyBuilder
- type SyncConfig
- type TieredSelectionConfig
- type TieredSelector
- func (s *TieredSelector) Config() TieredSelectionConfig
- func (s *TieredSelector) GroupByTier(endpoints map[EndpointKey]float64) (tier1, tier2, tier3 []EndpointKey)
- func (s *TieredSelector) MinThreshold() float64
- func (s *TieredSelector) SelectEndpoint(endpoints map[EndpointKey]float64) (EndpointKey, int, error)
- func (s *TieredSelector) TierForScore(score float64) int
Constants ¶
const (
// MinScore is the lowest possible reputation score.
MinScore float64 = 0
// MaxScore is the highest possible reputation score.
MaxScore float64 = 100
// InitialScore is the starting score for new endpoints.
// Endpoints start with a moderately high score to give them a fair chance.
InitialScore float64 = 80
// DefaultMinThreshold is the minimum score required for endpoint selection.
// Endpoints below this threshold are excluded from selection.
DefaultMinThreshold float64 = 30
)
Score constants define the bounds and defaults for reputation scores.
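The bounds above imply that a score stays within 0 and 100 however many signals arrive. A minimal sketch of that idea follows, assuming impacts are simply added and the result is clamped. The helper applyImpact and the -25 penalty are hypothetical and are not part of the package; +1 and +15 are the documented impacts for regular and recovery successes.

```go
package main

import (
	"fmt"
	"math"
)

// Bounds copied from the documented score constants.
const (
	MinScore     float64 = 0
	MaxScore     float64 = 100
	InitialScore float64 = 80
)

// applyImpact is a hypothetical helper (not part of the package): it adds a
// signal's impact to a score and clamps the result to [MinScore, MaxScore].
func applyImpact(score, impact float64) float64 {
	return math.Max(MinScore, math.Min(MaxScore, score+impact))
}

func main() {
	fmt.Println(applyImpact(InitialScore, 1)) // regular success (+1)
	fmt.Println(applyImpact(95, 15))          // recovery success (+15), capped at MaxScore
	fmt.Println(applyImpact(10, -25))         // placeholder penalty, floored at MinScore
}
```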
const (
// KeyGranularityEndpoint scores each endpoint URL separately (finest granularity).
// Key format: serviceID:supplierAddr-endpointURL
// This is the default behavior.
KeyGranularityEndpoint = "per-endpoint"
// KeyGranularityDomain scores all endpoints from the same hosting domain together.
// Key format: serviceID:domain (e.g., eth:nodefleet.net)
// Use when a hosting provider's overall reliability matters.
KeyGranularityDomain = "per-domain"
// KeyGranularitySupplier scores all endpoints from the same supplier together.
// Key format: serviceID:supplierAddr
// Use when a supplier's overall reliability matters more than individual endpoints.
KeyGranularitySupplier = "per-supplier"
)
Key granularity options determine how endpoints are grouped for scoring. Ordered from finest to coarsest granularity.
const (
// DefaultRecoveryTimeout is the duration after which low-scoring endpoints
// are reset to initial score if they have no signals.
DefaultRecoveryTimeout = 5 * time.Minute
// DefaultRefreshInterval is how often to refresh from backend storage.
DefaultRefreshInterval = 5 * time.Second
// DefaultWriteBufferSize is the max pending writes before blocking.
DefaultWriteBufferSize = 1000
// DefaultFlushInterval is how often to flush buffered writes.
DefaultFlushInterval = 100 * time.Millisecond
)
Recovery and SyncConfig defaults.
const (
// DefaultTier1Threshold is the minimum score for Premium tier endpoints.
DefaultTier1Threshold float64 = 70
// DefaultTier2Threshold is the minimum score for Good tier endpoints.
DefaultTier2Threshold float64 = 50
)
Tiered selection defaults.
Variables ¶
var (
// ErrNotFound is returned when an endpoint's score is not found.
ErrNotFound = errors.New("endpoint score not found")
// ErrStorageClosed is returned when operations are attempted on a closed storage.
ErrStorageClosed = errors.New("storage is closed")
)
Common errors returned by storage implementations.
var ErrNoEndpointsAvailable = errors.New("no endpoints available for selection")
ErrNoEndpointsAvailable is returned when no endpoints are available for selection.
Functions ¶
func GetScoreImpact ¶
func GetScoreImpact(t SignalType) float64
GetScoreImpact returns the default score impact for a signal type. Returns 0 if the signal type is not recognized.
Types ¶
type Cleaner ¶
type Cleaner interface {
// Cleanup removes expired entries from storage.
// This is called periodically by the service to prevent memory bloat.
Cleanup()
}
Cleaner is an optional interface that storage backends can implement to support periodic cleanup of expired entries.
type Config ¶
type Config struct {
// Enabled determines if reputation tracking is active.
// When false, all ReputationService methods are no-ops.
Enabled bool `yaml:"enabled"`
// InitialScore is the starting score for new endpoints.
// Default: 80
InitialScore float64 `yaml:"initial_score"`
// MinThreshold is the minimum score for endpoint selection.
// Default: 30
MinThreshold float64 `yaml:"min_threshold"`
// RecoveryTimeout is the duration after which a low-scoring endpoint
// with no signals is reset to InitialScore. This allows endpoints
// to recover from temporary failures (crashes, network issues, etc.).
// Default: 5m
RecoveryTimeout time.Duration `yaml:"recovery_timeout"`
// StorageType specifies the storage backend ("memory" or "redis").
// Default: "memory"
StorageType string `yaml:"storage_type"`
// KeyGranularity determines how endpoints are grouped for scoring (global default).
// Options: "per-endpoint" (default), "per-domain", "per-supplier"
// Default: "per-endpoint"
KeyGranularity string `yaml:"key_granularity"`
// ServiceOverrides allows per-service configuration overrides.
// Use this to apply different granularity rules to specific services.
ServiceOverrides map[string]ServiceConfig `yaml:"service_overrides,omitempty"`
// SyncConfig configures background synchronization behavior.
SyncConfig SyncConfig `yaml:"sync_config"`
// TieredSelection configures tiered endpoint selection.
// When enabled, high-reputation endpoints are preferred over lower-reputation ones.
TieredSelection TieredSelectionConfig `yaml:"tiered_selection"`
// Redis holds Redis-specific configuration (only used when StorageType is "redis").
Redis *RedisConfig `yaml:"redis,omitempty"`
}
Config holds configuration for the reputation system.
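The RecoveryTimeout field describes a reset rule that is easy to misread, so here is a rough sketch of it. It assumes "low-scoring" means below MinThreshold and that "no signals" means the time since the last update; neither detail is confirmed by the documentation. The helper scoreAfterIdle is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the documented defaults.
const (
	initialScore    float64 = 80
	minThreshold    float64 = 30
	recoveryTimeout         = 5 * time.Minute
)

// scoreAfterIdle is a hypothetical helper (not part of the package). It
// returns initialScore when the score is below minThreshold (an assumed
// definition of "low-scoring") and the endpoint has been idle for at least
// recoveryTimeout. Otherwise it returns the score unchanged.
func scoreAfterIdle(value float64, idle time.Duration) float64 {
	if value < minThreshold && idle >= recoveryTimeout {
		return initialScore
	}
	return value
}

func main() {
	fmt.Println(scoreAfterIdle(12, 6*time.Minute)) // low and idle long enough: reset
	fmt.Println(scoreAfterIdle(12, time.Minute))   // low but recently active: unchanged
	fmt.Println(scoreAfterIdle(55, time.Hour))     // not low-scoring: unchanged
}
```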
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults.
func (*Config) HydrateDefaults ¶
func (c *Config) HydrateDefaults()
HydrateDefaults fills in zero values with defaults.
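As an illustration of what "fills in zero values" usually means in Go, the sketch below uses a trimmed-down, hypothetical config type that mirrors a few documented fields and their documented defaults. It is not the package's implementation. One consequence of this pattern is that an explicit 0 cannot be told apart from an unset field, so it is replaced by the default as well.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical subset of the documented Config fields.
type config struct {
	InitialScore    float64
	MinThreshold    float64
	RecoveryTimeout time.Duration
	StorageType     string
	KeyGranularity  string
}

// hydrateDefaults replaces each zero-valued field with its documented
// default and leaves every non-zero field untouched.
func (c *config) hydrateDefaults() {
	if c.InitialScore == 0 {
		c.InitialScore = 80
	}
	if c.MinThreshold == 0 {
		c.MinThreshold = 30
	}
	if c.RecoveryTimeout == 0 {
		c.RecoveryTimeout = 5 * time.Minute
	}
	if c.StorageType == "" {
		c.StorageType = "memory"
	}
	if c.KeyGranularity == "" {
		c.KeyGranularity = "per-endpoint"
	}
}

func main() {
	c := config{MinThreshold: 40, StorageType: "redis"}
	c.hydrateDefaults()
	fmt.Printf("%+v\n", c) // the explicit values survive; the rest take defaults
}
```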
type DomainKeyBuilder ¶ added in v1.0.6
type DomainKeyBuilder struct{}
DomainKeyBuilder creates keys with per-domain granularity. All endpoints from the same hosting domain share a score. Key format: serviceID:domain (e.g., eth:nodefleet.net)
func (*DomainKeyBuilder) BuildKey ¶ added in v1.0.6
func (b *DomainKeyBuilder) BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey
BuildKey creates a key using the domain extracted from the endpoint URL. If the domain cannot be extracted, it falls back to the full endpoint address.
type EndpointKey ¶
type EndpointKey struct {
ServiceID protocol.ServiceID
EndpointAddr protocol.EndpointAddr
}
EndpointKey uniquely identifies an endpoint for reputation tracking. It combines the service ID and endpoint address to create a unique key.
func NewEndpointKey ¶
func NewEndpointKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey
NewEndpointKey creates a new EndpointKey from service ID and endpoint address.
func (EndpointKey) String ¶
func (k EndpointKey) String() string
String returns a string representation of the endpoint key.
type EndpointKeyBuilder ¶ added in v1.0.6
type EndpointKeyBuilder struct{}
EndpointKeyBuilder creates keys with per-endpoint granularity. Each endpoint URL is scored separately. Key format: serviceID:supplierAddr-endpointURL
func (*EndpointKeyBuilder) BuildKey ¶ added in v1.0.6
func (b *EndpointKeyBuilder) BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey
BuildKey creates a key using the full endpoint address.
type KeyBuilder ¶ added in v1.0.6
type KeyBuilder interface {
// BuildKey creates an EndpointKey for the given service and endpoint.
BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey
}
KeyBuilder creates EndpointKeys with a specific granularity. Different implementations group endpoints differently for scoring.
func NewKeyBuilder ¶ added in v1.0.6
func NewKeyBuilder(granularity string) KeyBuilder
NewKeyBuilder creates a KeyBuilder for the specified granularity. If the granularity is invalid or empty, it defaults to per-endpoint.
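To make the three key formats and the per-endpoint fallback concrete, here is a simplified sketch that works on plain strings rather than the protocol types. It assumes an endpoint address has the form supplierAddr-endpointURL, as the key formats suggest, and it takes the last two host labels as the domain. That heuristic is an assumption of this sketch and mishandles IP addresses and suffixes such as co.uk. The functions buildKey and domainOf and the address pokt1abc are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildKey is a hypothetical stand-in for the three KeyBuilder
// implementations. Unknown or empty granularities, and addresses that
// cannot be parsed, fall back to the per-endpoint key.
func buildKey(granularity, serviceID, endpointAddr string) string {
	supplier, rawURL, ok := strings.Cut(endpointAddr, "-")
	switch granularity {
	case "per-supplier":
		if ok {
			return serviceID + ":" + supplier
		}
	case "per-domain":
		if ok {
			if domain := domainOf(rawURL); domain != "" {
				return serviceID + ":" + domain
			}
		}
	}
	return serviceID + ":" + endpointAddr
}

// domainOf returns the last two labels of the URL's host, or "" if the URL
// cannot be parsed or has no host. This is a simplification made for the
// sketch, not the package's extraction logic.
func domainOf(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil || u.Hostname() == "" {
		return ""
	}
	labels := strings.Split(u.Hostname(), ".")
	if len(labels) < 2 {
		return u.Hostname()
	}
	return strings.Join(labels[len(labels)-2:], ".")
}

func main() {
	addr := "pokt1abc-https://rpc.nodefleet.net/v1" // hypothetical address
	for _, g := range []string{"per-endpoint", "per-domain", "per-supplier", "bogus"} {
		fmt.Printf("%-12s -> %s\n", g, buildKey(g, "eth", addr))
	}
}
```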
type RedisConfig ¶
type RedisConfig struct {
// Address is the Redis server address (host:port).
Address string `yaml:"address"`
// Password for Redis authentication (optional).
Password string `yaml:"password"`
// DB is the Redis database number.
DB int `yaml:"db"`
// KeyPrefix is prepended to all keys stored in Redis.
// Default: "path:reputation:"
KeyPrefix string `yaml:"key_prefix"`
// PoolSize is the maximum number of socket connections.
// Default: 10
PoolSize int `yaml:"pool_size"`
// DialTimeout is the timeout for establishing new connections.
// Default: 5s
DialTimeout time.Duration `yaml:"dial_timeout"`
// ReadTimeout is the timeout for socket reads.
// Default: 3s
ReadTimeout time.Duration `yaml:"read_timeout"`
// WriteTimeout is the timeout for socket writes.
// Default: 3s
WriteTimeout time.Duration `yaml:"write_timeout"`
}
RedisConfig holds Redis-specific configuration.
func DefaultRedisConfig ¶
func DefaultRedisConfig() RedisConfig
DefaultRedisConfig returns a RedisConfig with sensible defaults.
func (*RedisConfig) HydrateDefaults ¶
func (c *RedisConfig) HydrateDefaults()
HydrateDefaults fills in zero values with defaults.
type ReputationService ¶
type ReputationService interface {
// RecordSignal records a signal event for an endpoint.
// Updates local cache immediately and queues async write to backend storage.
// This method is non-blocking - it returns immediately after updating local cache.
RecordSignal(ctx context.Context, key EndpointKey, signal Signal) error
// GetScore retrieves the current reputation score for an endpoint.
// Always reads from local cache for minimal latency (<1μs).
// Returns the score or an error if the endpoint is not found.
GetScore(ctx context.Context, key EndpointKey) (Score, error)
// GetScores retrieves reputation scores for multiple endpoints.
// Always reads from local cache for minimal latency.
// Returns a map of endpoint keys to scores. Missing endpoints are omitted.
GetScores(ctx context.Context, keys []EndpointKey) (map[EndpointKey]Score, error)
// FilterByScore filters endpoints that meet the minimum score threshold.
// Always reads from local cache for minimal latency.
// Returns only endpoints with scores >= minThreshold.
FilterByScore(ctx context.Context, keys []EndpointKey, minThreshold float64) ([]EndpointKey, error)
// ResetScore resets an endpoint's score to the initial value.
// Used for administrative purposes or testing.
ResetScore(ctx context.Context, key EndpointKey) error
// KeyBuilderForService returns the KeyBuilder for the given service.
// Uses service-specific config if available, otherwise falls back to global default.
KeyBuilderForService(serviceID protocol.ServiceID) KeyBuilder
// Start begins background sync processes (e.g., Redis pub/sub, periodic refresh).
// Should be called once during initialization.
Start(ctx context.Context) error
// Stop gracefully shuts down background processes and flushes pending writes.
Stop() error
}
ReputationService provides endpoint reputation tracking and querying. Implementations must be safe for concurrent use.
Performance design:
- All reads (GetScore, FilterByScore) are served from local in-memory cache (<1μs)
- Writes (RecordSignal) update local cache immediately + async sync to backend
- Background refresh keeps local cache in sync with shared backend (Redis)
This ensures the hot path (request handling) is never blocked by storage latency.
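The performance design above can be sketched as a local map guarded by a lock plus a buffered channel drained by a background goroutine. This is only an illustration under simplifying assumptions: keys are plain strings, the backend is an in-process map standing in for Redis, and each queued write is applied as it arrives instead of being batched on a flush interval. All names in the sketch are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingWrite is one queued update for the backend.
type pendingWrite struct {
	key   string
	value float64
}

// cachedScores is a hypothetical stand-in for the service's cache layer.
type cachedScores struct {
	mu      sync.RWMutex
	local   map[string]float64 // hot-path cache
	pending chan pendingWrite  // write buffer (compare WriteBufferSize)
	backend map[string]float64 // stand-in for Redis; written only by flushLoop
	done    chan struct{}
}

func newCachedScores(bufferSize int) *cachedScores {
	c := &cachedScores{
		local:   make(map[string]float64),
		pending: make(chan pendingWrite, bufferSize),
		backend: make(map[string]float64),
		done:    make(chan struct{}),
	}
	go c.flushLoop()
	return c
}

// record updates the local cache first, then queues the backend write.
// The send blocks only when the buffer is full. A real implementation would
// also need to order or coalesce concurrent writes to the same key.
func (c *cachedScores) record(key string, impact float64) {
	c.mu.Lock()
	value, ok := c.local[key]
	if !ok {
		value = 80 // documented InitialScore
	}
	value += impact
	c.local[key] = value
	c.mu.Unlock()
	c.pending <- pendingWrite{key: key, value: value}
}

// get reads only the local cache and never touches the backend.
func (c *cachedScores) get(key string) (float64, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	value, ok := c.local[key]
	return value, ok
}

// flushLoop drains queued writes into the backend until stop closes the queue.
func (c *cachedScores) flushLoop() {
	for w := range c.pending {
		c.backend[w.key] = w.value
	}
	close(c.done)
}

// stop flushes whatever is still queued and waits for the flusher to exit.
func (c *cachedScores) stop() {
	close(c.pending)
	<-c.done
}

func main() {
	c := newCachedScores(1000)
	c.record("eth:endpoint-a", 1)
	v, _ := c.get("eth:endpoint-a")
	fmt.Println(v) // visible immediately, before any flush
	c.stop()
	fmt.Println(c.backend["eth:endpoint-a"]) // persisted once stop returns
}
```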
func NewService ¶
func NewService(config Config, store Storage) ReputationService
NewService creates a new ReputationService with the given configuration. The service must be started with Start() before use.
type Score ¶
type Score struct {
// Value is the current score (0-100).
// Higher scores indicate more reliable endpoints.
Value float64
// LastUpdated is when the score was last modified.
LastUpdated time.Time
// SuccessCount is the total number of successful requests.
SuccessCount int64
// ErrorCount is the total number of failed requests.
ErrorCount int64
}
Score represents an endpoint's reputation score at a point in time.
type ServiceConfig ¶ added in v1.0.6
type ServiceConfig struct {
// KeyGranularity overrides the global key granularity for this service.
// Options: "per-endpoint", "per-domain", "per-supplier"
KeyGranularity string `yaml:"key_granularity"`
}
ServiceConfig holds service-specific reputation configuration overrides.
type Signal ¶
type Signal struct {
// Type categorizes the signal's severity.
Type SignalType
// Timestamp when the signal was generated.
Timestamp time.Time
// Latency of the request that generated this signal (if applicable).
Latency time.Duration
// Reason provides additional context for the signal.
Reason string
// Metadata holds additional signal-specific data.
Metadata map[string]string
}
Signal represents an event that affects an endpoint's reputation.
func NewCriticalErrorSignal ¶
func NewCriticalErrorSignal(reason string, latency time.Duration) Signal
NewCriticalErrorSignal creates a signal for a critical error (HTTP 5xx).
func NewFatalErrorSignal ¶
func NewFatalErrorSignal(reason string) Signal
NewFatalErrorSignal creates a signal for a fatal error. Fatal errors replace the earlier concept of "permanent sanctions": the endpoint takes a severe score penalty but can still recover.
func NewMajorErrorSignal ¶
func NewMajorErrorSignal(reason string, latency time.Duration) Signal
NewMajorErrorSignal creates a signal for a major error (timeout, connection issues).
func NewMinorErrorSignal ¶
func NewMinorErrorSignal(reason string) Signal
NewMinorErrorSignal creates a signal for a minor error.
func NewRecoverySuccessSignal ¶
func NewRecoverySuccessSignal(latency time.Duration) Signal
NewRecoverySuccessSignal creates a signal for a successful request from a low-scoring endpoint. It has a higher impact (+15) than a regular success (+1), so endpoints that prove they are healthy recover faster. Intended for use by the Probation system (PR 7) and Health checks (PR 9).
func NewSuccessSignal ¶
func NewSuccessSignal(latency time.Duration) Signal
NewSuccessSignal creates a signal for a successful request.
func (Signal) GetDefaultImpact ¶
GetDefaultImpact returns the default score impact for this signal.
func (Signal) IsNegative ¶
IsNegative returns true if this signal has a negative impact on score.
func (Signal) IsPositive ¶
IsPositive returns true if this signal has a positive impact on score.
type SignalType ¶
type SignalType string
SignalType categorizes signals by their impact on reputation.
const (
// SignalTypeSuccess indicates a successful request/response.
SignalTypeSuccess SignalType = "success"
// SignalTypeMinorError indicates a minor error (validation issues, unknown errors).
SignalTypeMinorError SignalType = "minor_error"
// SignalTypeMajorError indicates a major error (timeout, connection issues).
SignalTypeMajorError SignalType = "major_error"
// SignalTypeCriticalError indicates a critical error (HTTP 5xx, transport errors).
SignalTypeCriticalError SignalType = "critical_error"
// SignalTypeFatalError indicates a fatal error (service misconfiguration).
// Was previously "permanent sanction" - now just a severe score penalty.
SignalTypeFatalError SignalType = "fatal_error"
// SignalTypeRecoverySuccess indicates a successful request from a low-scoring endpoint.
// This signal has higher impact (+15) than regular success (+1) to allow endpoints
// to recover faster when they prove they're healthy again.
// Intended for use by:
// - Probation system (PR 7): when sampling traffic to low-scoring endpoints
// - Health checks (PR 9): when probing excluded endpoints
SignalTypeRecoverySuccess SignalType = "recovery_success"
)
type Storage ¶
type Storage interface {
// Get retrieves the score for an endpoint.
// Returns ErrNotFound if the endpoint has no stored score.
Get(ctx context.Context, key EndpointKey) (Score, error)
// GetMultiple retrieves scores for multiple endpoints.
// Returns a map containing only the endpoints that were found.
// Missing endpoints are omitted from the result (no error).
GetMultiple(ctx context.Context, keys []EndpointKey) (map[EndpointKey]Score, error)
// Set stores or updates the score for an endpoint.
Set(ctx context.Context, key EndpointKey, score Score) error
// SetMultiple stores or updates scores for multiple endpoints.
SetMultiple(ctx context.Context, scores map[EndpointKey]Score) error
// Delete removes the score for an endpoint.
// Returns nil if the endpoint doesn't exist.
Delete(ctx context.Context, key EndpointKey) error
// List returns all stored endpoint keys for a service.
// If serviceID is empty, returns all endpoint keys.
List(ctx context.Context, serviceID string) ([]EndpointKey, error)
// Close releases any resources held by the storage.
Close() error
}
Storage defines the interface for reputation score persistence. Implementations must be safe for concurrent use.
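For orientation, a minimal in-memory backend might look like the sketch below. It covers only Get, Set, Delete and Close, drops the context parameter, and uses string keys with bare float64 scores, so it does not satisfy the Storage interface as declared. The type memoryStore is hypothetical; the two error values and the documented behaviors (ErrNotFound for a missing key, a nil error when deleting a missing key) are taken from this page.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Error values as documented for storage implementations.
var (
	ErrNotFound      = errors.New("endpoint score not found")
	ErrStorageClosed = errors.New("storage is closed")
)

// memoryStore is a hypothetical, simplified backend that is safe for
// concurrent use.
type memoryStore struct {
	mu     sync.RWMutex
	scores map[string]float64
	closed bool
}

func newMemoryStore() *memoryStore {
	return &memoryStore{scores: make(map[string]float64)}
}

// Get returns ErrNotFound when the key has no stored score.
func (m *memoryStore) Get(key string) (float64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if m.closed {
		return 0, ErrStorageClosed
	}
	value, ok := m.scores[key]
	if !ok {
		return 0, ErrNotFound
	}
	return value, nil
}

// Set stores or overwrites the score for a key.
func (m *memoryStore) Set(key string, value float64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return ErrStorageClosed
	}
	m.scores[key] = value
	return nil
}

// Delete returns nil even when the key does not exist.
func (m *memoryStore) Delete(key string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return ErrStorageClosed
	}
	delete(m.scores, key)
	return nil
}

// Close marks the store closed; later operations return ErrStorageClosed.
func (m *memoryStore) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
	return nil
}

func main() {
	store := newMemoryStore()
	_, err := store.Get("eth:endpoint-a")
	fmt.Println(errors.Is(err, ErrNotFound)) // true: nothing stored yet
	_ = store.Set("eth:endpoint-a", 80)
	_ = store.Close()
	fmt.Println(store.Set("eth:endpoint-a", 90)) // storage is closed
}
```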
type SupplierKeyBuilder ¶ added in v1.0.6
type SupplierKeyBuilder struct{}
SupplierKeyBuilder creates keys with per-supplier granularity. All endpoint URLs from the same supplier share a score. Key format: serviceID:supplierAddr
func (*SupplierKeyBuilder) BuildKey ¶ added in v1.0.6
func (b *SupplierKeyBuilder) BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey
BuildKey creates a key using only the supplier address. If the endpoint address cannot be parsed, it falls back to the full endpoint address.
type SyncConfig ¶
type SyncConfig struct {
// RefreshInterval is how often to refresh scores from the backend storage.
// Only applies when using shared storage (Redis).
// Default: 5s
RefreshInterval time.Duration `yaml:"refresh_interval"`
// WriteBufferSize is the max number of pending writes to buffer before blocking.
// Default: 1000
WriteBufferSize int `yaml:"write_buffer_size"`
// FlushInterval is how often to flush buffered writes to backend storage.
// Default: 100ms
FlushInterval time.Duration `yaml:"flush_interval"`
}
SyncConfig configures background synchronization for the reputation system.
func DefaultSyncConfig ¶
func DefaultSyncConfig() SyncConfig
DefaultSyncConfig returns a SyncConfig with sensible defaults.
func (*SyncConfig) HydrateDefaults ¶
func (s *SyncConfig) HydrateDefaults()
HydrateDefaults fills in zero values with defaults.
type TieredSelectionConfig ¶ added in v1.0.7
type TieredSelectionConfig struct {
// Enabled determines if tiered selection is active.
// When false, random selection is used among all endpoints above MinThreshold.
// Default: true (when reputation is enabled)
Enabled bool `yaml:"enabled"`
// Tier1Threshold is the minimum score for Premium tier (Tier 1).
// Endpoints with scores >= Tier1Threshold are selected first.
// Default: 70
Tier1Threshold float64 `yaml:"tier1_threshold"`
// Tier2Threshold is the minimum score for Good tier (Tier 2).
// Endpoints with scores >= Tier2Threshold but < Tier1Threshold are selected
// only if no Tier 1 endpoints are available.
// Default: 50
Tier2Threshold float64 `yaml:"tier2_threshold"`
}
TieredSelectionConfig configures tiered endpoint selection. When enabled, endpoints are grouped into tiers based on their reputation score, and selection prefers higher-tier endpoints using cascade-down logic.
func DefaultTieredSelectionConfig ¶ added in v1.0.7
func DefaultTieredSelectionConfig() TieredSelectionConfig
DefaultTieredSelectionConfig returns a TieredSelectionConfig with sensible defaults.
func (*TieredSelectionConfig) HydrateDefaults ¶ added in v1.0.7
func (t *TieredSelectionConfig) HydrateDefaults()
HydrateDefaults fills in zero values with defaults for TieredSelectionConfig.
func (*TieredSelectionConfig) Validate ¶ added in v1.0.7
func (t *TieredSelectionConfig) Validate(minThreshold float64) error
Validate checks that the TieredSelectionConfig values are valid. If tiered selection is disabled, no validation is performed.
type TieredSelector ¶ added in v1.0.7
type TieredSelector struct {
// contains filtered or unexported fields
}
TieredSelector selects endpoints using cascade-down tier logic. It groups endpoints into tiers based on their reputation scores and selects from the highest available tier.
func NewTieredSelector ¶ added in v1.0.7
func NewTieredSelector(config TieredSelectionConfig, minThreshold float64) *TieredSelector
NewTieredSelector creates a new TieredSelector with the given configuration.
func (*TieredSelector) Config ¶ added in v1.0.7
func (s *TieredSelector) Config() TieredSelectionConfig
Config returns the tiered selection configuration.
func (*TieredSelector) GroupByTier ¶ added in v1.0.7
func (s *TieredSelector) GroupByTier(endpoints map[EndpointKey]float64) (tier1, tier2, tier3 []EndpointKey)
GroupByTier groups endpoints into three tiers based on their scores:
- Tier 1 (Premium): score >= Tier1Threshold
- Tier 2 (Good): score >= Tier2Threshold and < Tier1Threshold
- Tier 3 (Fair): score >= MinThreshold and < Tier2Threshold
Endpoints below MinThreshold are excluded (should already be filtered).
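The tier boundaries can be sketched as follows, using the documented default thresholds (70, 50, 30) and plain string keys in place of EndpointKey. The functions tierForScore and groupByTier are hypothetical re-implementations written for illustration; the sorting step exists only to make the output stable, since Go map iteration order is not defined.

```go
package main

import (
	"fmt"
	"sort"
)

// Documented default thresholds.
const (
	tier1Threshold float64 = 70
	tier2Threshold float64 = 50
	minThreshold   float64 = 30
)

// tierForScore returns 1, 2 or 3 for a score, or 0 if the score is below
// minThreshold.
func tierForScore(score float64) int {
	switch {
	case score >= tier1Threshold:
		return 1
	case score >= tier2Threshold:
		return 2
	case score >= minThreshold:
		return 3
	default:
		return 0
	}
}

// groupByTier buckets endpoint keys by tier and drops those below
// minThreshold. Each tier is sorted so the result is deterministic.
func groupByTier(endpoints map[string]float64) (tier1, tier2, tier3 []string) {
	for key, score := range endpoints {
		switch tierForScore(score) {
		case 1:
			tier1 = append(tier1, key)
		case 2:
			tier2 = append(tier2, key)
		case 3:
			tier3 = append(tier3, key)
		}
	}
	sort.Strings(tier1)
	sort.Strings(tier2)
	sort.Strings(tier3)
	return tier1, tier2, tier3
}

func main() {
	t1, t2, t3 := groupByTier(map[string]float64{
		"a": 92, "b": 70, "c": 69.9, "d": 50, "e": 30, "f": 12,
	})
	fmt.Println(t1, t2, t3) // "f" is excluded: it is below minThreshold
}
```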
func (*TieredSelector) MinThreshold ¶ added in v1.0.7
func (s *TieredSelector) MinThreshold() float64
MinThreshold returns the minimum threshold for Tier 3.
func (*TieredSelector) SelectEndpoint ¶ added in v1.0.7
func (s *TieredSelector) SelectEndpoint(endpoints map[EndpointKey]float64) (EndpointKey, int, error)
SelectEndpoint selects one endpoint using cascade-down tier logic. It returns the selected endpoint key and the tier it was selected from (1, 2, or 3). Returns ErrNoEndpointsAvailable if no endpoints are available in any tier.
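A rough sketch of cascade-down selection is shown below: bucket the candidates by threshold, then pick from the first non-empty bucket. Choosing uniformly at random within a tier is an assumption of this sketch, since the documentation does not say how ties within a tier are broken. The function selectEndpoint is hypothetical, takes the thresholds as explicit arguments, and uses string keys in place of EndpointKey.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// ErrNoEndpointsAvailable mirrors the documented error value.
var ErrNoEndpointsAvailable = errors.New("no endpoints available for selection")

// selectEndpoint picks one key from the highest non-empty tier and reports
// which tier (1, 2 or 3) it came from. Keys scoring below minThreshold are
// never selected.
func selectEndpoint(endpoints map[string]float64, tier1, tier2, minThreshold float64) (string, int, error) {
	thresholds := []float64{tier1, tier2, minThreshold}
	buckets := make([][]string, len(thresholds))
	for key, score := range endpoints {
		for i, threshold := range thresholds {
			if score >= threshold {
				buckets[i] = append(buckets[i], key)
				break // a key belongs to the highest tier it qualifies for
			}
		}
	}
	for i, bucket := range buckets {
		if len(bucket) > 0 {
			// Assumption: uniform random choice within the tier.
			return bucket[rand.Intn(len(bucket))], i + 1, nil
		}
	}
	return "", 0, ErrNoEndpointsAvailable
}

func main() {
	endpoints := map[string]float64{"good": 90, "ok": 55, "weak": 35}
	key, tier, err := selectEndpoint(endpoints, 70, 50, 30)
	fmt.Println(key, tier, err) // only "good" is in tier 1, so it is always chosen

	delete(endpoints, "good")
	key, tier, err = selectEndpoint(endpoints, 70, 50, 30)
	fmt.Println(key, tier, err) // selection cascades down to tier 2
}
```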
func (*TieredSelector) TierForScore ¶ added in v1.0.7
func (s *TieredSelector) TierForScore(score float64) int
TierForScore returns the tier number (1, 2, or 3) for a given score. Returns 0 if the score is below the minimum threshold.