dht

package
v0.1.5
This package is not in the latest version of its module.
Published: Apr 18, 2026 | License: MPL-2.0 | Imports: 23 | Imported by: 0

Documentation

Constants

const (
	DefaultAlpha = 3

	// DefaultStagger is the inter-launch delay between successive RPCs within
	// a single query batch (spec §3.2, "staggered launch"). A peer that responds
	// within this window prevents the next candidate from being contacted,
	// saving bandwidth and avoiding unnecessary pressure on slow peers.
	// Fixed value in Phase 1; future versions will derive it from RTT median × 0.5.
	DefaultStagger = 200 * time.Millisecond
)
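
The staggered launch described in the DefaultStagger comment can be sketched as follows. This is a minimal illustration and not the package's implementation: rpcFunc, staggeredFirst, fakePeer and runDemo are hypothetical names, and the peers are simulated with timers instead of real RPCs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// rpcFunc is a hypothetical stand-in for one RPC to a candidate peer.
type rpcFunc func(ctx context.Context) (string, error)

// staggeredFirst launches calls in order, one per stagger interval, and stops
// launching as soon as one succeeds. It returns the first successful reply and
// the number of calls that were actually launched.
func staggeredFirst(ctx context.Context, stagger time.Duration, calls []rpcFunc) (string, int, error) {
	if len(calls) == 0 {
		return "", 0, errors.New("no candidates")
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // abandons in-flight calls once we return

	type result struct {
		reply string
		err   error
	}
	results := make(chan result, len(calls)) // buffered: late senders never block
	next, pending := 0, 0
	launch := func() {
		call := calls[next]
		next++
		pending++
		go func() {
			reply, err := call(ctx)
			results <- result{reply, err}
		}()
	}

	launch()
	for pending > 0 {
		var tick <-chan time.Time // stays nil (blocks forever) when nothing is left to launch
		if next < len(calls) {
			tick = time.After(stagger)
		}
		select {
		case r := <-results:
			pending--
			if r.err == nil {
				return r.reply, next, nil
			}
			if next < len(calls) {
				launch() // a failure moves on to the next candidate immediately
			}
		case <-tick:
			launch() // stagger window elapsed without a reply
		case <-ctx.Done():
			return "", next, ctx.Err()
		}
	}
	return "", next, errors.New("all candidates failed")
}

// fakePeer simulates a peer that answers with its name after delay.
func fakePeer(name string, delay time.Duration) rpcFunc {
	return func(ctx context.Context) (string, error) {
		select {
		case <-time.After(delay):
			return name, nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// runDemo builds one fake peer per delay (in milliseconds) and runs the query.
func runDemo(staggerMs int, delaysMs ...int) (string, int) {
	calls := make([]rpcFunc, len(delaysMs))
	for i, d := range delaysMs {
		calls[i] = fakePeer(fmt.Sprintf("peer-%d", i), time.Duration(d)*time.Millisecond)
	}
	reply, launched, _ := staggeredFirst(context.Background(), time.Duration(staggerMs)*time.Millisecond, calls)
	return reply, launched
}

func main() {
	reply, launched := runDemo(200, 10, 10, 10)
	fmt.Println(reply, launched)
}
```

When the first simulated peer answers inside the stagger window, only one call is launched; a slow first peer lets the second candidate start after one interval.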
const DebugHTTPAddr = "127.0.0.1:2634"

DebugHTTPAddr is a suggested address for StartDebugHTTP when embedding the dht package directly (without a2ald). The daemon serves /debug/* on its own API port.

const (
	DefaultMaxTotalKeys = 100_000
)

Variables

var ErrNoEndpoint = errors.New("dht: no endpoint record")

ErrNoEndpoint is returned when iterative FIND_VALUE does not yield a valid endpoint record.

var ErrNoMatchingRecords = errors.New("dht: no matching records")

ErrNoMatchingRecords is returned when FindRecords finds nothing for the filter.

var ErrStaleRecord = errors.New("dht: stale record")

ErrStaleRecord means an equal or older record already exists for the same slot.
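
A rough sketch of the "equal or older" rule, and of matching the sentinel with errors.Is, might look like this. It assumes freshness is compared by a sequence number, which this page does not confirm; errStale, record, slotStore and isStale are hypothetical local stand-ins rather than the package's types.

```go
package main

import (
	"errors"
	"fmt"
)

// errStale mirrors the message of ErrStaleRecord; it is a local stand-in.
var errStale = errors.New("dht: stale record")

// record is a hypothetical record with an assumed sequence number.
type record struct {
	Seq     uint64
	Payload string
}

// slotStore keeps at most one record per slot.
type slotStore struct {
	slots map[string]record
}

func newSlotStore() *slotStore {
	return &slotStore{slots: make(map[string]record)}
}

// put rejects a record whose Seq is equal to or lower than the stored one.
func (s *slotStore) put(slot string, r record) error {
	if cur, ok := s.slots[slot]; ok && r.Seq <= cur.Seq {
		return fmt.Errorf("put %q (seq %d <= %d): %w", slot, r.Seq, cur.Seq, errStale)
	}
	s.slots[slot] = r
	return nil
}

// isStale reports whether err wraps the stale-record sentinel.
func isStale(err error) bool {
	return errors.Is(err, errStale)
}

func main() {
	s := newSlotStore()
	fmt.Println(s.put("endpoint", record{Seq: 1, Payload: "v1"}))
	fmt.Println(isStale(s.put("endpoint", record{Seq: 1, Payload: "dup"})))
	fmt.Println(s.put("endpoint", record{Seq: 2, Payload: "v2"}))
}
```

Wrapping the sentinel with %w keeps context in the message while still letting callers test for it with errors.Is.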

Functions

func ObservedAddr

func ObservedAddr(from net.Addr) []byte

ObservedAddr encodes remote IP:port for PONG / FIND_*_RESP (spec §7.6).
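
The Config documentation below mentions that observed_addr is 6 or 18 bytes on the wire. One layout consistent with those sizes is the raw IP followed by a 2-byte port. The sketch below assumes that layout and a big-endian port; the authoritative format is spec §7.6, and encodeObserved and encodeHostPort are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

// encodeObserved packs a UDP address as IP bytes followed by a 2-byte port.
// The layout and byte order are assumptions made for illustration.
func encodeObserved(from net.Addr) []byte {
	udp, ok := from.(*net.UDPAddr)
	if !ok {
		return nil
	}
	ip := udp.IP.To4() // 4 bytes for IPv4, including IPv4-mapped IPv6
	if ip == nil {
		ip = udp.IP.To16() // 16 bytes for IPv6
	}
	if ip == nil {
		return nil
	}
	out := make([]byte, len(ip)+2)
	copy(out, ip)
	binary.BigEndian.PutUint16(out[len(ip):], uint16(udp.Port))
	return out
}

// encodeHostPort is a convenience wrapper for literal "ip:port" strings.
func encodeHostPort(hostport string) []byte {
	addr, err := net.ResolveUDPAddr("udp", hostport)
	if err != nil {
		return nil
	}
	return encodeObserved(addr)
}

func main() {
	fmt.Printf("% x\n", encodeHostPort("192.0.2.1:2634"))
	fmt.Printf("%d bytes\n", len(encodeHostPort("[2001:db8::1]:2634")))
}
```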

Types

type BootstrapSeed

type BootstrapSeed struct {
	Addr net.Addr
	Info protocol.NodeInfo
}

BootstrapSeed is a known dial address plus wire NodeInfo (legacy; prefer BootstrapAddrs).

type Config

type Config struct {
	Transport transport.Transport
	Keystore  crypto.KeyStore
	// OnObservedAddr is called whenever a DHT response carries an observed_addr
	// (PONG, FIND_NODE_RESP, FIND_VALUE_RESP). reporter is the responding node's
	// NodeID; wire is the raw observed_addr bytes (6 or 18 bytes).
	// May be nil.
	OnObservedAddr func(reporter a2al.NodeID, wire []byte)
	// RecordAuth is an optional authority policy called by Store.Put after
	// signature/expiry verification passes (Phase 4: includes DHT key).
	// If nil, no authority check is performed (useful in tests).
	RecordAuth RecordAuthFunc
	// MaxStoreKeys limits the number of distinct DHT keys in the local store.
	// 0 uses DefaultMaxTotalKeys. Configurable per-node soft limit.
	MaxStoreKeys int
	// Logger is used for DHT-level diagnostics (send failures, RPC retries).
	// If nil, slog.Default() is used.
	Logger *slog.Logger
	// SeenPeersPath is the file path for persisting the seenPeers sliding-window
	// table across restarts (spec §7.3). Empty disables persistence (default in
	// tests). The file is written with mode 0600.
	SeenPeersPath string
}

Config holds runtime dependencies for a DHT node (spec Step 7).

type DebugStats added in v0.1.4

type DebugStats struct {
	RxPackets             uint64 `json:"rx_packets_verified"`
	TxPackets             uint64 `json:"tx_packets"`
	RPCOK                 uint64 `json:"rpc_completed"`
	TotalPeers            int    `json:"total_peers"`
	Reach1h               int    `json:"reach_1h"`
	Reach24h              int    `json:"reach_24h"`
	Reach7d               int    `json:"reach_7d"`
	EstimatedNetworkSize  int    `json:"estimated_network_size"`
	UniqueNodesSinceStart uint64 `json:"unique_nodes_since_start"`
}

DebugStats is the DHT portion of GET /debug/stats (spec §3.6, §7).
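
For a client that consumes these counters, the JSON field names come from the struct tags above. The sketch below copies the struct locally so that it compiles on its own, and assumes the JSON object it receives is exactly the DHT portion; how that portion is nested inside the daemon's full /debug/stats response is not specified here. decodeStats is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DebugStats is copied from the listing above so the sketch is self-contained.
type DebugStats struct {
	RxPackets             uint64 `json:"rx_packets_verified"`
	TxPackets             uint64 `json:"tx_packets"`
	RPCOK                 uint64 `json:"rpc_completed"`
	TotalPeers            int    `json:"total_peers"`
	Reach1h               int    `json:"reach_1h"`
	Reach24h              int    `json:"reach_24h"`
	Reach7d               int    `json:"reach_7d"`
	EstimatedNetworkSize  int    `json:"estimated_network_size"`
	UniqueNodesSinceStart uint64 `json:"unique_nodes_since_start"`
}

// decodeStats parses one JSON object into DebugStats; absent fields stay zero.
func decodeStats(body string) (DebugStats, error) {
	var s DebugStats
	err := json.Unmarshal([]byte(body), &s)
	return s, err
}

func main() {
	s, err := decodeStats(`{"rx_packets_verified":120,"total_peers":8,"reach_1h":5}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(s.RxPackets, s.TotalPeers, s.Reach1h)
}
```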

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is a single DHT participant (routing + local store + wire handler).

func NewNode

func NewNode(cfg Config) (*Node, error)

NewNode builds a node; keystore must list exactly one address (Phase 1).

func (*Node) AddContact

func (n *Node) AddContact(addr net.Addr, ni protocol.NodeInfo)

AddContact pins a peer's dial address and seeds the routing table.

func (*Node) Address

func (n *Node) Address() a2al.Address

Address returns the agent address.

func (*Node) BindPeerAddr

func (n *Node) BindPeerAddr(id a2al.NodeID, addr net.Addr)

BindPeerAddr registers the transport dial address for a remote NodeID (e.g. MemTransport name lookup in tests).

func (*Node) Bootstrap

func (n *Node) Bootstrap(ctx context.Context, seeds []BootstrapSeed) error

Bootstrap registers seeds with pre-known identity and runs FIND_NODE(self). For seeds where only ip:port is known, use BootstrapAddrs instead.

func (*Node) BootstrapAddrs

func (n *Node) BootstrapAddrs(ctx context.Context, addrs []net.Addr) error

BootstrapAddrs connects to seed nodes by raw network addresses (ip:port only). For each address it sends PING, extracts the peer's identity from the PONG, registers the peer, then runs FIND_NODE(self) to widen the routing table. This is the recommended bootstrap entry point — callers do not need to know the seed's Address or NodeID in advance.
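
Because BootstrapAddrs takes plain net.Addr values, a caller typically only has to turn configured "ip:port" strings into addresses. The helper below is a sketch of that preparation step using the standard library; parseSeeds is a hypothetical name, and the use of UDP addresses is an assumption based on the UDP wording elsewhere on this page.

```go
package main

import (
	"fmt"
	"net"
)

// parseSeeds converts literal "ip:port" strings into UDP addresses.
// It stops at the first entry that cannot be parsed.
func parseSeeds(hostports []string) ([]net.Addr, error) {
	addrs := make([]net.Addr, 0, len(hostports))
	for _, hp := range hostports {
		a, err := net.ResolveUDPAddr("udp", hp)
		if err != nil {
			return nil, fmt.Errorf("seed %q: %w", hp, err)
		}
		addrs = append(addrs, a)
	}
	return addrs, nil
}

func main() {
	addrs, err := parseSeeds([]string{"192.0.2.10:4000", "[2001:db8::1]:4000"})
	if err != nil {
		fmt.Println("bad seed list:", err)
		return
	}
	for _, a := range addrs {
		fmt.Println(a.Network(), a.String())
	}
}
```

The resulting slice has the shape that BootstrapAddrs and StartWithBootstrap accept as their addrs argument.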

func (*Node) BootstrapCandidateAddrs

func (n *Node) BootstrapCandidateAddrs(max int) []net.Addr

BootstrapCandidateAddrs returns up to max UDP addresses for cold-start bootstrap (routing table + remembered peer addrs). Best-effort for persisting peers.cache.


func (*Node) Close

func (n *Node) Close() error

Close stops the node, closes the transport, and waits for the receive loop to exit.

func (*Node) DebugHTTPHandler

func (n *Node) DebugHTTPHandler() http.Handler

DebugHTTPHandler returns read-only /debug/* handlers for mounting on an existing server (spec §3.6).

func (*Node) DebugStatsData added in v0.1.4

func (n *Node) DebugStatsData() DebugStats

DebugStatsData returns a snapshot for embedding in host-level /debug/stats.

func (*Node) FindNode

func (n *Node) FindNode(ctx context.Context, peer net.Addr, target a2al.NodeID) ([]protocol.NodeInfo, error)

FindNode asks peer for closest nodes to target NodeID.

func (*Node) FindValue

func (n *Node) FindValue(ctx context.Context, peer net.Addr, key a2al.NodeID) (*protocol.SignedRecord, error)

FindValue queries peer for the best endpoint record at key NodeID (legacy helper).

func (*Node) FindValueWithNodes

func (n *Node) FindValueWithNodes(ctx context.Context, peer net.Addr, key a2al.NodeID, recType uint8) ([]protocol.SignedRecord, []protocol.NodeInfo, error)

FindValueWithNodes queries peer for records at key and returns the matching records together with the NodeInfo entries from the response. A recType of 0 requests all record types.

func (*Node) LocalAddr

func (n *Node) LocalAddr() net.Addr

LocalAddr returns the underlying transport address.

func (*Node) NodeID

func (n *Node) NodeID() a2al.NodeID

NodeID returns the DHT key for this node.

func (*Node) PeerHealthOf added in v0.1.3

func (n *Node) PeerHealthOf(id a2al.NodeID) PeerHealthState

PeerHealthOf returns the observed health state for the given peer.

func (*Node) Ping

func (n *Node) Ping(ctx context.Context, peer net.Addr) error

Ping sends PING and waits for PONG. Start() must be running.

func (*Node) PingIdentity

func (n *Node) PingIdentity(ctx context.Context, peer net.Addr) (*PeerIdentity, error)

PingIdentity sends PING, waits for PONG, and returns the remote peer's identity extracted from the response. The peer is automatically registered into the routing table and dial address map.

func (*Node) PublishEndpointRecord

func (n *Node) PublishEndpointRecord(ctx context.Context, rec protocol.SignedRecord) error

PublishEndpointRecord stores the record locally and returns immediately (process one). Replication to remote peers is handled asynchronously by scheduleReplicate → renewBackground (FindNode + staggered StoreAt).

func (*Node) PublishMailboxRecord

func (n *Node) PublishMailboxRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error

PublishMailboxRecord stores the mailbox record at storeKey (recipient NodeID) locally and replicates to k-closest reachable peers asynchronously.

func (*Node) PublishTopicRecord

func (n *Node) PublishTopicRecord(ctx context.Context, storeKey a2al.NodeID, rec protocol.SignedRecord) error

PublishTopicRecord stores the topic record at storeKey (SHA-256("topic:"+topic)) locally and replicates asynchronously.
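
The storeKey derivation named above, SHA-256("topic:"+topic), can be sketched with the standard library. The 32-byte nodeID type here is a hypothetical stand-in; whether a2al.NodeID uses the full digest is an assumption.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// nodeID is a hypothetical 32-byte stand-in for a2al.NodeID.
type nodeID [32]byte

// String renders the key as lowercase hex.
func (id nodeID) String() string {
	return hex.EncodeToString(id[:])
}

// topicStoreKey hashes the "topic:" prefix plus the topic name.
func topicStoreKey(topic string) nodeID {
	return nodeID(sha256.Sum256([]byte("topic:" + topic)))
}

func main() {
	fmt.Println(topicStoreKey("weather"))
}
```

Every publisher and subscriber that uses the same topic string arrives at the same key without any coordination.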

func (*Node) RemoveRepSetsForPublisher added in v0.1.3

func (n *Node) RemoveRepSetsForPublisher(publisher a2al.NodeID)

RemoveRepSetsForPublisher removes all repSets whose publisher matches the given NodeID. Call when an agent is deleted so background probes and refill tasks cease.

func (*Node) SendNATProbeReq added in v0.1.4

func (n *Node) SendNATProbeReq(ctx context.Context, probeAddr net.Addr, claimedAddr []byte) (bool, error)

SendNATProbeReq asks probeAddr to send a NATProbeEcho to claimedAddr. claimedAddr is the wire-encoded public UDP address (6 or 18 bytes). It returns true if an echo arrives before the context deadline; on timeout it returns false with a nil error.

func (*Node) SetSelfExtIP

func (n *Node) SetSelfExtIP(ip net.IP)

SetSelfExtIP records our own public IP (from STUN/HTTP probe). Used to detect NAT hairpin peers: nodes behind the same NAT share the same public IP and typically cannot reach each other via that IP (router hairpinning not supported).

func (*Node) Start

func (n *Node) Start()

Start begins the inbound packet loop and background maintenance workers.

func (*Node) StartDebugHTTP

func (n *Node) StartDebugHTTP(addr string) (stop func(), err error)

StartDebugHTTP listens on addr and serves read-only /debug/* JSON (spec §3.6). When using the a2ald daemon the /debug/* routes are served on the API port (default 127.0.0.1:2121) and this method is not needed. Use it only when embedding the dht package directly without the daemon. stop shuts the server down (idempotent).
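
The "stop is idempotent" contract can be illustrated with a standalone sketch built on net/http and sync.Once. This is not the package's code: startDebug and statsHandler are hypothetical, and the handler serves a fixed placeholder payload instead of real node statistics.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"
)

// statsHandler serves a placeholder JSON snapshot at /debug/stats and rejects
// anything other than GET, which keeps the endpoint read-only.
func statsHandler() http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/debug/stats", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			http.Error(w, "read-only", http.StatusMethodNotAllowed)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]int{"total_peers": 0})
	})
	return mux
}

// startDebug listens on addr and serves h until stop is called.
// Calling stop more than once is safe: only the first call does any work.
func startDebug(addr string, h http.Handler) (stop func(), err error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	srv := &http.Server{Handler: h}
	done := make(chan struct{})
	go func() {
		defer close(done)
		_ = srv.Serve(ln) // returns http.ErrServerClosed after Shutdown
	}()

	var once sync.Once
	stop = func() {
		once.Do(func() {
			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
			defer cancel()
			_ = srv.Shutdown(ctx)
			<-done // wait for the serve goroutine to exit
		})
	}
	return stop, nil
}

func main() {
	stop, err := startDebug("127.0.0.1:0", statsHandler())
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	stop()
	stop() // second call is a no-op
	fmt.Println("stopped")
}
```

Binding to a loopback address, as DebugHTTPAddr suggests, keeps the debug routes off public interfaces.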

func (*Node) StartWithBootstrap

func (n *Node) StartWithBootstrap(ctx context.Context, addrs []net.Addr) error

StartWithBootstrap starts the receive loop then bootstraps with raw addresses.

func (*Node) StoreAt

func (n *Node) StoreAt(ctx context.Context, peer net.Addr, storeKey a2al.NodeID, rec protocol.SignedRecord) (bool, error)

StoreAt sends STORE to peer. A zero storeKey omits BodyStore.Key, in which case the receiver derives the key from rec.Address.

type PeerHealthState added in v0.1.3

type PeerHealthState uint8

PeerHealthState classifies a peer's reachability based on observed RPC outcomes.

const (
	PeerHealthUnknown PeerHealthState = iota // no RPC history yet
	PeerHealthGood                           // last RPC succeeded, failCount == 0
	PeerHealthBad                            // consecutive failures >= badHealthThreshold
)
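
The comments on the three states suggest a small state machine driven by RPC outcomes. The sketch below is one way it could work, under two assumptions: badHealthThreshold is 3 (the real value is unexported and not shown here), and a peer with fewer failures than the threshold keeps its previous state. peerHealth and observe are hypothetical names.

```go
package main

import "fmt"

// PeerHealthState and its constants are copied from the listing above.
type PeerHealthState uint8

const (
	PeerHealthUnknown PeerHealthState = iota
	PeerHealthGood
	PeerHealthBad
)

// badHealthThreshold is an assumed value for illustration only.
const badHealthThreshold = 3

// peerHealth tracks consecutive failures for one peer.
type peerHealth struct {
	state     PeerHealthState
	failCount int
}

// observe records the outcome of one RPC.
func (h *peerHealth) observe(ok bool) {
	if ok {
		h.failCount = 0
		h.state = PeerHealthGood
		return
	}
	h.failCount++
	if h.failCount >= badHealthThreshold {
		h.state = PeerHealthBad
	}
	// Below the threshold the previous state is kept (an assumption).
}

func main() {
	var h peerHealth
	fmt.Println(h.state == PeerHealthUnknown)
	h.observe(true)
	fmt.Println(h.state == PeerHealthGood)
	for i := 0; i < badHealthThreshold; i++ {
		h.observe(false)
	}
	fmt.Println(h.state == PeerHealthBad)
}
```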

type PeerIdentity

type PeerIdentity struct {
	Address      a2al.Address
	NodeID       a2al.NodeID
	ObservedWire []byte // BodyPong.observed_addr (how reporter sees us); may be nil
}

PeerIdentity holds the identity extracted from a PONG response.

type Query

type Query struct {
	Alpha   int
	Stagger time.Duration
	// contains filtered or unexported fields
}

Query runs iterative FIND_NODE / FIND_VALUE (spec Step 8).

func NewQuery

func NewQuery(n *Node) *Query

NewQuery builds a querier with default α=3 and small stagger between parallel RPCs.

func (*Query) AggregateRecords

func (q *Query) AggregateRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)

AggregateRecords queries until the k-closest set is exhausted, merges and deduplicates (Phase 4 Topic/Mailbox). If the local store already holds valid records the network query is skipped — this fast-path is correct for the common case where the querying node is itself a publisher or a replication target (same a2ald, small network). Bad peers are skipped (with fallback) and a per-batch timeout prevents slow stragglers from stalling the whole traversal.
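
The merge-and-deduplicate step can be pictured with the sketch below. The deduplication rule is an assumption: records are grouped by (Address, RecType) and the highest Seq wins. The real grouping may differ, for example if several mailbox records per sender must be kept. rec, slot and mergeRecords are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// rec is a simplified, hypothetical view of a signed record.
type rec struct {
	Address string
	RecType uint8
	Seq     uint64
}

// slot is the assumed deduplication key.
type slot struct {
	addr string
	typ  uint8
}

// mergeRecords combines per-peer batches, keeping the highest Seq per slot.
// The result is sorted so that the output order is deterministic.
func mergeRecords(batches ...[]rec) []rec {
	best := make(map[slot]rec)
	for _, batch := range batches {
		for _, r := range batch {
			k := slot{r.Address, r.RecType}
			if cur, ok := best[k]; !ok || r.Seq > cur.Seq {
				best[k] = r
			}
		}
	}
	out := make([]rec, 0, len(best))
	for _, r := range best {
		out = append(out, r)
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Address != out[j].Address {
			return out[i].Address < out[j].Address
		}
		return out[i].RecType < out[j].RecType
	})
	return out
}

func main() {
	fromPeerA := []rec{{"alice", 1, 1}, {"bob", 1, 5}}
	fromPeerB := []rec{{"alice", 1, 3}, {"bob", 1, 2}}
	fmt.Println(mergeRecords(fromPeerA, fromPeerB))
}
```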

func (*Query) FindNode

func (q *Query) FindNode(ctx context.Context, target a2al.NodeID) ([]protocol.NodeInfo, error)

FindNode runs iterative FIND_NODE until k-closest known nodes are exhausted or all have been queried. Candidates are seeded from tabNearestHealthy (Good peers first). Within each batch, peers known to be Bad are skipped (tried last as fallback). The collection loop returns as soon as the first peer responds, allowing the next round to start without waiting for slow/unreachable peers.
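
The candidate ordering described above (healthy peers first, Bad peers kept only as a fallback) can be sketched as a sort. The XOR distance metric is an assumption borrowed from Kademlia-style DHTs, since this page only says "closest"; the 4-byte nodeID, peer, orderCandidates and names are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
	"strings"
)

// nodeID is a shortened, hypothetical identifier.
type nodeID [4]byte

// peer is a candidate with a health flag.
type peer struct {
	Name string
	ID   nodeID
	Bad  bool
}

// xor returns the bytewise XOR of two IDs (assumed distance metric).
func xor(a, b nodeID) []byte {
	out := make([]byte, len(a))
	for i := range a {
		out[i] = a[i] ^ b[i]
	}
	return out
}

// orderCandidates returns a copy of peers with healthy peers first, each
// group sorted by distance to target. Bad peers end up last as a fallback.
func orderCandidates(target nodeID, peers []peer) []peer {
	out := append([]peer(nil), peers...)
	sort.SliceStable(out, func(i, j int) bool {
		if out[i].Bad != out[j].Bad {
			return !out[i].Bad // healthy peers sort ahead of Bad ones
		}
		return bytes.Compare(xor(out[i].ID, target), xor(out[j].ID, target)) < 0
	})
	return out
}

// names joins peer names for compact printing.
func names(ps []peer) string {
	parts := make([]string, len(ps))
	for i, p := range ps {
		parts[i] = p.Name
	}
	return strings.Join(parts, ",")
}

func main() {
	target := nodeID{0, 0, 0, 0}
	peers := []peer{
		{"a", nodeID{0, 0, 0, 1}, true},
		{"b", nodeID{0, 0, 0, 8}, false},
		{"c", nodeID{0, 0, 0, 2}, false},
	}
	fmt.Println(names(orderCandidates(target, peers)))
}
```

In this example peer a is the closest to the target but is marked Bad, so it is placed after the two healthy peers.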

func (*Query) FindRecords

func (q *Query) FindRecords(ctx context.Context, target a2al.NodeID, recType uint8) ([]protocol.SignedRecord, error)

FindRecords runs iterative FIND_VALUE (recType 0 = all). It returns on the first batch that yields matching records after auth.

func (*Query) Resolve

func (q *Query) Resolve(ctx context.Context, target a2al.NodeID) (*protocol.EndpointRecord, error)

Resolve runs iterative FIND_VALUE for target NodeID and returns a verified endpoint record.

type RecordAuthFunc

type RecordAuthFunc func(key a2al.NodeID, rec protocol.SignedRecord, now time.Time) error

RecordAuthFunc decides whether a record may be stored at the given DHT key after signature and expiry checks (Phase 4: includes key binding for sovereign).

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is an in-memory record map keyed by DHT NodeID (Phase 4 multi-category).

func NewStore

func NewStore(auth RecordAuthFunc, maxKeys int) *Store

NewStore creates an empty Store. auth is optional (see Config.RecordAuth). maxKeys limits distinct key count; 0 uses DefaultMaxTotalKeys.

func (*Store) DebugRecords

func (s *Store) DebugRecords(now time.Time) []StoreRecordDebug

DebugRecords lists non-expired verified records (spec §3.6).

func (*Store) Get

func (s *Store) Get(key a2al.NodeID, now time.Time) *protocol.SignedRecord

Get returns the newest valid endpoint record (RecTypeEndpoint) at key, or nil.

func (*Store) GetAll

func (s *Store) GetAll(key a2al.NodeID, recType uint8, now time.Time) []protocol.SignedRecord

GetAll returns verified non-expired records at key, optionally filtered by RecType (0 = all).

func (*Store) Len

func (s *Store) Len() int

Len is the number of distinct key buckets with at least one record.

func (*Store) Put

func (s *Store) Put(key a2al.NodeID, rec protocol.SignedRecord, now time.Time) error

Put stores rec at key. A zero key is replaced by NodeID(rec.Address).

type StoreRecordDebug

type StoreRecordDebug struct {
	KeyNodeIDHex string `json:"key_node_id_hex"`
	AddressHex   string `json:"address_hex"`
	RecType      uint8  `json:"rec_type"`
	Seq          uint64 `json:"seq"`
	Timestamp    uint64 `json:"timestamp"`
	TTL          uint32 `json:"ttl_seconds"`
	PayloadLen   int    `json:"payload_cbor_len"`
}

StoreRecordDebug is a JSON-friendly store row (spec §3.6).
