Documentation
¶
Index ¶
- Constants
- type Client
- func (c *Client) BatchCallContext(ctx context.Context, method string, params []interface{}) ([]json.RawMessage, error)
- func (c *Client) BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error)
- func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
- func (c *Client) DebugStateSize(ctx context.Context, blockNumber string) (*DebugStateSizeResponse, error)
- func (c *Client) GetClientInfo(ctx context.Context, version *string) error
- func (c *Client) GetClientMetadata() ClientMetadata
- func (c *Client) GetPendingTransactions(ctx context.Context) ([]json.RawMessage, error)
- func (c *Client) GetRPCClient() *rpc.Client
- func (c *Client) GetSender(tx *types.Transaction) (common.Address, error)
- func (c *Client) GetSigner() types.Signer
- func (c *Client) GetTxpoolContent(ctx context.Context) (json.RawMessage, error)
- func (c *Client) GetWebSocketClient() *rpc.Client
- func (c *Client) InitSigner(ctx context.Context)
- func (c *Client) Start(ctx context.Context) error
- func (c *Client) Stop(ctx context.Context) error
- func (c *Client) SubscribeToNewHeads(ctx context.Context) (headerChan <-chan *types.Header, errChan <-chan error, err error)
- func (c *Client) SubscribeToNewPendingTxs(ctx context.Context) (<-chan string, <-chan error, error)
- type ClientMetadata
- type ClientProvider
- type Config
- type DebugStateSizeResponse
- type EventCallback
- type MempoolWatcher
- type Metrics
- func (m *Metrics) AddMempoolTxExpired(count int)
- func (m *Metrics) AddMempoolTxProcessed(count int)
- func (m *Metrics) AddMempoolTxReceived(count int)
- func (m *Metrics) AddQueueRejections(count int)
- func (m *Metrics) AddQueueThroughput(count int)
- func (m *Metrics) AddTxBySource(source string, count int)
- func (m *Metrics) AddTxProcessingOutcome(outcome string, count int)
- func (m *Metrics) ObserveBatchProcessingDuration(duration float64)
- func (m *Metrics) ObserveRPCRequestDuration(method string, duration float64)
- func (m *Metrics) SetCircuitBreakerState(breakerName, state string)
- func (m *Metrics) SetMempoolTxPending(count int)
- func (m *Metrics) SetProcessedCacheSize(size int)
- func (m *Metrics) SetQueueSize(size int)
- func (m *Metrics) SetWebsocketConnected(connected bool)
- type PendingTxRecord
- type StateSizeConfig
- type StateSizeWatcher
- type SubscriptionType
- type TransactionCallback
Constants ¶
const (
	// SubNewPendingTransactions subscribes to new pending transactions.
	SubNewPendingTransactions SubscriptionType = "newPendingTransactions"
	// RPCMethodTxpoolContent is the RPC method for getting the content of the transaction pool.
	RPCMethodTxpoolContent = "txpool_content"
	// RPCMethodPendingTransactions is the RPC method for getting pending transactions.
	RPCMethodPendingTransactions = "eth_pendingTransactions"
	// RPCMethodGetTransactionByHash is the RPC method for getting a transaction by its hash.
	RPCMethodGetTransactionByHash = "eth_getTransactionByHash"
	// RPCMethodDebugStateSize is the RPC method for getting the execution layer state size.
	RPCMethodDebugStateSize = "debug_stateSize"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents a unified execution client with both WebSocket and RPC capabilities.
func (*Client) BatchCallContext ¶
func (c *Client) BatchCallContext(ctx context.Context, method string, params []interface{}) ([]json.RawMessage, error)
BatchCallContext performs a batch JSON-RPC call for multiple transactions.
func (*Client) BatchGetTransactionsByHash ¶
func (c *Client) BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error)
BatchGetTransactionsByHash retrieves transactions by their hashes.
func (*Client) CallContext ¶
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
CallContext calls an RPC method with the given context.
func (*Client) DebugStateSize ¶ added in v1.6.0
func (c *Client) DebugStateSize(ctx context.Context, blockNumber string) (*DebugStateSizeResponse, error)
DebugStateSize retrieves the state size from the execution client. blockNumber can be "latest", "earliest", "pending", or a specific block number (e.g., "0x1234"). If empty, defaults to "latest".
func (*Client) GetClientInfo ¶
GetClientInfo gets the client version info.
func (*Client) GetClientMetadata ¶ added in v1.6.0
func (c *Client) GetClientMetadata() ClientMetadata
GetClientMetadata returns the cached execution client metadata. Returns empty strings if metadata hasn't been initialized yet.
func (*Client) GetPendingTransactions ¶
GetPendingTransactions retrieves pending transactions.
func (*Client) GetRPCClient ¶
GetRPCClient provides access to the RPC client directly.
func (*Client) GetTxpoolContent ¶
GetTxpoolContent retrieves the full transaction pool content.
func (*Client) GetWebSocketClient ¶
GetWebSocketClient provides access to the WebSocket client directly.
func (*Client) InitSigner ¶
InitSigner initialises the transaction signer. This is used to determine mempool tx senders.
func (*Client) SubscribeToNewHeads ¶ added in v1.6.0
func (c *Client) SubscribeToNewHeads(ctx context.Context) (headerChan <-chan *types.Header, errChan <-chan error, err error)
SubscribeToNewHeads subscribes to new block header notifications via WebSocket. Returns a channel for receiving block headers and a channel for errors. Requires WebSocket to be enabled in the client configuration.
type ClientMetadata ¶ added in v1.6.0
type ClientMetadata struct {
Implementation string
Version string
VersionMajor string
VersionMinor string
VersionPatch string
Initialized bool
}
ClientMetadata contains parsed execution client version information.
type ClientProvider ¶
type ClientProvider interface {
// GetTxpoolContent retrieves the full transaction pool content.
GetTxpoolContent(ctx context.Context) (json.RawMessage, error)
// GetPendingTransactions retrieves pending transactions.
GetPendingTransactions(ctx context.Context) ([]json.RawMessage, error)
// BatchGetTransactionsByHash retrieves transactions by their hashes.
BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error)
// SubscribeToNewPendingTxs subscribes to new pending transaction notifications.
SubscribeToNewPendingTxs(ctx context.Context) (<-chan string, <-chan error, error)
// CallContext performs a JSON-RPC call with the given arguments.
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
}
ClientProvider defines the interface for unified execution client (WS and RPC) operations.
type Config ¶
type Config struct {
// Enabled is whether the execution client is enabled.
Enabled bool `yaml:"enabled" default:"false"`
// EthPendingTxsEnabled is whether we supplement fetching txs using eth_pendingTransactions periodically.
EthPendingTxsEnabled bool `yaml:"ethPendingTxsEnabled" default:"false"`
// TxPoolContentEnabled is whether we supplement fetching txs using txpool_content periodically.
TxPoolContentEnabled bool `yaml:"txPoolContentEnabled" default:"true"`
// WebsocketEnabled is whether the websocket is enabled.
WebsocketEnabled bool `yaml:"websocketEnabled" default:"false"`
// WSAddress is the WebSocket address of the execution client for subscriptions.
WSAddress string `yaml:"wsAddress"`
// RPCAddress is the RPC address of the execution client for txpool_content calls.
RPCAddress string `yaml:"rpcAddress"`
// Headers is a map of headers to send to the execution client.
Headers map[string]string `yaml:"headers"`
// FetchInterval is how often to fetch txpool_content (in seconds).
FetchInterval int `yaml:"fetchInterval" default:"15"`
// PruneDuration is how long to keep pending transactions in memory before pruning (in seconds).
PruneDuration int `yaml:"pruneDuration" default:"300"`
// ProcessorWorkerCount is the number of worker goroutines for processing transactions.
ProcessorWorkerCount int `yaml:"processorWorkerCount" default:"50"`
// RpcBatchSize is the number of transactions to include in a single RPC batch call.
RpcBatchSize int `yaml:"rpcBatchSize" default:"40"`
// QueueSize is the size of the transaction processing queue.
QueueSize int `yaml:"queueSize" default:"5000"`
// ProcessingInterval is the interval at which to process batches of transactions (in milliseconds).
ProcessingInterval int `yaml:"processingInterval" default:"500"`
// MaxConcurrency is the maximum number of concurrent batch RPC requests.
MaxConcurrency int `yaml:"maxConcurrency" default:"5"`
// CircuitBreakerFailureThreshold is the number of consecutive failures before opening the circuit breaker.
CircuitBreakerFailureThreshold int `yaml:"circuitBreakerFailureThreshold" default:"5"`
// CircuitBreakerResetTimeout is the time to wait before transitioning from open to half-open (in seconds).
CircuitBreakerResetTimeout int `yaml:"circuitBreakerResetTimeout" default:"30"`
// StateSize is the configuration for state size monitoring.
StateSize *StateSizeConfig `yaml:"stateSize"`
}
Config defines configuration for connecting to an execution client.
type DebugStateSizeResponse ¶ added in v1.6.0
type DebugStateSizeResponse struct {
AccountBytes string `json:"accountBytes"`
AccountTrienodeBytes string `json:"accountTrienodeBytes"`
AccountTrienodes string `json:"accountTrienodes"`
Accounts string `json:"accounts"`
BlockNumber string `json:"blockNumber"`
ContractCodeBytes string `json:"contractCodeBytes"`
ContractCodes string `json:"contractCodes"`
StateRoot string `json:"stateRoot"`
StorageBytes string `json:"storageBytes"`
StorageTrienodeBytes string `json:"storageTrienodeBytes"`
StorageTrienodes string `json:"storageTrienodes"`
Storages string `json:"storages"`
}
DebugStateSizeResponse represents the response from debug_stateSize RPC call.
type EventCallback ¶
EventCallback is a generic callback function for subscription events.
type MempoolWatcher ¶
type MempoolWatcher struct {
// contains filtered or unexported fields
}
MempoolWatcher captures and processes pending transactions from the Ethereum execution client. It uses three complementary methods to ensure comprehensive transaction coverage:
1. WebSocket subscription to newPendingTransactions for real-time notification
2. Periodic polling of txpool_content for full transaction details
3. Periodic polling of eth_pendingTransactions as a fallback/supplementary source
Transactions flow through the system as follows:
- Detected via any of the three sources above
- Added to pendingTxs map (temporary storage until processed)
- Queued for processing via txQueue
- Processed by worker goroutines
- Marked as processed in txQueue.processed map to prevent reprocessing
- Removed from pendingTxs map
The pendingTxs map functions as a temporary workspace for in-flight processing, not as a permanent mirror of the mempool state.
func NewMempoolWatcher ¶
func NewMempoolWatcher(
	client ClientProvider,
	log logrus.FieldLogger,
	config *Config,
	processTxCallback func(context.Context, *PendingTxRecord, json.RawMessage) error,
	metrics *Metrics,
) *MempoolWatcher
NewMempoolWatcher creates a new MempoolWatcher with configured circuit breakers for resilient RPC operations.
func (*MempoolWatcher) Start ¶
func (w *MempoolWatcher) Start(ctx context.Context) error
Start initializes the watcher's context and launches all background goroutines for transaction discovery and processing.
func (*MempoolWatcher) Stop ¶
func (w *MempoolWatcher) Stop()
Stop cleanly shuts down the watcher, canceling the WebSocket subscription and waiting for all goroutines to complete.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func (*Metrics) AddMempoolTxExpired ¶
AddMempoolTxExpired increments the counter for expired mempool transactions.
func (*Metrics) AddMempoolTxProcessed ¶
AddMempoolTxProcessed increments the counter for processed mempool transactions.
func (*Metrics) AddMempoolTxReceived ¶
AddMempoolTxReceived increments the counter for received mempool transactions.
func (*Metrics) AddQueueRejections ¶
AddQueueRejections increments the counter for queue rejections.
func (*Metrics) AddQueueThroughput ¶
AddQueueThroughput increments the counter for queue throughput.
func (*Metrics) AddTxBySource ¶
AddTxBySource increments the counter for transactions by source.
func (*Metrics) AddTxProcessingOutcome ¶
AddTxProcessingOutcome increments the counter for transaction processing outcomes.
func (*Metrics) ObserveBatchProcessingDuration ¶
ObserveBatchProcessingDuration observes the duration of batch processing.
func (*Metrics) ObserveRPCRequestDuration ¶
ObserveRPCRequestDuration observes the duration of RPC requests.
func (*Metrics) SetCircuitBreakerState ¶
SetCircuitBreakerState updates the circuit breaker state gauge.
func (*Metrics) SetMempoolTxPending ¶
SetMempoolTxPending sets the gauge for pending mempool transactions.
func (*Metrics) SetProcessedCacheSize ¶
SetProcessedCacheSize sets the gauge for processed cache size.
func (*Metrics) SetQueueSize ¶
SetQueueSize sets the queue size gauge.
func (*Metrics) SetWebsocketConnected ¶
SetWebsocketConnected sets the gauge for websocket connection.
type PendingTxRecord ¶
type PendingTxRecord struct {
Hash string
FirstSeen time.Time
Attempts int
TxData json.RawMessage // Raw tx data (when available).
Source string // Eg: "websocket", "txpool_content", or "eth_pendingTransactions".
MarkedForPruning bool
}
PendingTxRecord represents a transaction hash and when it was first seen.
type StateSizeConfig ¶ added in v1.6.0
type StateSizeConfig struct {
// Enabled is whether state size monitoring is enabled.
Enabled bool `yaml:"enabled" default:"false"`
// TriggerMode determines how state size polling is triggered.
// Valid values: "head" (on consensus head events), "block" (on execution block events), "interval" (periodic polling).
TriggerMode string `yaml:"triggerMode" default:"head"`
// IntervalSeconds is the polling interval in seconds (used when TriggerMode is "interval").
IntervalSeconds int `yaml:"intervalSeconds" default:"12"`
}
StateSizeConfig defines configuration for state size monitoring.
type StateSizeWatcher ¶ added in v1.6.0
type StateSizeWatcher struct {
// contains filtered or unexported fields
}
StateSizeWatcher polls the execution client's debug_stateSize endpoint to collect state size metrics. It supports three trigger modes:
- "head": Triggered by consensus layer head events
- "block": Triggered by execution layer block events (requires WebSocket)
- "interval": Periodic polling at a configured interval
func NewStateSizeWatcher ¶ added in v1.6.0
func NewStateSizeWatcher(
	client *Client,
	log logrus.FieldLogger,
	config *StateSizeConfig,
	callback func(context.Context, *DebugStateSizeResponse) error,
) *StateSizeWatcher
NewStateSizeWatcher creates a new StateSizeWatcher instance.
func (*StateSizeWatcher) OnHeadEvent ¶ added in v1.6.0
func (w *StateSizeWatcher) OnHeadEvent(ctx context.Context) error
OnHeadEvent should be called when a new head event is received from the consensus layer. This is only used when TriggerMode is "head".
func (*StateSizeWatcher) Start ¶ added in v1.6.0
func (w *StateSizeWatcher) Start(parentCtx context.Context) error
Start initializes the watcher's context and launches background goroutines based on the configured trigger mode.
func (*StateSizeWatcher) Stop ¶ added in v1.6.0
func (w *StateSizeWatcher) Stop() error
Stop gracefully shuts down the watcher and waits for all goroutines to complete.
type SubscriptionType ¶
type SubscriptionType string
SubscriptionType represents a type of subscription to the execution client.