Documentation ¶
Index ¶
- Constants
- type DbReadHandler
- type DbWriteHandler
- type DistributionStats
- type Node
- func (n *Node) Account() *accounts.Account
- func (n *Node) ActorSet() *share.ActorSet
- func (n *Node) Collector() *metrics.Collector
- func (n *Node) Discovery() *networking.DiscoveryService
- func (n *Node) DistributeRecord(key [32]byte, value []byte) error
- func (n *Node) DistributeRecordToPeer(key [32]byte, value []byte, peerID peer.ID) error
- func (n *Node) DistributeRecordWithPriority(key [32]byte, value []byte, priority types.Priority, target types.Target) error
- func (n *Node) Distributor() *P2PDistributor
- func (n *Node) Network() *networking.Network
- func (n *Node) Observability() *observability.Observability
- func (n *Node) PerformanceMonitor() *metrics.PerformanceMonitor
- func (n *Node) RbacManager() *rbac.Manager
- func (n *Node) Shutdown() error
- func (n *Node) Start() error
- func (n *Node) StateManager() *state.StateManager
- func (n *Node) Store() *accounts.Store
- func (n *Node) Topology() *topology.Topology
- type P2PDistributor
- func (d *P2PDistributor) DistributeRecord(key [32]byte, value []byte, priority types.Priority, target types.Target) error
- func (d *P2PDistributor) DistributeRecordToPeer(key [32]byte, value []byte, peerID peer.ID, priority types.Priority) error
- func (d *P2PDistributor) GetStats() DistributionStats
- func (d *P2PDistributor) HandleBulkStream(s network.Stream)
- func (d *P2PDistributor) HandleRecordBatchPacket(ctx context.Context, packet *packets.NetworkPacket, sender peer.ID) error
- func (d *P2PDistributor) Start() error
- func (d *P2PDistributor) Stop()
- type RecordBatch
Constants ¶
const (
BulkProtocolSuffix = "/bulk/1.0.0"
)
Protocol IDs for P2P distribution
const (
NodeStateType state.StateType = "node"
)
Define service and component state types.
const P2PDistributorStateType state.StateType = "p2p_distributor"
P2PDistributorStateType is the type identifier for P2PDistributor state
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DbReadHandler ¶
type DbReadHandler struct {
// contains filtered or unexported fields
}
DbReadHandler is a handler backed by the MDBX database passed to NewDbReadHandler.
func NewDbReadHandler ¶
func NewDbReadHandler(db db.Provider, logger logger.Logger, obs *observability.Observability, distributor *P2PDistributor) *DbReadHandler
NewDbReadHandler creates a new DbReadHandler with an MDBX database
func (*DbReadHandler) Handle ¶
func (rh *DbReadHandler) Handle(conn transports.Connection, frame []byte)
Handle processes the incoming message using the DbReadHandler
type DbWriteHandler ¶
type DbWriteHandler struct {
// contains filtered or unexported fields
}
DbWriteHandler is a handler backed by the MDBX database passed to NewDbWriteHandler.
func NewDbWriteHandler ¶
func NewDbWriteHandler(db db.Provider, batchWriter *db.BatchWriter, logger logger.Logger, obs *observability.Observability, distributor *P2PDistributor) *DbWriteHandler
NewDbWriteHandler creates a new DbWriteHandler with an MDBX database
func (*DbWriteHandler) ForceFlush ¶
func (wh *DbWriteHandler) ForceFlush()
ForceFlush forces the BatchWriter to flush any pending writes. This is particularly useful for testing scenarios where immediate persistence is needed.
func (*DbWriteHandler) Handle ¶
func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte)
Handle processes the incoming message using the DbWriteHandler
type DistributionStats ¶
type DistributionStats struct {
RecordsDistributed int64
BytesDistributed int64
BatchesSent int64
TransmissionErrors int64
AverageLatencyMs int64
// contains filtered or unexported fields
}
DistributionStats tracks performance metrics
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node encapsulates all components of a PeerDNS node.
func NewNode ¶
func NewNode( ctx context.Context, config config.Config, rbacMgr *rbac.Manager, logger logger.Logger, store *accounts.Store, obs *observability.Observability, stateMgr *state.StateManager, dbM *db.Manager, batchWriter *db.BatchWriter, ) (*Node, error)
NewNode initializes and returns a new Node.
func (*Node) Discovery ¶
func (n *Node) Discovery() *networking.DiscoveryService
Discovery returns the node's discovery service.
func (*Node) DistributeRecord ¶
func (n *Node) DistributeRecord(key [32]byte, value []byte) error
DistributeRecord adds a record to be distributed across the P2P network
func (*Node) DistributeRecordToPeer ¶
func (n *Node) DistributeRecordToPeer(key [32]byte, value []byte, peerID peer.ID) error
DistributeRecordToPeer sends a record directly to a specific peer
func (*Node) DistributeRecordWithPriority ¶
func (n *Node) DistributeRecordWithPriority(key [32]byte, value []byte, priority types.Priority, target types.Target) error
DistributeRecordWithPriority adds a record with specified priority and target
func (*Node) Distributor ¶
func (n *Node) Distributor() *P2PDistributor
Distributor returns the node's P2P distributor component
func (*Node) Network ¶
func (n *Node) Network() *networking.Network
Network returns the node's network.
func (*Node) Observability ¶
func (n *Node) Observability() *observability.Observability
Observability returns the node's observability.
func (*Node) PerformanceMonitor ¶
func (n *Node) PerformanceMonitor() *metrics.PerformanceMonitor
PerformanceMonitor returns the node's performance monitor.
func (*Node) RbacManager ¶
func (n *Node) RbacManager() *rbac.Manager
RbacManager returns the node's RBAC manager.
func (*Node) StateManager ¶
func (n *Node) StateManager() *state.StateManager
StateManager returns the node's state manager.
type P2PDistributor ¶
type P2PDistributor struct {
// contains filtered or unexported fields
}
P2PDistributor handles efficient record distribution across the network
func NewP2PDistributor ¶
func NewP2PDistributor(node *Node, batchSize int) *P2PDistributor
NewP2PDistributor creates a new P2P distributor
func (*P2PDistributor) DistributeRecord ¶
func (d *P2PDistributor) DistributeRecord(key [32]byte, value []byte, priority types.Priority, target types.Target) error
DistributeRecord adds a record to the appropriate distribution queue
func (*P2PDistributor) DistributeRecordToPeer ¶
func (d *P2PDistributor) DistributeRecordToPeer(key [32]byte, value []byte, peerID peer.ID, priority types.Priority) error
DistributeRecordToPeer sends a record directly to a specific peer
func (*P2PDistributor) GetStats ¶
func (d *P2PDistributor) GetStats() DistributionStats
GetStats returns the current distribution statistics
func (*P2PDistributor) HandleBulkStream ¶
func (d *P2PDistributor) HandleBulkStream(s network.Stream)
HandleBulkStream processes large data streams sent through the bulk protocol. It bypasses signature verification for performance reasons when handling large payloads.
func (*P2PDistributor) HandleRecordBatchPacket ¶
func (d *P2PDistributor) HandleRecordBatchPacket(ctx context.Context, packet *packets.NetworkPacket, sender peer.ID) error
HandleRecordBatchPacket processes incoming record batches from the network
func (*P2PDistributor) Start ¶
func (d *P2PDistributor) Start() error
Start begins processing the distribution queues
func (*P2PDistributor) Stop ¶
func (d *P2PDistributor) Stop()
Stop gracefully halts all distribution processes
type RecordBatch ¶
type RecordBatch struct {
Records []db.WriteRequest
Priority types.Priority
Target types.Target
TargetPeer *peer.ID // Only set if Target is TargetDirectPeer
}
RecordBatch represents a batch of records to be distributed