Documentation
¶
Index ¶
- Constants
- Variables
- func NewCachingFullNode(logger polylog.Logger, lazyFullNode *LazyFullNode, cacheConfig CacheConfig) (*cachingFullNode, error)
- type CacheConfig
- type FullNode
- type FullNodeConfig
- type GatewayConfig
- type LazyFullNode
- func (lfn *LazyFullNode) GetAccountClient() *sdk.AccountClient
- func (lfn *LazyFullNode) GetApp(ctx context.Context, appAddr string) (*apptypes.Application, error)
- func (lfn *LazyFullNode) GetCurrentBlockHeight(ctx context.Context) (int64, error)
- func (lfn *LazyFullNode) GetSession(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (sessiontypes.Session, error)
- func (lfn *LazyFullNode) GetSessionWithExtendedValidity(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (sessiontypes.Session, error)
- func (lfn *LazyFullNode) GetSharedParams(ctx context.Context) (*sharedtypes.Params, error)
- func (lfn *LazyFullNode) IsHealthy() bool
- func (lfn *LazyFullNode) IsInSessionRollover() bool
- func (lfn *LazyFullNode) ValidateRelayResponse(supplierAddr sdk.SupplierAddress, responseBz []byte) (*servicetypes.RelayResponse, error)
- type LoadTestingConfig
- type LoadTestingRelayMinerConfig
- type Protocol
- func (p *Protocol) ApplyHTTPObservations(observations *protocolobservations.Observations) error
- func (p *Protocol) ApplyWebSocketObservations(observations *protocolobservations.Observations) error
- func (p *Protocol) AvailableHTTPEndpoints(ctx context.Context, serviceID protocol.ServiceID, httpReq *http.Request) (protocol.EndpointAddrList, protocolobservations.Observations, error)
- func (p *Protocol) AvailableWebsocketEndpoints(ctx context.Context, serviceID protocol.ServiceID, httpReq *http.Request) (protocol.EndpointAddrList, protocolobservations.Observations, error)
- func (p *Protocol) BuildHTTPRequestContextForEndpoint(ctx context.Context, serviceID protocol.ServiceID, ...) (gateway.ProtocolRequestContext, protocolobservations.Observations, error)
- func (p *Protocol) BuildWebsocketRequestContextForEndpoint(ctx context.Context, serviceID protocol.ServiceID, ...) (gateway.ProtocolRequestContextWebsocket, ...)
- func (p *Protocol) CheckWebsocketConnection(ctx context.Context, serviceID protocol.ServiceID, ...) *protocolobservations.Observations
- func (p *Protocol) ConfiguredServiceIDs() map[protocol.ServiceID]struct{}
- func (p *Protocol) GetTotalServiceEndpointsCount(serviceID protocol.ServiceID, httpReq *http.Request) (int, error)
- func (p *Protocol) HydrateDisqualifiedEndpointsResponse(serviceID protocol.ServiceID, details *devtools.DisqualifiedEndpointResponse)
- func (p *Protocol) IsAlive() bool
- func (p *Protocol) Name() string
- func (p *Protocol) SupportedGatewayModes() []protocol.GatewayMode
- type RelayRequestSigner
- type SanctionConfig
- type ServiceFallback
Constants ¶
const MaxConcurrentRelaysPerRequest = 10
MaxConcurrentRelaysPerRequest limits the number of concurrent relay goroutines per request. This prevents DoS attacks via large batch requests that could spawn unbounded goroutines. TODO_IMPROVE: Make this configurable via gateway settings.
Variables ¶
var ( ErrShannonInvalidGatewayPrivateKey = errors.New("invalid shannon gateway private key") ErrShannonInvalidGatewayAddress = errors.New("invalid shannon gateway address") ErrShannonInvalidNodeUrl = errors.New("invalid shannon node URL") ErrShannonInvalidGrpcHostPort = errors.New("invalid shannon grpc host:port") ErrShannonUnsupportedGatewayMode = errors.New("invalid shannon gateway mode") ErrShannonCentralizedGatewayModeRequiresOwnedApps = errors.New("shannon Centralized gateway mode requires at-least 1 owned app") ErrShannonCacheConfigSetForLazyMode = errors.New("cache config cannot be set for lazy mode") ErrShannonInvalidServiceFallback = errors.New("invalid service fallback configuration") ErrShannonInvalidSessionRolloverBlocks = errors.New("session_rollover_blocks must be positive") )
Functions ¶
func NewCachingFullNode ¶
func NewCachingFullNode( logger polylog.Logger, lazyFullNode *LazyFullNode, cacheConfig CacheConfig, ) (*cachingFullNode, error)
NewCachingFullNode wraps a LazyFullNode with:
- Session cache: caches sessions, refreshes early
- Account cache: indefinite cache for account data
Both use early refresh to avoid thundering herd/latency spikes.
Types ¶
type CacheConfig ¶
type FullNode ¶
type FullNode interface {
// GetApp returns the onchain application matching the application address
GetApp(ctx context.Context, appAddr string) (*apptypes.Application, error)
// GetSession returns the latest session matching the supplied service+app combination.
// Sessions are solely used for sending relays, and therefore only the latest session for any service+app combination is needed.
// Note: Shannon returns the latest session for a service+app combination if no blockHeight is provided.
GetSession(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (sessiontypes.Session, error)
// GetSessionWithExtendedValidity implements session retrieval with support for
// Pocket Network's native "session grace period" business logic.
//
// At the protocol level, it is used to account for the case when:
// - RelayMiner.FullNode.Height > Gateway.FullNode.Height
// AND
// - RelayMiner.FullNode.Session > Gateway.FullNodeSession
//
// PATH leverages it by accounting for the case when:
// - RelayMiner.FullNode.Height < Gateway.FullNode.Height
// AND
// - Gateway.FullNode.Session > RelayMiner.FullNodeSession
//
// This enables signing and sending relays to Suppliers who are behind the Gateway.
//
// The recommendation usage is to use both GetSession and GetSessionWithExtendedValidity
// in order to account for both cases when selecting the pool of available Suppliers.
//
// Protocol References:
// - https://github.com/pokt-network/poktroll/blob/main/proto/pocket/shared/params.proto
// - https://dev.poktroll.com/protocol/governance/gov_params
// - https://dev.poktroll.com/protocol/primitives/claim_and_proof_lifecycle
// If within grace period of a session rollover, it may return the previous session.
GetSessionWithExtendedValidity(ctx context.Context, serviceID protocol.ServiceID, appAddr string) (sessiontypes.Session, error)
GetSharedParams(ctx context.Context) (*sharedtypes.Params, error)
// GetCurrentBlockHeight returns the current block height from the blockchain.
GetCurrentBlockHeight(ctx context.Context) (int64, error)
// ValidateRelayResponse validates the raw bytes returned from an endpoint (in response to a relay request) and returns the parsed response.
ValidateRelayResponse(supplierAddr sdk.SupplierAddress, responseBz []byte) (*servicetypes.RelayResponse, error)
// IsHealthy returns true if the FullNode instance is healthy.
// A LazyFullNode will always return true.
// A CachingFullNode will return true if it has data in app and session caches.
IsHealthy() bool
// GetAccountClient returns the account client from the fullnode, to be used in building relay request signers.
GetAccountClient() *sdk.AccountClient
// IsInSessionRollover returns true if the system is currently in a session rollover period.
//
// A session rollover period is a critical time window that occurs around session transitions
// and can cause reliability issues for relay operations. The rollover period is defined as:
// - 1 block before the session end height
// - Plus a configurable grace period after the session end
//
// This method enables the gateway to implement adaptive retry strategies during rollover periods
//
// The monitoring is performed automatically in the background and this method
// provides a thread-safe way to check the current rollover status.
IsInSessionRollover() bool
}
FullNode defines the set of capabilities the Shannon protocol integration needs from a fullnode for sending relays.
A properly initialized fullNode struct can: 1. Return the onchain apps matching a service ID. 2. Fetch a session for a (service,app) combination. 3. Validate a relay response. 4. Etc...
type FullNodeConfig ¶
type FullNodeConfig struct {
RpcURL string `yaml:"rpc_url"`
GRPCConfig grpc.GRPCConfig `yaml:"grpc_config"`
// LazyMode, if set to true, will disable all caching of onchain data. For
// example, this disables caching of apps and sessions.
LazyMode bool `yaml:"lazy_mode" default:"true"`
// Configuration options for the cache when LazyMode is false
CacheConfig CacheConfig `yaml:"cache_config"`
// SessionRolloverBlocks is a temporary fix to handle session rollover issues.
// TODO_TECHDEBT(@commoddity): Should be removed when the rollover issue is solved at the protocol level.
SessionRolloverBlocks int64 `yaml:"session_rollover_blocks"`
}
func (*FullNodeConfig) HydrateDefaults ¶
func (fnc *FullNodeConfig) HydrateDefaults()
HydrateDefaults applies default values to FullNodeConfig
func (FullNodeConfig) Validate ¶
func (c FullNodeConfig) Validate() error
type GatewayConfig ¶
type GatewayConfig struct {
GatewayMode protocol.GatewayMode `yaml:"gateway_mode"`
GatewayAddress string `yaml:"gateway_address"`
GatewayPrivateKeyHex string `yaml:"gateway_private_key_hex"`
OwnedAppsPrivateKeysHex []string `yaml:"owned_apps_private_keys_hex"`
ServiceFallback []ServiceFallback `yaml:"service_fallback"`
// Optional.
// Puts the Gateway in LoadTesting mode if specified.
// All relays will be sent to a fixed URL.
// Allows measuring performance of PATH and full node(s) in isolation.
LoadTestingConfig *LoadTestingConfig `yaml:"load_testing_config"`
// Optional.
// Configures the endpoint sanction system parameters.
// If not specified, sensible defaults will be used.
SanctionConfig SanctionConfig `yaml:"sanction_config"`
// Optional.
// Configures the endpoint reputation system.
// If not specified or disabled, binary sanctions will be used instead.
ReputationConfig reputation.Config `yaml:"reputation_config"`
}
func (GatewayConfig) Validate ¶
func (gc GatewayConfig) Validate() error
type LazyFullNode ¶
type LazyFullNode struct {
// contains filtered or unexported fields
}
TODO_MVP(@adshmh): Rename `LazyFullNode`: this struct does not perform any caching and should be named accordingly.
LazyFullNode: default implementation of a full node for the Shannon.
Key differences from a caching full node: - Intentionally avoids caching:
- Enables support for short block times (e.g. LocalNet)
- Use CachingFullNode struct if caching is desired for performance
func NewLazyFullNode ¶
func NewLazyFullNode(logger polylog.Logger, config FullNodeConfig) (*LazyFullNode, error)
NewLazyFullNode builds and returns a LazyFullNode using the provided configuration.
func (*LazyFullNode) GetAccountClient ¶
func (lfn *LazyFullNode) GetAccountClient() *sdk.AccountClient
GetAccountClient: - Returns the account client created by the lazy fullnode. - Used to create relay request signers.
func (*LazyFullNode) GetApp ¶
func (lfn *LazyFullNode) GetApp(ctx context.Context, appAddr string) (*apptypes.Application, error)
GetApp: - Returns the onchain application matching the supplied application address. - Required to fulfill the FullNode interface.
func (*LazyFullNode) GetCurrentBlockHeight ¶
func (lfn *LazyFullNode) GetCurrentBlockHeight(ctx context.Context) (int64, error)
GetCurrentBlockHeight: - Returns the current block height from the blockchain. - Used for session validation and grace period calculations.
func (*LazyFullNode) GetSession ¶
func (lfn *LazyFullNode) GetSession( ctx context.Context, serviceID protocol.ServiceID, appAddr string, ) (sessiontypes.Session, error)
GetSession: - Uses the Shannon SDK to fetch a session for the (serviceID, appAddr) combination. - Required to fulfill the FullNode interface.
func (*LazyFullNode) GetSessionWithExtendedValidity ¶
func (lfn *LazyFullNode) GetSessionWithExtendedValidity( ctx context.Context, serviceID protocol.ServiceID, appAddr string, ) (sessiontypes.Session, error)
GetSessionWithExtendedValidity: - Returns the appropriate session considering grace period logic. - If within grace period of a session rollover, it may return the previous session.
func (*LazyFullNode) GetSharedParams ¶
func (lfn *LazyFullNode) GetSharedParams(ctx context.Context) (*sharedtypes.Params, error)
GetSharedParams: - Returns the shared module parameters from the blockchain. - Used to get grace period and session configuration.
func (*LazyFullNode) IsHealthy ¶
func (lfn *LazyFullNode) IsHealthy() bool
IsHealthy: - Always returns true for a LazyFullNode. - Required to fulfill the FullNode interface.
func (*LazyFullNode) IsInSessionRollover ¶
func (lfn *LazyFullNode) IsInSessionRollover() bool
IsInSessionRollover returns true if we're currently in a session rollover period.
func (*LazyFullNode) ValidateRelayResponse ¶
func (lfn *LazyFullNode) ValidateRelayResponse(supplierAddr sdk.SupplierAddress, responseBz []byte) (*servicetypes.RelayResponse, error)
ValidateRelayResponse:
- Validates the raw response bytes received from an endpoint.
- Uses the SDK and the lazy full node's account client for validation.
- Will make a call to the remote full node to fetch the account public key.
type LoadTestingConfig ¶
type LoadTestingConfig struct {
// The URL to use for sending relays.
// If set:
// 1. Relays are sent to the backend server's URL, and
// 2. The backend server's response is returned as-is (no parsing, signature verification, etc.)
BackendServiceURL *string `yaml:"backend_service_url"`
// The RelayMiner to use for load testing.
// If set:
// 1. Relays are only send to the RelayMiner's URL, and
// 2. The RelayMiner's response is returned after parsing and signature verification.
RelayMinerConfig *LoadTestingRelayMinerConfig `yaml:"relay_miner_config"`
}
Load testing configuration. Used to track Gateway's performance when using "perfect" endpoints. If specified: - Directs all relays to the specified backend service URL - No protocol or fallback endpoint used. - Assumes high throughput backend service (e.g. nginx with a fixed response)
func (*LoadTestingConfig) Validate ¶
func (ltc *LoadTestingConfig) Validate() error
type LoadTestingRelayMinerConfig ¶
type LoadTestingRelayMinerConfig struct {
// Public URL of the RelayMiner to use in load testing mode.
URL string `yaml:"url"`
// The Supplier address to use in relays.
// In load testing mode:
// - The supplier address is fixed.
// - A single RelayMiner will receive all the relays.
SupplierAddr string `yaml:"supplier_addr"`
}
Configuration of a RelayMiner used in load testing.
type Protocol ¶
type Protocol struct {
FullNode
// contains filtered or unexported fields
}
Protocol provides the functionality needed by the gateway package for sending a relay to a specific endpoint.
func NewProtocol ¶
func NewProtocol( ctx context.Context, logger polylog.Logger, config GatewayConfig, fullNode FullNode, ) (*Protocol, error)
NewProtocol instantiates an instance of the Shannon protocol integration.
func (*Protocol) ApplyHTTPObservations ¶
func (p *Protocol) ApplyHTTPObservations(observations *protocolobservations.Observations) error
ApplyHTTPObservations updates protocol instance state based on endpoint observations. Examples: - Mark endpoints as invalid based on response quality - Disqualify endpoints for a time period
Implements gateway.Protocol interface.
func (*Protocol) ApplyWebSocketObservations ¶
func (p *Protocol) ApplyWebSocketObservations(observations *protocolobservations.Observations) error
ApplyWebSocketObservations updates protocol instance state based on endpoint observations. Examples: - Mark endpoints as invalid based on response quality - Disqualify endpoints for a time period
Implements gateway.Protocol interface.
func (*Protocol) AvailableHTTPEndpoints ¶
func (p *Protocol) AvailableHTTPEndpoints( ctx context.Context, serviceID protocol.ServiceID, httpReq *http.Request, ) (protocol.EndpointAddrList, protocolobservations.Observations, error)
AvailableHTTPEndpoints returns the available endpoints for a given service ID.
- Provides the list of endpoints that can serve the specified service ID. - Returns a list of valid endpoint addresses, protocol observations, and any error encountered.
Usage:
- In Delegated mode, httpReq must contain the appropriate headers for app selection.
- In Centralized mode, httpReq may be nil.
Returns:
- protocol.EndpointAddrList: the discovered endpoints for the service.
- protocolobservations.Observations: contextual observations (e.g., error context).
- error: if any error occurs during endpoint discovery or validation.
func (*Protocol) AvailableWebsocketEndpoints ¶
func (p *Protocol) AvailableWebsocketEndpoints( ctx context.Context, serviceID protocol.ServiceID, httpReq *http.Request, ) (protocol.EndpointAddrList, protocolobservations.Observations, error)
AvailableWebsocketEndpoints returns the available endpoints for a given service ID.
- Provides the list of endpoints that can serve the specified service ID. - Returns a list of valid endpoint addresses, protocol observations, and any error encountered.
Usage:
- In Delegated mode, httpReq must contain the appropriate headers for app selection.
- In Centralized mode, httpReq may be nil.
Returns:
- protocol.EndpointAddrList: the discovered endpoints for the service.
- protocolobservations.Observations: contextual observations (e.g., error context).
- error: if any error occurs during endpoint discovery or validation.
func (*Protocol) BuildHTTPRequestContextForEndpoint ¶
func (p *Protocol) BuildHTTPRequestContextForEndpoint( ctx context.Context, serviceID protocol.ServiceID, selectedEndpointAddr protocol.EndpointAddr, httpReq *http.Request, ) (gateway.ProtocolRequestContext, protocolobservations.Observations, error)
BuildHTTPRequestContextForEndpoint creates a new protocol request context for a specified service and endpoint.
Parameters:
- ctx: Context for cancellation, deadlines, and logging.
- serviceID: The unique identifier of the target service.
- selectedEndpointAddr: The address of the endpoint to use for the request.
- httpReq: ONLY used in Delegated mode to extract the selected app from headers.
- TODO_TECHDEBT: Decouple context building for different gateway modes.
Behavior:
- Retrieves active sessions for the given service ID from the full node.
- Retrieves unique endpoints available across all active sessions
- Filtering out sanctioned endpoints from list of unique endpoints.
- Obtains the relay request signer appropriate for the current gateway mode.
- Returns a fully initialized request context for use in downstream protocol operations.
- On failure, logs the error, returns a context setup observation, and a non-nil error.
Implements the gateway.Protocol interface.
func (*Protocol) BuildWebsocketRequestContextForEndpoint ¶
func (p *Protocol) BuildWebsocketRequestContextForEndpoint( ctx context.Context, serviceID protocol.ServiceID, selectedEndpointAddr protocol.EndpointAddr, websocketMessageProcessor websockets.WebsocketMessageProcessor, httpReq *http.Request, httpResponseWriter http.ResponseWriter, messageObservationsChan chan *observation.RequestResponseObservations, ) (gateway.ProtocolRequestContextWebsocket, <-chan *protocolobservations.Observations, error)
BuildWebsocketRequestContextForEndpoint creates a new Websocket protocol request context for a specified service and endpoint. This method immediately establishes the Websocket connection and starts the bridge.
Parameters:
- ctx: Context for cancellation, deadlines, and logging.
- serviceID: The unique identifier of the target service.
- selectedEndpointAddr: The address of the endpoint to use for the request.
- httpReq: HTTP request used for Websocket upgrade and delegated mode app extraction.
- httpResponseWriter: HTTP response writer for Websocket upgrade.
- messageObservationsChan: Channel for sending message-level observations to the gateway.
func (*Protocol) CheckWebsocketConnection ¶
func (p *Protocol) CheckWebsocketConnection( ctx context.Context, serviceID protocol.ServiceID, selectedEndpointAddr protocol.EndpointAddr, ) *protocolobservations.Observations
CheckWebsocketConnection checks if the websocket connection to the endpoint is established. This method is used by the websocket hydrator to check if the endpoint supports websocket RPC type. It uses a simplified version of the websocket bridge connection process to avoid unnecessary overhead.
func (*Protocol) ConfiguredServiceIDs ¶
ConfiguredServiceIDs returns the list of all all service IDs that are configured to be supported by the Gateway.
func (*Protocol) GetTotalServiceEndpointsCount ¶
func (p *Protocol) GetTotalServiceEndpointsCount(serviceID protocol.ServiceID, httpReq *http.Request) (int, error)
GetTotalServiceEndpointsCount returns the count of all unique endpoints for a service ID without filtering sanctioned endpoints.
func (*Protocol) HydrateDisqualifiedEndpointsResponse ¶
func (p *Protocol) HydrateDisqualifiedEndpointsResponse(serviceID protocol.ServiceID, details *devtools.DisqualifiedEndpointResponse)
HydrateDisqualifiedEndpointsResponse hydrates the disqualified endpoint response with the protocol-specific data.
- takes a pointer to the DisqualifiedEndpointResponse
- called by the devtools.DisqualifiedEndpointReporter to fill it with the protocol-specific data.
func (*Protocol) SupportedGatewayModes ¶
func (p *Protocol) SupportedGatewayModes() []protocol.GatewayMode
TODO_DOCUMENT(@adshmh): Convert the following notion doc into a proper README.
Gateway Mode defines the behavior of a specific mode of operation of PATH. See the following link for more details on PATH's different modes of operation. https://www.notion.so/buildwithgrove/Different-Modes-of-Operation-PATH-LocalNet-Discussions-122a36edfff6805e9090c9a14f72f3b5
SupportedGatewayModes returns the list of gateway modes supported by the Shannon protocol integration. Implements the gateway.Protocol interface.
type RelayRequestSigner ¶
type RelayRequestSigner interface {
SignRelayRequest(req *servicetypes.RelayRequest, app apptypes.Application) (*servicetypes.RelayRequest, error)
}
RelayRequestSigner:
- Used by requestContext to sign relay requests
- Takes an unsigned relay request and an application
- Returns a relay request signed by the gateway (with delegation from the app)
- In future Permissionless Gateway Mode, may use the app's own private key for signing
type SanctionConfig ¶ added in v1.0.5
type SanctionConfig struct {
// SessionSanctionDuration is the TTL for session-based sanctions.
// Endpoints with session sanctions will be excluded from selection for this duration.
// Default: 1 hour
SessionSanctionDuration time.Duration `yaml:"session_sanction_duration"`
// CacheCleanupInterval is the interval for purging expired sanction entries from the cache.
// Default: 10 minutes
CacheCleanupInterval time.Duration `yaml:"cache_cleanup_interval"`
}
SanctionConfig holds configurable parameters for the endpoint sanction system. All fields are optional and will use sensible defaults if not specified.
func (*SanctionConfig) HydrateDefaults ¶ added in v1.0.5
func (sc *SanctionConfig) HydrateDefaults() SanctionConfig
HydrateDefaults applies default values to SanctionConfig
type ServiceFallback ¶
type ServiceFallback struct {
ServiceID protocol.ServiceID `yaml:"service_id"`
FallbackEndpoints []map[string]string `yaml:"fallback_endpoints"`
// If true, all traffic will be sent to the fallback endpoints for the service,
// regardless of the health of the protocol endpoints.
SendAllTraffic bool `yaml:"send_all_traffic"`
}
TODO_TECHDEBT(@adshmh): Make configuration and implementation explicit: - Criteria to decide whether the "fallback" URL should be used at all. - Criteria to decide the order in which a Shannon endpoint vs. a fallback URL should be used. - Support "weighted" distribution to Shannon endpoints vs. "fallback" URLs.
ServiceFallback is a configuration struct for specifying fallback endpoints for a service.
Source Files
¶
- apps.go
- config.go
- context.go
- endpoint.go
- errors.go
- full_node.go
- fullnode_account_fetcher.go
- fullnode_cache.go
- fullnode_lazy.go
- fullnode_session_rollover.go
- gateway_mode.go
- log.go
- mode_centralized.go
- mode_delegated.go
- observation.go
- observation_websocket.go
- protocol.go
- reputation.go
- sanction.go
- sanctioned_endpoints_store.go
- sanctions.go
- session.go
- signer.go
- websocket_context.go