node

package
v0.0.0-...-da72ffe Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2025 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BulkProtocolSuffix = "/bulk/1.0.0"
)

Protocol IDs for P2P distribution

View Source
const (
	NodeStateType state.StateType = "node"
)

Define service and component state types.

View Source
const P2PDistributorStateType state.StateType = "p2p_distributor"

P2PDistributorStateType is the type identifier for P2PDistributor state

Variables

This section is empty.

Functions

This section is empty.

Types

type DbReadHandler

type DbReadHandler struct {
	// contains filtered or unexported fields
}

DbReadHandler is a message handler that is constructed with an MDBX database.

func NewDbReadHandler

func NewDbReadHandler(db db.Provider, logger logger.Logger, obs *observability.Observability, distributor *P2PDistributor) *DbReadHandler

NewDbReadHandler creates a new DbReadHandler with an MDBX database

func (*DbReadHandler) Handle

func (rh *DbReadHandler) Handle(conn transports.Connection, frame []byte)

Handle processes the incoming message using the DbReadHandler

type DbWriteHandler

type DbWriteHandler struct {
	// contains filtered or unexported fields
}

DbWriteHandler is a message handler that is constructed with an MDBX database.

func NewDbWriteHandler

func NewDbWriteHandler(db db.Provider, batchWriter *db.BatchWriter, logger logger.Logger, obs *observability.Observability, distributor *P2PDistributor) *DbWriteHandler

NewDbWriteHandler creates a new DbWriteHandler with an MDBX database

func (*DbWriteHandler) ForceFlush

func (wh *DbWriteHandler) ForceFlush()

ForceFlush forces the BatchWriter to flush any pending writes. This is particularly useful for testing scenarios where immediate persistence is needed.

func (*DbWriteHandler) Handle

func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte)

Handle processes the incoming message using the DbWriteHandler

type DistributionStats

type DistributionStats struct {
	RecordsDistributed int64
	BytesDistributed   int64
	BatchesSent        int64
	TransmissionErrors int64
	AverageLatencyMs   int64
	// contains filtered or unexported fields
}

DistributionStats tracks performance metrics

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node encapsulates all components of a PeerDNS node.

func NewNode

func NewNode(
	ctx context.Context, config config.Config, rbacMgr *rbac.Manager, logger logger.Logger,
	store *accounts.Store, obs *observability.Observability, stateMgr *state.StateManager,
	dbM *db.Manager, batchWriter *db.BatchWriter,
) (*Node, error)

NewNode initializes and returns a new Node.

func (*Node) Account

func (n *Node) Account() *accounts.Account

Account returns the node's account.

func (*Node) ActorSet

func (n *Node) ActorSet() *share.ActorSet

ActorSet returns the node's actor set.

func (*Node) Collector

func (n *Node) Collector() *metrics.Collector

Collector returns the node's metrics collector.

func (*Node) Discovery

func (n *Node) Discovery() *networking.DiscoveryService

Discovery returns the node's discovery service.

func (*Node) DistributeRecord

func (n *Node) DistributeRecord(key [32]byte, value []byte) error

DistributeRecord adds a record to be distributed across the P2P network

func (*Node) DistributeRecordToPeer

func (n *Node) DistributeRecordToPeer(key [32]byte, value []byte, peerID peer.ID) error

DistributeRecordToPeer sends a record directly to a specific peer

func (*Node) DistributeRecordWithPriority

func (n *Node) DistributeRecordWithPriority(key [32]byte, value []byte, priority types.Priority, target types.Target) error

DistributeRecordWithPriority adds a record with specified priority and target

func (*Node) Distributor

func (n *Node) Distributor() *P2PDistributor

Distributor returns the node's P2P distributor component

func (*Node) Network

func (n *Node) Network() *networking.Network

Network returns the node's network.

func (*Node) Observability

func (n *Node) Observability() *observability.Observability

Observability returns the node's observability.

func (*Node) PerformanceMonitor

func (n *Node) PerformanceMonitor() *metrics.PerformanceMonitor

PerformanceMonitor returns the node's performance monitor.

func (*Node) RbacManager

func (n *Node) RbacManager() *rbac.Manager

RbacManager returns the node's RBAC manager.

func (*Node) Shutdown

func (n *Node) Shutdown() error

Shutdown gracefully shuts down the node.

func (*Node) Start

func (n *Node) Start() error

func (*Node) StateManager

func (n *Node) StateManager() *state.StateManager

StateManager returns the node's state manager.

func (*Node) Store

func (n *Node) Store() *accounts.Store

Store returns the node's account store.

func (*Node) Topology

func (n *Node) Topology() *topology.Topology

Topology returns the node's topology.

type P2PDistributor

type P2PDistributor struct {
	// contains filtered or unexported fields
}

P2PDistributor handles efficient record distribution across the network

func NewP2PDistributor

func NewP2PDistributor(node *Node, batchSize int) *P2PDistributor

NewP2PDistributor creates a new P2P distributor

func (*P2PDistributor) DistributeRecord

func (d *P2PDistributor) DistributeRecord(key [32]byte, value []byte, priority types.Priority, target types.Target) error

DistributeRecord adds a record to the appropriate distribution queue

func (*P2PDistributor) DistributeRecordToPeer

func (d *P2PDistributor) DistributeRecordToPeer(key [32]byte, value []byte, peerID peer.ID, priority types.Priority) error

DistributeRecordToPeer sends a record directly to a specific peer

func (*P2PDistributor) GetStats

func (d *P2PDistributor) GetStats() DistributionStats

GetStats returns the current distribution statistics

func (*P2PDistributor) HandleBulkStream

func (d *P2PDistributor) HandleBulkStream(s network.Stream)

HandleBulkStream processes large data streams sent through the bulk protocol. It bypasses signature verification for performance reasons when handling large payloads.

func (*P2PDistributor) HandleRecordBatchPacket

func (d *P2PDistributor) HandleRecordBatchPacket(ctx context.Context, packet *packets.NetworkPacket, sender peer.ID) error

HandleRecordBatchPacket processes incoming record batches from the network

func (*P2PDistributor) Start

func (d *P2PDistributor) Start() error

Start begins processing the distribution queues

func (*P2PDistributor) Stop

func (d *P2PDistributor) Stop()

Stop gracefully halts all distribution processes

type RecordBatch

type RecordBatch struct {
	Records    []db.WriteRequest
	Priority   types.Priority
	Target     types.Target
	TargetPeer *peer.ID // Only set if Target is TargetDirectPeer
}

RecordBatch represents a batch of records to be distributed

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL