Documentation ¶
Overview ¶
Package directory implements the registry's node directory: registration, lookup, resolution, deregistration, heartbeats, node listing, hostname/tag/visibility management, and the stale-node reaper. It was extracted from pkg/registry/server as part of the R4.2 registry decomposition.
Thread safety: all exported Handle* methods are safe for concurrent use. Locking is delegated to the server: the Store shares mu with Server, and the callback functions acquire whatever other locks they need internally.
Index ¶
- Constants
- func ValidateHostname(name string) error
- type Callbacks
- type KeyInfo
- type ListNodesCacheState
- type NetworkMemberView
- type NodeInfo
- type NodeShards
- type Store
- func (st *Store) AdminListNodesCached() ([]byte, error)
- func (st *Store) HandleBinaryHeartbeat(conn net.Conn, payload []byte)
- func (st *Store) HandleBinaryLookup(conn net.Conn, payload []byte)
- func (st *Store) HandleBinaryResolve(conn net.Conn, payload []byte)
- func (st *Store) HandleDeregister(msg map[string]interface{}) (map[string]interface{}, error)
- func (st *Store) HandleHeartbeat(msg map[string]interface{}) (map[string]interface{}, error)
- func (st *Store) HandleListNodes(msg map[string]interface{}, ...) (map[string]interface{}, error)
- func (st *Store) HandleLookup(msg map[string]interface{}) (map[string]interface{}, error)
- func (st *Store) HandleReRegister(pubKeyB64, listenAddr, owner, hostname string, lanAddrs []string, ...) (map[string]interface{}, error)
- func (st *Store) HandleRegister(msg map[string]interface{}, remoteAddr string, ...) (map[string]interface{}, error)
- func (st *Store) HandleResolve(msg map[string]interface{}) (map[string]interface{}, error)
- func (st *Store) HandleResolveHostname(msg map[string]interface{}) (map[string]interface{}, error)
- func (st *Store) HandleSetHostname(msg map[string]interface{}) (map[string]interface{}, error)
- func (st *Store) HandleSetTags(msg map[string]interface{}) (map[string]interface{}, error)
- func (st *Store) HandleSetVisibility(msg map[string]interface{}) (map[string]interface{}, error)
- func (st *Store) InvalidateAdminListNodesCache()
- func (st *Store) InvalidateListNodesCacheForNetwork(netID uint16)
- func (st *Store) PerNetworkListNodesCached(netID uint16) ([]byte, error)
- func (st *Store) ReapStaleNodes(threshold time.Time)
- func (st *Store) ReplaceState(nodes map[uint32]*NodeInfo, pubKeyIdx map[string]uint32, ...)
Constants ¶
const (
// NumNodeShards must match the value in server.go.
NumNodeShards = 256
// ReapChunkSize is how many nodes to process per reap tick.
ReapChunkSize = 10000
// AdminListNodesTTL is how long an admin list_nodes response is cached.
AdminListNodesTTL = 1 * time.Second
// RawResponseKey is the sentinel map key for pre-marshalled responses.
RawResponseKey = "_pilot_raw_body"
)
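RawResponseKey suggests that a handler can return an already-encoded body inside the usual response map. The following is a minimal sketch of how a response writer might honour that sentinel. It assumes the value is stored as a []byte; the encodeResponse helper is hypothetical and not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rawResponseKey mirrors the documented RawResponseKey constant.
const rawResponseKey = "_pilot_raw_body"

// encodeResponse is a hypothetical helper: if the handler stored a
// pre-marshalled body under the sentinel key (assumed to be a []byte),
// it is returned untouched; otherwise the map is marshalled as JSON.
func encodeResponse(resp map[string]interface{}) ([]byte, error) {
	if raw, ok := resp[rawResponseKey].([]byte); ok {
		return raw, nil
	}
	return json.Marshal(resp)
}

func main() {
	cached := map[string]interface{}{rawResponseKey: []byte(`{"nodes":[]}`)}
	plain := map[string]interface{}{"ok": true}
	for _, resp := range []map[string]interface{}{cached, plain} {
		body, err := encodeResponse(resp)
		if err != nil {
			fmt.Println("encode failed:", err)
			continue
		}
		fmt.Println(string(body))
	}
}
```

Passing the cached bytes through avoids re-marshalling a large list_nodes response on every request.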
Variables ¶
This section is empty.
Functions ¶
func ValidateHostname ¶
ValidateHostname checks that a hostname is valid for registration.
Types ¶
type Callbacks ¶
type Callbacks struct {
// Save triggers a debounced snapshot write.
Save func()
// Audit records an audit log entry.
Audit func(action string, attrs ...any)
// RecordWALRegister records a new-node WAL entry.
RecordWALRegister func(nodeID uint32, owner, pubKeyB64, realAddr, hostname string, lanAddrs []string, version, createdAt string)
// RecordWALDeregister records a deregistration WAL entry.
RecordWALDeregister func(nodeID uint32)
// InvalidateListNodesCache drops the per-network list_nodes cache.
InvalidateListNodesCache func(netID uint16)
// InvalidateAdminListNodesCache drops the backbone admin list_nodes cache.
InvalidateAdminListNodesCache func()
// PublishMembershipChanged publishes a membership.changed bus event.
PublishMembershipChanged func(netID uint16)
// RemoveFromNetworks removes a node from all its networks in the server's
// network map. The nodeID and its current Networks slice are passed.
// Caller holds mu.Lock. Returns the networks that lost an owner.
RemoveFromNetworks func(nodeID uint32, networks []uint16) (lostOwnerNets []uint16)
// ClearInviteInbox removes all pending invites for the given node.
ClearInviteInbox func(nodeID uint32)
// RequireAdminToken validates the admin_token field in a message.
RequireAdminToken func(msg map[string]interface{}) error
// RequireAdminTokenLocked is like RequireAdminToken but mu is already held.
RequireAdminTokenLocked func(msg map[string]interface{}) error
// AdminToken returns the current admin token.
AdminToken func() string
// VerifyNodeSignature verifies an Ed25519 (or admin-token) signature.
VerifyNodeSignature func(pubKey []byte, adminToken string, msg map[string]interface{}, challenge string) error
// IsTrusted returns true if nodeA and nodeB have a mutual trust pair.
IsTrusted func(nodeA, nodeB uint32) bool
// BeaconAddr returns the current beacon address.
BeaconAddr func() string
// IncRegistrations increments the registration counter.
IncRegistrations func()
// IncDeregistrations increments the deregistration counter.
IncDeregistrations func()
// IncRequestsTotal increments the named request counter.
IncRequestsTotal func(label string)
// IncErrorsTotal increments the named error counter.
IncErrorsTotal func(label string)
// ObserveRequestDuration records a request duration histogram sample.
ObserveRequestDuration func(label string, seconds float64)
// Now returns the current time (injectable for tests).
Now func() time.Time
// AddNodeToBackbone adds a node to the backbone (network 0) Members list.
// Called when a new node is registered or a reaped node is reclaimed.
// Caller holds mu.Lock.
AddNodeToBackbone func(nodeID uint32)
// ScanNetworkMemberships returns the set of non-backbone networkIDs that
// still list nodeID as a member. Used to restore network memberships after
// a reaped node reclaims its old identity. Caller holds mu.Lock.
ScanNetworkMemberships func(nodeID uint32) []uint16
}
Callbacks collects the cross-package effects that directory handlers need to trigger without importing the full server package.
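Because every cross-package effect is a function field, tests can supply fakes. The sketch below uses a trimmed-down stand-in for Callbacks with only three of the documented fields, and a hypothetical markSeen function standing in for a handler body. Which callbacks a real handler invokes is an assumption here.

```go
package main

import (
	"fmt"
	"time"
)

// callbacks is a stand-in carrying three of the documented Callbacks fields.
type callbacks struct {
	Save  func()
	Audit func(action string, attrs ...any)
	Now   func() time.Time
}

// markSeen is a hypothetical handler body: it reads the clock through the
// injected Now, records an audit entry, and requests a snapshot write.
func markSeen(cb callbacks, lastSeen map[uint32]time.Time, nodeID uint32) {
	lastSeen[nodeID] = cb.Now()
	cb.Audit("heartbeat", "node_id", nodeID)
	cb.Save()
}

// demo wires fake callbacks with a fixed clock and reports what happened.
func demo() (seenAtFixed bool, saves int, audited []string) {
	fixed := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)
	cb := callbacks{
		Save:  func() { saves++ },
		Audit: func(action string, attrs ...any) { audited = append(audited, action) },
		Now:   func() time.Time { return fixed },
	}
	lastSeen := map[uint32]time.Time{}
	markSeen(cb, lastSeen, 7)
	return lastSeen[7].Equal(fixed), saves, audited
}

func main() {
	ok, saves, audited := demo()
	fmt.Println(ok, saves, audited)
}
```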
type KeyInfo ¶
type KeyInfo struct {
CreatedAt time.Time `json:"created_at"`
RotatedAt time.Time `json:"rotated_at,omitempty"`
RotateCount int `json:"rotate_count"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
}
KeyInfo holds key lifecycle metadata for a node.
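One detail worth knowing when reading the struct tags: with encoding/json, omitempty does not drop a zero time.Time, because a struct value is never considered empty. The sketch below copies the documented fields into a local keyInfo type and marshals a key that has never been rotated. Whether consumers of the registry rely on this is not stated in the documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// keyInfo copies the documented KeyInfo fields and JSON tags.
type keyInfo struct {
	CreatedAt   time.Time `json:"created_at"`
	RotatedAt   time.Time `json:"rotated_at,omitempty"`
	RotateCount int       `json:"rotate_count"`
	ExpiresAt   time.Time `json:"expires_at,omitempty"`
}

// encodeFresh marshals metadata for a key that was created but never rotated.
func encodeFresh() string {
	k := keyInfo{CreatedAt: time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)}
	b, err := json.Marshal(k)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// rotated_at and expires_at still appear, carrying the zero time.
	fmt.Println(encodeFresh())
}
```

A reader of this JSON should therefore treat the zero time (time.Time.IsZero) as "not set" rather than expect the field to be absent.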
type ListNodesCacheState ¶
type ListNodesCacheState struct {
Mu sync.Mutex
FullBody []byte
BuiltAt time.Time
Building bool
Cond *sync.Cond
LastBuildErr error
CacheHits uint64
CacheWaits uint64
CacheRebuilds uint64
}
ListNodesCacheState is the per-network singleflight + TTL cache entry for list_nodes responses.
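The fields above map naturally onto a "one builder, many waiters" pattern. The following is a minimal sketch of such a singleflight + TTL cache built on sync.Cond, using a local cacheState type that mirrors the documented fields. The get method, its parameters, and the retry-on-error behaviour are assumptions, not the package's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cacheState mirrors the documented ListNodesCacheState fields.
type cacheState struct {
	mu                    sync.Mutex
	cond                  *sync.Cond
	fullBody              []byte
	builtAt               time.Time
	building              bool
	lastBuildErr          error
	hits, waits, rebuilds uint64
}

func newCacheState() *cacheState {
	c := &cacheState{}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// get returns the cached body while it is younger than ttl. Otherwise one
// caller rebuilds it and concurrent callers block on cond until it is done.
func (c *cacheState) get(ttl time.Duration, now func() time.Time, build func() ([]byte, error)) ([]byte, error) {
	c.mu.Lock()
	for {
		if c.fullBody != nil && now().Sub(c.builtAt) < ttl {
			c.hits++
			body := c.fullBody
			c.mu.Unlock()
			return body, nil
		}
		if !c.building {
			break // nobody is rebuilding: this caller becomes the builder
		}
		c.waits++
		c.cond.Wait() // releases mu while blocked, re-acquires on wake-up
	}
	c.building = true
	c.mu.Unlock()

	body, err := build() // expensive work runs without holding mu

	c.mu.Lock()
	c.building = false
	c.rebuilds++
	c.lastBuildErr = err
	if err == nil {
		c.fullBody, c.builtAt = body, now()
	}
	c.cond.Broadcast() // wake waiters; after a failed build one of them retries
	c.mu.Unlock()
	return body, err
}

// demo uses a fake clock: two calls inside the TTL, then one after expiry.
func demo() (rebuilds, hits uint64) {
	c := newCacheState()
	clock := time.Unix(0, 0)
	now := func() time.Time { return clock }
	build := func() ([]byte, error) { return []byte(`{"nodes":[]}`), nil }
	c.get(time.Second, now, build) // miss: rebuild
	c.get(time.Second, now, build) // hit
	clock = clock.Add(2 * time.Second)
	c.get(time.Second, now, build) // expired: rebuild
	return c.rebuilds, c.hits
}

func main() {
	r, h := demo()
	fmt.Println("rebuilds:", r, "hits:", h)
}
```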
type NetworkMemberView ¶
type NetworkMemberView struct {
Members []uint32
MemberRoles map[uint32]string
MemberTags map[uint32][]string
}
NetworkMemberView carries the per-node membership data needed for list_nodes.
type NodeInfo ¶
type NodeInfo struct {
ID uint32
Owner string
PublicKey []byte
RealAddr string
Networks []uint16
LastSeen time.Time
Public bool
Hostname string
Tags []string
TaskExec bool
LANAddrs []string
KeyMeta KeyInfo
ExternalID string
Version string
RelayOnly bool
// LastSeenNano stores time.UnixNano() for lock-free heartbeat updates.
LastSeenNano atomic.Int64
// LastVerifiedNano stores the last Ed25519 verify time. Used by verify-skip.
LastVerifiedNano atomic.Int64
}
NodeInfo holds the in-memory state for a single registered node.
func (*NodeInfo) GetLastSeen ¶
GetLastSeen returns the most recent last-seen time, preferring the atomic value.
func (*NodeInfo) SetLastSeen ¶
SetLastSeen updates both the struct field and the atomic value. The caller must hold the server's mu for writing (mu.Lock).
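The split between a plain LastSeen field and an atomic LastSeenNano can be sketched as follows. The local node type, the touch method, and the convention that a zero atomic value means "not yet set" are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// node carries only the two last-seen fields from the documented NodeInfo.
type node struct {
	LastSeen     time.Time
	LastSeenNano atomic.Int64
}

// setLastSeen updates both representations. In the real package the caller
// is expected to hold the write lock; no lock is modelled here.
func (n *node) setLastSeen(t time.Time) {
	n.LastSeen = t
	n.LastSeenNano.Store(t.UnixNano())
}

// touch is a hypothetical lock-free heartbeat path: only the atomic moves.
func (n *node) touch(t time.Time) {
	n.LastSeenNano.Store(t.UnixNano())
}

// getLastSeen prefers the atomic value and falls back to the struct field
// when the atomic has never been written (assumed to be zero).
func (n *node) getLastSeen() time.Time {
	if ns := n.LastSeenNano.Load(); ns != 0 {
		return time.Unix(0, ns)
	}
	return n.LastSeen
}

// demo returns the observed last-seen seconds after a locked set and after
// a lock-free touch.
func demo() (afterSet, afterTouch int64) {
	n := &node{}
	n.setLastSeen(time.Unix(100, 0))
	afterSet = n.getLastSeen().Unix()
	n.touch(time.Unix(160, 0))
	afterTouch = n.getLastSeen().Unix()
	return afterSet, afterTouch
}

func main() {
	a, b := demo()
	fmt.Println(a, b)
}
```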
type NodeShards ¶
type NodeShards = [NumNodeShards]sync.RWMutex
NodeShards is the fixed-size per-node striped mutex array.
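A striped mutex array lets updates to different nodes proceed in parallel while updates to the same node serialize. The sketch below assumes the shard is chosen as nodeID modulo NumNodeShards; the documentation does not state the mapping, and shardFor and bump are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

const numNodeShards = 256 // mirrors the documented NumNodeShards

type nodeShards = [numNodeShards]sync.RWMutex

type node struct{ hits int }

// shardFor picks the stripe for a node. The modulo mapping is an assumption.
func shardFor(shards *nodeShards, nodeID uint32) *sync.RWMutex {
	return &shards[nodeID%numNodeShards]
}

// bump mutates one node's state while holding only that node's stripe.
func bump(shards *nodeShards, nodes []node, nodeID uint32) {
	mu := shardFor(shards, nodeID)
	mu.Lock()
	nodes[nodeID].hits++
	mu.Unlock()
}

// demo has four goroutines update the same node 100 times each.
func demo() int {
	var shards nodeShards
	nodes := make([]node, 4)
	var wg sync.WaitGroup
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				bump(&shards, nodes, 1)
			}
		}()
	}
	wg.Wait()
	return nodes[1].hits
}

func main() {
	fmt.Println("hits on node 1:", demo())
}
```

With this mapping, node IDs that differ by a multiple of 256 contend on the same stripe; all other pairs do not.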
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store holds the node directory handler logic. The actual data maps (nodes, indices, etc.) are owned by the parent server and shared via pointer.
func NewStore ¶
func NewStore(
mu *sync.RWMutex,
nodeShards *NodeShards,
nodes map[uint32]*NodeInfo,
pubKeyIdx map[string]uint32,
ownerIdx map[string]uint32,
hostnameIdx map[string]uint32,
nextNode *uint32,
maxNodes *int,
reapCursor *uint32,
listNodesCache *ListNodesCacheState,
listNodesPerNetMu *sync.Mutex,
listNodesPerNet *map[uint16]*ListNodesCacheState,
getNetworkView func(netID uint16) (NetworkMemberView, bool),
cb Callbacks,
) *Store
NewStore creates a ready-to-use directory Store. All pointer parameters are shared with the parent server and must not be nil.
func (*Store) AdminListNodesCached ¶
AdminListNodesCached returns the FULL pre-marshalled admin list_nodes response.
func (*Store) HandleBinaryHeartbeat ¶
HandleBinaryHeartbeat processes a native binary heartbeat request.
func (*Store) HandleBinaryLookup ¶
HandleBinaryLookup processes a native binary lookup request.
func (*Store) HandleBinaryResolve ¶
HandleBinaryResolve processes a native binary resolve request.
func (*Store) HandleDeregister ¶
HandleDeregister handles a deregister message. Cascade: removes from all networks (via callback), clears invite inbox, invalidates caches, records WAL, signals save.
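The cascade can be sketched with recording callbacks. The order below simply follows the sentence above. The local callbacks and node types, the deregister function, and the omission of locking and signature checks are all simplifications, not the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// callbacks is a stand-in with the documented fields the cascade touches.
type callbacks struct {
	RemoveFromNetworks            func(nodeID uint32, networks []uint16) []uint16
	ClearInviteInbox              func(nodeID uint32)
	InvalidateListNodesCache      func(netID uint16)
	InvalidateAdminListNodesCache func()
	RecordWALDeregister           func(nodeID uint32)
	Save                          func()
}

type node struct {
	ID       uint32
	Networks []uint16
}

// deregister is a hypothetical, lock-free rendering of the cascade.
func deregister(nodes map[uint32]*node, cb callbacks, nodeID uint32) error {
	n, ok := nodes[nodeID]
	if !ok {
		return fmt.Errorf("node %d not found", nodeID)
	}
	delete(nodes, nodeID)
	cb.RemoveFromNetworks(nodeID, n.Networks)
	cb.ClearInviteInbox(nodeID)
	for _, netID := range n.Networks {
		cb.InvalidateListNodesCache(netID)
	}
	cb.InvalidateAdminListNodesCache()
	cb.RecordWALDeregister(nodeID)
	cb.Save()
	return nil
}

// demo records each side effect in the order it fires.
func demo() string {
	var steps []string
	cb := callbacks{
		RemoveFromNetworks: func(uint32, []uint16) []uint16 {
			steps = append(steps, "remove_from_networks")
			return nil
		},
		ClearInviteInbox: func(uint32) { steps = append(steps, "clear_invites") },
		InvalidateListNodesCache: func(netID uint16) {
			steps = append(steps, fmt.Sprintf("invalidate_net_%d", netID))
		},
		InvalidateAdminListNodesCache: func() { steps = append(steps, "invalidate_admin") },
		RecordWALDeregister:           func(uint32) { steps = append(steps, "wal") },
		Save:                          func() { steps = append(steps, "save") },
	}
	nodes := map[uint32]*node{7: {ID: 7, Networks: []uint16{0, 3}}}
	if err := deregister(nodes, cb, 7); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(steps, ",")
}

func main() {
	fmt.Println(demo())
}
```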
func (*Store) HandleHeartbeat ¶
HandleHeartbeat handles a JSON heartbeat message.
func (*Store) HandleListNodes ¶
func (st *Store) HandleListNodes(msg map[string]interface{}, requireAdminToken func(msg map[string]interface{}) error) (map[string]interface{}, error)
HandleListNodes handles a list_nodes message.
func (*Store) HandleLookup ¶
HandleLookup handles a lookup message.
func (*Store) HandleReRegister ¶
func (st *Store) HandleReRegister(pubKeyB64, listenAddr, owner, hostname string, lanAddrs []string, version string, relayOnly bool) (map[string]interface{}, error)
HandleReRegister handles a node presenting an existing public key. Fast path for known-key reconnects; slow path for new nodes and index mutations.
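A common way to realize a fast path and a slow path over a shared sync.RWMutex is a read-locked lookup followed, only on a miss, by a write-locked insert that re-checks the index. The sketch below illustrates that pattern with a hypothetical directory type. It is an assumption about the general shape, not a description of what HandleReRegister actually does.

```go
package main

import (
	"fmt"
	"sync"
)

// directory is a hypothetical, reduced stand-in for the shared state.
type directory struct {
	mu        sync.RWMutex
	pubKeyIdx map[string]uint32
	nextNode  uint32
}

func newDirectory() *directory {
	return &directory{pubKeyIdx: map[string]uint32{}}
}

// reRegister returns the node ID for a public key and whether it was
// newly created by this call.
func (d *directory) reRegister(pubKeyB64 string) (uint32, bool) {
	// Fast path: a known key needs only the read lock.
	d.mu.RLock()
	id, ok := d.pubKeyIdx[pubKeyB64]
	d.mu.RUnlock()
	if ok {
		return id, false
	}

	// Slow path: index mutation requires the write lock.
	d.mu.Lock()
	defer d.mu.Unlock()
	// Re-check: another goroutine may have inserted the key between the
	// RUnlock above and the Lock here.
	if existing, found := d.pubKeyIdx[pubKeyB64]; found {
		return existing, false
	}
	d.nextNode++
	d.pubKeyIdx[pubKeyB64] = d.nextNode
	return d.nextNode, true
}

func main() {
	d := newDirectory()
	first, created := d.reRegister("key-A")
	fmt.Println(first, created)
	again, created := d.reRegister("key-A")
	fmt.Println(again, created)
}
```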
func (*Store) HandleRegister ¶
func (st *Store) HandleRegister(
msg map[string]interface{},
remoteAddr string,
verifyToken func(token string) (externalID string, err error),
setExternalID func(nodeID uint32, externalID string),
) (map[string]interface{}, error)
HandleRegister processes a register message.
func (*Store) HandleResolve ¶
HandleResolve handles a resolve message.
func (*Store) HandleResolveHostname ¶
HandleResolveHostname resolves a node by its hostname.
func (*Store) HandleSetHostname ¶
HandleSetHostname handles a set_hostname message.
func (*Store) HandleSetTags ¶
HandleSetTags handles a set_tags message.
func (*Store) HandleSetVisibility ¶
HandleSetVisibility handles a set_visibility message.
func (*Store) InvalidateAdminListNodesCache ¶
func (st *Store) InvalidateAdminListNodesCache()
InvalidateAdminListNodesCache forces the next AdminListNodesCached() to rebuild.
func (*Store) InvalidateListNodesCacheForNetwork ¶
InvalidateListNodesCacheForNetwork drops the cached response for a network.
func (*Store) PerNetworkListNodesCached ¶
PerNetworkListNodesCached returns the FULL pre-marshalled list_nodes response for a per-network request using singleflight + 1s TTL semantics.
func (*Store) ReapStaleNodes ¶
ReapStaleNodes removes nodes whose last heartbeat is older than threshold.
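ReapChunkSize and the reapCursor parameter of NewStore suggest that reaping proceeds in bounded chunks rather than scanning every node per tick. The following is a minimal sketch of such a cursor-driven scan, with a small chunk size for readability. The reapChunk function, the ID-range scan, and the wrap-around rule are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

const reapChunkSize = 3 // the documented ReapChunkSize is 10000; small here for the demo

type node struct{ lastSeen time.Time }

// reapChunk scans at most reapChunkSize node IDs starting at *cursor,
// deletes nodes last seen before threshold, and advances the cursor,
// wrapping to 0 once the end of the allocated ID range is reached.
func reapChunk(nodes map[uint32]*node, nextNode uint32, cursor *uint32, threshold time.Time) int {
	start := *cursor
	if start >= nextNode {
		start = 0
	}
	end := start + reapChunkSize
	if end > nextNode {
		end = nextNode
	}
	reaped := 0
	for id := start; id < end; id++ {
		if n, ok := nodes[id]; ok && n.lastSeen.Before(threshold) {
			delete(nodes, id)
			reaped++
		}
	}
	if end >= nextNode {
		*cursor = 0
	} else {
		*cursor = end
	}
	return reaped
}

// demo runs two ticks over five nodes, three of which are stale.
func demo() (firstTick, secondTick, remaining int, cursor uint32) {
	stale, fresh := time.Unix(50, 0), time.Unix(150, 0)
	nodes := map[uint32]*node{
		1: {lastSeen: stale}, 2: {lastSeen: fresh}, 3: {lastSeen: stale},
		4: {lastSeen: stale}, 5: {lastSeen: fresh},
	}
	threshold := time.Unix(100, 0)
	firstTick = reapChunk(nodes, 6, &cursor, threshold)  // scans IDs 0..2
	secondTick = reapChunk(nodes, 6, &cursor, threshold) // scans IDs 3..5
	return firstTick, secondTick, len(nodes), cursor
}

func main() {
	a, b, left, cur := demo()
	fmt.Println(a, b, left, cur)
}
```

Bounding the work per tick keeps the time spent under the write lock predictable even with a very large directory.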
func (*Store) ReplaceState ¶
func (st *Store) ReplaceState(
nodes map[uint32]*NodeInfo,
pubKeyIdx map[string]uint32,
ownerIdx map[string]uint32,
hostnameIdx map[string]uint32,
)
ReplaceState atomically swaps the directory's node maps and indices with the provided replacement values. The server calls it after applying a replication snapshot so that the directory's pointers do not go stale. The caller must hold the server's mu for writing (mu.Lock).