reputation

package
v1.0.6 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 4, 2025 License: MIT Imports: 7 Imported by: 0

README

Reputation Package

The reputation package provides endpoint reputation tracking and scoring for PATH. It replaces binary sanctions with a gradual scoring system that tracks endpoint reliability over time.

Overview

The reputation system tracks endpoint performance through signals (success/error events) and maintains a score for each endpoint. Endpoints with scores below the threshold are excluded from selection for requests.

Key Features
  • Performance-optimized design: All reads from local cache (<1μs latency)
  • Async writes: Updates local cache immediately, syncs to backend asynchronously
  • Background refresh: Periodically syncs from shared storage (for multi-instance deployments)
  • Configurable thresholds: Tune initial scores, minimum thresholds, and impact values

Configuration

reputation:
  enabled: true
  initial_score: 80        # Starting score for new endpoints
  min_threshold: 30        # Minimum score to be eligible for selection
  recovery_timeout: 5m     # Time before low-scoring endpoints recover
  storage_type: memory     # Storage backend: "memory" or "redis"
  sync_config:
    refresh_interval: 5s   # How often to refresh from backend
    write_buffer_size: 1000  # Max pending writes before blocking
    flush_interval: 100ms  # How often to flush writes to backend

Score Range and Impacts

Scores range from 0 (worst) to 100 (best):

Signal Type        Impact   Description
Success            +1       Request completed successfully
Recovery Success   +15      Success from low-scoring endpoint (probation)
Minor Error        -3       Validation or format errors
Major Error        -10      Timeouts, connection failures
Critical Error     -25      HTTP 5xx errors, service unavailable
Fatal Error        -50      Configuration errors, invalid service

Default values:

  • Initial Score: 80 (new endpoints start here)
  • Minimum Threshold: 30 (below this, endpoint is excluded)
  • Recovery Timeout: 5m (low-scoring endpoints recover after this time)
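The arithmetic behind the table can be illustrated with a minimal, self-contained sketch. It does not use the package itself: `impacts`, `applySignal`, and the clamping to the 0-100 range are assumptions made for illustration, based only on the impact values and score bounds listed above.

```go
package main

import "fmt"

// impacts mirrors the table above. The map and its string keys are local
// to this sketch and are not the package's own data structure.
var impacts = map[string]float64{
	"success":          1,
	"recovery_success": 15,
	"minor_error":      -3,
	"major_error":      -10,
	"critical_error":   -25,
	"fatal_error":      -50,
}

const (
	minScore = 0.0
	maxScore = 100.0
)

// applySignal adds the signal's impact to the score and clamps the result
// to [minScore, maxScore]. Unknown signal types contribute 0.
// Clamping is an assumption; the source only states the score range.
func applySignal(score float64, signalType string) float64 {
	next := score + impacts[signalType]
	if next < minScore {
		return minScore
	}
	if next > maxScore {
		return maxScore
	}
	return next
}

func main() {
	score := 80.0 // initial_score
	for _, s := range []string{"major_error", "critical_error", "critical_error"} {
		score = applySignal(score, s)
		fmt.Printf("%-15s -> %.0f\n", s, score)
	}
	fmt.Println("eligible at threshold 30:", score >= 30)
}
```

Starting from the default initial score, one major error and two critical errors are enough to push an endpoint below the default threshold of 30 in this sketch.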

Recovery Mechanism

When an endpoint's score falls below the threshold, it gets excluded from selection. To prevent endpoints from being "jailed" forever, the system automatically recovers endpoints that have been below threshold for longer than recovery_timeout:

  1. Endpoint fails repeatedly → score drops below threshold
  2. Endpoint is excluded from request selection
  3. recovery_timeout (default 5m) elapses with no signals for the endpoint
  4. Next time the endpoint is checked, score resets to initial_score
  5. Endpoint becomes eligible for selection again

This allows endpoints to recover from temporary issues (crashes, network problems, etc.) without manual intervention.
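The recovery steps can be sketched as a small check performed when a score is read. This is an illustration under stated assumptions, not the package's implementation: `score`, `shouldRecover`, and `checkScore` are hypothetical names, and the sketch treats `LastUpdated` as the time of the last signal.

```go
package main

import (
	"fmt"
	"time"
)

const defaultRecoveryTimeout = 5 * time.Minute

// score is a reduced stand-in for the package's Score type.
type score struct {
	Value       float64
	LastUpdated time.Time // assumed here to be the time of the last signal
}

// shouldRecover reports whether a score is below the threshold and has been
// idle (no signals) for longer than the recovery timeout.
func shouldRecover(value float64, idle, timeout time.Duration, minThreshold float64) bool {
	return value < minThreshold && idle > timeout
}

// checkScore returns the score to use at time now, resetting it to the
// initial score when the recovery condition holds.
func checkScore(s score, now time.Time, initial, minThreshold float64, timeout time.Duration) score {
	if shouldRecover(s.Value, now.Sub(s.LastUpdated), timeout, minThreshold) {
		return score{Value: initial, LastUpdated: now}
	}
	return s
}

func main() {
	now := time.Now()
	jailed := score{Value: 10, LastUpdated: now.Add(-6 * time.Minute)}
	recent := score{Value: 10, LastUpdated: now.Add(-1 * time.Minute)}

	fmt.Println(checkScore(jailed, now, 80, 30, defaultRecoveryTimeout).Value)
	fmt.Println(checkScore(recent, now, 80, 30, defaultRecoveryTimeout).Value)
}
```

Because the reset happens lazily on the next check, no background timer is needed in this sketch; that matches step 4 above, but the package may do it differently.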

Usage

Recording Signals

key := reputation.NewEndpointKey("eth", "https://endpoint.example.com")

// On success
svc.RecordSignal(ctx, key, reputation.NewSuccessSignal(latency))

// On error
svc.RecordSignal(ctx, key, reputation.NewMajorErrorSignal("timeout", latency))

Filtering Endpoints

// Keep endpoints whose score is at or above the threshold
eligible, err := svc.FilterByScore(ctx, endpoints, config.MinThreshold)

Getting Scores

score, err := svc.GetScore(ctx, key)
fmt.Printf("Score: %.1f, Success: %d, Errors: %d\n",
    score.Value, score.SuccessCount, score.ErrorCount)

Architecture

┌─────────────────┐     ┌──────────────┐     ┌─────────────┐
│  Hot Path       │────▶│ Local Cache  │────▶│ Read <1μs   │
│  (Requests)     │     │ (sync.Map)   │     └─────────────┘
└─────────────────┘     └──────────────┘
                               │
                               │ Async Write
                               ▼
                        ┌──────────────┐     ┌─────────────┐
                        │ Write Buffer │────▶│   Storage   │
                        │   Channel    │     │ (Memory/    │
                        └──────────────┘     │  Redis)     │
                               ▲             └─────────────┘
                               │                    │
                        ┌──────────────┐            │
                        │  Background  │◀───────────┘
                        │   Refresh    │  Periodic Sync
                        └──────────────┘
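The flow in the diagram can be approximated with a `sync.Map` for reads and a buffered channel for pending writes. The sketch below is a simplification under several assumptions: all names (`cachedScores`, `write`, `flushLoop`) are hypothetical, the backend is a plain map rather than a Storage implementation, writes are drained continuously instead of on a flush interval, and the background refresh is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// write is one pending update destined for the backend.
type write struct {
	key   string
	value float64
}

// cachedScores pairs a local cache with a buffered write channel.
type cachedScores struct {
	cache   sync.Map           // key -> float64, read on the hot path
	pending chan write         // stand-in for the write buffer
	backend map[string]float64 // stand-in for storage; touched only by flushLoop until stop returns
	done    chan struct{}
}

func newCachedScores(bufferSize int) *cachedScores {
	c := &cachedScores{
		pending: make(chan write, bufferSize),
		backend: make(map[string]float64),
		done:    make(chan struct{}),
	}
	go c.flushLoop()
	return c
}

// set updates the local cache immediately, then queues the backend write.
// The send blocks only when the buffer is full.
func (c *cachedScores) set(key string, value float64) {
	c.cache.Store(key, value)
	c.pending <- write{key: key, value: value}
}

// get reads from the local cache only and never touches the backend.
func (c *cachedScores) get(key string) (float64, bool) {
	v, ok := c.cache.Load(key)
	if !ok {
		return 0, false
	}
	return v.(float64), true
}

// flushLoop drains queued writes into the backend until the channel closes.
func (c *cachedScores) flushLoop() {
	for w := range c.pending {
		c.backend[w.key] = w.value
	}
	close(c.done)
}

// stop closes the buffer and waits for all pending writes to be flushed.
func (c *cachedScores) stop() {
	close(c.pending)
	<-c.done
}

func main() {
	c := newCachedScores(1000)
	c.set("eth:endpoint-a", 70)

	v, ok := c.get("eth:endpoint-a")
	fmt.Println(v, ok) // served from the cache, regardless of flush progress

	c.stop()
	fmt.Println(c.backend["eth:endpoint-a"]) // safe to read after stop returns
}
```

The point of the design is that `get` never waits on storage, and `set` waits only when the buffer is full, which is the trade-off `write_buffer_size` controls.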

Storage Backends

Memory (Default)
  • Single-instance only
  • Data lost on restart
  • Fastest performance

Redis (Future PR)
  • Shared across instances
  • Persistent storage
  • Required for multi-instance deployments

Thread Safety

The ReputationService is safe for concurrent use. All methods can be called from multiple goroutines without external synchronization.

Documentation

Overview

Package reputation provides endpoint reputation tracking and scoring.

The reputation system tracks the reliability and performance of endpoints based on their responses to requests. Endpoints accumulate a reputation score that influences their selection probability for future requests.

Key concepts:

  • Score: A numeric value (0-100) representing endpoint reliability
  • Signal: An event that affects an endpoint's score (success, error, timeout, etc.)
  • Storage: Backend for persisting scores (memory, Redis, etc.)


Constants

const (
	// MinScore is the lowest possible reputation score.
	MinScore float64 = 0

	// MaxScore is the highest possible reputation score.
	MaxScore float64 = 100

	// InitialScore is the starting score for new endpoints.
	// Endpoints start with a moderately high score to give them a fair chance.
	InitialScore float64 = 80

	// DefaultMinThreshold is the minimum score required for endpoint selection.
	// Endpoints below this threshold are excluded from selection.
	DefaultMinThreshold float64 = 30
)

Score constants define the bounds and defaults for reputation scores.

const (
	// KeyGranularityEndpoint scores each endpoint URL separately (finest granularity).
	// Key format: serviceID:supplierAddr-endpointURL
	// This is the default behavior.
	KeyGranularityEndpoint = "per-endpoint"

	// KeyGranularityDomain scores all endpoints from the same hosting domain together.
	// Key format: serviceID:domain (e.g., eth:nodefleet.net)
	// Use when a hosting provider's overall reliability matters.
	KeyGranularityDomain = "per-domain"

	// KeyGranularitySupplier scores all endpoints from the same supplier together.
	// Key format: serviceID:supplierAddr
	// Use when a supplier's overall reliability matters more than individual endpoints.
	KeyGranularitySupplier = "per-supplier"
)

Key granularity options determine how endpoints are grouped for scoring. Ordered from finest to coarsest granularity.
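To make the three key formats concrete, here is a hedged sketch that derives each key as a plain string. It assumes the endpoint address has the form `supplierAddr-endpointURL`, as the key format comments suggest. `buildKey` and `domainOf` are hypothetical helpers, and taking the last two labels of the hostname is a naive stand-in for whatever domain extraction the package actually performs.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildKey returns a scoring key for the given granularity.
// endpointAddr is assumed to look like "supplierAddr-endpointURL".
// Unknown granularities and unparseable addresses fall back to the
// per-endpoint format.
func buildKey(granularity, serviceID, endpointAddr string) string {
	supplier, rawURL, ok := strings.Cut(endpointAddr, "-")
	switch granularity {
	case "per-supplier":
		if ok && supplier != "" {
			return serviceID + ":" + supplier
		}
	case "per-domain":
		if ok {
			if d := domainOf(rawURL); d != "" {
				return serviceID + ":" + d
			}
		}
	}
	return serviceID + ":" + endpointAddr
}

// domainOf returns the last two labels of the URL's hostname.
// This is a simplification: it mishandles IP addresses and multi-part
// suffixes such as "co.uk".
func domainOf(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil || u.Hostname() == "" {
		return ""
	}
	labels := strings.Split(u.Hostname(), ".")
	if len(labels) > 2 {
		labels = labels[len(labels)-2:]
	}
	return strings.Join(labels, ".")
}

func main() {
	addr := "pokt1abc-https://rpc.nodefleet.net:443/v1"
	for _, g := range []string{"per-endpoint", "per-domain", "per-supplier"} {
		fmt.Printf("%-13s %s\n", g, buildKey(g, "eth", addr))
	}
}
```

Coarser granularity means one misbehaving URL lowers the shared score for every endpoint in the same group, which is the intended effect when the supplier or hosting provider is the unit of reliability.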

const (
	// DefaultRecoveryTimeout is the duration after which low-scoring endpoints
	// are reset to initial score if they have no signals.
	DefaultRecoveryTimeout = 5 * time.Minute

	// DefaultRefreshInterval is how often to refresh from backend storage.
	DefaultRefreshInterval = 5 * time.Second

	// DefaultWriteBufferSize is the max pending writes before blocking.
	DefaultWriteBufferSize = 1000

	// DefaultFlushInterval is how often to flush buffered writes.
	DefaultFlushInterval = 100 * time.Millisecond
)

Recovery and SyncConfig defaults.

Variables

var (
	// ErrNotFound is returned when an endpoint's score is not found.
	ErrNotFound = errors.New("endpoint score not found")

	// ErrStorageClosed is returned when operations are attempted on a closed storage.
	ErrStorageClosed = errors.New("storage is closed")
)

Common errors returned by storage implementations.

Functions

func GetScoreImpact

func GetScoreImpact(t SignalType) float64

GetScoreImpact returns the default score impact for a signal type. Returns 0 if the signal type is not recognized.

Types

type Cleaner

type Cleaner interface {
	// Cleanup removes expired entries from storage.
	// This is called periodically by the service to prevent memory bloat.
	Cleanup()
}

Cleaner is an optional interface that storage backends can implement to support periodic cleanup of expired entries.
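An optional interface of this kind is usually detected with a type assertion. The sketch below shows that pattern with local stand-in types. `storage`, `cleaner`, `plainStore`, `expiringStore`, and `runCleanup` are hypothetical names, and how the service actually schedules cleanup is not specified in the source.

```go
package main

import "fmt"

// storage and cleaner are reduced stand-ins for the package's Storage
// and Cleaner interfaces.
type storage interface{ Close() error }
type cleaner interface{ Cleanup() }

// plainStore implements storage only.
type plainStore struct{}

func (plainStore) Close() error { return nil }

// expiringStore implements both storage and cleaner.
type expiringStore struct{ cleanups int }

func (s *expiringStore) Close() error { return nil }
func (s *expiringStore) Cleanup()     { s.cleanups++ }

// runCleanup calls Cleanup only when the backend supports it and
// reports whether it did.
func runCleanup(s storage) bool {
	c, ok := s.(cleaner)
	if !ok {
		return false
	}
	c.Cleanup()
	return true
}

func main() {
	fmt.Println(runCleanup(plainStore{}))
	fmt.Println(runCleanup(&expiringStore{}))
}
```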

type Config

type Config struct {
	// Enabled determines if reputation tracking is active.
	// When false, all ReputationService methods are no-ops.
	Enabled bool `yaml:"enabled"`

	// InitialScore is the starting score for new endpoints.
	// Default: 80
	InitialScore float64 `yaml:"initial_score"`

	// MinThreshold is the minimum score for endpoint selection.
	// Default: 30
	MinThreshold float64 `yaml:"min_threshold"`

	// RecoveryTimeout is the duration after which a low-scoring endpoint
	// with no signals is reset to InitialScore. This allows endpoints
	// to recover from temporary failures (crashes, network issues, etc.).
	// Default: 5m
	RecoveryTimeout time.Duration `yaml:"recovery_timeout"`

	// StorageType specifies the storage backend ("memory" or "redis").
	// Default: "memory"
	StorageType string `yaml:"storage_type"`

	// KeyGranularity determines how endpoints are grouped for scoring (global default).
	// Options: "per-endpoint" (default), "per-domain", "per-supplier"
	// Default: "per-endpoint"
	KeyGranularity string `yaml:"key_granularity"`

	// ServiceOverrides allows per-service configuration overrides.
	// Use this to apply different granularity rules to specific services.
	ServiceOverrides map[string]ServiceConfig `yaml:"service_overrides,omitempty"`

	// SyncConfig configures background synchronization behavior.
	SyncConfig SyncConfig `yaml:"sync_config"`

	// Redis holds Redis-specific configuration (only used when StorageType is "redis").
	Redis *RedisConfig `yaml:"redis,omitempty"`
}

Config holds configuration for the reputation system.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults.

func (*Config) HydrateDefaults

func (c *Config) HydrateDefaults()

HydrateDefaults fills in zero values with defaults.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks that the configuration values are valid. Returns an error if any values are out of bounds or inconsistent.
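The hydrate-then-validate pattern might look like the following sketch. The reduced `config` struct and the specific validation rules are assumptions for illustration; the source states only that Validate rejects values that are out of bounds or inconsistent.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config is a reduced stand-in for the package's Config type.
type config struct {
	InitialScore    float64
	MinThreshold    float64
	RecoveryTimeout time.Duration
	StorageType     string
}

// hydrateDefaults fills zero values with the documented defaults.
// An explicit 0 is indistinguishable from "unset" with this approach.
func (c *config) hydrateDefaults() {
	if c.InitialScore == 0 {
		c.InitialScore = 80
	}
	if c.MinThreshold == 0 {
		c.MinThreshold = 30
	}
	if c.RecoveryTimeout == 0 {
		c.RecoveryTimeout = 5 * time.Minute
	}
	if c.StorageType == "" {
		c.StorageType = "memory"
	}
}

// validate applies illustrative checks; the real rules may differ.
func (c *config) validate() error {
	if c.InitialScore < 0 || c.InitialScore > 100 {
		return errors.New("initial_score must be within [0, 100]")
	}
	if c.MinThreshold < 0 || c.MinThreshold > 100 {
		return errors.New("min_threshold must be within [0, 100]")
	}
	if c.MinThreshold > c.InitialScore {
		return errors.New("min_threshold must not exceed initial_score")
	}
	if c.RecoveryTimeout < 0 {
		return errors.New("recovery_timeout must not be negative")
	}
	if c.StorageType != "memory" && c.StorageType != "redis" {
		return errors.New(`storage_type must be "memory" or "redis"`)
	}
	return nil
}

func main() {
	c := config{MinThreshold: 50, InitialScore: 20}
	c.hydrateDefaults()
	fmt.Println(c.validate())
}
```

Calling the hydrate step before validation means a partially filled config is checked against the same rules as a fully specified one.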

type DomainKeyBuilder added in v1.0.6

type DomainKeyBuilder struct{}

DomainKeyBuilder creates keys with per-domain granularity. All endpoints from the same hosting domain share a score. Key format: serviceID:domain (e.g., eth:nodefleet.net)

func (*DomainKeyBuilder) BuildKey added in v1.0.6

func (b *DomainKeyBuilder) BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey

BuildKey creates a key using the domain extracted from the endpoint URL. If the domain cannot be extracted, it falls back to the full endpoint address.

type EndpointKey

type EndpointKey struct {
	ServiceID    protocol.ServiceID
	EndpointAddr protocol.EndpointAddr
}

EndpointKey uniquely identifies an endpoint for reputation tracking. It combines the service ID and endpoint address to create a unique key.

func NewEndpointKey

func NewEndpointKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey

NewEndpointKey creates a new EndpointKey from service ID and endpoint address.

func (EndpointKey) String

func (k EndpointKey) String() string

String returns a string representation of the endpoint key.

type EndpointKeyBuilder added in v1.0.6

type EndpointKeyBuilder struct{}

EndpointKeyBuilder creates keys with per-endpoint granularity. Each endpoint URL is scored separately. Key format: serviceID:supplierAddr-endpointURL

func (*EndpointKeyBuilder) BuildKey added in v1.0.6

func (b *EndpointKeyBuilder) BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey

BuildKey creates a key using the full endpoint address.

type KeyBuilder added in v1.0.6

type KeyBuilder interface {
	// BuildKey creates an EndpointKey for the given service and endpoint.
	BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey
}

KeyBuilder creates EndpointKeys with a specific granularity. Different implementations group endpoints differently for scoring.

func NewKeyBuilder added in v1.0.6

func NewKeyBuilder(granularity string) KeyBuilder

NewKeyBuilder creates a KeyBuilder for the specified granularity. If the granularity is invalid or empty, it defaults to per-endpoint.

type RedisConfig

type RedisConfig struct {
	// Address is the Redis server address (host:port).
	Address string `yaml:"address"`

	// Password for Redis authentication (optional).
	Password string `yaml:"password"`

	// DB is the Redis database number.
	DB int `yaml:"db"`

	// KeyPrefix is prepended to all keys stored in Redis.
	// Default: "path:reputation:"
	KeyPrefix string `yaml:"key_prefix"`

	// PoolSize is the maximum number of socket connections.
	// Default: 10
	PoolSize int `yaml:"pool_size"`

	// DialTimeout is the timeout for establishing new connections.
	// Default: 5s
	DialTimeout time.Duration `yaml:"dial_timeout"`

	// ReadTimeout is the timeout for socket reads.
	// Default: 3s
	ReadTimeout time.Duration `yaml:"read_timeout"`

	// WriteTimeout is the timeout for socket writes.
	// Default: 3s
	WriteTimeout time.Duration `yaml:"write_timeout"`
}

RedisConfig holds Redis-specific configuration.

func DefaultRedisConfig

func DefaultRedisConfig() RedisConfig

DefaultRedisConfig returns a RedisConfig with sensible defaults.

func (*RedisConfig) HydrateDefaults

func (c *RedisConfig) HydrateDefaults()

HydrateDefaults fills in zero values with defaults.

type ReputationService

type ReputationService interface {
	// RecordSignal records a signal event for an endpoint.
	// Updates local cache immediately and queues async write to backend storage.
	// This method is non-blocking - it returns immediately after updating local cache.
	RecordSignal(ctx context.Context, key EndpointKey, signal Signal) error

	// GetScore retrieves the current reputation score for an endpoint.
	// Always reads from local cache for minimal latency (<1μs).
	// Returns the score or an error if the endpoint is not found.
	GetScore(ctx context.Context, key EndpointKey) (Score, error)

	// GetScores retrieves reputation scores for multiple endpoints.
	// Always reads from local cache for minimal latency.
	// Returns a map of endpoint keys to scores. Missing endpoints are omitted.
	GetScores(ctx context.Context, keys []EndpointKey) (map[EndpointKey]Score, error)

	// FilterByScore filters endpoints that meet the minimum score threshold.
	// Always reads from local cache for minimal latency.
	// Returns only endpoints with scores >= minThreshold.
	FilterByScore(ctx context.Context, keys []EndpointKey, minThreshold float64) ([]EndpointKey, error)

	// ResetScore resets an endpoint's score to the initial value.
	// Used for administrative purposes or testing.
	ResetScore(ctx context.Context, key EndpointKey) error

	// KeyBuilderForService returns the KeyBuilder for the given service.
	// Uses service-specific config if available, otherwise falls back to global default.
	KeyBuilderForService(serviceID protocol.ServiceID) KeyBuilder

	// Start begins background sync processes (e.g., Redis pub/sub, periodic refresh).
	// Should be called once during initialization.
	Start(ctx context.Context) error

	// Stop gracefully shuts down background processes and flushes pending writes.
	Stop() error
}

ReputationService provides endpoint reputation tracking and querying. Implementations must be safe for concurrent use.

Performance design:

  • All reads (GetScore, FilterByScore) are served from local in-memory cache (<1μs)
  • Writes (RecordSignal) update local cache immediately + async sync to backend
  • Background refresh keeps local cache in sync with shared backend (Redis)

This ensures the hot path (request handling) is never blocked by storage latency.
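As an illustration of the read path, the sketch below filters a list of keys against an in-memory score map. It is not the package's code: `filterByScore` is a hypothetical helper, keys are plain strings, and treating endpoints with no recorded score as having the initial score is an assumption, since the source does not say how FilterByScore handles unknown endpoints.

```go
package main

import "fmt"

// filterByScore returns the keys whose score is >= minThreshold, preserving
// input order. Keys with no recorded score are treated as having the
// initial score (an assumption of this sketch).
func filterByScore(scores map[string]float64, keys []string, initial, minThreshold float64) []string {
	eligible := make([]string, 0, len(keys))
	for _, k := range keys {
		v, ok := scores[k]
		if !ok {
			v = initial
		}
		if v >= minThreshold {
			eligible = append(eligible, k)
		}
	}
	return eligible
}

func main() {
	scores := map[string]float64{"eth:a": 80, "eth:b": 12, "eth:c": 30}
	keys := []string{"eth:a", "eth:b", "eth:c", "eth:new"}
	fmt.Println(filterByScore(scores, keys, 80, 30))
}
```

Note that a score exactly equal to the threshold remains eligible, consistent with the `>= minThreshold` wording in the interface comment.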

func NewService

func NewService(config Config, store Storage) ReputationService

NewService creates a new ReputationService with the given configuration. The service must be started with Start() before use.

type Score

type Score struct {
	// Value is the current score (0-100).
	// Higher scores indicate more reliable endpoints.
	Value float64

	// LastUpdated is when the score was last modified.
	LastUpdated time.Time

	// SuccessCount is the total number of successful requests.
	SuccessCount int64

	// ErrorCount is the total number of failed requests.
	ErrorCount int64
}

Score represents an endpoint's reputation score at a point in time.

func (Score) IsValid

func (s Score) IsValid() bool

IsValid returns true if the score is within valid bounds.

type ServiceConfig added in v1.0.6

type ServiceConfig struct {
	// KeyGranularity overrides the global key granularity for this service.
	// Options: "per-endpoint", "per-domain", "per-supplier"
	KeyGranularity string `yaml:"key_granularity"`
}

ServiceConfig holds service-specific reputation configuration overrides.

type Signal

type Signal struct {
	// Type categorizes the signal's severity.
	Type SignalType

	// Timestamp when the signal was generated.
	Timestamp time.Time

	// Latency of the request that generated this signal (if applicable).
	Latency time.Duration

	// Reason provides additional context for the signal.
	Reason string

	// Metadata holds additional signal-specific data.
	Metadata map[string]string
}

Signal represents an event that affects an endpoint's reputation.

func NewCriticalErrorSignal

func NewCriticalErrorSignal(reason string, latency time.Duration) Signal

NewCriticalErrorSignal creates a signal for a critical error (HTTP 5xx).

func NewFatalErrorSignal

func NewFatalErrorSignal(reason string) Signal

NewFatalErrorSignal creates a signal for a fatal error. This replaces the concept of "permanent sanctions" - endpoints can recover.

func NewMajorErrorSignal

func NewMajorErrorSignal(reason string, latency time.Duration) Signal

NewMajorErrorSignal creates a signal for a major error (timeout, connection issues).

func NewMinorErrorSignal

func NewMinorErrorSignal(reason string) Signal

NewMinorErrorSignal creates a signal for a minor error.

func NewRecoverySuccessSignal

func NewRecoverySuccessSignal(latency time.Duration) Signal

NewRecoverySuccessSignal creates a signal for a successful request from a low-scoring endpoint. It has a higher impact (+15) than a regular success (+1) so that endpoints recover faster once they prove they are healthy. Intended for use by the Probation system (PR 7) and Health checks (PR 9).

func NewSuccessSignal

func NewSuccessSignal(latency time.Duration) Signal

NewSuccessSignal creates a signal for a successful request.

func (Signal) GetDefaultImpact

func (s Signal) GetDefaultImpact() float64

GetDefaultImpact returns the default score impact for this signal.

func (Signal) IsNegative

func (s Signal) IsNegative() bool

IsNegative returns true if this signal has a negative impact on score.

func (Signal) IsPositive

func (s Signal) IsPositive() bool

IsPositive returns true if this signal has a positive impact on score.

type SignalType

type SignalType string

SignalType categorizes signals by their impact on reputation.

const (
	// SignalTypeSuccess indicates a successful request/response.
	SignalTypeSuccess SignalType = "success"

	// SignalTypeMinorError indicates a minor error (validation issues, unknown errors).
	SignalTypeMinorError SignalType = "minor_error"

	// SignalTypeMajorError indicates a major error (timeout, connection issues).
	SignalTypeMajorError SignalType = "major_error"

	// SignalTypeCriticalError indicates a critical error (HTTP 5xx, transport errors).
	SignalTypeCriticalError SignalType = "critical_error"

	// SignalTypeFatalError indicates a fatal error (service misconfiguration).
	// Was previously "permanent sanction" - now just a severe score penalty.
	SignalTypeFatalError SignalType = "fatal_error"

	// SignalTypeRecoverySuccess indicates a successful request from a low-scoring endpoint.
	// This signal has higher impact (+15) than regular success (+1) to allow endpoints
	// to recover faster when they prove they're healthy again.
	// Intended for use by:
	//   - Probation system (PR 7): when sampling traffic to low-scoring endpoints
	//   - Health checks (PR 9): when probing excluded endpoints
	SignalTypeRecoverySuccess SignalType = "recovery_success"
)

type Storage

type Storage interface {
	// Get retrieves the score for an endpoint.
	// Returns ErrNotFound if the endpoint has no stored score.
	Get(ctx context.Context, key EndpointKey) (Score, error)

	// GetMultiple retrieves scores for multiple endpoints.
	// Returns a map containing only the endpoints that were found.
	// Missing endpoints are omitted from the result (no error).
	GetMultiple(ctx context.Context, keys []EndpointKey) (map[EndpointKey]Score, error)

	// Set stores or updates the score for an endpoint.
	Set(ctx context.Context, key EndpointKey, score Score) error

	// SetMultiple stores or updates scores for multiple endpoints.
	SetMultiple(ctx context.Context, scores map[EndpointKey]Score) error

	// Delete removes the score for an endpoint.
	// Returns nil if the endpoint doesn't exist.
	Delete(ctx context.Context, key EndpointKey) error

	// List returns all stored endpoint keys for a service.
	// If serviceID is empty, returns all endpoint keys.
	List(ctx context.Context, serviceID string) ([]EndpointKey, error)

	// Close releases any resources held by the storage.
	Close() error
}

Storage defines the interface for reputation score persistence. Implementations must be safe for concurrent use.
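A concurrency-safe in-memory backend can be sketched with a map guarded by a `sync.RWMutex`. The version below is deliberately reduced and is not the package's memory store: it omits the context parameters, GetMultiple, SetMultiple, and Close, and every name (`memoryStore`, `endpointKey`, `score`, `errNotFound`) is a local stand-in.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNotFound mirrors the role of the package's ErrNotFound.
var errNotFound = errors.New("endpoint score not found")

// endpointKey and score are reduced stand-ins for EndpointKey and Score.
type endpointKey struct {
	ServiceID    string
	EndpointAddr string
}

type score struct {
	Value float64
}

// memoryStore keeps scores in a map guarded by a read-write mutex.
type memoryStore struct {
	mu     sync.RWMutex
	scores map[endpointKey]score
}

func newMemoryStore() *memoryStore {
	return &memoryStore{scores: make(map[endpointKey]score)}
}

// Get returns errNotFound when the key has no stored score.
func (m *memoryStore) Get(key endpointKey) (score, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	s, ok := m.scores[key]
	if !ok {
		return score{}, errNotFound
	}
	return s, nil
}

// Set stores or overwrites the score for a key.
func (m *memoryStore) Set(key endpointKey, s score) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.scores[key] = s
}

// Delete removes a key; deleting a missing key is not an error.
func (m *memoryStore) Delete(key endpointKey) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.scores, key)
}

// List returns keys for one service, or all keys when serviceID is empty.
func (m *memoryStore) List(serviceID string) []endpointKey {
	m.mu.RLock()
	defer m.mu.RUnlock()
	keys := make([]endpointKey, 0, len(m.scores))
	for k := range m.scores {
		if serviceID == "" || k.ServiceID == serviceID {
			keys = append(keys, k)
		}
	}
	return keys
}

func main() {
	st := newMemoryStore()
	key := endpointKey{ServiceID: "eth", EndpointAddr: "https://endpoint.example.com"}

	_, err := st.Get(key)
	fmt.Println(errors.Is(err, errNotFound))

	st.Set(key, score{Value: 80})
	s, _ := st.Get(key)
	fmt.Println(s.Value, len(st.List("eth")))
}
```

A read-write mutex suits this workload because lookups dominate; a shared backend such as Redis would need the same method set but with network I/O behind it.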

type SupplierKeyBuilder added in v1.0.6

type SupplierKeyBuilder struct{}

SupplierKeyBuilder creates keys with per-supplier granularity. All endpoint URLs from the same supplier share a score. Key format: serviceID:supplierAddr

func (*SupplierKeyBuilder) BuildKey added in v1.0.6

func (b *SupplierKeyBuilder) BuildKey(serviceID protocol.ServiceID, endpointAddr protocol.EndpointAddr) EndpointKey

BuildKey creates a key using only the supplier address. If the endpoint address cannot be parsed, it falls back to the full address.

type SyncConfig

type SyncConfig struct {
	// RefreshInterval is how often to refresh scores from the backend storage.
	// Only applies when using shared storage (Redis).
	// Default: 5s
	RefreshInterval time.Duration `yaml:"refresh_interval"`

	// WriteBufferSize is the max number of pending writes to buffer before blocking.
	// Default: 1000
	WriteBufferSize int `yaml:"write_buffer_size"`

	// FlushInterval is how often to flush buffered writes to backend storage.
	// Default: 100ms
	FlushInterval time.Duration `yaml:"flush_interval"`
}

SyncConfig configures background synchronization for the reputation system.

func DefaultSyncConfig

func DefaultSyncConfig() SyncConfig

DefaultSyncConfig returns a SyncConfig with sensible defaults.

func (*SyncConfig) HydrateDefaults

func (s *SyncConfig) HydrateDefaults()

HydrateDefaults fills in zero values with defaults.

Directories

Path Synopsis
Package storage provides storage backend implementations for reputation scores.
