Documentation
¶
Index ¶
- Constants
- func GetGossipTopics(event *host.TraceEvent) []string
- func GetMsgID(event *host.TraceEvent) string
- func GetShard(shardingKey string, totalShards uint64) uint64
- func IsShardActive(shard uint64, activeShards []uint64) bool
- func SipHash(key [16]byte, data []byte) uint64
- type CompiledPattern
- type Config
- type DutiesProvider
- type EventCategorizer
- func (ec *EventCategorizer) GetAllEventsByGroup() map[ShardingGroup][]xatu.Event_Name
- func (ec *EventCategorizer) GetEventInfo(eventType xatu.Event_Name) (*EventInfo, bool)
- func (ec *EventCategorizer) GetGroupAEvents() []xatu.Event_Name
- func (ec *EventCategorizer) GetGroupBEvents() []xatu.Event_Name
- func (ec *EventCategorizer) GetGroupCEvents() []xatu.Event_Name
- func (ec *EventCategorizer) GetGroupDEvents() []xatu.Event_Name
- func (ec *EventCategorizer) GetShardingGroup(eventType xatu.Event_Name) ShardingGroup
- func (ec *EventCategorizer) IsMetaEvent(eventType xatu.Event_Name) bool
- type EventConfig
- type EventInfo
- type FilteredMessageWithIndex
- type MetaProvider
- type MetadataProvider
- type Metrics
- func (m *Metrics) AddDecoratedEvent(count float64, eventType, network string)
- func (m *Metrics) AddEvent(eventType, network string)
- func (m *Metrics) AddProcessedMessage(eventType, network string)
- func (m *Metrics) AddShardingDecision(eventType, reason, network string)
- func (m *Metrics) AddSkippedMessage(eventType, network string)
- type MetricsCollector
- type Mimicry
- func (m *Mimicry) ClockDrift() *time.Duration
- func (m *Mimicry) GetClientMeta(ctx context.Context) (*xatu.ClientMeta, error)
- func (m *Mimicry) GetProcessor() *Processor
- func (m *Mimicry) GetValidatorIndex(epoch phase0.Epoch, slot phase0.Slot, committeeIndex phase0.CommitteeIndex, ...) (phase0.ValidatorIndex, error)
- func (m *Mimicry) HandleDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error
- func (m *Mimicry) HandleDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error
- func (m *Mimicry) Network() *xatu.ClientMeta_Ethereum_Network
- func (m *Mimicry) ServeMetrics(ctx context.Context) error
- func (m *Mimicry) ServePProf(ctx context.Context) error
- func (m *Mimicry) ServeProbe(ctx context.Context) error
- func (m *Mimicry) Start(ctx context.Context) error
- func (m *Mimicry) Wallclock() *ethwallclock.EthereumBeaconChain
- type NoShardingKeyConfig
- type NodeConfig
- type OutputHandler
- type Override
- type Processor
- func (p *Processor) HandleHermesEvent(ctx context.Context, event *host.TraceEvent) error
- func (p *Processor) ShouldTraceMessage(event *host.TraceEvent, clientMeta *xatu.ClientMeta, xatuEventType string) bool
- func (p *Processor) ShouldTraceRPCMetaMessages(clientMeta *xatu.ClientMeta, xatuEventType string, messages interface{}) ([]FilteredMessageWithIndex, error)
- type RPCMetaMessageInfo
- type RPCMetaTopicInfo
- type ShardableEvent
- type ShardingConfig
- type ShardingGroup
- type TopicShardingConfig
- type UnifiedSharder
Constants ¶
const ( // libp2p pubsub events. TraceEvent_HANDLE_MESSAGE = "HANDLE_MESSAGE" // libp2p core networking events. TraceEvent_CONNECTED = "CONNECTED" TraceEvent_DISCONNECTED = "DISCONNECTED" TraceEvent_SYNTHETIC_HEARTBEAT = "SYNTHETIC_HEARTBEAT" // RPC events. TraceEvent_HANDLE_METADATA = "HANDLE_METADATA" TraceEvent_HANDLE_STATUS = "HANDLE_STATUS" )
Define events not supplied by libp2p proto pkgs.
const (
// DefaultTotalShards is the default number of shards if not specified
DefaultTotalShards = 512
)
Variables ¶
This section is empty.
Functions ¶
func GetGossipTopics ¶ added in v1.1.19
func GetGossipTopics(event *host.TraceEvent) []string
GetGossipTopics extracts all gossip topics from a trace event if available. Returns a slice of unique topics found in the event.
func GetMsgID ¶ added in v1.1.19
func GetMsgID(event *host.TraceEvent) string
GetMsgID extracts the message ID from the event for sharding. We only shard based on message IDs, not peer IDs.
func GetShard ¶ added in v1.1.1
GetShard calculates which shard a message belongs to based on its ID.
This function uses SipHash to consistently map message IDs (typically hashes themselves) to specific shards, ensuring even distribution across the available shards.
Key benefits: - Deterministic: The same message ID always maps to the same shard - Balanced: Messages are evenly distributed across all shards
Parameters:
- shardingKey: The identifier to use for sharding (often a hash like "0x1234...abcd")
- totalShards: The total number of available shards (e.g., 64)
Returns:
- The shard number (0 to totalShards-1) where this message should be processed
func IsShardActive ¶ added in v1.1.1
IsShardActive checks if a shard is in the active shards list.
func SipHash ¶ added in v1.1.1
SipHash implements the SipHash-2-4 algorithm, a fast and efficient hash function designed for message authentication and hash-table lookups.
Key features of SipHash: - Deterministic output for identical inputs - Even distribution of outputs across the range of uint64
When used for sharding (via GetShard), SipHash provides: - Consistent distribution of messages across shards - Deterministic routing where the same message always maps to the same shard
Parameters:
- key: A 16-byte secret key (can be fixed for consistent sharding)
- data: The message bytes to hash
Returns:
- A 64-bit unsigned integer hash value
References:
- Reference implementation: https://github.com/veorq/SipHash
- ClickHouse documentation: https://clickhouse.com/docs/sql-reference/functions/hash-functions#siphash64
Types ¶
type CompiledPattern ¶ added in v1.1.19
type CompiledPattern struct {
Pattern *regexp.Regexp
Config *TopicShardingConfig
// EventTypeConstraint specifies which event types this pattern applies to.
// Empty string means it applies to all events (backward compatibility).
// Can be an exact event name (e.g., "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION")
// or a wildcard pattern (e.g., "LIBP2P_TRACE_RPC_META_*")
EventTypeConstraint string
}
CompiledPattern holds a compiled regex pattern and its config
type Config ¶
type Config struct {
LoggingLevel string `yaml:"logging" default:"info"`
MetricsAddr string `yaml:"metricsAddr" default:":9090"`
PProfAddr *string `yaml:"pprofAddr"`
ProbeAddr *string `yaml:"probeAddr"`
// The name of the mimicry
Name string `yaml:"name"`
// Ethereum configuration
Ethereum ethereum.Config `yaml:"ethereum"`
// Outputs configuration
Outputs []output.Config `yaml:"outputs"`
// Labels configures the mimicry with labels
Labels map[string]string `yaml:"labels"`
// NTP Server to use for clock drift correction
NTPServer string `yaml:"ntpServer" default:"time.google.com"`
// Node is the configuration for the node
Node NodeConfig `yaml:"node"`
// Events is the configuration for the events
Events EventConfig `yaml:"events"`
// Sharding is the configuration for event sharding
Sharding ShardingConfig `yaml:"sharding"`
}
func (*Config) ApplyOverrides ¶ added in v1.0.15
func (c *Config) ApplyOverrides(o *Override, log logrus.FieldLogger) error
ApplyOverrides applies any overrides to the config.
func (*Config) CreateSinks ¶
type DutiesProvider ¶ added in v1.2.1
type DutiesProvider interface {
GetValidatorIndex(epoch phase0.Epoch, slot phase0.Slot, committeeIndex phase0.CommitteeIndex, position uint64) (phase0.ValidatorIndex, error)
}
DutiesProvider provides validator duty information
type EventCategorizer ¶ added in v1.1.19
type EventCategorizer struct {
// contains filtered or unexported fields
}
EventCategorizer manages event categorization for sharding decisions
func NewEventCategorizer ¶ added in v1.1.19
func NewEventCategorizer() *EventCategorizer
NewEventCategorizer creates and initializes an EventCategorizer with all known events
func (*EventCategorizer) GetAllEventsByGroup ¶ added in v1.1.19
func (ec *EventCategorizer) GetAllEventsByGroup() map[ShardingGroup][]xatu.Event_Name
GetAllEventsByGroup returns all events categorized by their sharding group
func (*EventCategorizer) GetEventInfo ¶ added in v1.1.19
func (ec *EventCategorizer) GetEventInfo(eventType xatu.Event_Name) (*EventInfo, bool)
GetEventInfo returns information about an event type
func (*EventCategorizer) GetGroupAEvents ¶ added in v1.1.19
func (ec *EventCategorizer) GetGroupAEvents() []xatu.Event_Name
GetGroupAEvents returns all events that have both Topic and MsgID
func (*EventCategorizer) GetGroupBEvents ¶ added in v1.1.19
func (ec *EventCategorizer) GetGroupBEvents() []xatu.Event_Name
GetGroupBEvents returns all events that have only Topic
func (*EventCategorizer) GetGroupCEvents ¶ added in v1.1.19
func (ec *EventCategorizer) GetGroupCEvents() []xatu.Event_Name
GetGroupCEvents returns all events that have only MsgID
func (*EventCategorizer) GetGroupDEvents ¶ added in v1.1.19
func (ec *EventCategorizer) GetGroupDEvents() []xatu.Event_Name
GetGroupDEvents returns all events that have no sharding keys
func (*EventCategorizer) GetShardingGroup ¶ added in v1.1.19
func (ec *EventCategorizer) GetShardingGroup(eventType xatu.Event_Name) ShardingGroup
GetShardingGroup returns the sharding group for an event type
func (*EventCategorizer) IsMetaEvent ¶ added in v1.1.19
func (ec *EventCategorizer) IsMetaEvent(eventType xatu.Event_Name) bool
IsMetaEvent returns whether an event is an RPC meta event
type EventConfig ¶ added in v0.0.169
type EventConfig struct {
RecvRPCEnabled bool `yaml:"recvRpcEnabled" default:"false"`
SendRPCEnabled bool `yaml:"sendRpcEnabled" default:"false"`
DropRPCEnabled bool `yaml:"dropRpcEnabled" default:"false"`
RpcMetaControlIHaveEnabled bool `yaml:"rpcMetaControlIHaveEnabled" default:"false"`
RpcMetaControlIWantEnabled bool `yaml:"rpcMetaControlIWantEnabled" default:"false"`
RpcMetaControlIDontWantEnabled bool `yaml:"rpcMetaControlIDontWantEnabled" default:"false"`
RpcMetaControlGraftEnabled bool `yaml:"rpcMetaControlGraftEnabled" default:"false"`
RpcMetaControlPruneEnabled bool `yaml:"rpcMetaControlPruneEnabled" default:"false"`
RpcMetaSubscriptionEnabled bool `yaml:"rpcMetaSubscriptionEnabled" default:"false"`
RpcMetaMessageEnabled bool `yaml:"rpcMetaMessageEnabled" default:"false"`
AddPeerEnabled bool `yaml:"addPeerEnabled" default:"true"`
RemovePeerEnabled bool `yaml:"removePeerEnabled" default:"true"`
ConnectedEnabled bool `yaml:"connectedEnabled" default:"true"`
DisconnectedEnabled bool `yaml:"disconnectedEnabled" default:"true"`
SyntheticHeartbeatEnabled bool `yaml:"syntheticHeartbeatEnabled" default:"true"`
JoinEnabled bool `yaml:"joinEnabled" default:"true"`
LeaveEnabled bool `yaml:"leaveEnabled" default:"false"`
GraftEnabled bool `yaml:"graftEnabled" default:"false"`
PruneEnabled bool `yaml:"pruneEnabled" default:"false"`
PublishMessageEnabled bool `yaml:"publishMessageEnabled" default:"false"`
RejectMessageEnabled bool `yaml:"rejectMessageEnabled" default:"false"`
DuplicateMessageEnabled bool `yaml:"duplicateMessageEnabled" default:"false"`
DeliverMessageEnabled bool `yaml:"deliverMessageEnabled" default:"false"`
HandleMetadataEnabled bool `yaml:"handleMetadataEnabled" default:"true"`
HandleStatusEnabled bool `yaml:"handleStatusEnabled" default:"true"`
GossipSubBeaconBlockEnabled bool `yaml:"gossipSubBeaconBlockEnabled" default:"true"`
GossipSubAttestationEnabled bool `yaml:"gossipSubAttestationEnabled" default:"true"`
GossipSubAggregateAndProofEnabled bool `yaml:"gossipSubAggregateAndProofEnabled" default:"true"`
GossipSubBlobSidecarEnabled bool `yaml:"gossipSubBlobSidecarEnabled" default:"true"`
GossipSubDataColumnSidecarEnabled bool `yaml:"gossipSubDataColumnSidecarEnabled" default:"true"`
}
EventConfig represents configuration for all event types.
func (*EventConfig) Validate ¶ added in v0.0.169
func (e *EventConfig) Validate() error
Validate validates the event config.
type EventInfo ¶ added in v1.1.19
type EventInfo struct {
Type xatu.Event_Name
ShardingGroup ShardingGroup
HasTopic bool
HasMsgID bool
IsMeta bool // True for RPC meta events
}
EventInfo contains metadata about an event type
type FilteredMessageWithIndex ¶ added in v1.1.9
type FilteredMessageWithIndex struct {
MessageID *wrapperspb.StringValue
OriginalIndex uint32
}
FilteredMessageWithIndex represents a filtered message with its original index
type MetaProvider ¶ added in v1.2.2
type MetaProvider interface {
GetClientMeta(ctx context.Context) (*xatu.ClientMeta, error)
}
MetaProvider provides client metadata
type MetadataProvider ¶ added in v1.2.1
type MetadataProvider interface {
Wallclock() *ethwallclock.EthereumBeaconChain
ClockDrift() *time.Duration
Network() *xatu.ClientMeta_Ethereum_Network
}
MetadataProvider provides ethereum network metadata
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics provides simplified metrics for the sharding system
func NewMetrics ¶
NewMetrics creates a new metrics instance with simplified metrics
func (*Metrics) AddDecoratedEvent ¶
AddDecoratedEvent tracks decorated events (before sharding)
func (*Metrics) AddProcessedMessage ¶ added in v1.1.1
AddProcessedMessage records that an event was processed
func (*Metrics) AddShardingDecision ¶ added in v1.1.19
AddShardingDecision records the reason for a sharding decision
func (*Metrics) AddSkippedMessage ¶ added in v1.1.1
AddSkippedMessage records that an event was filtered out
type MetricsCollector ¶ added in v1.2.1
type MetricsCollector interface {
AddEvent(eventType, network string)
AddProcessedMessage(eventType, network string)
AddSkippedMessage(eventType, network string)
AddShardingDecision(eventType, reason, network string)
AddDecoratedEvent(count float64, eventType, network string)
}
MetricsCollector interface for event processing metrics
type Mimicry ¶
type Mimicry struct {
Config *Config
// contains filtered or unexported fields
}
func (*Mimicry) ClockDrift ¶ added in v1.2.1
func (*Mimicry) GetClientMeta ¶ added in v1.2.2
func (*Mimicry) GetProcessor ¶ added in v1.2.1
GetProcessor returns the processor for testing purposes
func (*Mimicry) GetValidatorIndex ¶ added in v1.2.1
func (m *Mimicry) GetValidatorIndex(epoch phase0.Epoch, slot phase0.Slot, committeeIndex phase0.CommitteeIndex, position uint64) (phase0.ValidatorIndex, error)
Implement DutiesProvider interface
func (*Mimicry) HandleDecoratedEvent ¶ added in v1.2.1
Implement OutputHandler interface
func (*Mimicry) HandleDecoratedEvents ¶ added in v1.2.1
func (*Mimicry) Network ¶ added in v1.2.1
func (m *Mimicry) Network() *xatu.ClientMeta_Ethereum_Network
func (*Mimicry) ServeProbe ¶ added in v0.0.163
func (*Mimicry) Wallclock ¶ added in v1.2.1
func (m *Mimicry) Wallclock() *ethwallclock.EthereumBeaconChain
Implement MetadataProvider interface
type NoShardingKeyConfig ¶ added in v1.1.19
type NoShardingKeyConfig struct {
// Whether to record events without sharding keys (default: true)
Enabled bool `yaml:"enabled" default:"true"`
}
NoShardingKeyConfig defines behavior for events without sharding keys
type NodeConfig ¶
type NodeConfig struct {
// The private key for the libp2p host and local enode in hex format
PrivateKeyStr string `yaml:"privateKeyStr" default:""`
// General timeout when communicating with other network participants
DialTimeout time.Duration `yaml:"dialTimeout" default:"5s"`
// The address information of the local ethereuem [enode.Node].
Devp2pHost string `yaml:"devp2pHost" default:"0.0.0.0"`
Devp2pPort int `yaml:"devp2pPort" default:"0"`
// The address information of the local libp2p host
Libp2pHost string `yaml:"libp2pHost" default:"0.0.0.0"`
Libp2pPort int `yaml:"libp2pPort" default:"0"`
// The address information where the Beacon API or Prysm's custom API is accessible at
PrysmHost string `yaml:"prysmHost" default:"127.0.0.1"`
PrysmPortHTTP int `yaml:"prysmPortHttp" default:"3500"`
PrysmPortGRPC int `yaml:"prysmPortGrpc" default:"4000"`
PrysmUseTLS bool `yaml:"prysmUseTls" default:"false"`
// The maximum number of peers our libp2p host can be connected to.
MaxPeers int `yaml:"maxPeers" default:"30"`
// Limits the number of concurrent connection establishment routines. When
// we discover peers over discv5 and are not at our MaxPeers limit we try
// to establish a connection to a peer. However, we limit the concurrency to
// this DialConcurrency value.
DialConcurrency int `yaml:"dialConcurrency" default:"16"`
// DataStreamType is the type of data stream to use for the node (e.g. kinesis, callback, etc).
DataStreamType string `yaml:"dataStreamType" default:"callback"`
// Subnets is the configuration for gossipsub subnets.
Subnets map[string]*hermes.SubnetConfig `yaml:"subnets"`
}
func (*NodeConfig) AsHermesConfig ¶
func (h *NodeConfig) AsHermesConfig() *hermes.NodeConfig
type OutputHandler ¶ added in v1.2.1
type OutputHandler interface {
HandleDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error
HandleDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error
}
OutputHandler handles processed events
type Processor ¶ added in v1.2.1
type Processor struct {
// contains filtered or unexported fields
}
Processor encapsulates all event processing logic for Hermes events
func NewProcessor ¶ added in v1.2.1
func NewProcessor( duties DutiesProvider, output OutputHandler, metrics MetricsCollector, metaProvider MetaProvider, unifiedSharder *UnifiedSharder, eventCategorizer *EventCategorizer, wallclock *ethwallclock.EthereumBeaconChain, clockDrift time.Duration, events EventConfig, log logrus.FieldLogger, ) *Processor
NewProcessor creates a new Processor instance
func (*Processor) HandleHermesEvent ¶ added in v1.2.1
HandleHermesEvent processes a Hermes trace event and routes it to the appropriate handler
func (*Processor) ShouldTraceMessage ¶ added in v1.2.1
func (p *Processor) ShouldTraceMessage( event *host.TraceEvent, clientMeta *xatu.ClientMeta, xatuEventType string, ) bool
ShouldTraceMessage determines whether a message with the given MsgID should be included in the sample based on the configured trace settings.
func (*Processor) ShouldTraceRPCMetaMessages ¶ added in v1.2.1
func (p *Processor) ShouldTraceRPCMetaMessages( clientMeta *xatu.ClientMeta, xatuEventType string, messages interface{}, ) ([]FilteredMessageWithIndex, error)
ShouldTraceRPCMetaMessages determines which RPC meta messages should be processed based on sharding configuration
type RPCMetaMessageInfo ¶ added in v1.1.19
type RPCMetaMessageInfo struct {
MessageID *wrapperspb.StringValue
Topic *wrapperspb.StringValue // Optional: gossip topic for the message
}
RPCMetaMessageInfo represents a message with its ID and optional topic for RPC meta filtering
type RPCMetaTopicInfo ¶ added in v1.1.23
type RPCMetaTopicInfo struct {
Topic *wrapperspb.StringValue // Gossip topic for the event
}
RPCMetaTopicInfo represents a topic-based RPC meta event for filtering
type ShardableEvent ¶ added in v1.1.19
ShardableEvent represents an event that can be sharded
type ShardingConfig ¶ added in v1.1.19
type ShardingConfig struct {
// Topic-based patterns with sampling rates
Topics map[string]*TopicShardingConfig `yaml:"topics"`
// Events without sharding keys (Group D)
NoShardingKeyEvents *NoShardingKeyConfig `yaml:"noShardingKeyEvents,omitempty"`
// contains filtered or unexported fields
}
ShardingConfig represents the sharding configuration
func (*ShardingConfig) LogSummary ¶ added in v1.1.19
func (c *ShardingConfig) LogSummary() string
LogSummary returns a human-readable summary of the sharding configuration
type ShardingGroup ¶ added in v1.1.19
type ShardingGroup int
ShardingGroup represents the categorization of events based on their sharding capabilities
const ( // GroupA events have both Topic and MsgID available for sharding GroupA ShardingGroup = iota // GroupB events have only Topic available for sharding GroupB // GroupC events have only MsgID available for sharding GroupC // GroupD events have no sharding keys available GroupD )
type TopicShardingConfig ¶ added in v1.1.19
type TopicShardingConfig struct {
// Total number of shards for this topic pattern
TotalShards uint64 `yaml:"totalShards"`
// Active shards for this topic pattern
ActiveShards []uint64 `yaml:"activeShards"`
}
TopicShardingConfig defines sharding for a topic pattern
func (*TopicShardingConfig) GetSamplingRate ¶ added in v1.1.19
func (t *TopicShardingConfig) GetSamplingRate() float64
GetSamplingRate returns the sampling rate for a topic pattern
func (*TopicShardingConfig) IsFirehose ¶ added in v1.1.19
func (t *TopicShardingConfig) IsFirehose() bool
IsFirehose returns true if all shards are active
func (*TopicShardingConfig) UnmarshalYAML ¶ added in v1.1.19
func (t *TopicShardingConfig) UnmarshalYAML(node *yaml.Node) error
UnmarshalYAML implements custom YAML unmarshaling to support range syntax
type UnifiedSharder ¶ added in v1.1.19
type UnifiedSharder struct {
// contains filtered or unexported fields
}
UnifiedSharder provides a single sharding decision point for all events
func NewUnifiedSharder ¶ added in v1.1.19
func NewUnifiedSharder(config *ShardingConfig, enabled bool) (*UnifiedSharder, error)
NewUnifiedSharder creates a new unified sharder
func (*UnifiedSharder) GetShardForKey ¶ added in v1.1.19
func (s *UnifiedSharder) GetShardForKey(key string, totalShards uint64) uint64
GetShardForKey returns the shard number for a given key (for testing/debugging)
func (*UnifiedSharder) ShouldProcess ¶ added in v1.1.19
func (s *UnifiedSharder) ShouldProcess(eventType xatu.Event_Name, msgID, topic string) (bool, string)
ShouldProcess determines if an event should be processed based on sharding rules
func (*UnifiedSharder) ShouldProcessBatch ¶ added in v1.1.19
func (s *UnifiedSharder) ShouldProcessBatch(eventType xatu.Event_Name, events []ShardableEvent) []bool
ShouldProcessBatch determines which events in a batch should be processed This is used for RPC meta events where we have multiple events to evaluate
Source Files
¶
- config.go
- event.go
- event_catergorizer.go
- event_config.go
- event_gossipsub.go
- event_libp2p.go
- event_libp2p_core.go
- event_rpc.go
- gossipsub_aggregate_and_proof.go
- gossipsub_attestation.go
- gossipsub_beacon_block.go
- gossipsub_blob_sidecar.go
- gossipsub_data_column_sidecar.go
- gossipsub_single_attestation.go
- interfaces.go
- metrics.go
- mimicry.go
- node_config.go
- overrides.go
- processor.go
- sharding.go
- trace.go
- trace_shard_key.go
- trace_siphash.go