Documentation
¶
Index ¶
- Constants
- Variables
- func ObservedAddr(from net.Addr) []byte
- type BootstrapSeed
- type Config
- type DebugStats
- type Node
- func (n *Node) AddContact(addr net.Addr, ni protocol.NodeInfo)
- func (n *Node) Address() a2al.Address
- func (n *Node) BindPeerAddr(id a2al.NodeID, addr net.Addr)
- func (n *Node) Bootstrap(ctx context.Context, seeds []BootstrapSeed) error
- func (n *Node) BootstrapAddrs(ctx context.Context, addrs []net.Addr) error
- func (n *Node) BootstrapCandidateAddrs(max int) []net.Addr
- func (n *Node) Close() error
- func (n *Node) DebugHTTPHandler() http.Handler
- func (n *Node) DebugStatsData() DebugStats
- func (n *Node) FindNode(ctx context.Context, peer net.Addr, target a2al.NodeID) ([]protocol.NodeInfo, error)
- func (n *Node) FindValue(ctx context.Context, peer net.Addr, key a2al.NodeID) (*protocol.SignedRecord, error)
- func (n *Node) FindValueWithNodes(ctx context.Context, peer net.Addr, key a2al.NodeID, recType uint8) ([]protocol.SignedRecord, []protocol.NodeInfo, error)
- func (n *Node) LocalAddr() net.Addr
- func (n *Node) NodeID() a2al.NodeID
- func (n *Node) PeerHealthOf(id a2al.NodeID) PeerHealthState
- func (n *Node) Ping(ctx context.Context, peer net.Addr) error
- func (n *Node) PingIdentity(ctx context.Context, peer net.Addr) (*PeerIdentity, error)
- func (n *Node) PublishEndpointRecord(ctx context.Context, rec protocol.SignedRecord) error
- func (n *Node) PublishMailboxRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error
- func (n *Node) PublishTopicRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error
- func (n *Node) RemoveRepSetsForPublisher(publisher a2al.NodeID)
- func (n *Node) SendNATProbeReq(ctx context.Context, probeAddr net.Addr, claimedAddr []byte) (bool, error)
- func (n *Node) SetSelfExtIP(ip net.IP)
- func (n *Node) Start()
- func (n *Node) StartDebugHTTP(addr string) (stop func(), err error)
- func (n *Node) StartWithBootstrap(ctx context.Context, addrs []net.Addr) error
- func (n *Node) StoreAt(ctx context.Context, peer net.Addr, storeKey a2al.NodeID, ...) (bool, error)
- type PeerHealthState
- type PeerIdentity
- type Query
- func (q *Query) AggregateRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)
- func (q *Query) FindNode(ctx context.Context, target a2al.NodeID) ([]protocol.NodeInfo, error)
- func (q *Query) FindRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)
- func (q *Query) Resolve(ctx context.Context, target a2al.NodeID) (*protocol.EndpointRecord, error)
- type RecordAuthFunc
- type Store
- func (s *Store) DebugRecords(now time.Time) []StoreRecordDebug
- func (s *Store) Get(key a2al.NodeID, now time.Time) *protocol.SignedRecord
- func (s *Store) GetAll(key a2al.NodeID, recType uint8, now time.Time) []protocol.SignedRecord
- func (s *Store) Len() int
- func (s *Store) Put(key a2al.NodeID, rec protocol.SignedRecord, now time.Time) error
- type StoreRecordDebug
Constants ¶
const ( DefaultAlpha = 3 // DefaultStagger is the inter-launch delay between successive RPCs within // a single query batch (spec §3.2 "交错发射"). A peer that responds // within this window prevents the next candidate from being contacted, // saving bandwidth and avoiding unnecessary pressure on slow peers. // Phase 1 fixed value; future versions derive from RTT median × 0.5. DefaultStagger = 200 * time.Millisecond )
const DebugHTTPAddr = "127.0.0.1:2634"
DebugHTTPAddr is a suggested address for StartDebugHTTP when embedding the dht package directly (without a2ald). The daemon serves /debug/* on its own API port.
const (
DefaultMaxTotalKeys = 100_000
)
Variables ¶
var ErrNoEndpoint = errors.New("dht: no endpoint record")
ErrNoEndpoint is returned when iterative FIND_VALUE does not yield a valid endpoint record.
var ErrNoMatchingRecords = errors.New("dht: no matching records")
ErrNoMatchingRecords is returned when FindRecords finds nothing for the filter.
var ErrStaleRecord = errors.New("dht: stale record")
ErrStaleRecord means an equal or older record already exists for the same slot.
Functions ¶
func ObservedAddr ¶
ObservedAddr encodes remote IP:port for PONG / FIND_*_RESP (spec §7.6).
Types ¶
type BootstrapSeed ¶
BootstrapSeed is a known dial address plus wire NodeInfo (legacy; prefer BootstrapAddrs).
type Config ¶
type Config struct {
Transport transport.Transport
Keystore crypto.KeyStore
// OnObservedAddr is called whenever a DHT response carries an observed_addr
// (PONG, FIND_NODE_RESP, FIND_VALUE_RESP). reporter is the responding node's
// NodeID; wire is the raw observed_addr bytes (6 or 18 bytes).
// May be nil.
OnObservedAddr func(reporter a2al.NodeID, wire []byte)
// RecordAuth is an optional authority policy called by Store.Put after
// signature/expiry verification passes (Phase 4: includes DHT key).
// If nil, no authority check is performed (useful in tests).
RecordAuth RecordAuthFunc
// MaxStoreKeys limits the number of distinct DHT keys in the local store.
// 0 uses DefaultMaxTotalKeys. Configurable per-node soft limit.
MaxStoreKeys int
// Logger is used for DHT-level diagnostics (send failures, RPC retries).
// If nil, slog.Default() is used.
Logger *slog.Logger
// SeenPeersPath is the file path for persisting the seenPeers sliding-window
// table across restarts (spec §7.3). Empty disables persistence (default in
// tests). The file is written with mode 0600.
SeenPeersPath string
}
Config holds runtime dependencies for a DHT node (spec Step 7).
type DebugStats ¶ added in v0.1.4
type DebugStats struct {
RxPackets uint64 `json:"rx_packets_verified"`
TxPackets uint64 `json:"tx_packets"`
RPCOK uint64 `json:"rpc_completed"`
TotalPeers int `json:"total_peers"`
Reach1h int `json:"reach_1h"`
Reach24h int `json:"reach_24h"`
Reach7d int `json:"reach_7d"`
EstimatedNetworkSize int `json:"estimated_network_size"`
UniqueNodesSinceStart uint64 `json:"unique_nodes_since_start"`
}
DebugStats is the DHT portion of GET /debug/stats (spec §3.6, §7).
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node is a single DHT participant (routing + local store + wire handler).
func (*Node) AddContact ¶
AddContact pins a peer's dial address and seeds the routing table.
func (*Node) BindPeerAddr ¶
BindPeerAddr registers the transport dial address for a remote NodeID (e.g. MemTransport name lookup in tests).
func (*Node) Bootstrap ¶
func (n *Node) Bootstrap(ctx context.Context, seeds []BootstrapSeed) error
Bootstrap registers seeds with pre-known identity and runs FIND_NODE(self). For seeds where only ip:port is known, use BootstrapAddrs instead.
func (*Node) BootstrapAddrs ¶
BootstrapAddrs connects to seed nodes by raw network addresses (ip:port only). For each address it sends PING, extracts the peer's identity from the PONG, registers the peer, then runs FIND_NODE(self) to widen the routing table. This is the recommended bootstrap entry point — callers do not need to know the seed's Address or NodeID in advance.
func (*Node) BootstrapCandidateAddrs ¶
BootstrapCandidateAddrs returns up to max UDP addresses for cold-start bootstrap (routing table + remembered peer addrs). Best-effort for persisting peers.cache.
func (*Node) Close ¶
Close stops the node, closes the transport, and waits for the receive loop to exit.
func (*Node) DebugHTTPHandler ¶
DebugHTTPHandler returns read-only /debug/* handlers for mounting on an existing server (spec §3.6).
func (*Node) DebugStatsData ¶ added in v0.1.4
func (n *Node) DebugStatsData() DebugStats
DebugStatsData returns a snapshot for embedding in host-level /debug/stats.
func (*Node) FindNode ¶
func (n *Node) FindNode(ctx context.Context, peer net.Addr, target a2al.NodeID) ([]protocol.NodeInfo, error)
FindNode asks peer for closest nodes to target NodeID.
func (*Node) FindValue ¶
func (n *Node) FindValue(ctx context.Context, peer net.Addr, key a2al.NodeID) (*protocol.SignedRecord, error)
FindValue queries peer for the best endpoint record at key NodeID (legacy helper).
func (*Node) FindValueWithNodes ¶
func (n *Node) FindValueWithNodes(ctx context.Context, peer net.Addr, key a2al.NodeID, recType uint8) ([]protocol.SignedRecord, []protocol.NodeInfo, error)
FindValueWithNodes queries peer. recType 0 requests all record types in the response.
func (*Node) PeerHealthOf ¶ added in v0.1.3
func (n *Node) PeerHealthOf(id a2al.NodeID) PeerHealthState
PeerHealthOf returns the observed health state for the given peer.
func (*Node) PingIdentity ¶
PingIdentity sends PING, waits for PONG, and returns the remote peer's identity extracted from the response. The peer is automatically registered into the routing table and dial address map.
func (*Node) PublishEndpointRecord ¶
PublishEndpointRecord stores the record locally and immediately returns (过程一). Replication to remote peers is handled asynchronously by scheduleReplicate → renewBackground (FindNode + staggered StoreAt).
func (*Node) PublishMailboxRecord ¶
func (n *Node) PublishMailboxRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error
PublishMailboxRecord stores the mailbox record at storeKey (recipient NodeID) locally and replicates to k-closest reachable peers asynchronously.
func (*Node) PublishTopicRecord ¶
func (n *Node) PublishTopicRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error
PublishTopicRecord stores the topic record at storeKey (SHA-256("topic:"+topic)) locally and replicates asynchronously.
func (*Node) RemoveRepSetsForPublisher ¶ added in v0.1.3
RemoveRepSetsForPublisher removes all repSets whose publisher matches the given NodeID. Call when an agent is deleted so background probes and refill tasks cease.
func (*Node) SendNATProbeReq ¶ added in v0.1.4
func (n *Node) SendNATProbeReq(ctx context.Context, probeAddr net.Addr, claimedAddr []byte) (bool, error)
SendNATProbeReq asks probeAddr to send a NATProbeEcho to claimedAddr. claimedAddr is the wire-encoded public UDP address (6 or 18 bytes). Returns true if an echo arrived within the context deadline, nil error on timeout.
func (*Node) SetSelfExtIP ¶
SetSelfExtIP records our own public IP (from STUN/HTTP probe). Used to detect NAT hairpin peers: nodes behind the same NAT share the same public IP and typically cannot reach each other via that IP (router hairpinning not supported).
func (*Node) Start ¶
func (n *Node) Start()
Start begins the inbound packet loop and background maintenance workers.
func (*Node) StartDebugHTTP ¶
StartDebugHTTP listens on addr and serves read-only /debug/* JSON (spec §3.6). When using the a2ald daemon the /debug/* routes are served on the API port (default 127.0.0.1:2121) and this method is not needed. Use it only when embedding the dht package directly without the daemon. stop shuts the server down (idempotent).
func (*Node) StartWithBootstrap ¶
StartWithBootstrap starts the receive loop then bootstraps with raw addresses.
type PeerHealthState ¶ added in v0.1.3
type PeerHealthState uint8
PeerHealthState classifies a peer's reachability based on observed RPC outcomes.
const ( PeerHealthUnknown PeerHealthState = iota // no RPC history yet PeerHealthGood // last RPC succeeded, failCount == 0 PeerHealthBad // consecutive failures >= badHealthThreshold )
type PeerIdentity ¶
type PeerIdentity struct {
Address a2al.Address
NodeID a2al.NodeID
ObservedWire []byte // BodyPong.observed_addr (how reporter sees us); may be nil
}
PeerIdentity holds the identity extracted from a PONG response.
type Query ¶
Query runs iterative FIND_NODE / FIND_VALUE (spec Step 8).
func (*Query) AggregateRecords ¶
func (q *Query) AggregateRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)
AggregateRecords queries until the k-closest set is exhausted, merges and deduplicates (Phase 4 Topic/Mailbox). If the local store already holds valid records the network query is skipped — this fast-path is correct for the common case where the querying node is itself a publisher or a replication target (same a2ald, small network). Bad peers are skipped (with fallback) and a per-batch timeout prevents slow stragglers from stalling the whole traversal.
func (*Query) FindNode ¶
FindNode runs iterative FIND_NODE until k-closest known nodes are exhausted or all have been queried. Candidates are seeded from tabNearestHealthy (Good peers first). Within each batch, peers known to be Bad are skipped (tried last as fallback). The collection loop returns as soon as the first peer responds, allowing the next round to start without waiting for slow/unreachable peers.
type RecordAuthFunc ¶
RecordAuthFunc decides whether a record may be stored at the given DHT key after signature and expiry checks (Phase 4: includes key binding for sovereign).
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is an in-memory record map keyed by DHT NodeID (Phase 4 multi-category).
func NewStore ¶
func NewStore(auth RecordAuthFunc, maxKeys int) *Store
NewStore creates an empty Store. auth is optional (see Config.RecordAuth). maxKeys limits distinct key count; 0 uses DefaultMaxTotalKeys.
func (*Store) DebugRecords ¶
func (s *Store) DebugRecords(now time.Time) []StoreRecordDebug
DebugRecords lists non-expired verified records (spec §3.6).
func (*Store) GetAll ¶
GetAll returns verified non-expired records at key, optionally filtered by RecType (0 = all).
type StoreRecordDebug ¶
type StoreRecordDebug struct {
KeyNodeIDHex string `json:"key_node_id_hex"`
AddressHex string `json:"address_hex"`
RecType uint8 `json:"rec_type"`
Seq uint64 `json:"seq"`
Timestamp uint64 `json:"timestamp"`
TTL uint32 `json:"ttl_seconds"`
PayloadLen int `json:"payload_cbor_len"`
}
StoreRecordDebug is a JSON-friendly store row (spec §3.6).