p2p

package
v0.1.14-snapshot
Warning

This package is not in the latest version of its module.

Published: Nov 24, 2025 License: MIT Imports: 37 Imported by: 0

README

P2P Package

The P2P package provides a robust peer-to-peer networking layer for the Canopy Network. It implements a secure, multiplexed, and encrypted communication system between network nodes.

Overview

The P2P package is designed to handle:

  • TCP/IP transport
  • Multiplexing of different message types
  • Encrypted connections
  • DOS mitigation
  • Peer configuration and management
  • Peer discovery and churn
  • Message dissemination via gossip

Core Components

P2P

The main entry point for the P2P system. It manages the overall P2P network functionality, including:

  • Starting and stopping the P2P service
  • Listening for inbound peers
  • Dialing outbound peers
  • Managing peer connections
  • Handling peer book exchanges
  • Filtering bad IPs and countries

PeerSet

Manages the active set of connected peers. It provides:

  • Adding and removing peers
  • Tracking inbound and outbound connections
  • Managing peer reputation
  • Sending messages to peers
  • Handling must-connect peers

PeerBook

Maintains a persistent list of potential peers. It provides:

  • Storing and retrieving peer information
  • Managing peer churn
  • Exchanging peer information with other nodes
  • Tracking failed connection attempts
  • Persisting peer data to disk

MultiConn

Represents a multiplexed connection to a peer. It provides:

  • Multiple independent bi-directional communication channels
  • Rate-limited message sending and receiving
  • Ping/pong keep-alive mechanism
  • Error handling and reporting

EncryptedConn

Handles the encrypted communication with peers. It implements:

  • ECDH key exchange for establishing shared secrets
  • HKDF for deriving encryption keys
  • ChaCha20-Poly1305 AEAD for message encryption and authentication
  • Handshake protocol for peer authentication

Stream

Represents a single communication channel within a MultiConn. It provides:

  • Message queuing and sending
  • Message assembly from packets
  • Inbox for received messages

Sequence Diagram

The following sequence diagram illustrates the core interactions in the P2P package:

sequenceDiagram
    participant App as Application
    participant P2P as P2P
    participant PS as PeerSet
    participant PB as PeerBook
    participant Peer as Remote Peer

    %% Connection flow
    Note over P2P,Peer: New peer connects
    Peer->>P2P: Connection request
    P2P->>P2P: Security handshake
    P2P->>PS: Add to active peers
    
    %% Message flow
    App->>P2P: Send message
    P2P->>PS: Forward to peer
    PS->>Peer: Send encrypted message
    
    Peer->>P2P: Receive message
    P2P->>App: Deliver to application
    
    %% Gossip protocol
    App->>P2P: Broadcast message
    P2P->>PS: Send to all peers except sender
    PS->>Peer: Forward message
    
    %% Peer discovery
    P2P->>P2P: Request peer list
    P2P->>PS: Send request to peers
    PS->>Peer: Forward request
    
    Peer->>P2P: Send peer list
    P2P->>PB: Update peer book

Technical Details

Multiplexing

The P2P system uses multiplexing to allow multiple independent communication channels over a single TCP connection. This is achieved by:

  • Topic-Based Channels: Each message is assigned to a specific topic (like blocks, transactions, or peer discovery).
  • Independent Streams: Each topic gets its own Stream with its own message queue and inbox.
  • Packet Headers: Each packet includes a header that identifies which topic it belongs to.

This approach is similar to how a single phone line can carry multiple conversations using different frequencies. By multiplexing, the system can efficiently handle different types of messages without needing separate connections for each type, reducing overhead and improving performance.

It's important to note that this multiplexing is implemented entirely in our application code logic, not relying on transport-level multiplexing like HTTP/2. This gives us complete control over the multiplexing behavior and allows us to optimize it specifically for our blockchain network requirements.
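As a rough illustration of application-level multiplexing, the sketch below routes packets from one shared connection into per-topic inboxes. The `Topic` values, the `packet` struct, and the `demux` type are hypothetical names chosen for this example and are not the package's actual types.

```go
package main

import "fmt"

// Topic identifies a logical channel. These values are illustrative only,
// not the package's real topic enum.
type Topic int

const (
	TopicBlock Topic = iota
	TopicTx
	TopicPeers
)

// packet is a hypothetical wire unit: a topic header plus payload bytes.
type packet struct {
	topic Topic
	data  []byte
}

// demux owns one buffered inbox per registered topic.
type demux struct {
	inbox map[Topic]chan []byte
}

func newDemux(capacity int, topics ...Topic) *demux {
	d := &demux{inbox: make(map[Topic]chan []byte)}
	for _, t := range topics {
		d.inbox[t] = make(chan []byte, capacity)
	}
	return d
}

// route delivers a packet to its topic's inbox. It reports false for an
// unknown topic or a full inbox instead of blocking the shared connection.
func (d *demux) route(p packet) bool {
	ch, ok := d.inbox[p.topic]
	if !ok {
		return false
	}
	select {
	case ch <- p.data:
		return true
	default:
		return false
	}
}

func main() {
	d := newDemux(4, TopicBlock, TopicTx)
	d.route(packet{TopicTx, []byte("tx-1")})
	d.route(packet{TopicBlock, []byte("block-7")})
	// Each topic is read independently of the order packets arrived in.
	fmt.Println(string(<-d.inbox[TopicBlock]), string(<-d.inbox[TopicTx]))
	fmt.Println(d.route(packet{TopicPeers, nil})) // topic not registered here
}
```

Dropping on a full inbox is one possible policy; a real implementation might instead apply back-pressure or penalize the sender.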

Encryption Protocol

The P2P system uses a robust encryption protocol to secure all communications:

  1. Key Exchange: When two peers connect, they use the X25519 elliptic curve Diffie-Hellman (ECDH) algorithm to establish a shared secret. This allows them to generate the same encryption keys without ever transmitting the keys themselves.

  2. Key Derivation: The shared secret is then processed using HMAC-based Key Derivation Function (HKDF) to derive:

    • An encryption key for sending messages
    • An encryption key for receiving messages
    • A nonce (number used once) for ensuring message uniqueness

  3. Message Encryption: All messages are encrypted using the ChaCha20-Poly1305 authenticated encryption algorithm, which:

    • Provides confidentiality (messages can't be read by others)
    • Ensures authenticity (messages haven't been tampered with)
    • Verifies integrity (messages arrive exactly as sent)

This encryption approach provides end-to-end security, meaning that even if someone intercepts the traffic between two nodes, they cannot read or modify the messages without knowing the encryption keys.
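The first two steps (key exchange and key derivation) can be sketched with the Go standard library alone. This is a minimal illustration, not the package's handshake: the `hkdfSHA256` helper is a hand-written RFC 5869 HKDF, the info string is made up, and the rule that the peer with the lexicographically smaller public key sends with the first half of the output is an assumption of this example. Message encryption with ChaCha20-Poly1305 is left out because it lives outside the standard library.

```go
package main

import (
	"bytes"
	"crypto/ecdh"
	"crypto/hmac"
	"crypto/rand"
	"crypto/sha256"
	"fmt"
)

// hkdfSHA256 is a compact RFC 5869 extract-and-expand using HMAC-SHA256.
func hkdfSHA256(secret, salt, info []byte, n int) []byte {
	if len(salt) == 0 {
		salt = make([]byte, sha256.Size) // RFC 5869 default: a zero-filled salt
	}
	ext := hmac.New(sha256.New, salt)
	ext.Write(secret)
	prk := ext.Sum(nil)
	var out, prev []byte
	for counter := byte(1); len(out) < n; counter++ {
		exp := hmac.New(sha256.New, prk)
		exp.Write(prev)
		exp.Write(info)
		exp.Write([]byte{counter})
		prev = exp.Sum(nil)
		out = append(out, prev...)
	}
	return out[:n]
}

// deriveKeys runs X25519 and splits 64 bytes of HKDF output into a send key
// and a receive key. Assumption: the side with the smaller public key sends
// with the first half, so both peers agree on the directions.
func deriveKeys(priv *ecdh.PrivateKey, peer *ecdh.PublicKey) (send, recv []byte, err error) {
	shared, err := priv.ECDH(peer)
	if err != nil {
		return nil, nil, err
	}
	okm := hkdfSHA256(shared, nil, []byte("example-p2p-keys"), 64)
	if bytes.Compare(priv.PublicKey().Bytes(), peer.Bytes()) < 0 {
		return okm[:32], okm[32:], nil
	}
	return okm[32:], okm[:32], nil
}

// handshakeAgrees generates two key pairs and checks that each side's send
// key equals the other side's receive key.
func handshakeAgrees() bool {
	a, errA := ecdh.X25519().GenerateKey(rand.Reader)
	b, errB := ecdh.X25519().GenerateKey(rand.Reader)
	if errA != nil || errB != nil {
		return false
	}
	aSend, aRecv, err1 := deriveKeys(a, b.PublicKey())
	bSend, bRecv, err2 := deriveKeys(b, a.PublicKey())
	if err1 != nil || err2 != nil {
		return false
	}
	return bytes.Equal(aSend, bRecv) && bytes.Equal(aRecv, bSend)
}

func main() {
	fmt.Println("keys agree:", handshakeAgrees())
}
```

Only public keys cross the wire in this sketch; the directional keys are computed independently on each side.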

Component Interactions

1. Getting Started: P2P Initialization

When a node starts up, the P2P system initializes with these steps:

  • Creating the Foundation: The P2P system creates a PeerSet to manage active connections and a PeerBook to keep track of potential peers. The PeerSet uses a thread-safe map to store active connections, while the PeerBook maintains a persistent database of known peers.
  • Setting Up Communication Channels: It establishes channels for different types of messages (like blocks, transactions, or peer discovery). Each channel is implemented as a buffered Go channel with a configurable capacity to prevent memory overflow.
  • Loading Configuration: It applies settings like maximum connections, timeouts, and security parameters. The configuration includes parameters for connection limits, timeouts, and security thresholds that are enforced throughout the system.

Think of this like setting up a phone system - you need a phone book (PeerBook), a list of active calls (PeerSet), and different lines for different types of conversations (channels).

2. Making Connections: Peer Connection Flow

When nodes connect to each other, several layers of security and functionality are added:

  • Listening for Calls: The P2P system listens for incoming connection requests from other nodes using a TCP listener with rate limiting to prevent connection flooding.
  • Making Outgoing Calls: It also proactively connects to other nodes it knows about, using exponential backoff for retry attempts when connections fail.
  • Secure Handshake: When a connection is established, nodes perform a cryptographic handshake using X25519 for key exchange and HKDF for key derivation, establishing unique encryption keys for each direction of communication.
  • Multiplexing Setup: The connection is then set up to handle multiple independent communication channels using a custom protocol that includes topic identifiers and sequence numbers in packet headers.

This is similar to how a phone call works - you dial a number (or receive a call), verify who's on the other end, and then can have a secure conversation.
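The retry behaviour for outgoing calls can be pictured with a small exponential backoff helper. The base delay, the cap, the attempt count, and the injected `sleep` function are all assumptions made for this sketch; the package's actual retry parameters are not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// backoffDelay returns the wait before retry number attempt (0-based):
// base doubled once per attempt and capped at limit.
func backoffDelay(base, limit time.Duration, attempt int) time.Duration {
	d := base
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		return limit
	}
	return d
}

// dialWithBackoff calls dial until it succeeds or attempts run out,
// waiting between failures. sleep is injected so the example runs instantly.
func dialWithBackoff(dial func() error, base, limit time.Duration, attempts int, sleep func(time.Duration)) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = dial(); err == nil {
			return nil
		}
		if i < attempts-1 {
			sleep(backoffDelay(base, limit, i))
		}
	}
	return err
}

func main() {
	calls := 0
	dial := func() error { // hypothetical dialer: fails twice, then succeeds
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	}
	var waits []time.Duration
	record := func(d time.Duration) { waits = append(waits, d) }
	err := dialWithBackoff(dial, time.Second, 30*time.Second, 5, record)
	fmt.Println(err, calls, waits)
}
```

In production code `sleep` would typically be `time.Sleep`, often with random jitter added so that many nodes do not retry in lockstep.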

3. Sending Messages: Message Flow

When a node wants to send information to another node:

  • Topic Selection: The message is assigned to a specific topic (like BLOCK or PEERS_REQUEST) using a predefined enum that maps to specific message types.
  • Message Queuing: The message is queued in the appropriate communication channel with priority handling for critical messages.
  • Packetization: Large messages are broken down into smaller packets with a maximum size limit, each containing a header with topic ID, sequence number, and packet count.
  • Encryption: Each packet is encrypted using ChaCha20-Poly1305 with a unique nonce to prevent replay attacks.
  • Transmission: Packets are sent over the TCP connection with flow control to prevent overwhelming the receiver.
  • Reassembly: On the receiving end, packets are reassembled into the complete message using sequence numbers and packet counts, with timeout handling for missing packets.

This is like sending a letter - you write your message, put it in an envelope (packet), seal it (encrypt it), and send it through the mail (TCP connection). The recipient opens the envelope and reads the message.
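The packetization and reassembly steps can be sketched as follows. The `packet` struct loosely mirrors the three fields of the Packet type documented further down (stream ID, EOF flag, bytes); the `split` and `assembler` helpers and the size limits are hypothetical and only illustrate the idea.

```go
package main

import "fmt"

// packet is a simplified stand-in for the documented Packet fields.
type packet struct {
	streamID int
	eof      bool
	bytes    []byte
}

// split breaks msg into packets of at most maxSize bytes (maxSize must be
// positive). Only the final packet has eof set.
func split(streamID int, msg []byte, maxSize int) []packet {
	var out []packet
	for len(msg) > maxSize {
		out = append(out, packet{streamID, false, msg[:maxSize]})
		msg = msg[maxSize:]
	}
	return append(out, packet{streamID, true, msg})
}

// assembler buffers partial messages per stream until an EOF packet arrives.
type assembler struct {
	partial map[int][]byte
	maxMsg  int // upper bound on a reassembled message, in bytes
}

// add appends a packet. It returns the full message when eof is seen and an
// error if the accumulated message would exceed maxMsg.
func (a *assembler) add(p packet) (msg []byte, done bool, err error) {
	buf := append(a.partial[p.streamID], p.bytes...)
	if len(buf) > a.maxMsg {
		delete(a.partial, p.streamID)
		return nil, false, fmt.Errorf("message on stream %d exceeds %d bytes", p.streamID, a.maxMsg)
	}
	if p.eof {
		delete(a.partial, p.streamID)
		return buf, true, nil
	}
	a.partial[p.streamID] = buf
	return nil, false, nil
}

func main() {
	a := &assembler{partial: map[int][]byte{}, maxMsg: 1 << 20}
	for _, p := range split(1, []byte("hello, canopy"), 5) {
		if msg, done, err := a.add(p); err == nil && done {
			fmt.Println(string(msg))
		}
	}
}
```

Because TCP preserves ordering within a connection, this sketch relies on arrival order plus the EOF flag rather than explicit sequence numbers.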

4. Discovering Peers: Peer Discovery and Management

The P2P system uses several techniques to discover and manage peers:

  • Peer Book Exchange: Nodes periodically exchange lists of known peers with each other using a compact binary format that includes peer metadata and connection statistics.
  • Gossip Protocol: When a node learns about a new peer or receives a message, it immediately forwards that information to all of its connected peers except the one who sent it. The gossip protocol includes a hop count to prevent infinite propagation and uses bloom filters to track recently seen messages.
  • Churn Management: The system handles peers joining and leaving the network gracefully using connection timeouts and heartbeat messages to detect disconnections.
  • Reputation System: It tracks how well peers behave using metrics like message delivery success rate, response times, and protocol compliance, with automatic disconnection for peers that fall below thresholds.

The gossip protocol is particularly interesting - it's like how rumors spread in a social network. If you hear something interesting, you tell all your friends except the one who told you, and they tell all their friends except you, and soon many people know about it. This is an efficient way to disseminate information in a decentralized network without a central authority, ensuring that important updates reach all nodes quickly.
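The "tell everyone except the sender" rule can be sketched in a few lines. The `gossiper` type and string peer names are hypothetical, and a plain map stands in for the duplicate tracking mentioned above; it is a simplification, not the package's data structure.

```go
package main

import "fmt"

// gossiper forwards each new message to all peers except the sender and
// remembers message IDs so a message is relayed at most once.
type gossiper struct {
	peers []string
	seen  map[string]bool // simplification: unbounded set of seen message IDs
}

// receive returns the peers the message should be forwarded to.
// It returns nil when the message was already seen.
func (g *gossiper) receive(msgID, from string) []string {
	if g.seen[msgID] {
		return nil
	}
	g.seen[msgID] = true
	var targets []string
	for _, p := range g.peers {
		if p != from {
			targets = append(targets, p)
		}
	}
	return targets
}

func main() {
	g := &gossiper{peers: []string{"A", "B", "C"}, seen: map[string]bool{}}
	fmt.Println(g.receive("block-42", "B")) // first sighting: relay to everyone but B
	fmt.Println(g.receive("block-42", "C")) // duplicate: nothing to relay
}
```

Without the duplicate check, two peers that are connected to each other would bounce the same message back and forth indefinitely.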

5. Security Measures

The P2P system implements several security techniques:

  • Encryption: All communication is encrypted using ChaCha20-Poly1305 with unique nonces for each message, providing both confidentiality and authenticity.
  • Rate Limiting: Messages are rate-limited using a token bucket algorithm that allows for burst handling while maintaining long-term rate limits.
  • IP Filtering: Known bad IPs and countries can be blocked using a combination of static lists and dynamic reputation scoring.
  • Connection Limits: The number of connections is limited using a configurable maximum that considers both inbound and outbound connections, with priority given to trusted peers.

These measures work together to create a secure and resilient network that can operate even when some participants are malicious.
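To make the rate-limiting measure concrete, here is a minimal token bucket sketch. The capacity, refill rate, and the `simulate` helper are illustrative assumptions; the limiter actually used by the package and its parameters are not shown in this document.

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket allows bursts up to capacity while enforcing a long-term
// average of rate events per second.
type tokenBucket struct {
	capacity float64 // maximum burst size
	rate     float64 // tokens added per second
	tokens   float64
	last     time.Time
}

func newTokenBucket(capacity, rate float64, now time.Time) *tokenBucket {
	return &tokenBucket{capacity: capacity, rate: rate, tokens: capacity, last: now}
}

// allow refills the bucket for the time elapsed since the last call and
// spends one token if one is available.
func (b *tokenBucket) allow(now time.Time) bool {
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.tokens += elapsed * b.rate
		if b.tokens > b.capacity {
			b.tokens = b.capacity
		}
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// simulate sends n messages at the same instant, then one more after wait.
// It reports how many of the burst were allowed and whether the late one passed.
func simulate(capacity, rate float64, n int, wait time.Duration) (int, bool) {
	start := time.Unix(0, 0)
	b := newTokenBucket(capacity, rate, start)
	allowed := 0
	for i := 0; i < n; i++ {
		if b.allow(start) {
			allowed++
		}
	}
	return allowed, b.allow(start.Add(wait))
}

func main() {
	burst, later := simulate(3, 1, 5, time.Second)
	fmt.Println(burst, later)
}
```

Passing the clock in explicitly keeps the limiter deterministic and easy to test; a live connection would call `allow(time.Now())`.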

Security Features

  • Encryption: All communication is encrypted using ChaCha20-Poly1305 with unique nonces for each message, providing both confidentiality and authenticity. The encryption keys are derived using HKDF from a shared secret established through X25519 key exchange during the initial handshake. Each message includes an authentication tag to prevent tampering.

  • Authentication: Peers are authenticated during the handshake process using their public keys, which are verified against a trusted list or through a chain of trust. The handshake includes a challenge-response mechanism to prove possession of the private key without revealing it. Failed authentication attempts are logged and can trigger temporary IP bans.

  • DOS Mitigation: Rate limiting and connection limits prevent denial of service attacks through multiple layers:

    • Token bucket algorithm for message rate limiting with configurable burst sizes
    • Connection rate limiting using a sliding window counter
    • Maximum connection limits per IP with dynamic adjustment based on reputation
    • Automatic blacklisting of IPs that exceed thresholds
    • SYN cookie protection for TCP connections
  • IP Filtering: Bad IPs and countries can be filtered out using:

    • Static blacklists of known malicious IPs
    • Dynamic reputation scoring based on behavior
    • Geographic filtering using IP geolocation
    • Automatic updates of threat intelligence feeds
    • Rate-based blocking of suspicious IP ranges
  • Reputation System: Peers with bad behavior can be disconnected based on:

    • Message delivery success rate and latency
    • Protocol compliance and message validity
    • Resource usage patterns
    • Connection stability and uptime
    • Historical behavior tracking with decay over time
    • Automatic scoring adjustments based on observed behavior
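A reputation score of this kind can be sketched as a bounded counter. The constant values below are copied from the Constants section of this documentation, but the clamping behaviour and the "disconnect once the score drops below the minimum" rule are assumptions of this example, and the `peerRep` type is hypothetical.

```go
package main

import "fmt"

// Values copied from the constants listed in this documentation.
const (
	MaxPeerReputation       = 10
	MinimumPeerReputation   = -10
	GoodBlockRep            = 3
	InvalidMsgRep           = -3
	MaxMessageExceededSlash = -10
)

// peerRep is a hypothetical per-peer score holder.
type peerRep struct{ score int32 }

// apply adds delta, caps the score at the maximum, and reports whether the
// peer should be disconnected (assumed here: score below the minimum).
func (p *peerRep) apply(delta int32) (disconnect bool) {
	p.score += delta
	if p.score > MaxPeerReputation {
		p.score = MaxPeerReputation
	}
	return p.score < MinimumPeerReputation
}

func main() {
	p := &peerRep{}
	for i := 0; i < 5; i++ {
		p.apply(GoodBlockRep) // good behaviour saturates at the cap
	}
	fmt.Println(p.score)
	fmt.Println(p.apply(MaxMessageExceededSlash), p.score)
	fmt.Println(p.apply(MaxMessageExceededSlash), p.score)
	fmt.Println(p.apply(InvalidMsgRep), p.score)
}
```

Capping the score keeps a long-lived, well-behaved peer from accumulating unlimited credit that it could later spend on misbehaviour.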

Usage

To use the P2P package:

  1. Create a new P2P instance with appropriate configuration
  2. Start the P2P service
  3. Send and receive messages through the appropriate channels
  4. Stop the P2P service when done

// Example usage (the variable is named node so it does not shadow the p2p package)
node := p2p.New(privateKey, maxMembersPerCommittee, metrics, config, logger)
node.Start()
// Use node.Inbox(topic) to receive messages
// Use node.PeerSet.SendTo() to send messages
node.Stop()

Documentation

Index

Constants

const (

	// "Peer Reputation Points" are actively maintained for each peer the node is connected to
	// These points allow a node to track peer behavior over its lifetime, allowing it to disconnect from faulty peers
	PollMaxHeightTimeoutS   = 1   // wait time for polling the maximum height of the peers
	SyncTimeoutS            = 5   // wait time to receive an individual block (certificate) from a peer during syncing
	MaxBlockReqPerWindow    = 20  // maximum block (certificate) requests per window per requester
	BlockReqWindowS         = 2   // the 'window of time' before resetting limits for block (certificate) requests
	GoodPeerBookRespRep     = 3   // reputation points for a good peer book response
	GoodBlockRep            = 3   // rep boost for sending us a valid block (certificate)
	UnexpectedBlockRep      = -1  // rep slash for sending us a block we weren't expecting
	InvalidMsgRep           = -3  // slash for an invalid message
	ExceedMaxPBReqRep       = -3  // slash for exceeding the max peer book requests
	UnknownMessageSlash     = -3  // unknown message type is received
	BadStreamSlash          = -3  // unknown stream id is received
	InvalidTxRep            = -3  // rep slash for sending us an invalid transaction
	InvalidBlockRep         = -3  // rep slash for sending an invalid block (certificate) message
	BlockReqExceededRep     = -3  // rep slash for over-requesting blocks (certificates)
	MaxMessageExceededSlash = -10 // slash for sending a 'Message (sum of Packets)' above the allowed maximum size
)
const (
	ErrListenerClosed = "use of closed network connection"
	ErrConnReset      = "connection reset by peer"
	ErrEOF            = "EOF"
	ErrPeer           = "Error peer"
)
const (
	MaxPeerReputation     = 10
	MinimumPeerReputation = -10
)

Variables

var (
	MaxFailedDialAttempts        = int32(10)        // maximum times a peer may fail a churn management dial attempt before evicted from the peer book
	MaxPeersExchanged            = 1                // maximum number of peers per chain that may be sent/received during a peer exchange
	MaxPeerBookRequestsPerWindow = 2                // maximum peer book request per window
	PeerBookRequestWindowS       = 30               // seconds in a peer book request
	CrawlAndCleanBookFrequency   = time.Minute * 10 // how often the book is cleaned and crawled
	SaveBookFrequency            = time.Minute * 5  // how often the book is saved to a file
)
var (
	ReadTimeout  = 40 * time.Second // this is just the default; it gets set by config upon initialization
	WriteTimeout = 80 * time.Second // this is just the default; it gets set by config upon initialization
)
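The window-based limits above (for example MaxPeerBookRequestsPerWindow = 2 with PeerBookRequestWindowS = 30) suggest a fixed-window counter per requester. The sketch below shows one way such a limiter could work; the `windowLimiter` type, its reset rule, and the use of plain Unix seconds are assumptions of this example rather than the package's implementation.

```go
package main

import "fmt"

// windowLimiter counts requests per peer and clears all counters each time a
// new window begins.
type windowLimiter struct {
	max       int   // allowed requests per peer per window
	windowS   int64 // window length in seconds
	windowEnd int64 // Unix second at which the current window expires
	counts    map[string]int
}

func newWindowLimiter(max int, windowS int64) *windowLimiter {
	return &windowLimiter{max: max, windowS: windowS, counts: map[string]int{}}
}

// allow records a request from peer at time nowS (Unix seconds) and reports
// whether it is within the per-window limit.
func (w *windowLimiter) allow(peer string, nowS int64) bool {
	if nowS >= w.windowEnd {
		w.counts = map[string]int{}
		w.windowEnd = nowS + w.windowS
	}
	w.counts[peer]++
	return w.counts[peer] <= w.max
}

func main() {
	l := newWindowLimiter(2, 30) // mirrors the peer book request defaults above
	fmt.Println(l.allow("peerA", 0), l.allow("peerA", 1), l.allow("peerA", 2))
	fmt.Println(l.allow("peerA", 31)) // a new window has started
}
```

A request that exceeds the limit would presumably be answered with a reputation slash such as ExceedMaxPBReqRep, though that wiring is not part of this sketch.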

Functions

func ErrBadStream

func ErrBadStream() lib.ErrorI

func ErrBannedCountry

func ErrBannedCountry(s string) lib.ErrorI

func ErrBannedID

func ErrBannedID(s string) lib.ErrorI

func ErrBannedIP

func ErrBannedIP(s string) lib.ErrorI

func ErrChunkLargerThanMax

func ErrChunkLargerThanMax() lib.ErrorI

func ErrConnDecryptFailed

func ErrConnDecryptFailed(err error) lib.ErrorI

func ErrErrorGroup

func ErrErrorGroup(err error) lib.ErrorI

func ErrFailedChallenge

func ErrFailedChallenge() lib.ErrorI

func ErrFailedDial

func ErrFailedDial(err error) lib.ErrorI

func ErrFailedDiffieHellman

func ErrFailedDiffieHellman(err error) lib.ErrorI

func ErrFailedHKDF

func ErrFailedHKDF(err error) lib.ErrorI

func ErrFailedListen

func ErrFailedListen(err error) lib.ErrorI

func ErrFailedMetaSwap

func ErrFailedMetaSwap(err error) lib.ErrorI

func ErrFailedRead

func ErrFailedRead(err error) lib.ErrorI

func ErrFailedSignatureSwap

func ErrFailedSignatureSwap(err error) lib.ErrorI

func ErrFailedWrite

func ErrFailedWrite(err error) lib.ErrorI

func ErrIPLookup

func ErrIPLookup(err error) lib.ErrorI

func ErrIncompatiblePeer

func ErrIncompatiblePeer() lib.ErrorI

func ErrInvalidPublicKey

func ErrInvalidPublicKey(err error) lib.ErrorI

func ErrIsBlacklisted

func ErrIsBlacklisted() lib.ErrorI

func ErrMaxInbound

func ErrMaxInbound() lib.ErrorI

func ErrMaxMessageSize

func ErrMaxMessageSize() lib.ErrorI

func ErrMaxOutbound

func ErrMaxOutbound() lib.ErrorI

func ErrMismatchPeerPublicKey

func ErrMismatchPeerPublicKey(expected, got []byte) lib.ErrorI

func ErrNonTCPAddress

func ErrNonTCPAddress() lib.ErrorI

func ErrPeerAlreadyExists

func ErrPeerAlreadyExists(s string) lib.ErrorI

func ErrPeerNotFound

func ErrPeerNotFound(s string) lib.ErrorI

func ErrPongTimeout

func ErrPongTimeout() lib.ErrorI

func ErrUnknownP2PMsg

func ErrUnknownP2PMsg(t proto.Message) lib.ErrorI

func PeerError

func PeerError(publicKey []byte, remoteAddr string, err error) string

Types

type BookPeer

type BookPeer struct {

	// address: is the peer address object that holds identification and metadata about the peer
	Address *lib.PeerAddress `protobuf:"bytes,1,opt,name=Address,proto3" json:"address"` // @gotags: json:"address"
	// consecutive_failed_dial: is a churn management counter that tracks the number of consecutive failures
	// enough consecutive fails, the BookPeer is evicted from the book
	ConsecutiveFailedDial int32 `protobuf:"varint,2,opt,name=consecutive_failed_dial,json=consecutiveFailedDial,proto3" json:"consecutiveFailedDial"` // @gotags: json:"consecutiveFailedDial"
	// contains filtered or unexported fields
}

BookPeer is the peer object held and saved in the peer book for persisted peer connectivity and exchange.

Peer or Address book: a collection of peers or network addresses that a node uses to discover and connect with other nodes in the network. This book helps nodes keep track of available peers for communication and maintaining network connectivity.

func (*BookPeer) Descriptor deprecated

func (*BookPeer) Descriptor() ([]byte, []int)

Deprecated: Use BookPeer.ProtoReflect.Descriptor instead.

func (*BookPeer) GetAddress

func (x *BookPeer) GetAddress() *lib.PeerAddress

func (*BookPeer) GetConsecutiveFailedDial

func (x *BookPeer) GetConsecutiveFailedDial() int32

func (*BookPeer) ProtoMessage

func (*BookPeer) ProtoMessage()

func (*BookPeer) ProtoReflect

func (x *BookPeer) ProtoReflect() protoreflect.Message

func (*BookPeer) Reset

func (x *BookPeer) Reset()

func (*BookPeer) String

func (x *BookPeer) String() string

type EncryptedConn

type EncryptedConn struct {
	Address *lib.PeerAddress // authenticated remote peer information
	// contains filtered or unexported fields
}

EncryptedConn is made of the underlying TCP connection, send and receive AEAD ciphers, and the peer address. NOTE: receiving and sending have two distinct AEAD state objects for key / nonce management and simultaneous send/receive.

func NewHandshake

func NewHandshake(conn net.Conn, meta *lib.PeerMeta, privateKey crypto.PrivateKeyI) (encryptedConn *EncryptedConn, e lib.ErrorI)

NewHandshake() executes the authentication protocol between two TCP connections, resulting in an encrypted connection.

func (*EncryptedConn) Close

func (c *EncryptedConn) Close() error

EncryptedConn satisfies the net.Conn interface

func (*EncryptedConn) LocalAddr

func (c *EncryptedConn) LocalAddr() net.Addr

func (*EncryptedConn) Read

func (c *EncryptedConn) Read(data []byte) (n int, err error)

Read() checks the connection for cipher data bytes; if any are found, it decrypts them and loads them into the 'data' buffer

func (*EncryptedConn) RemoteAddr

func (c *EncryptedConn) RemoteAddr() net.Addr

func (*EncryptedConn) SetDeadline

func (c *EncryptedConn) SetDeadline(t time.Time) error

func (*EncryptedConn) SetReadDeadline

func (c *EncryptedConn) SetReadDeadline(t time.Time) error

func (*EncryptedConn) SetWriteDeadline

func (c *EncryptedConn) SetWriteDeadline(t time.Time) error

func (*EncryptedConn) Write

func (c *EncryptedConn) Write(data []byte) (n int, err error)

Write() writes the data bytes to the encrypted connection

type Envelope

type Envelope struct {

	// payload: is a generic proto.message
	Payload *anypb.Any `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
	// contains filtered or unexported fields
}

This file is auto-generated from source files in `/lib/.proto/*` using Protocol Buffers (protobuf).

Protobuf is a language-neutral, platform-neutral serialization format. It allows users to define objects in a way that's both efficient to store and fast to transmit over the network. These definitions are compiled into code that enables different systems and programming languages to communicate in a byte-perfect manner.

To update these structures, make changes to the source .proto files, then recompile to regenerate this file. These auto-generated files are easily recognized by checking for a `.pb.go` ending.

Envelope is a generic wrapper over a proto.Message for the P2P module to handle. Envelopes are first used to wrap bytes into a Packet, Ping, or Pong, and are later used to wrap Packet.bytes into a proto.Message.

func (*Envelope) Descriptor deprecated

func (*Envelope) Descriptor() ([]byte, []int)

Deprecated: Use Envelope.ProtoReflect.Descriptor instead.

func (*Envelope) GetPayload

func (x *Envelope) GetPayload() *anypb.Any

func (*Envelope) ProtoMessage

func (*Envelope) ProtoMessage()

func (*Envelope) ProtoReflect

func (x *Envelope) ProtoReflect() protoreflect.Message

func (*Envelope) Reset

func (x *Envelope) Reset()

func (*Envelope) String

func (x *Envelope) String() string

type MultiConn

type MultiConn struct {
	Address *lib.PeerAddress // authenticated peer information
	// contains filtered or unexported fields
}

MultiConn: A rate-limited, multiplexed connection that utilizes a series of streams with varying priority for sending and receiving

func (*MultiConn) Error

func (c *MultiConn) Error(err error, reputationDelta ...int32)

Error() executes a callback when an error occurs on the MultiConn. Optionally pass a reputation delta to slash the peer.

func (*MultiConn) Send

func (c *MultiConn) Send(topic lib.Topic, bz []byte) (ok bool)

Send() queues the sending of a message to a specific Stream

func (*MultiConn) Start

func (c *MultiConn) Start()

Start() begins send and receive services for a MultiConn

func (*MultiConn) Stop

func (c *MultiConn) Stop()

Stop() sends exit signals for send and receive loops and closes the connection

type P2P

type P2P struct {
	PeerSet // active set

	MustConnectsReceiver chan []*lib.PeerAddress
	// contains filtered or unexported fields
}

func New

func New(p crypto.PrivateKeyI, maxMembersPerCommittee uint64, m *lib.Metrics, c lib.Config, l lib.LoggerI) *P2P

New() creates an initialized pointer instance of a P2P object

func (*P2P) AddPeer

func (p *P2P) AddPeer(conn net.Conn, info *lib.PeerInfo, disconnect, strictPublicKey bool) (err lib.ErrorI)

AddPeer() takes an ephemeral TCP connection and an incomplete peerInfo and attempts to create an E2E encrypted channel with a fully authenticated peer and save it to the peer set and the peer book

func (*P2P) Dial

func (p *P2P) Dial(address *lib.PeerAddress, disconnect, strictPublicKey bool) lib.ErrorI

Dial() tries to establish an outbound connection with a peer candidate

func (*P2P) DialAndDisconnect

func (p *P2P) DialAndDisconnect(a *lib.PeerAddress, strictPublicKey bool) lib.ErrorI

DialAndDisconnect() dials the peer but disconnects once a fully authenticated connection is established

func (*P2P) DialForOutboundPeers

func (p *P2P) DialForOutboundPeers()

DialForOutboundPeers() uses the config and peer book to try to max out the outbound peer connections

func (*P2P) DialWithBackoff

func (p *P2P) DialWithBackoff(peerInfo *lib.PeerAddress, strictPublicKey bool)

DialWithBackoff() dials the peer with exponential backoff retry

func (*P2P) GetBookPeers

func (p *P2P) GetBookPeers() []*BookPeer

GetBookPeers() returns all peers in the PeerBook

func (*P2P) GetInboxStats

func (p *P2P) GetInboxStats() map[lib.Topic]int

GetInboxStats returns the current message count for each inbox channel. This operation is non-blocking and safe to call concurrently.

func (*P2P) ID

func (p *P2P) ID() *lib.PeerAddress

ID() returns the self peer address

func (*P2P) Inbox

func (p *P2P) Inbox(topic lib.Topic) chan *lib.MessageAndMetadata

Inbox() is a getter for the multiplexed stream with a specific topic

func (*P2P) IsSelf

func (p *P2P) IsSelf(a *lib.PeerAddress) bool

IsSelf() returns if the peer address public key equals the self public key

func (*P2P) ListenForInboundPeers

func (p *P2P) ListenForInboundPeers(listenAddress *lib.PeerAddress)

ListenForInboundPeers() starts a rate-limited tcp listener service to accept inbound peers

func (*P2P) ListenForMustConnects

func (p *P2P) ListenForMustConnects()

ListenForMustConnects() is an internal listener that receives 'must connect peers' updates from the controller

func (*P2P) ListenForPeerBookRequests

func (p *P2P) ListenForPeerBookRequests()

ListenForPeerBookRequests()

func (*P2P) ListenForPeerBookResponses

func (p *P2P) ListenForPeerBookResponses()

func (*P2P) MaxPossibleInbound

func (p *P2P) MaxPossibleInbound() int

MaxPossibleInbound() sums the MaxIn, MaxCommitteeConnects and trusted peer IDs

func (*P2P) MaxPossibleOutbound

func (p *P2P) MaxPossibleOutbound() int

MaxPossibleOutbound() sums the MaxOut, MaxCommitteeConnects and trusted peer IDs

func (*P2P) MaxPossiblePeers

func (p *P2P) MaxPossiblePeers() int

MaxPossiblePeers() sums the MaxIn, MaxOut, MaxCommitteeConnects and trusted peer IDs

func (*P2P) MonitorInboxStats

func (p *P2P) MonitorInboxStats(interval time.Duration)

MonitorInboxStats continuously monitors and logs inbox channel depths without blocking message processing. Safe to run as a goroutine.

func (*P2P) NewConnection

func (p *P2P) NewConnection(conn net.Conn) (*MultiConn, lib.ErrorI)

NewConnection() creates and starts a new instance of a MultiConn

func (*P2P) NewStreams

func (p *P2P) NewStreams() (streams map[lib.Topic]*Stream)

NewStreams() creates map of streams for the multiplexing architecture

func (*P2P) OnPeerError

func (p *P2P) OnPeerError(err error, publicKey []byte, remoteAddr string, uuid uint64)

OnPeerError() callback to P2P when a peer errors

func (*P2P) SelfSend

func (p *P2P) SelfSend(fromPublicKey []byte, topic lib.Topic, payload proto.Message) lib.ErrorI

SelfSend() executes an internal pipe send to self

func (*P2P) SendPeerBookRequests

func (p *P2P) SendPeerBookRequests()

SendPeerBookRequests() is the requesting service of the peer exchange. It sends a peer request out to a random peer and waits PeerBookRequestTimeoutS for a response.

func (*P2P) Start

func (p *P2P) Start()

Start() begins the P2P service

func (*P2P) StartPeerBookService

func (p *P2P) StartPeerBookService()

StartPeerBookService() begins:

  • Peer Exchange service: exchange known peers with the currently active set
  • Churn management service: evict inactive peers from the book
  • File save service: persist the book.json file periodically

func (*P2P) Stop

func (p *P2P) Stop()

Stop() stops the P2P service

func (*P2P) WaitForMinimumPeers

func (p *P2P) WaitForMinimumPeers()

WaitForMinimumPeers() doesn't return until the minimum peer count is reached. This may be useful when coordinating network starts.

type Packet

type Packet struct {

	// stream_id: the identifier of the stream/topic that this packet belongs to, used for multiplexing
	StreamId lib.Topic `protobuf:"varint,1,opt,name=stream_id,json=streamId,proto3,enum=types.Topic" json:"streamID"` // @gotags: json:"streamID"
	// eof: indicates whether this is the last packet of the message (EOF = true) or if more packets will follow (EOF = false)
	Eof bool `protobuf:"varint,2,opt,name=eof,proto3" json:"eof,omitempty"`
	// bytes: the actual message data transferred in this packet. It could represent the entire message or just a part of it
	Bytes []byte `protobuf:"bytes,3,opt,name=bytes,proto3" json:"bytes,omitempty"`
	// contains filtered or unexported fields
}

A Packet is a part of a message (or the entire message) that is associated with a specific stream topic and includes an EOF signal

func (*Packet) Descriptor deprecated

func (*Packet) Descriptor() ([]byte, []int)

Deprecated: Use Packet.ProtoReflect.Descriptor instead.

func (*Packet) GetBytes

func (x *Packet) GetBytes() []byte

func (*Packet) GetEof

func (x *Packet) GetEof() bool

func (*Packet) GetStreamId

func (x *Packet) GetStreamId() lib.Topic

func (*Packet) ProtoMessage

func (*Packet) ProtoMessage()

func (*Packet) ProtoReflect

func (x *Packet) ProtoReflect() protoreflect.Message

func (*Packet) Reset

func (x *Packet) Reset()

func (*Packet) String

func (x *Packet) String() string

type Peer

type Peer struct {
	*lib.PeerInfo // authenticated information of the peer
	// contains filtered or unexported fields
}

Peer is a multiplexed connection + authenticated peer information

type PeerBook

type PeerBook struct {
	Book     []*BookPeer `json:"book"`     // persisted list of peers
	BookSize int         `json:"bookSize"` // number of peers in the book
	// contains filtered or unexported fields
}

PeerBook is a persisted structure that maintains information on potential peers

func NewPeerBook

func NewPeerBook(publicKey []byte, c lib.Config, l lib.LoggerI) *PeerBook

NewPeerBook() instantiates a PeerBook object from a file; it creates the file if none exists

func (*PeerBook) Add

func (p *PeerBook) Add(peer *BookPeer)

Add() adds a peer to the book in sorted order by public key

func (*PeerBook) AddFailedDialAttempt

func (p *PeerBook) AddFailedDialAttempt(address *lib.PeerAddress)

AddFailedDialAttempt() increments the failed dial attempt counter for a BookPeer

func (*PeerBook) DeleteAtIndex

func (p *PeerBook) DeleteAtIndex(address *lib.PeerAddress)

DeleteAtIndex() deletes the BookPeer matching the given address from the book

func (*PeerBook) GetAll

func (p *PeerBook) GetAll() (res []*BookPeer)

GetAll() returns a snapshot of all peers in the book

func (*PeerBook) GetBookSize

func (p *PeerBook) GetBookSize() int

GetBookSize() returns the book peer count

func (*PeerBook) GetRandom

func (p *PeerBook) GetRandom() *BookPeer

GetRandom() returns a random peer from the Book that has a specific chain in the Meta

func (*PeerBook) Remove

func (p *PeerBook) Remove(address *lib.PeerAddress)

Remove() removes a peer from the book

func (*PeerBook) ResetFailedDialAttempts

func (p *PeerBook) ResetFailedDialAttempts(address *lib.PeerAddress)

ResetFailedDialAttempts() resets the failed dial attempt count for the peer

func (*PeerBook) SaveRoutine

func (p *PeerBook) SaveRoutine()

SaveRoutine() periodically saves the book to a json file

func (*PeerBook) StartChurnManagement

func (p *PeerBook) StartChurnManagement(dialAndDisconnect func(a *lib.PeerAddress, strictPublicKey bool) lib.ErrorI)

StartChurnManagement() evicts inactive peers from the PeerBook by periodically attempting to connect with each peer
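
A single churn pass might look like the sketch below. It is not the package's implementation: the `entry` type, the `churnOnce` helper, the `maxFailed` threshold, and the simplified callback (a plain string address and a standard `error` instead of `*lib.PeerAddress` and `lib.ErrorI`) are all assumptions made for illustration. The documented function runs periodically; only one pass is shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnreachable is a hypothetical dial error used by the demo.
var errUnreachable = errors.New("unreachable")

// entry is a hypothetical book entry: an address plus a failure counter.
type entry struct {
	Address        string
	FailedAttempts int
}

// churnOnce dials every entry once and returns the entries that survive.
// Assumption: an entry is evicted once it reaches maxFailed consecutive failures.
func churnOnce(book []*entry, maxFailed int, dialAndDisconnect func(addr string) error) []*entry {
	kept := make([]*entry, 0, len(book))
	for _, e := range book {
		if err := dialAndDisconnect(e.Address); err != nil {
			e.FailedAttempts++
		} else {
			e.FailedAttempts = 0 // a successful dial clears the counter
		}
		if e.FailedAttempts < maxFailed {
			kept = append(kept, e)
		}
	}
	return kept
}

func main() {
	book := []*entry{{Address: "up:9001"}, {Address: "down:9001"}}
	dial := func(addr string) error {
		if addr == "down:9001" {
			return errUnreachable
		}
		return nil
	}
	for i := 0; i < 3; i++ {
		book = churnOnce(book, 3, dial)
	}
	fmt.Println("peers remaining:", len(book))
}
```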

func (*PeerBook) WriteToFile

func (p *PeerBook) WriteToFile() error

WriteToFile() saves the peer book object to a json file
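
The sketch below shows one way a book could be persisted as JSON and read back. Only the `book` and `bookSize` keys come from the struct tags documented above; the `Address` field, the helper names, the file name, and the file permissions are assumptions, not the package's actual behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// bookPeer and peerBook are hypothetical stand-ins; only the "book" and
// "bookSize" JSON keys are taken from the documented struct tags.
type bookPeer struct {
	Address string `json:"address"` // hypothetical field
}

type peerBook struct {
	Book     []*bookPeer `json:"book"`
	BookSize int         `json:"bookSize"`
}

// writeToFile serializes the book as indented JSON and writes it to path.
func writeToFile(b *peerBook, path string) error {
	bz, err := json.MarshalIndent(b, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(path, bz, 0o600)
}

// readFromFile loads a book previously written by writeToFile.
func readFromFile(path string) (*peerBook, error) {
	bz, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	b := new(peerBook)
	if err := json.Unmarshal(bz, b); err != nil {
		return nil, err
	}
	return b, nil
}

// roundTrip saves b into a temporary directory and reads it back.
func roundTrip(b *peerBook) (*peerBook, error) {
	dir, err := os.MkdirTemp("", "peerbook")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "book.json")
	if err := writeToFile(b, path); err != nil {
		return nil, err
	}
	return readFromFile(path)
}

func main() {
	in := &peerBook{Book: []*bookPeer{{Address: "10.0.0.1:9001"}}, BookSize: 1}
	out, err := roundTrip(in)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.BookSize, out.Book[0].Address)
}
```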

type PeerBookRequestMessage

type PeerBookRequestMessage struct {
	// contains filtered or unexported fields
}

PeerBookRequestMessage is a peer exchange request message that enables new peer discovery via swapping

func (*PeerBookRequestMessage) Descriptor deprecated

func (*PeerBookRequestMessage) Descriptor() ([]byte, []int)

Deprecated: Use PeerBookRequestMessage.ProtoReflect.Descriptor instead.

func (*PeerBookRequestMessage) ProtoMessage

func (*PeerBookRequestMessage) ProtoMessage()

func (*PeerBookRequestMessage) ProtoReflect

func (x *PeerBookRequestMessage) ProtoReflect() protoreflect.Message

func (*PeerBookRequestMessage) Reset

func (x *PeerBookRequestMessage) Reset()

func (*PeerBookRequestMessage) String

func (x *PeerBookRequestMessage) String() string

type PeerBookResponseMessage

type PeerBookResponseMessage struct {

	// book: randomly selected peers to exchange with the requester peer
	Book []*BookPeer `protobuf:"bytes,1,rep,name=book,proto3" json:"book,omitempty"`
	// contains filtered or unexported fields
}

PeerBookResponseMessage is a peer exchange response message sent back after receiving a PeerBookRequestMessage. The responding peer selects some random peers from its PeerBook to reply with
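
Selecting random peers for a response can be sketched as below. The `bookPeer` type, the `randomSubset` helper, and the `limit` parameter are hypothetical; how many peers the real responder returns is not stated here.

```go
package main

import (
	"fmt"
	"math/rand"
)

// bookPeer is a hypothetical stand-in for the package's BookPeer.
type bookPeer struct {
	Address string
}

// randomSubset returns up to limit distinct peers chosen at random.
// Assumption: limit is a hypothetical cap on the size of a response.
func randomSubset(book []*bookPeer, limit int) []*bookPeer {
	if limit > len(book) {
		limit = len(book)
	}
	if limit <= 0 {
		return nil
	}
	out := make([]*bookPeer, 0, limit)
	// rand.Perm yields each index exactly once, so no peer repeats
	for _, i := range rand.Perm(len(book))[:limit] {
		out = append(out, book[i])
	}
	return out
}

func main() {
	book := []*bookPeer{{"a:9001"}, {"b:9001"}, {"c:9001"}, {"d:9001"}}
	for _, p := range randomSubset(book, 2) {
		fmt.Println(p.Address)
	}
}
```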

func (*PeerBookResponseMessage) Descriptor deprecated

func (*PeerBookResponseMessage) Descriptor() ([]byte, []int)

Deprecated: Use PeerBookResponseMessage.ProtoReflect.Descriptor instead.

func (*PeerBookResponseMessage) GetBook

func (x *PeerBookResponseMessage) GetBook() []*BookPeer

func (*PeerBookResponseMessage) ProtoMessage

func (*PeerBookResponseMessage) ProtoMessage()

func (*PeerBookResponseMessage) ProtoReflect

func (x *PeerBookResponseMessage) ProtoReflect() protoreflect.Message

func (*PeerBookResponseMessage) Reset

func (x *PeerBookResponseMessage) Reset()

func (*PeerBookResponseMessage) String

func (x *PeerBookResponseMessage) String() string

type PeerSet

type PeerSet struct {
	sync.RWMutex // read / write mutex
	// contains filtered or unexported fields
}

PeerSet is the structure that maintains the connections and metadata of connected peers

func NewPeerSet

func NewPeerSet(c lib.Config, priv crypto.PrivateKeyI, metrics *lib.Metrics, logger lib.LoggerI) PeerSet

func (*PeerSet) Add

func (ps *PeerSet) Add(p *Peer) (err lib.ErrorI)

Add() introduces a peer to the set

func (*PeerSet) ChangeReputation

func (ps *PeerSet) ChangeReputation(publicKey []byte, delta int32)

ChangeReputation() updates the peer reputation +/- based on the int32 delta

func (*PeerSet) GetAllInfos

func (ps *PeerSet) GetAllInfos() (res []*lib.PeerInfo, numInbound, numOutbound int)

GetAllInfos() returns the information on connected peers and the total inbound / outbound counts

func (*PeerSet) GetPeerInfo

func (ps *PeerSet) GetPeerInfo(publicKey []byte) (*lib.PeerInfo, lib.ErrorI)

GetPeerInfo() returns a copy of the authenticated information from the peer structure

func (*PeerSet) Has

func (ps *PeerSet) Has(publicKey []byte) bool

Has() reports whether the set has a peer with a specific public key

func (*PeerSet) IsMustConnect

func (ps *PeerSet) IsMustConnect(publicKey []byte) bool

IsMustConnect() checks if a peer is on the must-connect list

func (*PeerSet) PeerCount

func (ps *PeerSet) PeerCount() int

PeerCount() returns the total number of peers

func (*PeerSet) Remove

func (ps *PeerSet) Remove(publicKey []byte, uuid uint64) (err lib.ErrorI)

Remove() evicts a peer from the set

func (*PeerSet) SendTo

func (ps *PeerSet) SendTo(publicKey []byte, topic lib.Topic, msg proto.Message) lib.ErrorI

SendTo() sends a message to a specific peer based on their public key

func (*PeerSet) SendToPeers

func (ps *PeerSet) SendToPeers(topic lib.Topic, msg proto.Message, excludeKeys ...string) lib.ErrorI

SendToPeers() sends a message to all peers
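
The variadic `excludeKeys` parameter suggests a broadcast that skips some peers. A minimal sketch of that pattern follows, assuming the keys are string-encoded public keys. The `peer` and `peerSet` types here are hypothetical and record messages in memory instead of writing to a connection.

```go
package main

import (
	"fmt"
	"sync"
)

// peer is a hypothetical connected peer that records what it was sent.
type peer struct {
	received []string
}

// peerSet is a hypothetical stand-in keyed by string-encoded public key.
type peerSet struct {
	sync.RWMutex
	peers map[string]*peer
}

// sendToPeers delivers msg to every peer whose key is not excluded and
// returns how many peers were sent the message.
func (ps *peerSet) sendToPeers(msg string, excludeKeys ...string) int {
	skip := make(map[string]struct{}, len(excludeKeys))
	for _, k := range excludeKeys {
		skip[k] = struct{}{}
	}
	ps.Lock() // write lock: the demo mutates each peer's received slice
	defer ps.Unlock()
	sent := 0
	for key, p := range ps.peers {
		if _, excluded := skip[key]; excluded {
			continue
		}
		p.received = append(p.received, msg)
		sent++
	}
	return sent
}

func main() {
	ps := &peerSet{peers: map[string]*peer{"aa": {}, "bb": {}, "cc": {}}}
	fmt.Println("sent to", ps.sendToPeers("block", "bb"), "peers")
}
```

One plausible use of the exclusion list in a gossip setting is to avoid echoing a message back to the peer it was received from.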

func (*PeerSet) SendToRandPeer

func (ps *PeerSet) SendToRandPeer(topic lib.Topic, msg proto.Message) (*lib.PeerInfo, lib.ErrorI)

SendToRandPeer() sends a message to any random peer on the list

func (*PeerSet) Stop

func (ps *PeerSet) Stop()

Stop() stops the entire peer set

func (*PeerSet) UpdateMustConnects

func (ps *PeerSet) UpdateMustConnects(mustConnect []*lib.PeerAddress) (toDial []*lib.PeerAddress)

UpdateMustConnects() updates the list of peers that must be connected to, e.g. the peers needed to complete committee consensus
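
The `toDial` return value suggests that the call reports which must-connect peers still need a connection. The sketch below models that idea with plain string keys; the `peerSet` fields and both method bodies are assumptions, not the package's code.

```go
package main

import "fmt"

// peerSet is a hypothetical stand-in that tracks peers by string key.
type peerSet struct {
	connected   map[string]bool // currently connected peers
	mustConnect map[string]bool // peers that must stay connected
}

// updateMustConnects replaces the must-connect list and returns the
// entries that are not currently connected and so still need dialing.
func (ps *peerSet) updateMustConnects(keys []string) (toDial []string) {
	ps.mustConnect = make(map[string]bool, len(keys))
	for _, k := range keys {
		ps.mustConnect[k] = true
		if !ps.connected[k] {
			toDial = append(toDial, k)
		}
	}
	return toDial
}

// isMustConnect reports whether key is on the must-connect list.
func (ps *peerSet) isMustConnect(key string) bool {
	return ps.mustConnect[key]
}

func main() {
	ps := &peerSet{connected: map[string]bool{"validator-a": true}}
	toDial := ps.updateMustConnects([]string{"validator-a", "validator-b"})
	fmt.Println(toDial, ps.isMustConnect("validator-a"))
}
```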

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is an independent, bidirectional communication channel that is scoped to a single topic. A multiplexed connection typically carries more than one stream
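
To illustrate topic-scoped streams on one connection, the sketch below routes incoming packets to a per-topic inbox. The `topic` constants, the `packet`, `stream`, and `multiConn` types, and the drop-when-full policy are hypothetical and do not reflect the package's unexported fields.

```go
package main

import "fmt"

// topic identifies a stream; the constants below are hypothetical.
type topic int

const (
	topicBlock topic = iota
	topicTx
)

// packet is a hypothetical unit of data tagged with its topic.
type packet struct {
	Topic topic
	Bytes []byte
}

// stream is a hypothetical per-topic channel with a bounded inbox.
type stream struct {
	topic topic
	inbox chan []byte
}

// multiConn is a hypothetical connection holding one stream per topic.
type multiConn struct {
	streams map[topic]*stream
}

// newMultiConn creates one stream per topic with the given inbox size.
func newMultiConn(buffer int, topics ...topic) *multiConn {
	m := &multiConn{streams: make(map[topic]*stream, len(topics))}
	for _, t := range topics {
		m.streams[t] = &stream{topic: t, inbox: make(chan []byte, buffer)}
	}
	return m
}

// route hands the packet to the stream for its topic. It reports false
// when the topic is unknown or the inbox is full (assumed drop policy).
func (m *multiConn) route(p packet) bool {
	s, ok := m.streams[p.Topic]
	if !ok {
		return false
	}
	select {
	case s.inbox <- p.Bytes:
		return true
	default:
		return false
	}
}

func main() {
	m := newMultiConn(4, topicBlock, topicTx)
	m.route(packet{Topic: topicTx, Bytes: []byte("tx-1")})
	m.route(packet{Topic: topicBlock, Bytes: []byte("block-1")})
	fmt.Println(string(<-m.streams[topicTx].inbox))
	fmt.Println(string(<-m.streams[topicBlock].inbox))
}
```

Because each topic has its own inbox, a backlog on one stream does not block delivery on another.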
