host

package
v0.0.0-...-b1a9763 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2025 License: Apache-2.0, MIT Imports: 41 Imported by: 2

Documentation

Index

Constants

View Source
const PeerScoreEventType = "PEERSCORE"

Variables

View Source
var (
	S3ConnectionTimeout = 5 * time.Second
	S3OpTimeout         = 10 * time.Second
	DefaultByteLimit    = int64(10 * 1024 * 1024) // 10MB

	// metrics
	S3MeterName = "github.com/probe-lab/hermes/s3"
)

Functions

func EventTypeFromBeaconChainProtocol

func EventTypeFromBeaconChainProtocol(protocol string) string

EventTypeFromBeaconChainProtocol returns the event type for a given protocol string.

func EventsToBytes

func EventsToBytes[T any](
	events []any,
	opts ...parquet.WriterOption,
) (int, *bytes.Buffer, error)

EventsToBytes translates any given number of trace events into parquet-serialized bytes. It can also be tuned with any desired set of parquet.WriterOption values.

func MaddrFrom

func MaddrFrom(ip string, port uint) (ma.Multiaddr, error)

MaddrFrom takes in an ip address string and port to produce a go multiaddr format.

func NoopHandler

func NoopHandler(ctx context.Context, msg *pubsub.Message) error

func RenderEvent

func RenderEvent(rawEvent *TraceEvent) (map[EventType][]any, error)

func SizeOfEvent

func SizeOfEvent(event any) int64

SizeOfEvent returns the size of any event.

Types

type AgentVersionProvider

type AgentVersionProvider interface {
	AgentVersion(pid peer.ID) string
}

AgentVersionProvider is an interface for getting agent versions

type BaseEvent

type BaseEvent struct {
	Timestamp  int64  `parquet:"timestamp"`
	Type       string `parquet:"type"`
	ProducerID string `parquet:"producer_id"`
}

For analysis purposes, we need to pair the events with each other.

func (*BaseEvent) GetProducerID

func (b *BaseEvent) GetProducerID() string

type BaseRPCEvent

type BaseRPCEvent struct {
	BaseEvent
	IsOg         bool   `parquet:"is_og"` // since we will divide original IHAVES into different rows off keep track of OG events for Control msg ids
	Direction    string `parquet:"direction"`
	RemotePeerID string `parquet:"remote_peer_id"`
}

type CallbackDataStream

type CallbackDataStream struct {
	// contains filtered or unexported fields
}

CallbackDataStream is a simple implementation of DataStream that holds a callback function. Users of CallbackDataStream should ensure that the callback function does not block, as blocking can delay or disrupt the processing of subsequent events.

func NewCallbackDataStream

func NewCallbackDataStream() *CallbackDataStream

NewCallbackDataStream creates a new instance of CallbackDataStream.

func (*CallbackDataStream) OnEvent

func (c *CallbackDataStream) OnEvent(onRecord func(ctx context.Context, event *TraceEvent))

OnEvent sets the callback function that will be called when an event is received.

func (*CallbackDataStream) OutputType

func (c *CallbackDataStream) OutputType() DataStreamOutputType

OutputType returns the output type to be used by this data stream.

func (*CallbackDataStream) PutRecord

func (c *CallbackDataStream) PutRecord(ctx context.Context, event *TraceEvent) error

PutRecord sends an event to the callback if the stream has not been stopped.

func (*CallbackDataStream) Start

func (c *CallbackDataStream) Start(ctx context.Context) error

Start begins the data stream's operations.

func (*CallbackDataStream) Stop

func (c *CallbackDataStream) Stop(ctx context.Context) error

Stop ends the data stream's operation.

func (*CallbackDataStream) Type

Type returns the type of the data stream, which is DataStreamTypeCallback.

type Config

type Config struct {
	DataStream            DataStream
	PeerscoreSnapshotFreq time.Duration
	PeerFilter            *FilterConfig // Optional peer filtering configuration
	DirectConnections     []peer.AddrInfo
	PubsubBlacklist       pubsub.Blacklist

	// Telemetry accessors
	Tracer trace.Tracer
	Meter  metric.Meter
}

type DataStream

type DataStream interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	PutRecord(ctx context.Context, event *TraceEvent) error
	Type() DataStreamType
	OutputType() DataStreamOutputType
}

type DataStreamOutputType

type DataStreamOutputType int

DataStreamOutputType is the output type of the data stream.

const (
	// DataStreamOutputTypeKinesis outputs the data stream decorated with metadata and in a format ingested by Kinesis.
	DataStreamOutputTypeKinesis DataStreamOutputType = iota
	// DataStreamOutputTypeFull outputs the data stream decorated with metadata and containing the raw/full event data.
	DataStreamOutputTypeFull
	// DataStreamOutputTypeParquet outputs the trace events formatted in a simplified parquet column style.
	DataStreamOutputTypeParquet
)

type DataStreamRenderer

type DataStreamRenderer interface {
	RenderPayload(evt *TraceEvent, msg *pubsub.Message, dst ssz.Unmarshaler) (*TraceEvent, error)
}

DataStreamRenderer is an interface to support rendering a data-stream message into a destination.

type DataStreamType

type DataStreamType int
const (
	DataStreamTypeKinesis DataStreamType = iota
	DataStreamTypeCallback
	DataStreamTypeLogger
	DataStreamTypeS3
	DataStreamTypeNoop
)

func DataStreamtypeFromStr

func DataStreamtypeFromStr(str string) DataStreamType

func (DataStreamType) String

func (ds DataStreamType) String() string

type EventSubType

type EventSubType int8
const (
	EventSubTypeNone EventSubType = iota
	// Add / Remove peers
	EventSubTypeAddPeer
	EventSubTypeRemovePeer
	// Graft / Prunes
	EventSubTypeGraft
	EventSubTypePrune
	// Msg arrivals
	EventSubTypeDeliverMsg
	EventSubTypeValidateMsg
	EventSubTypeHandleMsg // adding handle MSG aswell, although we are not parsing the Eth specific details from msgs
	EventSubTypeDuplicatedMsg
	EventSubTypeRejectMsg
	// Join/Leave Topic
	EventSubTypeJoinTopic
	EventSubTypeLeaveTopic
	// Libp2p
	EventSubTypeConnectPeer
	EventSubTypeDisconnectPeer
)

func (EventSubType) String

func (e EventSubType) String() string

type EventSubmissionTask

type EventSubmissionTask struct {
	EventType EventType
	Events    []any
	S3Key     string
}

EventSubmissionTask is the main event submission task. It identifies: the type of the events (for a later cast), the list of events (that need to be cast), and the name of the S3 key under which to store the events.

type EventType

type EventType int8
const (
	EventTypeUnknown EventType = iota
	EventTypeGenericEvent
	// Gossip-mesh
	EventTypeAddRemovePeer
	EventTypeGraftPrune
	// PeerExchange
	EventTypeGossipPx
	// Gossip RPCs
	EventTypeControlRPC
	EventTypeIhave
	EventTypeIwant
	EventTypeIdontwant
	EventTypeSentMsg
	// Gossip Message arrivals
	EventTypeMsgArrivals
	// Gossip Join/Leave Topic
	EventTypeJoinLeaveTopic
	// Libp2p Event
	EventTypeConnectDisconnectPeer
)

func (EventType) String

func (e EventType) String() string

type FilterConfig

type FilterConfig struct {
	Mode     FilterMode `yaml:"mode" default:"disabled"`
	Patterns []string   `yaml:"patterns"`
}

FilterConfig holds configuration for peer filtering

func (*FilterConfig) Validate

func (fc *FilterConfig) Validate() error

Validate validates the filter configuration

type FilterMode

type FilterMode string

FilterMode defines the filtering behavior for peer connections

const (
	// FilterModeDisabled disables all peer filtering
	FilterModeDisabled FilterMode = "disabled"
	// FilterModeDenylist blocks peers matching patterns
	FilterModeDenylist FilterMode = "denylist"
	// FilterModeAllowlist only allows peers matching patterns
	FilterModeAllowlist FilterMode = "allowlist"
)

type GenericParquetEvent

type GenericParquetEvent struct {
	BaseEvent
	Topic   string `parquet:"topic"`
	Payload string `parquet:"payload"`
}

If we don't have a specific parquet format for a trace, use the generic one.

func GenericTraceFromEvent

func GenericTraceFromEvent(t *TraceEvent) *GenericParquetEvent

type GossipAddRemovePeerEvent

type GossipAddRemovePeerEvent struct {
	BaseEvent
	SubType      string `parquet:"sub_type"`
	RemotePeerID string `parquet:"remote_peer_id"`
}

type GossipGraftPruneEvent

type GossipGraftPruneEvent struct {
	BaseEvent
	SubType      string `parquet:"sub_type"`
	RemotePeerID string `parquet:"remote_peer_id"`
	Topic        string `parquet:"topic"`
}

type GossipIdontwantEvent

type GossipIdontwantEvent struct {
	BaseRPCEvent
	MsgIDs []string `parquet:"msg_ids,list"`
	Msgs   int      `parquet:"msgs"`
}

type GossipIhaveEvent

type GossipIhaveEvent struct {
	BaseRPCEvent
	Topic  string   `parquet:"topic"`
	MsgIDs []string `parquet:"msg_ids,list"`
	Msgs   int      `parquet:"msgs"`
}

type GossipIwantEvent

type GossipIwantEvent struct {
	BaseRPCEvent
	MsgIDs []string `parquet:"msg_ids,list"`
	Msgs   int      `parquet:"msgs"`
}

type GossipJoinLeaveTopicEvent

type GossipJoinLeaveTopicEvent struct {
	BaseEvent
	SubType string `parquet:"sub_type"`
	Topic   string `parquet:"topic"`
}

type GossipMsgArrivalEvent

type GossipMsgArrivalEvent struct {
	BaseEvent
	SubType      string `parquet:"sub_type"`
	RemotePeerID string `parquet:"remote_peer_id"`
	Topic        string `parquet:"topic"`
	MsgID        string `parquet:"msg_id"`
	Local        bool   `parquet:"local"`
	MsgSize      int64  `parquet:"msg_size"`
	SeqNo        string `parquet:"seq_no"`
}

type GossipPeerExchangeEvent

type GossipPeerExchangeEvent struct {
	BaseRPCEvent
	Topic   string   `parquet:"topic"`
	PxPeers []string `parquet:"px_peers,list"`
}

type GossipSentMsgEvent

type GossipSentMsgEvent struct {
	BaseRPCEvent
	MsgID string `parquet:"msg_id"`
	Topic string `parquet:"topic"`
}

type Host

type Host struct {
	host.Host
	// contains filtered or unexported fields
}

func New

func New(cfg *Config, opts ...libp2p.Option) (*Host, error)

func (*Host) AddPeer

func (h *Host) AddPeer(p peer.ID, proto protocol.ID)

func (*Host) AgentVersion

func (h *Host) AgentVersion(pid peer.ID) string

AgentVersion returns the agent version of the given peer. If the agent version is not known, it returns an empty string.

func (*Host) ConnSignal

func (h *Host) ConnSignal(ctx context.Context, pid peer.ID) chan error

ConnSignal signals the incoming connection of the given peer on the returned channel by just closing it. Alternatively, if the context has a deadline that's exceeded, the channel will emit the context error and then be closed.

func (*Host) DeliverMessage

func (h *Host) DeliverMessage(msg *pubsub.Message)

func (*Host) DropRPC

func (h *Host) DropRPC(rpc *pubsub.RPC, p peer.ID)

func (*Host) DuplicateMessage

func (h *Host) DuplicateMessage(msg *pubsub.Message)

func (*Host) FlushTrace

func (h *Host) FlushTrace(evtType string, payload any)

func (*Host) FlushTraceWithTimestamp

func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payload any)

func (*Host) Graft

func (h *Host) Graft(p peer.ID, topic string)

func (*Host) InitGossipSub

func (h *Host) InitGossipSub(ctx context.Context, opts ...pubsub.Option) (*pubsub.PubSub, error)

func (*Host) Join

func (h *Host) Join(topic string)

func (*Host) Leave

func (h *Host) Leave(topic string)

func (*Host) LocalListenMaddr

func (h *Host) LocalListenMaddr() (ma.Multiaddr, error)

LocalListenMaddr returns the first multiaddress in a localhost IP range that this host is listening on.

func (*Host) PrivateListenMaddr

func (h *Host) PrivateListenMaddr() (ma.Multiaddr, error)

PrivateListenMaddr returns the first multiaddress in a private IP range that this host is listening on.

func (*Host) Prune

func (h *Host) Prune(p peer.ID, topic string)

func (*Host) RecvRPC

func (h *Host) RecvRPC(rpc *pubsub.RPC)

func (*Host) RejectMessage

func (h *Host) RejectMessage(msg *pubsub.Message, reason string)

func (*Host) RemovePeer

func (h *Host) RemovePeer(p peer.ID)

func (*Host) SendRPC

func (h *Host) SendRPC(rpc *pubsub.RPC, p peer.ID)

func (*Host) Serve

func (h *Host) Serve(ctx context.Context) error

func (*Host) ThrottlePeer

func (h *Host) ThrottlePeer(p peer.ID)

func (*Host) Trace

func (h *Host) Trace(evt *pubsubpb.TraceEvent)

func (*Host) TracedTopicHandler

func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler

func (*Host) UndeliverableMessage

func (h *Host) UndeliverableMessage(msg *pubsub.Message)

func (*Host) UpdatePeerScore

func (h *Host) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot)

func (*Host) ValidateMessage

func (h *Host) ValidateMessage(msg *pubsub.Message)

func (*Host) WaitForPublicAddress

func (h *Host) WaitForPublicAddress(ctx context.Context) error

WaitForPublicAddress blocks until the libp2p host has identified its own addresses at which it is publicly reachable.

type KinesisDataStream

type KinesisDataStream struct {
	// contains filtered or unexported fields
}

func NewKinesisDataStream

func NewKinesisDataStream(p *gk.Producer) *KinesisDataStream

NewKinesisDataStream creates a new instance of KinesisDataStream with a given producer.

func (*KinesisDataStream) OutputType

func (k *KinesisDataStream) OutputType() DataStreamOutputType

OutputType returns the output type to be used by this data stream.

func (*KinesisDataStream) PutRecord

func (k *KinesisDataStream) PutRecord(ctx context.Context, event *TraceEvent) error

PutRecord sends an event to the Kinesis data stream.

func (*KinesisDataStream) Start

func (k *KinesisDataStream) Start(ctx context.Context) error

Start begins the data stream's operation.

func (*KinesisDataStream) Stop

func (k *KinesisDataStream) Stop(ctx context.Context) error

Stop ends the data stream.

func (*KinesisDataStream) Type

Type returns the type of the data stream

type Libp2pConnectDisconnectEvent

type Libp2pConnectDisconnectEvent struct {
	BaseEvent
	SubType          string `parquet:"sub_type"`
	RemotePeerID     string `parquet:"remote_peer_id"`
	RemotePeerMaddrs string `parquet:"remote_peer_maddrs"`
	AgentVersion     string `parquet:"agent_version"`
	Direction        string `parquet:"direction"`
	Opened           int64  `parquet:"opened"`
	Limited          bool   `parquet:"limited"`
}

type LocalyProducedEvent

type LocalyProducedEvent interface {
	GetProducerID() string
}

type NoopDataStream

type NoopDataStream struct{}

func (*NoopDataStream) OutputType

func (ds *NoopDataStream) OutputType() DataStreamOutputType

OutputType returns the output type to be used by this data stream.

func (*NoopDataStream) PutRecord

func (ds *NoopDataStream) PutRecord(ctx context.Context, event *TraceEvent) error

func (*NoopDataStream) Start

func (ds *NoopDataStream) Start(ctx context.Context) error

func (*NoopDataStream) Stop

func (ds *NoopDataStream) Stop(ctx context.Context) error

func (*NoopDataStream) Type

func (ds *NoopDataStream) Type() DataStreamType

type PeerFilter

type PeerFilter struct {
	// contains filtered or unexported fields
}

PeerFilter implements libp2p's ConnectionGater interface to filter peer connections based on agent strings using regex patterns

func NewPeerFilter

func NewPeerFilter(h AgentVersionProvider, config FilterConfig, log *slog.Logger) (*PeerFilter, error)

NewPeerFilter creates a new peer filter with the given configuration

func (*PeerFilter) CheckAgent

func (pf *PeerFilter) CheckAgent(agent string) bool

CheckAgent is a public method for testing filter patterns

func (*PeerFilter) InterceptAccept

func (pf *PeerFilter) InterceptAccept(conn network.ConnMultiaddrs) (allow bool)

InterceptAccept implements ConnectionGater

func (*PeerFilter) InterceptAddrDial

func (pf *PeerFilter) InterceptAddrDial(p peer.ID, addr ma.Multiaddr) (allow bool)

InterceptAddrDial implements ConnectionGater

func (*PeerFilter) InterceptPeerDial

func (pf *PeerFilter) InterceptPeerDial(p peer.ID) (allow bool)

InterceptPeerDial implements ConnectionGater

func (*PeerFilter) InterceptSecured

func (pf *PeerFilter) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool)

InterceptSecured implements ConnectionGater

func (*PeerFilter) InterceptUpgraded

func (pf *PeerFilter) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason)

InterceptUpgraded implements ConnectionGater

type RPCdirection

type RPCdirection int8
const (
	RPCdirectionUnknown RPCdirection = iota
	RPCdirectionIn
	RPCdirectionOut
	RPCdirectionDrop
)

func (RPCdirection) String

func (d RPCdirection) String() string

type RpcControlGraft

type RpcControlGraft struct {
	TopicID string
}

type RpcControlIHave

type RpcControlIHave struct {
	TopicID string
	MsgIDs  []string
}

type RpcControlIWant

type RpcControlIWant struct {
	MsgIDs []string
}

type RpcControlIdontWant

type RpcControlIdontWant struct {
	MsgIDs []string
}

type RpcControlPrune

type RpcControlPrune struct {
	TopicID string
	PeerIDs []peer.ID
}

type RpcMeta

type RpcMeta struct {
	PeerID        peer.ID
	Subscriptions []RpcMetaSub    `json:"Subs,omitempty"`
	Messages      []RpcMetaMsg    `json:"Msgs,omitempty"`
	Control       *RpcMetaControl `json:"Control,omitempty"`
}

type RpcMetaControl

type RpcMetaControl struct {
	IHave     []RpcControlIHave     `json:"IHave,omitempty"`
	IWant     []RpcControlIWant     `json:"IWant,omitempty"`
	Graft     []RpcControlGraft     `json:"Graft,omitempty"`
	Prune     []RpcControlPrune     `json:"Prune,omitempty"`
	Idontwant []RpcControlIdontWant `json:"Idontwant,omitempty"`
}

type RpcMetaMsg

type RpcMetaMsg struct {
	MsgID string `json:"MsgID,omitempty"`
	Topic string `json:"Topic,omitempty"`
}

type RpcMetaSub

type RpcMetaSub struct {
	Subscribe bool
	TopicID   string
}

type S3DSConfig

type S3DSConfig struct {
	Meter         metric.Meter
	Flushers      int
	ByteLimit     int64
	FlushInterval time.Duration
	AccessKeyID   string
	SecretKey     string
	Region        string
	Endpoint      string
	Bucket        string
	Tag           string
}

S3DSConfig holds the configuration needed to establish a connection with an S3 instance.

func (*S3DSConfig) CheckValidity

func (s3cfg *S3DSConfig) CheckValidity() error

CheckValidity checks whether the current configuration is valid and, if anything is wrong with the config, returns an error describing the missing items.

func (S3DSConfig) ToAWSconfig

func (s3cfg S3DSConfig) ToAWSconfig() (*aws.Config, error)

ToAWSconfig makes a quick translation from the given user args into the aws.Config struct -> ready to create the S3 client

type S3DataStream

type S3DataStream struct {
	// contains filtered or unexported fields
}

func NewS3DataStream

func NewS3DataStream(baseCfg S3DSConfig) (*S3DataStream, error)

func (*S3DataStream) OutputType

func (s3ds *S3DataStream) OutputType() DataStreamOutputType

func (*S3DataStream) PutRecord

func (s3ds *S3DataStream) PutRecord(ctx context.Context, event *TraceEvent) error

func (*S3DataStream) S3KeySubmission

func (s3ds *S3DataStream) S3KeySubmission(ctx context.Context, s3Key string, content []byte) error

S3KeySubmission is an arbitrary method that submits any []byte into S3 under the given key.

func (*S3DataStream) Start

func (s3ds *S3DataStream) Start(ctx context.Context) error

func (*S3DataStream) Stop

func (s3ds *S3DataStream) Stop(ctx context.Context) error

func (*S3DataStream) Type

func (s3ds *S3DataStream) Type() DataStreamType

type ScoreKeeper

type ScoreKeeper struct {
	// contains filtered or unexported fields
}

ScoreKeeper is a thread-safe local copy of the score per peer and per copy. TODO: figure out if this is some sort of info that we want to expose through OpenTelemetry (still good to have it).

func (*ScoreKeeper) Get

func (sk *ScoreKeeper) Get() map[peer.ID]*pubsub.PeerScoreSnapshot

func (*ScoreKeeper) Update

func (sk *ScoreKeeper) Update(scores map[peer.ID]*pubsub.PeerScoreSnapshot)

type SendRecvRPCEvent

type SendRecvRPCEvent struct {
	BaseRPCEvent
	Ihaves     int32 `parquet:"ihaves"`
	Iwants     int32 `parquet:"iwants"`
	Idontwants int32 `parquet:"idontwants"`
	SentMsgs   int32 `parquet:"sent_msgs"`
}

SendRecvRPCEvent tracks the number of original RPCs exchanged: it records the direction and the number of message IDs per control.

type TopicHandler

type TopicHandler = func(context.Context, *pubsub.Message) error

type TopicScore

type TopicScore struct {
	Topic                    string
	TimeInMesh               time.Duration
	FirstMessageDeliveries   float64
	MeshMessageDeliveries    float64
	InvalidMessageDeliveries float64
}

type TopicSubscription

type TopicSubscription struct {
	Topic   string
	LocalID peer.ID
	Sub     *pubsub.Subscription
	Handler TopicHandler
}

func (*TopicSubscription) Serve

func (t *TopicSubscription) Serve(ctx context.Context) error

type TraceEvent

type TraceEvent struct {
	Type      string
	Topic     string
	PeerID    peer.ID
	Timestamp time.Time
	Payload   any `json:"Data"` // cannot use field "Data" because of gk.Record method
}

func (*TraceEvent) Data

func (t *TraceEvent) Data() []byte

func (*TraceEvent) ExplicitHashKey

func (t *TraceEvent) ExplicitHashKey() *string

func (*TraceEvent) PartitionKey

func (t *TraceEvent) PartitionKey() string

type TraceEventPayloadMetaData

type TraceEventPayloadMetaData struct {
	PeerID  string `json:"PeerID"`
	Topic   string `json:"Topic"`
	Seq     []byte `json:"Seq"`
	MsgID   string `json:"MsgID"`
	MsgSize int    `json:"MsgSize"`
}

type TraceEventPeerScore

type TraceEventPeerScore struct {
	PeerID             string
	Score              float64
	AppSpecificScore   float64
	IPColocationFactor float64
	BehaviourPenalty   float64
	Topics             []TopicScore
}

type TraceLogger

type TraceLogger struct{}

func (*TraceLogger) OutputType

func (t *TraceLogger) OutputType() DataStreamOutputType

OutputType returns the output type to be used by this data stream.

func (*TraceLogger) PutRecord

func (t *TraceLogger) PutRecord(ctx context.Context, event *TraceEvent) error

func (*TraceLogger) Start

func (t *TraceLogger) Start(ctx context.Context) error

func (*TraceLogger) Stop

func (t *TraceLogger) Stop(ctx context.Context) error

func (*TraceLogger) Type

func (t *TraceLogger) Type() DataStreamType

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL