Documentation
¶
Overview ¶
pkg/topology/actors.go
pkg/topology/errors.go
pkg/topology/metrics.go
pkg/topology/metrics_integration.go
pkg/topology/topology.go
Index ¶
- Variables
- type Actor
- type ActorAddedCallback
- type ActorRemovedCallback
- type Actors
- func (a *Actors) AddPeer(peerID peer.ID, address types.Address, addresses []multiaddr.Multiaddr, ...) error
- func (a *Actors) AddPeerRole(peerID peer.ID, role types.Role) error
- func (a *Actors) GetPeer(peerID peer.ID) (*Actor, error)
- func (a *Actors) GetPeers() ([]*Actor, error)
- func (a *Actors) GetPeersByRole(role types.Role) ([]*Actor, error)
- func (a *Actors) GetPeersByRoles(roles ...types.Role) ([]*Actor, error)
- func (a *Actors) GetPeersCount() (int, error)
- func (a *Actors) GetRolesOfPeer(peerID peer.ID) ([]types.Role, error)
- func (a *Actors) HandlePeerConnected(ctx context.Context, peerInfo peer.AddrInfo) error
- func (a *Actors) HandlePeerDisconnected(ctx context.Context, peerID peer.ID) error
- func (a *Actors) HasPeer(peerID peer.ID) bool
- func (a *Actors) PeerHasRole(peerID peer.ID, role types.Role) (bool, error)
- func (a *Actors) RegisterActorAddedCallback(cb ActorAddedCallback)
- func (a *Actors) RegisterActorRemovedCallback(cb ActorRemovedCallback)
- func (a *Actors) RemovePeer(ctx context.Context, peerID peer.ID, force bool) error
- func (a *Actors) RemovePeerRole(peerID peer.ID, role types.Role) error
- func (a *Actors) SendActorResponsePacket(ctx context.Context, peerID peer.ID, ap *packets.ActorPacket) error
- type Topology
- func (t *Topology) ActorSet() *share.ActorSet
- func (t *Topology) Actors() *Actors
- func (t *Topology) HandleActorPacket(ctx context.Context, msg *packets.NetworkPacket, sender peer.ID) error
- func (t *Topology) RegisterHandlers() error
- func (t *Topology) Shutdown()
- func (t *Topology) WaitForPeer(ctx context.Context, targetPeerID peer.ID, timeout time.Duration) error
- func (t *Topology) WaitForPeers(ctx context.Context, timeout time.Duration) error
- func (t *Topology) WaitForPeersWithRole(ctx context.Context, role types.Role, timeout time.Duration) error
- func (t *Topology) WaitForPeersWithRoles(ctx context.Context, roles []types.Role, timeout time.Duration) error
- type TopologyMetrics
- func (m *TopologyMetrics) RecordActorAdded(ctx context.Context, count int64, role string)
- func (m *TopologyMetrics) RecordActorConnection(ctx context.Context, count int64)
- func (m *TopologyMetrics) RecordActorDisconnection(ctx context.Context, count int64)
- func (m *TopologyMetrics) RecordActorRemoved(ctx context.Context, count int64, role string)
- func (m *TopologyMetrics) RecordActorVerification(ctx context.Context, count int64)
- func (m *TopologyMetrics) RecordActorVerificationFailure(ctx context.Context, count int64)
- func (m *TopologyMetrics) RecordActorVerificationLatency(ctx context.Context, duration time.Duration)
- func (m *TopologyMetrics) RecordConsensusActor(ctx context.Context, delta int64)
- func (m *TopologyMetrics) RecordPendingPeer(ctx context.Context, count int64)
- func (m *TopologyMetrics) RecordPendingPeerTimeout(ctx context.Context, count int64)
- func (m *TopologyMetrics) RecordTopologyChangeLatency(ctx context.Context, duration time.Duration)
- func (m *TopologyMetrics) RecordWaitOperation(ctx context.Context, count int64, waitType string)
- func (m *TopologyMetrics) RecordWaitOperationLatency(ctx context.Context, duration time.Duration, waitType string)
- func (m *TopologyMetrics) RecordWaitOperationSuccess(ctx context.Context, count int64, waitType string)
- func (m *TopologyMetrics) RecordWaitOperationTimeout(ctx context.Context, count int64, waitType string)
Constants ¶
This section is empty.
Variables ¶
var ( ErrPeerNotFound = errors.New("peer not found in topology") ErrPeerAlreadyExists = errors.New("peer already exists in topology") ErrInvalidMessageType = errors.New("invalid message type") ErrInsufficientPermissions = errors.New("insufficient permissions") ErrPeerNotInPendingPeers = errors.New("peer not found in pendingPeers") ErrFailedToRemovePeer = errors.New("failed to remove peer from topology") ErrFailedToProcessActorPacket = errors.New("failed to process actor packet") ErrUnknownActorPacketStatus = errors.New("unknown ActorPacket status") ErrInvalidSignature = errors.New("invalid signature") ErrTimeoutExceeded = errors.New("wait operation timed out") )
Custom errors for the topology package.
Functions ¶
This section is empty.
Types ¶
type Actor ¶
type Actor struct {
ID peer.ID
Address types.Address
Addresses []multiaddr.Multiaddr
Roles []types.Role
SupportedTransports []types.TransportType
SupportedProtocols []types.ProtocolType
SupportedSigners []types.SignerType
ConsensusPublicKey kyber.Point
PublicKey libp2pCrypto.PubKey
ConsensusActor *share.Actor // Optional: Link to share.Actor for consensus participation
}
Actor represents both the node's own actor information and that of connected peers.
type ActorAddedCallback ¶
ActorAddedCallback is a function type that is called when an actor is added.
type ActorRemovedCallback ¶
ActorRemovedCallback is a function type that is called when an actor is removed.
type Actors ¶
type Actors struct {
// contains filtered or unexported fields
}
Actors manages actor information within the topology.
func NewActors ¶
func NewActors(logger logger.Logger, account share.Account, network *networking.Network, rbacMgr *rbac.Manager, collector share.Collector, consensusSet *share.ActorSet, onPeerEvent func()) (*Actors, error)
NewActors creates a new Actors instance and registers event handlers. The onPeerEvent function is called whenever a peer is added or removed.
func (*Actors) AddPeer ¶
func (a *Actors) AddPeer(peerID peer.ID, address types.Address, addresses []multiaddr.Multiaddr, roles []types.Role, transports []types.TransportType, protocols []types.ProtocolType, signers []types.SignerType, pubKey libp2pCrypto.PubKey) error
AddPeer adds a peer to the topology with verified information.
func (*Actors) AddPeerRole ¶
AddPeerRole assigns a new role to an existing peer.
func (*Actors) GetPeersByRole ¶
GetPeersByRole retrieves all peers assigned a specific role.
func (*Actors) GetPeersByRoles ¶
GetPeersByRoles retrieves all peers assigned any of the specified roles.
func (*Actors) GetPeersCount ¶
GetPeersCount returns a count of all peers in the topology.
func (*Actors) GetRolesOfPeer ¶
GetRolesOfPeer retrieves all roles associated with a specific peer.
func (*Actors) HandlePeerConnected ¶
HandlePeerConnected is invoked when a new peer connects.
func (*Actors) HandlePeerDisconnected ¶
HandlePeerDisconnected is invoked when a peer disconnects.
func (*Actors) PeerHasRole ¶
PeerHasRole checks if a specific peer possesses a particular role.
func (*Actors) RegisterActorAddedCallback ¶
func (a *Actors) RegisterActorAddedCallback(cb ActorAddedCallback)
RegisterActorAddedCallback allows external packages to register a callback that is invoked whenever an actor is added.
func (*Actors) RegisterActorRemovedCallback ¶
func (a *Actors) RegisterActorRemovedCallback(cb ActorRemovedCallback)
RegisterActorRemovedCallback allows external packages to register a callback that is invoked whenever an actor is removed.
func (*Actors) RemovePeer ¶
RemovePeer removes a peer from the topology.
func (*Actors) RemovePeerRole ¶
RemovePeerRole removes a specific role from an existing peer.
func (*Actors) SendActorResponsePacket ¶
func (a *Actors) SendActorResponsePacket(ctx context.Context, peerID peer.ID, ap *packets.ActorPacket) error
SendActorResponsePacket sends an ActorPacket (e.g., approved) to a peer.
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
Topology orchestrates the overall network topology and peer management.
func NewTopology ¶
func NewTopology(ctx context.Context, logger logger.Logger, account share.Account, network *networking.Network, rbacMgr *rbac.Manager, collector *metrics.Collector, actorSet *share.ActorSet) (*Topology, error)
NewTopology creates a new Topology instance with the given logger and network reference.
func (*Topology) ActorSet ¶
ActorSet return the share.ActorSet instance managed by Topology. share.ActorSet is in charge of consensus-related-actors TODO: Rename to ConsensusSet
func (*Topology) HandleActorPacket ¶
func (t *Topology) HandleActorPacket(ctx context.Context, msg *packets.NetworkPacket, sender peer.ID) error
HandleActorPacket handles incoming ActorPacket based on its Status.
func (*Topology) RegisterHandlers ¶
RegisterHandlers registers the Topology's packet handlers with the networking layer.
func (*Topology) Shutdown ¶
func (t *Topology) Shutdown()
Shutdown gracefully shuts down the topology.
func (*Topology) WaitForPeer ¶
func (t *Topology) WaitForPeer(ctx context.Context, targetPeerID peer.ID, timeout time.Duration) error
WaitForPeer waits until the specified peer is available in the topology or the context times out.
func (*Topology) WaitForPeers ¶
WaitForPeers waits until at least one peer is available in the topology or the context times out.
type TopologyMetrics ¶
type TopologyMetrics struct {
// Actor management metrics
ActorsAddedTotal metric.Int64Counter
ActorsRemovedTotal metric.Int64Counter
ActorVerificationsTotal metric.Int64Counter
ActorVerificationFailures metric.Int64Counter
ActorVerificationLatency metric.Float64Histogram
// Pending peer metrics
PendingPeersTotal metric.Int64Counter
PendingPeersTimeoutTotal metric.Int64Counter
// Role-based metrics
ActorsByRoleCount metric.Int64UpDownCounter
// Connection metrics
ActorConnectionsTotal metric.Int64Counter
ActorDisconnectionsTotal metric.Int64Counter
// Consensus-related metrics
ConsensusActorsTotal metric.Int64UpDownCounter
// Health metrics
TopologyChangeLatency metric.Float64Histogram
// Wait operation metrics
WaitOperationsTotal metric.Int64Counter
WaitOperationSuccess metric.Int64Counter
WaitOperationTimeout metric.Int64Counter
WaitOperationLatency metric.Float64Histogram
}
TopologyMetrics holds all the metrics instruments for the Topology management.
func InitializeMetrics ¶
func InitializeMetrics(meter metric.Meter) (*TopologyMetrics, error)
InitializeMetrics initializes all topology metrics using the provided meter.
func InitializeTopologyMetrics ¶
func InitializeTopologyMetrics(meter metric.Meter) (*TopologyMetrics, error)
InitializeTopologyMetrics initializes the metrics instruments for Topology.
func (*TopologyMetrics) RecordActorAdded ¶
func (m *TopologyMetrics) RecordActorAdded(ctx context.Context, count int64, role string)
RecordActorAdded increments the ActorsAddedTotal counter.
func (*TopologyMetrics) RecordActorConnection ¶
func (m *TopologyMetrics) RecordActorConnection(ctx context.Context, count int64)
RecordActorConnection increments the ActorConnectionsTotal counter.
func (*TopologyMetrics) RecordActorDisconnection ¶
func (m *TopologyMetrics) RecordActorDisconnection(ctx context.Context, count int64)
RecordActorDisconnection increments the ActorDisconnectionsTotal counter.
func (*TopologyMetrics) RecordActorRemoved ¶
func (m *TopologyMetrics) RecordActorRemoved(ctx context.Context, count int64, role string)
RecordActorRemoved increments the ActorsRemovedTotal counter.
func (*TopologyMetrics) RecordActorVerification ¶
func (m *TopologyMetrics) RecordActorVerification(ctx context.Context, count int64)
RecordActorVerification increments the ActorVerificationsTotal counter.
func (*TopologyMetrics) RecordActorVerificationFailure ¶
func (m *TopologyMetrics) RecordActorVerificationFailure(ctx context.Context, count int64)
RecordActorVerificationFailure increments the ActorVerificationFailures counter.
func (*TopologyMetrics) RecordActorVerificationLatency ¶
func (m *TopologyMetrics) RecordActorVerificationLatency(ctx context.Context, duration time.Duration)
RecordActorVerificationLatency records the latency of actor verification operations.
func (*TopologyMetrics) RecordConsensusActor ¶
func (m *TopologyMetrics) RecordConsensusActor(ctx context.Context, delta int64)
RecordConsensusActor updates the ConsensusActorsTotal counter.
func (*TopologyMetrics) RecordPendingPeer ¶
func (m *TopologyMetrics) RecordPendingPeer(ctx context.Context, count int64)
RecordPendingPeer increments the PendingPeersTotal counter.
func (*TopologyMetrics) RecordPendingPeerTimeout ¶
func (m *TopologyMetrics) RecordPendingPeerTimeout(ctx context.Context, count int64)
RecordPendingPeerTimeout increments the PendingPeersTimeoutTotal counter.
func (*TopologyMetrics) RecordTopologyChangeLatency ¶
func (m *TopologyMetrics) RecordTopologyChangeLatency(ctx context.Context, duration time.Duration)
RecordTopologyChangeLatency records the latency of topology change operations.
func (*TopologyMetrics) RecordWaitOperation ¶
func (m *TopologyMetrics) RecordWaitOperation(ctx context.Context, count int64, waitType string)
RecordWaitOperation increments the WaitOperationsTotal counter.
func (*TopologyMetrics) RecordWaitOperationLatency ¶
func (m *TopologyMetrics) RecordWaitOperationLatency(ctx context.Context, duration time.Duration, waitType string)
RecordWaitOperationLatency records the latency of wait operations.
func (*TopologyMetrics) RecordWaitOperationSuccess ¶
func (m *TopologyMetrics) RecordWaitOperationSuccess(ctx context.Context, count int64, waitType string)
RecordWaitOperationSuccess increments the WaitOperationSuccess counter.
func (*TopologyMetrics) RecordWaitOperationTimeout ¶
func (m *TopologyMetrics) RecordWaitOperationTimeout(ctx context.Context, count int64, waitType string)
RecordWaitOperationTimeout increments the WaitOperationTimeout counter.