Documentation
¶
Index ¶
- Constants
- func ExtractExecutionClientMetadata(event *TraceEvent) (*xatu.ClientMeta_Ethereum_Execution, error)
- func ExtractExecutionClientMetadataFromGetBlobs(event *TraceEvent) (*xatu.ClientMeta_Ethereum_Execution, error)
- func GetGossipTopics(event *TraceEvent) []string
- func GetMsgID(event *TraceEvent) string
- func GetShard(shardingKey string, totalShards uint64) uint64
- func IsShardActive(shard uint64, activeShards []uint64) bool
- func SipHash(key [16]byte, data []byte) uint64
- func TraceEventToAddPeer(event *TraceEvent) (*libp2p.AddPeer, error)
- func TraceEventToConnected(event *TraceEvent) (*libp2p.Connected, error)
- func TraceEventToConsensusEngineAPIGetBlobs(event *TraceEvent) (*xatu.ConsensusEngineAPIGetBlobs, error)
- func TraceEventToConsensusEngineAPINewPayload(event *TraceEvent) (*xatu.ConsensusEngineAPINewPayload, error)
- func TraceEventToCustodyProbe(event *TraceEvent) (*libp2p.DataColumnCustodyProbe, error)
- func TraceEventToDeliverMessage(event *TraceEvent) (*libp2p.DeliverMessage, error)
- func TraceEventToDisconnected(event *TraceEvent) (*libp2p.Disconnected, error)
- func TraceEventToDropRPC(event *TraceEvent) (*libp2p.DropRPC, error)
- func TraceEventToDuplicateMessage(event *TraceEvent) (*libp2p.DuplicateMessage, error)
- func TraceEventToGraft(event *TraceEvent) (*libp2p.Graft, error)
- func TraceEventToHandleMetadata(event *TraceEvent) (*libp2p.HandleMetadata, error)
- func TraceEventToHandleStatus(event *TraceEvent) (*libp2p.HandleStatus, error)
- func TraceEventToJoin(event *TraceEvent) (*libp2p.Join, error)
- func TraceEventToLeave(event *TraceEvent) (*libp2p.Leave, error)
- func TraceEventToPrune(event *TraceEvent) (*libp2p.Prune, error)
- func TraceEventToPublishMessage(event *TraceEvent) (*libp2p.PublishMessage, error)
- func TraceEventToRecvRPC(event *TraceEvent) (*libp2p.RecvRPC, error)
- func TraceEventToRejectMessage(event *TraceEvent) (*libp2p.RejectMessage, error)
- func TraceEventToRemovePeer(event *TraceEvent) (*libp2p.RemovePeer, error)
- func TraceEventToSendRPC(event *TraceEvent) (*libp2p.SendRPC, error)
- func TraceEventToSyntheticHeartbeat(event *TraceEvent) (*libp2p.SyntheticHeartbeat, error)
- type CompiledPattern
- type Config
- type DutiesProvider
- type EventCategorizer
- func (ec *EventCategorizer) GetAllEventsByGroup() map[ShardingGroup][]xatu.Event_Name
- func (ec *EventCategorizer) GetEventInfo(eventType xatu.Event_Name) (*EventInfo, bool)
- func (ec *EventCategorizer) GetGroupAEvents() []xatu.Event_Name
- func (ec *EventCategorizer) GetGroupBEvents() []xatu.Event_Name
- func (ec *EventCategorizer) GetGroupCEvents() []xatu.Event_Name
- func (ec *EventCategorizer) GetGroupDEvents() []xatu.Event_Name
- func (ec *EventCategorizer) GetShardingGroup(eventType xatu.Event_Name) ShardingGroup
- func (ec *EventCategorizer) IsMetaEvent(eventType xatu.Event_Name) bool
- type EventConfig
- type EventInfo
- type FilteredMessageWithIndex
- type MetaProvider
- type MetadataProvider
- type Metrics
- func (m *Metrics) AddDecoratedEvent(count float64, eventType, network string)
- func (m *Metrics) AddEvent(eventType, network string)
- func (m *Metrics) AddProcessedMessage(eventType, network string)
- func (m *Metrics) AddShardingDecision(eventType, reason, network string)
- func (m *Metrics) AddSkippedMessage(eventType, network string)
- type MetricsCollector
- type NoShardingKeyConfig
- type OutputHandler
- type Override
- type Processor
- func (p *Processor) HandleHermesEvent(ctx context.Context, event *TraceEvent) error
- func (p *Processor) ShouldTraceMessage(event *TraceEvent, clientMeta *xatu.ClientMeta, xatuEventType string) bool
- func (p *Processor) ShouldTraceRPCMetaMessages(clientMeta *xatu.ClientMeta, xatuEventType string, messages interface{}) ([]FilteredMessageWithIndex, error)
- type RPCMetaMessageInfo
- type RPCMetaTopicInfo
- type RpcControlGraft
- type RpcControlIHave
- type RpcControlIWant
- type RpcControlIdontWant
- type RpcControlPrune
- type RpcMeta
- type RpcMetaControl
- type RpcMetaMsg
- type RpcMetaSub
- type ShardableEvent
- type ShardingConfig
- type ShardingGroup
- type TopicShardingConfig
- type TraceEvent
- type TraceEventAltairBlock
- type TraceEventAttestation
- type TraceEventAttestationElectra
- type TraceEventAttesterSlashing
- type TraceEventBLSToExecutionChange
- type TraceEventBellatrixBlock
- type TraceEventBlobSidecar
- type TraceEventCapellaBlock
- type TraceEventConsensusEngineAPIGetBlobs
- type TraceEventConsensusEngineAPINewPayload
- type TraceEventCustodyProbe
- type TraceEventDataColumnSidecar
- type TraceEventDenebBlock
- type TraceEventElectraBlock
- type TraceEventFuluBlock
- type TraceEventPayloadMetaData
- type TraceEventPhase0Block
- type TraceEventProposerSlashing
- type TraceEventSignedAggregateAttestationAndProof
- type TraceEventSignedAggregateAttestationAndProofElectra
- type TraceEventSignedContributionAndProof
- type TraceEventSingleAttestation
- type TraceEventSyncCommitteeMessage
- type TraceEventVoluntaryExit
- type UnifiedSharder
Constants ¶
const ( // libp2p pubsub events. TraceEvent_HANDLE_MESSAGE = "HANDLE_MESSAGE" // libp2p core networking events. TraceEvent_CONNECTED = "CONNECTED" TraceEvent_DISCONNECTED = "DISCONNECTED" // RPC events. TraceEvent_HANDLE_METADATA = "HANDLE_METADATA" TraceEvent_HANDLE_STATUS = "HANDLE_STATUS" // Events that are not part of a normal Ethereum node. TraceEvent_SYNTHETIC_HEARTBEAT = "SYNTHETIC_HEARTBEAT" TraceEvent_CUSTODY_PROBE = "CUSTODY_PROBE" TraceEvent_CONSENSUS_ENGINE_API_NEWPAYLOAD = "CONSENSUS_ENGINE_API_NEWPAYLOAD" TraceEvent_CONSENSUS_ENGINE_API_GETBLOBS = "CONSENSUS_ENGINE_API_GETBLOBS" )
Define events not supplied by libp2p proto pkgs.
const (
// DefaultTotalShards is the default number of shards if not specified
DefaultTotalShards = 512
)
Variables ¶
This section is empty.
Functions ¶
func ExtractExecutionClientMetadata ¶ added in v1.6.8
func ExtractExecutionClientMetadata(event *TraceEvent) (*xatu.ClientMeta_Ethereum_Execution, error)
ExtractExecutionClientMetadata extracts and parses execution client metadata from a TraceEvent. The raw ExecutionClientVersion string is parsed into its components. Returns a ClientMeta_Ethereum_Execution proto that can be assigned directly to ClientMeta.Ethereum.Execution.
func ExtractExecutionClientMetadataFromGetBlobs ¶ added in v1.6.9
func ExtractExecutionClientMetadataFromGetBlobs(event *TraceEvent) (*xatu.ClientMeta_Ethereum_Execution, error)
ExtractExecutionClientMetadataFromGetBlobs extracts and parses execution client metadata from a GetBlobs TraceEvent. The raw ExecutionClientVersion string is parsed into its components. Returns a ClientMeta_Ethereum_Execution proto that can be assigned directly to ClientMeta.Ethereum.Execution.
func GetGossipTopics ¶ added in v1.1.19
func GetGossipTopics(event *TraceEvent) []string
GetGossipTopics extracts all gossip topics from a trace event if available. Returns a slice of unique topics found in the event.
func GetMsgID ¶ added in v1.1.19
func GetMsgID(event *TraceEvent) string
GetMsgID extracts the message ID from the event for sharding. We only shard based on message IDs, not peer IDs.
func GetShard ¶ added in v1.1.1
GetShard calculates which shard a message belongs to based on its ID.
This function uses SipHash to consistently map message IDs (typically hashes themselves) to specific shards, ensuring even distribution across the available shards.
Key benefits: - Deterministic: The same message ID always maps to the same shard - Balanced: Messages are evenly distributed across all shards
Parameters:
- shardingKey: The identifier to use for sharding (often a hash like "0x1234...abcd")
- totalShards: The total number of available shards (e.g., 64)
Returns:
- The shard number (0 to totalShards-1) where this message should be processed
func IsShardActive ¶ added in v1.1.1
IsShardActive checks if a shard is in the active shards list.
func SipHash ¶ added in v1.1.1
SipHash implements the SipHash-2-4 algorithm, a fast and efficient hash function designed for message authentication and hash-table lookups.
Key features of SipHash: - Deterministic output for identical inputs - Even distribution of outputs across the range of uint64
When used for sharding (via GetShard), SipHash provides: - Consistent distribution of messages across shards - Deterministic routing where the same message always maps to the same shard
Parameters:
- key: A 16-byte secret key (can be fixed for consistent sharding)
- data: The message bytes to hash
Returns:
- A 64-bit unsigned integer hash value
References:
- Reference implementation: https://github.com/veorq/SipHash
- ClickHouse documentation: https://clickhouse.com/docs/sql-reference/functions/hash-functions#siphash64
func TraceEventToAddPeer ¶ added in v1.6.4
func TraceEventToAddPeer(event *TraceEvent) (*libp2p.AddPeer, error)
Helper function to convert a Hermes TraceEvent to a libp2p AddPeer
func TraceEventToConnected ¶ added in v1.6.4
func TraceEventToConnected(event *TraceEvent) (*libp2p.Connected, error)
func TraceEventToConsensusEngineAPIGetBlobs ¶ added in v1.6.9
func TraceEventToConsensusEngineAPIGetBlobs(event *TraceEvent) (*xatu.ConsensusEngineAPIGetBlobs, error)
TraceEventToConsensusEngineAPIGetBlobs converts a TraceEvent to a ConsensusEngineAPIGetBlobs protobuf message. Supports typed *TraceEventConsensusEngineAPIGetBlobs payloads from direct callers.
func TraceEventToConsensusEngineAPINewPayload ¶ added in v1.6.7
func TraceEventToConsensusEngineAPINewPayload(event *TraceEvent) (*xatu.ConsensusEngineAPINewPayload, error)
TraceEventToConsensusEngineAPINewPayload converts a TraceEvent to a ConsensusEngineAPINewPayload protobuf message. Supports typed *TraceEventConsensusEngineAPINewPayload payloads from direct callers.
func TraceEventToCustodyProbe ¶ added in v1.6.4
func TraceEventToCustodyProbe(event *TraceEvent) (*libp2p.DataColumnCustodyProbe, error)
TraceEventToCustodyProbe converts a Hermes TraceEvent to a DataColumnCustodyProbe protobuf message. Supports both typed *TraceEventCustodyProbe payloads (from direct callers like tysm) and map[string]any payloads (for backwards compatibility with Hermes-style events).
func TraceEventToDeliverMessage ¶ added in v1.6.4
func TraceEventToDeliverMessage(event *TraceEvent) (*libp2p.DeliverMessage, error)
Helper function to convert a Hermes TraceEvent to a libp2p DeliverMessage.
func TraceEventToDisconnected ¶ added in v1.6.4
func TraceEventToDisconnected(event *TraceEvent) (*libp2p.Disconnected, error)
func TraceEventToDropRPC ¶ added in v1.6.4
func TraceEventToDropRPC(event *TraceEvent) (*libp2p.DropRPC, error)
Helper function to convert a Hermes TraceEvent to a libp2p DropRPC.
func TraceEventToDuplicateMessage ¶ added in v1.6.4
func TraceEventToDuplicateMessage(event *TraceEvent) (*libp2p.DuplicateMessage, error)
Helper function to convert a Hermes TraceEvent to a libp2p DuplicateMessage.
func TraceEventToGraft ¶ added in v1.6.4
func TraceEventToGraft(event *TraceEvent) (*libp2p.Graft, error)
Helper function to convert a Hermes TraceEvent to a libp2p Graft.
func TraceEventToHandleMetadata ¶ added in v1.6.4
func TraceEventToHandleMetadata(event *TraceEvent) (*libp2p.HandleMetadata, error)
func TraceEventToHandleStatus ¶ added in v1.6.4
func TraceEventToHandleStatus(event *TraceEvent) (*libp2p.HandleStatus, error)
func TraceEventToJoin ¶ added in v1.6.4
func TraceEventToJoin(event *TraceEvent) (*libp2p.Join, error)
Helper function to convert a Hermes TraceEvent to a libp2p Join
func TraceEventToLeave ¶ added in v1.6.4
func TraceEventToLeave(event *TraceEvent) (*libp2p.Leave, error)
Helper function to convert a Hermes TraceEvent to a libp2p Leave
func TraceEventToPrune ¶ added in v1.6.4
func TraceEventToPrune(event *TraceEvent) (*libp2p.Prune, error)
Helper function to convert a Hermes TraceEvent to a libp2p Prune.
func TraceEventToPublishMessage ¶ added in v1.6.4
func TraceEventToPublishMessage(event *TraceEvent) (*libp2p.PublishMessage, error)
Helper function to convert a Hermes TraceEvent to a libp2p PublishMessage.
func TraceEventToRecvRPC ¶ added in v1.6.4
func TraceEventToRecvRPC(event *TraceEvent) (*libp2p.RecvRPC, error)
Helper function to convert a Hermes TraceEvent to a libp2p RecvRPC.
func TraceEventToRejectMessage ¶ added in v1.6.4
func TraceEventToRejectMessage(event *TraceEvent) (*libp2p.RejectMessage, error)
Helper function to convert a Hermes TraceEvent to a libp2p RejectMessage.
func TraceEventToRemovePeer ¶ added in v1.6.4
func TraceEventToRemovePeer(event *TraceEvent) (*libp2p.RemovePeer, error)
Helper function to convert a Hermes TraceEvent to a libp2p RemovePeer
func TraceEventToSendRPC ¶ added in v1.6.4
func TraceEventToSendRPC(event *TraceEvent) (*libp2p.SendRPC, error)
Helper function to convert a Hermes TraceEvent to a libp2p SendRPC.
func TraceEventToSyntheticHeartbeat ¶ added in v1.6.4
func TraceEventToSyntheticHeartbeat(event *TraceEvent) (*libp2p.SyntheticHeartbeat, error)
TraceEventToSyntheticHeartbeat converts a Hermes TraceEvent to a SyntheticHeartbeat protobuf message
Types ¶
type CompiledPattern ¶ added in v1.1.19
type CompiledPattern struct {
Pattern *regexp.Regexp
Config *TopicShardingConfig
// EventTypeConstraint specifies which event types this pattern applies to.
// Empty string means it applies to all events (backward compatibility).
// Can be an exact event name (e.g., "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION")
// or a wildcard pattern (e.g., "LIBP2P_TRACE_RPC_META_*")
EventTypeConstraint string
}
CompiledPattern holds a compiled regex pattern and its config
type Config ¶
type Config struct {
LoggingLevel string `yaml:"logging" default:"info"`
MetricsAddr string `yaml:"metricsAddr" default:":9090"`
PProfAddr *string `yaml:"pprofAddr"`
ProbeAddr *string `yaml:"probeAddr"`
// The name of the mimicry
Name string `yaml:"name"`
// Ethereum configuration
Ethereum ethereum.Config `yaml:"ethereum"`
// Outputs configuration
Outputs []output.Config `yaml:"outputs"`
// Labels configures the mimicry with labels
Labels map[string]string `yaml:"labels"`
// NTP Server to use for clock drift correction
NTPServer string `yaml:"ntpServer" default:"time.google.com"`
// Events is the configuration for the events
Events EventConfig `yaml:"events"`
// Sharding is the configuration for event sharding
Sharding ShardingConfig `yaml:"sharding"`
}
func (*Config) ApplyOverrides ¶ added in v1.0.15
func (c *Config) ApplyOverrides(o *Override, log logrus.FieldLogger) error
ApplyOverrides applies any overrides to the config.
func (*Config) CreateSinks ¶
type DutiesProvider ¶ added in v1.2.1
type DutiesProvider interface {
GetValidatorIndex(epoch phase0.Epoch, slot phase0.Slot, committeeIndex phase0.CommitteeIndex, position uint64) (phase0.ValidatorIndex, error)
}
DutiesProvider provides validator duty information
type EventCategorizer ¶ added in v1.1.19
type EventCategorizer struct {
// contains filtered or unexported fields
}
EventCategorizer manages event categorization for sharding decisions
func NewEventCategorizer ¶ added in v1.1.19
func NewEventCategorizer() *EventCategorizer
NewEventCategorizer creates and initializes an EventCategorizer with all known events
func (*EventCategorizer) GetAllEventsByGroup ¶ added in v1.1.19
func (ec *EventCategorizer) GetAllEventsByGroup() map[ShardingGroup][]xatu.Event_Name
GetAllEventsByGroup returns all events categorized by their sharding group
func (*EventCategorizer) GetEventInfo ¶ added in v1.1.19
func (ec *EventCategorizer) GetEventInfo(eventType xatu.Event_Name) (*EventInfo, bool)
GetEventInfo returns information about an event type
func (*EventCategorizer) GetGroupAEvents ¶ added in v1.1.19
func (ec *EventCategorizer) GetGroupAEvents() []xatu.Event_Name
GetGroupAEvents returns all events that have both Topic and MsgID
func (*EventCategorizer) GetGroupBEvents ¶ added in v1.1.19
func (ec *EventCategorizer) GetGroupBEvents() []xatu.Event_Name
GetGroupBEvents returns all events that have only Topic
func (*EventCategorizer) GetGroupCEvents ¶ added in v1.1.19
func (ec *EventCategorizer) GetGroupCEvents() []xatu.Event_Name
GetGroupCEvents returns all events that have only MsgID
func (*EventCategorizer) GetGroupDEvents ¶ added in v1.1.19
func (ec *EventCategorizer) GetGroupDEvents() []xatu.Event_Name
GetGroupDEvents returns all events that have no sharding keys
func (*EventCategorizer) GetShardingGroup ¶ added in v1.1.19
func (ec *EventCategorizer) GetShardingGroup(eventType xatu.Event_Name) ShardingGroup
GetShardingGroup returns the sharding group for an event type
func (*EventCategorizer) IsMetaEvent ¶ added in v1.1.19
func (ec *EventCategorizer) IsMetaEvent(eventType xatu.Event_Name) bool
IsMetaEvent returns whether an event is an RPC meta event
type EventConfig ¶ added in v0.0.169
type EventConfig struct {
RecvRPCEnabled bool `yaml:"recvRpcEnabled" default:"false"`
SendRPCEnabled bool `yaml:"sendRpcEnabled" default:"false"`
DropRPCEnabled bool `yaml:"dropRpcEnabled" default:"false"`
RpcMetaControlIHaveEnabled bool `yaml:"rpcMetaControlIHaveEnabled" default:"false"`
RpcMetaControlIWantEnabled bool `yaml:"rpcMetaControlIWantEnabled" default:"false"`
RpcMetaControlIDontWantEnabled bool `yaml:"rpcMetaControlIDontWantEnabled" default:"false"`
RpcMetaControlGraftEnabled bool `yaml:"rpcMetaControlGraftEnabled" default:"false"`
RpcMetaControlPruneEnabled bool `yaml:"rpcMetaControlPruneEnabled" default:"false"`
RpcMetaSubscriptionEnabled bool `yaml:"rpcMetaSubscriptionEnabled" default:"false"`
RpcMetaMessageEnabled bool `yaml:"rpcMetaMessageEnabled" default:"false"`
AddPeerEnabled bool `yaml:"addPeerEnabled" default:"true"`
RemovePeerEnabled bool `yaml:"removePeerEnabled" default:"true"`
ConnectedEnabled bool `yaml:"connectedEnabled" default:"true"`
DisconnectedEnabled bool `yaml:"disconnectedEnabled" default:"true"`
SyntheticHeartbeatEnabled bool `yaml:"syntheticHeartbeatEnabled" default:"true"`
JoinEnabled bool `yaml:"joinEnabled" default:"true"`
LeaveEnabled bool `yaml:"leaveEnabled" default:"false"`
GraftEnabled bool `yaml:"graftEnabled" default:"false"`
PruneEnabled bool `yaml:"pruneEnabled" default:"false"`
PublishMessageEnabled bool `yaml:"publishMessageEnabled" default:"false"`
RejectMessageEnabled bool `yaml:"rejectMessageEnabled" default:"false"`
DuplicateMessageEnabled bool `yaml:"duplicateMessageEnabled" default:"false"`
DeliverMessageEnabled bool `yaml:"deliverMessageEnabled" default:"false"`
HandleMetadataEnabled bool `yaml:"handleMetadataEnabled" default:"true"`
HandleStatusEnabled bool `yaml:"handleStatusEnabled" default:"true"`
CustodyProbeEnabled bool `yaml:"custodyProbeEnabled" default:"true"`
GossipSubBeaconBlockEnabled bool `yaml:"gossipSubBeaconBlockEnabled" default:"true"`
GossipSubAttestationEnabled bool `yaml:"gossipSubAttestationEnabled" default:"true"`
GossipSubAggregateAndProofEnabled bool `yaml:"gossipSubAggregateAndProofEnabled" default:"true"`
GossipSubBlobSidecarEnabled bool `yaml:"gossipSubBlobSidecarEnabled" default:"true"`
GossipSubDataColumnSidecarEnabled bool `yaml:"gossipSubDataColumnSidecarEnabled" default:"true"`
EngineAPINewPayloadEnabled bool `yaml:"engineApiNewPayloadEnabled" default:"false"`
EngineAPIGetBlobsEnabled bool `yaml:"engineApiGetBlobsEnabled" default:"false"`
}
EventConfig represents configuration for all event types.
func (*EventConfig) Validate ¶ added in v0.0.169
func (e *EventConfig) Validate() error
Validate validates the event config.
type EventInfo ¶ added in v1.1.19
type EventInfo struct {
Type xatu.Event_Name
ShardingGroup ShardingGroup
HasTopic bool
HasMsgID bool
IsMeta bool // True for RPC meta events
}
EventInfo contains metadata about an event type
type FilteredMessageWithIndex ¶ added in v1.1.9
type FilteredMessageWithIndex struct {
MessageID *wrapperspb.StringValue
OriginalIndex uint32
}
FilteredMessageWithIndex represents a filtered message with its original index
type MetaProvider ¶ added in v1.2.2
type MetaProvider interface {
GetClientMeta(ctx context.Context) (*xatu.ClientMeta, error)
}
MetaProvider provides client metadata
type MetadataProvider ¶ added in v1.2.1
type MetadataProvider interface {
Wallclock() *ethwallclock.EthereumBeaconChain
ClockDrift() *time.Duration
Network() *xatu.ClientMeta_Ethereum_Network
}
MetadataProvider provides ethereum network metadata
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics provides simplified metrics for the sharding system
func NewMetrics ¶
NewMetrics creates a new metrics instance with simplified metrics
func (*Metrics) AddDecoratedEvent ¶
AddDecoratedEvent tracks decorated events (before sharding)
func (*Metrics) AddProcessedMessage ¶ added in v1.1.1
AddProcessedMessage records that an event was processed
func (*Metrics) AddShardingDecision ¶ added in v1.1.19
AddShardingDecision records the reason for a sharding decision
func (*Metrics) AddSkippedMessage ¶ added in v1.1.1
AddSkippedMessage records that an event was filtered out
type MetricsCollector ¶ added in v1.2.1
type MetricsCollector interface {
AddEvent(eventType, network string)
AddProcessedMessage(eventType, network string)
AddSkippedMessage(eventType, network string)
AddShardingDecision(eventType, reason, network string)
AddDecoratedEvent(count float64, eventType, network string)
}
MetricsCollector interface for event processing metrics
type NoShardingKeyConfig ¶ added in v1.1.19
type NoShardingKeyConfig struct {
// Whether to record events without sharding keys (default: true)
Enabled bool `yaml:"enabled" default:"true"`
}
NoShardingKeyConfig defines behavior for events without sharding keys
type OutputHandler ¶ added in v1.2.1
type OutputHandler interface {
HandleDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error
HandleDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error
}
OutputHandler handles processed events
type Processor ¶ added in v1.2.1
type Processor struct {
// contains filtered or unexported fields
}
Processor encapsulates all event processing logic for Hermes events
func NewProcessor ¶ added in v1.2.1
func NewProcessor( duties DutiesProvider, output OutputHandler, metrics MetricsCollector, metaProvider MetaProvider, unifiedSharder *UnifiedSharder, eventCategorizer *EventCategorizer, wallclock *ethwallclock.EthereumBeaconChain, clockDrift time.Duration, events EventConfig, log logrus.FieldLogger, ) *Processor
NewProcessor creates a new Processor instance
func (*Processor) HandleHermesEvent ¶ added in v1.2.1
func (p *Processor) HandleHermesEvent(ctx context.Context, event *TraceEvent) error
HandleHermesEvent processes a Hermes trace event and routes it to the appropriate handler
func (*Processor) ShouldTraceMessage ¶ added in v1.2.1
func (p *Processor) ShouldTraceMessage( event *TraceEvent, clientMeta *xatu.ClientMeta, xatuEventType string, ) bool
ShouldTraceMessage determines whether a message with the given MsgID should be included in the sample based on the configured trace settings.
func (*Processor) ShouldTraceRPCMetaMessages ¶ added in v1.2.1
func (p *Processor) ShouldTraceRPCMetaMessages( clientMeta *xatu.ClientMeta, xatuEventType string, messages interface{}, ) ([]FilteredMessageWithIndex, error)
ShouldTraceRPCMetaMessages determines which RPC meta messages should be processed based on sharding configuration
type RPCMetaMessageInfo ¶ added in v1.1.19
type RPCMetaMessageInfo struct {
MessageID *wrapperspb.StringValue
Topic *wrapperspb.StringValue // Optional: gossip topic for the message
}
RPCMetaMessageInfo represents a message with its ID and optional topic for RPC meta filtering
type RPCMetaTopicInfo ¶ added in v1.1.23
type RPCMetaTopicInfo struct {
Topic *wrapperspb.StringValue // Gossip topic for the event
}
RPCMetaTopicInfo represents a topic-based RPC meta event for filtering
type RpcControlGraft ¶ added in v1.6.4
type RpcControlGraft struct {
TopicID string
}
RpcControlGraft represents a GRAFT control message.
type RpcControlIHave ¶ added in v1.6.4
RpcControlIHave represents an IHAVE control message.
type RpcControlIWant ¶ added in v1.6.4
type RpcControlIWant struct {
MsgIDs []string
}
RpcControlIWant represents an IWANT control message.
type RpcControlIdontWant ¶ added in v1.6.4
type RpcControlIdontWant struct {
MsgIDs []string
}
RpcControlIdontWant represents an IDONTWANT control message.
type RpcControlPrune ¶ added in v1.6.4
RpcControlPrune represents a PRUNE control message.
type RpcMeta ¶ added in v1.6.4
type RpcMeta struct {
PeerID peer.ID
Subscriptions []RpcMetaSub `json:"Subs,omitempty"`
Messages []RpcMetaMsg `json:"Msgs,omitempty"`
Control *RpcMetaControl `json:"Control,omitempty"`
}
RpcMeta contains metadata for RPC messages. Extracted from github.com/probe-lab/hermes/host.
type RpcMetaControl ¶ added in v1.6.4
type RpcMetaControl struct {
IHave []RpcControlIHave `json:"IHave,omitempty"`
IWant []RpcControlIWant `json:"IWant,omitempty"`
Graft []RpcControlGraft `json:"Graft,omitempty"`
Prune []RpcControlPrune `json:"Prune,omitempty"`
Idontwant []RpcControlIdontWant `json:"Idontwant,omitempty"`
}
RpcMetaControl contains control messages for GossipSub.
type RpcMetaMsg ¶ added in v1.6.4
type RpcMetaMsg struct {
MsgID string `json:"MsgID,omitempty"`
Topic string `json:"Topic,omitempty"`
}
RpcMetaMsg represents a message in an RPC.
type RpcMetaSub ¶ added in v1.6.4
RpcMetaSub represents a subscription message.
type ShardableEvent ¶ added in v1.1.19
ShardableEvent represents an event that can be sharded
type ShardingConfig ¶ added in v1.1.19
type ShardingConfig struct {
// Topic-based patterns with sampling rates
Topics map[string]*TopicShardingConfig `yaml:"topics"`
// Events without sharding keys (Group D)
NoShardingKeyEvents *NoShardingKeyConfig `yaml:"noShardingKeyEvents,omitempty"`
// contains filtered or unexported fields
}
ShardingConfig represents the sharding configuration
func (*ShardingConfig) LogSummary ¶ added in v1.1.19
func (c *ShardingConfig) LogSummary() string
LogSummary returns a human-readable summary of the sharding configuration
type ShardingGroup ¶ added in v1.1.19
type ShardingGroup int
ShardingGroup represents the categorization of events based on their sharding capabilities
const ( // GroupA events have both Topic and MsgID available for sharding GroupA ShardingGroup = iota // GroupB events have only Topic available for sharding GroupB // GroupC events have only MsgID available for sharding GroupC // GroupD events have no sharding keys available GroupD )
type TopicShardingConfig ¶ added in v1.1.19
type TopicShardingConfig struct {
// Total number of shards for this topic pattern
TotalShards uint64 `yaml:"totalShards"`
// Active shards for this topic pattern
ActiveShards []uint64 `yaml:"activeShards"`
}
TopicShardingConfig defines sharding for a topic pattern
func (*TopicShardingConfig) GetSamplingRate ¶ added in v1.1.19
func (t *TopicShardingConfig) GetSamplingRate() float64
GetSamplingRate returns the sampling rate for a topic pattern
func (*TopicShardingConfig) IsFirehose ¶ added in v1.1.19
func (t *TopicShardingConfig) IsFirehose() bool
IsFirehose returns true if all shards are active
func (*TopicShardingConfig) UnmarshalYAML ¶ added in v1.1.19
func (t *TopicShardingConfig) UnmarshalYAML(node *yaml.Node) error
UnmarshalYAML implements custom YAML unmarshaling to support range syntax
type TraceEvent ¶ added in v1.6.4
type TraceEvent struct {
Type string
Topic string
PeerID peer.ID
Timestamp time.Time
Payload any `json:"Data"` // cannot use field "Data" because of gk.Record method
}
TraceEvent represents a trace event from the libp2p network. Extracted from github.com/probe-lab/hermes/host.
type TraceEventAltairBlock ¶ added in v1.6.4
type TraceEventAltairBlock struct {
TraceEventPayloadMetaData
Block *ethtypes.SignedBeaconBlockAltair
}
TraceEventAltairBlock represents an Altair beacon block event.
func NewAltairBlockPayload ¶ added in v1.6.4
func NewAltairBlockPayload(block *ethtypes.SignedBeaconBlockAltair, meta *TraceEventPayloadMetaData) *TraceEventAltairBlock
NewAltairBlockPayload creates an Altair block payload.
type TraceEventAttestation ¶ added in v1.6.4
type TraceEventAttestation struct {
TraceEventPayloadMetaData
Attestation *ethtypes.Attestation
}
TraceEventAttestation represents an attestation event.
func NewAttestationPayload ¶ added in v1.6.4
func NewAttestationPayload(att *ethtypes.Attestation, meta *TraceEventPayloadMetaData) *TraceEventAttestation
NewAttestationPayload creates a pre-Electra attestation payload.
type TraceEventAttestationElectra ¶ added in v1.6.4
type TraceEventAttestationElectra struct {
TraceEventPayloadMetaData
AttestationElectra *ethtypes.AttestationElectra
}
TraceEventAttestationElectra represents an Electra attestation event.
func NewAttestationElectraPayload ¶ added in v1.6.4
func NewAttestationElectraPayload(att *ethtypes.AttestationElectra, meta *TraceEventPayloadMetaData) *TraceEventAttestationElectra
NewAttestationElectraPayload creates an Electra attestation payload.
type TraceEventAttesterSlashing ¶ added in v1.6.4
type TraceEventAttesterSlashing struct {
TraceEventPayloadMetaData
AttesterSlashing *ethtypes.AttesterSlashing
}
TraceEventAttesterSlashing represents an attester slashing event.
type TraceEventBLSToExecutionChange ¶ added in v1.6.4
type TraceEventBLSToExecutionChange struct {
TraceEventPayloadMetaData
BLSToExecutionChange *ethtypes.BLSToExecutionChange
}
TraceEventBLSToExecutionChange represents a BLS to execution change event.
type TraceEventBellatrixBlock ¶ added in v1.6.4
type TraceEventBellatrixBlock struct {
TraceEventPayloadMetaData
Block *ethtypes.SignedBeaconBlockBellatrix
}
TraceEventBellatrixBlock represents a Bellatrix beacon block event.
func NewBellatrixBlockPayload ¶ added in v1.6.4
func NewBellatrixBlockPayload(block *ethtypes.SignedBeaconBlockBellatrix, meta *TraceEventPayloadMetaData) *TraceEventBellatrixBlock
NewBellatrixBlockPayload creates a Bellatrix block payload.
type TraceEventBlobSidecar ¶ added in v1.6.4
type TraceEventBlobSidecar struct {
TraceEventPayloadMetaData
BlobSidecar *ethtypes.BlobSidecar
}
TraceEventBlobSidecar represents a blob sidecar event.
func NewBlobSidecarPayload ¶ added in v1.6.4
func NewBlobSidecarPayload(blob *ethtypes.BlobSidecar, meta *TraceEventPayloadMetaData) *TraceEventBlobSidecar
NewBlobSidecarPayload creates a blob sidecar payload.
type TraceEventCapellaBlock ¶ added in v1.6.4
type TraceEventCapellaBlock struct {
TraceEventPayloadMetaData
Block *ethtypes.SignedBeaconBlockCapella
}
TraceEventCapellaBlock represents a Capella beacon block event.
func NewCapellaBlockPayload ¶ added in v1.6.4
func NewCapellaBlockPayload(block *ethtypes.SignedBeaconBlockCapella, meta *TraceEventPayloadMetaData) *TraceEventCapellaBlock
NewCapellaBlockPayload creates a Capella block payload.
type TraceEventConsensusEngineAPIGetBlobs ¶ added in v1.6.9
type TraceEventConsensusEngineAPIGetBlobs struct {
TraceEventPayloadMetaData
// Timing
RequestedAt time.Time `json:"requested_at"`
Duration time.Duration `json:"duration"`
// Beacon context
Slot uint64 `json:"slot"`
BlockRoot string `json:"block_root"`
ParentBlockRoot string `json:"parent_block_root"`
// Request details
RequestedCount uint32 `json:"requested_count"`
VersionedHashes []string `json:"versioned_hashes"`
// Response
ReturnedCount uint32 `json:"returned_count"`
Status string `json:"status"`
ErrorMessage string `json:"error_message"`
// Meta
MethodVersion string `json:"method_version"`
// ExecutionClientVersion is the raw version string from web3_clientVersion RPC.
// Parsed into components when converting to protobuf.
ExecutionClientVersion string `json:"execution_client_version"`
}
TraceEventConsensusEngineAPIGetBlobs represents an engine_getBlobs API call event.
func NewConsensusEngineAPIGetBlobsPayload ¶ added in v1.6.9
func NewConsensusEngineAPIGetBlobsPayload( requestedAt time.Time, duration time.Duration, slot uint64, blockRoot, parentBlockRoot string, requestedCount uint32, versionedHashes []string, returnedCount uint32, status, errorMessage string, methodVersion string, executionClientVersion string, ) *TraceEventConsensusEngineAPIGetBlobs
NewConsensusEngineAPIGetBlobsPayload creates a consensus engine API get blobs event. The executionClientVersion parameter is the raw version string from web3_clientVersion RPC. It will be parsed into components when converting to protobuf.
type TraceEventConsensusEngineAPINewPayload ¶ added in v1.6.7
type TraceEventConsensusEngineAPINewPayload struct {
TraceEventPayloadMetaData
// Timing
RequestedAt time.Time `json:"requested_at"`
Duration time.Duration `json:"duration"`
// Beacon context
Slot uint64 `json:"slot"`
BlockRoot string `json:"block_root"`
ParentBlockRoot string `json:"parent_block_root"`
ProposerIndex uint64 `json:"proposer_index"`
// Execution payload
BlockNumber uint64 `json:"block_number"`
BlockHash string `json:"block_hash"`
ParentHash string `json:"parent_hash"`
GasUsed uint64 `json:"gas_used"`
GasLimit uint64 `json:"gas_limit"`
TxCount uint32 `json:"tx_count"`
BlobCount uint32 `json:"blob_count"`
// Response
Status string `json:"status"`
LatestValidHash string `json:"latest_valid_hash"`
ValidationError string `json:"validation_error"`
// Meta
MethodVersion string `json:"method_version"`
// ExecutionClientVersion is the raw version string from web3_clientVersion RPC.
// Parsed into components when converting to protobuf.
ExecutionClientVersion string `json:"execution_client_version"`
}
TraceEventConsensusEngineAPINewPayload represents an engine_newPayload API call event.
func NewConsensusEngineAPINewPayloadPayload ¶ added in v1.6.7
func NewConsensusEngineAPINewPayloadPayload( requestedAt time.Time, duration time.Duration, slot, proposerIndex uint64, blockRoot, parentBlockRoot string, blockNumber uint64, blockHash, parentHash string, gasUsed, gasLimit uint64, txCount, blobCount uint32, status, latestValidHash, validationError string, methodVersion string, executionClientVersion string, ) *TraceEventConsensusEngineAPINewPayload
NewConsensusEngineAPINewPayloadPayload creates a consensus engine API new payload event. The executionClientVersion parameter is the raw version string from web3_clientVersion RPC. It will be parsed into components when converting to protobuf.
type TraceEventCustodyProbe ¶ added in v1.6.4
type TraceEventCustodyProbe struct {
TraceEventPayloadMetaData
PeerID *peer.ID `json:"peer_id,omitempty"`
Epoch uint64 `json:"epoch"`
Slot uint64 `json:"slot"`
BlockHash string `json:"block_hash"`
Column uint64 `json:"column_id"`
Result string `json:"result,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
ColumnSize int `json:"column_size,omitempty"`
Error string `json:"error,omitempty"`
}
TraceEventCustodyProbe represents a data column custody probe event.
func NewCustodyProbePayload ¶ added in v1.6.4
func NewCustodyProbePayload( peerID *peer.ID, epoch, slot, column uint64, blockHash, result, errorStr string, duration time.Duration, columnSize int, ) *TraceEventCustodyProbe
NewCustodyProbePayload creates a custody probe payload.
type TraceEventDataColumnSidecar ¶ added in v1.6.4
type TraceEventDataColumnSidecar struct {
TraceEventPayloadMetaData
DataColumnSidecar *ethtypes.DataColumnSidecar
}
TraceEventDataColumnSidecar represents a data column sidecar event.
func NewDataColumnSidecarPayload ¶ added in v1.6.4
func NewDataColumnSidecarPayload(dataColumn *ethtypes.DataColumnSidecar, meta *TraceEventPayloadMetaData) *TraceEventDataColumnSidecar
NewDataColumnSidecarPayload creates a data column sidecar payload.
type TraceEventDenebBlock ¶ added in v1.6.4
type TraceEventDenebBlock struct {
TraceEventPayloadMetaData
Block *ethtypes.SignedBeaconBlockDeneb
}
TraceEventDenebBlock represents a Deneb beacon block event.
func NewDenebBlockPayload ¶ added in v1.6.4
func NewDenebBlockPayload(block *ethtypes.SignedBeaconBlockDeneb, meta *TraceEventPayloadMetaData) *TraceEventDenebBlock
NewDenebBlockPayload creates a Deneb block payload.
type TraceEventElectraBlock ¶ added in v1.6.4
type TraceEventElectraBlock struct {
TraceEventPayloadMetaData
Block *ethtypes.SignedBeaconBlockElectra
}
TraceEventElectraBlock represents an Electra beacon block event.
func NewElectraBlockPayload ¶ added in v1.6.4
func NewElectraBlockPayload(block *ethtypes.SignedBeaconBlockElectra, meta *TraceEventPayloadMetaData) *TraceEventElectraBlock
NewElectraBlockPayload creates an Electra block payload.
type TraceEventFuluBlock ¶ added in v1.6.4
type TraceEventFuluBlock struct {
TraceEventPayloadMetaData
Block *ethtypes.SignedBeaconBlockFulu
}
TraceEventFuluBlock represents a Fulu beacon block event.
func NewFuluBlockPayload ¶ added in v1.6.4
func NewFuluBlockPayload(block *ethtypes.SignedBeaconBlockFulu, meta *TraceEventPayloadMetaData) *TraceEventFuluBlock
NewFuluBlockPayload creates a Fulu block payload.
type TraceEventPayloadMetaData ¶ added in v1.6.4
type TraceEventPayloadMetaData struct {
PeerID string `json:"PeerID"`
Topic string `json:"Topic"`
Seq []byte `json:"Seq"`
MsgID string `json:"MsgID"`
MsgSize int `json:"MsgSize"`
}
TraceEventPayloadMetaData contains metadata for trace event payloads. Extracted from github.com/probe-lab/hermes/host.
type TraceEventPhase0Block ¶ added in v1.6.4
type TraceEventPhase0Block struct {
TraceEventPayloadMetaData
Block *ethtypes.SignedBeaconBlock
}
TraceEventPhase0Block represents a Phase0 beacon block event.
func NewPhase0BlockPayload ¶ added in v1.6.4
func NewPhase0BlockPayload(block *ethtypes.SignedBeaconBlock, meta *TraceEventPayloadMetaData) *TraceEventPhase0Block
NewPhase0BlockPayload creates a Phase0 block payload.
type TraceEventProposerSlashing ¶ added in v1.6.4
type TraceEventProposerSlashing struct {
TraceEventPayloadMetaData
ProposerSlashing *ethtypes.ProposerSlashing
}
TraceEventProposerSlashing represents a proposer slashing event.
type TraceEventSignedAggregateAttestationAndProof ¶ added in v1.6.4
type TraceEventSignedAggregateAttestationAndProof struct {
TraceEventPayloadMetaData
SignedAggregateAttestationAndProof *ethtypes.SignedAggregateAttestationAndProof
}
TraceEventSignedAggregateAttestationAndProof represents a signed aggregate attestation and proof event.
func NewSignedAggregateAttestationAndProofPayload ¶ added in v1.6.4
func NewSignedAggregateAttestationAndProofPayload(agg *ethtypes.SignedAggregateAttestationAndProof, meta *TraceEventPayloadMetaData) *TraceEventSignedAggregateAttestationAndProof
NewSignedAggregateAttestationAndProofPayload creates a pre-Electra aggregate attestation payload.
type TraceEventSignedAggregateAttestationAndProofElectra ¶ added in v1.6.4
type TraceEventSignedAggregateAttestationAndProofElectra struct {
TraceEventPayloadMetaData
SignedAggregateAttestationAndProofElectra *ethtypes.SignedAggregateAttestationAndProofElectra
}
TraceEventSignedAggregateAttestationAndProofElectra represents an Electra signed aggregate attestation and proof event.
func NewSignedAggregateAttestationAndProofElectraPayload ¶ added in v1.6.4
func NewSignedAggregateAttestationAndProofElectraPayload(agg *ethtypes.SignedAggregateAttestationAndProofElectra, meta *TraceEventPayloadMetaData) *TraceEventSignedAggregateAttestationAndProofElectra
NewSignedAggregateAttestationAndProofElectraPayload creates an Electra aggregate attestation payload.
type TraceEventSignedContributionAndProof ¶ added in v1.6.4
type TraceEventSignedContributionAndProof struct {
TraceEventPayloadMetaData
SignedContributionAndProof *ethtypes.SignedContributionAndProof
}
TraceEventSignedContributionAndProof represents a signed contribution and proof event.
type TraceEventSingleAttestation ¶ added in v1.6.4
type TraceEventSingleAttestation struct {
TraceEventPayloadMetaData
SingleAttestation *ethtypes.SingleAttestation
}
TraceEventSingleAttestation represents a single attestation event.
func NewSingleAttestationPayload ¶ added in v1.6.4
func NewSingleAttestationPayload(att *ethtypes.SingleAttestation, meta *TraceEventPayloadMetaData) *TraceEventSingleAttestation
NewSingleAttestationPayload creates a single attestation payload.
type TraceEventSyncCommitteeMessage ¶ added in v1.6.4
type TraceEventSyncCommitteeMessage struct {
TraceEventPayloadMetaData
SyncCommitteeMessage *ethtypes.SyncCommitteeMessage //nolint:staticcheck // gRPC API deprecated but still supported until v8 (2026)
}
TraceEventSyncCommitteeMessage represents a sync committee message event.
type TraceEventVoluntaryExit ¶ added in v1.6.4
type TraceEventVoluntaryExit struct {
TraceEventPayloadMetaData
VoluntaryExit *ethtypes.VoluntaryExit
}
TraceEventVoluntaryExit represents a voluntary exit event.
type UnifiedSharder ¶ added in v1.1.19
type UnifiedSharder struct {
// contains filtered or unexported fields
}
UnifiedSharder provides a single sharding decision point for all events
func NewUnifiedSharder ¶ added in v1.1.19
func NewUnifiedSharder(config *ShardingConfig, enabled bool) (*UnifiedSharder, error)
NewUnifiedSharder creates a new unified sharder
func (*UnifiedSharder) GetShardForKey ¶ added in v1.1.19
func (s *UnifiedSharder) GetShardForKey(key string, totalShards uint64) uint64
GetShardForKey returns the shard number for a given key (for testing/debugging)
func (*UnifiedSharder) ShouldProcess ¶ added in v1.1.19
func (s *UnifiedSharder) ShouldProcess(eventType xatu.Event_Name, msgID, topic string) (bool, string)
ShouldProcess determines if an event should be processed based on sharding rules
func (*UnifiedSharder) ShouldProcessBatch ¶ added in v1.1.19
func (s *UnifiedSharder) ShouldProcessBatch(eventType xatu.Event_Name, events []ShardableEvent) []bool
ShouldProcessBatch determines which events in a batch should be processed This is used for RPC meta events where we have multiple events to evaluate
Source Files
¶
- config.go
- event.go
- event_catergorizer.go
- event_config.go
- event_gossipsub.go
- event_libp2p.go
- event_libp2p_core.go
- event_rpc.go
- gossipsub_aggregate_and_proof.go
- gossipsub_attestation.go
- gossipsub_beacon_block.go
- gossipsub_blob_sidecar.go
- gossipsub_data_column_sidecar.go
- gossipsub_single_attestation.go
- hermes_event_payload.go
- hermes_rpc.go
- hermes_trace_event.go
- interfaces.go
- metrics.go
- overrides.go
- payload_builders.go
- processor.go
- sharding.go
- trace.go
- trace_convert.go
- trace_shard_key.go
- trace_siphash.go