p2p

package module
v1.21.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2026 License: BSD-3-Clause Imports: 18 Imported by: 17

README

p2p

The p2p package provides a low-level networking abstraction for peer-to-peer communication in the Lux ecosystem.

Installation

go get github.com/luxfi/p2p

Core Interfaces

Handler

The Handler interface defines how incoming messages are processed:

type Handler interface {
    // Gossip handles an incoming gossip message
    Gossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte)
    // Request handles an incoming request and returns a response or error
    Request(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *Error)
}
Sender

The Sender interface defines how outgoing messages are sent:

type Sender interface {
    SendRequest(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) error
    SendResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error
    SendError(ctx context.Context, nodeID ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error
    SendGossip(ctx context.Context, config SendConfig, msg []byte) error
}
Error

The Error type represents application-level errors:

type Error struct {
    Code    int32
    Message string
}

Standard errors:

  • ErrUnexpected (-1): Generic unexpected error
  • ErrUnregisteredHandler (-2): No handler registered for protocol
  • ErrNotValidator (-3): Requesting peer is not a validator
  • ErrThrottled (-4): Request rate limit exceeded

Sub-packages

gossip

Implements gossip protocols for efficient data propagation:

import "github.com/luxfi/p2p/gossip"

// Create a push gossiper
gossiper := gossip.NewPushGossiper[MyGossipable](...)

// Start gossiping
gossiper.Gossip(ctx)
lp118

Implements LP-118 warp message signature handling:

import "github.com/luxfi/p2p/lp118"

// Create a signature aggregator
aggregator := lp118.NewSignatureAggregator(log, client)

// Aggregate signatures for a warp message
msg, stake, total, err := aggregator.AggregateSignatures(ctx, message, justification, validators, quorumNum, quorumDen)

Usage with warp

The warp package re-exports types from p2p for backward compatibility:

import "github.com/luxfi/warp"

// warp.Error is an alias for p2p.Error
// warp.Sender is an alias for p2p.Sender
// warp.SendConfig is an alias for p2p.SendConfig

License

See LICENSE file.

Documentation

Index

Constants

View Source
const (
	TxGossipHandlerID = iota
	AtomicTxGossipHandlerID
	// SignatureRequestHandlerID is specified in LP-118
	SignatureRequestHandlerID
)

Standardized identifiers for application protocol handlers

Variables

View Source
var (
	ErrRequestPending = errors.New("request pending")
	ErrNoPeers        = errors.New("no peers")
)
View Source
var (
	// ErrUnexpected should be used to indicate that a request failed due to a
	// generic error
	ErrUnexpected = &Error{
		Code:    -1,
		Message: "unexpected error",
	}
	// ErrUnregisteredHandler should be used to indicate that a request failed
	// due to it not matching a registered handler
	ErrUnregisteredHandler = &Error{
		Code:    -2,
		Message: "unregistered handler",
	}
	// ErrNotValidator should be used to indicate that a request failed due to
	// the requesting peer not being a validator
	ErrNotValidator = &Error{
		Code:    -3,
		Message: "not a validator",
	}
	// ErrThrottled should be used to indicate that a request failed due to the
	// requesting peer exceeding a rate limit
	ErrThrottled = &Error{
		Code:    -4,
		Message: "throttled",
	}
)
View Source
var (
	ErrExistingAppProtocol = errors.New("existing app protocol")
	ErrUnrequestedResponse = errors.New("unrequested response")
)

Functions

func ParseMessage

func ParseMessage(msg []byte) (uint64, []byte, bool)

Parse a gossip or request message.

Returns: - The protocol ID. - The unprefixed protocol message. - A boolean indicating that parsing succeeded.

func PrefixMessage

func PrefixMessage(prefix, msg []byte) []byte

PrefixMessage prefixes the original message with the protocol identifier.

Only gossip and request messages need to be prefixed. Response messages don't need to be prefixed because request ids are tracked which map to the expected response handler.

func ProtocolPrefix

func ProtocolPrefix(handlerID uint64) []byte

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) Gossip

func (c *Client) Gossip(
	ctx context.Context,
	config SendConfig,
	gossipBytes []byte,
) error

Gossip sends a gossip message to a random set of peers.

func (*Client) Request

func (c *Client) Request(
	ctx context.Context,
	nodeIDs set.Set[ids.NodeID],
	requestBytes []byte,
	onResponse ResponseCallback,
) error

Request issues an arbitrary request to a node. [onResponse] is invoked upon an error or a response.

func (*Client) RequestAny

func (c *Client) RequestAny(
	ctx context.Context,
	requestBytes []byte,
	onResponse ResponseCallback,
) error

RequestAny issues a request to an arbitrary node decided by Client. If a specific node needs to be requested, use Request instead. See Request for more docs.

type ClientOption

type ClientOption interface {
	// contains filtered or unexported methods
}

ClientOption configures Client

func WithValidatorSampling

func WithValidatorSampling(validators *Validators) ClientOption

WithValidatorSampling configures Client.RequestAny to sample validators. If validators is nil, the default PeerSampler is kept.

type Clock

type Clock interface {
	Time() time.Time
}

Clock interface for testability

type Error

type Error struct {
	Code    int32
	Message string
}

Error represents an application-level error for peer messaging

func (*Error) Error

func (e *Error) Error() string

Error implements the error interface

type Handler

type Handler interface {
	// Gossip is called when handling a gossip message.
	Gossip(
		ctx context.Context,
		nodeID ids.NodeID,
		gossipBytes []byte,
	)
	// Request is called when handling a request message.
	// Sends a response with the response corresponding to requestBytes or
	// an application-defined error.
	Request(
		ctx context.Context,
		nodeID ids.NodeID,
		deadline time.Time,
		requestBytes []byte,
	) ([]byte, *Error)
}

Handler is the server-side logic for virtual machine application protocols.

type Network

type Network struct {
	Peers *Peers
	// contains filtered or unexported fields
}

Network exposes networking state and supports building p2p application protocols

func NewNetwork

func NewNetwork(
	log log.Logger,
	sender Sender,
	registerer metric.Registerer,
	namespace string,
) (*Network, error)

NewNetwork returns an instance of Network

func (*Network) AddHandler

func (n *Network) AddHandler(handlerID uint64, handler Handler) error

AddHandler reserves an identifier for an application protocol

func (*Network) Connected

func (n *Network) Connected(_ context.Context, nodeID ids.NodeID, _ *consensusversion.Application) error

func (*Network) Disconnected

func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error

func (*Network) Gossip

func (n *Network) Gossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error

func (*Network) NewClient

func (n *Network) NewClient(handlerID uint64, options ...ClientOption) *Client

NewClient returns a Client that can be used to send messages for the corresponding protocol.

func (*Network) Request

func (n *Network) Request(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) ([]byte, *Error)

func (*Network) RequestFailed

func (n *Network) RequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32, appErr *Error) error

func (*Network) Response

func (n *Network) Response(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error

type NoOpHandler

type NoOpHandler struct{}

NoOpHandler drops all messages

func (NoOpHandler) Gossip

func (NoOpHandler) Gossip(context.Context, ids.NodeID, []byte)

func (NoOpHandler) Request

func (NoOpHandler) Request(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *Error)

type NodeSampler

type NodeSampler interface {
	// Sample returns at most [limit] nodes. This may return fewer nodes if
	// fewer than [limit] are available.
	Sample(ctx context.Context, limit int) []ids.NodeID
}

NodeSampler samples nodes in network

type PeerSampler

type PeerSampler struct {
	Peers *Peers
}

PeerSampler implements NodeSampler

func (PeerSampler) Sample

func (p PeerSampler) Sample(_ context.Context, limit int) []ids.NodeID

type PeerTracker

type PeerTracker struct {
	// contains filtered or unexported fields
}

Tracks the bandwidth of responses coming from peers, preferring to contact peers with known good bandwidth, connecting to new peers with an exponentially decaying probability.

func NewPeerTracker

func NewPeerTracker(
	log log.Logger,
	metricsNamespace string,
	registerer metric.Registerer,
	ignoredNodes set.Set[ids.NodeID],
	minVersion *consensusversion.Application,
) (*PeerTracker, error)

func (*PeerTracker) Connected

func (p *PeerTracker) Connected(nodeID ids.NodeID, nodeVersion *consensusversion.Application)

Connected should be called when [nodeID] connects to this node.

func (*PeerTracker) Disconnected

func (p *PeerTracker) Disconnected(nodeID ids.NodeID)

Disconnected should be called when [nodeID] disconnects from this node.

func (*PeerTracker) RegisterFailure

func (p *PeerTracker) RegisterFailure(nodeID ids.NodeID)

Record that a request failed to [nodeID].

func (*PeerTracker) RegisterRequest

func (p *PeerTracker) RegisterRequest(nodeID ids.NodeID)

Record that we sent a request to [nodeID].

func (*PeerTracker) RegisterResponse

func (p *PeerTracker) RegisterResponse(nodeID ids.NodeID, bandwidth float64)

Record that we observed that [nodeID]'s bandwidth is [bandwidth].

func (*PeerTracker) SelectPeer

func (p *PeerTracker) SelectPeer() (ids.NodeID, bool)

SelectPeer that we could send a request to.

func (*PeerTracker) Size

func (p *PeerTracker) Size() int

Returns the number of peers the node is connected to.

type Peers

type Peers struct {
	// contains filtered or unexported fields
}

Peers contains metadata about the current set of connected peers

func (*Peers) Sample

func (p *Peers) Sample(limit int) []ids.NodeID

Sample returns a pseudo-random sample of up to limit Peers

type ResponseCallback

type ResponseCallback func(
	ctx context.Context,
	nodeID ids.NodeID,
	responseBytes []byte,
	err error,
)

ResponseCallback is called upon receiving a response for a request issued by Client. Callers should check [err] to see whether the request failed or not.

type SendConfig

type SendConfig struct {
	// NodeIDs specifies specific nodes to send to
	NodeIDs set.Set[ids.NodeID]

	// Validators is the number of validators to sample and send to
	Validators int

	// NonValidators is the number of non-validators to sample and send to
	NonValidators int

	// Peers is the number of peers to sample and send to
	Peers int
}

SendConfig configures how a gossip message is sent

type Sender

type Sender interface {
	// SendRequest sends a request to the specified nodes
	SendRequest(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) error

	// SendResponse sends a response to a previous request
	SendResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error

	// SendError sends an error response to a previous request
	SendError(ctx context.Context, nodeID ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error

	// SendGossip sends a gossip message
	SendGossip(ctx context.Context, config SendConfig, msg []byte) error
}

Sender sends messages to other nodes

type SlidingWindowThrottler

type SlidingWindowThrottler struct {
	// contains filtered or unexported fields
}

SlidingWindowThrottler is an implementation of the sliding window throttling algorithm.

func NewSlidingWindowThrottler

func NewSlidingWindowThrottler(period time.Duration, limit int) *SlidingWindowThrottler

NewSlidingWindowThrottler returns a new instance of SlidingWindowThrottler. Nodes are throttled if they exceed [limit] messages during an interval of time over [period]. [period] and [limit] should both be > 0.

func (*SlidingWindowThrottler) Handle

func (s *SlidingWindowThrottler) Handle(nodeID ids.NodeID) bool

Handle returns true if the amount of calls received in the last [s.period] time is less than [s.limit]

This is calculated by adding the current period's count to a weighted count of the previous period.

type TestHandler

type TestHandler struct {
	GossipF  func(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte)
	RequestF func(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *Error)
}

func (TestHandler) Gossip

func (t TestHandler) Gossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte)

func (TestHandler) Request

func (t TestHandler) Request(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *Error)

type Throttler

type Throttler interface {
	// Handle returns true if a message from [nodeID] should be handled.
	Handle(nodeID ids.NodeID) bool
}

type ThrottlerHandler

type ThrottlerHandler struct {
	// contains filtered or unexported fields
}

func NewThrottlerHandler

func NewThrottlerHandler(handler Handler, throttler Throttler, log log.Logger) *ThrottlerHandler

func (ThrottlerHandler) Gossip

func (t ThrottlerHandler) Gossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte)

func (ThrottlerHandler) Request

func (t ThrottlerHandler) Request(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *Error)

type ValidatorHandler

type ValidatorHandler struct {
	// contains filtered or unexported fields
}

ValidatorHandler drops messages from non-validators

func NewValidatorHandler

func NewValidatorHandler(
	handler Handler,
	validatorSet ValidatorSet,
	log log.Logger,
) *ValidatorHandler

func (ValidatorHandler) Gossip

func (v ValidatorHandler) Gossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte)

func (ValidatorHandler) Request

func (v ValidatorHandler) Request(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *Error)

type ValidatorSet

type ValidatorSet interface {
	Has(ctx context.Context, nodeID ids.NodeID) bool // TODO return error
}

type ValidatorSubset

type ValidatorSubset interface {
	Top(ctx context.Context, percentage float64) []ids.NodeID // TODO return error
}

type Validators

type Validators struct {
	// contains filtered or unexported fields
}

Validators contains a set of nodes that are staking.

func NewValidators

func NewValidators(
	peers *Peers,
	log log.Logger,
	netID ids.ID,
	validators validators.State,
	maxValidatorSetStaleness time.Duration,
) *Validators

func (*Validators) Has

func (v *Validators) Has(ctx context.Context, nodeID ids.NodeID) bool

Has returns if nodeID is a connected validator

func (*Validators) Sample

func (v *Validators) Sample(ctx context.Context, limit int) []ids.NodeID

Sample returns a random sample of connected validators

func (*Validators) Top

func (v *Validators) Top(ctx context.Context, percentage float64) []ids.NodeID

Top returns the top [percentage] of validators, regardless of if they are connected or not.

Directories

Path Synopsis
Package lp118 implements the Lux Protocol 118 (LP118) for warp message handling.
Package lp118 implements the Lux Protocol 118 (LP118) for warp message handling.
Package message is a generated GoMock package.
Package message is a generated GoMock package.
messagemock
Package messagemock is a generated GoMock package.
Package messagemock is a generated GoMock package.
Package peer is a generated GoMock package.
Package peer is a generated GoMock package.
proto
p2p
Package p2p re-exports ZAP types for default builds.
Package p2p re-exports ZAP types for default builds.
pb/p2p
Package p2p re-exports ZAP types for default builds.
Package p2p re-exports ZAP types for default builds.
zap/p2p
Package p2p provides ZAP-encoded p2p message types.
Package p2p provides ZAP-encoded p2p message types.
trackermock
Package trackermock provides mock implementations for testing
Package trackermock provides mock implementations for testing

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL