dht

package
v0.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: MPL-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultAlpha is exported for callers that inspect it; the slot engine uses
	// queryAlpha internally.
	DefaultAlpha = 5

	// DefaultStagger is retained for API compatibility but is not used by the
	// slot-based engine.
	DefaultStagger = 200 * time.Millisecond
)
View Source
const DebugHTTPAddr = "127.0.0.1:2634"

DebugHTTPAddr is a suggested address for StartDebugHTTP when embedding the dht package directly (without a2ald). The daemon serves /debug/* on its own API port.

View Source
const (
	DefaultMaxTotalKeys = 100_000
)

Variables

View Source
var ErrNoEndpoint = errors.New("dht: no endpoint record")

ErrNoEndpoint is returned when iterative FIND_VALUE does not yield a valid endpoint record.

View Source
var ErrNoMatchingRecords = errors.New("dht: no matching records")

ErrNoMatchingRecords is returned when FindRecords / AggregateRecords find nothing for the filter.

View Source
var ErrStaleRecord = errors.New("dht: stale record")

ErrStaleRecord means an equal or older record already exists for the same slot.

Functions

func ObservedAddr

func ObservedAddr(from net.Addr) []byte

ObservedAddr encodes remote IP:port for PONG / FIND_*_RESP (spec §7.6).

Types

type BootstrapSeed

type BootstrapSeed struct {
	Addr net.Addr
	Info protocol.NodeInfo
}

BootstrapSeed is a known dial address plus wire NodeInfo (legacy; prefer BootstrapAddrs).

type Config

type Config struct {
	Transport transport.Transport
	Keystore  crypto.KeyStore
	// OnObservedAddr is called whenever a DHT response carries an observed_addr
	// (PONG, FIND_NODE_RESP, FIND_VALUE_RESP). reporter is the responding node's
	// NodeID; wire is the raw observed_addr bytes (6 or 18 bytes).
	// May be nil.
	OnObservedAddr func(reporter a2al.NodeID, wire []byte)
	// RecordAuth is an optional authority policy called by Store.Put after
	// signature/expiry verification passes (Phase 4: includes DHT key).
	// If nil, no authority check is performed (useful in tests).
	RecordAuth RecordAuthFunc
	// MaxStoreKeys limits the number of distinct DHT keys in the local store.
	// 0 uses DefaultMaxTotalKeys. Configurable per-node soft limit.
	MaxStoreKeys int
	// Logger is used for DHT-level diagnostics (send failures, RPC retries).
	// If nil, slog.Default() is used.
	Logger *slog.Logger
	// SeenPeersPath is the file path for persisting the seenPeers sliding-window
	// table across restarts (spec §7.3). Empty disables persistence (default in
	// tests). The file is written with mode 0600.
	SeenPeersPath string
}

Config holds runtime dependencies for a DHT node (spec Step 7).

type DebugStats added in v0.1.4

type DebugStats struct {
	RxPackets             uint64  `json:"rx_packets_verified"`
	TxPackets             uint64  `json:"tx_packets"`
	RPCOK                 uint64  `json:"rpc_completed"`
	TotalPeers            int     `json:"total_peers"`
	Reach1h               int     `json:"reach_1h"`
	Reach24h              int     `json:"reach_24h"`
	Reach7d               int     `json:"reach_7d"`
	EstimatedNetworkSize  int     `json:"estimated_network_size"`
	EstimateConfidence    float64 `json:"estimate_confidence,omitempty"`
	VerifiedPeers1h       int     `json:"verified_peers_1h,omitempty"`
	UniqueNodesSinceStart uint64  `json:"unique_nodes_since_start"`
}

DebugStats is the DHT portion of GET /debug/stats (spec §3.6, §7).

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is a single DHT participant (routing + local store + wire handler).

func NewNode

func NewNode(cfg Config) (*Node, error)

NewNode builds a node; keystore must list exactly one address (Phase 1).

func (*Node) AddContact

func (n *Node) AddContact(addr net.Addr, ni protocol.NodeInfo)

AddContact pins a peer's dial address and seeds the routing table. Treated as a trusted (user-configured) contact: VerifiedAt is set to now.

func (*Node) Address

func (n *Node) Address() a2al.Address

Address returns the agent address.

func (*Node) BindPeerAddr

func (n *Node) BindPeerAddr(id a2al.NodeID, addr net.Addr)

BindPeerAddr registers the transport dial address for a remote NodeID (e.g. MemTransport name lookup in tests).

func (*Node) Bootstrap

func (n *Node) Bootstrap(ctx context.Context, seeds []BootstrapSeed) error

Bootstrap registers seeds with pre-known identity and runs FIND_NODE(self). For seeds where only ip:port is known, use BootstrapAddrs instead.

func (*Node) BootstrapAddrs

func (n *Node) BootstrapAddrs(ctx context.Context, addrs []net.Addr) error

BootstrapAddrs connects to seed nodes by raw network addresses (ip:port only). For each address it sends PING, extracts the peer's identity from the PONG, registers the peer, then runs FIND_NODE(self) to widen the routing table. This is the recommended bootstrap entry point — callers do not need to know the seed's Address or NodeID in advance.

func (*Node) BootstrapCandidateAddrs

func (n *Node) BootstrapCandidateAddrs(max int) []net.Addr

BootstrapCandidateAddrs returns up to max UDP addresses for cold-start bootstrap (routing table + remembered peer addrs). Best-effort for persisting peers.cache.

Candidates are sorted by observed health: Good → Unknown → Bad. The max cap therefore naturally favours peers we have successfully communicated with before, so that the next cold-start spends its bootstrap window on the most promising contacts rather than on known-dead nodes.

func (*Node) Close

func (n *Node) Close() error

Close stops the node, closes the transport, and waits for the receive loop to exit.

func (*Node) DebugHTTPHandler

func (n *Node) DebugHTTPHandler() http.Handler

DebugHTTPHandler returns read-only /debug/* handlers for mounting on an existing server (spec §3.6).

func (*Node) DebugStatsData added in v0.1.4

func (n *Node) DebugStatsData() DebugStats

DebugStatsData returns a snapshot for embedding in host-level /debug/stats.

func (*Node) EstimatedNetworkSize added in v0.1.7

func (n *Node) EstimatedNetworkSize() int

EstimatedNetworkSize returns the bucket-density estimate of the current number of active nodes in the DHT (includes all nodes; for freshness-filtered estimate use EstimatedNetworkSizeFiltered).

func (*Node) EstimatedNetworkSizeFiltered added in v0.1.7

func (n *Node) EstimatedNetworkSizeFiltered(cutoff time.Time) (int, float64)

EstimatedNetworkSizeFiltered returns the network size estimate restricted to nodes verified within the past 30 minutes, along with a confidence score in [0, 1]. Higher confidence means more sample buckets contributed to the median.

func (*Node) FindNode

func (n *Node) FindNode(ctx context.Context, peer net.Addr, target a2al.NodeID) ([]protocol.NodeInfo, error)

FindNode asks peer for closest nodes to target NodeID.

func (*Node) FindValue

func (n *Node) FindValue(ctx context.Context, peer net.Addr, key a2al.NodeID) (*protocol.SignedRecord, error)

FindValue queries peer for the best endpoint record at key NodeID (legacy helper).

func (*Node) FindValueWithNodes

func (n *Node) FindValueWithNodes(ctx context.Context, peer net.Addr, key a2al.NodeID, recType uint8) ([]protocol.SignedRecord, []protocol.NodeInfo, error)

FindValueWithNodes queries peer. recType 0 requests all record types in the response.

func (*Node) LocalAddr

func (n *Node) LocalAddr() net.Addr

LocalAddr returns the underlying transport address.

func (*Node) LocalStoreGet added in v0.1.6

func (n *Node) LocalStoreGet(key a2al.NodeID, recType uint8) []protocol.SignedRecord

LocalStoreGet returns verified non-expired records at the given DHT key, with optional RecType filter (0 = all types).

func (*Node) LocalStoreGetByAddress added in v0.1.6

func (n *Node) LocalStoreGetByAddress(addr a2al.Address, recType uint8) []protocol.SignedRecord

LocalStoreGetByAddress returns verified non-expired records where sr.Address matches addr, with optional RecType filter (0 = all types). Scans all store buckets; intended for low-frequency paths such as the QUIC control exchange.

func (*Node) LocalStoreInvalidate added in v0.1.6

func (n *Node) LocalStoreInvalidate(key a2al.NodeID, recType uint8)

LocalStoreInvalidate removes locally-cached records for key and recType (0 = all types). Used internally by the host layer to clear stale endpoint records when a connection attempt fails, so the next Resolve fetches fresh data from the network.

func (*Node) LocalStorePut added in v0.1.6

func (n *Node) LocalStorePut(storeKey a2al.NodeID, rec protocol.SignedRecord) error

LocalStorePut writes rec into the local store without triggering replication. Use this to seed records received via an out-of-band channel (e.g. QUIC control plane AgentInfo push) so that subsequent AggregateRecords queries return the fresh data immediately.

func (*Node) NodeID

func (n *Node) NodeID() a2al.NodeID

NodeID returns the DHT key for this node.

func (*Node) PeerAllowContact added in v0.1.6

func (n *Node) PeerAllowContact(id a2al.NodeID) bool

PeerAllowContact returns true if the global back-off for this peer has expired (or was never set). Callers decide whether their own tolerance permits contacting a peer whose back-off is still active; this gate reflects the shared signal across all communication channels.

func (*Node) PeerHealthOf added in v0.1.3

func (n *Node) PeerHealthOf(id a2al.NodeID) PeerHealthState

PeerHealthOf returns the observed health state for the given peer.

func (*Node) PeerRTT added in v0.1.7

func (n *Node) PeerRTT(addr net.Addr) time.Duration

PeerRTT returns the last measured round-trip time for addr, or 0 if the address is not yet known or has never completed a successful exchange.

func (*Node) Ping

func (n *Node) Ping(ctx context.Context, peer net.Addr) error

Ping sends PING and waits for PONG. Start() must be running.

func (*Node) PingIdentity

func (n *Node) PingIdentity(ctx context.Context, peer net.Addr) (*PeerIdentity, error)

PingIdentity sends PING, waits for PONG, and returns the remote peer's identity extracted from the response. The peer is automatically registered into the routing table and dial address map.

func (*Node) PublishEndpointRecord

func (n *Node) PublishEndpointRecord(ctx context.Context, rec protocol.SignedRecord) error

PublishEndpointRecord stores the record locally and immediately returns (过程一). Replication to remote peers is handled asynchronously by scheduleReplicate → renewBackground (FindNode + staggered StoreAt).

func (*Node) PublishMailboxRecord

func (n *Node) PublishMailboxRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error

PublishMailboxRecord stores the mailbox record at storeKey (recipient NodeID) locally and replicates to k-closest reachable peers asynchronously.

func (*Node) PublishTopicRecord

func (n *Node) PublishTopicRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error

PublishTopicRecord stores the topic record at storeKey (SHA-256("topic:"+topic)) locally and replicates asynchronously.

func (*Node) RemoveRepSetsForPublisher added in v0.1.3

func (n *Node) RemoveRepSetsForPublisher(publisher a2al.NodeID)

RemoveRepSetsForPublisher removes all repSets whose publisher matches the given NodeID. Call when an agent is deleted so background probes and refill tasks cease.

func (*Node) SelfExtIP added in v0.1.7

func (n *Node) SelfExtIP() net.IP

SelfExtIP returns the node's current public IP as seen by STUN/HTTP probe, or nil if not yet known.

func (*Node) SendNATProbeReq added in v0.1.4

func (n *Node) SendNATProbeReq(ctx context.Context, probeAddr net.Addr, claimedAddr []byte) (bool, error)

SendNATProbeReq asks probeAddr to send a NATProbeEcho to claimedAddr. claimedAddr is the wire-encoded public UDP address (6 or 18 bytes). Returns true if an echo arrived within the context deadline, nil error on timeout.

func (*Node) SetMaxStoreKeys added in v0.1.7

func (n *Node) SetMaxStoreKeys(max int)

SetMaxStoreKeys updates the maximum number of distinct keys in the local store.

func (*Node) SetPassiveRouting added in v0.1.7

func (n *Node) SetPassiveRouting(passive bool)

SetPassiveRouting controls whether this node suppresses proactive FindNode queries. When true (passive mode), the node fills its routing table naturally through incoming traffic and skips active bucket-refill and topology scans.

func (*Node) SetSelfExtIP

func (n *Node) SetSelfExtIP(ip net.IP)

SetSelfExtIP records our own public IP (from STUN/HTTP probe). Used to detect NAT hairpin peers: nodes behind the same NAT share the same public IP and typically cannot reach each other via that IP (router hairpinning not supported).

func (*Node) Start

func (n *Node) Start()

Start begins the inbound packet loop and background maintenance workers.

func (*Node) StartDebugHTTP

func (n *Node) StartDebugHTTP(addr string) (stop func(), err error)

StartDebugHTTP listens on addr and serves read-only /debug/* JSON (spec §3.6). When using the a2ald daemon the /debug/* routes are served on the API port (default 127.0.0.1:2121) and this method is not needed. Use it only when embedding the dht package directly without the daemon. stop shuts the server down (idempotent).

func (*Node) StartWithBootstrap

func (n *Node) StartWithBootstrap(ctx context.Context, addrs []net.Addr) error

StartWithBootstrap starts the receive loop then bootstraps with raw addresses.

func (*Node) StoreAt

func (n *Node) StoreAt(ctx context.Context, peer net.Addr, storeKey a2al.NodeID, rec protocol.SignedRecord) (bool, error)

StoreAt sends STORE to peer. storeKey zero omits BodyStore.Key (receiver derives key from rec.Address).

type PeerHealthState added in v0.1.3

type PeerHealthState uint8

PeerHealthState classifies a peer's reachability based on observed RPC outcomes.

const (
	PeerHealthUnknown PeerHealthState = iota // no RPC history yet
	PeerHealthGood                           // last RPC succeeded, failCount == 0
	PeerHealthBad                            // consecutive failures >= badHealthThreshold
)

type PeerIdentity

type PeerIdentity struct {
	Address      a2al.Address
	NodeID       a2al.NodeID
	ObservedWire []byte // BodyPong.observed_addr (how reporter sees us); may be nil
}

PeerIdentity holds the identity extracted from a PONG response.

type Query

type Query struct {
	Alpha   int           // retained for API compatibility; slot engine uses queryAlpha
	Stagger time.Duration // retained for API compatibility; slot engine uses querySlotStagger
	// contains filtered or unexported fields
}

Query runs iterative FIND_NODE / FIND_VALUE (spec Step 8).

func NewQuery

func NewQuery(n *Node) *Query

NewQuery builds a querier backed by n.

func (*Query) AggregateRecords

func (q *Query) AggregateRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)

AggregateRecords queries the network until the good+unknown candidate pools are exhausted, then merges and deduplicates all discovered records (Phase 4 Topic/Mailbox). Unlike FindRecords, there is no local-cache fast path: the network query always runs so that newly-joined publishers are discovered. Locally-cached records are seeded into the result set and merged with network results.

func (*Query) FindNode

func (q *Query) FindNode(ctx context.Context, target a2al.NodeID) ([]protocol.NodeInfo, error)

FindNode runs iterative FIND_NODE until the good+unknown candidate pools are exhausted. Returns the K XOR-closest nodes discovered.

func (*Query) FindRecords

func (q *Query) FindRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)

FindRecords runs iterative FIND_VALUE (recType 0 = all). Returns on the first valid record found (optimistic strategy). Local store is checked first; if a valid cached record exists the network is not queried. The cache is transparently invalidated by the host layer when a subsequent connection attempt using the cached data fails.

func (*Query) Resolve

func (q *Query) Resolve(ctx context.Context, target a2al.NodeID) (*protocol.EndpointRecord, error)

Resolve runs iterative FIND_VALUE for target NodeID and returns a verified endpoint record.

type RecordAuthFunc

type RecordAuthFunc func(key a2al.NodeID, rec protocol.SignedRecord, now time.Time) error

RecordAuthFunc decides whether a record may be stored at the given DHT key after signature and expiry checks (Phase 4: includes key binding for sovereign).

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is an in-memory record map keyed by DHT NodeID (Phase 4 multi-category).

func NewStore

func NewStore(auth RecordAuthFunc, maxKeys int) *Store

NewStore creates an empty Store. auth is optional (see Config.RecordAuth). maxKeys limits distinct key count; 0 uses DefaultMaxTotalKeys.

func (*Store) DebugRecords

func (s *Store) DebugRecords(now time.Time) []StoreRecordDebug

DebugRecords lists non-expired verified records (spec §3.6).

func (*Store) Get

func (s *Store) Get(key a2al.NodeID, now time.Time) *protocol.SignedRecord

Get returns the newest valid endpoint record (RecTypeEndpoint) at key, or nil.

func (*Store) GetAll

func (s *Store) GetAll(key a2al.NodeID, recType uint8, now time.Time) []protocol.SignedRecord

GetAll returns verified non-expired records at key, optionally filtered by RecType (0 = all).

func (*Store) GetAllByAddress added in v0.1.6

func (s *Store) GetAllByAddress(addr a2al.Address, recType uint8, now time.Time) []protocol.SignedRecord

GetAllByAddress returns verified non-expired records where sr.Address == addr, with optional RecType filter (0 = all types). Scans all key buckets; intended for low-frequency use (e.g. QUIC control plane exchange on connection setup).

func (*Store) Invalidate added in v0.1.6

func (s *Store) Invalidate(key a2al.NodeID, recType uint8)

Invalidate removes locally-cached records for key, optionally filtered by recType (0 = all types). Called internally when a connection attempt to the peer fails, ensuring the next Resolve goes to the network for fresh data.

func (*Store) Len

func (s *Store) Len() int

Len is the number of distinct key buckets with at least one record.

func (*Store) Put

func (s *Store) Put(key a2al.NodeID, rec protocol.SignedRecord, now time.Time) error

Put stores rec at key. Zero key derives NodeID(rec.Address).

func (*Store) SetMaxKeys added in v0.1.7

func (s *Store) SetMaxKeys(n int)

SetMaxKeys updates the maximum number of distinct DHT keys stored in-place. Safe to call concurrently; takes effect on the next eviction cycle.

type StoreRecordDebug

type StoreRecordDebug struct {
	KeyNodeIDHex string `json:"key_node_id_hex"`
	AddressHex   string `json:"address_hex"`
	RecType      uint8  `json:"rec_type"`
	Seq          uint64 `json:"seq"`
	Timestamp    uint64 `json:"timestamp"`
	TTL          uint32 `json:"ttl_seconds"`
	PayloadLen   int    `json:"payload_cbor_len"`
}

StoreRecordDebug is a JSON-friendly store row (spec §3.6).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL