execution

package
v1.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2025 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SubNewPendingTransactions subscribes to new pending transactions.
	SubNewPendingTransactions SubscriptionType = "newPendingTransactions"
	// RPCMethodTxpoolContent is the RPC method for getting the content of the transaction pool.
	RPCMethodTxpoolContent = "txpool_content"
	// RPCMethodPendingTransactions is the RPC method for getting pending transactions.
	RPCMethodPendingTransactions = "eth_pendingTransactions"
	// RPCMethodGetTransactionByHash is the RPC method for getting a transaction by its hash.
	RPCMethodGetTransactionByHash = "eth_getTransactionByHash"
	// RPCMethodDebugStateSize is the RPC method for getting the execution layer state size.
	RPCMethodDebugStateSize = "debug_stateSize"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

ExecutionClient represents a unified execution client with both WebSocket and RPC capabilities.

func NewClient

func NewClient(ctx context.Context, log logrus.FieldLogger, config *Config) (*Client, error)

NewClient creates a new unified execution client.

func (*Client) BatchCallContext

func (c *Client) BatchCallContext(ctx context.Context, method string, params []interface{}) ([]json.RawMessage, error)

BatchCallContext performs a batch JSON-RPC call for multiple transactions.

func (*Client) BatchGetTransactionsByHash

func (c *Client) BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error)

BatchGetTransactionsByHash retrieves transactions by their hashes.

func (*Client) CallContext

func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error

CallContext calls an RPC method with the given context.

func (*Client) DebugStateSize added in v1.6.0

func (c *Client) DebugStateSize(ctx context.Context, blockNumber string) (*DebugStateSizeResponse, error)

DebugStateSize retrieves the state size from the execution client. blockNumber can be "latest", "earliest", "pending", or a specific block number (e.g., "0x1234"). If empty, defaults to "latest".

func (*Client) GetClientInfo

func (c *Client) GetClientInfo(ctx context.Context, version *string) error

GetClientInfo gets the client version info.

func (*Client) GetClientMetadata added in v1.6.0

func (c *Client) GetClientMetadata() ClientMetadata

GetClientMetadata returns the cached execution client metadata. Returns empty strings if metadata hasn't been initialized yet.

func (*Client) GetPendingTransactions

func (c *Client) GetPendingTransactions(ctx context.Context) ([]json.RawMessage, error)

GetPendingTransactions retrieves pending transactions.

func (*Client) GetRPCClient

func (c *Client) GetRPCClient() *rpc.Client

GetRPCClient provides access to the RPC client directly.

func (*Client) GetSender

func (c *Client) GetSender(tx *types.Transaction) (common.Address, error)

GetSender retrieves the sender of a transaction.

func (*Client) GetSigner

func (c *Client) GetSigner() types.Signer

GetSigner returns the signer.

func (*Client) GetTxpoolContent

func (c *Client) GetTxpoolContent(ctx context.Context) (json.RawMessage, error)

GetTxpoolContent retrieves the full transaction pool content.

func (*Client) GetWebSocketClient

func (c *Client) GetWebSocketClient() *rpc.Client

GetWebSocketClient provides access to the WebSocket client directly.

func (*Client) InitSigner

func (c *Client) InitSigner(ctx context.Context)

InitSigner initialises the transaction signer. This is used to determine mempool tx senders.

func (*Client) Start

func (c *Client) Start(ctx context.Context) error

Start starts the execution client.

func (*Client) Stop

func (c *Client) Stop(ctx context.Context) error

Stop stops the execution client.

func (*Client) SubscribeToNewHeads added in v1.6.0

func (c *Client) SubscribeToNewHeads(ctx context.Context) (headerChan <-chan *types.Header, errChan <-chan error, err error)

SubscribeToNewHeads subscribes to new block header notifications via WebSocket. Returns a channel for receiving block headers and a channel for errors. Requires WebSocket to be enabled in the client configuration.

func (*Client) SubscribeToNewPendingTxs

func (c *Client) SubscribeToNewPendingTxs(ctx context.Context) (<-chan string, <-chan error, error)

SubscribeToNewPendingTxs subscribes to new pending transaction notifications

type ClientMetadata added in v1.6.0

type ClientMetadata struct {
	Implementation string
	Version        string
	VersionMajor   string
	VersionMinor   string
	VersionPatch   string
	Initialized    bool
}

ClientMetadata contains parsed execution client version information.

type ClientProvider

type ClientProvider interface {
	// GetTxpoolContent retrieves the full transaction pool content.
	GetTxpoolContent(ctx context.Context) (json.RawMessage, error)

	// GetPendingTransactions retrieves pending transactions.
	GetPendingTransactions(ctx context.Context) ([]json.RawMessage, error)

	// BatchGetTransactionsByHash retrieves transactions by their hashes.
	BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error)

	// SubscribeToNewPendingTxs subscribes to new pending transaction notifications.
	SubscribeToNewPendingTxs(ctx context.Context) (<-chan string, <-chan error, error)

	// CallContext performs a JSON-RPC call with the given arguments.
	CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
}

ClientProvider defines the interface for unified execution client (WS and RPC) operations.

type Config

type Config struct {
	// Enabled is whether the execution client is enabled.
	Enabled bool `yaml:"enabled" default:"false"`
	// EthPendingTxsEnabled is whether we suppliment fetching tx's using eth_pendingTransactions periodically.
	EthPendingTxsEnabled bool `yaml:"ethPendingTxsEnabled" default:"false"`
	// TxPoolContentEnabled is whether we suppliment fetching tx's using txpool_content periodically.
	TxPoolContentEnabled bool `yaml:"txPoolContentEnabled" default:"true"`
	// WebsocketEnabled is whether the websocket is enabled.
	WebsocketEnabled bool `yaml:"websocketEnabled" default:"false"`
	// WSAddress is the WebSocket address of the execution client for subscriptions.
	WSAddress string `yaml:"wsAddress"`
	// RPCAddress is the RPC address of the execution client for txpool_content calls.
	RPCAddress string `yaml:"rpcAddress"`
	// Headers is a map of headers to send to the execution client.
	Headers map[string]string `yaml:"headers"`
	// FetchInterval is how often to fetch txpool_content (in seconds).
	FetchInterval int `yaml:"fetchInterval" default:"15"`
	// PruneDuration is how long to keep pending transactions in memory before pruning (in seconds).
	PruneDuration int `yaml:"pruneDuration" default:"300"`
	// ProcessorWorkerCount is the number of worker goroutines for processing transactions.
	ProcessorWorkerCount int `yaml:"processorWorkerCount" default:"50"`
	// RpcBatchSize is the number of transactions to include in a single RPC batch call.
	RpcBatchSize int `yaml:"rpcBatchSize" default:"40"`
	// QueueSize is the size of the transaction processing queue.
	QueueSize int `yaml:"queueSize" default:"5000"`
	// ProcessingInterval is the interval at which to process batches of transactions (in milliseconds).
	ProcessingInterval int `yaml:"processingInterval" default:"500"`
	// MaxConcurrency is the maximum number of concurrent batch RPC requests.
	MaxConcurrency int `yaml:"maxConcurrency" default:"5"`
	// CircuitBreakerFailureThreshold is the number of consecutive failures before opening the circuit breaker.
	CircuitBreakerFailureThreshold int `yaml:"circuitBreakerFailureThreshold" default:"5"`
	// CircuitBreakerResetTimeout is the time to wait before transitioning from open to half-open (in seconds).
	CircuitBreakerResetTimeout int `yaml:"circuitBreakerResetTimeout" default:"30"`
	// StateSize is the configuration for state size monitoring.
	StateSize *StateSizeConfig `yaml:"stateSize"`
}

Config defines configuration for connecting to an execution client.

type DebugStateSizeResponse added in v1.6.0

type DebugStateSizeResponse struct {
	AccountBytes         string `json:"accountBytes"`
	AccountTrienodeBytes string `json:"accountTrienodeBytes"`
	AccountTrienodes     string `json:"accountTrienodes"`
	Accounts             string `json:"accounts"`
	BlockNumber          string `json:"blockNumber"`
	ContractCodeBytes    string `json:"contractCodeBytes"`
	ContractCodes        string `json:"contractCodes"`
	StateRoot            string `json:"stateRoot"`
	StorageBytes         string `json:"storageBytes"`
	StorageTrienodeBytes string `json:"storageTrienodeBytes"`
	StorageTrienodes     string `json:"storageTrienodes"`
	Storages             string `json:"storages"`
}

DebugStateSizeResponse represents the response from debug_stateSize RPC call.

type EventCallback

type EventCallback func(ctx context.Context, event interface{}) error

EventCallback is a generic callback function for subscription events.

type MempoolWatcher

type MempoolWatcher struct {
	// contains filtered or unexported fields
}

MempoolWatcher captures and processes pending transactions from the Ethereum execution client. It uses three complementary methods to ensure comprehensive transaction coverage: 1. Websocket subscription to newPendingTransactions for real-time notification 2. Periodic polling of txpool_content for full transaction details 3. Periodic polling of eth_pendingTransactions as a fallback/supplementary source

Transactions flow through the system as follows: - Detected via any of the three sources above - Added to pendingTxs map (temporary storage until processed) - Queued for processing via txQueue - Processed by worker goroutines - Marked as processed in txQueue.processed map to prevent reprocessing - Removed from pendingTxs map

The pendingTxs map functions as a temporary workspace for in-flight processing, not as a permanent mirror of the mempool state.

func NewMempoolWatcher

func NewMempoolWatcher(
	client ClientProvider,
	log logrus.FieldLogger,
	config *Config,
	processTxCallback func(context.Context, *PendingTxRecord, json.RawMessage) error,
	metrics *Metrics,
) *MempoolWatcher

NewMempoolWatcher creates a new MempoolWatcher with configured circuit breakers for resilient RPC operations

func (*MempoolWatcher) Start

func (w *MempoolWatcher) Start(ctx context.Context) error

Start initializes the watcher's context and launches all background goroutines for transaction discovery and processing.

func (*MempoolWatcher) Stop

func (w *MempoolWatcher) Stop()

Stop cleanly shuts down the watcher, canceling the WebSocket subscription and waiting for all goroutines to complete.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(namespace, networkID string) *Metrics

func (*Metrics) AddMempoolTxExpired

func (m *Metrics) AddMempoolTxExpired(count int)

AddMempoolTxExpired increments the counter for expired mempool transactions.

func (*Metrics) AddMempoolTxProcessed

func (m *Metrics) AddMempoolTxProcessed(count int)

AddMempoolTxProcessed increments the counter for processed mempool transactions.

func (*Metrics) AddMempoolTxReceived

func (m *Metrics) AddMempoolTxReceived(count int)

AddMempoolTxReceived increments the counter for received mempool transactions.

func (*Metrics) AddQueueRejections

func (m *Metrics) AddQueueRejections(count int)

AddQueueRejections increments the counter for queue rejections.

func (*Metrics) AddQueueThroughput

func (m *Metrics) AddQueueThroughput(count int)

AddQueueThroughput increments the counter for queue throughput.

func (*Metrics) AddTxBySource

func (m *Metrics) AddTxBySource(source string, count int)

AddTxBySource increments the counter for transactions by source.

func (*Metrics) AddTxProcessingOutcome

func (m *Metrics) AddTxProcessingOutcome(outcome string, count int)

AddTxProcessingOutcome increments the counter for transaction processing outcomes.

func (*Metrics) ObserveBatchProcessingDuration

func (m *Metrics) ObserveBatchProcessingDuration(duration float64)

ObserveBatchProcessingDuration observes the duration of batch processing.

func (*Metrics) ObserveRPCRequestDuration

func (m *Metrics) ObserveRPCRequestDuration(method string, duration float64)

ObserveRPCRequestDuration observes the duration of RPC requests.

func (*Metrics) SetCircuitBreakerState

func (m *Metrics) SetCircuitBreakerState(breakerName, state string)

SetCircuitBreakerState updates the circuit breaker state gauge

func (*Metrics) SetMempoolTxPending

func (m *Metrics) SetMempoolTxPending(count int)

SetMempoolTxPending sets the gauge for pending mempool transactions.

func (*Metrics) SetProcessedCacheSize

func (m *Metrics) SetProcessedCacheSize(size int)

SetProcessedCacheSize sets the gauge for processed cache size.

func (*Metrics) SetQueueSize

func (m *Metrics) SetQueueSize(size int)

SetQueueSize sets the queue size gauge.

func (*Metrics) SetWebsocketConnected

func (m *Metrics) SetWebsocketConnected(connected bool)

SetWebsocketConnected sets the gauge for websocket connection.

type PendingTxRecord

type PendingTxRecord struct {
	Hash             string
	FirstSeen        time.Time
	Attempts         int
	TxData           json.RawMessage // Raw tx data (when available).
	Source           string          // Eg: "websocket", "txpool_content", or "eth_pendingTransactions".
	MarkedForPruning bool
}

PendingTxRecord represents a transaction hash and when it was first seen.

type StateSizeConfig added in v1.6.0

type StateSizeConfig struct {
	// Enabled is whether state size monitoring is enabled.
	Enabled bool `yaml:"enabled" default:"false"`
	// TriggerMode determines how state size polling is triggered.
	// Valid values: "head" (on consensus head events), "block" (on execution block events), "interval" (periodic polling).
	TriggerMode string `yaml:"triggerMode" default:"head"`
	// IntervalSeconds is the polling interval in seconds (used when TriggerMode is "interval").
	IntervalSeconds int `yaml:"intervalSeconds" default:"12"`
}

StateSizeConfig defines configuration for state size monitoring.

type StateSizeWatcher added in v1.6.0

type StateSizeWatcher struct {
	// contains filtered or unexported fields
}

StateSizeWatcher polls the execution client's debug_stateSize endpoint to collect state size metrics. It supports three trigger modes: - "head": Triggered by consensus layer head events - "block": Triggered by execution layer block events (requires WebSocket) - "interval": Periodic polling at a configured interval

func NewStateSizeWatcher added in v1.6.0

func NewStateSizeWatcher(
	client *Client,
	log logrus.FieldLogger,
	config *StateSizeConfig,
	callback func(context.Context, *DebugStateSizeResponse) error,
) *StateSizeWatcher

NewStateSizeWatcher creates a new StateSizeWatcher instance.

func (*StateSizeWatcher) OnHeadEvent added in v1.6.0

func (w *StateSizeWatcher) OnHeadEvent(ctx context.Context) error

OnHeadEvent should be called when a new head event is received from the consensus layer. This is only used when TriggerMode is "head".

func (*StateSizeWatcher) Start added in v1.6.0

func (w *StateSizeWatcher) Start(parentCtx context.Context) error

Start initializes the watcher's context and launches background goroutines based on the configured trigger mode.

func (*StateSizeWatcher) Stop added in v1.6.0

func (w *StateSizeWatcher) Stop() error

Stop gracefully shuts down the watcher and waits for all goroutines to complete.

type SubscriptionType

type SubscriptionType string

SubscriptionType represents a type of subscription to the execution client.

type TransactionCallback

type TransactionCallback func(ctx context.Context, tx string) error

TransactionCallback is a callback function for when a transaction is received.

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL