Documentation
Overview
Package gateway implements components for operating a gateway service.
Protocol (Shannon):
- Provide available endpoints for a service
- Send relays to specific endpoints
Gateways:
- Select endpoints for relay transmission
QoS Services:
- Translate user requests into endpoint payloads
- Select optimal endpoints for request handling
TODO_MVP(@adshmh): add a README with a diagram of all the above.
TODO_MVP(@adshmh): add a section for the following packages once they are added: Metrics, Message.
Package gateway provides configuration types for pluggable health checks.
These types are defined in the gateway package to avoid import cycles, since protocol/shannon imports gateway.
Package gateway provides the health check executor for pluggable health checks.
The HealthCheckExecutor executes YAML-configurable health checks against endpoints through the protocol layer (by sending synthetic relay requests) and records the results in the reputation system.
Unlike direct HTTP calls, health checks are sent through the protocol just like regular user requests, ensuring the full path (including relay miners) is tested.
Supported health check types:
- json_rpc: JSON-RPC endpoints (HTTP POST with JSON body)
- rest: REST endpoints (HTTP GET/POST)
- websocket: WebSocket endpoints (connect, optionally send/receive message)
- grpc: gRPC endpoints (not yet implemented)
Package gateway provides leader election for health checks in distributed deployments.
The LeaderElector ensures that only one PATH instance runs health checks at a time, preventing duplicate traffic to endpoints and duplicate reputation updates.
Redis Operations
Acquisition uses SET NX EX (atomic set-if-not-exists with expiry).
Renewal and release use Lua scripts for atomicity. Why Lua scripts? These operations require "check-then-act" logic (e.g., "if I own the key, extend it"). Without atomicity, a race condition can occur:
Instance A: GET key → sees itself as leader
(key expires here, Instance B acquires leadership)
Instance A: EXPIRE key → accidentally extends B's lock!
Lua scripts execute atomically on the Redis server: the entire script runs as a single uninterruptible operation. No other Redis commands can execute between the GET and EXPIRE, eliminating the race condition.
The scripts are sent to Redis on first use via EVAL, then cached by Redis and referenced by SHA1 hash (EVALSHA) for subsequent calls. No pre-registration or Redis configuration is required.
Package gateway provides the health check QoS context for pluggable health checks.
HealthCheckQoSContext implements RequestQoSContext for YAML-configurable health checks. It allows sending configured payloads through the protocol layer and validating responses.
Package gateway provides the default observation handler for async processing.
Package gateway provides async observation processing for QoS data extraction.
The ObservationQueue enables non-blocking response processing by separating the hot path (storing bytes + recording reputation signals) from the heavy parsing work (extracting block_height, chain_id, etc.).
Architecture:
        Response from Endpoint
                   │
                   ▼
┌─────────────────────────────────────┐
│ UpdateWithResponse (FAST)           │
├─────────────────────────────────────┤
│ 1. Store raw bytes (always)         │ ← for client write-back
│ 2. Record reputation (always)       │ ← status + latency (no parsing)
│ 3. Queue for parsing (sampled)      │ ← async, non-blocking
└─────────────────────────────────────┘
        │                   │
        ▼                   ▼ (async worker)
 Write to Client   Deep parse response
   (immediate)     Update endpointStore
This design ensures:
- Client latency is minimal (no parsing in hot path)
- Reputation gets 100% of status/latency signals
- Heavy parsing (JSON decode, validation) is sampled and async
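The hot-path side of this design can be sketched with a buffered channel. This is a minimal sketch, not the ObservationQueue implementation: `observation` and `sampledQueue` are hypothetical stand-ins for `QueuedObservation` and `ObservationQueue`, sampling uses `math/rand`, and a `select` with a `default` branch drops the observation instead of blocking when the buffer is full.

```go
package main

import (
	"math/rand"
	"sync/atomic"
)

// observation is a hypothetical stand-in for QueuedObservation.
type observation struct {
	ServiceID string
	Body      []byte
}

// sampledQueue models the hot-path side of an async observation queue.
type sampledQueue struct {
	ch         chan *observation
	sampleRate float64 // fraction of responses to deep-parse, e.g. 0.1
	queued     atomic.Int64
	dropped    atomic.Int64
	skipped    atomic.Int64
}

func newSampledQueue(size int, rate float64) *sampledQueue {
	return &sampledQueue{ch: make(chan *observation, size), sampleRate: rate}
}

// tryQueue never blocks: unsampled observations are skipped and,
// if the buffer is full, sampled ones are dropped.
func (q *sampledQueue) tryQueue(obs *observation) bool {
	if rand.Float64() >= q.sampleRate {
		q.skipped.Add(1)
		return false
	}
	select {
	case q.ch <- obs:
		q.queued.Add(1)
		return true
	default:
		q.dropped.Add(1)
		return false
	}
}
```

A worker pool would range over `q.ch` and perform the deep parse; that side is omitted. The three counters loosely mirror the queued, dropped, and skipped values reported by `GetMetrics`.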
Package gateway provides unified service configuration types.
These types are defined in the gateway package to avoid import cycles, since protocol/shannon imports gateway.
Index
- Constants
- Variables
- func GetAllValidRPCTypeStrings() []string
- func GetHTTPBasedRPCTypeStrings() []string
- func IsBuiltInLatencyProfile(name string) bool
- func IsHTTPBasedRPCType(rpcTypeStr string) bool
- func NewRPCTypeValidationErrorResponse(serviceID protocol.ServiceID, detectedType string, allowedRPCTypes []string, ...) *rpcTypeValidationErrorResponse
- func NewServiceNotConfiguredErrorResponse(serviceID protocol.ServiceID, availableServices []protocol.ServiceID, ...) *rpcTypeValidationErrorResponse
- func ValidateServiceType(t ServiceType) error
- type ActiveHealthChecksConfig
- type BlockHeightOperator
- type BlockHeightReference
- type BlockHeightReferenceType
- type BlockHeightValidation
- type BlockHeightValidator
- type ConcurrencyConfig
- type DefaultObservationHandler
- type EndpointInfo
- type ErrorDetection
- type ErrorPattern
- type ErrorStatusCode
- type ExternalConfigSource
- type ExternalReferenceCache
- type Gateway
- type HTTPRequestParser
- type HealthCheckConfig
- type HealthCheckExecutor
- func (e *HealthCheckExecutor) ExecuteCheckViaProtocol(ctx context.Context, serviceID protocol.ServiceID, ...) (time.Duration, error)
- func (e *HealthCheckExecutor) ExecuteWebSocketCheckViaProtocol(ctx context.Context, serviceID protocol.ServiceID, ...) (time.Duration, error)
- func (e *HealthCheckExecutor) GetConfigForService(serviceID protocol.ServiceID) *ServiceHealthCheckConfig
- func (e *HealthCheckExecutor) GetServiceConfigs() []ServiceHealthCheckConfig
- func (e *HealthCheckExecutor) InitExternalConfig(ctx context.Context)
- func (e *HealthCheckExecutor) IsEnabled() bool
- func (e *HealthCheckExecutor) RunAllChecksViaProtocol(ctx context.Context, ...) error
- func (e *HealthCheckExecutor) RunChecksForEndpointViaProtocol(ctx context.Context, serviceID protocol.ServiceID, ...) map[string]error
- func (e *HealthCheckExecutor) SetQoSInstances(instances map[protocol.ServiceID]QoSService)
- func (e *HealthCheckExecutor) ShouldRunChecks() bool
- func (e *HealthCheckExecutor) Stop()
- type HealthCheckExecutorConfig
- type HealthCheckQoSContext
- func (hc *HealthCheckQoSContext) GetEndpointSelector() protocol.EndpointSelector
- func (hc *HealthCheckQoSContext) GetError() string
- func (hc *HealthCheckQoSContext) GetHTTPResponse() pathhttp.HTTPResponse
- func (hc *HealthCheckQoSContext) GetObservations() qosobservations.Observations
- func (hc *HealthCheckQoSContext) GetServicePayloads() []protocol.Payload
- func (hc *HealthCheckQoSContext) IsSuccess() bool
- func (hc *HealthCheckQoSContext) UpdateWithResponse(endpointAddr protocol.EndpointAddr, endpointSerializedResponse []byte, ...)
- type HealthCheckQoSContextConfig
- type HealthCheckType
- type HealthChecksConfig
- type JSONRPCErrorCode
- type LatencyProfileConfig
- type LeaderElectionConfig
- type LeaderElector
- type LeaderElectorConfig
- type ObservationHandler
- type ObservationPipelineConfig
- type ObservationQueue
- func (q *ObservationQueue) GetMetrics() (queued, processed, dropped, skipped int64)
- func (q *ObservationQueue) GetSampleRate() float64
- func (q *ObservationQueue) IsEnabled() bool
- func (q *ObservationQueue) ProcessSync(obs *QueuedObservation)
- func (q *ObservationQueue) SetHandler(handler ObservationHandler)
- func (q *ObservationQueue) SetPerServiceRate(serviceID protocol.ServiceID, rate float64)
- func (q *ObservationQueue) SetRegistry(registry *qostypes.ExtractorRegistry)
- func (q *ObservationQueue) Stop()
- func (q *ObservationQueue) Submit(obs *QueuedObservation) bool
- func (q *ObservationQueue) TryQueue(obs *QueuedObservation) bool
- type ObservationQueueConfig
- type ObservationSource
- type ParentConfigDefaults
- type PassthroughConfig
- type Protocol
- type ProtocolRequestContext
- type ProtocolRequestContextWebsocket
- type QoSContextBuilder
- type QoSEndpointCheckGenerator
- type QoSService
- type QueuedObservation
- type RPCTypeDetector
- type RPCTypeMapper
- type RPCTypeValidator
- type RequestQoSContext
- type RequestResponseReporter
- type RetryConfig
- type ServiceConcurrencyConfig
- type ServiceConfig
- type ServiceDefaults
- type ServiceFallbackConfig
- type ServiceHealthCheckConfig
- type ServiceHealthCheckOverride
- type ServiceLatencyConfig
- type ServiceObservationConfig
- type ServiceProbationConfig
- type ServiceReputationConfig
- type ServiceRetryConfig
- type ServiceTieredSelectionConfig
- type ServiceType
- type UnifiedServicesConfig
- func (c *UnifiedServicesConfig) GetConfiguredServiceIDs() []protocol.ServiceID
- func (c *UnifiedServicesConfig) GetLatencyProfile(name string) *LatencyProfileConfig
- func (c *UnifiedServicesConfig) GetMergedServiceConfig(serviceID protocol.ServiceID) *ServiceConfig
- func (c *UnifiedServicesConfig) GetServiceConfig(serviceID protocol.ServiceID) *ServiceConfig
- func (c *UnifiedServicesConfig) GetServiceRPCTypes(serviceID protocol.ServiceID) []string
- func (c *UnifiedServicesConfig) GetServiceType(serviceID protocol.ServiceID) ServiceType
- func (c *UnifiedServicesConfig) GetSyncAllowanceForService(serviceID protocol.ServiceID) uint64
- func (c *UnifiedServicesConfig) HasService(serviceID protocol.ServiceID) bool
- func (c *UnifiedServicesConfig) HasServices() bool
- func (c *UnifiedServicesConfig) HydrateDefaults()
- func (c *UnifiedServicesConfig) SetDefaultsFromParent(parent ParentConfigDefaults)
- func (c *UnifiedServicesConfig) Validate() error
Constants
const (
	DefaultHealthCheckTimeout = 5 * time.Second
	DefaultHealthCheckInterval = 10 * time.Second
	DefaultLeaderLeaseDuration = 30 * time.Second
	DefaultLeaderRenewInterval = 10 * time.Second
	DefaultLeaderKey = "path:health_check_leader"
	DefaultExternalConfigTimeout = 30 * time.Second
	DefaultExpectedStatusCode = 200
	DefaultReputationSignal = "minor_error"
	// DefaultSyncAllowance is the default number of blocks behind the latest block
	// that an endpoint can be before it's considered out of sync.
	// 0 means disabled (no sync allowance check).
	DefaultSyncAllowance = 0
)
Health check configuration defaults
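The DefaultSyncAllowance comment implies a simple rule. A sketch, assuming an endpoint counts as out of sync only when it trails the latest block by strictly more than the allowance (`outOfSync` is a hypothetical helper, not part of the package):

```go
package main

// outOfSync reports whether an endpoint at endpointHeight lags latestHeight
// by more than allowance blocks. allowance == 0 disables the check,
// matching the documented meaning of DefaultSyncAllowance.
func outOfSync(endpointHeight, latestHeight, allowance uint64) bool {
	if allowance == 0 || endpointHeight >= latestHeight {
		return false
	}
	return latestHeight-endpointHeight > allowance
}
```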
const (
	// DefaultObservationPipelineSampleRate is the default fraction of requests to deep-parse.
	// 10% provides good coverage while minimizing latency impact.
	DefaultObservationPipelineSampleRate = 0.1
	// DefaultObservationPipelineWorkerCount is the default number of parsing workers.
	DefaultObservationPipelineWorkerCount = 4
	// DefaultObservationPipelineQueueSize is the default observation queue size.
	DefaultObservationPipelineQueueSize = 1000
)
Observation pipeline configuration defaults
const (
	DefaultExternalReferenceTimeout = 5 * time.Second
	DefaultExternalReferenceCacheDuration = 10 * time.Second
)
Default values for block height validation
const (
	// APIVersionPrefix is the prefix for the API version.
	// It is used and removed by the routing middleware.
	//
	// Example:
	//
	//	/v1/{rest_path_1} -> /{rest_path_1}
	//	/v1/{rest_path_1}/{rest_path_2} -> /{rest_path_1}/{rest_path_2}
	//	/v1/{rest_path_1}/{rest_path_2}/... -> /{rest_path_1}/{rest_path_2}/...
	APIVersionPrefix = "/v1"
	// HttpHeaderPortalAppID is the header key for the portal app ID.
	//
	// It is also used by the routing middleware in `router.go` to ensure
	// the portal app ID is not present in the request path that is forwarded to
	// the service endpoint.
	//
	// Example:
	//
	//	/1a2b3c4d/path/segment -> /path/segment
	//	/1a2b3c4d/path -> /path
	HttpHeaderPortalAppID = "Portal-Application-ID"
	// Portal API Key HTTP header.
	HttpHeaderAuthorization = "Authorization"
)
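The two routing rewrites described in these comments amount to prefix stripping. A minimal sketch, assuming each rewrite removes only a whole leading path segment; `stripVersion` and `stripAppID` are hypothetical helpers, and the middleware in `router.go` may be implemented differently:

```go
package main

import "strings"

const apiVersionPrefix = "/v1" // mirrors APIVersionPrefix

// stripVersion removes the API version prefix, e.g. "/v1/a/b" -> "/a/b".
func stripVersion(path string) string {
	if path == apiVersionPrefix {
		return "/"
	}
	if strings.HasPrefix(path, apiVersionPrefix+"/") {
		return strings.TrimPrefix(path, apiVersionPrefix)
	}
	return path
}

// stripAppID removes a leading portal app ID segment when it matches appID,
// e.g. "/1a2b3c4d/path/segment" -> "/path/segment".
func stripAppID(path, appID string) string {
	if appID == "" {
		return path
	}
	prefix := "/" + appID
	if path == prefix {
		return "/"
	}
	if strings.HasPrefix(path, prefix+"/") {
		return strings.TrimPrefix(path, prefix)
	}
	return path
}
```

Matching on the trailing `/` keeps paths such as `/v10/...` intact.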
const (
	DefaultObservationSampleRate = 0.1 // 10% of requests get deep parsing
	DefaultObservationWorkerCount = 4 // 4 worker goroutines
	DefaultObservationQueueSize = 1000 // 1000 pending observations max
)
Default configuration values.
const (
	RPCTypeStringJSONRPC = "json_rpc"
	RPCTypeStringREST = "rest"
	RPCTypeStringCometBFT = "comet_bft"
	RPCTypeStringWebSocket = "websocket"
	RPCTypeStringGRPC = "grpc"
)
Canonical RPC type string values used in configuration. These exact strings must be used in:
- Service rpc_types config
- Health check type field
- RPC-Type HTTP header (for detection optimization)
const DefaultWebsocketMessageBufferSize = 100
DefaultWebsocketMessageBufferSize is the default buffer size for websocket message observations. This should match the default in config/router.go. At 100, assuming ~3 KB per buffered message and 100 concurrent connections, the buffers hold about 100 × 3 KB × 100 = ~30 MB.
const (
	// JSONRPCInvalidRequest represents -32600: Invalid Request
	// Used when the RPC type is not supported by the service
	JSONRPCInvalidRequest = -32600
)
JSON-RPC error codes
const RPCTypeHeader = "RPC-Type"
Standard HTTP header name for explicit RPC type specification. No X- prefix (deprecated by RFC 6648). Clients can send this header to bypass detection and improve performance.
const (
	// RelayRequestTimeout is the timeout for relay requests
	// TODO_TECHDEBT: Look into whether we can remove this variable altogether and consolidate
	// it with HTTP level timeouts.
	RelayRequestTimeout = 60 * time.Second
)
Variables
var (
	// ErrUnsupportedRPCType is returned when a detected RPC type is not in service's rpc_types config
	ErrUnsupportedRPCType = fmt.Errorf("unsupported RPC type for service")
	// ErrRPCTypeDetectionFailed is returned when RPC type detection fails
	ErrRPCTypeDetectionFailed = fmt.Errorf("RPC type detection failed")
	// ErrServiceNotConfigured is returned when a requested service is not configured
	ErrServiceNotConfigured = fmt.Errorf("service not configured")
)
var (
	// no service ID was provided by the user.
	ErrGatewayNoServiceIDProvided = errors.New("no service ID provided")
)
Publicly exposed errors
Functions
func GetAllValidRPCTypeStrings (added in v1.0.10)
func GetAllValidRPCTypeStrings() []string
GetAllValidRPCTypeStrings returns a list of all valid RPC type configuration strings. This is useful for displaying available options in error messages and documentation.
func GetHTTPBasedRPCTypeStrings (added in v1.0.10)
func GetHTTPBasedRPCTypeStrings() []string
GetHTTPBasedRPCTypeStrings returns only the RPC types that use HTTP delivery. This excludes websocket and grpc which use different transport protocols.
func IsBuiltInLatencyProfile (added in v1.0.10)
func IsBuiltInLatencyProfile(name string) bool
IsBuiltInLatencyProfile returns true if the profile name is a built-in profile.
func IsHTTPBasedRPCType (added in v1.0.10)
func IsHTTPBasedRPCType(rpcTypeStr string) bool
IsHTTPBasedRPCType reports whether an RPC type string represents an HTTP-based protocol.
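Since the HTTP-based types are the canonical strings minus websocket and grpc, both functions reduce to a set lookup. A sketch under that reading; `isHTTPBased` and `httpBasedTypeStrings` are hypothetical, and the real functions may return their list in a different order:

```go
package main

import "sort"

// httpBasedRPCTypes lists the RPC types delivered over plain HTTP;
// websocket and grpc are excluded.
var httpBasedRPCTypes = map[string]struct{}{
	"json_rpc":  {},
	"rest":      {},
	"comet_bft": {},
}

// isHTTPBased sketches IsHTTPBasedRPCType as a set lookup.
func isHTTPBased(rpcType string) bool {
	_, ok := httpBasedRPCTypes[rpcType]
	return ok
}

// httpBasedTypeStrings returns the set in a stable (sorted) order,
// which keeps error messages deterministic.
func httpBasedTypeStrings() []string {
	out := make([]string, 0, len(httpBasedRPCTypes))
	for t := range httpBasedRPCTypes {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}
```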
func NewRPCTypeValidationErrorResponse (added in v1.0.10)
func NewRPCTypeValidationErrorResponse(
	serviceID protocol.ServiceID,
	detectedType string,
	allowedRPCTypes []string,
	errorMessage string,
) *rpcTypeValidationErrorResponse
NewRPCTypeValidationErrorResponse creates a new error response for RPC type validation failures.
func NewServiceNotConfiguredErrorResponse (added in v1.0.10)
func NewServiceNotConfiguredErrorResponse(
	serviceID protocol.ServiceID,
	availableServices []protocol.ServiceID,
	errorMessage string,
) *rpcTypeValidationErrorResponse
NewServiceNotConfiguredErrorResponse creates a new error response for unconfigured services.
func ValidateServiceType (added in v1.0.10)
func ValidateServiceType(t ServiceType) error
ValidateServiceType validates that a service type is one of the supported types.
Types
type ActiveHealthChecksConfig (added in v1.0.10)
type ActiveHealthChecksConfig struct {
// Enabled enables/disables the active health check system.
Enabled bool `yaml:"enabled,omitempty"`
// SyncAllowance is the default number of blocks behind the latest block that an endpoint
// can be before it's considered out of sync. Per-service overrides can set different values.
// 0 means disabled (no sync allowance check). Default: 0 (disabled).
SyncAllowance int `yaml:"sync_allowance,omitempty"`
// MaxWorkers is the maximum number of concurrent health check workers.
// Higher values allow faster health check cycles but increase load on endpoints.
// Default: 10 workers
MaxWorkers int `yaml:"max_workers,omitempty"`
// Coordination configures leader election for distributed deployments.
Coordination LeaderElectionConfig `yaml:"coordination,omitempty"`
// External is an optional external URL for health check rules.
// Local rules override external rules on conflict.
External *ExternalConfigSource `yaml:"external,omitempty"`
// Local defines health checks in the local config.
// These override any checks from External with the same service_id + name.
Local []ServiceHealthCheckConfig `yaml:"local,omitempty"`
}
ActiveHealthChecksConfig is the top-level configuration for active (proactive) health checks. Active health checks send periodic test requests to endpoints to detect issues before user traffic. This replaces hardcoded QoS checks with YAML-configurable checks.
func (*ActiveHealthChecksConfig) HydrateDefaults (added in v1.0.10)
func (hc *ActiveHealthChecksConfig) HydrateDefaults(hasRedis bool)
HydrateDefaults applies default values to ActiveHealthChecksConfig.
func (*ActiveHealthChecksConfig) Validate (added in v1.0.10)
func (hc *ActiveHealthChecksConfig) Validate() error
Validate validates the ActiveHealthChecksConfig. Returns an error if validation fails.
func (*ActiveHealthChecksConfig) ValidateUniqueness (added in v1.0.10)
func (hc *ActiveHealthChecksConfig) ValidateUniqueness() error
ValidateUniqueness ensures that service_id + check.name combinations are unique.
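The precedence rule for External and Local checks, and the uniqueness key used by ValidateUniqueness, can both be expressed with a map keyed by service ID and check name. A minimal sketch with hypothetical, trimmed-down types (`check`, `checkKey`, `mergeChecks`):

```go
package main

// checkKey identifies a health check by service ID and check name.
type checkKey struct{ ServiceID, Name string }

// check is a hypothetical, trimmed-down health check definition.
type check struct {
	ServiceID string
	Name      string
	Path      string
}

// mergeChecks applies the documented precedence: a local check replaces an
// external check with the same service_id + name; all others are kept.
func mergeChecks(external, local []check) []check {
	merged := make([]check, 0, len(external)+len(local))
	index := make(map[checkKey]int)
	for _, c := range external {
		index[checkKey{c.ServiceID, c.Name}] = len(merged)
		merged = append(merged, c)
	}
	for _, c := range local {
		k := checkKey{c.ServiceID, c.Name}
		if i, ok := index[k]; ok {
			merged[i] = c // local overrides external
			continue
		}
		index[k] = len(merged)
		merged = append(merged, c)
	}
	return merged
}
```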
type BlockHeightOperator (added in v1.0.12)
type BlockHeightOperator string
BlockHeightOperator defines the comparison operator for block height validation.
const (
	// BlockHeightOperatorGreaterThanOrEqual checks if endpoint height >= reference - tolerance.
	BlockHeightOperatorGreaterThanOrEqual BlockHeightOperator = ">="
	// BlockHeightOperatorGreaterThan checks if endpoint height > reference - tolerance.
	BlockHeightOperatorGreaterThan BlockHeightOperator = ">"
	// BlockHeightOperatorLessThanOrEqual checks if endpoint height <= reference + tolerance.
	BlockHeightOperatorLessThanOrEqual BlockHeightOperator = "<="
	// BlockHeightOperatorLessThan checks if endpoint height < reference + tolerance.
	BlockHeightOperatorLessThan BlockHeightOperator = "<"
	// BlockHeightOperatorEqual checks if endpoint height == reference (within tolerance if set).
	BlockHeightOperatorEqual BlockHeightOperator = "=="
)
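The operator comments translate directly into comparisons where the tolerance widens each bound in the endpoint's favor. A sketch, assuming `==` means "within tolerance in either direction"; `compareHeight` is a hypothetical helper:

```go
package main

import "fmt"

// compareHeight applies the semantics documented for BlockHeightOperator.
func compareHeight(op string, height, reference, tolerance int64) (bool, error) {
	switch op {
	case ">=":
		return height >= reference-tolerance, nil
	case ">":
		return height > reference-tolerance, nil
	case "<=":
		return height <= reference+tolerance, nil
	case "<":
		return height < reference+tolerance, nil
	case "==":
		diff := height - reference
		if diff < 0 {
			diff = -diff
		}
		return diff <= tolerance, nil // tolerance 0 means an exact match
	default:
		return false, fmt.Errorf("unsupported operator %q", op)
	}
}
```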
type BlockHeightReference (added in v1.0.12)
type BlockHeightReference struct {
// Type is the reference source: "static", "external", "perceived"
Type BlockHeightReferenceType `yaml:"type"`
// Value is the static block height (only for type="static")
Value int64 `yaml:"value,omitempty"`
// Endpoint is the external RPC endpoint to query (only for type="external")
// Example: "https://arb-mainnet.g.alchemy.com/v2/demo"
Endpoint string `yaml:"endpoint,omitempty"`
// Method is the RPC method to call on external endpoint (only for type="external")
// Example: "eth_blockNumber" for EVM chains
Method string `yaml:"method,omitempty"`
// Tolerance is the allowed difference (only for type="external" and type="perceived")
// For external: endpoint_height >= (external_height - tolerance)
// For perceived: endpoint_height >= (perceived_height - tolerance)
// Default: 0 (no tolerance)
Tolerance int64 `yaml:"tolerance,omitempty"`
// Headers for external endpoint authentication (only for type="external")
Headers map[string]string `yaml:"headers,omitempty"`
// Timeout for external endpoint query (default: 5s)
Timeout time.Duration `yaml:"timeout,omitempty"`
// CacheDuration is how long to cache the external endpoint response (default: 10s).
// Prevents hammering external endpoint on every health check
CacheDuration time.Duration `yaml:"cache_duration,omitempty"`
}
BlockHeightReference specifies the source of the reference block height.
func (*BlockHeightReference) HydrateDefaults (added in v1.0.12)
func (bhr *BlockHeightReference) HydrateDefaults()
HydrateDefaults applies default values to BlockHeightReference.
func (*BlockHeightReference) Validate (added in v1.0.12)
func (bhr *BlockHeightReference) Validate(checkName string) error
Validate validates the BlockHeightReference configuration.
type BlockHeightReferenceType (added in v1.0.12)
type BlockHeightReferenceType string
BlockHeightReferenceType defines the source of the reference block height.
const (
	// BlockHeightReferenceTypeStatic uses a fixed block height value.
	BlockHeightReferenceTypeStatic BlockHeightReferenceType = "static"
	// BlockHeightReferenceTypeExternal queries an external endpoint for the reference height.
	BlockHeightReferenceTypeExternal BlockHeightReferenceType = "external"
	// BlockHeightReferenceTypePerceived uses the QoS perceived block number.
	BlockHeightReferenceTypePerceived BlockHeightReferenceType = "perceived"
)
type BlockHeightValidation (added in v1.0.12)
type BlockHeightValidation struct {
// Operator is the comparison operator: "<", ">", "<=", ">=", "=="
Operator BlockHeightOperator `yaml:"operator"`
// Reference specifies where to get the reference value for comparison.
Reference BlockHeightReference `yaml:"reference"`
}
BlockHeightValidation configures block height comparison checks. This allows health checks to validate that an endpoint's block height meets certain criteria (e.g., not too far behind a reference value).
func (*BlockHeightValidation) Validate (added in v1.0.12)
func (bhv *BlockHeightValidation) Validate(checkName string) error
Validate validates the BlockHeightValidation configuration.
type BlockHeightValidator (added in v1.0.12)
type BlockHeightValidator struct {
// contains filtered or unexported fields
}
BlockHeightValidator validates block heights from health check responses.
func NewBlockHeightValidator (added in v1.0.12)
func NewBlockHeightValidator(
	logger polylog.Logger,
	externalCache *ExternalReferenceCache,
) *BlockHeightValidator
NewBlockHeightValidator creates a new block height validator.
func (*BlockHeightValidator) SetQoSInstances (added in v1.0.12)
func (v *BlockHeightValidator) SetQoSInstances(instances map[protocol.ServiceID]QoSService)
SetQoSInstances sets the QoS instances for accessing perceived block numbers.
func (*BlockHeightValidator) ValidateBlockHeight (added in v1.0.12)
func (v *BlockHeightValidator) ValidateBlockHeight(
	ctx context.Context,
	serviceID protocol.ServiceID,
	responseBody []byte,
	validation *BlockHeightValidation,
) error
ValidateBlockHeight validates a health check response's block height against the configured reference. Returns an error if the validation fails, nil if it passes.
type ConcurrencyConfig (added in v1.0.10)
type ConcurrencyConfig struct {
// MaxParallelEndpoints is the maximum number of endpoints to query in parallel per request.
// Higher values reduce latency but increase load. Range: 1-10. Default: 1.
MaxParallelEndpoints int `yaml:"max_parallel_endpoints,omitempty"`
// MaxConcurrentRelays is the global limit on concurrent relay goroutines across all requests.
// Prevents resource exhaustion from too many simultaneous relays. Range: 100-10000. Default: 5500.
MaxConcurrentRelays int `yaml:"max_concurrent_relays,omitempty"`
// MaxBatchPayloads is the maximum number of payloads allowed in a batch request.
// Must be less than or equal to MaxConcurrentRelays. Range: 1-10000. Default: 5500.
MaxBatchPayloads int `yaml:"max_batch_payloads,omitempty"`
}
ConcurrencyConfig defines global concurrency limits for request processing. These limits protect against resource exhaustion from batch requests and parallel relays.
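The documented ranges, defaults, and cross-field rule can be checked in one pass. A sketch, assuming a zero value means "unset"; `concurrency` and `applyDefaultsAndValidate` are hypothetical:

```go
package main

import "fmt"

// concurrency mirrors the three ConcurrencyConfig fields.
type concurrency struct {
	MaxParallelEndpoints int
	MaxConcurrentRelays  int
	MaxBatchPayloads     int
}

// applyDefaultsAndValidate fills zero values with the documented defaults,
// then enforces the documented ranges and MaxBatchPayloads <= MaxConcurrentRelays.
func (c *concurrency) applyDefaultsAndValidate() error {
	if c.MaxParallelEndpoints == 0 {
		c.MaxParallelEndpoints = 1
	}
	if c.MaxConcurrentRelays == 0 {
		c.MaxConcurrentRelays = 5500
	}
	if c.MaxBatchPayloads == 0 {
		c.MaxBatchPayloads = 5500
	}
	switch {
	case c.MaxParallelEndpoints < 1 || c.MaxParallelEndpoints > 10:
		return fmt.Errorf("max_parallel_endpoints %d out of range 1-10", c.MaxParallelEndpoints)
	case c.MaxConcurrentRelays < 100 || c.MaxConcurrentRelays > 10000:
		return fmt.Errorf("max_concurrent_relays %d out of range 100-10000", c.MaxConcurrentRelays)
	case c.MaxBatchPayloads < 1 || c.MaxBatchPayloads > 10000:
		return fmt.Errorf("max_batch_payloads %d out of range 1-10000", c.MaxBatchPayloads)
	case c.MaxBatchPayloads > c.MaxConcurrentRelays:
		return fmt.Errorf("max_batch_payloads (%d) must be <= max_concurrent_relays (%d)",
			c.MaxBatchPayloads, c.MaxConcurrentRelays)
	}
	return nil
}
```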
type DefaultObservationHandler (added in v1.0.10)
type DefaultObservationHandler struct {
// contains filtered or unexported fields
}
DefaultObservationHandler is the default implementation of ObservationHandler. It logs extracted data at debug level and updates QoS state with extracted data (e.g., perceived block number) without blocking user requests.
func NewDefaultObservationHandler (added in v1.0.10)
func NewDefaultObservationHandler(logger polylog.Logger) *DefaultObservationHandler
NewDefaultObservationHandler creates a new DefaultObservationHandler.
func (*DefaultObservationHandler) HandleExtractedData (added in v1.0.10)
func (h *DefaultObservationHandler) HandleExtractedData(obs *QueuedObservation, data *qostypes.ExtractedData) error
HandleExtractedData processes the extracted data from an observation. This is called by worker pool goroutines after async parsing completes.
This method:
- Updates QoS state (e.g., perceived block number) via UpdateFromExtractedData
- Logs extracted data at debug level for observability
func (*DefaultObservationHandler) SetQoSInstances (added in v1.0.10)
func (h *DefaultObservationHandler) SetQoSInstances(instances map[protocol.ServiceID]QoSService)
SetQoSInstances sets the QoS instances used for updating state from extracted data. This should be called after the QoS instances are created.
type EndpointInfo (added in v1.0.10)
type EndpointInfo struct {
// Addr is the unique identifier for the endpoint (supplier-url format).
Addr protocol.EndpointAddr
// HTTPURL is the URL for HTTP-based health checks (json_rpc, rest).
// This is the endpoint's public URL for JSON-RPC requests.
HTTPURL string
// WebSocketURL is the URL for WebSocket health checks.
// May be empty if the endpoint doesn't support WebSocket.
WebSocketURL string
// SessionID is the session this endpoint belongs to.
// Used to detect session rollover - if session is no longer active,
// health checks for this endpoint should be skipped.
SessionID string
}
EndpointInfo contains endpoint information needed for health checks. Each endpoint may have different URLs for different RPC types.
type ErrorDetection (added in v1.0.12)
type ErrorDetection struct {
// StatusCodes defines HTTP status codes to detect and penalize.
StatusCodes []ErrorStatusCode `yaml:"status_codes,omitempty"`
// ResponsePatterns defines error patterns to search for in response body.
// Matches are case-insensitive substring searches.
ResponsePatterns []ErrorPattern `yaml:"response_patterns,omitempty"`
// JSONRPCErrorCodes defines JSON-RPC error codes to detect and penalize.
JSONRPCErrorCodes []JSONRPCErrorCode `yaml:"jsonrpc_error_codes,omitempty"`
}
ErrorDetection configures error pattern matching in health check responses. This allows health checks to detect and penalize known error patterns like rate limits, bad gateways, quota errors, etc.
func (*ErrorDetection) Validate (added in v1.0.12)
func (ed *ErrorDetection) Validate(checkName string) error
Validate validates the ErrorDetection configuration.
type ErrorPattern (added in v1.0.12)
type ErrorPattern struct {
// Pattern is the substring to search for in the response body (case-insensitive).
// Examples: "rate limit", "exceeded your limit", "bad gateway", "quota"
Pattern string `yaml:"pattern"`
// ReputationSignal is the signal to send when this pattern is detected.
// Values: "minor_error", "major_error", "critical_error", "fatal_error"
ReputationSignal string `yaml:"reputation_signal"`
}
ErrorPattern defines a response body pattern to detect and its reputation signal.
type ErrorStatusCode (added in v1.0.12)
type ErrorStatusCode struct {
// Code is the HTTP status code to match (e.g., 429, 502, 503).
Code int `yaml:"code"`
// ReputationSignal is the signal to send when this status code is detected.
// Values: "minor_error", "major_error", "critical_error", "fatal_error"
ReputationSignal string `yaml:"reputation_signal"`
}
ErrorStatusCode defines an HTTP status code to detect and its reputation signal.
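A sketch of the matching these types describe: status codes are compared exactly and body patterns as case-insensitive substrings. Checking status codes before body patterns, returning the first match, and the lowercase type names are assumptions; JSON-RPC error codes are omitted for brevity.

```go
package main

import "strings"

// Hypothetical, trimmed-down mirrors of ErrorPattern and ErrorStatusCode.
type errorPattern struct{ Pattern, ReputationSignal string }
type errorStatusCode struct {
	Code             int
	ReputationSignal string
}

// detectError returns the reputation signal for the first matching status
// code, then the first matching body pattern, or "" if nothing matches.
func detectError(status int, body string, codes []errorStatusCode, patterns []errorPattern) string {
	for _, c := range codes {
		if c.Code == status {
			return c.ReputationSignal
		}
	}
	lower := strings.ToLower(body)
	for _, p := range patterns {
		if p.Pattern != "" && strings.Contains(lower, strings.ToLower(p.Pattern)) {
			return p.ReputationSignal
		}
	}
	return ""
}
```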
type ExternalConfigSource (added in v1.0.10)
type ExternalConfigSource struct {
// URL is the URL to fetch health check rules from (e.g., GitHub raw file).
URL string `yaml:"url"`
// RefreshInterval is how often to re-fetch the external config.
// 0 means only fetch at startup.
RefreshInterval time.Duration `yaml:"refresh_interval,omitempty"`
// Timeout is the HTTP timeout for fetching the external config.
Timeout time.Duration `yaml:"timeout,omitempty"`
}
ExternalConfigSource defines an external URL for health check rules.
func (*ExternalConfigSource) HydrateDefaults (added in v1.0.10)
func (ecs *ExternalConfigSource) HydrateDefaults()
HydrateDefaults applies default values to ExternalConfigSource.
func (*ExternalConfigSource) Validate (added in v1.0.10)
func (ecs *ExternalConfigSource) Validate() error
Validate validates the ExternalConfigSource.
type ExternalReferenceCache (added in v1.0.12)
type ExternalReferenceCache struct {
// contains filtered or unexported fields
}
ExternalReferenceCache caches block heights from external endpoints. This prevents hammering external endpoints (like Alchemy, Infura) on every health check.
func NewExternalReferenceCache (added in v1.0.12)
func NewExternalReferenceCache(logger polylog.Logger) *ExternalReferenceCache
NewExternalReferenceCache creates a new external reference cache.
func (*ExternalReferenceCache) ClearCache (added in v1.0.12)
func (c *ExternalReferenceCache) ClearCache()
ClearCache clears all cached block heights (for testing).
func (*ExternalReferenceCache) GetBlockHeight (added in v1.0.12)
func (c *ExternalReferenceCache) GetBlockHeight(
	ctx context.Context,
	endpoint string,
	method string,
	headers map[string]string,
	timeout time.Duration,
	cacheDuration time.Duration,
) (int64, error)
GetBlockHeight fetches the block height from an external endpoint, using cache if available. Returns the block height and an error if the fetch fails.
type Gateway
type Gateway struct {
Logger polylog.Logger
// HTTPRequestParser is used by the gateway instance to
// interpret an HTTP request as a pair of service ID and
// its corresponding QoS instance.
HTTPRequestParser
// The Protocol instance is used to fulfill the
// service requests received by the gateway through
// sending the service payload to an endpoint.
Protocol
// RPCTypeValidator validates detected RPC types against service configuration.
// Used to fail fast with clear errors for unsupported RPC types.
RPCTypeValidator *RPCTypeValidator
// MetricsReporter is used to export metrics based on observations made in handling service requests.
MetricsReporter RequestResponseReporter
// WebsocketMessageBufferSize is the buffer size for websocket message observation channels.
// Configurable to balance memory usage vs throughput for websocket connections.
// Default: DefaultWebsocketMessageBufferSize (100)
WebsocketMessageBufferSize int
// ObservationQueue handles async, sampled observation processing for QoS data extraction.
// When enabled, sampled requests are queued for deep parsing (block height, chain ID, etc.)
// without blocking the hot path. This is optional - if nil, no async observation processing occurs.
ObservationQueue *ObservationQueue
}
Gateway handles end-to-end service requests via HandleServiceRequest:
- Receives user request
- Processes request
- Returns response
TODO_FUTURE: Current HTTP-only format supports JSONRPC, REST, Websockets and gRPC. May expand to other formats in future.
func (Gateway) HandleServiceRequest
func (g Gateway) HandleServiceRequest(
	ctx context.Context,
	httpReq *http.Request,
	responseWriter http.ResponseWriter,
)
HandleServiceRequest implements PATH gateway's service request processing.
This method acts as a request router that:
1. Determines the type of incoming request (e.g. HTTP or Websocket upgrade)
2. Delegates to the appropriate handler:
- Websocket: Long-lived bidirectional connection with message-based observations
- HTTP: Request-response cycle with single observation broadcast
This separation allows for different processing flows while maintaining a unified entry point.
TODO_FUTURE: Refactor when adding other protocols (e.g. gRPC):
- Extract generic processing into common method
- Keep protocol-specific details separate
type HTTPRequestParser
type HTTPRequestParser interface {
// GetQoSService returns the qos for the service matching an HTTP request.
GetQoSService(context.Context, *http.Request) (protocol.ServiceID, QoSService, error)
// GetHTTPErrorResponse returns an HTTP response using the supplied error.
// It will only be called if the GetQoSService method above returns an error.
GetHTTPErrorResponse(context.Context, error) pathhttp.HTTPResponse
}
HTTPRequestParser is used, in handling an HTTP service request, to extract the service ID and corresponding QoS service from an HTTP request.
type HealthCheckConfig ¶ added in v1.0.10
type HealthCheckConfig struct {
// Name is a unique identifier for this check within a service.
Name string `yaml:"name"`
// Type specifies the RPC protocol type for this check.
// REQUIRED - must match one of the service's configured rpc_types.
// Valid values: "json_rpc", "rest", "comet_bft", "websocket", "grpc"
//
// BREAKING CHANGE: Old value "jsonrpc" is now "json_rpc" for consistency.
// No default - explicit specification required to avoid ambiguity.
Type HealthCheckType `yaml:"type"`
// Enabled allows disabling individual checks without removing them.
Enabled *bool `yaml:"enabled,omitempty"`
// Method is the HTTP method (GET, POST). Required for json_rpc and rest types.
// Ignored for websocket and grpc types.
Method string `yaml:"method,omitempty"`
// Path is the URL path to send the request to (e.g., "/" or "/ws").
Path string `yaml:"path"`
// Headers is a map of HTTP headers to send with the request.
// If not specified, defaults to {"Content-Type": "application/json"} for POST requests.
// Can be used to set custom headers like Authorization, X-Api-Key, etc.
Headers map[string]string `yaml:"headers,omitempty"`
// Body is the request body (e.g., JSON-RPC payload).
// For websocket: if provided, sent after connection and response is expected.
// If empty for websocket: only connection test is performed.
Body string `yaml:"body,omitempty"`
// ExpectedStatusCode is the HTTP status code expected for success (default: 200).
// Only applies to json_rpc and rest types.
ExpectedStatusCode int `yaml:"expected_status_code,omitempty"`
// ExpectedResponseContains is an optional substring to look for in the response body.
// If specified, the check fails if this substring is not found in the response.
// For websocket with body: checked against any received message within timeout.
ExpectedResponseContains string `yaml:"expected_response_contains,omitempty"`
// BlockHeightValidation enables block height comparison checks.
// If specified, the health check will extract the block height from the response
// and compare it against a reference value using the specified operator.
// This is useful for detecting stuck or lagging nodes.
BlockHeightValidation *BlockHeightValidation `yaml:"block_height_validation,omitempty"`
// ErrorDetection enables error pattern detection in health check responses.
// If specified, the health check will detect known error patterns (rate limits,
// bad gateway errors, quota errors, etc.) and send appropriate reputation signals.
ErrorDetection *ErrorDetection `yaml:"error_detection,omitempty"`
// Timeout is the request timeout for this check.
// For websocket: how long to wait for connection + optional response.
Timeout time.Duration `yaml:"timeout,omitempty"`
// Archival indicates if this is an archival-specific check.
Archival bool `yaml:"archival,omitempty"`
// ReputationSignal is the signal type to record on failure.
// Values: "minor_error", "major_error", "critical_error", "fatal_error"
ReputationSignal string `yaml:"reputation_signal,omitempty"`
}
HealthCheckConfig defines a single configurable health check. This replaces hardcoded QoS checks with YAML-configurable checks.
func (*HealthCheckConfig) HydrateDefaults ¶ added in v1.0.10
func (hcc *HealthCheckConfig) HydrateDefaults()
HydrateDefaults applies default values to HealthCheckConfig.
func (*HealthCheckConfig) Validate ¶ added in v1.0.10
func (hcc *HealthCheckConfig) Validate() error
Validate validates the HealthCheckConfig.
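The body of Validate is not shown here; the sketch below only illustrates checks that follow from the field comments above (a required name, one of the valid type values with no default, a method for json_rpc/rest, and one of the four reputation signals). `checkConfig` and `validate` are hypothetical names, and the real method may enforce different or additional rules.

```go
package main

import (
	"errors"
	"fmt"
)

// checkConfig is a trimmed, hypothetical stand-in for HealthCheckConfig.
type checkConfig struct {
	Name             string
	Type             string
	Method           string
	ReputationSignal string
}

var validTypes = map[string]bool{
	"json_rpc": true, "rest": true, "comet_bft": true, "websocket": true, "grpc": true,
}

var validSignals = map[string]bool{
	"minor_error": true, "major_error": true, "critical_error": true, "fatal_error": true,
}

// validate applies rules derived from the field comments; the real Validate may differ.
func validate(c checkConfig) error {
	if c.Name == "" {
		return errors.New("name is required")
	}
	if !validTypes[c.Type] {
		// No default type: an empty or legacy value such as "jsonrpc" is rejected.
		return fmt.Errorf("check %q: invalid type %q", c.Name, c.Type)
	}
	if (c.Type == "json_rpc" || c.Type == "rest") && c.Method != "GET" && c.Method != "POST" {
		return fmt.Errorf("check %q: method must be GET or POST for type %q", c.Name, c.Type)
	}
	if c.ReputationSignal != "" && !validSignals[c.ReputationSignal] {
		return fmt.Errorf("check %q: invalid reputation_signal %q", c.Name, c.ReputationSignal)
	}
	return nil
}

func main() {
	fmt.Println(validate(checkConfig{Name: "block_number", Type: "json_rpc", Method: "POST"}))
	fmt.Println(validate(checkConfig{Name: "block_number", Type: "jsonrpc", Method: "POST"}))
}
```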
type HealthCheckExecutor ¶ added in v1.0.10
type HealthCheckExecutor struct {
// contains filtered or unexported fields
}
HealthCheckExecutor executes configurable health checks against endpoints and records results to the reputation system.
Health checks are sent through the protocol layer (via synthetic relay requests) to ensure the full path including relay miners is tested.
func NewHealthCheckExecutor ¶ added in v1.0.10
func NewHealthCheckExecutor(cfg HealthCheckExecutorConfig) *HealthCheckExecutor
NewHealthCheckExecutor creates a new HealthCheckExecutor.
func (*HealthCheckExecutor) ExecuteCheckViaProtocol ¶ added in v1.0.10
func (e *HealthCheckExecutor) ExecuteCheckViaProtocol(
ctx context.Context,
serviceID protocol.ServiceID,
endpointAddr protocol.EndpointAddr,
check HealthCheckConfig,
) (time.Duration, error)
ExecuteCheckViaProtocol executes a health check through the protocol layer. This sends the health check as a synthetic relay request, testing the full path including relay miners, just like regular user requests.
This is the preferred method for health checks as it validates the entire request path.
func (*HealthCheckExecutor) ExecuteWebSocketCheckViaProtocol ¶ added in v1.0.10
func (e *HealthCheckExecutor) ExecuteWebSocketCheckViaProtocol(
ctx context.Context,
serviceID protocol.ServiceID,
endpointAddr protocol.EndpointAddr,
check HealthCheckConfig,
) (time.Duration, error)
ExecuteWebSocketCheckViaProtocol executes a WebSocket health check through the protocol layer. This uses protocol.CheckWebsocketConnection() to test WebSocket connectivity.
Unlike the old hydrator, it wraps the protocol observations in RequestResponseObservations and publishes them to the metrics/data reporters, giving full visibility into WebSocket health check results.
func (*HealthCheckExecutor) GetConfigForService ¶ added in v1.0.10
func (e *HealthCheckExecutor) GetConfigForService(serviceID protocol.ServiceID) *ServiceHealthCheckConfig
GetConfigForService returns the health check configuration for a specific service. Returns nil if no config exists for the service.
func (*HealthCheckExecutor) GetServiceConfigs ¶ added in v1.0.10
func (e *HealthCheckExecutor) GetServiceConfigs() []ServiceHealthCheckConfig
GetServiceConfigs returns the merged health check configurations for all services. It merges the external configs (if configured), the local configs, and the unified services config, with the following precedence: unified services > local > external.
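The precedence rule can be sketched as a layered merge keyed by service ID, applying layers from lowest to highest precedence so that later entries win. This sketch assumes whole per-service entries are replaced; whether the real merge combines individual fields is not stated. `serviceConfig` and `mergeByPrecedence` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// serviceConfig is a hypothetical, trimmed stand-in for ServiceHealthCheckConfig.
type serviceConfig struct {
	ServiceID string
	Source    string // which layer supplied this entry (for illustration only)
}

// mergeByPrecedence merges layers given from lowest to highest precedence:
// an entry in a later layer replaces any entry for the same service ID.
func mergeByPrecedence(layers ...[]serviceConfig) []serviceConfig {
	merged := make(map[string]serviceConfig)
	for _, layer := range layers {
		for _, cfg := range layer {
			merged[cfg.ServiceID] = cfg
		}
	}
	out := make([]serviceConfig, 0, len(merged))
	for _, cfg := range merged {
		out = append(out, cfg)
	}
	// Sort for deterministic output; Go map iteration order is random.
	sort.Slice(out, func(i, j int) bool { return out[i].ServiceID < out[j].ServiceID })
	return out
}

func main() {
	external := []serviceConfig{{"eth", "external"}, {"base", "external"}}
	local := []serviceConfig{{"eth", "local"}}
	unified := []serviceConfig{{"base", "unified"}, {"poly", "unified"}}
	// Lowest precedence first: external < local < unified services.
	for _, c := range mergeByPrecedence(external, local, unified) {
		fmt.Println(c.ServiceID, c.Source)
	}
}
```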
func (*HealthCheckExecutor) InitExternalConfig ¶ added in v1.0.10
func (e *HealthCheckExecutor) InitExternalConfig(ctx context.Context)
InitExternalConfig initializes external config fetching. Should be called after NewHealthCheckExecutor to start loading external configs.
func (*HealthCheckExecutor) IsEnabled ¶ added in v1.0.10
func (e *HealthCheckExecutor) IsEnabled() bool
IsEnabled returns true if the health check executor is enabled.
func (*HealthCheckExecutor) RunAllChecksViaProtocol ¶ added in v1.0.10
func (e *HealthCheckExecutor) RunAllChecksViaProtocol(
ctx context.Context,
getEndpointInfos func(protocol.ServiceID) ([]EndpointInfo, error),
) error
RunAllChecksViaProtocol runs health checks through the protocol layer for all configured services. This is the main entry point for protocol-based health checks. Health checks are executed in parallel using a pond worker pool.
func (*HealthCheckExecutor) RunChecksForEndpointViaProtocol ¶ added in v1.0.10
func (e *HealthCheckExecutor) RunChecksForEndpointViaProtocol(
ctx context.Context,
serviceID protocol.ServiceID,
endpointAddr protocol.EndpointAddr,
) map[string]error
RunChecksForEndpointViaProtocol runs all configured checks for a service through the protocol. This sends synthetic relay requests for each health check, testing the full relay path.
func (*HealthCheckExecutor) SetQoSInstances ¶ added in v1.0.12
func (e *HealthCheckExecutor) SetQoSInstances(instances map[protocol.ServiceID]QoSService)
SetQoSInstances sets the QoS instances for block height validation. This should be called after QoS instances are created in the main application.
func (*HealthCheckExecutor) ShouldRunChecks ¶ added in v1.0.10
func (e *HealthCheckExecutor) ShouldRunChecks() bool
ShouldRunChecks returns true if this instance should run health checks. If leader election is configured and we're not the leader, returns false.
func (*HealthCheckExecutor) Stop ¶ added in v1.0.10
func (e *HealthCheckExecutor) Stop()
Stop stops the external config refresh goroutine and worker pool. Safe to call multiple times - uses sync.Once to prevent double-close panic.
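ShouldRunChecks and Stop can be sketched together. This assumes a nil leader elector means leader election is not configured (NewLeaderElector returns nil for the "none" coordination type) and that Stop closes a stop channel guarded by `sync.Once`, since closing a Go channel twice panics. `fakeLeader`, `executor` and `stopCh` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeLeader is a hypothetical stand-in for *LeaderElector.
type fakeLeader struct{ isLeader bool }

func (l *fakeLeader) IsLeader() bool { return l.isLeader }

// executor is a trimmed, hypothetical stand-in for HealthCheckExecutor.
type executor struct {
	leader   *fakeLeader   // nil when leader election is not configured
	stopCh   chan struct{} // closed to signal background goroutines to exit
	stopOnce sync.Once
}

func newExecutor(l *fakeLeader) *executor {
	return &executor{leader: l, stopCh: make(chan struct{})}
}

// ShouldRunChecks: without leader election every instance runs checks;
// with it, only the current leader does.
func (e *executor) ShouldRunChecks() bool {
	if e.leader == nil {
		return true
	}
	return e.leader.IsLeader()
}

// Stop closes stopCh exactly once; a second close would panic, so it is guarded.
func (e *executor) Stop() {
	e.stopOnce.Do(func() { close(e.stopCh) })
}

func main() {
	e := newExecutor(&fakeLeader{isLeader: false})
	fmt.Println(e.ShouldRunChecks()) // false
	e.Stop()
	e.Stop() // no panic: the close runs only once
	_, open := <-e.stopCh
	fmt.Println(open) // false
}
```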
type HealthCheckExecutorConfig ¶ added in v1.0.10
type HealthCheckExecutorConfig struct {
Config *ActiveHealthChecksConfig
ReputationSvc reputation.ReputationService
Logger polylog.Logger
Protocol Protocol
MetricsReporter RequestResponseReporter
LeaderElector *LeaderElector
ObservationQueue *ObservationQueue
MaxWorkers int
UnifiedServicesConfig *UnifiedServicesConfig
}
HealthCheckExecutorConfig contains configuration for creating a HealthCheckExecutor.
type HealthCheckQoSContext ¶ added in v1.0.10
type HealthCheckQoSContext struct {
// contains filtered or unexported fields
}
HealthCheckQoSContext represents a health check request context. It is created from YAML configuration and used to send synthetic requests through the protocol.
func NewHealthCheckQoSContext ¶ added in v1.0.10
func NewHealthCheckQoSContext(cfg HealthCheckQoSContextConfig) *HealthCheckQoSContext
NewHealthCheckQoSContext creates a new HealthCheckQoSContext from configuration.
func (*HealthCheckQoSContext) GetEndpointSelector ¶ added in v1.0.10
func (hc *HealthCheckQoSContext) GetEndpointSelector() protocol.EndpointSelector
GetEndpointSelector returns a no-op selector since health checks use pre-selected endpoints. This method is required by the RequestQoSContext interface but is not used for health checks.
func (*HealthCheckQoSContext) GetError ¶ added in v1.0.10
func (hc *HealthCheckQoSContext) GetError() string
GetError returns the error message if the health check failed.
func (*HealthCheckQoSContext) GetHTTPResponse ¶ added in v1.0.10
func (hc *HealthCheckQoSContext) GetHTTPResponse() pathhttp.HTTPResponse
GetHTTPResponse returns a minimal HTTP response for health checks. This method is required by the RequestQoSContext interface but is not used for health checks.
func (*HealthCheckQoSContext) GetObservations ¶ added in v1.0.10
func (hc *HealthCheckQoSContext) GetObservations() qosobservations.Observations
GetObservations returns QoS-level observations for the health check. This method is required by the RequestQoSContext interface but is not used for health checks.
func (*HealthCheckQoSContext) GetServicePayloads ¶ added in v1.0.10
func (hc *HealthCheckQoSContext) GetServicePayloads() []protocol.Payload
GetServicePayloads returns the configured payload for the health check. Implements RequestQoSContext interface.
func (*HealthCheckQoSContext) IsSuccess ¶ added in v1.0.10
func (hc *HealthCheckQoSContext) IsSuccess() bool
IsSuccess returns true if the health check passed all validation criteria.
func (*HealthCheckQoSContext) UpdateWithResponse ¶ added in v1.0.10
func (hc *HealthCheckQoSContext) UpdateWithResponse(
endpointAddr protocol.EndpointAddr,
endpointSerializedResponse []byte,
httpStatusCode int,
)
UpdateWithResponse processes the endpoint response for the health check. It validates the response against expected criteria (status code, body content). Implements RequestQoSContext interface.
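The status-code and body checks can be sketched as a pure function, using the documented default of 200 for ExpectedStatusCode and an optional ExpectedResponseContains substring. Block height validation and error detection are left out. `expectations` and `evaluate` are hypothetical names, and returning a reason string is an assumption about how failures are reported.

```go
package main

import (
	"bytes"
	"fmt"
)

// expectations mirrors the relevant HealthCheckConfig fields (hypothetical subset).
type expectations struct {
	ExpectedStatusCode       int    // 0 means "use the default of 200"
	ExpectedResponseContains string // empty means "no body check"
}

// evaluate returns "" on success, otherwise a human-readable failure reason.
func evaluate(exp expectations, statusCode int, body []byte) string {
	want := exp.ExpectedStatusCode
	if want == 0 {
		want = 200
	}
	if statusCode != want {
		return fmt.Sprintf("unexpected status code %d (want %d)", statusCode, want)
	}
	if exp.ExpectedResponseContains != "" &&
		!bytes.Contains(body, []byte(exp.ExpectedResponseContains)) {
		return fmt.Sprintf("response does not contain %q", exp.ExpectedResponseContains)
	}
	return ""
}

func main() {
	exp := expectations{ExpectedResponseContains: `"result"`}
	fmt.Printf("%q\n", evaluate(exp, 200, []byte(`{"jsonrpc":"2.0","id":1,"result":"0x10"}`)))
	fmt.Println(evaluate(exp, 503, nil))
}
```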
type HealthCheckQoSContextConfig ¶ added in v1.0.10
type HealthCheckQoSContextConfig struct {
Logger polylog.Logger
ServiceID protocol.ServiceID
CheckConfig HealthCheckConfig
ServicePayload protocol.Payload
}
HealthCheckQoSContextConfig contains configuration for creating a HealthCheckQoSContext.
type HealthCheckType ¶ added in v1.0.10
type HealthCheckType string
HealthCheckType defines the RPC protocol type for a health check. These values are aligned with service rpc_types configuration to ensure consistency.
BREAKING CHANGE: The enum values have been updated to match service rpc_types:
- "jsonrpc" → "json_rpc" (aligned with service config)
- "comet_bft" is newly added for Cosmos CometBFT health checks
Delivery mechanisms by type:
- json_rpc, rest, comet_bft: HTTP delivery
- websocket: WebSocket delivery
- grpc: gRPC delivery (future)
const (
// HealthCheckTypeJSONRPC is for JSON-RPC endpoints (HTTP POST with JSON body).
// Aligned with service rpc_types: "json_rpc"
HealthCheckTypeJSONRPC HealthCheckType = "json_rpc"
// HealthCheckTypeREST is for REST API endpoints (HTTP GET/POST).
// Aligned with service rpc_types: "rest"
HealthCheckTypeREST HealthCheckType = "rest"
// HealthCheckTypeCometBFT is for CometBFT RPC endpoints (Cosmos chains).
// Aligned with service rpc_types: "comet_bft"
HealthCheckTypeCometBFT HealthCheckType = "comet_bft"
// HealthCheckTypeWebSocket is for WebSocket endpoints (connect, optionally send/receive).
// Aligned with service rpc_types: "websocket"
HealthCheckTypeWebSocket HealthCheckType = "websocket"
// HealthCheckTypeGRPC is for gRPC endpoints (future implementation).
// Uses the standard grpc.health.v1.Health service.
// Aligned with service rpc_types: "grpc"
HealthCheckTypeGRPC HealthCheckType = "grpc"
)
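The delivery mechanisms listed above map onto a simple switch. In the sketch, the type and constants are redeclared with the package's values so it compiles on its own. `deliveryFor` is a hypothetical helper; treating grpc as unsupported reflects the "future" note, and the explicit hint for the legacy "jsonrpc" value is an illustrative choice, not documented behavior.

```go
package main

import "fmt"

// HealthCheckType and its constants mirror the package's values.
type HealthCheckType string

const (
	HealthCheckTypeJSONRPC   HealthCheckType = "json_rpc"
	HealthCheckTypeREST      HealthCheckType = "rest"
	HealthCheckTypeCometBFT  HealthCheckType = "comet_bft"
	HealthCheckTypeWebSocket HealthCheckType = "websocket"
	HealthCheckTypeGRPC      HealthCheckType = "grpc"
)

// deliveryFor maps a check type to its delivery mechanism per the list above.
func deliveryFor(t HealthCheckType) (string, error) {
	switch t {
	case HealthCheckTypeJSONRPC, HealthCheckTypeREST, HealthCheckTypeCometBFT:
		return "http", nil
	case HealthCheckTypeWebSocket:
		return "websocket", nil
	case HealthCheckTypeGRPC:
		return "", fmt.Errorf("type %q: gRPC delivery is treated as unsupported here", t)
	case "jsonrpc":
		// Legacy value from before the breaking rename.
		return "", fmt.Errorf("type %q was renamed to %q", t, HealthCheckTypeJSONRPC)
	default:
		return "", fmt.Errorf("unknown health check type %q", t)
	}
}

func main() {
	for _, t := range []HealthCheckType{HealthCheckTypeCometBFT, "jsonrpc"} {
		d, err := deliveryFor(t)
		fmt.Printf("%s -> %q, err=%v\n", t, d, err)
	}
}
```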
type HealthChecksConfig ¶ added in v1.0.10
type HealthChecksConfig = ActiveHealthChecksConfig
HealthChecksConfig is a deprecated alias for ActiveHealthChecksConfig.
type JSONRPCErrorCode ¶ added in v1.0.12
type JSONRPCErrorCode struct {
// Code is the JSON-RPC error code to match (e.g., -31002, -32001).
Code int `yaml:"code"`
// ReputationSignal is the signal to send when this error code is detected.
// Values: "minor_error", "major_error", "critical_error", "fatal_error"
ReputationSignal string `yaml:"reputation_signal"`
}
JSONRPCErrorCode defines a JSON-RPC error code to detect and its reputation signal.
type LatencyProfileConfig ¶ added in v1.0.10
type LatencyProfileConfig struct {
// FastThreshold is the maximum latency for "fast" responses.
FastThreshold time.Duration `yaml:"fast_threshold"`
// NormalThreshold is the maximum latency for "normal" responses.
NormalThreshold time.Duration `yaml:"normal_threshold"`
// SlowThreshold is the maximum latency for "slow" responses.
SlowThreshold time.Duration `yaml:"slow_threshold"`
// PenaltyThreshold triggers a slow_response penalty signal.
PenaltyThreshold time.Duration `yaml:"penalty_threshold"`
// SevereThreshold triggers a very_slow_response penalty signal.
SevereThreshold time.Duration `yaml:"severe_threshold"`
// FastBonus is the multiplier for success impact when response is fast.
FastBonus float64 `yaml:"fast_bonus,omitempty"`
// SlowPenalty is the multiplier for success impact when response is slow.
SlowPenalty float64 `yaml:"slow_penalty,omitempty"`
// VerySlowPenalty is the multiplier for success impact when response is very slow.
VerySlowPenalty float64 `yaml:"very_slow_penalty,omitempty"`
}
LatencyProfileConfig defines latency thresholds for a category of services. These profiles are defined once in latency_profiles and referenced by name in services.
type LeaderElectionConfig ¶ added in v1.0.10
type LeaderElectionConfig struct {
// Type is the coordination type: "leader_election" or "none".
// Default: "leader_election" when Redis is configured, "none" otherwise.
Type string `yaml:"type,omitempty"`
// LeaseDuration is how long the leader holds the lock.
LeaseDuration time.Duration `yaml:"lease_duration,omitempty"`
// RenewInterval is how often the leader renews the lock.
RenewInterval time.Duration `yaml:"renew_interval,omitempty"`
// Key is the Redis key for the leader lock.
Key string `yaml:"key,omitempty"`
}
LeaderElectionConfig configures leader election for health checks. Only the leader runs health checks to avoid duplicate traffic.
func (*LeaderElectionConfig) HydrateDefaults ¶ added in v1.0.10
func (lec *LeaderElectionConfig) HydrateDefaults()
HydrateDefaults applies default values to LeaderElectionConfig.
type LeaderElector ¶ added in v1.0.10
type LeaderElector struct {
// contains filtered or unexported fields
}
LeaderElector provides distributed leader election using Redis. Only the leader instance runs health checks to avoid duplicate traffic.
func NewLeaderElector ¶ added in v1.0.10
func NewLeaderElector(cfg LeaderElectorConfig) *LeaderElector
NewLeaderElector creates a new LeaderElector. Returns nil if the coordination type is "none" or Redis client is nil.
func (*LeaderElector) GetLeaderInstanceID ¶ added in v1.0.10
func (l *LeaderElector) GetLeaderInstanceID(ctx context.Context) string
GetLeaderInstanceID returns the current leader's instance ID. Returns empty string if no leader or on error.
func (*LeaderElector) IsLeader ¶ added in v1.0.10
func (l *LeaderElector) IsLeader() bool
IsLeader returns true if this instance is currently the leader.
func (*LeaderElector) Start ¶ added in v1.0.10
func (l *LeaderElector) Start(ctx context.Context) error
Start begins the leader election process. It will attempt to acquire leadership immediately, then continue renewing/acquiring at the configured interval.
func (*LeaderElector) Stop ¶ added in v1.0.10
func (l *LeaderElector) Stop() error
Stop gracefully shuts down the leader elector. If this instance is the leader, it will release leadership.
type LeaderElectorConfig ¶ added in v1.0.10
type LeaderElectorConfig struct {
Client *redis.Client
Config LeaderElectionConfig
Logger polylog.Logger
}
LeaderElectorConfig contains configuration for creating a LeaderElector.
type ObservationHandler ¶ added in v1.0.10
type ObservationHandler interface {
// HandleExtractedData processes the extracted data from an observation.
// Called by worker pool goroutines, must be thread-safe.
HandleExtractedData(obs *QueuedObservation, data *qostypes.ExtractedData) error
}
ObservationHandler processes extracted data from observations. This is called after the extractor runs to update endpoint state.
type ObservationPipelineConfig ¶ added in v1.0.10
type ObservationPipelineConfig struct {
// Enabled enables async observation processing mode.
// When true, responses are returned as raw bytes without blocking on parsing.
// When false (default), legacy behavior is used (parse then re-encode).
Enabled bool `yaml:"enabled,omitempty"`
// SampleRate is the fraction of requests to deep-parse (0.0 to 1.0).
// Only applies when Enabled is true.
// Default: 0.1 (10% of requests get queued for deep parsing)
// Set to 0.0 to disable sampling (only active health checks update endpointStore)
// Set to 1.0 to parse all requests async (not recommended for high traffic)
SampleRate float64 `yaml:"sample_rate,omitempty"`
// WorkerCount is the number of worker goroutines for async parsing.
// Default: 4
WorkerCount int `yaml:"worker_count,omitempty"`
// QueueSize is the max number of pending observations.
// If queue is full, new observations are dropped (non-blocking).
// Default: 1000
QueueSize int `yaml:"queue_size,omitempty"`
}
ObservationPipelineConfig configures the observation processing pipeline. Controls how observations from user requests are processed and fed into the reputation system.
Architecture:
- Client response: Raw bytes passed through immediately (no parsing) when enabled
- Reputation signals: Always recorded (status code + latency from protocol layer)
- Deep parsing: Sampled requests queued to worker pool for async processing
- endpointStore: Updated by active health checks (100%) + sampled user requests
func (*ObservationPipelineConfig) HydrateDefaults ¶ added in v1.0.10
func (pc *ObservationPipelineConfig) HydrateDefaults()
HydrateDefaults applies default values to ObservationPipelineConfig.
func (*ObservationPipelineConfig) Validate ¶ added in v1.0.10
func (pc *ObservationPipelineConfig) Validate() error
Validate validates the ObservationPipelineConfig.
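A sketch of what HydrateDefaults and Validate might do, using the defaults documented on the fields (4 workers, a queue of 1000). Because a plain float64 cannot distinguish an unset sample_rate from an explicit 0.0 ("only active health checks update endpointStore"), this sketch does not default SampleRate; how the real HydrateDefaults resolves that is not documented here. `pipelineConfig` and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// pipelineConfig mirrors ObservationPipelineConfig's fields.
type pipelineConfig struct {
	Enabled     bool
	SampleRate  float64
	WorkerCount int
	QueueSize   int
}

// hydrateDefaults fills unset (zero) worker and queue sizes with the documented
// defaults. SampleRate is left untouched (see the note above).
func (c *pipelineConfig) hydrateDefaults() {
	if c.WorkerCount == 0 {
		c.WorkerCount = 4
	}
	if c.QueueSize == 0 {
		c.QueueSize = 1000
	}
}

// validate rejects values that the field comments rule out.
func (c *pipelineConfig) validate() error {
	if math.IsNaN(c.SampleRate) || c.SampleRate < 0 || c.SampleRate > 1 {
		return fmt.Errorf("sample_rate must be within [0.0, 1.0], got %v", c.SampleRate)
	}
	if c.WorkerCount < 1 {
		return fmt.Errorf("worker_count must be positive, got %d", c.WorkerCount)
	}
	if c.QueueSize < 1 {
		return fmt.Errorf("queue_size must be positive, got %d", c.QueueSize)
	}
	return nil
}

func main() {
	c := pipelineConfig{Enabled: true, SampleRate: 0.1}
	c.hydrateDefaults()
	fmt.Println(c.WorkerCount, c.QueueSize, c.validate()) // 4 1000 <nil>
	bad := pipelineConfig{SampleRate: 1.5}
	bad.hydrateDefaults()
	fmt.Println(bad.validate())
}
```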
type ObservationQueue ¶ added in v1.0.10
type ObservationQueue struct {
// contains filtered or unexported fields
}
ObservationQueue handles async, non-blocking observation processing. It uses a worker pool to parse sampled responses without blocking the hot path.
Architecture:
- Uses ExtractorRegistry to get the right DataExtractor for each service
- Runs extraction in worker goroutines (heavy parsing is async)
- Calls ObservationHandler with extracted data to update endpoint state
func NewObservationQueue ¶ added in v1.0.10
func NewObservationQueue(config ObservationQueueConfig, logger polylog.Logger) *ObservationQueue
NewObservationQueue creates a new observation queue with the given config. The registry and handler must be set before use via SetRegistry and SetHandler.
func (*ObservationQueue) GetMetrics ¶ added in v1.0.10
func (q *ObservationQueue) GetMetrics() (queued, processed, dropped, skipped int64)
GetMetrics returns current queue metrics.
func (*ObservationQueue) GetSampleRate ¶ added in v1.0.10
func (q *ObservationQueue) GetSampleRate() float64
GetSampleRate returns the current sample rate.
func (*ObservationQueue) IsEnabled ¶ added in v1.0.10
func (q *ObservationQueue) IsEnabled() bool
IsEnabled returns true if async observation processing is enabled.
func (*ObservationQueue) ProcessSync ¶ added in v1.0.10
func (q *ObservationQueue) ProcessSync(obs *QueuedObservation)
ProcessSync processes an observation SYNCHRONOUSLY (blocking). Use this for health checks which already run in background workers. Unlike TryQueue/Submit, this does NOT use the worker pool - it processes immediately. This avoids wasting queue space on health checks and ensures immediate block height updates.
func (*ObservationQueue) SetHandler ¶ added in v1.0.10
func (q *ObservationQueue) SetHandler(handler ObservationHandler)
SetHandler sets the handler for processing extracted data.
func (*ObservationQueue) SetPerServiceRate ¶ added in v1.0.10
func (q *ObservationQueue) SetPerServiceRate(serviceID protocol.ServiceID, rate float64)
SetPerServiceRate sets a sample rate override for a specific service. Use this to sample different services at different rates. For example, high-traffic services might use a lower rate.
func (*ObservationQueue) SetRegistry ¶ added in v1.0.10
func (q *ObservationQueue) SetRegistry(registry *qostypes.ExtractorRegistry)
SetRegistry sets the extractor registry for looking up service-specific extractors.
func (*ObservationQueue) Stop ¶ added in v1.0.10
func (q *ObservationQueue) Stop()
Stop gracefully shuts down the observation queue. Waits for all pending observations to be processed.
func (*ObservationQueue) Submit ¶ added in v1.0.10
func (q *ObservationQueue) Submit(obs *QueuedObservation) bool
Submit queues an observation without sampling (always queues if queue not full). Use this for health checks which should always be processed. Returns true if queued, false if dropped (queue full).
func (*ObservationQueue) TryQueue ¶ added in v1.0.10
func (q *ObservationQueue) TryQueue(obs *QueuedObservation) bool
TryQueue attempts to queue an observation for async parsing. Returns true if queued, false if skipped (not sampled) or dropped (queue full). This method is NON-BLOCKING - it never waits.
Sampling logic:
- Uses per-service rate if configured, otherwise default rate
- If not sampled, observation is skipped (not an error)
- If queue is full, observation is dropped (logged as warning)
type ObservationQueueConfig ¶ added in v1.0.10
type ObservationQueueConfig struct {
// Enabled enables/disables async observation processing.
// When disabled, all parsing happens synchronously (legacy behavior).
Enabled bool `yaml:"enabled,omitempty"`
// SampleRate is the fraction of requests to deep-parse (0.0 to 1.0).
// Default: 0.1 (10% of requests get deep parsing)
// Note: Reputation signals are recorded for 100% of requests regardless.
SampleRate float64 `yaml:"sample_rate,omitempty"`
// WorkerCount is the number of worker goroutines for parsing.
// Default: 4
WorkerCount int `yaml:"worker_count,omitempty"`
// QueueSize is the max number of pending observations.
// If queue is full, new observations are dropped (non-blocking).
// Default: 1000
QueueSize int `yaml:"queue_size,omitempty"`
}
ObservationQueueConfig configures the async observation processing.
func (*ObservationQueueConfig) HydrateDefaults ¶ added in v1.0.10
func (c *ObservationQueueConfig) HydrateDefaults()
HydrateDefaults applies default values to ObservationQueueConfig.
type ObservationSource ¶ added in v1.0.10
type ObservationSource string
ObservationSource indicates where the observation came from.
const (
// SourceUserRequest indicates the observation is from a sampled user request.
SourceUserRequest ObservationSource = "user_request"
// SourceHealthCheck indicates the observation is from a background health check.
SourceHealthCheck ObservationSource = "health_check"
)
type ParentConfigDefaults ¶ added in v1.0.10
type ParentConfigDefaults struct {
// TieredSelectionEnabled from reputation_config.tiered_selection.enabled
TieredSelectionEnabled bool
// Tier1Threshold from reputation_config.tiered_selection.tier1_threshold
Tier1Threshold float64
// Tier2Threshold from reputation_config.tiered_selection.tier2_threshold
Tier2Threshold float64
// ProbationEnabled from reputation_config.tiered_selection.probation.enabled
ProbationEnabled bool
// ProbationThreshold from reputation_config.tiered_selection.probation.threshold
ProbationThreshold float64
// ProbationTrafficPercent from reputation_config.tiered_selection.probation.traffic_percent
ProbationTrafficPercent float64
// ProbationRecoveryMultiplier from reputation_config.tiered_selection.probation.recovery_multiplier
ProbationRecoveryMultiplier float64
// RetryEnabled from retry_config.enabled
RetryEnabled bool
// MaxRetries from retry_config.max_retries
MaxRetries int
// RetryOn5xx from retry_config.retry_on_5xx
RetryOn5xx bool
// RetryOnTimeout from retry_config.retry_on_timeout
RetryOnTimeout bool
// RetryOnConnection from retry_config.retry_on_connection
RetryOnConnection bool
// MaxRetryLatency from retry_config.max_retry_latency
MaxRetryLatency time.Duration
// ObservationPipelineEnabled from observation_pipeline.enabled
ObservationPipelineEnabled bool
// SampleRate from observation_pipeline.sample_rate
SampleRate float64
// HealthChecksEnabled from active_health_checks.enabled
HealthChecksEnabled bool
// HealthCheckInterval from active_health_checks.interval
HealthCheckInterval time.Duration
// SyncAllowance from active_health_checks.sync_allowance
SyncAllowance int
}
ParentConfigDefaults contains values from the parent config that serve as defaults. This struct is used to pass values from gateway_config to UnifiedServicesConfig, eliminating the need for a separate "defaults" section in YAML.
type PassthroughConfig ¶ added in v1.0.10
type PassthroughConfig = ObservationPipelineConfig
PassthroughConfig is a deprecated alias for ObservationPipelineConfig.
type Protocol ¶
type Protocol interface {
// AvailableHTTPEndpoints returns the list of available HTTP endpoints matching both the service ID
//
// If the Pocket Network Gateway is in delegated mode, the staked application is passed via
// the `App-Address` header. In all other modes, *http.Request will be nil.
//
// The context may contain a deadline that the protocol should respect on a best-effort basis.
// Returns an observation if the endpoint lookup fails; it is used as the protocol observation
// for the request when no protocol context exists.
AvailableHTTPEndpoints(
context.Context,
protocol.ServiceID,
sharedtypes.RPCType,
*http.Request,
) (protocol.EndpointAddrList, protocolobservations.Observations, error)
// AvailableWebsocketEndpoints returns the list of available websocket endpoints matching both the service ID
//
// If the Pocket Network Gateway is in delegated mode, the staked application is passed via
// the `App-Address` header. In all other modes, *http.Request will be nil.
//
// The context may contain a deadline that the protocol should respect on a best-effort basis.
// Returns an observation if the endpoint lookup fails; it is used as the protocol observation
// for the request when no protocol context exists.
AvailableWebsocketEndpoints(
context.Context,
protocol.ServiceID,
*http.Request,
) (protocol.EndpointAddrList, protocolobservations.Observations, error)
// BuildHTTPRequestContextForEndpoint builds and returns a ProtocolRequestContext containing a single selected endpoint.
// One `ProtocolRequestContext` corresponds to a single request, which is sent to a single endpoint.
//
// If the Pocket Network Gateway is in delegated mode, the staked application is passed via
// the `App-Address` header. In all other modes, *http.Request will be nil.
//
// The context may contain a deadline that the protocol should respect on a best-effort basis.
//
// Returns an observation if the context setup fails; it is used as the protocol observation
// for the request when no protocol context exists.
// filterByReputation controls whether to filter endpoints by reputation score.
// Pass true for normal requests (respects reputation), false for health checks (reaches all endpoints).
BuildHTTPRequestContextForEndpoint(
ctx context.Context,
serviceID protocol.ServiceID,
endpointAddr protocol.EndpointAddr,
rpcType sharedtypes.RPCType,
httpReq *http.Request,
filterByReputation bool,
) (ProtocolRequestContext, protocolobservations.Observations, error)
// BuildWebsocketRequestContextForEndpoint builds and returns a ProtocolRequestContextWebsocket containing a single selected endpoint.
// One `ProtocolRequestContextWebsocket` corresponds to a single long-lived websocket connection to a single endpoint.
// This method immediately establishes the Websocket connection and starts the bridge.
//
// If the Pocket Network Gateway is in delegated mode, the staked application is passed via
// the `App-Address` header. In all other modes, *http.Request will be nil.
//
// Returns an observation channel for connection-level observations (establishment, closure, errors).
// Message observations are sent through the provided messageObservationsChan.
// Returns an error if the context setup or connection establishment fails.
BuildWebsocketRequestContextForEndpoint(
context.Context,
protocol.ServiceID,
protocol.EndpointAddr,
websockets.WebsocketMessageProcessor,
*http.Request,
http.ResponseWriter,
chan *observation.RequestResponseObservations,
) (ProtocolRequestContextWebsocket, <-chan *protocolobservations.Observations, error)
// SupportedGatewayModes returns the Gateway modes supported by the protocol instance.
// See protocol/gateway_mode.go for more details.
SupportedGatewayModes() []protocol.GatewayMode
// ApplyHTTPObservations applies the supplied observations to the protocol instance's internal state.
// Hypothetical example (for illustrative purposes only):
// - protocol: Shannon
// - observation: "endpoint maxed-out or over-serviced (i.e. onchain rate limiting)"
// - result: skip the endpoint for a set time period until a new session begins.
ApplyHTTPObservations(*protocolobservations.Observations) error
// ApplyWebSocketObservations applies the supplied observations to the protocol instance's internal state.
// Hypothetical example (for illustrative purposes only):
// - protocol: Shannon
// - observation: "endpoint maxed-out or over-serviced (i.e. onchain rate limiting)"
// - result: skip the endpoint for a set time period until a new session begins.
ApplyWebSocketObservations(*protocolobservations.Observations) error
// TODO_FUTURE(@adshmh): support specifying the app(s) used for sending/signing synthetic relay requests by the hydrator.
// TODO_TECHDEBT: Enable the hydrator for gateway modes beyond Centralized only.
//
// ConfiguredServiceIDs returns the list of service IDs that the protocol instance is configured to serve.
ConfiguredServiceIDs() map[protocol.ServiceID]struct{}
// GetTotalServiceEndpointsCount returns the count of all unique endpoints for a service ID
// without filtering sanctioned endpoints.
GetTotalServiceEndpointsCount(protocol.ServiceID, *http.Request) (int, error)
// HydrateDisqualifiedEndpointsResponse hydrates the disqualified endpoint response with the protocol-specific data.
HydrateDisqualifiedEndpointsResponse(protocol.ServiceID, *devtools.DisqualifiedEndpointResponse)
// CheckWebsocketConnection checks if the websocket connection to the endpoint is established.
CheckWebsocketConnection(context.Context, protocol.ServiceID, protocol.EndpointAddr) *protocolobservations.Observations
// GetReputationService returns the reputation service instance used by the protocol.
// This is used by the health check executor to record health check results.
GetReputationService() reputation.ReputationService
// GetEndpointsForHealthCheck returns a function that gets endpoints for health checks.
// The returned function takes a service ID and returns endpoint info suitable for health checks.
// Note: This does NOT filter by reputation - health checks should run against all endpoints.
GetEndpointsForHealthCheck() func(protocol.ServiceID) ([]EndpointInfo, error)
// GetUnifiedServicesConfig returns the unified services configuration.
// This is used by components that need access to per-service configuration overrides.
GetUnifiedServicesConfig() *UnifiedServicesConfig
// GetConcurrencyConfig returns the concurrency configuration.
// This is used by components that need to respect concurrency limits.
GetConcurrencyConfig() ConcurrencyConfig
// UnblacklistSupplier removes a supplier from the blacklist.
// Called when a health check succeeds for a previously blacklisted supplier.
// Returns true if the supplier was blacklisted and has been removed.
UnblacklistSupplier(serviceID protocol.ServiceID, supplierAddr string) bool
// IsSupplierBlacklisted checks if a supplier is currently blacklisted.
IsSupplierBlacklisted(serviceID protocol.ServiceID, supplierAddr string) bool
// IsSessionActive checks if a session is currently active for a service.
// Used by health check executor to detect session rollover before attempting health checks.
// Returns true if the session is still in the current active sessions list.
IsSessionActive(ctx context.Context, serviceID protocol.ServiceID, sessionID string) bool
// health.Check interface is used to verify protocol instance's health status.
health.Check
}
Protocol defines the core functionality of a protocol from the perspective of a gateway. The gateway expects a protocol to build and return a request context for a particular service ID.
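The blacklist contract implied by UnblacklistSupplier and IsSupplierBlacklisted (removal reports whether an entry actually existed) can be sketched as a mutex-guarded set keyed by service ID and supplier address. This is a hypothetical illustration, not the Shannon implementation: supplierBlacklist, blacklistKey and the Blacklist method are invented names, and ServiceID stands in for protocol.ServiceID.

```go
package main

import (
	"fmt"
	"sync"
)

// ServiceID stands in for protocol.ServiceID (assumption for this sketch).
type ServiceID string

// blacklistKey scopes a supplier address to a single service.
type blacklistKey struct {
	serviceID    ServiceID
	supplierAddr string
}

// supplierBlacklist is a hypothetical, concurrency-safe blacklist.
type supplierBlacklist struct {
	mu      sync.RWMutex
	entries map[blacklistKey]struct{}
}

func newSupplierBlacklist() *supplierBlacklist {
	return &supplierBlacklist{entries: make(map[blacklistKey]struct{})}
}

// Blacklist marks a supplier as blacklisted for one service.
func (b *supplierBlacklist) Blacklist(serviceID ServiceID, supplierAddr string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.entries[blacklistKey{serviceID, supplierAddr}] = struct{}{}
}

// IsSupplierBlacklisted reports whether the supplier is currently blacklisted.
func (b *supplierBlacklist) IsSupplierBlacklisted(serviceID ServiceID, supplierAddr string) bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	_, ok := b.entries[blacklistKey{serviceID, supplierAddr}]
	return ok
}

// UnblacklistSupplier removes the supplier (e.g. after a successful health check)
// and returns true only if it was blacklisted before the call.
func (b *supplierBlacklist) UnblacklistSupplier(serviceID ServiceID, supplierAddr string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	key := blacklistKey{serviceID, supplierAddr}
	if _, ok := b.entries[key]; !ok {
		return false
	}
	delete(b.entries, key)
	return true
}

func main() {
	bl := newSupplierBlacklist()
	bl.Blacklist("eth", "supplier_1")
	fmt.Println(bl.UnblacklistSupplier("eth", "supplier_1")) // true
	fmt.Println(bl.UnblacklistSupplier("eth", "supplier_1")) // false
}
```

Keying by (service, supplier) keeps a blacklisting on one service from affecting the same supplier on another.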
type ProtocolRequestContext ¶
type ProtocolRequestContext interface {
// HandleServiceRequest sends the supplied payload to the endpoint selected using the above SelectEndpoint method,
// and receives and verifies the response.
HandleServiceRequest([]protocol.Payload) ([]protocol.Response, error)
// GetObservations builds and returns the set of protocol-specific observations using the current context.
//
// Hypothetical illustrative example.
//
// If the context is:
// - Protocol: Shannon
// - SelectedEndpoint: `endpoint_101`
// - Event: HandleServiceRequest returned a "maxed-out endpoint" error
//
// Then the observation can be:
// - `maxed-out endpoint` on `endpoint_101`.
GetObservations() protocolobservations.Observations
}
ProtocolRequestContext defines the functionality expected by the gateway from the protocol, for a particular service ID.
These include, but are not limited to:
- Listing the endpoints available for sending relays for a specific service.
- Sending a relay to a specific endpoint and returning its response.
The implementation of this interface is in the relayer/shannon package.
type ProtocolRequestContextWebsocket ¶
type ProtocolRequestContextWebsocket interface {
// ProcessProtocolClientWebsocketMessage processes a message from the client.
ProcessProtocolClientWebsocketMessage([]byte) ([]byte, error)
// ProcessProtocolEndpointWebsocketMessage processes a message from the endpoint.
ProcessProtocolEndpointWebsocketMessage([]byte) ([]byte, protocolobservations.Observations, error)
}
ProtocolRequestContextWebsocket defines the functionality expected by the gateway from the protocol, specifically for websocket requests.
type QoSContextBuilder ¶
type QoSContextBuilder interface {
// ParseHTTPRequest:
// - Ensures the HTTP request is valid for the target service.
// - detectedRPCType: The RPC type detected by the gateway (or UNKNOWN_RPC if detection failed)
// - Services should respect this if not UNKNOWN_RPC, or apply their own fallback logic
ParseHTTPRequest(ctx context.Context, httpReq *http.Request, detectedRPCType sharedtypes.RPCType) (RequestQoSContext, bool)
// ParseWebsocketRequest:
// - Ensures a Websocket request is valid for the target service.
// - Websocket connection requests have no body, so no parsing is needed.
// - If the service supports Websocket, returns a valid RequestQoSContext.
//
// TODO_TECHDEBT(@adshmh,@commoddity): Remove ParseWebsocketRequest and update ParseHTTPRequest to be the single entry point to QoS.
// It should perform basic validation of the HTTP handshake request when it is a Websocket request,
// e.g., check that the request is a websocket request, check headers, etc.
ParseWebsocketRequest(context.Context) (RequestQoSContext, bool)
}
QoSContextBuilder builds the QoS context required for all steps of a service request. Example: generating a user-facing HTTP response from an endpoint's response.
type QoSEndpointCheckGenerator ¶
type QoSEndpointCheckGenerator interface {
// TODO_FUTURE:
// - Add GetOptionalQualityChecks() to collect additional QoS data (e.g., endpoint latency).
//
// GetRequiredQualityChecks:
// - Returns required quality checks for a QoS instance to assess endpoint validity.
// - Example: EVM QoS may skip block height check if chain ID check already failed.
GetRequiredQualityChecks(protocol.EndpointAddr) []RequestQoSContext
// TODO_TECHDEBT(@commoddity): Currently websocket QoS only performs a protocol-level check
// which determines if an endpoint connection request is successful or not.
//
// This only requires a simple bool that tells the hydrator whether to check the endpoint's
// Websocket connection and apply protocol-level sanctions if the check fails.
//
// In the future, we may want to add QoS-level checks that take into account specific endpoint
// responses to apply websocket-related filtering at the QoS level.
//
// CheckWebsocketConnection
// - Checks if the endpoint supports Websocket connections.
// - Returns a boolean indicating whether the endpoint should be checked for Websocket connection.
CheckWebsocketConnection() bool
}
QoSEndpointCheckGenerator returns one or more service request contexts that:
- Provide data on endpoint quality by sending payloads and parsing responses.
- Are service-specific: the QoS instance decides which checks to run.
type QoSService ¶
type QoSService interface {
QoSContextBuilder
QoSEndpointCheckGenerator
// ApplyObservations:
// - Applies QoS-related observations to the local QoS instance.
// - TODO_FUTURE: Observations can be:
// - "local": from requests sent to an endpoint by THIS PATH instance.
// - "shared": from QoS observations shared by OTHER PATH instances.
ApplyObservations(*qos.Observations) error
// UpdateFromExtractedData:
// - Updates QoS state from extracted observation data.
// - Called by the observation pipeline after async parsing completes.
// - This method updates endpoint state (e.g., perceived block number) without
// blocking the hot path of user requests.
// - Data is extracted via DataExtractor from sampled requests and health checks.
UpdateFromExtractedData(endpointAddr protocol.EndpointAddr, data *qostypes.ExtractedData) error
// GetPerceivedBlockNumber:
// - Returns the perceived current block number for the service.
// - Used by health checks for block height validation.
// - Returns 0 if no block number has been observed yet.
GetPerceivedBlockNumber() uint64
// HydrateDisqualifiedEndpointsResponse:
// - Fills the disqualified endpoint response with QoS-specific data.
HydrateDisqualifiedEndpointsResponse(protocol.ServiceID, *devtools.DisqualifiedEndpointResponse)
}
TODO_IMPLEMENT:
- Add a QoS instance per service supported by the gateway (e.g., Ethereum, Solana, RESTful).
QoSService:
- Represents the embedded definition of a service (e.g., JSONRPC blockchain).
- Responsibilities:
  - QoSRequestParser: Translates service requests (currently only HTTP) into service request contexts.
  - EndpointSelector: Chooses the best endpoint for a specific service request.
type QueuedObservation ¶ added in v1.0.10
type QueuedObservation struct {
// ServiceID identifies the service (e.g., "eth", "base", "cosmos").
ServiceID protocol.ServiceID
// EndpointAddr identifies the endpoint that responded.
EndpointAddr protocol.EndpointAddr
// Source indicates where this observation came from.
Source ObservationSource
// Timestamp when the response was received.
Timestamp time.Time
// Latency of the request-response cycle.
Latency time.Duration
// RequestID is a unique identifier for this request, used for log correlation.
// It is either extracted from the X-Request-ID header or generated as a new UUID.
RequestID string
// RequestPath is the URL path (e.g., "/", "/v1/completions").
RequestPath string
// RequestHTTPMethod is the HTTP method (GET, POST, etc.).
RequestHTTPMethod string
// RequestHeaders are the request headers (optional, for context).
RequestHeaders map[string]string
// RequestBody is the raw request payload (for determining RPC method, etc.).
RequestBody []byte
// ResponseStatusCode is the HTTP status code from the endpoint.
ResponseStatusCode int
// ResponseHeaders are the response headers (optional, for context).
ResponseHeaders map[string]string
// ResponseBody is the raw endpoint response to parse.
ResponseBody []byte
}
QueuedObservation represents a sampled request/response pair to be parsed asynchronously. It contains EVERYTHING needed for parsing; all the heavy work happens in the worker.
type RPCTypeDetector ¶ added in v1.0.10
type RPCTypeDetector struct {
// contains filtered or unexported fields
}
RPCTypeDetector provides smart RPC type detection from HTTP requests. It uses a multi-step approach to minimize latency:
- Check RPC-Type header (fastest, explicit)
- Easy detection from request properties (websocket, grpc)
- Process of elimination based on service config
- Payload inspection (only when absolutely necessary)
func NewRPCTypeDetector ¶ added in v1.0.10
func NewRPCTypeDetector() *RPCTypeDetector
NewRPCTypeDetector creates a new RPC type detector.
func (*RPCTypeDetector) DetectRPCType ¶ added in v1.0.10
func (d *RPCTypeDetector) DetectRPCType(
	httpReq *http.Request,
	serviceID string,
	serviceRPCTypes []string,
) (sharedtypes.RPCType, error)
DetectRPCType detects the RPC type from an HTTP request. It uses smart elimination to avoid payload inspection when possible.
Parameters:
- httpReq: The incoming HTTP request
- serviceID: Service identifier (for error messages)
- serviceRPCTypes: List of RPC types supported by the service (e.g., ["json_rpc", "websocket"])
Returns the detected RPC type or an error if:
- RPC-Type header is invalid or not supported by service
- Detection is ambiguous and payload inspection fails
- Service doesn't support the detected RPC type
type RPCTypeMapper ¶ added in v1.0.10
type RPCTypeMapper struct{}
RPCTypeMapper provides bidirectional mapping between config strings and protocol enum values.
func NewRPCTypeMapper ¶ added in v1.0.10
func NewRPCTypeMapper() *RPCTypeMapper
NewRPCTypeMapper creates a new RPC type mapper.
func (*RPCTypeMapper) FormatRPCType ¶ added in v1.0.10
func (m *RPCTypeMapper) FormatRPCType(rpcType sharedtypes.RPCType) string
FormatRPCType converts an RPCType enum value to its canonical config string representation. This is the inverse of ParseRPCType().
Examples:
- sharedtypes.RPCType_JSON_RPC → "json_rpc"
- sharedtypes.RPCType_REST → "rest"
- sharedtypes.RPCType_COMET_BFT → "comet_bft"
Returns empty string for UNKNOWN_RPC.
func (*RPCTypeMapper) ParseRPCType ¶ added in v1.0.10
func (m *RPCTypeMapper) ParseRPCType(configStr string) (sharedtypes.RPCType, error)
ParseRPCType converts a configuration string to the corresponding RPCType enum value. Performs case-insensitive matching and validates the string is a known type.
Examples:
- "json_rpc" → sharedtypes.RPCType_JSON_RPC
- "JSON_RPC" → sharedtypes.RPCType_JSON_RPC
- "rest" → sharedtypes.RPCType_REST
Returns error if the string is not a valid RPC type.
func (*RPCTypeMapper) ValidateRPCTypeString ¶ added in v1.0.10
func (m *RPCTypeMapper) ValidateRPCTypeString(str string) error
ValidateRPCTypeString validates that a string is a valid RPC type without converting it. This is useful for config validation where we want to check validity early without needing the enum value.
type RPCTypeValidator ¶ added in v1.0.10
type RPCTypeValidator struct {
// contains filtered or unexported fields
}
RPCTypeValidator validates RPC types against the service configuration.
func NewRPCTypeValidator ¶ added in v1.0.10
func NewRPCTypeValidator(
	unifiedConfig *UnifiedServicesConfig,
	rpcTypeMapper *RPCTypeMapper,
) *RPCTypeValidator
NewRPCTypeValidator creates a new RPC type validator.
func (*RPCTypeValidator) ValidateRPCType ¶ added in v1.0.10
func (v *RPCTypeValidator) ValidateRPCType(
	serviceID protocol.ServiceID,
	rpcType sharedtypes.RPCType,
) error
ValidateRPCType checks whether the detected RPC type is in the service's configured rpc_types list.
func (*RPCTypeValidator) ValidateServiceConfigured ¶ added in v1.0.10
func (v *RPCTypeValidator) ValidateServiceConfigured(serviceID protocol.ServiceID) error
ValidateServiceConfigured checks whether a service is configured in UnifiedServicesConfig.
type RequestQoSContext ¶
type RequestQoSContext interface {
// TODO_TECHDEBT: Should eventually return []Payload
// - Allows mapping a single RelayRequest into multiple ServiceRequests.
// - Example: A batch relay request on JSONRPC should decompose into multiple independent requests.
GetServicePayloads() []protocol.Payload
// TODO_FUTURE:
// - Add retry-related return values to UpdateWithResponse,
// or add retry-related methods (e.g., Failed(), ShouldRetry()).
//
// UpdateWithResponse:
// - Informs the request QoS context of the payload returned by a specific endpoint.
// - Response is for the service payload produced by GetServicePayloads.
// - httpStatusCode is the original HTTP status code from the backend endpoint.
UpdateWithResponse(endpointAddr protocol.EndpointAddr, endpointSerializedResponse []byte, httpStatusCode int)
// GetHTTPResponse:
// - Returns the user-facing HTTP response.
// - Response depends on the current state of the service request context.
// - State is set at context creation and updated via UpdateWithResponse.
// - If never updated, may return 404 HTTP status.
GetHTTPResponse() pathhttp.HTTPResponse
// GetObservations:
// - Returns QoS-level observations in the context.
//
// Example:
// Context:
// - Service: Solana
// - SelectedEndpoint: `endpoint_101`
// - Request: `getHealth`
// - Endpoint response: error
// Observation:
// - `endpoint_101` is unhealthy.
GetObservations() qos.Observations
// GetEndpointSelector:
// - Enables specialized endpoint selection (e.g., method-based selection for EVM requests).
GetEndpointSelector() protocol.EndpointSelector
}
RequestQoSContext represents interactions between the gateway and the QoS instance for a given service request.
Construction methods:
- Parse an organic request from an end-user.
- Rebuild from a shared context deserialized from another PATH instance.
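The call order implied by the RequestQoSContext comments (payloads out, endpoint responses in, user-facing response last) can be illustrated with a toy context. Everything here is hypothetical: payload, httpResponse and echoQoSContext are simplified stand-ins for protocol.Payload, pathhttp.HTTPResponse and a real QoS implementation, and the 404 returned before any update mirrors the "If never updated, may return 404" note in the GetHTTPResponse comment.

```go
package main

import (
	"fmt"
	"net/http"
)

// payload and httpResponse are simplified stand-ins for protocol.Payload and
// pathhttp.HTTPResponse (assumptions for this sketch).
type payload struct{ Data string }

type httpResponse struct {
	StatusCode int
	Body       []byte
}

// echoQoSContext is a toy context that relays the last endpoint response to the user.
type echoQoSContext struct {
	request      string
	lastEndpoint string // a real context would use this for GetObservations
	lastBody     []byte
	lastStatus   int
	updated      bool
}

// GetServicePayloads turns the parsed user request into protocol payloads.
func (c *echoQoSContext) GetServicePayloads() []payload {
	return []payload{{Data: c.request}}
}

// UpdateWithResponse records what a specific endpoint returned.
func (c *echoQoSContext) UpdateWithResponse(endpointAddr string, body []byte, httpStatusCode int) {
	c.lastEndpoint = endpointAddr
	c.lastBody = body
	c.lastStatus = httpStatusCode
	c.updated = true
}

// GetHTTPResponse builds the user-facing response from the current state.
func (c *echoQoSContext) GetHTTPResponse() httpResponse {
	if !c.updated {
		// No endpoint response was ever recorded.
		return httpResponse{StatusCode: http.StatusNotFound}
	}
	return httpResponse{StatusCode: c.lastStatus, Body: c.lastBody}
}

func main() {
	ctx := &echoQoSContext{request: `{"method":"getHealth"}`}
	for range ctx.GetServicePayloads() {
		// A real gateway would send each payload to a selected endpoint via the protocol layer.
		ctx.UpdateWithResponse("endpoint_101", []byte(`{"result":"ok"}`), http.StatusOK)
	}
	resp := ctx.GetHTTPResponse()
	fmt.Println(resp.StatusCode, string(resp.Body))
}
```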
type RequestResponseReporter ¶
type RequestResponseReporter interface {
// Publish exports observations made on a service request and response(s), to the external component used by the corresponding implementation.
Publish(*observation.RequestResponseObservations)
}
RequestResponseReporter defines the interface for reporting observations with respect to a request, its corresponding response, and the set of events to any interested entity. Examples of reporters include:
- MetricsReporter: exports metrics based on the observations
- DataReporter: exports observations to external components (e.g., a messaging system or database)
type RetryConfig ¶ added in v1.0.10
type RetryConfig struct {
// Enabled enables/disables automatic retries.
Enabled bool `yaml:"enabled,omitempty"`
// MaxRetries is the maximum number of retry attempts.
MaxRetries int `yaml:"max_retries,omitempty"`
// MaxRetryLatency is the maximum latency threshold for retries.
// Only retry if failed request took less than this duration.
MaxRetryLatency *time.Duration `yaml:"max_retry_latency,omitempty"`
// RetryOn5xx enables retrying on 5xx errors.
RetryOn5xx bool `yaml:"retry_on_5xx,omitempty"`
// RetryOnTimeout enables retrying on timeout errors.
RetryOnTimeout bool `yaml:"retry_on_timeout,omitempty"`
// RetryOnConnection enables retrying on connection errors.
RetryOnConnection bool `yaml:"retry_on_connection,omitempty"`
}
RetryConfig configures automatic retry behavior for failed requests.
type ServiceConcurrencyConfig ¶ added in v1.0.10
type ServiceConcurrencyConfig struct {
MaxParallelEndpoints *int `yaml:"max_parallel_endpoints,omitempty"`
MaxBatchPayloads *int `yaml:"max_batch_payloads,omitempty"`
}
ServiceConcurrencyConfig holds per-service concurrency configuration. Note: max_concurrent_relays is GLOBAL only (set in gateway_config.concurrency_config). Per-service config supports max_parallel_endpoints and max_batch_payloads overrides.
type ServiceConfig ¶ added in v1.0.10
type ServiceConfig struct {
ID protocol.ServiceID `yaml:"id"`
Type ServiceType `yaml:"type,omitempty"`
RPCTypes []string `yaml:"rpc_types,omitempty"`
RPCTypeFallbacks map[string]string `yaml:"rpc_type_fallbacks,omitempty"`
LatencyProfile string `yaml:"latency_profile,omitempty"`
ReputationConfig *ServiceReputationConfig `yaml:"reputation_config,omitempty"`
Latency *ServiceLatencyConfig `yaml:"latency,omitempty"`
TieredSelection *ServiceTieredSelectionConfig `yaml:"tiered_selection,omitempty"`
Probation *ServiceProbationConfig `yaml:"probation,omitempty"`
RetryConfig *ServiceRetryConfig `yaml:"retry_config,omitempty"`
ObservationPipeline *ServiceObservationConfig `yaml:"observation_pipeline,omitempty"`
ConcurrencyConfig *ServiceConcurrencyConfig `yaml:"concurrency_config,omitempty"`
Fallback *ServiceFallbackConfig `yaml:"fallback,omitempty"`
HealthChecks *ServiceHealthCheckOverride `yaml:"health_checks,omitempty"`
}
ServiceConfig defines configuration for a single service.
type ServiceDefaults ¶ added in v1.0.10
type ServiceDefaults struct {
Type ServiceType `yaml:"type,omitempty"`
RPCTypes []string `yaml:"rpc_types,omitempty"`
LatencyProfile string `yaml:"latency_profile,omitempty"`
ReputationConfig ServiceReputationConfig `yaml:"reputation_config,omitempty"`
Latency ServiceLatencyConfig `yaml:"latency,omitempty"`
TieredSelection ServiceTieredSelectionConfig `yaml:"tiered_selection,omitempty"`
Probation ServiceProbationConfig `yaml:"probation,omitempty"`
RetryConfig ServiceRetryConfig `yaml:"retry_config,omitempty"`
ObservationPipeline ServiceObservationConfig `yaml:"observation_pipeline,omitempty"`
ConcurrencyConfig ServiceConcurrencyConfig `yaml:"concurrency_config,omitempty"`
ActiveHealthChecks ServiceHealthCheckOverride `yaml:"active_health_checks,omitempty"`
}
ServiceDefaults contains default settings inherited by all services.
type ServiceFallbackConfig ¶ added in v1.0.10
type ServiceFallbackConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
SendAllTraffic bool `yaml:"send_all_traffic,omitempty"`
Endpoints []map[string]string `yaml:"endpoints,omitempty"`
}
ServiceFallbackConfig holds per-service fallback endpoint configuration.
type ServiceHealthCheckConfig ¶ added in v1.0.10
type ServiceHealthCheckConfig struct {
// ServiceID is the service identifier (e.g., "eth", "base", "poly").
ServiceID protocol.ServiceID `yaml:"service_id"`
// CheckInterval is how often to run health checks for this service.
CheckInterval time.Duration `yaml:"check_interval,omitempty"`
// Enabled allows disabling all checks for this service.
Enabled *bool `yaml:"enabled,omitempty"`
// SyncAllowance is the number of blocks behind the latest block that an endpoint
// can be before it's considered out of sync. Overrides the global default for this service.
// 0 means disabled (no sync allowance check). Default: 0 (disabled).
SyncAllowance *int `yaml:"sync_allowance,omitempty"`
// Checks is the list of health checks to run for this service.
Checks []HealthCheckConfig `yaml:"checks"`
}
ServiceHealthCheckConfig defines health checks for a specific service.
func (*ServiceHealthCheckConfig) HydrateDefaults ¶ added in v1.0.10
func (shc *ServiceHealthCheckConfig) HydrateDefaults()
HydrateDefaults applies default values to ServiceHealthCheckConfig.
func (*ServiceHealthCheckConfig) Validate ¶ added in v1.0.10
func (shc *ServiceHealthCheckConfig) Validate() error
Validate validates the ServiceHealthCheckConfig.
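The SyncAllowance rule can be expressed as a small predicate: an endpoint is out of sync when it trails the perceived latest block by more than the allowance, and an allowance of 0 disables the check. This is a hypothetical sketch; isOutOfSync is not part of the package, and treating a perceived block number of 0 as "nothing observed yet, skip the check" follows the GetPerceivedBlockNumber comment rather than any documented health-check rule.

```go
package main

import "fmt"

// isOutOfSync reports whether an endpoint trails the perceived chain head by more
// than syncAllowance blocks. syncAllowance == 0 disables the check, and a perceived
// block of 0 means no block number has been observed yet.
func isOutOfSync(perceivedBlock, endpointBlock, syncAllowance uint64) bool {
	if syncAllowance == 0 || perceivedBlock == 0 {
		return false
	}
	if endpointBlock >= perceivedBlock {
		return false // at or ahead of the perceived head; also avoids uint64 underflow
	}
	return perceivedBlock-endpointBlock > syncAllowance
}

func main() {
	fmt.Println(isOutOfSync(1000, 995, 5)) // false: exactly 5 blocks behind
	fmt.Println(isOutOfSync(1000, 994, 5)) // true: 6 blocks behind
	fmt.Println(isOutOfSync(1000, 10, 0))  // false: check disabled
}
```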
type ServiceHealthCheckOverride ¶ added in v1.0.10
type ServiceHealthCheckOverride struct {
Enabled *bool `yaml:"enabled,omitempty"`
Interval time.Duration `yaml:"interval,omitempty"`
SyncAllowance *int `yaml:"sync_allowance,omitempty"`
External *ExternalConfigSource `yaml:"external,omitempty"`
Local []HealthCheckConfig `yaml:"local,omitempty"`
}
ServiceHealthCheckOverride holds per-service health check configuration overrides.
type ServiceLatencyConfig ¶ added in v1.0.10
type ServiceLatencyConfig struct {
Enabled *bool `yaml:"enabled,omitempty"`
TargetMs int `yaml:"target_ms,omitempty"`
PenaltyWeight float64 `yaml:"penalty_weight,omitempty"`
}
ServiceLatencyConfig holds per-service latency configuration.
type ServiceObservationConfig ¶ added in v1.0.10
type ServiceObservationConfig struct {
Enabled *bool `yaml:"enabled,omitempty"`
SampleRate *float64 `yaml:"sample_rate,omitempty"`
}
ServiceObservationConfig holds per-service observation pipeline configuration. Note: worker_count and queue_size are GLOBAL only (set in gateway_config.observation_pipeline). Per-service config only supports sample_rate override.
type ServiceProbationConfig ¶ added in v1.0.10
type ServiceProbationConfig struct {
Enabled *bool `yaml:"enabled,omitempty"`
Threshold *float64 `yaml:"threshold,omitempty"`
TrafficPercent *float64 `yaml:"traffic_percent,omitempty"`
RecoveryMultiplier *float64 `yaml:"recovery_multiplier,omitempty"`
}
ServiceProbationConfig holds per-service probation configuration.
type ServiceReputationConfig ¶ added in v1.0.10
type ServiceReputationConfig struct {
Enabled *bool `yaml:"enabled,omitempty"`
InitialScore *float64 `yaml:"initial_score,omitempty"`
MinThreshold *float64 `yaml:"min_threshold,omitempty"`
KeyGranularity string `yaml:"key_granularity,omitempty"`
RecoveryTimeout *time.Duration `yaml:"recovery_timeout,omitempty"`
}
ServiceReputationConfig holds per-service reputation configuration.
type ServiceRetryConfig ¶ added in v1.0.10
type ServiceRetryConfig struct {
Enabled *bool `yaml:"enabled,omitempty"`
MaxRetries *int `yaml:"max_retries,omitempty"`
RetryOn5xx *bool `yaml:"retry_on_5xx,omitempty"`
RetryOnTimeout *bool `yaml:"retry_on_timeout,omitempty"`
RetryOnConnection *bool `yaml:"retry_on_connection,omitempty"`
MaxRetryLatency *time.Duration `yaml:"max_retry_latency,omitempty"` // Only retry if failed request took less than this duration
}
ServiceRetryConfig holds per-service retry configuration.
type ServiceTieredSelectionConfig ¶ added in v1.0.10
type ServiceTieredSelectionConfig struct {
Enabled *bool `yaml:"enabled,omitempty"`
Tier1Threshold *float64 `yaml:"tier1_threshold,omitempty"`
Tier2Threshold *float64 `yaml:"tier2_threshold,omitempty"`
}
ServiceTieredSelectionConfig holds per-service tiered selection configuration.
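The two thresholds in ServiceTieredSelectionConfig suggest a three-way partition by reputation score. A hypothetical sketch, assuming Tier1Threshold is greater than Tier2Threshold, a score at or above a threshold qualifies for that tier, and selection prefers the best non-empty tier; none of this is specified on this page, and assignTier and selectTier are invented names.

```go
package main

import "fmt"

// assignTier maps a reputation score to tier 1 (best), 2, or 3.
func assignTier(score, tier1Threshold, tier2Threshold float64) int {
	switch {
	case score >= tier1Threshold:
		return 1
	case score >= tier2Threshold:
		return 2
	default:
		return 3
	}
}

// selectTier returns the endpoints in the best non-empty tier.
func selectTier(scores map[string]float64, tier1Threshold, tier2Threshold float64) []string {
	tiers := map[int][]string{}
	for endpoint, score := range scores {
		t := assignTier(score, tier1Threshold, tier2Threshold)
		tiers[t] = append(tiers[t], endpoint)
	}
	for t := 1; t <= 3; t++ {
		if len(tiers[t]) > 0 {
			return tiers[t]
		}
	}
	return nil
}

func main() {
	// Illustrative scores on an assumed 0-100 scale.
	scores := map[string]float64{"endpoint_a": 92, "endpoint_b": 55, "endpoint_c": 30}
	fmt.Println(selectTier(scores, 70, 50)) // [endpoint_a]
}
```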
type ServiceType ¶ added in v1.0.10
type ServiceType string
ServiceType defines the QoS type for a service. This determines which QoS implementation handles the service.
const (
	// ServiceTypeEVM is for EVM-compatible blockchains (Ethereum, Base, Polygon, etc.)
	ServiceTypeEVM ServiceType = "evm"
	// ServiceTypeSolana is for Solana blockchain
	ServiceTypeSolana ServiceType = "solana"
	// ServiceTypeCosmos is for Cosmos SDK chains
	ServiceTypeCosmos ServiceType = "cosmos"
	// ServiceTypeGeneric is for generic JSON-RPC services
	ServiceTypeGeneric ServiceType = "generic"
	// ServiceTypePassthrough is for services with no QoS processing (pass-through)
	ServiceTypePassthrough ServiceType = "passthrough"
)
type UnifiedServicesConfig ¶ added in v1.0.10
type UnifiedServicesConfig struct {
LatencyProfiles map[string]LatencyProfileConfig `yaml:"latency_profiles,omitempty"`
Defaults ServiceDefaults `yaml:"defaults,omitempty"`
Services []ServiceConfig `yaml:"services,omitempty"`
}
UnifiedServicesConfig is the top-level configuration for the unified service system.
func (*UnifiedServicesConfig) GetConfiguredServiceIDs ¶ added in v1.0.10
func (c *UnifiedServicesConfig) GetConfiguredServiceIDs() []protocol.ServiceID
GetConfiguredServiceIDs returns a list of all configured service IDs.
func (*UnifiedServicesConfig) GetLatencyProfile ¶ added in v1.0.10
func (c *UnifiedServicesConfig) GetLatencyProfile(name string) *LatencyProfileConfig
GetLatencyProfile returns the latency profile configuration for a profile name.
func (*UnifiedServicesConfig) GetMergedServiceConfig ¶ added in v1.0.10
func (c *UnifiedServicesConfig) GetMergedServiceConfig(serviceID protocol.ServiceID) *ServiceConfig
GetMergedServiceConfig returns a service config with defaults merged in. Returns nil if the service is not configured.
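The per-service structs such as ServiceRetryConfig use pointer fields, which lets a merge distinguish "not set" from an explicit zero such as enabled: false. A hypothetical sketch of a field-by-field merge under that reading, for two retry fields only; serviceRetryConfig, pick and mergeRetry are invented for illustration, and GetMergedServiceConfig may merge differently.

```go
package main

import "fmt"

// serviceRetryConfig mirrors a subset of gateway.ServiceRetryConfig; nil means "not set".
type serviceRetryConfig struct {
	Enabled    *bool
	MaxRetries *int
}

// pick returns the service value when set, otherwise the default.
func pick[T any](service, def *T) *T {
	if service != nil {
		return service
	}
	return def
}

// mergeRetry overlays a service's retry settings on the defaults, field by field.
// The result shares pointers with its inputs; a real merge might deep-copy.
func mergeRetry(defaults serviceRetryConfig, service *serviceRetryConfig) serviceRetryConfig {
	if service == nil {
		return defaults
	}
	return serviceRetryConfig{
		Enabled:    pick(service.Enabled, defaults.Enabled),
		MaxRetries: pick(service.MaxRetries, defaults.MaxRetries),
	}
}

func main() {
	enabled, one, three := true, 1, 3
	defaults := serviceRetryConfig{Enabled: &enabled, MaxRetries: &one}
	merged := mergeRetry(defaults, &serviceRetryConfig{MaxRetries: &three})
	fmt.Println(*merged.Enabled, *merged.MaxRetries) // true 3
}
```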
func (*UnifiedServicesConfig) GetServiceConfig ¶ added in v1.0.10
func (c *UnifiedServicesConfig) GetServiceConfig(serviceID protocol.ServiceID) *ServiceConfig
GetServiceConfig returns the configuration for a specific service.
func (*UnifiedServicesConfig) GetServiceRPCTypes ¶ added in v1.0.10
func (c *UnifiedServicesConfig) GetServiceRPCTypes(serviceID protocol.ServiceID) []string
GetServiceRPCTypes returns the supported RPC types for a service.
func (*UnifiedServicesConfig) GetServiceType ¶ added in v1.0.10
func (c *UnifiedServicesConfig) GetServiceType(serviceID protocol.ServiceID) ServiceType
GetServiceType returns the QoS type for a service.
func (*UnifiedServicesConfig) GetSyncAllowanceForService ¶ added in v1.0.10
func (c *UnifiedServicesConfig) GetSyncAllowanceForService(serviceID protocol.ServiceID) uint64
GetSyncAllowanceForService returns the sync allowance for a service. It checks per-service config first, then falls back to global defaults. Returns DefaultSyncAllowance (5) if not configured.
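The lookup order described for GetSyncAllowanceForService (per-service value, then the global default, then DefaultSyncAllowance) reduces to a nil-coalescing chain. A hypothetical sketch: resolveSyncAllowance and its parameters are invented, the constant 5 is the documented DefaultSyncAllowance, and returning an explicit per-service 0 as-is (i.e. disabled) and skipping negative values are assumptions.

```go
package main

import "fmt"

// defaultSyncAllowance matches the documented fallback of 5 blocks.
const defaultSyncAllowance uint64 = 5

// resolveSyncAllowance returns the first configured value in priority order.
// A nil pointer means "not configured"; an explicit 0 is returned as-is.
// Negative values are treated as unset (assumption for this sketch).
func resolveSyncAllowance(perService, global *int) uint64 {
	for _, v := range []*int{perService, global} {
		if v != nil && *v >= 0 {
			return uint64(*v)
		}
	}
	return defaultSyncAllowance
}

func main() {
	ten := 10
	fmt.Println(resolveSyncAllowance(nil, nil))  // 5
	fmt.Println(resolveSyncAllowance(nil, &ten)) // 10
}
```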
func (*UnifiedServicesConfig) HasService ¶ added in v1.0.10
func (c *UnifiedServicesConfig) HasService(serviceID protocol.ServiceID) bool
HasService checks whether a specific service is configured.
func (*UnifiedServicesConfig) HasServices ¶ added in v1.0.10
func (c *UnifiedServicesConfig) HasServices() bool
HasServices returns true if any services are configured.
func (*UnifiedServicesConfig) HydrateDefaults ¶ added in v1.0.10
func (c *UnifiedServicesConfig) HydrateDefaults()
HydrateDefaults applies default values to UnifiedServicesConfig.
func (*UnifiedServicesConfig) SetDefaultsFromParent ¶ added in v1.0.10
func (c *UnifiedServicesConfig) SetDefaultsFromParent(parent ParentConfigDefaults)
SetDefaultsFromParent populates the internal Defaults field from parent config values. This allows gateway_config top-level settings to serve as defaults for all services, eliminating the need for a separate "defaults" section in YAML.
Call this method after loading the config to wire up defaults from:
- reputation_config.tiered_selection -> Defaults.TieredSelection
- reputation_config.tiered_selection.probation -> Defaults.Probation
- retry_config -> Defaults.RetryConfig
- observation_pipeline -> Defaults.ObservationPipeline
- active_health_checks -> Defaults.ActiveHealthChecks
func (*UnifiedServicesConfig) Validate ¶ added in v1.0.10
func (c *UnifiedServicesConfig) Validate() error
Validate validates the UnifiedServicesConfig.
Source Files
¶
- block_height_reference_cache.go
- block_height_validator.go
- errors.go
- gateway.go
- health_check_config.go
- health_check_executor.go
- health_check_leader.go
- health_check_qos_context.go
- http_request_context.go
- http_request_context_handle_request.go
- observation.go
- observation_handler.go
- observation_queue.go
- observation_websockets.go
- protocol.go
- qos.go
- reporter.go
- request.go
- request_type.go
- rpc_type_detector.go
- rpc_type_error_response.go
- rpc_type_validator.go
- rpctype_mapper.go
- unified_service_config.go
- websocket_request_context.go