Documentation
¶
Index ¶
- Constants
- Variables
- func GetShard(shardingKey string, totalShards uint64) uint64
- func GetShardingKey(event *host.TraceEvent, clientMeta *xatu.ClientMeta, shardingKeyType string, ...) string
- func IsShardActive(shard uint64, activeShards []uint64) bool
- func SipHash(key [16]byte, data []byte) uint64
- type ActiveShardsConfig
- type Config
- type EventConfig
- type FilteredMessageWithIndex
- type Metrics
- func (m *Metrics) AddDecoratedEvent(count float64, eventType, network string)
- func (m *Metrics) AddProcessedMessage(eventType, network string)
- func (m *Metrics) AddShardObservation(topic string, shard uint64, network string)
- func (m *Metrics) AddShardProcessed(topic string, shard uint64, network string)
- func (m *Metrics) AddShardSkipped(topic string, shard uint64, network string)
- func (m *Metrics) AddSkippedMessage(eventType, network string)
- type Mimicry
- func (m *Mimicry) ServeMetrics(ctx context.Context) error
- func (m *Mimicry) ServePProf(ctx context.Context) error
- func (m *Mimicry) ServeProbe(ctx context.Context) error
- func (m *Mimicry) ShouldTraceMessage(event *host.TraceEvent, clientMeta *xatu.ClientMeta, xatuEventType string) bool
- func (m *Mimicry) ShouldTraceRPCMetaMessages(event *host.TraceEvent, clientMeta *xatu.ClientMeta, xatuEventType string, ...) ([]FilteredMessageWithIndex, error)
- func (m *Mimicry) Start(ctx context.Context) error
- type NodeConfig
- type Override
- type ShardingKeyType
- type TopicConfig
- type TracesConfig
Constants ¶
const ( // libp2p pubsub events. TraceEvent_HANDLE_MESSAGE = "HANDLE_MESSAGE" // libp2p core networking events. TraceEvent_CONNECTED = "CONNECTED" TraceEvent_DISCONNECTED = "DISCONNECTED" // RPC events. TraceEvent_HANDLE_METADATA = "HANDLE_METADATA" TraceEvent_HANDLE_STATUS = "HANDLE_STATUS" )
Define events not supplied by libp2p proto pkgs.
Variables ¶
var ( // Some events dont have anything reasonable to shard on, so we let them all through. UnshardableEventTypes = []string{ xatu.Event_LIBP2P_TRACE_JOIN.String(), xatu.Event_LIBP2P_TRACE_LEAVE.String(), xatu.Event_LIBP2P_TRACE_RPC_META_CONTROL_GRAFT.String(), } )
Functions ¶
func GetShard ¶ added in v1.1.1
GetShard calculates which shard a message belongs to based on its ID.
This function uses SipHash to consistently map message IDs (typically hashes themselves) to specific shards, ensuring even distribution across the available shards.
Key benefits: - Deterministic: The same message ID always maps to the same shard - Balanced: Messages are evenly distributed across all shards
Parameters:
- shardingKey: The identifier to use for sharding (often a hash like "0x1234...abcd")
- totalShards: The total number of available shards (e.g., 64)
Returns:
- The shard number (0 to totalShards-1) where this message should be processed
func GetShardingKey ¶ added in v1.1.4
func GetShardingKey( event *host.TraceEvent, clientMeta *xatu.ClientMeta, shardingKeyType string, eventType string, ) string
GetShardingKey extracts the appropriate sharding key based on the configured type. Default to MsgID if the event type is not supported.
func IsShardActive ¶ added in v1.1.1
IsShardActive checks if a shard is in the active shards list.
func SipHash ¶ added in v1.1.1
SipHash implements the SipHash-2-4 algorithm, a fast and efficient hash function designed for message authentication and hash-table lookups.
Key features of SipHash: - Deterministic output for identical inputs - Even distribution of outputs across the range of uint64
When used for sharding (via GetShard), SipHash provides: - Consistent distribution of messages across shards - Deterministic routing where the same message always maps to the same shard
Parameters:
- key: A 16-byte secret key (can be fixed for consistent sharding)
- data: The message bytes to hash
Returns:
- A 64-bit unsigned integer hash value
References:
- Reference implementation: https://github.com/veorq/SipHash
- ClickHouse documentation: https://clickhouse.com/docs/sql-reference/functions/hash-functions#siphash64
Types ¶
type ActiveShardsConfig ¶ added in v1.1.13
type ActiveShardsConfig []interface{}
ActiveShardsConfig represents a list of active shards that can be specified as individual numbers or ranges (e.g., "0-255").
func (ActiveShardsConfig) ToUint64Slice ¶ added in v1.1.13
func (a ActiveShardsConfig) ToUint64Slice() ([]uint64, error)
ToUint64Slice converts the ActiveShardsConfig to a slice of uint64. It expands any ranges found in the configuration.
type Config ¶
type Config struct {
LoggingLevel string `yaml:"logging" default:"info"`
MetricsAddr string `yaml:"metricsAddr" default:":9090"`
PProfAddr *string `yaml:"pprofAddr"`
ProbeAddr *string `yaml:"probeAddr"`
// The name of the mimicry
Name string `yaml:"name"`
// Ethereum configuration
Ethereum ethereum.Config `yaml:"ethereum"`
// Outputs configuration
Outputs []output.Config `yaml:"outputs"`
// Labels configures the mimicry with labels
Labels map[string]string `yaml:"labels"`
// NTP Server to use for clock drift correction
NTPServer string `yaml:"ntpServer" default:"time.google.com"`
// Node is the configuration for the node
Node NodeConfig `yaml:"node"`
// Events is the configuration for the events
Events EventConfig `yaml:"events"`
// Traces is the configuration for the traces.
Traces TracesConfig `yaml:"traces"`
}
func (*Config) ApplyOverrides ¶ added in v1.0.15
func (c *Config) ApplyOverrides(o *Override, log logrus.FieldLogger) error
ApplyOverrides applies any overrides to the config.
func (*Config) CreateSinks ¶
type EventConfig ¶ added in v0.0.169
type EventConfig struct {
RecvRPCEnabled bool `yaml:"recvRpcEnabled" default:"false"`
SendRPCEnabled bool `yaml:"sendRpcEnabled" default:"false"`
DropRPCEnabled bool `yaml:"dropRpcEnabled" default:"false"`
RpcMetaControlIHaveEnabled bool `yaml:"rpcMetaControlIHaveEnabled" default:"false"`
RpcMetaControlIWantEnabled bool `yaml:"rpcMetaControlIWantEnabled" default:"false"`
RpcMetaControlIDontWantEnabled bool `yaml:"rpcMetaControlIDontWantEnabled" default:"false"`
RpcMetaControlGraftEnabled bool `yaml:"rpcMetaControlGraftEnabled" default:"false"`
RpcMetaControlPruneEnabled bool `yaml:"rpcMetaControlPruneEnabled" default:"false"`
RpcMetaSubscriptionEnabled bool `yaml:"rpcMetaSubscriptionEnabled" default:"false"`
RpcMetaMessageEnabled bool `yaml:"rpcMetaMessageEnabled" default:"false"`
AddPeerEnabled bool `yaml:"addPeerEnabled" default:"true"`
RemovePeerEnabled bool `yaml:"removePeerEnabled" default:"true"`
ConnectedEnabled bool `yaml:"connectedEnabled" default:"true"`
DisconnectedEnabled bool `yaml:"disconnectedEnabled" default:"true"`
JoinEnabled bool `yaml:"joinEnabled" default:"true"`
LeaveEnabled bool `yaml:"leaveEnabled" default:"false"`
GraftEnabled bool `yaml:"graftEnabled" default:"false"`
PruneEnabled bool `yaml:"pruneEnabled" default:"false"`
PublishMessageEnabled bool `yaml:"publishMessageEnabled" default:"false"`
RejectMessageEnabled bool `yaml:"rejectMessageEnabled" default:"false"`
DuplicateMessageEnabled bool `yaml:"duplicateMessageEnabled" default:"false"`
DeliverMessageEnabled bool `yaml:"deliverMessageEnabled" default:"false"`
HandleMetadataEnabled bool `yaml:"handleMetadataEnabled" default:"true"`
HandleStatusEnabled bool `yaml:"handleStatusEnabled" default:"true"`
GossipSubBeaconBlockEnabled bool `yaml:"gossipSubBeaconBlockEnabled" default:"true"`
GossipSubAttestationEnabled bool `yaml:"gossipSubAttestationEnabled" default:"true"`
GossipSubBlobSidecarEnabled bool `yaml:"gossipSubBlobSidecarEnabled" default:"true"`
}
EventConfig represents configuration for all event types.
func (*EventConfig) Validate ¶ added in v0.0.169
func (e *EventConfig) Validate() error
Validate validates the event config.
type FilteredMessageWithIndex ¶ added in v1.1.9
type FilteredMessageWithIndex struct {
MessageID *wrapperspb.StringValue
OriginalIndex uint32
}
FilteredMessageWithIndex represents a filtered message with its original index
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func (*Metrics) AddDecoratedEvent ¶
func (*Metrics) AddProcessedMessage ¶ added in v1.1.1
func (*Metrics) AddShardObservation ¶ added in v1.1.1
AddShardObservation records a message being assigned to a particular shard
func (*Metrics) AddShardProcessed ¶ added in v1.1.1
AddShardProcessed records a message being processed from a particular shard
func (*Metrics) AddShardSkipped ¶ added in v1.1.1
AddShardSkipped records a message being skipped from a particular shard
func (*Metrics) AddSkippedMessage ¶ added in v1.1.1
type Mimicry ¶
type Mimicry struct {
Config *Config
// contains filtered or unexported fields
}
func (*Mimicry) ServeProbe ¶ added in v0.0.163
func (*Mimicry) ShouldTraceMessage ¶ added in v1.1.1
func (m *Mimicry) ShouldTraceMessage( event *host.TraceEvent, clientMeta *xatu.ClientMeta, xatuEventType string, ) bool
ShouldTraceMessage determines whether a message with the given MsgID should be included in the sample based on the configured trace settings.
func (*Mimicry) ShouldTraceRPCMetaMessages ¶ added in v1.1.9
func (m *Mimicry) ShouldTraceRPCMetaMessages( event *host.TraceEvent, clientMeta *xatu.ClientMeta, xatuEventType string, messageIDs []*wrapperspb.StringValue, ) ([]FilteredMessageWithIndex, error)
type NodeConfig ¶
type NodeConfig struct {
// The private key for the libp2p host and local enode in hex format
PrivateKeyStr string `yaml:"privateKeyStr" default:""`
// General timeout when communicating with other network participants
DialTimeout time.Duration `yaml:"dialTimeout" default:"5s"`
// The address information of the local ethereuem [enode.Node].
Devp2pHost string `yaml:"devp2pHost" default:"0.0.0.0"`
Devp2pPort int `yaml:"devp2pPort" default:"0"`
// The address information of the local libp2p host
Libp2pHost string `yaml:"libp2pHost" default:"0.0.0.0"`
Libp2pPort int `yaml:"libp2pPort" default:"0"`
// The address information where the Beacon API or Prysm's custom API is accessible at
PrysmHost string `yaml:"prysmHost" default:"127.0.0.1"`
PrysmPortHTTP int `yaml:"prysmPortHttp" default:"3500"`
PrysmPortGRPC int `yaml:"prysmPortGrpc" default:"4000"`
PrysmUseTLS bool `yaml:"prysmUseTls" default:"false"`
// The maximum number of peers our libp2p host can be connected to.
MaxPeers int `yaml:"maxPeers" default:"30"`
// Limits the number of concurrent connection establishment routines. When
// we discover peers over discv5 and are not at our MaxPeers limit we try
// to establish a connection to a peer. However, we limit the concurrency to
// this DialConcurrency value.
DialConcurrency int `yaml:"dialConcurrency" default:"16"`
// DataStreamType is the type of data stream to use for the node (e.g. kinesis, callback, etc).
DataStreamType string `yaml:"dataStreamType" default:"callback"`
// Subnets is the configuration for gossipsub subnets.
Subnets map[string]*hermes.SubnetConfig `yaml:"subnets"`
}
func (*NodeConfig) AsHermesConfig ¶
func (h *NodeConfig) AsHermesConfig() *hermes.NodeConfig
type ShardingKeyType ¶ added in v1.1.4
type ShardingKeyType string
ShardingKeyType represents the different types of sharding keys.
const ( // ShardingKeyTypeMsgID uses the message ID for sharding. ShardingKeyTypeMsgID ShardingKeyType = "MsgID" // ShardingKeyTypePeerID uses the peer ID for sharding. ShardingKeyTypePeerID ShardingKeyType = "PeerID" )
type TopicConfig ¶ added in v1.1.1
type TopicConfig struct {
// Total number of shards for this topic.
TotalShards uint64 `yaml:"totalShards" default:"64"`
// List of active shards to process. Supports individual numbers and ranges (e.g., "0-255").
ActiveShardsRaw ActiveShardsConfig `yaml:"activeShards"`
// Processed active shards (populated during validation).
ActiveShards []uint64 `yaml:"-"`
// Key to use for sharding (MsgID, PeerID, etc).
ShardingKey string `yaml:"shardingKey" default:"MsgID"`
}
TopicConfig represents configuration for a specific topic pattern.
type TracesConfig ¶ added in v1.1.1
type TracesConfig struct {
// Whether or not trace config is globally enabled.
Enabled bool `yaml:"enabled" default:"false"`
// AlwaysRecordRootRpcEvents is a flag that controls whether or not to record
// the root rpc events even if there are no rpc meta/control level messages.
AlwaysRecordRootRpcEvents bool `yaml:"alwaysRecordRootRpcEvents" default:"false"`
// Topics allows for per-topic configuration.
Topics map[string]TopicConfig `yaml:"topics"`
// contains filtered or unexported fields
}
TracesConfig represents the new trace-based configuration.
func (*TracesConfig) CompilePatterns ¶ added in v1.1.1
func (e *TracesConfig) CompilePatterns() error
CompilePatterns pre-compiles all regex patterns for better performance.
func (*TracesConfig) FindMatchingTopicConfig ¶ added in v1.1.1
func (e *TracesConfig) FindMatchingTopicConfig(eventType string) (*TopicConfig, bool)
FindMatchingTopicConfig finds a matching topic configuration for an event type.
func (*TracesConfig) LogSummary ¶ added in v1.1.1
func (e *TracesConfig) LogSummary() string
LogSummary returns a human-readable summary of the trace configuration.
func (*TracesConfig) Validate ¶ added in v1.1.1
func (e *TracesConfig) Validate() error
Validate validates the traces config.
Source Files
¶
- config.go
- event.go
- event_config.go
- event_gossipsub.go
- event_libp2p.go
- event_libp2p_core.go
- event_rpc.go
- gossipsub_attestation.go
- gossipsub_beacon_block.go
- gossipsub_blob_sidecar.go
- gossipsub_single_attestation.go
- metrics.go
- mimicry.go
- node_config.go
- overrides.go
- trace.go
- trace_config.go
- trace_shard_key.go
- trace_siphash.go