clmimicry

package
v1.7.2
Published: Dec 17, 2025 License: GPL-3.0 Imports: 36 Imported by: 0

README

CL-Mimicry: Ethereum Consensus Layer P2P Network Monitoring

CL-Mimicry is a sophisticated consensus layer P2P network monitoring client that mimics validator behavior to collect libp2p and gossipsub events from Ethereum consensus networks. It provides advanced trace-based sampling and sharding capabilities for scalable, distributed monitoring of Ethereum network activity.

Overview

CL-Mimicry connects to Ethereum consensus network nodes and captures libp2p trace events, providing insights into:

  • Gossipsub Message Flow: Beacon blocks, attestations, and blob sidecars
  • Peer Behavior: Connection patterns, message propagation, and network topology
  • Protocol Performance: Message timing, duplicate detection, and peer interactions
  • Network Health: RPC communication, consensus participation, and validator activity

The system uses consistent hashing with the SipHash-2-4 algorithm to distribute processing across multiple instances while keeping message routing deterministic.

Sharding System

CL-Mimicry uses a simplified, unified sharding system based on a streamlined event categorization model.

  • Event categorization: Events grouped by sharding capabilities (Groups A-D)
  • Configurable shards: A configurable total shard count (default 512) keeps distribution consistent across configurations
  • Topic-first design: Topic-based sharding takes priority wherever a topic is available

How It Works
┌─────────────┐
│Event Arrives│
└──────┬──────┘
       │
       ▼
┌──────────────┐     ┌─────────────────┐
│Get Event Info├────►│ Event Category? │
└──────────────┘     └────────┬────────┘
                              │
        ┌─────────────────────┼─────────────────────┬─────────────────────┐
        ▼                     ▼                     ▼                     ▼
   ┌─────────┐         ┌─────────┐           ┌─────────┐           ┌─────────┐
   │ Group A │         │ Group B │           │ Group C │           │ Group D │
   │Topic+Msg│         │Topic Only│          │Msg Only │           │ No Keys │
   └────┬────┘         └────┬────┘           └────┬────┘           └────┬────┘
        │                   │                     │                     │
        ▼                   ▼                     ▼                     ▼
   Topic Config?       Topic Config?         Default Shard         Enabled?
        │                   │                     │                     │
     Yes/No              Yes/No                   │                  Yes/No
        │                   │                     │                     │
        ▼                   ▼                     ▼                     ▼
   Shard by Msg       Shard by Topic        Shard by Msg         Process/Drop

Event Categorization

Events are categorized into four groups based on their available sharding keys:

Group A: Topic + MsgID Events

Events with both topic and message ID, enabling full sharding flexibility:

  • PUBLISH_MESSAGE, DELIVER_MESSAGE, DUPLICATE_MESSAGE, REJECT_MESSAGE
  • GOSSIPSUB_BEACON_BLOCK, GOSSIPSUB_BEACON_ATTESTATION, GOSSIPSUB_BLOB_SIDECAR
  • RPC_META_MESSAGE, RPC_META_CONTROL_IHAVE

Sharding: Uses message ID for sharding, with topic-based configuration

Group B: Topic-Only Events

Events with only topic information:

  • JOIN, LEAVE, GRAFT, PRUNE
  • RPC_META_CONTROL_GRAFT, RPC_META_CONTROL_PRUNE, RPC_META_SUBSCRIPTION

Sharding: Uses topic hash for sharding decisions

Group C: MsgID-Only Events

Events with only message ID:

  • RPC_META_CONTROL_IWANT, RPC_META_CONTROL_IDONTWANT

Sharding: Uses message ID with default configuration

Group D: No Sharding Key Events

Events without sharding keys:

  • ADD_PEER, REMOVE_PEER, CONNECTED, DISCONNECTED
  • RECV_RPC, SEND_RPC, DROP_RPC (parent events only)
  • HANDLE_METADATA, HANDLE_STATUS

Sharding: All-or-nothing based on configuration
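
The four groups above follow from two facts about an event: whether it carries a topic and whether it carries a message ID. The sketch below illustrates that mapping and the resulting choice of sharding key. The names `group`, `groupFor`, and `shardingKey` are hypothetical and local to this example; the package's own types are `ShardingGroup` and `EventCategorizer`, whose internals may differ.

```go
package main

import "fmt"

// group mirrors the four categories described above. It is a local,
// illustrative type, not the package's ShardingGroup.
type group int

const (
	groupA group = iota // topic and message ID
	groupB              // topic only
	groupC              // message ID only
	groupD              // no sharding key
)

// groupFor derives the category from the keys an event carries.
func groupFor(hasTopic, hasMsgID bool) group {
	switch {
	case hasTopic && hasMsgID:
		return groupA
	case hasTopic:
		return groupB
	case hasMsgID:
		return groupC
	default:
		return groupD
	}
}

// shardingKey returns the value that would be hashed for an event in the
// given group, and false when the group has no key (Group D).
func shardingKey(g group, topic, msgID string) (string, bool) {
	switch g {
	case groupA, groupC:
		return msgID, true // shard by message ID
	case groupB:
		return topic, true // shard by topic
	default:
		return "", false // all-or-nothing, decided by configuration
	}
}

func main() {
	g := groupFor(true, false) // for example a GRAFT event: topic only
	key, ok := shardingKey(g, "beacon_block", "")
	fmt.Println(g == groupB, key, ok)
}
```

For Group A the topic still matters: per the description above it selects which sharding configuration applies, while the message ID is the value that gets hashed.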

Configuration

The configuration focuses on topic-based patterns with simplified sharding:

Basic Structure

sharding:
  # Topic-based sharding configuration
  topics:
    ".*beacon_block.*":
      totalShards: 512          # Always 512 for consistency
      activeShards: ["0-511"]   # 100% sampling

    ".*beacon_attestation.*":
      totalShards: 512
      activeShards: ["0-25"]    # 26/512 = ~5% sampling

    ".*":                       # Catch-all pattern
      totalShards: 512
      activeShards: ["0-127"]   # 25% sampling

  # Events without sharding keys (Group D)
  noShardingKeyEvents:
    enabled: true               # Process all Group D events

events:
  # Enable/disable specific event types
  recvRpcEnabled: true
  gossipSubBeaconBlockEnabled: true
  gossipSubAttestationEnabled: true
  # ... etc

Active Shards Syntax

Flexible syntax for specifying which shards to process:

# Range syntax (recommended)
activeShards: ["0-255"]         # 256 shards = 50% sampling

# Individual shards
activeShards: [0, 1, 5, 10]     # Specific shards only

# Mixed syntax
activeShards: ["0-10", 50, "100-150"]  # Ranges and individuals

# Common sampling rates with 512 total shards:
activeShards: ["0-511"]         # 100% (all shards)
activeShards: ["0-255"]         # 50%  (256 shards)
activeShards: ["0-127"]         # 25%  (128 shards)
activeShards: ["0-25"]          # 5%   (26 shards)
activeShards: ["0-4"]           # 1%   (5 shards)
activeShards: [0]               # 0.2% (1 shard)
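
To make the range syntax concrete, here is a minimal Go sketch that expands entries such as "0-10" or "50" into a flat list of shard numbers. `expandShards` is a hypothetical helper written for this example; in the package the parsing is done by `TopicShardingConfig.UnmarshalYAML`, which may validate or behave differently.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expandShards turns entries such as "0-10" or "50" into a flat list of
// shard numbers. Hypothetical helper: it only illustrates the syntax and
// is not the package's own parser.
func expandShards(entries []string) ([]uint64, error) {
	var shards []uint64
	for _, e := range entries {
		lo, hi, isRange := strings.Cut(e, "-")
		start, err := strconv.ParseUint(strings.TrimSpace(lo), 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid shard %q: %w", e, err)
		}
		end := start
		if isRange {
			end, err = strconv.ParseUint(strings.TrimSpace(hi), 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid range %q: %w", e, err)
			}
		}
		if end < start {
			return nil, fmt.Errorf("range %q ends before it starts", e)
		}
		for s := start; s <= end; s++ {
			shards = append(shards, s)
		}
	}
	return shards, nil
}

func main() {
	// The "mixed syntax" example from above, with the integers written as strings.
	shards, err := expandShards([]string{"0-10", "50", "100-150"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d of 512 shards active (%.1f%%)\n",
		len(shards), 100*float64(len(shards))/512)
}
```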

Pattern Matching

Topics are matched against regex patterns, and the highest match wins:

topics:
  # Most specific patterns first
  ".*beacon_attestation_[0-9]+.*":  # Subnet-specific
    activeShards: ["0-12"]          # 2.5% sampling

  ".*beacon_attestation.*":         # General attestations
    activeShards: ["0-25"]          # 5% sampling

  ".*":                            # Everything else
    activeShards: ["0-127"]        # 25% sampling
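
The sketch below shows one way such a lookup could work. It rests on a loud assumption: the README does not define how "highest match" is ranked, so this example treats the longest pattern text as the most specific, with alphabetical order as a tie-break. That ranking, the names `topicRule`, `compileRules`, and `match`, and the sample topics (including the made-up fork digest `abcd`) are all hypothetical and not taken from the package.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// topicRule pairs a compiled pattern with its activeShards expression,
// kept as text here for brevity.
type topicRule struct {
	pattern *regexp.Regexp
	active  string
}

// exampleConfig mirrors the three patterns shown above.
var exampleConfig = map[string]string{
	".*beacon_attestation_[0-9]+.*": "0-12",
	".*beacon_attestation.*":        "0-25",
	".*":                            "0-127",
}

// compileRules compiles every key and orders the rules so that longer
// patterns come first. ASSUMPTION: pattern length stands in for
// specificity; the package may rank matches differently.
func compileRules(cfg map[string]string) ([]topicRule, error) {
	keys := make([]string, 0, len(cfg))
	for k := range cfg {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool {
		if len(keys[i]) != len(keys[j]) {
			return len(keys[i]) > len(keys[j])
		}
		return keys[i] < keys[j]
	})
	rules := make([]topicRule, 0, len(keys))
	for _, k := range keys {
		re, err := regexp.Compile(k)
		if err != nil {
			return nil, fmt.Errorf("pattern %q: %w", k, err)
		}
		rules = append(rules, topicRule{pattern: re, active: cfg[k]})
	}
	return rules, nil
}

// match returns the first rule, in ranked order, whose pattern matches.
func match(rules []topicRule, topic string) (topicRule, bool) {
	for _, r := range rules {
		if r.pattern.MatchString(topic) {
			return r, true
		}
	}
	return topicRule{}, false
}

func main() {
	rules, err := compileRules(exampleConfig)
	if err != nil {
		panic(err)
	}
	topics := []string{
		"/eth2/abcd/beacon_attestation_7/ssz_snappy",
		"/eth2/abcd/beacon_block/ssz_snappy",
	}
	for _, topic := range topics {
		if r, ok := match(rules, topic); ok {
			fmt.Printf("%s -> %s (activeShards %s)\n", topic, r.pattern, r.active)
		}
	}
}
```

Because Go maps have no stable iteration order, any implementation that stores patterns in a map needs an explicit ranking step like the sort above to stay deterministic.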

Documentation

Constants

const (

	// libp2p pubsub events.
	TraceEvent_HANDLE_MESSAGE = "HANDLE_MESSAGE"

	// libp2p core networking events.
	TraceEvent_CONNECTED    = "CONNECTED"
	TraceEvent_DISCONNECTED = "DISCONNECTED"

	// RPC events.
	TraceEvent_HANDLE_METADATA = "HANDLE_METADATA"
	TraceEvent_HANDLE_STATUS   = "HANDLE_STATUS"

	// Events that are not part of a normal Ethereum node.
	TraceEvent_SYNTHETIC_HEARTBEAT             = "SYNTHETIC_HEARTBEAT"
	TraceEvent_CUSTODY_PROBE                   = "CUSTODY_PROBE"
	TraceEvent_CONSENSUS_ENGINE_API_NEWPAYLOAD = "CONSENSUS_ENGINE_API_NEWPAYLOAD"
	TraceEvent_CONSENSUS_ENGINE_API_GETBLOBS   = "CONSENSUS_ENGINE_API_GETBLOBS"
)

These constants define events that the libp2p proto packages do not supply.

const (
	// DefaultTotalShards is the default number of shards if not specified
	DefaultTotalShards = 512
)

Variables

This section is empty.

Functions

func ExtractExecutionClientMetadata added in v1.6.8

func ExtractExecutionClientMetadata(event *TraceEvent) (*xatu.ClientMeta_Ethereum_Execution, error)

ExtractExecutionClientMetadata extracts and parses execution client metadata from a TraceEvent. The raw ExecutionClientVersion string is parsed into its components. Returns a ClientMeta_Ethereum_Execution proto that can be assigned directly to ClientMeta.Ethereum.Execution.

func ExtractExecutionClientMetadataFromGetBlobs added in v1.6.9

func ExtractExecutionClientMetadataFromGetBlobs(event *TraceEvent) (*xatu.ClientMeta_Ethereum_Execution, error)

ExtractExecutionClientMetadataFromGetBlobs extracts and parses execution client metadata from a GetBlobs TraceEvent. The raw ExecutionClientVersion string is parsed into its components. Returns a ClientMeta_Ethereum_Execution proto that can be assigned directly to ClientMeta.Ethereum.Execution.

func GetGossipTopics added in v1.1.19

func GetGossipTopics(event *TraceEvent) []string

GetGossipTopics extracts all gossip topics from a trace event if available. Returns a slice of unique topics found in the event.

func GetMsgID added in v1.1.19

func GetMsgID(event *TraceEvent) string

GetMsgID extracts the message ID from the event for sharding. We only shard based on message IDs, not peer IDs.

func GetShard added in v1.1.1

func GetShard(shardingKey string, totalShards uint64) uint64

GetShard calculates which shard a message belongs to based on its ID.

This function uses SipHash to consistently map message IDs (typically hashes themselves) to specific shards, ensuring even distribution across the available shards.

Key benefits:

  • Deterministic: The same message ID always maps to the same shard
  • Balanced: Messages are evenly distributed across all shards

Parameters:

  • shardingKey: The identifier to use for sharding (often a hash like "0x1234...abcd")
  • totalShards: The total number of available shards (e.g., 64)

Returns:

  • The shard number (0 to totalShards-1) where this message should be processed

func IsShardActive added in v1.1.1

func IsShardActive(shard uint64, activeShards []uint64) bool

IsShardActive checks if a shard is in the active shards list.
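
Taken together, GetShard and IsShardActive amount to "hash the key, reduce it modulo the shard count, then check membership". The sketch below shows that flow under stated assumptions: FNV-1a from the standard library stands in for the package's SipHash, and `shardFor` and `isActive` are hypothetical local names, so the shard numbers it produces will not match those of the real GetShard.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a key to a shard in [0, totalShards).
// ASSUMPTION: FNV-1a is a stand-in for the package's SipHash.
func shardFor(key string, totalShards uint64) uint64 {
	if totalShards == 0 {
		return 0 // avoid dividing by zero; a real config should reject this
	}
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum64() % totalShards
}

// isActive reports whether shard appears in the active list.
func isActive(shard uint64, active []uint64) bool {
	for _, s := range active {
		if s == shard {
			return true
		}
	}
	return false
}

func main() {
	const total = 512

	// Equivalent of activeShards: ["0-127"], roughly 25% sampling.
	active := make([]uint64, 0, 128)
	for s := uint64(0); s < 128; s++ {
		active = append(active, s)
	}

	kept := 0
	for i := 0; i < 10000; i++ {
		id := fmt.Sprintf("msg-%d", i) // synthetic message IDs
		if isActive(shardFor(id, total), active) {
			kept++
		}
	}
	fmt.Printf("kept %d of 10000 synthetic IDs\n", kept)
}
```

Since every instance computes the same shard for the same key, instances configured with disjoint active shards split the traffic between them without coordinating.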

func SipHash added in v1.1.1

func SipHash(key [16]byte, data []byte) uint64

SipHash implements the SipHash-2-4 algorithm, a fast and efficient hash function designed for message authentication and hash-table lookups.

Key features of SipHash:

  • Deterministic output for identical inputs
  • Even distribution of outputs across the range of uint64

When used for sharding (via GetShard), SipHash provides:

  • Consistent distribution of messages across shards
  • Deterministic routing where the same message always maps to the same shard

Parameters:

  • key: A 16-byte secret key (can be fixed for consistent sharding)
  • data: The message bytes to hash

Returns:

  • A 64-bit unsigned integer hash value
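
For readers who want to see what SipHash-2-4 involves, the following is an independent sketch of the published algorithm (two compression rounds per 8-byte block, four finalization rounds). It is not the package's source: `sipHash24` is a hypothetical name, and the only thing assumed to match is the signature shape shown above.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math/bits"
)

// sipHash24 computes SipHash-2-4 of data under a 16-byte key.
// Independent sketch of the public algorithm, not the package's code.
func sipHash24(key [16]byte, data []byte) uint64 {
	k0 := binary.LittleEndian.Uint64(key[0:8])
	k1 := binary.LittleEndian.Uint64(key[8:16])

	// Initial state: key words XORed with the algorithm's constants.
	v0 := k0 ^ 0x736f6d6570736575
	v1 := k1 ^ 0x646f72616e646f6d
	v2 := k0 ^ 0x6c7967656e657261
	v3 := k1 ^ 0x7465646279746573

	// round is one SipRound over the four state words.
	round := func() {
		v0 += v1
		v1 = bits.RotateLeft64(v1, 13)
		v1 ^= v0
		v0 = bits.RotateLeft64(v0, 32)
		v2 += v3
		v3 = bits.RotateLeft64(v3, 16)
		v3 ^= v2
		v0 += v3
		v3 = bits.RotateLeft64(v3, 21)
		v3 ^= v0
		v2 += v1
		v1 = bits.RotateLeft64(v1, 17)
		v1 ^= v2
		v2 = bits.RotateLeft64(v2, 32)
	}

	// Compression: two rounds per full 8-byte little-endian block.
	n := len(data)
	for ; len(data) >= 8; data = data[8:] {
		m := binary.LittleEndian.Uint64(data)
		v3 ^= m
		round()
		round()
		v0 ^= m
	}

	// Final block: leftover bytes plus the input length in the top byte.
	b := uint64(n) << 56
	for i, c := range data {
		b |= uint64(c) << (8 * uint(i))
	}
	v3 ^= b
	round()
	round()
	v0 ^= b

	// Finalization: four rounds.
	v2 ^= 0xff
	round()
	round()
	round()
	round()
	return v0 ^ v1 ^ v2 ^ v3
}

func main() {
	var key [16]byte
	for i := range key {
		key[i] = byte(i) // 00 01 ... 0f, the key used by the reference test vectors
	}
	msg := make([]byte, 15)
	for i := range msg {
		msg[i] = byte(i) // 00 01 ... 0e
	}
	fmt.Printf("%016x\n", sipHash24(key, msg))
}
```

A fixed key, as the parameter note above suggests, is what makes the mapping identical on every instance; a per-process random key would break deterministic routing.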

func TraceEventToAddPeer added in v1.6.4

func TraceEventToAddPeer(event *TraceEvent) (*libp2p.AddPeer, error)

Helper function to convert a Hermes TraceEvent to a libp2p AddPeer.

func TraceEventToConnected added in v1.6.4

func TraceEventToConnected(event *TraceEvent) (*libp2p.Connected, error)

func TraceEventToConsensusEngineAPIGetBlobs added in v1.6.9

func TraceEventToConsensusEngineAPIGetBlobs(event *TraceEvent) (*xatu.ConsensusEngineAPIGetBlobs, error)

TraceEventToConsensusEngineAPIGetBlobs converts a TraceEvent to a ConsensusEngineAPIGetBlobs protobuf message. Supports typed *TraceEventConsensusEngineAPIGetBlobs payloads from direct callers.

func TraceEventToConsensusEngineAPINewPayload added in v1.6.7

func TraceEventToConsensusEngineAPINewPayload(event *TraceEvent) (*xatu.ConsensusEngineAPINewPayload, error)

TraceEventToConsensusEngineAPINewPayload converts a TraceEvent to a ConsensusEngineAPINewPayload protobuf message. Supports typed *TraceEventConsensusEngineAPINewPayload payloads from direct callers.

func TraceEventToCustodyProbe added in v1.6.4

func TraceEventToCustodyProbe(event *TraceEvent) (*libp2p.DataColumnCustodyProbe, error)

TraceEventToCustodyProbe converts a Hermes TraceEvent to a DataColumnCustodyProbe protobuf message. Supports both typed *TraceEventCustodyProbe payloads (from direct callers like tysm) and map[string]any payloads (for backwards compatibility with Hermes-style events).

func TraceEventToDeliverMessage added in v1.6.4

func TraceEventToDeliverMessage(event *TraceEvent) (*libp2p.DeliverMessage, error)

Helper function to convert a Hermes TraceEvent to a libp2p DeliverMessage.

func TraceEventToDisconnected added in v1.6.4

func TraceEventToDisconnected(event *TraceEvent) (*libp2p.Disconnected, error)

func TraceEventToDropRPC added in v1.6.4

func TraceEventToDropRPC(event *TraceEvent) (*libp2p.DropRPC, error)

Helper function to convert a Hermes TraceEvent to a libp2p DropRPC.

func TraceEventToDuplicateMessage added in v1.6.4

func TraceEventToDuplicateMessage(event *TraceEvent) (*libp2p.DuplicateMessage, error)

Helper function to convert a Hermes TraceEvent to a libp2p DuplicateMessage.

func TraceEventToGraft added in v1.6.4

func TraceEventToGraft(event *TraceEvent) (*libp2p.Graft, error)

Helper function to convert a Hermes TraceEvent to a libp2p Graft.

func TraceEventToHandleMetadata added in v1.6.4

func TraceEventToHandleMetadata(event *TraceEvent) (*libp2p.HandleMetadata, error)

func TraceEventToHandleStatus added in v1.6.4

func TraceEventToHandleStatus(event *TraceEvent) (*libp2p.HandleStatus, error)

func TraceEventToJoin added in v1.6.4

func TraceEventToJoin(event *TraceEvent) (*libp2p.Join, error)

Helper function to convert a Hermes TraceEvent to a libp2p Join.

func TraceEventToLeave added in v1.6.4

func TraceEventToLeave(event *TraceEvent) (*libp2p.Leave, error)

Helper function to convert a Hermes TraceEvent to a libp2p Leave.

func TraceEventToPrune added in v1.6.4

func TraceEventToPrune(event *TraceEvent) (*libp2p.Prune, error)

Helper function to convert a Hermes TraceEvent to a libp2p Prune.

func TraceEventToPublishMessage added in v1.6.4

func TraceEventToPublishMessage(event *TraceEvent) (*libp2p.PublishMessage, error)

Helper function to convert a Hermes TraceEvent to a libp2p PublishMessage.

func TraceEventToRecvRPC added in v1.6.4

func TraceEventToRecvRPC(event *TraceEvent) (*libp2p.RecvRPC, error)

Helper function to convert a Hermes TraceEvent to a libp2p RecvRPC.

func TraceEventToRejectMessage added in v1.6.4

func TraceEventToRejectMessage(event *TraceEvent) (*libp2p.RejectMessage, error)

Helper function to convert a Hermes TraceEvent to a libp2p RejectMessage.

func TraceEventToRemovePeer added in v1.6.4

func TraceEventToRemovePeer(event *TraceEvent) (*libp2p.RemovePeer, error)

Helper function to convert a Hermes TraceEvent to a libp2p RemovePeer.

func TraceEventToSendRPC added in v1.6.4

func TraceEventToSendRPC(event *TraceEvent) (*libp2p.SendRPC, error)

Helper function to convert a Hermes TraceEvent to a libp2p SendRPC.

func TraceEventToSyntheticHeartbeat added in v1.6.4

func TraceEventToSyntheticHeartbeat(event *TraceEvent) (*libp2p.SyntheticHeartbeat, error)

TraceEventToSyntheticHeartbeat converts a Hermes TraceEvent to a SyntheticHeartbeat protobuf message.

Types

type CompiledPattern added in v1.1.19

type CompiledPattern struct {
	Pattern *regexp.Regexp
	Config  *TopicShardingConfig
	// EventTypeConstraint specifies which event types this pattern applies to.
	// Empty string means it applies to all events (backward compatibility).
	// Can be an exact event name (e.g., "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION")
	// or a wildcard pattern (e.g., "LIBP2P_TRACE_RPC_META_*")
	EventTypeConstraint string
}

CompiledPattern holds a compiled regex pattern and its config
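
The field comment on EventTypeConstraint describes three cases: empty, an exact event name, and a wildcard. A minimal sketch of such a check follows. It assumes the wildcard is a single trailing `*` that acts as a prefix match, which is the only form shown in the comment; `matchesConstraint` is a hypothetical name and the package's actual matching rules may be broader.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesConstraint reports whether eventType satisfies constraint.
// ASSUMPTION: only a trailing "*" is treated as a wildcard.
func matchesConstraint(constraint, eventType string) bool {
	switch {
	case constraint == "":
		return true // no constraint: applies to all events
	case strings.HasSuffix(constraint, "*"):
		return strings.HasPrefix(eventType, strings.TrimSuffix(constraint, "*"))
	default:
		return constraint == eventType
	}
}

func main() {
	fmt.Println(matchesConstraint("LIBP2P_TRACE_RPC_META_*", "LIBP2P_TRACE_RPC_META_CONTROL_IHAVE"))
	fmt.Println(matchesConstraint("LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION", "LIBP2P_TRACE_RPC_META_CONTROL_IHAVE"))
}
```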

type Config

type Config struct {
	LoggingLevel string  `yaml:"logging" default:"info"`
	MetricsAddr  string  `yaml:"metricsAddr" default:":9090"`
	PProfAddr    *string `yaml:"pprofAddr"`
	ProbeAddr    *string `yaml:"probeAddr"`

	// The name of the mimicry
	Name string `yaml:"name"`

	// Ethereum configuration
	Ethereum ethereum.Config `yaml:"ethereum"`

	// Outputs configuration
	Outputs []output.Config `yaml:"outputs"`

	// Labels configures the mimicry with labels
	Labels map[string]string `yaml:"labels"`

	// NTP Server to use for clock drift correction
	NTPServer string `yaml:"ntpServer" default:"time.google.com"`

	// Events is the configuration for the events
	Events EventConfig `yaml:"events"`

	// Sharding is the configuration for event sharding
	Sharding ShardingConfig `yaml:"sharding"`
}

func (*Config) ApplyOverrides added in v1.0.15

func (c *Config) ApplyOverrides(o *Override, log logrus.FieldLogger) error

ApplyOverrides applies any overrides to the config.

func (*Config) CreateSinks

func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error)

func (*Config) Validate

func (c *Config) Validate() error

type DutiesProvider added in v1.2.1

type DutiesProvider interface {
	GetValidatorIndex(epoch phase0.Epoch, slot phase0.Slot, committeeIndex phase0.CommitteeIndex, position uint64) (phase0.ValidatorIndex, error)
}

DutiesProvider provides validator duty information

type EventCategorizer added in v1.1.19

type EventCategorizer struct {
	// contains filtered or unexported fields
}

EventCategorizer manages event categorization for sharding decisions

func NewEventCategorizer added in v1.1.19

func NewEventCategorizer() *EventCategorizer

NewEventCategorizer creates and initializes an EventCategorizer with all known events

func (*EventCategorizer) GetAllEventsByGroup added in v1.1.19

func (ec *EventCategorizer) GetAllEventsByGroup() map[ShardingGroup][]xatu.Event_Name

GetAllEventsByGroup returns all events categorized by their sharding group

func (*EventCategorizer) GetEventInfo added in v1.1.19

func (ec *EventCategorizer) GetEventInfo(eventType xatu.Event_Name) (*EventInfo, bool)

GetEventInfo returns information about an event type

func (*EventCategorizer) GetGroupAEvents added in v1.1.19

func (ec *EventCategorizer) GetGroupAEvents() []xatu.Event_Name

GetGroupAEvents returns all events that have both Topic and MsgID

func (*EventCategorizer) GetGroupBEvents added in v1.1.19

func (ec *EventCategorizer) GetGroupBEvents() []xatu.Event_Name

GetGroupBEvents returns all events that have only Topic

func (*EventCategorizer) GetGroupCEvents added in v1.1.19

func (ec *EventCategorizer) GetGroupCEvents() []xatu.Event_Name

GetGroupCEvents returns all events that have only MsgID

func (*EventCategorizer) GetGroupDEvents added in v1.1.19

func (ec *EventCategorizer) GetGroupDEvents() []xatu.Event_Name

GetGroupDEvents returns all events that have no sharding keys

func (*EventCategorizer) GetShardingGroup added in v1.1.19

func (ec *EventCategorizer) GetShardingGroup(eventType xatu.Event_Name) ShardingGroup

GetShardingGroup returns the sharding group for an event type

func (*EventCategorizer) IsMetaEvent added in v1.1.19

func (ec *EventCategorizer) IsMetaEvent(eventType xatu.Event_Name) bool

IsMetaEvent returns whether an event is an RPC meta event

type EventConfig added in v0.0.169

type EventConfig struct {
	RecvRPCEnabled                    bool `yaml:"recvRpcEnabled" default:"false"`
	SendRPCEnabled                    bool `yaml:"sendRpcEnabled" default:"false"`
	DropRPCEnabled                    bool `yaml:"dropRpcEnabled" default:"false"`
	RpcMetaControlIHaveEnabled        bool `yaml:"rpcMetaControlIHaveEnabled" default:"false"`
	RpcMetaControlIWantEnabled        bool `yaml:"rpcMetaControlIWantEnabled" default:"false"`
	RpcMetaControlIDontWantEnabled    bool `yaml:"rpcMetaControlIDontWantEnabled" default:"false"`
	RpcMetaControlGraftEnabled        bool `yaml:"rpcMetaControlGraftEnabled" default:"false"`
	RpcMetaControlPruneEnabled        bool `yaml:"rpcMetaControlPruneEnabled" default:"false"`
	RpcMetaSubscriptionEnabled        bool `yaml:"rpcMetaSubscriptionEnabled" default:"false"`
	RpcMetaMessageEnabled             bool `yaml:"rpcMetaMessageEnabled" default:"false"`
	AddPeerEnabled                    bool `yaml:"addPeerEnabled" default:"true"`
	RemovePeerEnabled                 bool `yaml:"removePeerEnabled" default:"true"`
	ConnectedEnabled                  bool `yaml:"connectedEnabled" default:"true"`
	DisconnectedEnabled               bool `yaml:"disconnectedEnabled" default:"true"`
	SyntheticHeartbeatEnabled         bool `yaml:"syntheticHeartbeatEnabled" default:"true"`
	JoinEnabled                       bool `yaml:"joinEnabled" default:"true"`
	LeaveEnabled                      bool `yaml:"leaveEnabled" default:"false"`
	GraftEnabled                      bool `yaml:"graftEnabled" default:"false"`
	PruneEnabled                      bool `yaml:"pruneEnabled" default:"false"`
	PublishMessageEnabled             bool `yaml:"publishMessageEnabled" default:"false"`
	RejectMessageEnabled              bool `yaml:"rejectMessageEnabled" default:"false"`
	DuplicateMessageEnabled           bool `yaml:"duplicateMessageEnabled" default:"false"`
	DeliverMessageEnabled             bool `yaml:"deliverMessageEnabled" default:"false"`
	HandleMetadataEnabled             bool `yaml:"handleMetadataEnabled" default:"true"`
	HandleStatusEnabled               bool `yaml:"handleStatusEnabled" default:"true"`
	CustodyProbeEnabled               bool `yaml:"custodyProbeEnabled" default:"true"`
	GossipSubBeaconBlockEnabled       bool `yaml:"gossipSubBeaconBlockEnabled" default:"true"`
	GossipSubAttestationEnabled       bool `yaml:"gossipSubAttestationEnabled" default:"true"`
	GossipSubAggregateAndProofEnabled bool `yaml:"gossipSubAggregateAndProofEnabled" default:"true"`
	GossipSubBlobSidecarEnabled       bool `yaml:"gossipSubBlobSidecarEnabled" default:"true"`
	GossipSubDataColumnSidecarEnabled bool `yaml:"gossipSubDataColumnSidecarEnabled" default:"true"`
	EngineAPINewPayloadEnabled        bool `yaml:"engineApiNewPayloadEnabled" default:"false"`
	EngineAPIGetBlobsEnabled          bool `yaml:"engineApiGetBlobsEnabled" default:"false"`
}

EventConfig represents configuration for all event types.

func (*EventConfig) Validate added in v0.0.169

func (e *EventConfig) Validate() error

Validate validates the event config.

type EventInfo added in v1.1.19

type EventInfo struct {
	Type          xatu.Event_Name
	ShardingGroup ShardingGroup
	HasTopic      bool
	HasMsgID      bool
	IsMeta        bool // True for RPC meta events
}

EventInfo contains metadata about an event type

type FilteredMessageWithIndex added in v1.1.9

type FilteredMessageWithIndex struct {
	MessageID     *wrapperspb.StringValue
	OriginalIndex uint32
}

FilteredMessageWithIndex represents a filtered message with its original index

type MetaProvider added in v1.2.2

type MetaProvider interface {
	GetClientMeta(ctx context.Context) (*xatu.ClientMeta, error)
}

MetaProvider provides client metadata

type MetadataProvider added in v1.2.1

type MetadataProvider interface {
	Wallclock() *ethwallclock.EthereumBeaconChain
	ClockDrift() *time.Duration
	Network() *xatu.ClientMeta_Ethereum_Network
}

MetadataProvider provides ethereum network metadata

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics provides simplified metrics for the sharding system

func NewMetrics

func NewMetrics(namespace string) *Metrics

NewMetrics creates a new metrics instance with simplified metrics

func (*Metrics) AddDecoratedEvent

func (m *Metrics) AddDecoratedEvent(count float64, eventType, network string)

AddDecoratedEvent tracks decorated events (before sharding)

func (*Metrics) AddEvent added in v1.1.19

func (m *Metrics) AddEvent(eventType, network string)

AddEvent records that an event was received

func (*Metrics) AddProcessedMessage added in v1.1.1

func (m *Metrics) AddProcessedMessage(eventType, network string)

AddProcessedMessage records that an event was processed

func (*Metrics) AddShardingDecision added in v1.1.19

func (m *Metrics) AddShardingDecision(eventType, reason, network string)

AddShardingDecision records the reason for a sharding decision

func (*Metrics) AddSkippedMessage added in v1.1.1

func (m *Metrics) AddSkippedMessage(eventType, network string)

AddSkippedMessage records that an event was filtered out

type MetricsCollector added in v1.2.1

type MetricsCollector interface {
	AddEvent(eventType, network string)
	AddProcessedMessage(eventType, network string)
	AddSkippedMessage(eventType, network string)
	AddShardingDecision(eventType, reason, network string)
	AddDecoratedEvent(count float64, eventType, network string)
}

MetricsCollector interface for event processing metrics
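
Because MetricsCollector uses only strings and a float64, it is straightforward to satisfy with an in-memory implementation, for example in tests. The sketch below redeclares the interface locally (as `metricsCollector`) so that it compiles without the package. `countingCollector`, its `get` helper, and the reason string `inactive_shard` are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// metricsCollector is a local copy of the method set listed above.
type metricsCollector interface {
	AddEvent(eventType, network string)
	AddProcessedMessage(eventType, network string)
	AddSkippedMessage(eventType, network string)
	AddShardingDecision(eventType, reason, network string)
	AddDecoratedEvent(count float64, eventType, network string)
}

// countingCollector keeps counters in a map guarded by a mutex.
type countingCollector struct {
	mu     sync.Mutex
	counts map[string]float64
}

func newCountingCollector() *countingCollector {
	return &countingCollector{counts: make(map[string]float64)}
}

// add increments the counter identified by kind, event type, and network.
func (c *countingCollector) add(kind, eventType, network string, n float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[kind+"|"+eventType+"|"+network] += n
}

// get reads a counter; missing counters read as zero.
func (c *countingCollector) get(kind, eventType, network string) float64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[kind+"|"+eventType+"|"+network]
}

func (c *countingCollector) AddEvent(eventType, network string) {
	c.add("event", eventType, network, 1)
}

func (c *countingCollector) AddProcessedMessage(eventType, network string) {
	c.add("processed", eventType, network, 1)
}

func (c *countingCollector) AddSkippedMessage(eventType, network string) {
	c.add("skipped", eventType, network, 1)
}

func (c *countingCollector) AddShardingDecision(eventType, reason, network string) {
	c.add("decision:"+reason, eventType, network, 1)
}

func (c *countingCollector) AddDecoratedEvent(count float64, eventType, network string) {
	c.add("decorated", eventType, network, count)
}

// Compile-time check that the type satisfies the interface.
var _ metricsCollector = (*countingCollector)(nil)

func main() {
	c := newCountingCollector()
	c.AddEvent("GRAFT", "mainnet")
	c.AddSkippedMessage("GRAFT", "mainnet")
	c.AddShardingDecision("GRAFT", "inactive_shard", "mainnet") // hypothetical reason
	fmt.Println(c.get("event", "GRAFT", "mainnet"), c.get("skipped", "GRAFT", "mainnet"))
}
```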

type NoShardingKeyConfig added in v1.1.19

type NoShardingKeyConfig struct {
	// Whether to record events without sharding keys (default: true)
	Enabled bool `yaml:"enabled" default:"true"`
}

NoShardingKeyConfig defines behavior for events without sharding keys

type OutputHandler added in v1.2.1

type OutputHandler interface {
	HandleDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error
	HandleDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error
}

OutputHandler handles processed events

type Override added in v1.0.15

type Override struct {
	MetricsAddr struct {
		Enabled bool
		Value   string
	}
}

Override is the set of overrides for the cl-mimicry command.

type Processor added in v1.2.1

type Processor struct {
	// contains filtered or unexported fields
}

Processor encapsulates all event processing logic for Hermes events

func NewProcessor added in v1.2.1

func NewProcessor(
	duties DutiesProvider,
	output OutputHandler,
	metrics MetricsCollector,
	metaProvider MetaProvider,
	unifiedSharder *UnifiedSharder,
	eventCategorizer *EventCategorizer,
	wallclock *ethwallclock.EthereumBeaconChain,
	clockDrift time.Duration,
	events EventConfig,
	log logrus.FieldLogger,
) *Processor

NewProcessor creates a new Processor instance

func (*Processor) HandleHermesEvent added in v1.2.1

func (p *Processor) HandleHermesEvent(ctx context.Context, event *TraceEvent) error

HandleHermesEvent processes a Hermes trace event and routes it to the appropriate handler

func (*Processor) ShouldTraceMessage added in v1.2.1

func (p *Processor) ShouldTraceMessage(
	event *TraceEvent,
	clientMeta *xatu.ClientMeta,
	xatuEventType string,
) bool

ShouldTraceMessage determines whether a message with the given MsgID should be included in the sample based on the configured trace settings.

func (*Processor) ShouldTraceRPCMetaMessages added in v1.2.1

func (p *Processor) ShouldTraceRPCMetaMessages(
	clientMeta *xatu.ClientMeta,
	xatuEventType string,
	messages interface{},
) ([]FilteredMessageWithIndex, error)

ShouldTraceRPCMetaMessages determines which RPC meta messages should be processed based on sharding configuration

type RPCMetaMessageInfo added in v1.1.19

type RPCMetaMessageInfo struct {
	MessageID *wrapperspb.StringValue
	Topic     *wrapperspb.StringValue // Optional: gossip topic for the message
}

RPCMetaMessageInfo represents a message with its ID and optional topic for RPC meta filtering

type RPCMetaTopicInfo added in v1.1.23

type RPCMetaTopicInfo struct {
	Topic *wrapperspb.StringValue // Gossip topic for the event
}

RPCMetaTopicInfo represents a topic-based RPC meta event for filtering

type RpcControlGraft added in v1.6.4

type RpcControlGraft struct {
	TopicID string
}

RpcControlGraft represents a GRAFT control message.

type RpcControlIHave added in v1.6.4

type RpcControlIHave struct {
	TopicID string
	MsgIDs  []string
}

RpcControlIHave represents an IHAVE control message.

type RpcControlIWant added in v1.6.4

type RpcControlIWant struct {
	MsgIDs []string
}

RpcControlIWant represents an IWANT control message.

type RpcControlIdontWant added in v1.6.4

type RpcControlIdontWant struct {
	MsgIDs []string
}

RpcControlIdontWant represents an IDONTWANT control message.

type RpcControlPrune added in v1.6.4

type RpcControlPrune struct {
	TopicID string
	PeerIDs []peer.ID
}

RpcControlPrune represents a PRUNE control message.

type RpcMeta added in v1.6.4

type RpcMeta struct {
	PeerID        peer.ID
	Subscriptions []RpcMetaSub    `json:"Subs,omitempty"`
	Messages      []RpcMetaMsg    `json:"Msgs,omitempty"`
	Control       *RpcMetaControl `json:"Control,omitempty"`
}

RpcMeta contains metadata for RPC messages. Extracted from github.com/probe-lab/hermes/host.

type RpcMetaControl added in v1.6.4

type RpcMetaControl struct {
	IHave     []RpcControlIHave     `json:"IHave,omitempty"`
	IWant     []RpcControlIWant     `json:"IWant,omitempty"`
	Graft     []RpcControlGraft     `json:"Graft,omitempty"`
	Prune     []RpcControlPrune     `json:"Prune,omitempty"`
	Idontwant []RpcControlIdontWant `json:"Idontwant,omitempty"`
}

RpcMetaControl contains control messages for GossipSub.

type RpcMetaMsg added in v1.6.4

type RpcMetaMsg struct {
	MsgID string `json:"MsgID,omitempty"`
	Topic string `json:"Topic,omitempty"`
}

RpcMetaMsg represents a message in an RPC.

type RpcMetaSub added in v1.6.4

type RpcMetaSub struct {
	Subscribe bool
	TopicID   string
}

RpcMetaSub represents a subscription message.
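
The JSON struct tags on these types (`Subs`, `Msgs`, `Control`, `IHave`, and so on) suggest how an RPC metadata record would look when serialized. The sketch below decodes a hand-written sample into local copies of a subset of the types and collects the message IDs it references. Several things are assumptions: the local lowercase type names, the use of a plain string where the real type has `peer.ID`, the sample payload, and the helpers `decodeRPCMeta` and `msgIDs`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of a subset of the types above, for illustration only.
type rpcMetaMsg struct {
	MsgID string `json:"MsgID,omitempty"`
	Topic string `json:"Topic,omitempty"`
}

type rpcControlIHave struct {
	TopicID string
	MsgIDs  []string
}

type rpcControlIWant struct {
	MsgIDs []string
}

type rpcMetaControl struct {
	IHave []rpcControlIHave `json:"IHave,omitempty"`
	IWant []rpcControlIWant `json:"IWant,omitempty"`
}

type rpcMeta struct {
	PeerID   string          // peer.ID in the real type; a string avoids the libp2p dependency
	Messages []rpcMetaMsg    `json:"Msgs,omitempty"`
	Control  *rpcMetaControl `json:"Control,omitempty"`
}

// sample is a made-up record shaped after the struct tags.
const sample = `{
  "PeerID": "peer-1",
  "Msgs": [{"MsgID": "0xaa", "Topic": "beacon_block"}],
  "Control": {
    "IHave": [{"TopicID": "beacon_block", "MsgIDs": ["0xbb", "0xcc"]}],
    "IWant": [{"MsgIDs": ["0xdd"]}]
  }
}`

// decodeRPCMeta parses one JSON record.
func decodeRPCMeta(data []byte) (rpcMeta, error) {
	var m rpcMeta
	err := json.Unmarshal(data, &m)
	return m, err
}

// msgIDs lists every message ID referenced by messages, IHAVE, and IWANT.
func msgIDs(m rpcMeta) []string {
	var ids []string
	for _, msg := range m.Messages {
		if msg.MsgID != "" {
			ids = append(ids, msg.MsgID)
		}
	}
	if m.Control == nil {
		return ids
	}
	for _, ih := range m.Control.IHave {
		ids = append(ids, ih.MsgIDs...)
	}
	for _, iw := range m.Control.IWant {
		ids = append(ids, iw.MsgIDs...)
	}
	return ids
}

func main() {
	m, err := decodeRPCMeta([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(m.PeerID, msgIDs(m))
}
```

Note how the shapes line up with the sharding groups: IHAVE entries carry both a topic and message IDs, while IWANT entries carry message IDs only.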

type ShardableEvent added in v1.1.19

type ShardableEvent struct {
	MsgID string
	Topic string
}

ShardableEvent represents an event that can be sharded

type ShardingConfig added in v1.1.19

type ShardingConfig struct {
	// Topic-based patterns with sampling rates
	Topics map[string]*TopicShardingConfig `yaml:"topics"`

	// Events without sharding keys (Group D)
	NoShardingKeyEvents *NoShardingKeyConfig `yaml:"noShardingKeyEvents,omitempty"`
	// contains filtered or unexported fields
}

ShardingConfig represents the sharding configuration

func (*ShardingConfig) LogSummary added in v1.1.19

func (c *ShardingConfig) LogSummary() string

LogSummary returns a human-readable summary of the sharding configuration

type ShardingGroup added in v1.1.19

type ShardingGroup int

ShardingGroup represents the categorization of events based on their sharding capabilities

const (
	// GroupA events have both Topic and MsgID available for sharding
	GroupA ShardingGroup = iota
	// GroupB events have only Topic available for sharding
	GroupB
	// GroupC events have only MsgID available for sharding
	GroupC
	// GroupD events have no sharding keys available
	GroupD
)

type TopicShardingConfig added in v1.1.19

type TopicShardingConfig struct {
	// Total number of shards for this topic pattern
	TotalShards uint64 `yaml:"totalShards"`
	// Active shards for this topic pattern
	ActiveShards []uint64 `yaml:"activeShards"`
}

TopicShardingConfig defines sharding for a topic pattern

func (*TopicShardingConfig) GetSamplingRate added in v1.1.19

func (t *TopicShardingConfig) GetSamplingRate() float64

GetSamplingRate returns the sampling rate for a topic pattern

func (*TopicShardingConfig) IsFirehose added in v1.1.19

func (t *TopicShardingConfig) IsFirehose() bool

IsFirehose returns true if all shards are active
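
The README's percentages (for example 26/512 for roughly 5%) imply that the sampling rate is the number of active shards divided by the total. A minimal sketch of that arithmetic follows, on a local `topicSharding` type. The method bodies are assumptions based on the one-line descriptions above, and they presume the active list contains no duplicates.

```go
package main

import "fmt"

// topicSharding is a local stand-in for TopicShardingConfig.
type topicSharding struct {
	TotalShards  uint64
	ActiveShards []uint64
}

// samplingRate returns the fraction of shards that are active.
// ASSUMPTION: ActiveShards holds no duplicates.
func (t topicSharding) samplingRate() float64 {
	if t.TotalShards == 0 {
		return 0
	}
	return float64(len(t.ActiveShards)) / float64(t.TotalShards)
}

// isFirehose reports whether every shard is active.
func (t topicSharding) isFirehose() bool {
	return t.TotalShards > 0 && uint64(len(t.ActiveShards)) == t.TotalShards
}

// firstN returns shards 0..n-1, the equivalent of a "0-(n-1)" range.
func firstN(n uint64) []uint64 {
	shards := make([]uint64, 0, n)
	for s := uint64(0); s < n; s++ {
		shards = append(shards, s)
	}
	return shards
}

func main() {
	for _, n := range []uint64{512, 128, 26, 1} {
		cfg := topicSharding{TotalShards: 512, ActiveShards: firstN(n)}
		fmt.Printf("%3d active: %.2f%% firehose=%v\n",
			n, 100*cfg.samplingRate(), cfg.isFirehose())
	}
}
```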

func (*TopicShardingConfig) UnmarshalYAML added in v1.1.19

func (t *TopicShardingConfig) UnmarshalYAML(node *yaml.Node) error

UnmarshalYAML implements custom YAML unmarshaling to support range syntax

type TraceEvent added in v1.6.4

type TraceEvent struct {
	Type      string
	Topic     string
	PeerID    peer.ID
	Timestamp time.Time
	Payload   any `json:"Data"` // cannot use field "Data" because of gk.Record method
}

TraceEvent represents a trace event from the libp2p network. Extracted from github.com/probe-lab/hermes/host.

type TraceEventAltairBlock added in v1.6.4

type TraceEventAltairBlock struct {
	TraceEventPayloadMetaData
	Block *ethtypes.SignedBeaconBlockAltair
}

TraceEventAltairBlock represents an Altair beacon block event.

func NewAltairBlockPayload added in v1.6.4

NewAltairBlockPayload creates an Altair block payload.

type TraceEventAttestation added in v1.6.4

type TraceEventAttestation struct {
	TraceEventPayloadMetaData
	Attestation *ethtypes.Attestation
}

TraceEventAttestation represents an attestation event.

func NewAttestationPayload added in v1.6.4

func NewAttestationPayload(att *ethtypes.Attestation, meta *TraceEventPayloadMetaData) *TraceEventAttestation

NewAttestationPayload creates a pre-Electra attestation payload.

type TraceEventAttestationElectra added in v1.6.4

type TraceEventAttestationElectra struct {
	TraceEventPayloadMetaData
	AttestationElectra *ethtypes.AttestationElectra
}

TraceEventAttestationElectra represents an Electra attestation event.

func NewAttestationElectraPayload added in v1.6.4

func NewAttestationElectraPayload(att *ethtypes.AttestationElectra, meta *TraceEventPayloadMetaData) *TraceEventAttestationElectra

NewAttestationElectraPayload creates an Electra attestation payload.

type TraceEventAttesterSlashing added in v1.6.4

type TraceEventAttesterSlashing struct {
	TraceEventPayloadMetaData
	AttesterSlashing *ethtypes.AttesterSlashing
}

TraceEventAttesterSlashing represents an attester slashing event.

type TraceEventBLSToExecutionChange added in v1.6.4

type TraceEventBLSToExecutionChange struct {
	TraceEventPayloadMetaData
	BLSToExecutionChange *ethtypes.BLSToExecutionChange
}

TraceEventBLSToExecutionChange represents a BLS to execution change event.

type TraceEventBellatrixBlock added in v1.6.4

type TraceEventBellatrixBlock struct {
	TraceEventPayloadMetaData
	Block *ethtypes.SignedBeaconBlockBellatrix
}

TraceEventBellatrixBlock represents a Bellatrix beacon block event.

func NewBellatrixBlockPayload added in v1.6.4

NewBellatrixBlockPayload creates a Bellatrix block payload.

type TraceEventBlobSidecar added in v1.6.4

type TraceEventBlobSidecar struct {
	TraceEventPayloadMetaData
	BlobSidecar *ethtypes.BlobSidecar
}

TraceEventBlobSidecar represents a blob sidecar event.

func NewBlobSidecarPayload added in v1.6.4

func NewBlobSidecarPayload(blob *ethtypes.BlobSidecar, meta *TraceEventPayloadMetaData) *TraceEventBlobSidecar

NewBlobSidecarPayload creates a blob sidecar payload.

type TraceEventCapellaBlock added in v1.6.4

type TraceEventCapellaBlock struct {
	TraceEventPayloadMetaData
	Block *ethtypes.SignedBeaconBlockCapella
}

TraceEventCapellaBlock represents a Capella beacon block event.

func NewCapellaBlockPayload added in v1.6.4

NewCapellaBlockPayload creates a Capella block payload.

type TraceEventConsensusEngineAPIGetBlobs added in v1.6.9

type TraceEventConsensusEngineAPIGetBlobs struct {
	TraceEventPayloadMetaData

	// Timing
	RequestedAt time.Time     `json:"requested_at"`
	Duration    time.Duration `json:"duration"`

	// Beacon context
	Slot            uint64 `json:"slot"`
	BlockRoot       string `json:"block_root"`
	ParentBlockRoot string `json:"parent_block_root"`

	// Request details
	RequestedCount  uint32   `json:"requested_count"`
	VersionedHashes []string `json:"versioned_hashes"`

	// Response
	ReturnedCount uint32 `json:"returned_count"`
	Status        string `json:"status"`
	ErrorMessage  string `json:"error_message"`

	// Meta
	MethodVersion string `json:"method_version"`

	// ExecutionClientVersion is the raw version string from web3_clientVersion RPC.
	// Parsed into components when converting to protobuf.
	ExecutionClientVersion string `json:"execution_client_version"`
}

TraceEventConsensusEngineAPIGetBlobs represents an engine_getBlobs API call event.

func NewConsensusEngineAPIGetBlobsPayload added in v1.6.9

func NewConsensusEngineAPIGetBlobsPayload(
	requestedAt time.Time,
	duration time.Duration,
	slot uint64,
	blockRoot, parentBlockRoot string,
	requestedCount uint32,
	versionedHashes []string,
	returnedCount uint32,
	status, errorMessage string,
	methodVersion string,
	executionClientVersion string,
) *TraceEventConsensusEngineAPIGetBlobs

NewConsensusEngineAPIGetBlobsPayload creates a consensus engine API get blobs event. The executionClientVersion parameter is the raw version string from web3_clientVersion RPC. It will be parsed into components when converting to protobuf.

type TraceEventConsensusEngineAPINewPayload added in v1.6.7

type TraceEventConsensusEngineAPINewPayload struct {
	TraceEventPayloadMetaData

	// Timing
	RequestedAt time.Time     `json:"requested_at"`
	Duration    time.Duration `json:"duration"`

	// Beacon context
	Slot            uint64 `json:"slot"`
	BlockRoot       string `json:"block_root"`
	ParentBlockRoot string `json:"parent_block_root"`
	ProposerIndex   uint64 `json:"proposer_index"`

	// Execution payload
	BlockNumber uint64 `json:"block_number"`
	BlockHash   string `json:"block_hash"`
	ParentHash  string `json:"parent_hash"`
	GasUsed     uint64 `json:"gas_used"`
	GasLimit    uint64 `json:"gas_limit"`
	TxCount     uint32 `json:"tx_count"`
	BlobCount   uint32 `json:"blob_count"`

	// Response
	Status          string `json:"status"`
	LatestValidHash string `json:"latest_valid_hash"`
	ValidationError string `json:"validation_error"`

	// Meta
	MethodVersion string `json:"method_version"`

	// ExecutionClientVersion is the raw version string from web3_clientVersion RPC.
	// Parsed into components when converting to protobuf.
	ExecutionClientVersion string `json:"execution_client_version"`
}

TraceEventConsensusEngineAPINewPayload represents an engine_newPayload API call event.

func NewConsensusEngineAPINewPayloadPayload added in v1.6.7

func NewConsensusEngineAPINewPayloadPayload(
	requestedAt time.Time,
	duration time.Duration,
	slot, proposerIndex uint64,
	blockRoot, parentBlockRoot string,
	blockNumber uint64,
	blockHash, parentHash string,
	gasUsed, gasLimit uint64,
	txCount, blobCount uint32,
	status, latestValidHash, validationError string,
	methodVersion string,
	executionClientVersion string,
) *TraceEventConsensusEngineAPINewPayload

NewConsensusEngineAPINewPayloadPayload creates a consensus engine API new payload event. The executionClientVersion parameter is the raw version string from web3_clientVersion RPC. It will be parsed into components when converting to protobuf.

type TraceEventCustodyProbe added in v1.6.4

type TraceEventCustodyProbe struct {
	TraceEventPayloadMetaData
	PeerID     *peer.ID      `json:"peer_id,omitempty"`
	Epoch      uint64        `json:"epoch"`
	Slot       uint64        `json:"slot"`
	BlockHash  string        `json:"block_hash"`
	Column     uint64        `json:"column_id"`
	Result     string        `json:"result,omitempty"`
	Duration   time.Duration `json:"duration,omitempty"`
	ColumnSize int           `json:"column_size,omitempty"`
	Error      string        `json:"error,omitempty"`
}

TraceEventCustodyProbe represents a data column custody probe event.

func NewCustodyProbePayload added in v1.6.4

func NewCustodyProbePayload(
	peerID *peer.ID,
	epoch, slot, column uint64,
	blockHash, result, errorStr string,
	duration time.Duration,
	columnSize int,
) *TraceEventCustodyProbe

NewCustodyProbePayload creates a custody probe payload.

type TraceEventDataColumnSidecar added in v1.6.4

type TraceEventDataColumnSidecar struct {
	TraceEventPayloadMetaData
	DataColumnSidecar *ethtypes.DataColumnSidecar
}

TraceEventDataColumnSidecar represents a data column sidecar event.

func NewDataColumnSidecarPayload added in v1.6.4

func NewDataColumnSidecarPayload(dataColumn *ethtypes.DataColumnSidecar, meta *TraceEventPayloadMetaData) *TraceEventDataColumnSidecar

NewDataColumnSidecarPayload creates a data column sidecar payload.

type TraceEventDenebBlock added in v1.6.4

type TraceEventDenebBlock struct {
	TraceEventPayloadMetaData
	Block *ethtypes.SignedBeaconBlockDeneb
}

TraceEventDenebBlock represents a Deneb beacon block event.

func NewDenebBlockPayload added in v1.6.4

NewDenebBlockPayload creates a Deneb block payload.

type TraceEventElectraBlock added in v1.6.4

type TraceEventElectraBlock struct {
	TraceEventPayloadMetaData
	Block *ethtypes.SignedBeaconBlockElectra
}

TraceEventElectraBlock represents an Electra beacon block event.

func NewElectraBlockPayload added in v1.6.4

NewElectraBlockPayload creates an Electra block payload.

type TraceEventFuluBlock added in v1.6.4

type TraceEventFuluBlock struct {
	TraceEventPayloadMetaData
	Block *ethtypes.SignedBeaconBlockFulu
}

TraceEventFuluBlock represents a Fulu beacon block event.

func NewFuluBlockPayload added in v1.6.4

NewFuluBlockPayload creates a Fulu block payload.

type TraceEventPayloadMetaData added in v1.6.4

type TraceEventPayloadMetaData struct {
	PeerID  string `json:"PeerID"`
	Topic   string `json:"Topic"`
	Seq     []byte `json:"Seq"`
	MsgID   string `json:"MsgID"`
	MsgSize int    `json:"MsgSize"`
}

TraceEventPayloadMetaData contains metadata for trace event payloads. Extracted from github.com/probe-lab/hermes/host.

type TraceEventPhase0Block added in v1.6.4

type TraceEventPhase0Block struct {
	TraceEventPayloadMetaData
	Block *ethtypes.SignedBeaconBlock
}

TraceEventPhase0Block represents a Phase0 beacon block event.

func NewPhase0BlockPayload added in v1.6.4

func NewPhase0BlockPayload(block *ethtypes.SignedBeaconBlock, meta *TraceEventPayloadMetaData) *TraceEventPhase0Block

NewPhase0BlockPayload creates a Phase0 block payload.

type TraceEventProposerSlashing added in v1.6.4

type TraceEventProposerSlashing struct {
	TraceEventPayloadMetaData
	ProposerSlashing *ethtypes.ProposerSlashing
}

TraceEventProposerSlashing represents a proposer slashing event.

type TraceEventSignedAggregateAttestationAndProof added in v1.6.4

type TraceEventSignedAggregateAttestationAndProof struct {
	TraceEventPayloadMetaData
	SignedAggregateAttestationAndProof *ethtypes.SignedAggregateAttestationAndProof
}

TraceEventSignedAggregateAttestationAndProof represents a signed aggregate attestation and proof event.

func NewSignedAggregateAttestationAndProofPayload added in v1.6.4

NewSignedAggregateAttestationAndProofPayload creates a pre-Electra aggregate attestation payload.

type TraceEventSignedAggregateAttestationAndProofElectra added in v1.6.4

type TraceEventSignedAggregateAttestationAndProofElectra struct {
	TraceEventPayloadMetaData
	SignedAggregateAttestationAndProofElectra *ethtypes.SignedAggregateAttestationAndProofElectra
}

TraceEventSignedAggregateAttestationAndProofElectra represents an Electra signed aggregate attestation and proof event.

func NewSignedAggregateAttestationAndProofElectraPayload added in v1.6.4

NewSignedAggregateAttestationAndProofElectraPayload creates an Electra aggregate attestation payload.

type TraceEventSignedContributionAndProof added in v1.6.4

type TraceEventSignedContributionAndProof struct {
	TraceEventPayloadMetaData
	SignedContributionAndProof *ethtypes.SignedContributionAndProof
}

TraceEventSignedContributionAndProof represents a signed contribution and proof event.

type TraceEventSingleAttestation added in v1.6.4

type TraceEventSingleAttestation struct {
	TraceEventPayloadMetaData
	SingleAttestation *ethtypes.SingleAttestation
}

TraceEventSingleAttestation represents a single attestation event.

func NewSingleAttestationPayload added in v1.6.4

func NewSingleAttestationPayload(att *ethtypes.SingleAttestation, meta *TraceEventPayloadMetaData) *TraceEventSingleAttestation

NewSingleAttestationPayload creates a single attestation payload.

type TraceEventSyncCommitteeMessage added in v1.6.4

type TraceEventSyncCommitteeMessage struct {
	TraceEventPayloadMetaData
	SyncCommitteeMessage *ethtypes.SyncCommitteeMessage //nolint:staticcheck // gRPC API deprecated but still supported until v8 (2026)
}

TraceEventSyncCommitteeMessage represents a sync committee message event.

type TraceEventVoluntaryExit added in v1.6.4

type TraceEventVoluntaryExit struct {
	TraceEventPayloadMetaData
	VoluntaryExit *ethtypes.VoluntaryExit
}

TraceEventVoluntaryExit represents a voluntary exit event.

type UnifiedSharder added in v1.1.19

type UnifiedSharder struct {
	// contains filtered or unexported fields
}

UnifiedSharder provides a single sharding decision point for all events.

func NewUnifiedSharder added in v1.1.19

func NewUnifiedSharder(config *ShardingConfig, enabled bool) (*UnifiedSharder, error)

NewUnifiedSharder creates a new unified sharder.

func (*UnifiedSharder) GetShardForKey added in v1.1.19

func (s *UnifiedSharder) GetShardForKey(key string, totalShards uint64) uint64

GetShardForKey returns the shard number for a given key. It is intended for testing and debugging.
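The key-to-shard mapping can be pictured as a hash of the key reduced modulo the shard count. The sketch below is not the package's implementation: the README names SipHash-2-4, while this example swaps in FNV-1a from the standard library to stay dependency-free, so the shard numbers it prints will not match the real sharder. shardForKey is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardForKey maps a key to a shard in [0, totalShards).
// FNV-1a is a stand-in; the README names SipHash-2-4 for the real sharder.
func shardForKey(key string, totalShards uint64) uint64 {
	if totalShards == 0 {
		// Guard against division by zero. How the real sharder treats
		// this case is not documented; returning 0 is an assumption.
		return 0
	}
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum64() % totalShards
}

func main() {
	const totalShards = 512 // default shard count mentioned in the README
	// The same key always lands on the same shard.
	for _, key := range []string{"msg-a", "msg-b", "msg-a"} {
		fmt.Printf("%s -> shard %d\n", key, shardForKey(key, totalShards))
	}
}
```

Because the mapping is deterministic, independent instances can agree on which one handles a given message without coordinating.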

func (*UnifiedSharder) ShouldProcess added in v1.1.19

func (s *UnifiedSharder) ShouldProcess(eventType xatu.Event_Name, msgID, topic string) (bool, string)

ShouldProcess determines if an event should be processed based on sharding rules.

func (*UnifiedSharder) ShouldProcessBatch added in v1.1.19

func (s *UnifiedSharder) ShouldProcessBatch(eventType xatu.Event_Name, events []ShardableEvent) []bool

ShouldProcessBatch determines which events in a batch should be processed. This is used for RPC meta events where we have multiple events to evaluate.
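A batch decision of this shape can be sketched as a boolean mask with one entry per input event. The example below assumes the result is index-aligned with the input, which the []bool return type suggests but the documentation does not state. shardableEvent, shardOf, and shouldProcessBatch are hypothetical stand-ins, and FNV-1a again replaces the SipHash-2-4 hashing named in the README.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardableEvent is a hypothetical stand-in; the real ShardableEvent
// fields may differ.
type shardableEvent struct {
	MsgID string
	Topic string
}

// shardOf hashes a key into [0, totalShards) using FNV-1a (a stand-in hash).
func shardOf(key string, totalShards uint64) uint64 {
	if totalShards == 0 {
		return 0
	}
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64() % totalShards
}

// shouldProcessBatch returns one decision per event, in input order.
// An event is kept when its message ID hashes to an active shard.
func shouldProcessBatch(events []shardableEvent, totalShards uint64, active map[uint64]bool) []bool {
	out := make([]bool, len(events))
	for i, ev := range events {
		out[i] = active[shardOf(ev.MsgID, totalShards)]
	}
	return out
}

func main() {
	events := []shardableEvent{
		{MsgID: "msg-a", Topic: "beacon_block"},
		{MsgID: "msg-b", Topic: "beacon_block"},
		{MsgID: "msg-c", Topic: "beacon_block"},
	}
	// This instance handles the lower half of 512 shards.
	active := make(map[uint64]bool, 256)
	for s := uint64(0); s < 256; s++ {
		active[s] = true
	}
	fmt.Println(shouldProcessBatch(events, 512, active))
}
```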
