Documentation
¶
Index ¶
- Constants
- Variables
- func EventTypeFromBeaconChainProtocol(protocol string) string
- func EventsToBytes[T any](events []any, opts ...parquet.WriterOption) (int, *bytes.Buffer, error)
- func MaddrFrom(ip string, port uint) (ma.Multiaddr, error)
- func NoopHandler(ctx context.Context, msg *pubsub.Message) error
- func RenderEvent(rawEvent *TraceEvent) (map[EventType][]any, error)
- func SizeOfEvent(event any) int64
- type AgentVersionProvider
- type BaseEvent
- type BaseRPCEvent
- type CallbackDataStream
- func (c *CallbackDataStream) OnEvent(onRecord func(ctx context.Context, event *TraceEvent))
- func (c *CallbackDataStream) OutputType() DataStreamOutputType
- func (c *CallbackDataStream) PutRecord(ctx context.Context, event *TraceEvent) error
- func (c *CallbackDataStream) Start(ctx context.Context) error
- func (c *CallbackDataStream) Stop(ctx context.Context) error
- func (c *CallbackDataStream) Type() DataStreamType
- type Config
- type DataStream
- type DataStreamOutputType
- type DataStreamRenderer
- type DataStreamType
- type EventSubType
- type EventSubmissionTask
- type EventType
- type FilterConfig
- type FilterMode
- type GenericParquetEvent
- type GossipAddRemovePeerEvent
- type GossipGraftPruneEvent
- type GossipIdontwantEvent
- type GossipIhaveEvent
- type GossipIwantEvent
- type GossipJoinLeaveTopicEvent
- type GossipMsgArrivalEvent
- type GossipPeerExchangeEvent
- type GossipSentMsgEvent
- type Host
- func (h *Host) AddPeer(p peer.ID, proto protocol.ID)
- func (h *Host) AgentVersion(pid peer.ID) string
- func (h *Host) ConnSignal(ctx context.Context, pid peer.ID) chan error
- func (h *Host) DeliverMessage(msg *pubsub.Message)
- func (h *Host) DropRPC(rpc *pubsub.RPC, p peer.ID)
- func (h *Host) DuplicateMessage(msg *pubsub.Message)
- func (h *Host) FlushTrace(evtType string, payload any)
- func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payload any)
- func (h *Host) Graft(p peer.ID, topic string)
- func (h *Host) InitGossipSub(ctx context.Context, opts ...pubsub.Option) (*pubsub.PubSub, error)
- func (h *Host) Join(topic string)
- func (h *Host) Leave(topic string)
- func (h *Host) LocalListenMaddr() (ma.Multiaddr, error)
- func (h *Host) PrivateListenMaddr() (ma.Multiaddr, error)
- func (h *Host) Prune(p peer.ID, topic string)
- func (h *Host) RecvRPC(rpc *pubsub.RPC)
- func (h *Host) RejectMessage(msg *pubsub.Message, reason string)
- func (h *Host) RemovePeer(p peer.ID)
- func (h *Host) SendRPC(rpc *pubsub.RPC, p peer.ID)
- func (h *Host) Serve(ctx context.Context) error
- func (h *Host) ThrottlePeer(p peer.ID)
- func (h *Host) Trace(evt *pubsubpb.TraceEvent)
- func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler
- func (h *Host) UndeliverableMessage(msg *pubsub.Message)
- func (h *Host) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot)
- func (h *Host) ValidateMessage(msg *pubsub.Message)
- func (h *Host) WaitForPublicAddress(ctx context.Context) error
- type KinesisDataStream
- func (k *KinesisDataStream) OutputType() DataStreamOutputType
- func (k *KinesisDataStream) PutRecord(ctx context.Context, event *TraceEvent) error
- func (k *KinesisDataStream) Start(ctx context.Context) error
- func (k *KinesisDataStream) Stop(ctx context.Context) error
- func (k *KinesisDataStream) Type() DataStreamType
- type Libp2pConnectDisconnectEvent
- type LocalyProducedEvent
- type NoopDataStream
- func (ds *NoopDataStream) OutputType() DataStreamOutputType
- func (ds *NoopDataStream) PutRecord(ctx context.Context, event *TraceEvent) error
- func (ds *NoopDataStream) Start(ctx context.Context) error
- func (ds *NoopDataStream) Stop(ctx context.Context) error
- func (ds *NoopDataStream) Type() DataStreamType
- type PeerFilter
- func (pf *PeerFilter) CheckAgent(agent string) bool
- func (pf *PeerFilter) InterceptAccept(conn network.ConnMultiaddrs) (allow bool)
- func (pf *PeerFilter) InterceptAddrDial(p peer.ID, addr ma.Multiaddr) (allow bool)
- func (pf *PeerFilter) InterceptPeerDial(p peer.ID) (allow bool)
- func (pf *PeerFilter) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool)
- func (pf *PeerFilter) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason)
- type RPCdirection
- type RpcControlGraft
- type RpcControlIHave
- type RpcControlIWant
- type RpcControlIdontWant
- type RpcControlPrune
- type RpcMeta
- type RpcMetaControl
- type RpcMetaMsg
- type RpcMetaSub
- type S3DSConfig
- type S3DataStream
- func (s3ds *S3DataStream) OutputType() DataStreamOutputType
- func (s3ds *S3DataStream) PutRecord(ctx context.Context, event *TraceEvent) error
- func (s3ds *S3DataStream) S3KeySubmission(ctx context.Context, s3Key string, content []byte) error
- func (s3ds *S3DataStream) Start(ctx context.Context) error
- func (s3ds *S3DataStream) Stop(ctx context.Context) error
- func (s3ds *S3DataStream) Type() DataStreamType
- type ScoreKeeper
- type SendRecvRPCEvent
- type TopicHandler
- type TopicScore
- type TopicSubscription
- type TraceEvent
- type TraceEventPayloadMetaData
- type TraceEventPeerScore
- type TraceLogger
Constants ¶
const PeerScoreEventType = "PEERSCORE"
Variables ¶
Functions ¶
func EventTypeFromBeaconChainProtocol ¶
EventTypeFromBeaconChainProtocol returns the event type for a given protocol string.
func EventsToBytes ¶
func EventsToBytes[T any]( events []any, opts ...parquet.WriterOption, ) (int, *bytes.Buffer, error)
EventsToBytes translates any given number of trace events into parquet-serialized bytes. It can also be tuned with any desired set of parquet.WriterOption.
func RenderEvent ¶
func RenderEvent(rawEvent *TraceEvent) (map[EventType][]any, error)
Types ¶
type AgentVersionProvider ¶
AgentVersionProvider is an interface for getting agent versions
type BaseEvent ¶
type BaseEvent struct {
Timestamp int64 `parquet:"timestamp"`
Type string `parquet:"type"`
ProducerID string `parquet:"producer_id"`
}
For analysis purposes, we need to be able to pair the events with each other.
func (*BaseEvent) GetProducerID ¶
type BaseRPCEvent ¶
type CallbackDataStream ¶
type CallbackDataStream struct {
// contains filtered or unexported fields
}
CallbackDataStream is a simple implementation of DataStream that holds a callback function. Users of CallbackDataStream should ensure that the callback function does not block, as blocking can delay or disrupt the processing of subsequent events.
func NewCallbackDataStream ¶
func NewCallbackDataStream() *CallbackDataStream
NewCallbackDataStream creates a new instance of CallbackDataStream.
func (*CallbackDataStream) OnEvent ¶
func (c *CallbackDataStream) OnEvent(onRecord func(ctx context.Context, event *TraceEvent))
OnEvent sets the callback function that will be called when an event is received.
func (*CallbackDataStream) OutputType ¶
func (c *CallbackDataStream) OutputType() DataStreamOutputType
OutputType returns the output type to be used by this data stream.
func (*CallbackDataStream) PutRecord ¶
func (c *CallbackDataStream) PutRecord(ctx context.Context, event *TraceEvent) error
PutRecord sends an event to the callback if the stream has not been stopped.
func (*CallbackDataStream) Start ¶
func (c *CallbackDataStream) Start(ctx context.Context) error
Start begins the data stream's operations.
func (*CallbackDataStream) Stop ¶
func (c *CallbackDataStream) Stop(ctx context.Context) error
Stop ends the data stream's operation.
func (*CallbackDataStream) Type ¶
func (c *CallbackDataStream) Type() DataStreamType
Type returns the type of the data stream, which is DataStreamTypeCallback.
type DataStream ¶
type DataStream interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
PutRecord(ctx context.Context, event *TraceEvent) error
Type() DataStreamType
OutputType() DataStreamOutputType
}
type DataStreamOutputType ¶
type DataStreamOutputType int
DataStreamOutputType is the output type of the data stream.
const ( // DataStreamOutputTypeKinesis outputs the data stream decorated with metadata and in a format ingested by Kinesis. DataStreamOutputTypeKinesis DataStreamOutputType = iota // DataStreamOutputTypeFull outputs the data stream decorated with metadata and containing the raw/full event data. DataStreamOutputTypeFull // DataStreamOutputTypeParquet outputs the trace events formatted in a simplified parquet column style. DataStreamOutputTypeParquet )
type DataStreamRenderer ¶
type DataStreamRenderer interface {
RenderPayload(evt *TraceEvent, msg *pubsub.Message, dst ssz.Unmarshaler) (*TraceEvent, error)
}
DataStreamRenderer is an interface to support rendering a data-stream message into a destination.
type DataStreamType ¶
type DataStreamType int
const ( DataStreamTypeKinesis DataStreamType = iota DataStreamTypeCallback DataStreamTypeLogger DataStreamTypeS3 DataStreamTypeNoop )
func DataStreamtypeFromStr ¶
func DataStreamtypeFromStr(str string) DataStreamType
func (DataStreamType) String ¶
func (ds DataStreamType) String() string
type EventSubType ¶
type EventSubType int8
const ( EventSubTypeNone EventSubType = iota // Add / Remove peers EventSubTypeAddPeer EventSubTypeRemovePeer // Graft / Prunes EventSubTypeGraft EventSubTypePrune // Msg arrivals EventSubTypeDeliverMsg EventSubTypeValidateMsg EventSubTypeHandleMsg // adding handle MSG as well, although we are not parsing the Eth specific details from msgs EventSubTypeDuplicatedMsg EventSubTypeRejectMsg // Join/Leave Topic EventSubTypeJoinTopic EventSubTypeLeaveTopic // Libp2p EventSubTypeConnectPeer EventSubTypeDisconnectPeer )
func (EventSubType) String ¶
func (e EventSubType) String() string
type EventSubmissionTask ¶
EventSubmissionTask is the main event submission task. It identifies: - the type of events (for a later cast) - the list of events (that need to be cast) - the name of the S3 key under which to store the events
type EventType ¶
type EventType int8
const ( EventTypeUnknown EventType = iota EventTypeGenericEvent // Gossip-mesh EventTypeAddRemovePeer EventTypeGraftPrune // PeerExchange EventTypeGossipPx // Gossip RPCs EventTypeControlRPC EventTypeIhave EventTypeIwant EventTypeIdontwant EventTypeSentMsg // Gossip Message arrivals EventTypeMsgArrivals // Gossip Join/Leave Topic EventTypeJoinLeaveTopic // Libp2p Event EventTypeConnectDisconnectPeer )
type FilterConfig ¶
type FilterConfig struct {
Mode FilterMode `yaml:"mode" default:"disabled"`
Patterns []string `yaml:"patterns"`
}
FilterConfig holds configuration for peer filtering
func (*FilterConfig) Validate ¶
func (fc *FilterConfig) Validate() error
Validate validates the filter configuration
type FilterMode ¶
type FilterMode string
FilterMode defines the filtering behavior for peer connections
const ( // FilterModeDisabled disables all peer filtering FilterModeDisabled FilterMode = "disabled" // FilterModeDenylist blocks peers matching patterns FilterModeDenylist FilterMode = "denylist" // FilterModeAllowlist only allows peers matching patterns FilterModeAllowlist FilterMode = "allowlist" )
type GenericParquetEvent ¶
type GenericParquetEvent struct {
BaseEvent
Topic string `parquet:"topic"`
Payload string `parquet:"payload"`
}
If we don't have a specific parquet format for a trace, use the generic one.
func GenericTraceFromEvent ¶
func GenericTraceFromEvent(t *TraceEvent) *GenericParquetEvent
type GossipGraftPruneEvent ¶
type GossipIdontwantEvent ¶
type GossipIdontwantEvent struct {
BaseRPCEvent
MsgIDs []string `parquet:"msg_ids,list"`
Msgs int `parquet:"msgs"`
}
type GossipIhaveEvent ¶
type GossipIhaveEvent struct {
BaseRPCEvent
Topic string `parquet:"topic"`
MsgIDs []string `parquet:"msg_ids,list"`
Msgs int `parquet:"msgs"`
}
type GossipIwantEvent ¶
type GossipIwantEvent struct {
BaseRPCEvent
MsgIDs []string `parquet:"msg_ids,list"`
Msgs int `parquet:"msgs"`
}
type GossipMsgArrivalEvent ¶
type GossipPeerExchangeEvent ¶
type GossipPeerExchangeEvent struct {
BaseRPCEvent
Topic string `parquet:"topic"`
PxPeers []string `parquet:"px_peers,list"`
}
type GossipSentMsgEvent ¶
type GossipSentMsgEvent struct {
BaseRPCEvent
MsgID string `parquet:"msg_id"`
Topic string `parquet:"topic"`
}
type Host ¶
func (*Host) AgentVersion ¶
AgentVersion returns the agent version of the given peer. If the agent version is not known, it returns an empty string.
func (*Host) ConnSignal ¶
ConnSignal signals the incoming connection of the given peer on the returned channel by just closing it. Alternatively, if the context has a deadline that's exceeded, the channel will emit the context error and then be closed.
func (*Host) DeliverMessage ¶
func (*Host) DuplicateMessage ¶
func (*Host) FlushTrace ¶
func (*Host) FlushTraceWithTimestamp ¶
func (*Host) InitGossipSub ¶
func (*Host) LocalListenMaddr ¶
LocalListenMaddr returns the first multiaddress in a localhost IP range that this host is listening on.
func (*Host) PrivateListenMaddr ¶
PrivateListenMaddr returns the first multiaddress in a private IP range that this host is listening on.
func (*Host) RemovePeer ¶
func (*Host) ThrottlePeer ¶
func (*Host) Trace ¶
func (h *Host) Trace(evt *pubsubpb.TraceEvent)
func (*Host) TracedTopicHandler ¶
func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler
func (*Host) UndeliverableMessage ¶
func (*Host) UpdatePeerScore ¶
func (h *Host) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot)
func (*Host) ValidateMessage ¶
type KinesisDataStream ¶
type KinesisDataStream struct {
// contains filtered or unexported fields
}
func NewKinesisDataStream ¶
func NewKinesisDataStream(p *gk.Producer) *KinesisDataStream
NewKinesisDataStream creates a new instance of KinesisDataStream with a given producer.
func (*KinesisDataStream) OutputType ¶
func (k *KinesisDataStream) OutputType() DataStreamOutputType
OutputType returns the output type to be used by this data stream.
func (*KinesisDataStream) PutRecord ¶
func (k *KinesisDataStream) PutRecord(ctx context.Context, event *TraceEvent) error
PutRecord sends an event to the Kinesis data stream.
func (*KinesisDataStream) Start ¶
func (k *KinesisDataStream) Start(ctx context.Context) error
Start begins the data stream's operation.
func (*KinesisDataStream) Stop ¶
func (k *KinesisDataStream) Stop(ctx context.Context) error
Stop ends the data stream.
func (*KinesisDataStream) Type ¶
func (k *KinesisDataStream) Type() DataStreamType
Type returns the type of the data stream
type Libp2pConnectDisconnectEvent ¶
type Libp2pConnectDisconnectEvent struct {
BaseEvent
SubType string `parquet:"sub_type"`
RemotePeerID string `parquet:"remote_peer_id"`
RemotePeerMaddrs string `parquet:"remote_peer_maddrs"`
AgentVersion string `parquet:"agent_version"`
Direction string `parquet:"direction"`
Opened int64 `parquet:"opened"`
Limited bool `parquet:"limited"`
}
type LocalyProducedEvent ¶
type LocalyProducedEvent interface {
GetProducerID() string
}
type NoopDataStream ¶
type NoopDataStream struct{}
func (*NoopDataStream) OutputType ¶
func (ds *NoopDataStream) OutputType() DataStreamOutputType
OutputType returns the output type to be used by this data stream.
func (*NoopDataStream) PutRecord ¶
func (ds *NoopDataStream) PutRecord(ctx context.Context, event *TraceEvent) error
func (*NoopDataStream) Type ¶
func (ds *NoopDataStream) Type() DataStreamType
type PeerFilter ¶
type PeerFilter struct {
// contains filtered or unexported fields
}
PeerFilter implements libp2p's ConnectionGater interface to filter peer connections based on agent strings using regex patterns
func NewPeerFilter ¶
func NewPeerFilter(h AgentVersionProvider, config FilterConfig, log *slog.Logger) (*PeerFilter, error)
NewPeerFilter creates a new peer filter with the given configuration
func (*PeerFilter) CheckAgent ¶
func (pf *PeerFilter) CheckAgent(agent string) bool
CheckAgent is a public method for testing filter patterns
func (*PeerFilter) InterceptAccept ¶
func (pf *PeerFilter) InterceptAccept(conn network.ConnMultiaddrs) (allow bool)
InterceptAccept implements ConnectionGater
func (*PeerFilter) InterceptAddrDial ¶
InterceptAddrDial implements ConnectionGater
func (*PeerFilter) InterceptPeerDial ¶
func (pf *PeerFilter) InterceptPeerDial(p peer.ID) (allow bool)
InterceptPeerDial implements ConnectionGater
func (*PeerFilter) InterceptSecured ¶
func (pf *PeerFilter) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool)
InterceptSecured implements ConnectionGater
func (*PeerFilter) InterceptUpgraded ¶
func (pf *PeerFilter) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason)
InterceptUpgraded implements ConnectionGater
type RPCdirection ¶
type RPCdirection int8
const ( RPCdirectionUnknown RPCdirection = iota RPCdirectionIn RPCdirectionOut RPCdirectionDrop )
func (RPCdirection) String ¶
func (d RPCdirection) String() string
type RpcControlGraft ¶
type RpcControlGraft struct {
TopicID string
}
type RpcControlIHave ¶
type RpcControlIWant ¶
type RpcControlIWant struct {
MsgIDs []string
}
type RpcControlIdontWant ¶
type RpcControlIdontWant struct {
MsgIDs []string
}
type RpcControlPrune ¶
type RpcMeta ¶
type RpcMeta struct {
PeerID peer.ID
Subscriptions []RpcMetaSub `json:"Subs,omitempty"`
Messages []RpcMetaMsg `json:"Msgs,omitempty"`
Control *RpcMetaControl `json:"Control,omitempty"`
}
type RpcMetaControl ¶
type RpcMetaControl struct {
IHave []RpcControlIHave `json:"IHave,omitempty"`
IWant []RpcControlIWant `json:"IWant,omitempty"`
Graft []RpcControlGraft `json:"Graft,omitempty"`
Prune []RpcControlPrune `json:"Prune,omitempty"`
Idontwant []RpcControlIdontWant `json:"Idontwant,omitempty"`
}
type RpcMetaMsg ¶
type RpcMetaSub ¶
type S3DSConfig ¶
type S3DSConfig struct {
Meter metric.Meter
Flushers int
ByteLimit int64
FlushInterval time.Duration
AccessKeyID string
SecretKey string
Region string
Endpoint string
Bucket string
Tag string
}
S3DSConfig holds the configuration needed to establish a connection with an S3 instance.
func (*S3DSConfig) CheckValidity ¶
func (s3cfg *S3DSConfig) CheckValidity() error
CheckValidity checks whether the current configuration is valid. If anything is wrong with the config, the returned error reports the missing items.
func (S3DSConfig) ToAWSconfig ¶
func (s3cfg S3DSConfig) ToAWSconfig() (*aws.Config, error)
ToAWSconfig makes a quick translation from the given user args into the aws.Config struct -> ready to create the S3 client
type S3DataStream ¶
type S3DataStream struct {
// contains filtered or unexported fields
}
func NewS3DataStream ¶
func NewS3DataStream(baseCfg S3DSConfig) (*S3DataStream, error)
func (*S3DataStream) OutputType ¶
func (s3ds *S3DataStream) OutputType() DataStreamOutputType
func (*S3DataStream) PutRecord ¶
func (s3ds *S3DataStream) PutRecord(ctx context.Context, event *TraceEvent) error
func (*S3DataStream) S3KeySubmission ¶
S3KeySubmission is an arbitrary method that submits any []byte into S3 under the given key.
func (*S3DataStream) Type ¶
func (s3ds *S3DataStream) Type() DataStreamType
type ScoreKeeper ¶
type ScoreKeeper struct {
// contains filtered or unexported fields
}
ScoreKeeper is a thread-safe local copy of the score per peer and per topic. TODO: figure out if this is some sort of info that we want to expose through OpenTelemetry (still good to have it).
func (*ScoreKeeper) Get ¶
func (sk *ScoreKeeper) Get() map[peer.ID]*pubsub.PeerScoreSnapshot
func (*ScoreKeeper) Update ¶
func (sk *ScoreKeeper) Update(scores map[peer.ID]*pubsub.PeerScoreSnapshot)
type SendRecvRPCEvent ¶
type SendRecvRPCEvent struct {
BaseRPCEvent
Ihaves int32 `parquet:"ihaves"`
Iwants int32 `parquet:"iwants"`
Idontwants int32 `parquet:"idontwants"`
SentMsgs int32 `parquet:"sent_msgs"`
}
SendRecvRPCEvent is used to track the number of original RPCs exchanged. It tracks the direction and the number of message_ids per control.
type TopicScore ¶
type TopicSubscription ¶
type TopicSubscription struct {
Topic string
LocalID peer.ID
Sub *pubsub.Subscription
Handler TopicHandler
}
type TraceEvent ¶
type TraceEvent struct {
Type string
Topic string
PeerID peer.ID
Timestamp time.Time
Payload any `json:"Data"` // cannot use field "Data" because of gk.Record method
}
func (*TraceEvent) Data ¶
func (t *TraceEvent) Data() []byte
func (*TraceEvent) ExplicitHashKey ¶
func (t *TraceEvent) ExplicitHashKey() *string
func (*TraceEvent) PartitionKey ¶
func (t *TraceEvent) PartitionKey() string
type TraceEventPeerScore ¶
type TraceLogger ¶
type TraceLogger struct{}
func (*TraceLogger) OutputType ¶
func (t *TraceLogger) OutputType() DataStreamOutputType
OutputType returns the output type to be used by this data stream.
func (*TraceLogger) PutRecord ¶
func (t *TraceLogger) PutRecord(ctx context.Context, event *TraceEvent) error
func (*TraceLogger) Type ¶
func (t *TraceLogger) Type() DataStreamType