reputation

package v1.0.14

Published: Jan 22, 2026 License: MIT Imports: 13 Imported by: 0

README

Reputation Package

The reputation package provides endpoint reputation tracking and scoring for PATH. It replaces binary sanctions with a gradual scoring system that tracks endpoint reliability over time.

Overview

The reputation system tracks endpoint performance through signals (success/error events) and maintains a score for each endpoint. Endpoints with scores below the threshold are excluded from selection for requests.

Key Features
  • Performance-optimized design: All reads from local cache (<1μs latency)
  • Async writes: Updates local cache immediately, syncs to backend asynchronously
  • Background refresh: Periodically syncs from shared storage (for multi-instance deployments)
  • Configurable thresholds: Tune initial scores, minimum thresholds, and impact values

Configuration

reputation:
  enabled: true
  initial_score: 80        # Starting score for new endpoints
  min_threshold: 30        # Minimum score to be eligible for selection
  recovery_timeout: 5m     # Time before low-scoring endpoints recover
  storage_type: memory     # Storage backend: "memory" or "redis"
  sync_config:
    refresh_interval: 5s   # How often to refresh from backend
    write_buffer_size: 1000  # Max pending writes before blocking
    flush_interval: 100ms  # How often to flush writes to backend
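
The same settings can also be applied programmatically; HydrateDefaults fills any zero-valued fields with the package defaults. A minimal sketch mirroring the YAML above (uses only fields documented on the Config type; requires the time package):

cfg := reputation.Config{
	Enabled:      true,
	InitialScore: 80,
	MinThreshold: 30,
	StorageType:  "memory",
	SyncConfig: reputation.SyncConfig{
		RefreshInterval: 5 * time.Second,
		WriteBufferSize: 1000,
		FlushInterval:   100 * time.Millisecond,
	},
}
cfg.HydrateDefaults() // fills remaining zero values (e.g., RecoveryTimeout) with defaults
if err := cfg.Validate(); err != nil {
	// handle invalid configuration
}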

Score Range and Impacts

Scores range from 0 (worst) to 100 (best):

Signal Type       Impact  Description
Success           +1      Request completed successfully
Recovery Success  +5      Success from a low-scoring endpoint (probation)
Minor Error       -3      Validation or format errors
Major Error       -10     Timeouts, connection failures
Critical Error    -25     HTTP 5xx errors, service unavailable
Fatal Error       -50     Configuration errors, invalid service

Default values:

  • Initial Score: 80 (new endpoints start here)
  • Minimum Threshold: 30 (below this, endpoint is excluded)
  • Recovery Timeout: 5m (low-scoring endpoints recover after this time)
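
As a rough illustration of how these defaults interact, the sketch below walks an endpoint's score down from the initial value using the exported GetScoreImpact helper (real scoring also applies latency modifiers and clamps to the 0-100 range):

score := reputation.InitialScore                                        // 80
score += reputation.GetScoreImpact(reputation.SignalTypeCriticalError) // 80 - 25 = 55
score += reputation.GetScoreImpact(reputation.SignalTypeCriticalError) // 55 - 25 = 30
score += reputation.GetScoreImpact(reputation.SignalTypeMajorError)    // 30 - 10 = 20, below the default threshold of 30
// The endpoint is now excluded from selection until it recovers.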

Recovery Mechanism

When an endpoint's score falls below the threshold, it gets excluded from selection. To prevent endpoints from being "jailed" forever, the system automatically recovers endpoints that have been below threshold for longer than recovery_timeout:

  1. Endpoint fails repeatedly → score drops below threshold
  2. Endpoint is excluded from request selection
  3. After recovery_timeout (default 5m) passes with no new signals
  4. The next time the endpoint is checked, its score resets to initial_score
  5. Endpoint becomes eligible for selection again

This allows endpoints to recover from temporary issues (crashes, network problems, etc.) without manual intervention.
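
The sketch below shows the shape of this time-based check (illustrative only, not the package's internal code; time-based recovery is skipped when probation or health checks are enabled):

// Illustrative sketch of the recovery check, not the actual implementation.
func maybeRecover(score reputation.Score, cfg reputation.Config, now time.Time) reputation.Score {
	if score.Value < cfg.MinThreshold && now.Sub(score.LastUpdated) >= cfg.RecoveryTimeout {
		score.Value = cfg.InitialScore // reset; the endpoint becomes eligible again
		score.LastUpdated = now
	}
	return score
}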

Usage

Recording Signals
key := reputation.NewEndpointKey("eth", "https://endpoint.example.com", rpcType) // rpcType: the request's sharedtypes.RPCType (e.g., JSON-RPC)

// On success
svc.RecordSignal(ctx, key, reputation.NewSuccessSignal(latency))

// On error
svc.RecordSignal(ctx, key, reputation.NewMajorErrorSignal("timeout", latency))
Filtering Endpoints
// Get endpoints above threshold
eligible, err := svc.FilterByScore(ctx, endpoints, config.MinThreshold)
Getting Scores
score, err := svc.GetScore(ctx, key)
fmt.Printf("Score: %.1f, Success: %d, Errors: %d\n",
    score.Value, score.SuccessCount, score.ErrorCount)
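Service Lifecycle
A sketch of construction and shutdown; the Storage constructor name below is hypothetical (the concrete backends live in the storage subpackage), and ctx is the caller's context.

cfg := reputation.DefaultConfig()

// store must implement reputation.Storage; constructor name is illustrative only.
var store reputation.Storage = storage.NewMemoryStorage()

svc := reputation.NewService(cfg, store)
if err := svc.Start(ctx); err != nil {
	// handle startup error
}
defer svc.Stop() // flushes pending writes and stops background sync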

Architecture

┌─────────────────┐     ┌──────────────┐     ┌─────────────┐
│  Hot Path       │────▶│ Local Cache  │────▶│ Read <1μs   │
│  (Requests)     │     │ (sync.Map)   │     └─────────────┘
└─────────────────┘     └──────────────┘
                               │
                               │ Async Write
                               ▼
                        ┌──────────────┐     ┌─────────────┐
                        │ Write Buffer │────▶│   Storage   │
                        │   Channel    │     │ (Memory/    │
                        └──────────────┘     │  Redis)     │
                               ▲             └─────────────┘
                               │                    │
                        ┌──────────────┐            │
                        │  Background  │◀───────────┘
                        │   Refresh    │  Periodic Sync
                        └──────────────┘

Storage Backends

Memory (Default)
  • Single-instance only
  • Data lost on restart
  • Fastest performance
Redis (Future PR)
  • Shared across instances
  • Persistent storage
  • Required for multi-instance deployments

Thread Safety

The ReputationService is safe for concurrent use. All methods can be called from multiple goroutines without external synchronization.

Documentation

Overview

Package reputation provides endpoint reputation tracking and scoring.

The reputation system tracks the reliability and performance of endpoints based on their responses to requests. Endpoints accumulate a reputation score that influences their selection probability for future requests.

Key concepts:

  • Score: A numeric value (0-100) representing endpoint reliability
  • Signal: An event that affects an endpoint's score (success, error, timeout, etc.)
  • Storage: Backend for persisting scores (memory, Redis, etc.)

Constants

const (
	// MinScore is the lowest possible reputation score.
	MinScore float64 = 0

	// MaxScore is the highest possible reputation score.
	MaxScore float64 = 100

	// InitialScore is the starting score for new endpoints.
	// Endpoints start at the tier 1/2 boundary (80), giving new endpoints the benefit of the doubt.
	// With a buffer of 50 points above the minimum threshold (30), endpoints can survive
	// transient errors without being filtered out prematurely.
	InitialScore float64 = 80

	// DefaultMinThreshold is the minimum score required for endpoint selection.
	// Endpoints below this threshold are excluded from selection.
	DefaultMinThreshold float64 = 30
)

Score constants define the bounds and defaults for reputation scores.

const (
	// DefaultStrikeThreshold is the number of consecutive critical/fatal errors
	// before an extended cooldown is applied. Set to 5 for more tolerance.
	DefaultStrikeThreshold = 5

	// DefaultBaseCooldown is the initial cooldown duration after hitting the strike threshold.
	// Each additional strike doubles this duration (exponential backoff).
	DefaultBaseCooldown = 5 * time.Minute

	// DefaultMaxCooldown is the maximum cooldown duration regardless of strike count.
	DefaultMaxCooldown = 1 * time.Hour
)

Strike system constants control extended cooldowns for persistently failing endpoints.

const (
	// KeyGranularityEndpoint scores each endpoint URL separately (the finest granularity).
	// Key format: serviceID:supplierAddr-endpointURL
	// This is the default behavior.
	KeyGranularityEndpoint = "per-endpoint"

	// KeyGranularityDomain scores all endpoints from the same hosting domain together.
	// Key format: serviceID:domain (e.g., eth:nodefleet.net)
	// Use when a hosting provider's overall reliability matters.
	KeyGranularityDomain = "per-domain"

	// KeyGranularitySupplier scores all endpoints from the same supplier together.
	// Key format: serviceID:supplierAddr
	// Use when a supplier's overall reliability matters more than individual endpoints.
	KeyGranularitySupplier = "per-supplier"
)

Key granularity options determine how endpoints are grouped for scoring. Ordered from finest to coarsest granularity.

const (
	// DefaultRecoveryTimeout is the duration after which low-scoring endpoints
	// are reset to initial score if they have no signals.
	DefaultRecoveryTimeout = 5 * time.Minute

	// DefaultRefreshInterval is how often to refresh from backend storage.
	DefaultRefreshInterval = 5 * time.Second

	// DefaultWriteBufferSize is the max pending writes before blocking.
	DefaultWriteBufferSize = 1000

	// DefaultFlushInterval is how often to flush buffered writes.
	DefaultFlushInterval = 100 * time.Millisecond
)

Recovery and SyncConfig defaults.

const (
	// DefaultLatencyFastThreshold is the max latency for "fast" responses.
	DefaultLatencyFastThreshold = 100 * time.Millisecond

	// DefaultLatencyNormalThreshold is the max latency for "normal" responses.
	DefaultLatencyNormalThreshold = 500 * time.Millisecond

	// DefaultLatencySlowThreshold is the max latency for "slow" responses.
	DefaultLatencySlowThreshold = 1000 * time.Millisecond

	// DefaultLatencyPenaltyThreshold triggers slow_response penalty.
	DefaultLatencyPenaltyThreshold = 2000 * time.Millisecond

	// DefaultLatencySevereThreshold triggers very_slow_response penalty.
	DefaultLatencySevereThreshold = 5000 * time.Millisecond

	// DefaultFastBonus is the multiplier for success when response is fast.
	DefaultFastBonus = 2.0

	// DefaultSlowPenalty is the multiplier for success when response is slow.
	DefaultSlowPenalty = 0.5

	// DefaultVerySlowPenalty is the multiplier for success when response is very slow.
	DefaultVerySlowPenalty = 0.0
)

Latency defaults for reputation scoring.

const (
	LatencyProfileEVM     = "evm"
	LatencyProfileCosmos  = "cosmos"
	LatencyProfileSolana  = "solana"
	LatencyProfileLLM     = "llm"
	LatencyProfileGeneric = "generic"
)

Well-known latency profile names.

Variables

var (
	// ErrNotFound is returned when an endpoint's score is not found.
	ErrNotFound = errors.New("endpoint score not found")

	// ErrStorageClosed is returned when operations are attempted on a closed storage.
	ErrStorageClosed = errors.New("storage is closed")
)

Common errors returned by storage implementations.

var ErrNoEndpointsAvailable = errors.New("no endpoints available for selection")

ErrNoEndpointsAvailable is returned when no endpoints are available for selection.

Functions

func DefaultLatencyProfiles added in v1.0.10

func DefaultLatencyProfiles() map[string]LatencyProfile

DefaultLatencyProfiles returns the built-in latency profiles for common service types. These can be overridden or extended in configuration.

func GetScoreImpact

func GetScoreImpact(t SignalType) float64

GetScoreImpact returns the default score impact for a signal type. Returns 0 if the signal type is not recognized.

Types

type Cleaner

type Cleaner interface {
	// Cleanup removes expired entries from storage.
	// This is called periodically by the service to prevent memory bloat.
	Cleanup()
}

Cleaner is an optional interface that storage backends can implement to support periodic cleanup of expired entries.

type Config

type Config struct {
	// Enabled determines if reputation tracking is active.
	// IMPORTANT: Reputation is MANDATORY - setting this to false is ignored.
	// This field is kept for backward compatibility.
	Enabled bool `yaml:"enabled"`

	// InitialScore is the starting score for new endpoints.
	// Default: 80
	InitialScore float64 `yaml:"initial_score"`

	// MinThreshold is the minimum score for endpoint selection.
	// Default: 30
	MinThreshold float64 `yaml:"min_threshold"`

	// RecoveryTimeout is the duration after which a low-scoring endpoint
	// with no signals is reset to InitialScore. This allows endpoints
	// to recover from temporary failures (crashes, network issues, etc.).
	// IMPORTANT: This is IGNORED when TieredSelection.Probation or HealthChecks are enabled.
	// Signal-based recovery takes precedence over time-based recovery.
	// Default: 5m
	RecoveryTimeout time.Duration `yaml:"recovery_timeout"`

	// StorageType specifies the storage backend ("memory" or "redis").
	// Default: "memory"
	StorageType string `yaml:"storage_type"`

	// KeyGranularity determines how endpoints are grouped for scoring (global default).
	// Options: "per-endpoint" (default), "per-domain", "per-supplier"
	// Default: "per-endpoint"
	KeyGranularity string `yaml:"key_granularity"`

	// ServiceOverrides allows per-service configuration overrides.
	// Use this to apply different granularity rules to specific services.
	ServiceOverrides map[string]ServiceConfig `yaml:"service_overrides,omitempty"`

	// SyncConfig configures background synchronization behavior.
	SyncConfig SyncConfig `yaml:"sync_config"`

	// TieredSelection configures endpoint selection based on reputation tiers.
	// Endpoints are grouped into tiers based on their scores.
	TieredSelection TieredSelectionConfig `yaml:"tiered_selection,omitempty"`

	// Latency configures how response latency affects reputation scoring.
	// Fast endpoints get bonuses, slow endpoints get penalties.
	Latency LatencyConfig `yaml:"latency,omitempty"`

	// SignalImpacts configures how much each signal type affects the reputation score.
	// This allows tuning how aggressively endpoints are penalized or rewarded.
	SignalImpacts SignalImpactsConfig `yaml:"signal_impacts,omitempty"`
}

Config holds configuration for the reputation system.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults. Reputation is MANDATORY and cannot be disabled - it is the unified QoS system.

func (*Config) HydrateDefaults

func (c *Config) HydrateDefaults()

HydrateDefaults fills in zero values with defaults. This is only called when reputation is enabled.

func (*Config) ResolveRecoveryConflict added in v1.0.10

func (c *Config) ResolveRecoveryConflict(healthChecksEnabled bool)

ResolveRecoveryConflict disables recovery_timeout if signal-based recovery is active. This should be called after ValidateRecoveryConfig to apply the resolution.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks that the configuration values are valid. Returns an error if any values are out of bounds or inconsistent.

func (*Config) ValidateRecoveryConfig added in v1.0.10

func (c *Config) ValidateRecoveryConfig(healthChecksEnabled bool) RecoveryConflictResult

ValidateRecoveryConfig checks for conflicts between time-based (recovery_timeout) and signal-based recovery (probation, health_checks).

Recovery mechanisms:

  • recovery_timeout: Time-based. Resets score after X time with no signals.
  • probation: Signal-based. Gives low-scoring endpoints traffic to recover.
  • health_checks: Signal-based. Probes endpoints to generate recovery signals.

When signal-based recovery is enabled, time-based recovery should be disabled because it can prematurely promote endpoints that are still failing.

Returns a RecoveryConflictResult with:

  • HasConflict: true if both time-based and signal-based recovery are enabled
  • HasNoRecovery: true if no recovery mechanism is configured
  • Message: description of the issue
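
For example, a caller might validate and then resolve conflicts like this (sketch; cfg is a reputation.Config and healthChecksEnabled comes from the caller's health-check configuration):

result := cfg.ValidateRecoveryConfig(healthChecksEnabled)
if result.HasConflict {
	// Signal-based recovery takes precedence: disable recovery_timeout.
	cfg.ResolveRecoveryConflict(healthChecksEnabled)
}
if result.HasNoRecovery {
	fmt.Println("warning:", result.Message) // no recovery mechanism configured
}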

type DomainKeyBuilder added in v1.0.6

type DomainKeyBuilder struct{}

DomainKeyBuilder creates keys with per-domain granularity. All endpoints from the same hosting domain share a score, tracked per RPC type. Key format: serviceID:domain:rpcType (e.g., eth:nodefleet.net:json_rpc)

func (*DomainKeyBuilder) BuildKey added in v1.0.6

func (b *DomainKeyBuilder) BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr, rpcType sharedtypes.RPCType) EndpointKey

BuildKey creates a key using the domain extracted from the endpoint URL and RPC type. If the domain cannot be extracted, falls back to full endpoint address. The RPC type is always included to track separate scores for different protocols.

type EndpointKey

type EndpointKey struct {
	ServiceID    protocol.ServiceID
	EndpointAddr protocol.EndpointAddr
	RPCType      sharedtypes.RPCType // REQUIRED: RPC type dimension for reputation tracking
}

EndpointKey uniquely identifies an endpoint for reputation tracking. It combines the service ID, endpoint address, and RPC type to create a unique key. The RPC type dimension allows tracking separate reputation scores for different protocols (e.g., json_rpc vs websocket) at the same endpoint URL.

func NewEndpointKey

func NewEndpointKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr, rpcType sharedtypes.RPCType) EndpointKey

NewEndpointKey creates a new EndpointKey from service ID, endpoint address, and RPC type. The RPC type is required to track reputation separately for different protocols (e.g., json_rpc, websocket, rest) at the same endpoint URL.

func (EndpointKey) String

func (k EndpointKey) String() string

String returns a string representation of the endpoint key.
Format: "serviceID:endpointAddr:rpcType"
Example: "eth:pokt1abc-https://node.example.com:json_rpc"

type EndpointKeyBuilder added in v1.0.6

type EndpointKeyBuilder struct{}

EndpointKeyBuilder creates keys with per-endpoint granularity. Each endpoint URL is scored separately, with separate scores per RPC type. Key format: serviceID:supplierAddr-endpointURL:rpcType

func (*EndpointKeyBuilder) BuildKey added in v1.0.6

func (b *EndpointKeyBuilder) BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr, rpcType sharedtypes.RPCType) EndpointKey

BuildKey creates a key using the full endpoint address and RPC type.

type KeyBuilder added in v1.0.6

type KeyBuilder interface {
	// BuildKey creates an EndpointKey for the given service, endpoint, and RPC type.
	// The RPC type is required to track reputation separately for different protocols.
	BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr, rpcType sharedtypes.RPCType) EndpointKey
}

KeyBuilder creates EndpointKeys with a specific granularity. Different implementations group endpoints differently for scoring. The RPC type is always included in keys to track separate reputation scores for different protocols (e.g., json_rpc vs websocket) at the same endpoint.

func NewKeyBuilder added in v1.0.6

func NewKeyBuilder(granularity string) KeyBuilder

NewKeyBuilder creates a KeyBuilder for the specified granularity. If the granularity is invalid or empty, it defaults to per-endpoint.
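
For example (a sketch; protocol.ServiceID and protocol.EndpointAddr are assumed to be string-based types, as the key-format examples above suggest, and rpcType is the request's sharedtypes.RPCType):

kb := reputation.NewKeyBuilder(reputation.KeyGranularityDomain)
key := kb.BuildKey("eth", "pokt1abc-https://node.example.com", rpcType)
fmt.Println(key.String()) // e.g. "eth:node.example.com:json_rpc" under per-domain granularity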

type LatencyConfig added in v1.0.10

type LatencyConfig struct {
	// Enabled enables latency-aware scoring. Default: true
	Enabled bool `yaml:"enabled,omitempty"`

	// FastThreshold is the maximum latency for "fast" responses.
	// Fast responses get a bonus multiplier on success signals.
	// Default: 100ms (appropriate for blockchain services)
	FastThreshold time.Duration `yaml:"fast_threshold,omitempty"`

	// NormalThreshold is the maximum latency for "normal" responses.
	// Normal responses get standard success impact.
	// Default: 500ms
	NormalThreshold time.Duration `yaml:"normal_threshold,omitempty"`

	// SlowThreshold is the maximum latency for "slow" responses.
	// Slow responses get reduced success impact.
	// Default: 1000ms (1 second)
	SlowThreshold time.Duration `yaml:"slow_threshold,omitempty"`

	// PenaltyThreshold triggers a slow_response penalty signal.
	// Responses slower than this get a reputation penalty even if successful.
	// Default: 2000ms (2 seconds)
	PenaltyThreshold time.Duration `yaml:"penalty_threshold,omitempty"`

	// SevereThreshold triggers a very_slow_response penalty signal.
	// Responses slower than this get a larger reputation penalty.
	// Default: 5000ms (5 seconds)
	SevereThreshold time.Duration `yaml:"severe_threshold,omitempty"`

	// FastBonus is the multiplier for success impact when response is fast.
	// Default: 2.0 (fast success = +2 instead of +1)
	FastBonus float64 `yaml:"fast_bonus,omitempty"`

	// SlowPenalty is the multiplier for success impact when response is slow.
	// Default: 0.5 (slow success = +0.5 instead of +1)
	SlowPenalty float64 `yaml:"slow_penalty,omitempty"`

	// VerySlowPenalty is the multiplier for success when response is very slow.
	// Default: 0.0 (very slow success = +0, no reputation gain)
	VerySlowPenalty float64 `yaml:"very_slow_penalty,omitempty"`

	// ServiceProfiles defines latency thresholds for different service types.
	// Services can reference a profile by name, or define inline thresholds.
	// If a service doesn't match any profile, global defaults are used.
	ServiceProfiles map[string]LatencyProfile `yaml:"service_profiles,omitempty"`
}

LatencyConfig configures how latency affects reputation scoring. Latency is measured from both health checks (background) and client requests.

IMPORTANT: These are global defaults. Different services have different latency profiles:

  • EVM: 50-200ms typical
  • LLM: 2-30s typical

Use ServiceProfiles to define per-service-type thresholds, or configure latency thresholds per-service in the health_checks config.

func DefaultLatencyConfig added in v1.0.10

func DefaultLatencyConfig() LatencyConfig

DefaultLatencyConfig returns a LatencyConfig with sensible defaults.

func (*LatencyConfig) GetProfileForService added in v1.0.10

func (c *LatencyConfig) GetProfileForService(serviceID, profileName string) LatencyProfile

GetProfileForService returns the latency profile for a service. Lookup order:

  1. ServiceProfiles[serviceID] - exact service match
  2. ServiceProfiles[profileName] - profile name match (e.g., "evm")
  3. Built-in defaults - DefaultLatencyProfiles()
  4. Global defaults - LatencyConfig thresholds

The profileName parameter is optional - if empty, only serviceID is checked.
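
For example, with the package defaults (sketch):

latCfg := reputation.DefaultLatencyConfig()
profile := latCfg.GetProfileForService("eth", reputation.LatencyProfileEVM)
// Convert the selected profile back into a LatencyConfig for impact calculations.
effective := profile.ToLatencyConfig(latCfg)
fmt.Println(effective.FastThreshold) // the EVM profile's fast threshold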

func (*LatencyConfig) HydrateDefaults added in v1.0.10

func (l *LatencyConfig) HydrateDefaults()

HydrateDefaults fills in zero values with defaults for latency configuration.

type LatencyImpactResult added in v1.0.10

type LatencyImpactResult struct {
	FinalImpact     float64
	BaseImpact      float64
	Modifier        float64
	LatencyCategory string // "fast", "normal", "slow", "very_slow", or "skipped"
	Latency         time.Duration
	Config          LatencyConfig
}

LatencyImpactResult contains the result of latency-aware impact calculation along with metadata useful for logging/observability.

type LatencyMetrics added in v1.0.10

type LatencyMetrics struct {
	// LastLatency is the most recent response latency.
	LastLatency time.Duration

	// AvgLatency is the exponential moving average of response latency.
	// Updated using: avg = (1-alpha)*avg + alpha*new_sample, where alpha=0.1
	AvgLatency time.Duration

	// MinLatency is the minimum latency observed (best case).
	MinLatency time.Duration

	// MaxLatency is the maximum latency observed (worst case).
	MaxLatency time.Duration

	// SampleCount is the number of latency samples collected.
	SampleCount int64

	// LastUpdated is when the latency metrics were last updated.
	LastUpdated time.Time
}

LatencyMetrics tracks response latency statistics for an endpoint. These metrics are used for endpoint selection within the same reputation tier.

func (*LatencyMetrics) UpdateLatency added in v1.0.10

func (m *LatencyMetrics) UpdateLatency(latency time.Duration)

UpdateLatency updates the latency metrics with a new sample. Uses exponential moving average with alpha=0.1 for AvgLatency.

type LatencyProfile added in v1.0.10

type LatencyProfile struct {
	// Name is the profile identifier (e.g., "evm", "llm", "cosmos")
	Name string `yaml:"name,omitempty"`

	// Description explains what services this profile is for
	Description string `yaml:"description,omitempty"`

	// FastThreshold - responses faster than this get a bonus
	FastThreshold time.Duration `yaml:"fast_threshold"`

	// NormalThreshold - responses faster than this are considered normal
	NormalThreshold time.Duration `yaml:"normal_threshold"`

	// SlowThreshold - responses faster than this are slow but acceptable
	SlowThreshold time.Duration `yaml:"slow_threshold"`

	// PenaltyThreshold - responses slower than this get a penalty signal
	PenaltyThreshold time.Duration `yaml:"penalty_threshold"`

	// SevereThreshold - responses slower than this get a severe penalty
	SevereThreshold time.Duration `yaml:"severe_threshold"`
}

LatencyProfile defines latency thresholds for a category of services. This allows different service types (blockchain, LLM, etc.) to have appropriate latency expectations.

func (LatencyProfile) ToLatencyConfig added in v1.0.10

func (p LatencyProfile) ToLatencyConfig(baseConfig LatencyConfig) LatencyConfig

ToLatencyConfig converts a LatencyProfile to a LatencyConfig for use in calculations. This allows using the profile thresholds with the CalculateLatencyAwareImpact method.

type ProbationConfig added in v1.0.10

type ProbationConfig struct {
	// Enabled enables/disables the probation system.
	Enabled bool `yaml:"enabled,omitempty"`
	// Threshold is the score below which endpoints enter probation.
	Threshold float64 `yaml:"threshold,omitempty"`
	// TrafficPercent is the percentage of traffic sent to probation endpoints.
	TrafficPercent float64 `yaml:"traffic_percent,omitempty"`
	// RecoveryMultiplier multiplies the score increase for successful requests from probation.
	RecoveryMultiplier float64 `yaml:"recovery_multiplier,omitempty"`
}

ProbationConfig configures the probation system for endpoint recovery. Probation gives low-scoring endpoints a small percentage of traffic to allow them to recover via successful requests.

type RecoveryConflictResult added in v1.0.10

type RecoveryConflictResult struct {
	// HasConflict indicates if there's a conflict between recovery mechanisms.
	HasConflict bool
	// HasNoRecovery indicates if no recovery mechanism is configured at all.
	HasNoRecovery bool
	// OriginalRecoveryTimeout is the recovery_timeout value before resolution.
	OriginalRecoveryTimeout time.Duration
	// Message contains a description of the issue found.
	Message string
}

RecoveryConflictResult contains the result of recovery configuration validation.

type RedisConfig

type RedisConfig struct {
	// Address is the Redis server address (host:port).
	Address string `yaml:"address"`

	// Password for Redis authentication (optional).
	Password string `yaml:"password"`

	// DB is the Redis database number.
	DB int `yaml:"db"`

	// KeyPrefix is prepended to all keys stored in Redis.
	// Default: "path:reputation:"
	KeyPrefix string `yaml:"key_prefix"`

	// PoolSize is the maximum number of socket connections.
	// Default: 10
	PoolSize int `yaml:"pool_size"`

	// DialTimeout is the timeout for establishing new connections.
	// Default: 5s
	DialTimeout time.Duration `yaml:"dial_timeout"`

	// ReadTimeout is the timeout for socket reads.
	// Default: 3s
	ReadTimeout time.Duration `yaml:"read_timeout"`

	// WriteTimeout is the timeout for socket writes.
	// Default: 3s
	WriteTimeout time.Duration `yaml:"write_timeout"`
}

RedisConfig holds Redis-specific configuration.

func DefaultRedisConfig

func DefaultRedisConfig() RedisConfig

DefaultRedisConfig returns a RedisConfig with sensible defaults.

func (*RedisConfig) HydrateDefaults

func (c *RedisConfig) HydrateDefaults()

HydrateDefaults fills in zero values with defaults.

type ReputationService

type ReputationService interface {
	// RecordSignal records a signal event for an endpoint.
	// Updates local cache immediately and queues async write to backend storage.
	// This method is non-blocking - it returns immediately after updating local cache.
	RecordSignal(ctx context.Context, key EndpointKey, signal Signal) error

	// GetScore retrieves the current reputation score for an endpoint.
	// Always reads from local cache for minimal latency (<1μs).
	// Returns the score or an error if the endpoint is not found.
	GetScore(ctx context.Context, key EndpointKey) (Score, error)

	// GetScores retrieves reputation scores for multiple endpoints.
	// Always reads from local cache for minimal latency.
	// Returns a map of endpoint keys to scores. Missing endpoints are omitted.
	GetScores(ctx context.Context, keys []EndpointKey) (map[EndpointKey]Score, error)

	// FilterByScore filters endpoints that meet the minimum score threshold.
	// Always reads from local cache for minimal latency.
	// Returns only endpoints with scores >= minThreshold.
	FilterByScore(ctx context.Context, keys []EndpointKey, minThreshold float64) ([]EndpointKey, error)

	// ResetScore resets an endpoint's score to the initial value.
	// Used for administrative purposes or testing.
	ResetScore(ctx context.Context, key EndpointKey) error

	// KeyBuilderForService returns the KeyBuilder for the given service.
	// Uses service-specific config if available, otherwise falls back to global default.
	KeyBuilderForService(serviceID protocol.ServiceID) KeyBuilder

	// SetServiceConfig sets per-service reputation configuration overrides.
	// This allows different services to have different initial scores and min thresholds.
	SetServiceConfig(serviceID protocol.ServiceID, config ServiceConfig)

	// GetInitialScoreForService returns the initial score for a service.
	// Uses per-service config if set, otherwise falls back to global default.
	GetInitialScoreForService(serviceID protocol.ServiceID) float64

	// GetMinThresholdForService returns the min threshold for a service.
	// Uses per-service config if set, otherwise falls back to global default.
	GetMinThresholdForService(serviceID protocol.ServiceID) float64

	// SetLatencyProfile sets per-service latency profile configuration.
	// This allows different services to have different latency thresholds and bonuses/penalties.
	SetLatencyProfile(serviceID protocol.ServiceID, latencyConfig LatencyConfig)

	// GetLatencyConfigForService returns the latency config for a service.
	// Uses per-service config if set, otherwise falls back to global default.
	GetLatencyConfigForService(serviceID protocol.ServiceID) LatencyConfig

	// Start begins background sync processes (e.g., Redis pub/sub, periodic refresh).
	// Should be called once during initialization.
	Start(ctx context.Context) error

	// Stop gracefully shuts down background processes and flushes pending writes.
	Stop() error

	// SetLogger sets the logger for the reputation service.
	// This enables debug logging for latency scoring and other operations.
	SetLogger(logger polylog.Logger)
}

ReputationService provides endpoint reputation tracking and querying. Implementations must be safe for concurrent use.

Performance design:

  • All reads (GetScore, FilterByScore) are served from local in-memory cache (<1μs)
  • Writes (RecordSignal) update local cache immediately + async sync to backend
  • Background refresh keeps local cache in sync with shared backend (Redis)

This ensures the hot path (request handling) is never blocked by storage latency.

func NewService

func NewService(config Config, store Storage) ReputationService

NewService creates a new ReputationService with the given configuration. The service must be started with Start() before use.

type Score

type Score struct {
	// Value is the current score (0-100).
	// Higher scores indicate more reliable endpoints.
	Value float64

	// LastUpdated is when the score was last modified.
	LastUpdated time.Time

	// SuccessCount is the total number of successful requests.
	SuccessCount int64

	// ErrorCount is the total number of failed requests.
	ErrorCount int64

	// LatencyMetrics tracks response latency statistics for this endpoint.
	// Updated from both health checks and client requests.
	LatencyMetrics LatencyMetrics

	// CriticalStrikes counts consecutive critical/fatal errors without recovery.
	// Used by the strike system to apply extended cooldowns for persistently failing endpoints.
	// Reset to 0 when endpoint has a successful request.
	CriticalStrikes int

	// CooldownUntil is when the endpoint's cooldown period ends.
	// Endpoints in cooldown are excluded from selection even if their score is above threshold.
	// Set when CriticalStrikes exceeds StrikeThreshold.
	CooldownUntil time.Time
}

Score represents an endpoint's reputation score at a point in time.

func (Score) CooldownRemaining added in v1.0.13

func (s Score) CooldownRemaining() time.Duration

CooldownRemaining returns the remaining cooldown duration, or 0 if not in cooldown.

func (Score) IsInCooldown added in v1.0.13

func (s Score) IsInCooldown() bool

IsInCooldown returns true if the endpoint is currently in a strike cooldown period. Endpoints in cooldown should be excluded from selection even if their score is above threshold.
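
For example, selection code can combine the score threshold with the cooldown check (sketch; svc, ctx, key, and cfg are the service, context, endpoint key, and Config from earlier examples):

score, err := svc.GetScore(ctx, key)
eligible := err == nil && score.Value >= cfg.MinThreshold && !score.IsInCooldown()
if !eligible && err == nil && score.IsInCooldown() {
	fmt.Printf("endpoint in cooldown for another %s\n", score.CooldownRemaining())
}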

func (Score) IsValid

func (s Score) IsValid() bool

IsValid returns true if the score is within valid bounds.

type ServiceConfig added in v1.0.6

type ServiceConfig struct {
	// KeyGranularity overrides the global key granularity for this service.
	// Options: "per-endpoint", "per-domain", "per-supplier"
	KeyGranularity string `yaml:"key_granularity"`

	// InitialScore overrides the initial score for new endpoints in this service.
	// If 0, uses the global default.
	InitialScore float64 `yaml:"initial_score,omitempty"`

	// MinThreshold overrides the minimum score for endpoint selection in this service.
	// If 0, uses the global default.
	MinThreshold float64 `yaml:"min_threshold,omitempty"`

	// RecoveryTimeout overrides the recovery timeout for this service.
	// If 0, uses the global default.
	RecoveryTimeout time.Duration `yaml:"recovery_timeout,omitempty"`

	// HealthChecksEnabled indicates whether health checks are enabled for this service.
	// If true, time-based recovery is skipped (health checks handle recovery).
	HealthChecksEnabled bool

	// ProbationEnabled indicates whether probation is enabled for this service.
	// If true, time-based recovery is skipped (probation handles recovery).
	ProbationEnabled bool
}

ServiceConfig holds service-specific reputation configuration overrides.

type Signal

type Signal struct {
	// Type categorizes the signal's severity.
	Type SignalType

	// Timestamp when the signal was generated.
	Timestamp time.Time

	// Latency of the request that generated this signal (if applicable).
	Latency time.Duration

	// Reason provides additional context for the signal.
	Reason string

	// Metadata holds additional signal-specific data.
	Metadata map[string]string
}

Signal represents an event that affects an endpoint's reputation.

func NewCriticalErrorSignal

func NewCriticalErrorSignal(reason string, latency time.Duration) Signal

NewCriticalErrorSignal creates a signal for a critical error (HTTP 5xx).

func NewFatalErrorSignal

func NewFatalErrorSignal(reason string) Signal

NewFatalErrorSignal creates a signal for a fatal error. This replaces the concept of "permanent sanctions" - endpoints can recover.

func NewMajorErrorSignal

func NewMajorErrorSignal(reason string, latency time.Duration) Signal

NewMajorErrorSignal creates a signal for a major error (timeout, connection issues).

func NewMinorErrorSignal

func NewMinorErrorSignal(reason string) Signal

NewMinorErrorSignal creates a signal for a minor error.

func NewRecoverySuccessSignal

func NewRecoverySuccessSignal(latency time.Duration) Signal

NewRecoverySuccessSignal creates a signal for a successful request from a low-scoring endpoint. This has a larger impact (+5) than regular success (+1), allowing endpoints to recover once they prove they're healthy again, while still requiring sustained good behavior to fully recover from critical errors. The impact was reduced from +15 to +5 to prevent rapid recovery. Intended for use by the probation system (PR 7) and health checks (PR 9).

func NewSlowResponseSignal added in v1.0.10

func NewSlowResponseSignal(latency time.Duration) Signal

NewSlowResponseSignal creates a signal for a successful but slow response. This applies a small penalty (-1) even though the request succeeded. Use when response latency exceeds PenaltyThreshold (default: 2000ms).

func NewSuccessSignal

func NewSuccessSignal(latency time.Duration) Signal

NewSuccessSignal creates a signal for a successful request.

func NewVerySlowResponseSignal added in v1.0.10

func NewVerySlowResponseSignal(latency time.Duration) Signal

NewVerySlowResponseSignal creates a signal for a successful but very slow response. This applies a moderate penalty (-3) even though the request succeeded. Use when response latency exceeds SevereThreshold (default: 5000ms).

func (Signal) CalculateLatencyAwareImpact added in v1.0.10

func (s Signal) CalculateLatencyAwareImpact(config LatencyConfig) float64

CalculateLatencyAwareImpact calculates the score impact with latency modifiers. For success signals, the impact is modified based on response latency:

  • Fast (< FastThreshold): base_impact * FastBonus (default: +1 * 2.0 = +2)
  • Normal (< NormalThreshold): base_impact * 1.0 (default: +1)
  • Slow (< SlowThreshold): base_impact * SlowPenalty (default: +1 * 0.5 = +0.5)
  • Very slow (>= SlowThreshold): base_impact * VerySlowPenalty (default: +1 * 0.0 = 0)

For error signals, the latency modifier is not applied - errors have fixed impact.

Additionally, if latency exceeds PenaltyThreshold or SevereThreshold, an additional penalty signal should be recorded separately.
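
For example, with the package defaults (sketch):

latCfg := reputation.DefaultLatencyConfig()
fast := reputation.NewSuccessSignal(80 * time.Millisecond)
slow := reputation.NewSuccessSignal(800 * time.Millisecond)
fmt.Println(fast.CalculateLatencyAwareImpact(latCfg)) // 2   (+1 * FastBonus 2.0)
fmt.Println(slow.CalculateLatencyAwareImpact(latCfg)) // 0.5 (+1 * SlowPenalty 0.5)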

func (Signal) CalculateLatencyAwareImpactWithConfig added in v1.0.10

func (s Signal) CalculateLatencyAwareImpactWithConfig(config LatencyConfig, baseImpact float64) LatencyImpactResult

CalculateLatencyAwareImpactWithConfig calculates the score impact with latency modifiers using a provided base impact value. This allows using configurable signal impacts.

func (Signal) CalculateLatencyAwareImpactWithDetails added in v1.0.10

func (s Signal) CalculateLatencyAwareImpactWithDetails(config LatencyConfig) LatencyImpactResult

CalculateLatencyAwareImpactWithDetails calculates the score impact with latency modifiers and returns detailed information about the calculation for logging purposes. Uses the default (hardcoded) base impact from GetDefaultImpact().

func (Signal) GetDefaultImpact

func (s Signal) GetDefaultImpact() float64

GetDefaultImpact returns the default score impact for this signal.

func (Signal) GetRecoveryMultiplier added in v1.0.10

func (s Signal) GetRecoveryMultiplier() float64

GetRecoveryMultiplier returns the recovery multiplier from signal metadata. Returns 1.0 if no multiplier is set (no modification to impact). This is used by calculateImpact to boost recovery scoring for probation endpoints.

func (Signal) IsNegative

func (s Signal) IsNegative() bool

IsNegative returns true if this signal has a negative impact on score.

func (Signal) IsPositive

func (s Signal) IsPositive() bool

IsPositive returns true if this signal has a positive impact on score.

func (Signal) WithMultiplier added in v1.0.10

func (s Signal) WithMultiplier(multiplier float64) Signal

WithMultiplier returns a new Signal with a multiplier applied to its impact. This is used by probation to boost recovery when endpoints successfully handle requests. The multiplier only affects the metadata - the actual impact calculation happens in the scoring service which can read the multiplier from metadata.

type SignalImpactsConfig added in v1.0.10

type SignalImpactsConfig struct {
	// Success is the score change for successful responses. Default: +1
	Success float64 `yaml:"success,omitempty"`

	// MinorError is the score change for minor errors (validation issues). Default: -3
	MinorError float64 `yaml:"minor_error,omitempty"`

	// MajorError is the score change for major errors (timeout, connection). Default: -10
	MajorError float64 `yaml:"major_error,omitempty"`

	// CriticalError is the score change for critical errors (HTTP 5xx). Default: -25
	CriticalError float64 `yaml:"critical_error,omitempty"`

	// FatalError is the score change for fatal errors (config issues). Default: -50
	FatalError float64 `yaml:"fatal_error,omitempty"`

	// RecoverySuccess is the score change for successful probation recovery. Default: +5
	// Reduced from +15 to require sustained good behavior for recovery.
	RecoverySuccess float64 `yaml:"recovery_success,omitempty"`

	// SlowResponse is the score change for slow responses. Default: -1
	SlowResponse float64 `yaml:"slow_response,omitempty"`

	// VerySlowResponse is the score change for very slow responses. Default: -3
	VerySlowResponse float64 `yaml:"very_slow_response,omitempty"`
}

SignalImpactsConfig configures the score impact for each signal type. These values determine how much the reputation score changes when a signal is recorded. All values should be provided - use defaults from DefaultSignalImpacts() if not specified.

func DefaultSignalImpacts added in v1.0.10

func DefaultSignalImpacts() SignalImpactsConfig

DefaultSignalImpacts returns the default signal impact values.

func (*SignalImpactsConfig) GetImpact added in v1.0.10

func (c *SignalImpactsConfig) GetImpact(signalType SignalType) float64

GetImpact returns the configured impact for a signal type. Falls back to default values if not configured (zero value).

type SignalType

type SignalType string

SignalType categorizes signals by their impact on reputation.

const (
	// SignalTypeSuccess indicates a successful request/response.
	SignalTypeSuccess SignalType = "success"

	// SignalTypeMinorError indicates a minor error (validation issues, unknown errors).
	SignalTypeMinorError SignalType = "minor_error"

	// SignalTypeMajorError indicates a major error (timeout, connection issues).
	SignalTypeMajorError SignalType = "major_error"

	// SignalTypeCriticalError indicates a critical error (HTTP 5xx, transport errors).
	SignalTypeCriticalError SignalType = "critical_error"

	// SignalTypeFatalError indicates a fatal error (service misconfiguration).
	// Was previously "permanent sanction" - now just a severe score penalty.
	SignalTypeFatalError SignalType = "fatal_error"

	// SignalTypeRecoverySuccess indicates a successful request from a low-scoring endpoint.
	// This signal has a larger impact (+5) than regular success (+1), allowing endpoints
	// to recover once they prove they're healthy again while still requiring sustained good behavior.
	// The impact was reduced from +15 to +5 to prevent rapid recovery from critical errors.
	// Intended for use by:
	//   - Probation system (PR 7): when sampling traffic to low-scoring endpoints
	//   - Health checks (PR 9): when probing excluded endpoints
	SignalTypeRecoverySuccess SignalType = "recovery_success"

	// SignalTypeSlowResponse indicates a successful but slow response (> PenaltyThreshold).
	// This applies a small penalty to the endpoint's reputation even though the request
	// succeeded, because slow responses indicate degraded performance.
	// Default penalty threshold: 2000ms
	SignalTypeSlowResponse SignalType = "slow_response"

	// SignalTypeVerySlowResponse indicates a successful but very slow response (> SevereThreshold).
	// This applies a moderate penalty to the endpoint's reputation.
	// Default severe threshold: 5000ms
	SignalTypeVerySlowResponse SignalType = "very_slow_response"
)

func ClassifyLatency added in v1.0.10

func ClassifyLatency(latency time.Duration, config LatencyConfig) *SignalType

ClassifyLatency returns the latency signal type based on thresholds. Returns nil if no additional penalty signal is needed. This is separate from success/error signals - it's an additional penalty.
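
For example, request-handling code can record the extra penalty alongside the regular success signal (sketch; svc, ctx, key, latency, and latCfg are the service, context, endpoint key, measured latency, and LatencyConfig from earlier examples):

if st := reputation.ClassifyLatency(latency, latCfg); st != nil {
	switch *st {
	case reputation.SignalTypeSlowResponse:
		svc.RecordSignal(ctx, key, reputation.NewSlowResponseSignal(latency))
	case reputation.SignalTypeVerySlowResponse:
		svc.RecordSignal(ctx, key, reputation.NewVerySlowResponseSignal(latency))
	}
}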

type Storage

type Storage interface {
	// Get retrieves the score for an endpoint.
	// Returns ErrNotFound if the endpoint has no stored score.
	Get(ctx context.Context, key EndpointKey) (Score, error)

	// GetMultiple retrieves scores for multiple endpoints.
	// Returns a map containing only the endpoints that were found.
	// Missing endpoints are omitted from the result (no error).
	GetMultiple(ctx context.Context, keys []EndpointKey) (map[EndpointKey]Score, error)

	// Set stores or updates the score for an endpoint.
	Set(ctx context.Context, key EndpointKey, score Score) error

	// SetMultiple stores or updates scores for multiple endpoints.
	SetMultiple(ctx context.Context, scores map[EndpointKey]Score) error

	// Delete removes the score for an endpoint.
	// Returns nil if the endpoint doesn't exist.
	Delete(ctx context.Context, key EndpointKey) error

	// List returns all stored endpoint keys for a service.
	// If serviceID is empty, returns all endpoint keys.
	List(ctx context.Context, serviceID string) ([]EndpointKey, error)

	// Close releases any resources held by the storage.
	Close() error
}

Storage defines the interface for reputation score persistence. Implementations must be safe for concurrent use.
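
A minimal in-memory implementation sketch is shown below (illustrative only; the maintained backends live in the storage subpackage). It assumes protocol.ServiceID is a string-based type and requires the context and sync packages:

type mapStorage struct {
	mu     sync.RWMutex
	scores map[reputation.EndpointKey]reputation.Score
}

func newMapStorage() *mapStorage {
	return &mapStorage{scores: make(map[reputation.EndpointKey]reputation.Score)}
}

func (s *mapStorage) Get(ctx context.Context, key reputation.EndpointKey) (reputation.Score, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	score, ok := s.scores[key]
	if !ok {
		return reputation.Score{}, reputation.ErrNotFound
	}
	return score, nil
}

func (s *mapStorage) GetMultiple(ctx context.Context, keys []reputation.EndpointKey) (map[reputation.EndpointKey]reputation.Score, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[reputation.EndpointKey]reputation.Score, len(keys))
	for _, k := range keys {
		if score, ok := s.scores[k]; ok {
			out[k] = score // missing endpoints are simply omitted
		}
	}
	return out, nil
}

func (s *mapStorage) Set(ctx context.Context, key reputation.EndpointKey, score reputation.Score) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.scores[key] = score
	return nil
}

func (s *mapStorage) SetMultiple(ctx context.Context, scores map[reputation.EndpointKey]reputation.Score) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k, v := range scores {
		s.scores[k] = v
	}
	return nil
}

func (s *mapStorage) Delete(ctx context.Context, key reputation.EndpointKey) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.scores, key)
	return nil
}

func (s *mapStorage) List(ctx context.Context, serviceID string) ([]reputation.EndpointKey, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var keys []reputation.EndpointKey
	for k := range s.scores {
		if serviceID == "" || string(k.ServiceID) == serviceID {
			keys = append(keys, k)
		}
	}
	return keys, nil
}

func (s *mapStorage) Close() error { return nil }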

type SupplierKeyBuilder added in v1.0.6

type SupplierKeyBuilder struct{}

SupplierKeyBuilder creates keys with per-supplier granularity. All endpoint URLs from the same supplier share a score, tracked per RPC type. Key format: serviceID:supplierAddr:rpcType

func (*SupplierKeyBuilder) BuildKey added in v1.0.6

func (b *SupplierKeyBuilder) BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr, rpcType sharedtypes.RPCType) EndpointKey

BuildKey creates a key using only the supplier address and RPC type. If the endpoint address cannot be parsed, falls back to full address. The RPC type is always included to track separate scores for different protocols.

type SyncConfig

type SyncConfig struct {
	// RefreshInterval is how often to refresh scores from the backend storage.
	// Only applies when using shared storage (Redis).
	// Default: 5s
	RefreshInterval time.Duration `yaml:"refresh_interval"`

	// WriteBufferSize is the max number of pending writes to buffer before blocking.
	// Default: 1000
	WriteBufferSize int `yaml:"write_buffer_size"`

	// FlushInterval is how often to flush buffered writes to backend storage.
	// Default: 100ms
	FlushInterval time.Duration `yaml:"flush_interval"`
}

SyncConfig configures background synchronization for the reputation system.

func DefaultSyncConfig

func DefaultSyncConfig() SyncConfig

DefaultSyncConfig returns a SyncConfig with sensible defaults.

func (*SyncConfig) HydrateDefaults

func (s *SyncConfig) HydrateDefaults()

HydrateDefaults fills in zero values with defaults.

type TieredSelectionConfig added in v1.0.7

type TieredSelectionConfig struct {
	// Enabled enables/disables tiered selection.
	Enabled bool `yaml:"enabled,omitempty"`
	// Tier1Threshold is the minimum score for tier 1 (highest quality).
	Tier1Threshold float64 `yaml:"tier1_threshold,omitempty"`
	// Tier2Threshold is the minimum score for tier 2 (medium quality).
	Tier2Threshold float64 `yaml:"tier2_threshold,omitempty"`
	// Probation configures the probation system for recovering low-scoring endpoints.
	Probation ProbationConfig `yaml:"probation,omitempty"`
}

TieredSelectionConfig configures tier-based endpoint selection.

type TieredSelector added in v1.0.7

type TieredSelector struct {
	// contains filtered or unexported fields
}

TieredSelector selects endpoints using cascade-down tier logic. It groups endpoints into tiers based on their reputation scores and selects from the highest available tier.

func NewTieredSelector added in v1.0.7

func NewTieredSelector(config TieredSelectionConfig, minThreshold float64) *TieredSelector

NewTieredSelector creates a new TieredSelector with the given configuration.

func NewTieredSelectorWithLogger added in v1.0.10

func NewTieredSelectorWithLogger(logger polylog.Logger, config TieredSelectionConfig, minThreshold float64) *TieredSelector

NewTieredSelectorWithLogger creates a new TieredSelector with the given configuration and logger.

func (*TieredSelector) Config added in v1.0.7

func (s *TieredSelector) Config() TieredSelectionConfig

Config returns the tiered selection configuration.

func (*TieredSelector) GroupByTier added in v1.0.7

func (s *TieredSelector) GroupByTier(endpoints map[EndpointKey]float64) (tier1, tier2, tier3 []EndpointKey)

GroupByTier groups endpoints into three tiers based on their scores:

  • Tier 1 (Premium): score >= Tier1Threshold
  • Tier 2 (Good): Tier2Threshold <= score < Tier1Threshold
  • Tier 3 (Fair): MinThreshold <= score < Tier2Threshold

Endpoints below MinThreshold are excluded (they should already be filtered out).

func (*TieredSelector) IsInProbation added in v1.0.10

func (s *TieredSelector) IsInProbation(key EndpointKey) bool

IsInProbation returns true if the endpoint is currently in probation.

func (*TieredSelector) MinThreshold added in v1.0.7

func (s *TieredSelector) MinThreshold() float64

MinThreshold returns the minimum threshold for Tier 3.

func (*TieredSelector) SelectEndpoint added in v1.0.7

func (s *TieredSelector) SelectEndpoint(endpoints map[EndpointKey]float64) (EndpointKey, int, error)

SelectEndpoint selects one endpoint using cascade-down tier logic. It returns the selected endpoint key and the tier it was selected from (1, 2, or 3). Returns ErrNoEndpointsAvailable if no endpoints are available in any tier.
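
For example (sketch; cfg is a reputation.Config, keyA, keyB, and keyC are placeholder EndpointKeys, and the scores would normally come from ReputationService.GetScores):

selector := reputation.NewTieredSelector(cfg.TieredSelection, cfg.MinThreshold)
scores := map[reputation.EndpointKey]float64{keyA: 92, keyB: 65, keyC: 41}
key, tier, err := selector.SelectEndpoint(scores)
if err != nil {
	// reputation.ErrNoEndpointsAvailable: nothing at or above the minimum threshold
}
fmt.Printf("selected %s from tier %d\n", key, tier)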

func (*TieredSelector) SetLogger added in v1.0.10

func (s *TieredSelector) SetLogger(logger polylog.Logger)

SetLogger sets the logger for the TieredSelector. This can be used to add logging after creation.

func (*TieredSelector) ShouldRouteToProbation added in v1.0.10

func (s *TieredSelector) ShouldRouteToProbation() bool

ShouldRouteToProbation determines if this request should be routed to a probation endpoint. Returns true with probability = traffic_percent / 100. For example, if traffic_percent = 10, returns true 10% of the time.

func (*TieredSelector) TierForScore added in v1.0.7

func (s *TieredSelector) TierForScore(score float64) int

TierForScore returns the tier number (1, 2, or 3) for a given score. Returns 0 if the score is below the minimum threshold.

func (*TieredSelector) UpdateProbationStatus added in v1.0.10

func (s *TieredSelector) UpdateProbationStatus(endpoints map[EndpointKey]float64) []EndpointKey

UpdateProbationStatus updates probation tracking based on current scores and returns the list of endpoints currently in probation.

  • An endpoint enters probation when: minThreshold <= score < probationThreshold
  • An endpoint exits probation when: score >= probationThreshold

Directories

Path Synopsis
Package storage provides storage backend implementations for reputation scores.
