Documentation
¶
Index ¶
- Constants
- Variables
- func NewValidatorsWrapper(manager validators.Manager) *validatorsWrapper
- type Config
- type ConsistentHashPeerSelector
- func (s *ConsistentHashPeerSelector) AddPeer(nodeID ids.NodeID, weight uint64)
- func (s *ConsistentHashPeerSelector) Contains(nodeID ids.NodeID) bool
- func (s *ConsistentHashPeerSelector) GetPeersByWeight(limit int) []ids.NodeID
- func (s *ConsistentHashPeerSelector) RemovePeer(nodeID ids.NodeID)
- func (s *ConsistentHashPeerSelector) SelectPeers(key []byte, n int, exclude set.Set[ids.NodeID]) []ids.NodeID
- func (s *ConsistentHashPeerSelector) SelectRandomPeers(n int, exclude set.Set[ids.NodeID]) []ids.NodeID
- func (s *ConsistentHashPeerSelector) Size() int
- type DelayConfig
- type ExternalHandler
- type ExternalSender
- type HealthConfig
- type Message
- type Network
- type PeerListGossipConfig
- type SetCallbackListener
- type ThrottlerConfig
- type TimeoutConfig
- type UptimeResult
Examples ¶
Constants ¶
// String keys related to network health reporting.
// NOTE(review): presumably these label entries in the network health check
// details — confirm against the health check implementation.
const (
	PrimaryNetworkValidatorHealthKey = "primary network validator health"
	ConnectedPeersKey                = "connectedPeers"
	TimeSinceLastMsgReceivedKey      = "timeSinceLastMsgReceived"
	TimeSinceLastMsgSentKey          = "timeSinceLastMsgSent"
	SendFailRateKey                  = "sendFailRate"
)
Variables ¶
var ErrNoIngressConnections = errors.New("primary network validator has no inbound connections")
ErrNoIngressConnections denotes that no node is connected to this validator.
Functions ¶
func NewValidatorsWrapper ¶ added in v1.11.14
func NewValidatorsWrapper(manager validators.Manager) *validatorsWrapper
NewValidatorsWrapper creates a new validators wrapper
Types ¶
type Config ¶
// Config bundles the settings that NewNetwork receives through its config
// parameter. Fields tagged `json:"-"` are omitted when the struct is encoded
// with encoding/json.
type Config struct {
HealthConfig `json:"healthConfig"`
PeerListGossipConfig `json:"peerListGossipConfig"`
// NOTE(review): the JSON key below is plural ("timeoutConfigs") unlike its
// sibling keys. It is part of the serialized format, so confirm with
// consumers before changing it.
TimeoutConfig `json:"timeoutConfigs"`
DelayConfig `json:"delayConfig"`
ThrottlerConfig ThrottlerConfig `json:"throttlerConfig"`
ProxyEnabled bool `json:"proxyEnabled"`
ProxyReadHeaderTimeout time.Duration `json:"proxyReadHeaderTimeout"`
DialerConfig dialer.Config `json:"dialerConfig"`
TLSConfig *tls.Config `json:"-"`
TLSKeyLogFile string `json:"tlsKeyLogFile"`
MyNodeID ids.NodeID `json:"myNodeID"`
MyIPPort *utils.Atomic[netip.AddrPort] `json:"myIP"`
NetworkID uint32 `json:"networkID"`
MaxClockDifference time.Duration `json:"maxClockDifference"`
PingFrequency time.Duration `json:"pingFrequency"`
AllowPrivateIPs bool `json:"allowPrivateIPs"`
SupportedLPs set.Set[uint32] `json:"supportedLPs"`
ObjectedLPs set.Set[uint32] `json:"objectedLPs"`
// The compression type to use when compressing outbound messages.
// Assumes all peers support this compression type.
CompressionType compression.Type `json:"compressionType"`
// TLSKey is this node's TLS key that is used to sign IPs.
TLSKey crypto.Signer `json:"-"`
// BLSKey is this node's BLS key that is used to sign IPs.
BLSKey bls.Signer `json:"-"`
// TrackedChains of the node.
// It must not include the primary network ID.
TrackedChains set.Set[ids.ID] `json:"-"`
Beacons validators.Manager `json:"-"`
// Validators are the current validators in the Lux network
Validators validators.Manager `json:"-"`
UptimeCalculator uptime.Calculator `json:"-"`
// UptimeMetricFreq marks how frequently this node will recalculate the
// observed average uptime metric.
UptimeMetricFreq time.Duration `json:"uptimeMetricFreq"`
// UptimeRequirement is the fraction of time a validator must be online and
// responsive for us to vote that they should receive a staking reward.
UptimeRequirement float64 `json:"-"`
// RequireValidatorToConnect requires that every connection has at least
// one validator between the 2 peers. This can be useful to enable if the
// node wants to connect to the minimum number of nodes without impacting
// the network negatively.
RequireValidatorToConnect bool `json:"requireValidatorToConnect"`
// MaximumInboundMessageTimeout is the maximum deadline duration in a
// message. Messages sent by clients setting values higher than this value
// will be reset to this value.
MaximumInboundMessageTimeout time.Duration `json:"maximumInboundMessageTimeout"`
// Size, in bytes, of the buffer that we read peer messages into
// (there is one buffer per peer)
PeerReadBufferSize int `json:"peerReadBufferSize"`
// Size, in bytes, of the buffer that we write peer messages into
// (there is one buffer per peer)
PeerWriteBufferSize int `json:"peerWriteBufferSize"`
// Tracks the CPU/disk usage caused by processing messages of each peer.
ResourceTracker consensustracker.ResourceTracker `json:"-"`
// Specifies how much CPU usage each peer can cause before
// we rate-limit them.
CPUTargeter tracker.Targeter `json:"-"`
// Specifies how much disk usage each peer can cause before
// we rate-limit them.
DiskTargeter tracker.Targeter `json:"-"`
}
func NewTestNetworkConfig ¶
type ConsistentHashPeerSelector ¶ added in v1.16.56
type ConsistentHashPeerSelector struct {
// contains filtered or unexported fields
}
ConsistentHashPeerSelector implements O(1) peer selection using consistent hashing
func NewConsistentHashPeerSelector ¶ added in v1.16.56
func NewConsistentHashPeerSelector(virtualNodes int) *ConsistentHashPeerSelector
NewConsistentHashPeerSelector creates an optimized peer selector
func (*ConsistentHashPeerSelector) AddPeer ¶ added in v1.16.56
func (s *ConsistentHashPeerSelector) AddPeer(nodeID ids.NodeID, weight uint64)
AddPeer adds a peer to the consistent hash ring - O(log n)
func (*ConsistentHashPeerSelector) Contains ¶ added in v1.16.56
func (s *ConsistentHashPeerSelector) Contains(nodeID ids.NodeID) bool
Contains checks if a peer exists in the selector
func (*ConsistentHashPeerSelector) GetPeersByWeight ¶ added in v1.16.56
func (s *ConsistentHashPeerSelector) GetPeersByWeight(limit int) []ids.NodeID
GetPeersByWeight returns peers sorted by weight - O(n log n) but cached
func (*ConsistentHashPeerSelector) RemovePeer ¶ added in v1.16.56
func (s *ConsistentHashPeerSelector) RemovePeer(nodeID ids.NodeID)
RemovePeer removes a peer from the ring - O(log n)
func (*ConsistentHashPeerSelector) SelectPeers ¶ added in v1.16.56
func (s *ConsistentHashPeerSelector) SelectPeers(key []byte, n int, exclude set.Set[ids.NodeID]) []ids.NodeID
SelectPeers selects n peers using consistent hashing - O(n log k) where k is ring size
func (*ConsistentHashPeerSelector) SelectRandomPeers ¶ added in v1.16.56
func (s *ConsistentHashPeerSelector) SelectRandomPeers(n int, exclude set.Set[ids.NodeID]) []ids.NodeID
SelectRandomPeers selects n random peers efficiently - O(n)
func (*ConsistentHashPeerSelector) Size ¶ added in v1.16.56
func (s *ConsistentHashPeerSelector) Size() int
Size returns the number of unique peers
type DelayConfig ¶
// DelayConfig holds the lower and upper bounds for the delay between
// reconnection attempts to a peer.
type DelayConfig struct {
// InitialReconnectDelay is the minimum amount of time the node will delay a
// reconnection to a peer. This value is used to start the exponential
// backoff.
InitialReconnectDelay time.Duration `json:"initialReconnectDelay"`
// MaxReconnectDelay is the maximum amount of time the node will delay a
// reconnection to a peer.
MaxReconnectDelay time.Duration `json:"maxReconnectDelay"`
}
type ExternalHandler ¶ added in v1.16.56
// ExternalHandler is the callback interface through which connections and
// inbound messages are handed to the user of the network. It is the type of
// the router parameter of NewNetwork and NewTestNetwork.
type ExternalHandler interface {
// NOTE(review): presumably called once a peer has connected — confirm
// when exactly in the handshake this fires.
Connected(nodeID ids.NodeID, version *version.Application, netID ids.ID)
// NOTE(review): presumably called when the peer with nodeID disconnects.
Disconnected(nodeID ids.NodeID)
// HandleInbound is given each inbound message.
HandleInbound(ctx context.Context, msg message.InboundMessage)
}
ExternalHandler handles incoming messages
type ExternalSender ¶ added in v1.16.56
// ExternalSender sends messages to peers. It is embedded in Network.
type ExternalSender interface {
// Send takes a message, a set of target node IDs, a net ID, and a request
// ID. NOTE(review): the returned set is presumably the nodes the message
// was actually sent to — confirm against the implementation.
Send(msg Message, nodeIDs set.Set[ids.NodeID], netID ids.ID, requestID uint32) set.Set[ids.NodeID]
// Gossip takes a message plus per-category send counts (validators,
// non-validators, peers). NOTE(review): the meaning of the returned set is
// not visible here — presumably the nodes gossiped to; confirm.
Gossip(msg Message, nodeIDs set.Set[ids.NodeID], netID ids.ID, numValidatorsToSend int, numNonValidatorsToSend int, numPeersToSend int) set.Set[ids.NodeID]
}
ExternalSender sends messages to peers
type HealthConfig ¶
// HealthConfig describes parameters for network layer health checks.
type HealthConfig struct {
// Marks if the health check should be enabled
Enabled bool `json:"-"`
// NoIngressValidatorConnectionGracePeriod denotes the time after which the health check fails
// for primary network validators with no ingress connections.
// NOTE(review): unlike its siblings this field has no json tag, so
// encoding/json would key it by the Go field name — confirm this is intended.
NoIngressValidatorConnectionGracePeriod time.Duration
// MinConnectedPeers is the minimum number of peers that the network should
// be connected to be considered healthy.
MinConnectedPeers uint `json:"minConnectedPeers"`
// MaxTimeSinceMsgReceived is the maximum amount of time since the network
// last received a message to be considered healthy.
MaxTimeSinceMsgReceived time.Duration `json:"maxTimeSinceMsgReceived"`
// MaxTimeSinceMsgSent is the maximum amount of time since the network last
// sent a message to be considered healthy.
MaxTimeSinceMsgSent time.Duration `json:"maxTimeSinceMsgSent"`
// MaxPortionSendQueueBytesFull is the maximum percentage of the pending
// send byte queue that should be used for the network to be considered
// healthy. Should be in (0,1].
MaxPortionSendQueueBytesFull float64 `json:"maxPortionSendQueueBytesFull"`
// MaxSendFailRate is the maximum percentage of send attempts that should be
// failing for the network to be considered healthy. This does not include
// send attempts that were not made due to benching. Should be in [0,1].
MaxSendFailRate float64 `json:"maxSendFailRate"`
// SendFailRateHalflife is the halflife of the averager used to calculate
// the send fail rate percentage. Should be > 0. Larger values mean that the
// fail rate is affected less by recently dropped messages.
SendFailRateHalflife time.Duration `json:"sendFailRateHalflife"`
}
HealthConfig describes parameters for network layer health checks.
type Message ¶ added in v1.16.56
// Message is an alias for message.OutboundMessage; it is the message type
// accepted by ExternalSender.Send and ExternalSender.Gossip.
type Message = message.OutboundMessage
Message represents a network message
type Network ¶
// Network defines the functionality of the networking library. It combines
// message sending (ExternalSender), a health check (health.Checker), and
// peer.Network with lifecycle, peer-tracking, and uptime methods.
type Network interface {
// All consensus messages can be sent through this interface. Thread safety
// must be managed internally in the network.
ExternalSender
// Has a health check
health.Checker
peer.Network
// StartClose this network and all existing connections it has. Calling
// StartClose multiple times is handled gracefully.
StartClose()
// Should only be called once, will run until either a fatal error occurs,
// or the network is closed.
Dispatch() error
// ManuallyTrack attempts to connect to the node nodeID at ip. The network
// will never stop attempting to connect to this node.
ManuallyTrack(nodeID ids.NodeID, ip netip.AddrPort)
// PeerInfo returns information about peers. If [nodeIDs] is empty, returns
// info about all peers that have finished the handshake. Otherwise, returns
// info about the peers in [nodeIDs] that have finished the handshake.
PeerInfo(nodeIDs []ids.NodeID) []peer.Info
// NodeUptime returns this node's primary network UptimeResult as seen by
// this node's peer validators.
NodeUptime() (UptimeResult, error)
// TrackChain starts tracking a chain. This updates the local node's tracked
// chains and will be included in handshakes with new peers.
TrackChain(chainID ids.ID) error
// TrackedChains returns the set of chains this node is tracking.
TrackedChains() set.Set[ids.ID]
}
Network defines the functionality of the networking library.
func NewNetwork ¶
func NewNetwork( config *Config, minCompatibleTime time.Time, msgCreator message.Creator, metricsRegistry metric.Registry, log log.Logger, listener net.Listener, dialer dialer.Dialer, router ExternalHandler, ) (Network, error)
NewNetwork returns a new Network implementation with the provided parameters.
func NewTestNetwork ¶
func NewTestNetwork( log log.Logger, registry metric.Registry, cfg *Config, router ExternalHandler, ) (Network, error)
Example ¶
// Copyright (C) 2019-2025, Lux Industries Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package main
import (
"context"
"fmt"
"time"
"github.com/luxfi/log"
"github.com/luxfi/metric"
consensuscore "github.com/luxfi/consensus/core"
validators "github.com/luxfi/consensus/validator"
"github.com/luxfi/constants"
"github.com/luxfi/ids"
"github.com/luxfi/math/set"
"github.com/luxfi/node/genesis/builder"
"github.com/luxfi/node/message"
"github.com/luxfi/node/version"
)
var _ ExternalHandler = (*testExternalHandler)(nil)
// testExternalHandler is an example ExternalHandler whose methods only write
// a log entry for each call they receive.
//
// Note: all of the external handler's methods are called on peer goroutines. It
// is possible for multiple concurrent calls to happen with different NodeIDs.
// However, a given NodeID will only be performing one call at a time.
type testExternalHandler struct {
// log is the logger every handler method writes to.
log log.Logger
}
// HandleInbound logs the op of the inbound message it is given.
//
// Note: the messages passed in are raw P2P messages. The networking
// implementation does not implicitly register timeouts, so this handler only
// runs for messages explicitly sent by a peer. Users of this utility that
// need timeouts must handle them themselves.
func (t *testExternalHandler) HandleInbound(_ context.Context, msg message.InboundMessage) {
	op := msg.Op()
	t.log.Info("receiving message", "op", op)
}
// Connected writes a "connected" log entry carrying the node ID, version, and
// net ID it was called with.
func (t *testExternalHandler) Connected(nodeID ids.NodeID, appVersion *version.Application, netID ids.ID) {
	t.log.Info("connected", "nodeID", nodeID, "version", appVersion, "netID", netID)
}
// HandleGossip logs the sender's node ID and the byte length of the gossip
// payload; the payload itself is not inspected.
func (t *testExternalHandler) HandleGossip(_ context.Context, nodeID ids.NodeID, msg []byte) {
	size := len(msg)
	t.log.Info("received gossip", "nodeID", nodeID, "size", size)
}
// HandleTimeout writes a fixed "timeout occurred" log entry; the context is
// ignored.
func (t *testExternalHandler) HandleTimeout(_ context.Context) {
	const msg = "timeout occurred"
	t.log.Info(msg)
}
// Disconnected writes a "disconnected" log entry carrying the given node ID.
func (t *testExternalHandler) Disconnected(nodeID ids.NodeID) {
	t.log.Info("disconnected", "nodeID", nodeID)
}
// AppRequest logs the node ID and request ID of the request and always
// returns nil. The deadline and request bytes are ignored.
func (t *testExternalHandler) AppRequest(_ context.Context, nodeID ids.NodeID, requestID uint32, _ time.Time, _ []byte) error {
	t.log.Info(
		"AppRequest",
		"nodeID", nodeID,
		"requestID", requestID,
	)
	return nil
}
// AppRequestFailed logs the node ID and request ID of the failed request and
// always returns nil. The application error is ignored.
func (t *testExternalHandler) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32, _ *consensuscore.AppError) error {
	t.log.Info(
		"AppRequestFailed",
		"nodeID", nodeID,
		"requestID", requestID,
	)
	return nil
}
// AppResponse logs the node ID and request ID of the response and always
// returns nil. The response bytes are ignored.
func (t *testExternalHandler) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, _ []byte) error {
	t.log.Info(
		"AppResponse",
		"nodeID", nodeID,
		"requestID", requestID,
	)
	return nil
}
// AppGossip logs the node ID the gossip came with and always returns nil.
// The gossip bytes are ignored.
func (t *testExternalHandler) AppGossip(_ context.Context, nodeID ids.NodeID, _ []byte) error {
	t.log.Info(
		"AppGossip",
		"nodeID", nodeID,
	)
	return nil
}
// testAggressiveValidatorManager wraps a validators.Manager by embedding it,
// so every Manager method is promoted unless this type declares its own.
type testAggressiveValidatorManager struct {
validators.Manager
}
// Contains always reports true, regardless of the IDs it is given. It
// replaces the Contains method promoted from the embedded validators.Manager.
func (*testAggressiveValidatorManager) Contains(_ ids.ID, _ ids.NodeID) bool {
	return true
}
// main builds a test network, points it at a sample of testnet bootstrappers,
// and runs it until it is closed (15 seconds after start in this example) or
// a fatal error occurs.
func main() {
	// NOTE(review): log is left at its zero value here; confirm that a
	// zero-value log.Logger is safe to call before reusing this example.
	var log log.Logger

	// Needs to be periodically updated by the caller to have the latest
	// validator set
	validators := &testAggressiveValidatorManager{
		Manager: validators.NewManager(),
	}

	// If we want to be able to communicate with non-primary network nets, we
	// should register them here.
	trackedNets := make(set.Set[ids.ID])

	// Messages and connections are handled by the external handler.
	handler := &testExternalHandler{
		log: log,
	}

	metrics := metric.NewRegistry()

	cfg, err := NewTestNetworkConfig(
		metrics,
		constants.TestnetID,
		validators,
		trackedNets,
	)
	if err != nil {
		// panic does not return, so nothing may follow it in this branch.
		panic(fmt.Sprintf("failed to create test network config: %v", err))
	}

	network, err := NewTestNetwork(
		log,
		metrics,
		cfg,
		handler,
	)
	if err != nil {
		log.Error(
			"failed to create test network",
			"error", err,
		)
		return
	}

	// We need to initially connect to some nodes in the network before peer
	// gossip will enable connecting to all the remaining nodes in the network.
	bootstrappers, err := builder.SampleBootstrappers(constants.TestnetID, 5)
	if err != nil {
		log.Error(
			"failed to get sample bootstrappers",
			"error", err,
		)
		return
	}

	for _, bootstrapper := range bootstrappers {
		network.ManuallyTrack(bootstrapper.ID, bootstrapper.IP)
	}

	// Typically network.StartClose() should be called based on receiving a
	// SIGINT or SIGTERM. For the example, we close the network after 15s.
	go func() {
		time.Sleep(15 * time.Second)
		network.StartClose()
	}()

	// network.Send(...) and network.Gossip(...) can be used here to send
	// messages to peers.

	// Calling network.Dispatch() will block until a fatal error occurs or
	// network.StartClose() is called.
	err = network.Dispatch()
	log.Info(
		"network exited",
		"error", err,
	)
}
type PeerListGossipConfig ¶
// PeerListGossipConfig configures peer list gossip: how many validator IPs
// are gossiped per event, how often signed IPs are requested from peers, and
// how often the IP tracker's bloom filter is recalculated.
type PeerListGossipConfig struct {
// PeerListNumValidatorIPs is the number of validator IPs to gossip in every
// gossip event.
PeerListNumValidatorIPs uint32 `json:"peerListNumValidatorIPs"`
// PeerListPullGossipFreq is the frequency that this node will attempt to
// request signed IPs from its peers.
PeerListPullGossipFreq time.Duration `json:"peerListPullGossipFreq"`
// PeerListBloomResetFreq is how frequently this node will recalculate the
// IP tracker's bloom filter.
PeerListBloomResetFreq time.Duration `json:"peerListBloomResetFreq"`
}
type SetCallbackListener ¶ added in v1.11.14
// SetCallbackListener is a listener for validator changes: additions,
// removals, and weight changes.
// NOTE(review): who invokes these callbacks, and on which goroutine, is not
// visible here — confirm before relying on ordering.
type SetCallbackListener interface {
// OnValidatorAdded receives the added validator's node ID, BLS public key,
// transaction ID, and weight.
OnValidatorAdded(nodeID ids.NodeID, pk *bls.PublicKey, txID ids.ID, weight uint64)
// OnValidatorRemoved receives the removed validator's node ID and weight.
OnValidatorRemoved(nodeID ids.NodeID, weight uint64)
// OnValidatorWeightChanged receives the node ID with its old and new weight.
OnValidatorWeightChanged(nodeID ids.NodeID, oldWeight, newWeight uint64)
}
SetCallbackListener for validator changes
type ThrottlerConfig ¶
// ThrottlerConfig groups the throttler settings for inbound connection
// upgrades, inbound messages, and outbound messages, together with
// MaxInboundConnsPerSec.
type ThrottlerConfig struct {
InboundConnUpgradeThrottlerConfig throttling.InboundConnUpgradeThrottlerConfig `json:"inboundConnUpgradeThrottlerConfig"`
InboundMsgThrottlerConfig throttling.InboundMsgThrottlerConfig `json:"inboundMsgThrottlerConfig"`
OutboundMsgThrottlerConfig throttling.MsgByteThrottlerConfig `json:"outboundMsgThrottlerConfig"`
// NOTE(review): presumably the maximum rate of accepted inbound
// connections, in connections per second — confirm.
MaxInboundConnsPerSec float64 `json:"maxInboundConnsPerSec"`
}
type TimeoutConfig ¶
// TimeoutConfig holds the timeout for Pong responses and the timeout for a
// peer's connection upgrade ahead of the p2p handshake.
type TimeoutConfig struct {
// PingPongTimeout is the maximum amount of time to wait for a Pong response
// from a peer we sent a Ping to.
PingPongTimeout time.Duration `json:"pingPongTimeout"`
// ReadHandshakeTimeout is the maximum amount of time to wait for the peer's
// connection upgrade to finish before starting the p2p handshake.
ReadHandshakeTimeout time.Duration `json:"readHandshakeTimeout"`
}
type UptimeResult ¶
// UptimeResult summarizes how the network perceives this node's uptime. It is
// the value returned by Network.NodeUptime.
type UptimeResult struct {
// RewardingStakePercentage shows what percent of network stake thinks we're
// above the uptime requirement.
RewardingStakePercentage float64
// WeightedAveragePercentage is the average perceived uptime of this node,
// weighted by stake.
// Note that this is different from RewardingStakePercentage, which shows
// the percent of the network stake that thinks this node is above the
// uptime requirement. WeightedAveragePercentage is weighted by uptime.
// For example, if the uptime requirement is 85 and a peer reports 40
// percent, it will be counted (40*weight) in WeightedAveragePercentage but
// not in RewardingStakePercentage, since 40 < 85.
WeightedAveragePercentage float64
}
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| lp118 | Package lp118 implements LP-118 message handling |
| mocks | Package mocks is a generated GoMock package. |
| peer | Package peer is a generated GoMock package. |
| trackermock | Package trackermock provides mock implementations for testing |