Documentation
¶
Index ¶
- Constants
- Variables
- func ObservedAddr(from net.Addr) []byte
- type BootstrapSeed
- type Config
- type DebugStats
- type Node
- func (n *Node) AddContact(addr net.Addr, ni protocol.NodeInfo)
- func (n *Node) Address() a2al.Address
- func (n *Node) BindPeerAddr(id a2al.NodeID, addr net.Addr)
- func (n *Node) Bootstrap(ctx context.Context, seeds []BootstrapSeed) error
- func (n *Node) BootstrapAddrs(ctx context.Context, addrs []net.Addr) error
- func (n *Node) BootstrapCandidateAddrs(max int) []net.Addr
- func (n *Node) Close() error
- func (n *Node) DebugHTTPHandler() http.Handler
- func (n *Node) DebugStatsData() DebugStats
- func (n *Node) FindNode(ctx context.Context, peer net.Addr, target a2al.NodeID) ([]protocol.NodeInfo, error)
- func (n *Node) FindValue(ctx context.Context, peer net.Addr, key a2al.NodeID) (*protocol.SignedRecord, error)
- func (n *Node) FindValueWithNodes(ctx context.Context, peer net.Addr, key a2al.NodeID, recType uint8) ([]protocol.SignedRecord, []protocol.NodeInfo, error)
- func (n *Node) LocalAddr() net.Addr
- func (n *Node) LocalStoreGet(key a2al.NodeID, recType uint8) []protocol.SignedRecord
- func (n *Node) LocalStoreGetByAddress(addr a2al.Address, recType uint8) []protocol.SignedRecord
- func (n *Node) LocalStoreInvalidate(key a2al.NodeID, recType uint8)
- func (n *Node) LocalStorePut(storeKey a2al.NodeID, rec protocol.SignedRecord) error
- func (n *Node) NodeID() a2al.NodeID
- func (n *Node) PeerAllowContact(id a2al.NodeID) bool
- func (n *Node) PeerHealthOf(id a2al.NodeID) PeerHealthState
- func (n *Node) Ping(ctx context.Context, peer net.Addr) error
- func (n *Node) PingIdentity(ctx context.Context, peer net.Addr) (*PeerIdentity, error)
- func (n *Node) PublishEndpointRecord(ctx context.Context, rec protocol.SignedRecord) error
- func (n *Node) PublishMailboxRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error
- func (n *Node) PublishTopicRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error
- func (n *Node) RemoveRepSetsForPublisher(publisher a2al.NodeID)
- func (n *Node) SendNATProbeReq(ctx context.Context, probeAddr net.Addr, claimedAddr []byte) (bool, error)
- func (n *Node) SetSelfExtIP(ip net.IP)
- func (n *Node) Start()
- func (n *Node) StartDebugHTTP(addr string) (stop func(), err error)
- func (n *Node) StartWithBootstrap(ctx context.Context, addrs []net.Addr) error
- func (n *Node) StoreAt(ctx context.Context, peer net.Addr, storeKey a2al.NodeID, ...) (bool, error)
- type PeerHealthState
- type PeerIdentity
- type Query
- func (q *Query) AggregateRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)
- func (q *Query) FindNode(ctx context.Context, target a2al.NodeID) ([]protocol.NodeInfo, error)
- func (q *Query) FindRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)
- func (q *Query) Resolve(ctx context.Context, target a2al.NodeID) (*protocol.EndpointRecord, error)
- type RecordAuthFunc
- type Store
- func (s *Store) DebugRecords(now time.Time) []StoreRecordDebug
- func (s *Store) Get(key a2al.NodeID, now time.Time) *protocol.SignedRecord
- func (s *Store) GetAll(key a2al.NodeID, recType uint8, now time.Time) []protocol.SignedRecord
- func (s *Store) GetAllByAddress(addr a2al.Address, recType uint8, now time.Time) []protocol.SignedRecord
- func (s *Store) Invalidate(key a2al.NodeID, recType uint8)
- func (s *Store) Len() int
- func (s *Store) Put(key a2al.NodeID, rec protocol.SignedRecord, now time.Time) error
- type StoreRecordDebug
Constants ¶
const (
// DefaultAlpha is exported for callers that inspect it; the slot engine uses
// queryAlpha internally.
DefaultAlpha = 5
// DefaultStagger is retained for API compatibility but is not used by the
// slot-based engine.
DefaultStagger = 200 * time.Millisecond
)
const DebugHTTPAddr = "127.0.0.1:2634"
DebugHTTPAddr is a suggested address for StartDebugHTTP when embedding the dht package directly (without a2ald). The daemon serves /debug/* on its own API port.
const (
DefaultMaxTotalKeys = 100_000
)
Variables ¶
var ErrNoEndpoint = errors.New("dht: no endpoint record")
ErrNoEndpoint is returned when iterative FIND_VALUE does not yield a valid endpoint record.
var ErrNoMatchingRecords = errors.New("dht: no matching records")
ErrNoMatchingRecords is returned when FindRecords / AggregateRecords find nothing for the filter.
var ErrStaleRecord = errors.New("dht: stale record")
ErrStaleRecord means an equal or older record already exists for the same slot.
Functions ¶
func ObservedAddr ¶
ObservedAddr encodes remote IP:port for PONG / FIND_*_RESP (spec §7.6).
Types ¶
type BootstrapSeed ¶
BootstrapSeed is a known dial address plus wire NodeInfo (legacy; prefer BootstrapAddrs).
type Config ¶
type Config struct {
Transport transport.Transport
Keystore crypto.KeyStore
// OnObservedAddr is called whenever a DHT response carries an observed_addr
// (PONG, FIND_NODE_RESP, FIND_VALUE_RESP). reporter is the responding node's
// NodeID; wire is the raw observed_addr bytes (6 or 18 bytes).
// May be nil.
OnObservedAddr func(reporter a2al.NodeID, wire []byte)
// RecordAuth is an optional authority policy called by Store.Put after
// signature/expiry verification passes (Phase 4: includes DHT key).
// If nil, no authority check is performed (useful in tests).
RecordAuth RecordAuthFunc
// MaxStoreKeys limits the number of distinct DHT keys in the local store.
// 0 uses DefaultMaxTotalKeys. Configurable per-node soft limit.
MaxStoreKeys int
// Logger is used for DHT-level diagnostics (send failures, RPC retries).
// If nil, slog.Default() is used.
Logger *slog.Logger
// SeenPeersPath is the file path for persisting the seenPeers sliding-window
// table across restarts (spec §7.3). Empty disables persistence (default in
// tests). The file is written with mode 0600.
SeenPeersPath string
}
Config holds runtime dependencies for a DHT node (spec Step 7).
type DebugStats ¶ added in v0.1.4
type DebugStats struct {
RxPackets uint64 `json:"rx_packets_verified"`
TxPackets uint64 `json:"tx_packets"`
RPCOK uint64 `json:"rpc_completed"`
TotalPeers int `json:"total_peers"`
Reach1h int `json:"reach_1h"`
Reach24h int `json:"reach_24h"`
Reach7d int `json:"reach_7d"`
EstimatedNetworkSize int `json:"estimated_network_size"`
UniqueNodesSinceStart uint64 `json:"unique_nodes_since_start"`
}
DebugStats is the DHT portion of GET /debug/stats (spec §3.6, §7).
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node is a single DHT participant (routing + local store + wire handler).
func (*Node) AddContact ¶
AddContact pins a peer's dial address and seeds the routing table.
func (*Node) BindPeerAddr ¶
BindPeerAddr registers the transport dial address for a remote NodeID (e.g. MemTransport name lookup in tests).
func (*Node) Bootstrap ¶
func (n *Node) Bootstrap(ctx context.Context, seeds []BootstrapSeed) error
Bootstrap registers seeds with pre-known identity and runs FIND_NODE(self). For seeds where only ip:port is known, use BootstrapAddrs instead.
func (*Node) BootstrapAddrs ¶
BootstrapAddrs connects to seed nodes by raw network addresses (ip:port only). For each address it sends PING, extracts the peer's identity from the PONG, registers the peer, then runs FIND_NODE(self) to widen the routing table. This is the recommended bootstrap entry point — callers do not need to know the seed's Address or NodeID in advance.
func (*Node) BootstrapCandidateAddrs ¶
BootstrapCandidateAddrs returns up to max UDP addresses for cold-start bootstrap (routing table + remembered peer addrs). Best-effort for persisting peers.cache.
Candidates are sorted by observed health: Good → Unknown → Bad. The max cap therefore naturally favours peers we have successfully communicated with before, so that the next cold-start spends its bootstrap window on the most promising contacts rather than on known-dead nodes.
func (*Node) Close ¶
Close stops the node, closes the transport, and waits for the receive loop to exit.
func (*Node) DebugHTTPHandler ¶
DebugHTTPHandler returns read-only /debug/* handlers for mounting on an existing server (spec §3.6).
func (*Node) DebugStatsData ¶ added in v0.1.4
func (n *Node) DebugStatsData() DebugStats
DebugStatsData returns a snapshot for embedding in host-level /debug/stats.
func (*Node) FindNode ¶
func (n *Node) FindNode(ctx context.Context, peer net.Addr, target a2al.NodeID) ([]protocol.NodeInfo, error)
FindNode asks peer for closest nodes to target NodeID.
func (*Node) FindValue ¶
func (n *Node) FindValue(ctx context.Context, peer net.Addr, key a2al.NodeID) (*protocol.SignedRecord, error)
FindValue queries peer for the best endpoint record at key NodeID (legacy helper).
func (*Node) FindValueWithNodes ¶
func (n *Node) FindValueWithNodes(ctx context.Context, peer net.Addr, key a2al.NodeID, recType uint8) ([]protocol.SignedRecord, []protocol.NodeInfo, error)
FindValueWithNodes queries peer. recType 0 requests all record types in the response.
func (*Node) LocalStoreGet ¶ added in v0.1.6
LocalStoreGet returns verified non-expired records at the given DHT key, with optional RecType filter (0 = all types).
func (*Node) LocalStoreGetByAddress ¶ added in v0.1.6
LocalStoreGetByAddress returns verified non-expired records where sr.Address matches addr, with optional RecType filter (0 = all types). Scans all store buckets; intended for low-frequency paths such as the QUIC control exchange.
func (*Node) LocalStoreInvalidate ¶ added in v0.1.6
LocalStoreInvalidate removes locally-cached records for key and recType (0 = all types). Used internally by the host layer to clear stale endpoint records when a connection attempt fails, so the next Resolve fetches fresh data from the network.
func (*Node) LocalStorePut ¶ added in v0.1.6
LocalStorePut writes rec into the local store without triggering replication. Use this to seed records received via an out-of-band channel (e.g. QUIC control plane AgentInfo push) so that subsequent AggregateRecords queries return the fresh data immediately.
func (*Node) PeerAllowContact ¶ added in v0.1.6
PeerAllowContact returns true if the global back-off for this peer has expired (or was never set). Callers decide whether their own tolerance permits contacting a peer whose back-off is still active; this gate reflects the shared signal across all communication channels.
func (*Node) PeerHealthOf ¶ added in v0.1.3
func (n *Node) PeerHealthOf(id a2al.NodeID) PeerHealthState
PeerHealthOf returns the observed health state for the given peer.
func (*Node) PingIdentity ¶
PingIdentity sends PING, waits for PONG, and returns the remote peer's identity extracted from the response. The peer is automatically registered into the routing table and dial address map.
func (*Node) PublishEndpointRecord ¶
PublishEndpointRecord stores the record locally and returns immediately (process one). Replication to remote peers is handled asynchronously by scheduleReplicate → renewBackground (FindNode + staggered StoreAt).
func (*Node) PublishMailboxRecord ¶
func (n *Node) PublishMailboxRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error
PublishMailboxRecord stores the mailbox record at storeKey (recipient NodeID) locally and replicates to k-closest reachable peers asynchronously.
func (*Node) PublishTopicRecord ¶
func (n *Node) PublishTopicRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error
PublishTopicRecord stores the topic record at storeKey (SHA-256("topic:"+topic)) locally and replicates asynchronously.
func (*Node) RemoveRepSetsForPublisher ¶ added in v0.1.3
RemoveRepSetsForPublisher removes all repSets whose publisher matches the given NodeID. Call when an agent is deleted so background probes and refill tasks cease.
func (*Node) SendNATProbeReq ¶ added in v0.1.4
func (n *Node) SendNATProbeReq(ctx context.Context, probeAddr net.Addr, claimedAddr []byte) (bool, error)
SendNATProbeReq asks probeAddr to send a NATProbeEcho to claimedAddr. claimedAddr is the wire-encoded public UDP address (6 or 18 bytes). Returns true if an echo arrived within the context deadline; on timeout it returns false with a nil error.
func (*Node) SetSelfExtIP ¶
SetSelfExtIP records our own public IP (from STUN/HTTP probe). Used to detect NAT hairpin peers: nodes behind the same NAT share the same public IP and typically cannot reach each other via that IP (router hairpinning not supported).
func (*Node) Start ¶
func (n *Node) Start()
Start begins the inbound packet loop and background maintenance workers.
func (*Node) StartDebugHTTP ¶
StartDebugHTTP listens on addr and serves read-only /debug/* JSON (spec §3.6). When using the a2ald daemon the /debug/* routes are served on the API port (default 127.0.0.1:2121) and this method is not needed. Use it only when embedding the dht package directly without the daemon. stop shuts the server down (idempotent).
func (*Node) StartWithBootstrap ¶
StartWithBootstrap starts the receive loop then bootstraps with raw addresses.
type PeerHealthState ¶ added in v0.1.3
type PeerHealthState uint8
PeerHealthState classifies a peer's reachability based on observed RPC outcomes.
const (
PeerHealthUnknown PeerHealthState = iota // no RPC history yet
PeerHealthGood // last RPC succeeded, failCount == 0
PeerHealthBad // consecutive failures >= badHealthThreshold
)
type PeerIdentity ¶
type PeerIdentity struct {
Address a2al.Address
NodeID a2al.NodeID
ObservedWire []byte // BodyPong.observed_addr (how reporter sees us); may be nil
}
PeerIdentity holds the identity extracted from a PONG response.
type Query ¶
type Query struct {
Alpha int // retained for API compatibility; slot engine uses queryAlpha
Stagger time.Duration // retained for API compatibility; slot engine uses querySlotStagger
// contains filtered or unexported fields
}
Query runs iterative FIND_NODE / FIND_VALUE (spec Step 8).
func (*Query) AggregateRecords ¶
func (q *Query) AggregateRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)
AggregateRecords queries the network until the good+unknown candidate pools are exhausted, then merges and deduplicates all discovered records (Phase 4 Topic/Mailbox). Unlike FindRecords, there is no local-cache fast path: the network query always runs so that newly-joined publishers are discovered. Locally-cached records are seeded into the result set and merged with network results.
func (*Query) FindNode ¶
FindNode runs iterative FIND_NODE until the good+unknown candidate pools are exhausted. Returns the K XOR-closest nodes discovered.
func (*Query) FindRecords ¶
func (q *Query) FindRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)
FindRecords runs iterative FIND_VALUE (recType 0 = all). Returns on the first valid record found (optimistic strategy). Local store is checked first; if a valid cached record exists the network is not queried. The cache is transparently invalidated by the host layer when a subsequent connection attempt using the cached data fails.
type RecordAuthFunc ¶
RecordAuthFunc decides whether a record may be stored at the given DHT key after signature and expiry checks (Phase 4: includes key binding for sovereign).
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is an in-memory record map keyed by DHT NodeID (Phase 4 multi-category).
func NewStore ¶
func NewStore(auth RecordAuthFunc, maxKeys int) *Store
NewStore creates an empty Store. auth is optional (see Config.RecordAuth). maxKeys limits distinct key count; 0 uses DefaultMaxTotalKeys.
func (*Store) DebugRecords ¶
func (s *Store) DebugRecords(now time.Time) []StoreRecordDebug
DebugRecords lists non-expired verified records (spec §3.6).
func (*Store) GetAll ¶
GetAll returns verified non-expired records at key, optionally filtered by RecType (0 = all).
func (*Store) GetAllByAddress ¶ added in v0.1.6
func (s *Store) GetAllByAddress(addr a2al.Address, recType uint8, now time.Time) []protocol.SignedRecord
GetAllByAddress returns verified non-expired records where sr.Address == addr, with optional RecType filter (0 = all types). Scans all key buckets; intended for low-frequency use (e.g. QUIC control plane exchange on connection setup).
func (*Store) Invalidate ¶ added in v0.1.6
Invalidate removes locally-cached records for key, optionally filtered by recType (0 = all types). Called internally when a connection attempt to the peer fails, ensuring the next Resolve goes to the network for fresh data.
type StoreRecordDebug ¶
type StoreRecordDebug struct {
KeyNodeIDHex string `json:"key_node_id_hex"`
AddressHex string `json:"address_hex"`
RecType uint8 `json:"rec_type"`
Seq uint64 `json:"seq"`
Timestamp uint64 `json:"timestamp"`
TTL uint32 `json:"ttl_seconds"`
PayloadLen int `json:"payload_cbor_len"`
}
StoreRecordDebug is a JSON-friendly store row (spec §3.6).