directory

package
v1.10.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

Package directory implements the registry's node directory: registration, lookup, resolve, deregister, heartbeat, list-nodes, hostname/tag/visibility management, and the stale-node reaper. It is extracted from pkg/registry/server as part of the R4.2 registry decomposition.

Thread safety: all exported Handle* methods are safe for concurrent use. Locking is delegated to the server: the Store shares the mutex mu with Server, and the callback functions acquire whatever other locks they need internally.

Constants

const (
	// NumNodeShards must match the value in server.go.
	NumNodeShards = 256

	// ReapChunkSize is how many nodes to process per reap tick.
	ReapChunkSize = 10000

	// AdminListNodesTTL is how long an admin list_nodes response is cached.
	AdminListNodesTTL = 1 * time.Second

	// RawResponseKey is the sentinel map key for pre-marshalled responses.
	RawResponseKey = "_pilot_raw_body"
)
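
The RawResponseKey constant suggests that a handler can place an already-encoded body inside the usual response map. The following is a minimal sketch of how a response writer might honor that key. The helper encodeResponse is hypothetical, and storing the body as a []byte under the key is an assumption, not something this page states.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RawResponseKey mirrors the package constant.
const RawResponseKey = "_pilot_raw_body"

// encodeResponse is a hypothetical helper. If the response map carries a
// pre-marshalled body under RawResponseKey (assumed to be a []byte), the
// body is returned untouched; otherwise the map is marshalled as JSON.
func encodeResponse(resp map[string]interface{}) ([]byte, error) {
	if raw, ok := resp[RawResponseKey].([]byte); ok {
		return raw, nil
	}
	return json.Marshal(resp)
}

func main() {
	// A cached list_nodes body passes through without being re-encoded.
	cached := []byte(`{"nodes":[]}`)
	out, err := encodeResponse(map[string]interface{}{RawResponseKey: cached})
	fmt.Println(string(out), err)

	// An ordinary response map is marshalled as usual.
	out, err = encodeResponse(map[string]interface{}{"ok": true})
	fmt.Println(string(out), err)
}
```

Skipping the re-encode is the likely point of such a sentinel: a large cached list_nodes body is marshalled once and written many times.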

Variables

This section is empty.

Functions

func ValidateHostname

func ValidateHostname(name string) error

ValidateHostname checks that a hostname is valid for registration.

Types

type Callbacks

type Callbacks struct {
	// Save triggers a debounced snapshot write.
	Save func()
	// Audit records an audit log entry.
	Audit func(action string, attrs ...any)
	// RecordWALRegister records a new-node WAL entry.
	RecordWALRegister func(nodeID uint32, owner, pubKeyB64, realAddr, hostname string, lanAddrs []string, version, createdAt string)
	// RecordWALDeregister records a deregistration WAL entry.
	RecordWALDeregister func(nodeID uint32)
	// InvalidateListNodesCache drops the per-network list_nodes cache.
	InvalidateListNodesCache func(netID uint16)
	// InvalidateAdminListNodesCache drops the backbone admin list_nodes cache.
	InvalidateAdminListNodesCache func()
	// PublishMembershipChanged publishes a membership.changed bus event.
	PublishMembershipChanged func(netID uint16)
	// RemoveFromNetworks removes a node from all its networks in the server's
	// network map. The nodeID and its current Networks slice are passed.
	// Caller holds mu.Lock. Returns the networks that lost an owner.
	RemoveFromNetworks func(nodeID uint32, networks []uint16) (lostOwnerNets []uint16)
	// ClearInviteInbox removes all pending invites for the given node.
	ClearInviteInbox func(nodeID uint32)
	// RequireAdminToken validates the admin_token field in a message.
	RequireAdminToken func(msg map[string]interface{}) error
	// RequireAdminTokenLocked is like RequireAdminToken but mu is already held.
	RequireAdminTokenLocked func(msg map[string]interface{}) error
	// AdminToken returns the current admin token.
	AdminToken func() string
	// VerifyNodeSignature verifies an Ed25519 (or admin-token) signature.
	VerifyNodeSignature func(pubKey []byte, adminToken string, msg map[string]interface{}, challenge string) error
	// IsTrusted returns true if nodeA and nodeB have a mutual trust pair.
	IsTrusted func(nodeA, nodeB uint32) bool
	// BeaconAddr returns the current beacon address.
	BeaconAddr func() string
	// IncRegistrations increments the registration counter.
	IncRegistrations func()
	// IncDeregistrations increments the deregistration counter.
	IncDeregistrations func()
	// IncRequestsTotal increments the named request counter.
	IncRequestsTotal func(label string)
	// IncErrorsTotal increments the named error counter.
	IncErrorsTotal func(label string)
	// ObserveRequestDuration records a request duration histogram sample.
	ObserveRequestDuration func(label string, seconds float64)
	// Now returns the current time (injectable for tests).
	Now func() time.Time
	// AddNodeToBackbone adds a node to the backbone (network 0) Members list.
	// Called when a new node is registered or a reaped node is reclaimed.
	// Caller holds mu.Lock.
	AddNodeToBackbone func(nodeID uint32)
	// ScanNetworkMemberships returns the set of non-backbone networkIDs that
	// still list nodeID as a member. Used to restore network memberships after
	// a reaped node reclaims its old identity. Caller holds mu.Lock.
	ScanNetworkMemberships func(nodeID uint32) []uint16
}

Callbacks collects the cross-package effects that directory handlers need to trigger without importing the full server package.
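
Because every side effect is a plain function field, a test can substitute fakes for the clock, the audit log and the counters. The sketch below illustrates that pattern with a trimmed local copy of the struct. The function stampRegistration and the audit action name "node.registered" are hypothetical and are not taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

// Callbacks is a trimmed local copy holding three of the fields listed above.
type Callbacks struct {
	Audit            func(action string, attrs ...any)
	IncRegistrations func()
	Now              func() time.Time
}

// stampRegistration is a hypothetical handler fragment. It bumps the
// registration counter, reads the injected clock, and writes an audit entry.
func stampRegistration(cb Callbacks, nodeID uint32) string {
	cb.IncRegistrations()
	created := cb.Now().UTC().Format(time.RFC3339)
	cb.Audit("node.registered", "node_id", nodeID, "created_at", created)
	return created
}

// demoCallbacks wires in fakes: a fixed clock, an in-memory audit recorder,
// and a plain integer counter.
func demoCallbacks() (created string, regs int, audits []string) {
	cb := Callbacks{
		Audit:            func(action string, _ ...any) { audits = append(audits, action) },
		IncRegistrations: func() { regs++ },
		Now:              func() time.Time { return time.Date(2026, 5, 12, 0, 0, 0, 0, time.UTC) },
	}
	created = stampRegistration(cb, 7)
	return created, regs, audits
}

func main() {
	created, regs, audits := demoCallbacks()
	fmt.Println(created, regs, audits)
}
```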

type KeyInfo

type KeyInfo struct {
	CreatedAt   time.Time `json:"created_at"`
	RotatedAt   time.Time `json:"rotated_at,omitempty"`
	RotateCount int       `json:"rotate_count"`
	ExpiresAt   time.Time `json:"expires_at,omitempty"`
}

KeyInfo holds key lifecycle metadata for a node.
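
A note for consumers of this JSON: in the standard encoding/json package, omitempty has no effect on struct values such as time.Time. The sketch below shows that a zero RotatedAt or ExpiresAt is still emitted. It assumes KeyInfo is marshalled with encoding/json and has no custom MarshalJSON method, which this page does not state.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// KeyInfo is copied from the type definition above.
type KeyInfo struct {
	CreatedAt   time.Time `json:"created_at"`
	RotatedAt   time.Time `json:"rotated_at,omitempty"`
	RotateCount int       `json:"rotate_count"`
	ExpiresAt   time.Time `json:"expires_at,omitempty"`
}

// hasField reports whether the JSON encoding of k contains the named key.
func hasField(k KeyInfo, name string) bool {
	b, err := json.Marshal(k)
	if err != nil {
		return false
	}
	var m map[string]json.RawMessage
	if err := json.Unmarshal(b, &m); err != nil {
		return false
	}
	_, ok := m[name]
	return ok
}

func main() {
	// RotatedAt and ExpiresAt are left at their zero value.
	k := KeyInfo{CreatedAt: time.Date(2026, 5, 12, 0, 0, 0, 0, time.UTC)}
	b, _ := json.Marshal(k)
	fmt.Println(string(b))
	fmt.Println("rotated_at present:", hasField(k, "rotated_at"))
}
```

A client that treats the presence of rotated_at as "the key has been rotated" would therefore be safer checking RotateCount or testing the timestamp with IsZero.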

type ListNodesCacheState

type ListNodesCacheState struct {
	Mu            sync.Mutex
	FullBody      []byte
	BuiltAt       time.Time
	Building      bool
	Cond          *sync.Cond
	LastBuildErr  error
	CacheHits     uint64
	CacheWaits    uint64
	CacheRebuilds uint64
}

ListNodesCacheState is the per-network singleflight + TTL cache entry for list_nodes responses.
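
The fields of this struct map naturally onto a single-builder cache: one caller rebuilds while Building is set, other callers wait on Cond, and BuiltAt decides freshness. The sketch below shows one way such a cache could work, using a trimmed stand-in type. The method get and its parameters are hypothetical, and the package's real control flow may differ.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cacheState is a local stand-in with the same fields as ListNodesCacheState.
type cacheState struct {
	Mu            sync.Mutex
	FullBody      []byte
	BuiltAt       time.Time
	Building      bool
	Cond          *sync.Cond
	LastBuildErr  error
	CacheHits     uint64
	CacheWaits    uint64
	CacheRebuilds uint64
}

func newCacheState() *cacheState {
	c := &cacheState{}
	c.Cond = sync.NewCond(&c.Mu)
	return c
}

// get returns the cached body while it is younger than ttl. Otherwise a
// single caller runs build and every concurrent caller waits for its result.
func (c *cacheState) get(ttl time.Duration, now func() time.Time, build func() ([]byte, error)) ([]byte, error) {
	c.Mu.Lock()
	if c.FullBody != nil && now().Sub(c.BuiltAt) < ttl {
		c.CacheHits++
		body := c.FullBody
		c.Mu.Unlock()
		return body, nil
	}
	if c.Building {
		c.CacheWaits++
		for c.Building {
			c.Cond.Wait() // releases Mu while blocked
		}
		body, err := c.FullBody, c.LastBuildErr
		c.Mu.Unlock()
		return body, err
	}
	c.Building = true
	c.Mu.Unlock()

	body, err := build() // the expensive marshal runs without the lock

	c.Mu.Lock()
	c.Building = false
	c.LastBuildErr = err
	c.CacheRebuilds++
	if err == nil {
		c.FullBody = body
		c.BuiltAt = now()
	}
	c.Cond.Broadcast()
	c.Mu.Unlock()
	return body, err
}

// demoCache drives the cache with a fake clock: cold call, fresh call,
// then a call after the TTL has passed.
func demoCache() (hits, rebuilds uint64) {
	clock := time.Unix(0, 0)
	now := func() time.Time { return clock }
	build := func() ([]byte, error) { return []byte(`{"nodes":[]}`), nil }

	c := newCacheState()
	c.get(time.Second, now, build) // cold: rebuild
	c.get(time.Second, now, build) // fresh: hit
	clock = clock.Add(2 * time.Second)
	c.get(time.Second, now, build) // expired: rebuild
	return c.CacheHits, c.CacheRebuilds
}

func main() {
	hits, rebuilds := demoCache()
	fmt.Println("hits:", hits, "rebuilds:", rebuilds)
}
```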

type NetworkMemberView

type NetworkMemberView struct {
	Members     []uint32
	MemberRoles map[uint32]string
	MemberTags  map[uint32][]string
}

NetworkMemberView carries the per-node membership data needed for list_nodes.

type NodeInfo

type NodeInfo struct {
	ID         uint32
	Owner      string
	PublicKey  []byte
	RealAddr   string
	Networks   []uint16
	LastSeen   time.Time
	Public     bool
	Hostname   string
	Tags       []string
	TaskExec   bool
	LANAddrs   []string
	KeyMeta    KeyInfo
	ExternalID string
	Version    string
	RelayOnly  bool

	// LastSeenNano stores the last-seen time as Unix nanoseconds (Time.UnixNano) for lock-free heartbeat updates.
	LastSeenNano atomic.Int64
	// LastVerifiedNano stores the last Ed25519 verify time. Used by verify-skip.
	LastVerifiedNano atomic.Int64
}

NodeInfo holds the in-memory state for a single registered node.

func (*NodeInfo) GetLastSeen

func (n *NodeInfo) GetLastSeen() time.Time

GetLastSeen returns the most recent last-seen time, preferring the atomic value.

func (*NodeInfo) SetLastSeen

func (n *NodeInfo) SetLastSeen(t time.Time)

SetLastSeen updates both the struct field and the atomic value. Caller must hold s.mu.Lock().
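
The two accessors imply a split between a locked write path that updates both values and a lock-free heartbeat path that touches only the atomic. The sketch below illustrates that split with a trimmed stand-in type. The touch method and the fallback rule in getLastSeen (use the struct field when the atomic is still zero) are assumptions.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// node is a trimmed stand-in for NodeInfo.
type node struct {
	LastSeen     time.Time
	LastSeenNano atomic.Int64
}

// touch is a hypothetical lock-free heartbeat update: only the atomic is
// written, so no mutex is required.
func (n *node) touch(t time.Time) {
	n.LastSeenNano.Store(t.UnixNano())
}

// setLastSeen updates both the field and the atomic. In the real package the
// caller is documented to hold the write lock.
func (n *node) setLastSeen(t time.Time) {
	n.LastSeen = t
	n.LastSeenNano.Store(t.UnixNano())
}

// getLastSeen prefers the atomic value and falls back to the struct field
// when the atomic has never been set (assumed behaviour).
func (n *node) getLastSeen() time.Time {
	if ns := n.LastSeenNano.Load(); ns != 0 {
		return time.Unix(0, ns)
	}
	return n.LastSeen
}

// demoLastSeen registers a node at t0 and then heartbeats at t1 without
// touching the struct field.
func demoLastSeen() (fieldUnchanged, atomicFresh bool) {
	t0, t1 := time.Unix(100, 0), time.Unix(200, 0)
	var n node
	n.setLastSeen(t0)
	n.touch(t1)
	return n.LastSeen.Equal(t0), n.getLastSeen().Equal(t1)
}

func main() {
	fieldUnchanged, atomicFresh := demoLastSeen()
	fmt.Println(fieldUnchanged, atomicFresh)
}
```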

type NodeShards

type NodeShards = [NumNodeShards]sync.RWMutex

NodeShards is the fixed-size per-node striped mutex array.
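
A striped mutex array lets updates to different nodes proceed in parallel while updates to the same node serialize. The sketch below shows the idea. The modulo mapping from node ID to shard in shardIndex is an assumption, since this page does not say how the package selects a shard.

```go
package main

import (
	"fmt"
	"sync"
)

const NumNodeShards = 256

// NodeShards matches the alias declared above.
type NodeShards = [NumNodeShards]sync.RWMutex

// shardIndex maps a node ID to a shard. Modulo is assumed here.
func shardIndex(nodeID uint32) uint32 {
	return nodeID % NumNodeShards
}

// shardFor returns the lock guarding nodeID.
func shardFor(shards *NodeShards, nodeID uint32) *sync.RWMutex {
	return &shards[shardIndex(nodeID)]
}

// bumpConcurrently has several workers increment per-node counters, taking
// only the shard lock of the node being updated.
func bumpConcurrently(workers, rounds int) []int {
	var shards NodeShards
	hits := make([]int, 3) // indexed by node ID 0..2
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := 0; r < rounds; r++ {
				for id := range hits {
					mu := shardFor(&shards, uint32(id))
					mu.Lock()
					hits[id]++
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	return hits
}

func main() {
	fmt.Println(shardIndex(1), shardIndex(257)) // IDs 256 apart share a shard
	fmt.Println(bumpConcurrently(4, 100))
}
```

With 256 stripes, two node IDs contend only when they map to the same shard, so the array stays a fixed size however many nodes are registered.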

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store holds the node directory handler logic. The actual data maps (nodes, indices, etc.) are owned by the parent server and shared via pointer.

func NewStore

func NewStore(
	mu *sync.RWMutex,
	nodeShards *NodeShards,
	nodes map[uint32]*NodeInfo,
	pubKeyIdx map[string]uint32,
	ownerIdx map[string]uint32,
	hostnameIdx map[string]uint32,
	nextNode *uint32,
	maxNodes *int,
	reapCursor *uint32,
	listNodesCache *ListNodesCacheState,
	listNodesPerNetMu *sync.Mutex,
	listNodesPerNet *map[uint16]*ListNodesCacheState,
	getNetworkView func(netID uint16) (NetworkMemberView, bool),
	cb Callbacks,
) *Store

NewStore creates a ready-to-use directory Store. All pointer parameters are shared with the parent server and must not be nil.

func (*Store) AdminListNodesCached

func (st *Store) AdminListNodesCached() ([]byte, error)

AdminListNodesCached returns the FULL pre-marshalled admin list_nodes response.

func (*Store) HandleBinaryHeartbeat

func (st *Store) HandleBinaryHeartbeat(conn net.Conn, payload []byte)

HandleBinaryHeartbeat processes a native binary heartbeat request.

func (*Store) HandleBinaryLookup

func (st *Store) HandleBinaryLookup(conn net.Conn, payload []byte)

HandleBinaryLookup processes a native binary lookup request.

func (*Store) HandleBinaryResolve

func (st *Store) HandleBinaryResolve(conn net.Conn, payload []byte)

HandleBinaryResolve processes a native binary resolve request.

func (*Store) HandleDeregister

func (st *Store) HandleDeregister(msg map[string]interface{}) (map[string]interface{}, error)

HandleDeregister handles a deregister message. Cascade: removes from all networks (via callback), clears invite inbox, invalidates caches, records WAL, signals save.

func (*Store) HandleHeartbeat

func (st *Store) HandleHeartbeat(msg map[string]interface{}) (map[string]interface{}, error)

HandleHeartbeat handles a JSON heartbeat message.

func (*Store) HandleListNodes

func (st *Store) HandleListNodes(msg map[string]interface{}, requireAdminToken func(msg map[string]interface{}) error) (map[string]interface{}, error)

HandleListNodes handles a list_nodes message.

func (*Store) HandleLookup

func (st *Store) HandleLookup(msg map[string]interface{}) (map[string]interface{}, error)

HandleLookup handles a lookup message.

func (*Store) HandleReRegister

func (st *Store) HandleReRegister(pubKeyB64, listenAddr, owner, hostname string, lanAddrs []string, version string, relayOnly bool) (map[string]interface{}, error)

HandleReRegister handles a node presenting an existing public key. Fast path for known-key reconnects; slow path for new nodes and index mutations.

func (*Store) HandleRegister

func (st *Store) HandleRegister(
	msg map[string]interface{},
	remoteAddr string,
	verifyToken func(token string) (externalID string, err error),
	setExternalID func(nodeID uint32, externalID string),
) (map[string]interface{}, error)

HandleRegister processes a register message.

func (*Store) HandleResolve

func (st *Store) HandleResolve(msg map[string]interface{}) (map[string]interface{}, error)

HandleResolve handles a resolve message.

func (*Store) HandleResolveHostname

func (st *Store) HandleResolveHostname(msg map[string]interface{}) (map[string]interface{}, error)

HandleResolveHostname resolves a node by its hostname.

func (*Store) HandleSetHostname

func (st *Store) HandleSetHostname(msg map[string]interface{}) (map[string]interface{}, error)

HandleSetHostname handles a set_hostname message.

func (*Store) HandleSetTags

func (st *Store) HandleSetTags(msg map[string]interface{}) (map[string]interface{}, error)

HandleSetTags handles a set_tags message.

func (*Store) HandleSetVisibility

func (st *Store) HandleSetVisibility(msg map[string]interface{}) (map[string]interface{}, error)

HandleSetVisibility handles a set_visibility message.

func (*Store) InvalidateAdminListNodesCache

func (st *Store) InvalidateAdminListNodesCache()

InvalidateAdminListNodesCache forces the next AdminListNodesCached() to rebuild.

func (*Store) InvalidateListNodesCacheForNetwork

func (st *Store) InvalidateListNodesCacheForNetwork(netID uint16)

InvalidateListNodesCacheForNetwork drops the cached response for a network.

func (*Store) PerNetworkListNodesCached

func (st *Store) PerNetworkListNodesCached(netID uint16) ([]byte, error)

PerNetworkListNodesCached returns the FULL pre-marshalled list_nodes response for a per-network request using singleflight + 1s TTL semantics.

func (*Store) ReapStaleNodes

func (st *Store) ReapStaleNodes(threshold time.Time)

ReapStaleNodes removes nodes whose last heartbeat is older than threshold.
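
Together with ReapChunkSize and the reapCursor pointer passed to NewStore, this suggests that each reap tick scans a bounded window of node IDs rather than the whole map. The sketch below illustrates one way to do that. The function reapChunk, its parameters and the wrap-around rule are all assumptions, and the real method also has to run the deregistration cascade for each removed node.

```go
package main

import (
	"fmt"
	"time"
)

// reapChunk scans at most chunk node IDs starting at *cursor, deletes the
// ones last seen before threshold, and advances the cursor. IDs are assumed
// to lie in [0, maxID). The scan stops early when the cursor wraps to zero.
func reapChunk(lastSeen map[uint32]time.Time, cursor *uint32, maxID, chunk uint32, threshold time.Time) []uint32 {
	var reaped []uint32
	if maxID == 0 {
		return reaped
	}
	for i := uint32(0); i < chunk; i++ {
		id := *cursor
		*cursor = (*cursor + 1) % maxID
		if seen, ok := lastSeen[id]; ok && seen.Before(threshold) {
			delete(lastSeen, id)
			reaped = append(reaped, id)
		}
		if *cursor == 0 {
			break // wrapped around; resume from the start on the next tick
		}
	}
	return reaped
}

// demoReap runs three ticks over five nodes with a chunk size of two.
// Nodes 1 and 3 are stale.
func demoReap() [][]uint32 {
	threshold := time.Unix(1000, 0)
	fresh, stale := threshold.Add(time.Minute), threshold.Add(-time.Minute)
	lastSeen := map[uint32]time.Time{0: fresh, 1: stale, 2: fresh, 3: stale, 4: fresh}

	var cursor uint32
	var ticks [][]uint32
	for i := 0; i < 3; i++ {
		ticks = append(ticks, reapChunk(lastSeen, &cursor, 5, 2, threshold))
	}
	return ticks
}

func main() {
	for i, reaped := range demoReap() {
		fmt.Printf("tick %d reaped %v\n", i+1, reaped)
	}
}
```

Bounding the work per tick keeps the time spent under the write lock predictable, at the cost of needing several ticks to cover a large directory.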

func (*Store) ReplaceState

func (st *Store) ReplaceState(
	nodes map[uint32]*NodeInfo,
	pubKeyIdx map[string]uint32,
	ownerIdx map[string]uint32,
	hostnameIdx map[string]uint32,
)

ReplaceState atomically swaps the directory's node maps and indices for the provided replacements. The server calls it after applying a replication snapshot so that the directory does not keep references to the old maps. The caller must hold s.mu.Lock().
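
The stale-reference problem is easy to reproduce: a Go map value is a reference to one underlying table, so when the owner assigns a brand-new map, anything holding the old value keeps seeing the old table. The sketch below uses hypothetical server and store types to show the effect and the fix.

```go
package main

import "fmt"

// server and store are hypothetical stand-ins. Both hold a reference to the
// same node map, as the Store does with the maps passed to NewStore.
type server struct{ nodes map[uint32]string }
type store struct{ nodes map[uint32]string }

// replaceState repoints the store at the server's new map.
func (st *store) replaceState(nodes map[uint32]string) { st.nodes = nodes }

// demoReplace reports whether the store can see node 2 before and after
// replaceState is called.
func demoReplace() (before, after bool) {
	srv := &server{nodes: map[uint32]string{1: "alpha"}}
	st := &store{nodes: srv.nodes}

	// Applying a snapshot replaces the server's map with a new one.
	srv.nodes = map[uint32]string{2: "beta"}

	_, before = st.nodes[2] // the store still references the old map
	st.replaceState(srv.nodes)
	_, after = st.nodes[2]
	return before, after
}

func main() {
	before, after := demoReplace()
	fmt.Println("sees node 2 before:", before, "after:", after)
}
```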
