Documentation
¶
Index ¶
- Constants
- Variables
- func DiscoverEndpoint(beaconAddr string, nodeID uint32, conn *net.UDPConn) (*net.UDPAddr, error)
- func EncodeSACK(blocks []SACKBlock) []byte
- func RecoveredPanicCount() uint64
- func ValidateWebhookURL(rawURL string) error
- type Config
- type ConnReadWriter
- type ConnState
- type ConnStats
- type Connection
- func (c *Connection) BytesInFlight() int
- func (c *Connection) CloseRecvBuf()
- func (c *Connection) DeliverInOrder(seq uint32, data []byte) uint32
- func (c *Connection) EffectiveWindow() int
- func (c *Connection) ProcessAck(ack uint32, pureACK bool)
- func (c *Connection) ProcessSACK(blocks []SACKBlock)
- func (c *Connection) RecvWindow() uint16
- func (c *Connection) SACKBlocks() []SACKBlock
- func (c *Connection) TrackSend(seq uint32, data []byte)
- func (c *Connection) WindowAvailable() bool
- type ConnectionInfo
- type Daemon
- func (d *Daemon) AddTunnelPeer(nodeID uint32, addr *net.UDPAddr)
- func (d *Daemon) Addr() protocol.Addr
- func (d *Daemon) AdminToken() string
- func (d *Daemon) BroadcastDatagram(netID uint16, dstPort uint16, data []byte, adminToken string) error
- func (d *Daemon) Bus() *inProcessBus
- func (d *Daemon) CachedEndpoint(nodeID uint32) (string, bool)
- func (d *Daemon) CloseConnection(conn *Connection)
- func (d *Daemon) DialConnection(dstAddr protocol.Addr, dstPort uint16) (*Connection, error)
- func (d *Daemon) DialConnectionContext(ctx context.Context, dstAddr protocol.Addr, dstPort uint16) (*Connection, error)
- func (d *Daemon) GetManagedEngine(netID uint16) *ManagedEngine
- func (d *Daemon) GetMemberTags(netID uint16) []string
- func (d *Daemon) GetPolicyRunner(netID uint16) PolicyRunner
- func (d *Daemon) GetTrustChecker() TrustChecker
- func (d *Daemon) HandshakeRevokeTrust(nodeID uint32) error
- func (d *Daemon) HandshakeSendRequest(nodeID uint32, reason string) error
- func (d *Daemon) HandshakeService() HandshakeService
- func (d *Daemon) Identity() *crypto.Identity
- func (d *Daemon) IdentityPath() string
- func (d *Daemon) Info() *DaemonInfo
- func (d *Daemon) NewConnReadWriter(conn *Connection) ConnReadWriter
- func (d *Daemon) NodeID() uint32
- func (d *Daemon) Ports() *PortManager
- func (d *Daemon) PublishEvent(topic string, payload map[string]any)
- func (d *Daemon) RegConnListNodes(netID uint16, token string) (map[string]any, error)
- func (d *Daemon) RegisterHandshakeService(svc HandshakeService)
- func (d *Daemon) RegisterPolicyManager(pm PolicyManager)
- func (d *Daemon) RegisterTrustChecker(tc TrustChecker)
- func (d *Daemon) RegisterWebhookManager(wm WebhookManager)
- func (d *Daemon) RegistryClient() *registry.Client
- func (d *Daemon) RotateKey() (map[string]interface{}, error)
- func (d *Daemon) SendData(conn *Connection, data []byte) error
- func (d *Daemon) SendDatagram(dstAddr protocol.Addr, dstPort uint16, data []byte) error
- func (d *Daemon) SetMemberTags(netID uint16, tags []string)
- func (d *Daemon) SetWebhookURL(url string)
- func (d *Daemon) Start() error
- func (d *Daemon) StartManagedEngine(netID uint16, rules *registrywire.NetworkRules)
- func (d *Daemon) StartPolicyRunner(netID uint16, policyJSON json.RawMessage) error
- func (d *Daemon) Stop() error
- func (d *Daemon) StopManagedEngine(netID uint16)
- func (d *Daemon) StopPolicyRunner(netID uint16)
- func (d *Daemon) TrustAutoApprove() bool
- func (d *Daemon) TrustedPeers() []HandshakeTrustRecord
- func (d *Daemon) TunnelAddr() net.Addr
- func (d *Daemon) Tunnels() *TunnelManager
- type DaemonInfo
- type Event
- type HandshakePendingRecord
- type HandshakeService
- type HandshakeTrustRecord
- type IPCServer
- type IncomingPacket
- type Listener
- type ManagedEngine
- type NetworkMembership
- type PeerInfo
- type PolicyEventType
- type PolicyManager
- type PolicyRunner
- type PortManager
- func (pm *PortManager) AllConnections() []*Connection
- func (pm *PortManager) AllocEphemeralPort() uint16
- func (pm *PortManager) Bind(port uint16) (*Listener, error)
- func (pm *PortManager) ConnectionCountForPort(port uint16) int
- func (pm *PortManager) ConnectionList() []ConnectionInfo
- func (pm *PortManager) FindConnection(localPort uint16, remoteAddr protocol.Addr, remotePort uint16) *Connection
- func (pm *PortManager) GetConnection(id uint32) *Connection
- func (pm *PortManager) GetListener(port uint16) *Listener
- func (pm *PortManager) IdleConnections(maxIdle time.Duration) []*Connection
- func (pm *PortManager) NewConnection(localPort uint16, remoteAddr protocol.Addr, remotePort uint16) *Connection
- func (pm *PortManager) RemoveConnection(id uint32)
- func (pm *PortManager) StaleConnections(timeWaitDur time.Duration) []*Connection
- func (pm *PortManager) TotalActiveConnections() int
- func (pm *PortManager) Unbind(port uint16)
- type SACKBlock
- type TrustChecker
- type TunnelManager
- func (tm *TunnelManager) AddPeer(nodeID uint32, addr *net.UDPAddr)
- func (tm *TunnelManager) Close() error
- func (tm *TunnelManager) DropCrypto(peerNodeID uint32)
- func (tm *TunnelManager) EnableEncryption() error
- func (tm *TunnelManager) HasCrypto(nodeID uint32) bool
- func (tm *TunnelManager) HasPeer(nodeID uint32) bool
- func (tm *TunnelManager) IsEncrypted(nodeID uint32) bool
- func (tm *TunnelManager) IsRelayPeer(nodeID uint32) bool
- func (tm *TunnelManager) Listen(addr string) error
- func (tm *TunnelManager) LocalAddr() net.Addr
- func (tm *TunnelManager) PeerCount() int
- func (tm *TunnelManager) PeerList() []PeerInfo
- func (tm *TunnelManager) RecvCh() <-chan *IncomingPacket
- func (tm *TunnelManager) RegisterWithBeacon()
- func (tm *TunnelManager) RelayPeerIDs() []uint32
- func (tm *TunnelManager) RemovePeer(nodeID uint32)
- func (tm *TunnelManager) RequestHolePunch(targetNodeID uint32)
- func (tm *TunnelManager) Send(nodeID uint32, pkt *protocol.Packet) error
- func (tm *TunnelManager) SendDirectProbe(nodeID uint32, pkt *protocol.Packet) error
- func (tm *TunnelManager) SendTo(addr *net.UDPAddr, nodeID uint32, pkt *protocol.Packet) error
- func (tm *TunnelManager) SetBeaconAddr(addr string) error
- func (tm *TunnelManager) SetEventBus(bus *inProcessBus)
- func (tm *TunnelManager) SetIdentity(id *crypto.Identity)
- func (tm *TunnelManager) SetNodeID(id uint32)
- func (tm *TunnelManager) SetPeerVerifyFunc(fn func(uint32) (ed25519.PublicKey, error))
- func (tm *TunnelManager) SetRelayPeer(nodeID uint32, relay bool)
- func (tm *TunnelManager) SetRelayPeerPinned(nodeID uint32, relay bool)
- type WebhookManager
- type WebhookStats
Constants ¶
const ( PolicyEventConnect = "connect" PolicyEventDial = "dial" PolicyEventDatagram = "datagram" PolicyEventJoin = "join" PolicyEventLeave = "leave" PolicyEventCycle = "cycle" )
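PolicyEvent* are the event-type constants that daemon code passes into PolicyManager / PolicyRunner. Their values must match the corresponding coreapi.PolicyEvent* values: both sets are aliases of string, so they are interchangeable only if the string values are identical.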
const ( DefaultKeepaliveInterval = 60 * time.Second DefaultIdleTimeout = 120 * time.Second DefaultIdleSweepInterval = 15 * time.Second DefaultSYNRateLimit = 100 DefaultMaxConnectionsPerPort = 1024 DefaultMaxTotalConnections = 65536 DefaultTimeWaitDuration = 10 * time.Second )
Default tuning constants (used when Config fields are zero).
const ( DialDirectRetries = 3 // direct connection attempts before relay DialMaxRetries = 7 // total attempts (direct + relay). 3 direct + 4 relay. With DialInitialRTO=250ms exponential-backoff capped at DialMaxRTO=8s, the relay phase is ~7.75s — covers cold-start handshake (key_exchange + flushPending + SYN/SYN-ACK round trip) for typical peers while keeping bad dials from blocking longer than the user's --timeout. The probe-and-adapt machinery (see srttHistory below) will let us shorten this for peers we've successfully dialed before. DialInitialRTO = 250 * time.Millisecond // initial SYN retransmission timeout. Lowered from 1s — modern relay RTT is <200ms; waiting a full second before assuming loss makes cold dials feel like a stall. Three direct retries with exponential backoff (250→500→1000) still cover up to 1.75s of jitter before flipping to relay; that's plenty for an unhealthy direct path while letting the common case (peer is reachable, single retry needed) feel snappy. DialMaxRTO = 8 * time.Second // max backoff for SYN retransmission DialCheckInterval = 10 * time.Millisecond // poll interval for state changes during dial RetxCheckInterval = 100 * time.Millisecond // retransmission check ticker MaxRetxAttempts = 8 // abandon connection after this many retransmissions HeartbeatReregThresh = 3 // heartbeat failures before re-registration SYNBucketAge = 10 * time.Second // stale per-source SYN bucket reap threshold )
Dial and retransmission constants.
const ( ZeroWinProbeInitial = 500 * time.Millisecond // initial zero-window probe interval ZeroWinProbeMax = 30 * time.Second // max zero-window probe backoff // P1-004: stop probing after this many attempts so a peer stuck at // window=0 (crashed, deadlocked, or adversarial) doesn't leak a // goroutine + timer + probe packets indefinitely. ~30 probes at the // 30s cap is ~15 minutes of trying, which matches the idle sweep. MaxZeroWindowProbes = 30 )
Zero-window probe constants.
const ( CmdBind byte = 0x01 CmdBindOK byte = 0x02 CmdDial byte = 0x03 CmdDialOK byte = 0x04 CmdAccept byte = 0x05 CmdSend byte = 0x06 CmdRecv byte = 0x07 CmdClose byte = 0x08 CmdCloseOK byte = 0x09 CmdError byte = 0x0A CmdSendTo byte = 0x0B CmdRecvFrom byte = 0x0C CmdInfo byte = 0x0D CmdInfoOK byte = 0x0E CmdHandshake byte = 0x0F // driver → daemon: handshake request/approve/reject CmdHandshakeOK byte = 0x10 CmdResolveHostname byte = 0x11 CmdResolveHostnameOK byte = 0x12 CmdSetHostname byte = 0x13 CmdSetHostnameOK byte = 0x14 CmdSetVisibility byte = 0x15 CmdSetVisibilityOK byte = 0x16 CmdDeregister byte = 0x17 CmdDeregisterOK byte = 0x18 CmdSetTags byte = 0x19 CmdSetTagsOK byte = 0x1A CmdSetWebhook byte = 0x1B CmdSetWebhookOK byte = 0x1C CmdNetwork byte = 0x1F CmdNetworkOK byte = 0x20 CmdHealth byte = 0x21 CmdHealthOK byte = 0x22 CmdManaged byte = 0x23 CmdManagedOK byte = 0x24 CmdRotateKey byte = 0x25 CmdRotateKeyOK byte = 0x26 CmdBroadcast byte = 0x29 CmdBroadcastOK byte = 0x2A // CmdCancel: driver → daemon, "abandon the in-flight request that // I sent under reqID X". The reqID embedded in the envelope header // IS NOT the reqID being cancelled — that's encoded in the body // payload as a uint64. The envelope's own reqID is the new request // (CmdCancel); we don't reply, so it's effectively unused on the // wire (typically 0 from the driver's send() path). // // Issue #99: without this, a driver that timed out on a slow // daemon dial left the daemon's dispatch goroutine grinding for // the full 14-31 s retry budget, filling the per-conn dispatch // semaphore under burst load (§4.8 stress). CmdCancel byte = 0x2B )
IPC commands (daemon ↔ driver)
const ( SubNetworkList byte = 0x01 SubNetworkJoin byte = 0x02 SubNetworkLeave byte = 0x03 SubNetworkMembers byte = 0x04 SubNetworkInvite byte = 0x05 SubNetworkPollInvites byte = 0x06 SubNetworkRespondInvite byte = 0x07 )
Network sub-commands (second byte of CmdNetwork payload)
const ( SubManagedStatus byte = 0x02 SubManagedCycle byte = 0x04 SubManagedPolicy byte = 0x05 // get/set expr policy SubManagedMemberTags byte = 0x06 // get/set member tags SubManagedReconcile byte = 0x07 // poll registry & refresh peer set )
Managed sub-commands (second byte of CmdManaged payload)
const ( SubHandshakeSend byte = 0x01 SubHandshakeApprove byte = 0x02 SubHandshakeReject byte = 0x03 SubHandshakePending byte = 0x04 SubHandshakeTrusted byte = 0x05 SubHandshakeRevoke byte = 0x06 SubHandshakeWait byte = 0x07 )
Handshake IPC sub-commands
const ( // TopicNetworkJoined fires when reconciliation observes a network // in the registry membership list that was not present in the // prior tick's known set. // // Payload schema: // network_id uint16 — registry network ID (>0; 0 is the // default/backbone and is skipped) // rules any (json) — registrywire.NetworkRules raw form // if the network has managed-engine // rules; nil otherwise. Same format // the registry returns from // ListNetworks(). // expr_policy any (json) — compiled expr policy in the same // format GetExprPolicy returns // (string OR object); nil if the // network has no expr policy. // member_tags []string — local-node tags for this network // at the time of join (already // populated in d.memberTags). TopicNetworkJoined = "network.joined" // TopicNetworkLeft fires when reconciliation observes that a // previously-known network is no longer in the registry membership // list. // // Payload schema: // network_id uint16 — registry network ID TopicNetworkLeft = "network.left" // TopicNetworkTagsChanged fires when reconciliation observes a // non-empty change to the local node's tag list inside a network. // Re-emitted on each tick where the registry's authoritative tags // differ from the previously-cached tags. // // Payload schema: // network_id uint16 — registry network ID // tags []string — the new authoritative tag list TopicNetworkTagsChanged = "network.tags_changed" )
const ( InitialCongWin = 10 * MaxSegmentSize // 40 KB initial congestion window (IW10, RFC 6928) MaxCongWin = 1024 * 1024 // 1 MB max congestion window MaxSegmentSize = 4096 // MTU for virtual segments RecvBufSize = 512 // receive buffer channel capacity (segments) MaxRecvWin = RecvBufSize * MaxSegmentSize // 2 MB max receive window MaxOOOBuf = 128 // max out-of-order segments buffered per connection AcceptQueueLen = 64 // listener accept channel capacity SendBufLen = 256 // send buffer channel capacity (segments) // MaxNagleBuf caps the per-connection NagleBuf at 64 segments // (256 KB). v1.9.1 fix: SendData previously appended without bound, // so an application writing faster than the network could drain // (slow peer, full cwnd, packet loss) leaked memory linearly with // offered-but-undeliverable load. With many connections in that // state, daemon RSS climbed until OOM. SendData now returns // ErrSendBufFull when len(NagleBuf) + len(write) would exceed // this cap; callers must retry with backpressure. // 256 KB accommodates the largest single data-exchange frame // (64 KB) with headroom, while still bounding per-connection memory. MaxNagleBuf = 64 * MaxSegmentSize )
Default window parameters
const ( ClockGranularity = 10 * time.Millisecond // minimum RTTVAR for RTO calculation RTOMin = 200 * time.Millisecond // minimum retransmission timeout RTOMax = 10 * time.Second // maximum retransmission timeout InitialRTO = 1 * time.Second // initial retransmission timeout )
RTO parameters (RFC 6298)
const DefaultNetworkSyncInterval = 5 * time.Minute
DefaultNetworkSyncInterval is how often the daemon refreshes network memberships, port policies, and member tags from the registry.
const DelayedACKThreshold = 2
DelayedACKThreshold is the number of segments to receive before sending an ACK immediately.
const DelayedACKTimeout = 40 * time.Millisecond
DelayedACKTimeout is the max time to delay an ACK (RFC 1122 suggests 500ms max, we use 40ms).
const EndpointCacheTTL = 5 * time.Minute
EndpointCacheTTL is how long a cached endpoint is considered fresh. After this, the entry is stale but still usable as a fallback.
const IPCEnvelopeHeaderSize = 1
IPCEnvelopeHeaderSize is the size of the per-message header that sits inside the ipcutil length-framed envelope: 1 byte cmd.
const MaxConnsPerIPCClient = 4096
MaxConnsPerIPCClient caps how many overlay connections a single IPC client may own at once. P2-002: without this, a rogue or buggy client could exhaust the 65536-slot connection table and DoS the daemon.
const MaxIPCClients = 1024
MaxIPCClients caps the total number of concurrent IPC socket connections to the daemon (P2-002). Without this cap a misbehaving or malicious local client could exhaust the daemon's FD table by opening thousands of IPC sockets, breaking the dashboard, webhooks, and tunnel manager.
const NagleTimeout = 40 * time.Millisecond
NagleTimeout is the maximum time to buffer small writes before flushing.
const RecvChSize = 8192
RecvChSize is the capacity of the incoming packet channel. Increased from 1024 to 8192 for 1M-node scale to prevent drops during bursts (e.g., many peers sending simultaneously after a cron trigger).
const RelayProbeInterval = 5 * time.Minute
RelayProbeInterval is how often we probe relay-flagged peers for direct connectivity.
const ResolveCacheTTL = 60 * time.Second
ResolveCacheTTL is how long a registry resolve response is cached. During cron bursts, agents resolve the same peers repeatedly — this avoids hitting the registry for the same node within the TTL window.
Variables ¶
var ErrEphemeralExhausted = errors.New("ephemeral ports exhausted")
ErrEphemeralExhausted is returned by callers of AllocEphemeralPort when all 16384 ephemeral ports (49152..65535) are simultaneously in use.
var ErrIPCBackpressure = errors.New("ipc: backpressure (client too slow)")
ErrIPCBackpressure is retained for historical compatibility with tests that import it. Since issue #99, ipcWrite no longer returns this error — it blocks until the message is enqueued or the connection is closed. The variable is unused in production code paths.
var ErrIPCClosed = errors.New("ipc: connection closed")
ErrIPCClosed is returned when ipcWrite is called after Close.
var ErrPendingDropped = errors.New("pending queue full: oldest queued packet dropped while key exchange pending")
ErrPendingDropped is returned by sendEncryptedToNode when the per-peer pending queue was already at maxPendingPerPeer and the oldest queued packet had to be dropped to make room for the new one. The CALLER's packet is still queued — it will be sent as soon as key exchange finishes — but an older packet was lost to back-pressure.
Callers that distinguish this error from a hard failure can choose to retry (the dial path does this; one of the SYN retransmits will land after the queue drains). Surfacing it as a typed error also lets pilotctl render a "tunnel handshaking" hint instead of an opaque "send SYN: pending queue full" message.
var ErrSendBufFull = errors.New("send buffer full")
ErrSendBufFull is returned by SendData when appending the caller's write to the per-connection NagleBuf would exceed MaxNagleBuf. Callers must back off and retry — typically by waiting for a webhook or polling the connection's send-buffer state. (For context: SendData sends data over an established connection and implements Nagle's algorithm — small writes are coalesced into MSS-sized segments unless NoDelay is set, while large writes (>= MSS) are sent immediately.)
This error replaces the silent unbounded-growth behavior that could OOM the daemon when an application wrote faster than the network drained. Pinned by TestSendDataNagleBufGrowsUnbounded.
var TunnelKeepaliveInterval = 25 * time.Second
TunnelKeepaliveInterval is the minimum gap between successive sends to a peer before keepaliveSweep enqueues a NAT-keepalive frame. It is set below 30 s, the lower bound of typical consumer-NAT UDP idle timeouts. Tunable for tests via direct assignment.
Functions ¶
func DiscoverEndpoint ¶
DiscoverEndpoint sends a STUN discover to the beacon and returns the observed public endpoint. Thin shim over routing.DiscoverEndpoint — the canonical implementation lives at pkg/daemon/routing/discover.go.
func EncodeSACK ¶
EncodeSACK encodes SACK blocks into a byte slice. Format: "SACK" (4 bytes) + count (1 byte) + N * 8 bytes (Left:4, Right:4).
func RecoveredPanicCount ¶ added in v1.10.0
func RecoveredPanicCount() uint64
RecoveredPanicCount returns the total number of panics swallowed by recoverLayer since process start.
func ValidateWebhookURL ¶ added in v1.7.0
ValidateWebhookURL is a thin shim over urlvalidate.Validate. Lives in pkg/daemon (rather than plugins/webhook) so the IPC handler in ipc.go and external test packages can validate without importing the plugin (which would be an upward L7 → L11 edge).
The canonical implementation is pkg/urlvalidate; this shim exists only as a stable API surface for callers that pre-date T4.1's webhook-inversion.
Types ¶
type Config ¶
type Config struct {
RegistryAddr string
BeaconAddr string
ListenAddr string // UDP listen address for tunnel traffic
SocketPath string // Unix socket path for IPC
Encrypt bool // enable tunnel-layer encryption (X25519 + AES-256-GCM)
RegistryTLS bool // use TLS for registry connection
RegistryFingerprint string // hex SHA-256 fingerprint for TLS cert pinning
IdentityPath string // path to persist Ed25519 identity (empty = no persistence)
Email string // email address for account identification and key recovery
Owner string // deprecated: use Email instead
Endpoint string // fixed public endpoint (host:port) — skips STUN discovery (for cloud VMs)
Public bool // make this node's endpoint publicly discoverable
Hostname string // hostname for discovery (empty = none)
// FakeListenAddr overrides the listen_addr advertised to the
// registry. Real socket binding still uses ListenAddr; only the
// registered metadata is rewritten. Intended for synthetic-fleet
// scenarios where a daemon should appear at a country-residential
// address while still operating on a real OS socket. Endpoint
// learning on the data path is packet-driven, so peer responses
// continue to reach the real socket regardless of the advertised
// addr (see pkg/daemon/tunnel.go peerAddr learning).
FakeListenAddr string
// RelayOnly hides this node's real_addr from peer resolve/lookup
// responses. Peers reach this node only via the beacon-relay path,
// so this node's public IP is never exposed to other daemons.
// Default false (current behavior). Task 32 — once the fleet is on a
// new-enough binary, the operator default will flip to true.
RelayOnly bool
// Built-in services
DisableEcho bool // disable built-in echo service (port 7)
DisableDataExchange bool // disable built-in data exchange service (port 1001)
DisableEventStream bool // disable built-in event stream service (port 1002)
DisablePolicyRunner bool // skip loading expr policy runners and joining networks at startup
// Webhook
WebhookURL string // HTTP(S) endpoint for event notifications (empty = disabled)
WebhookHTTPTimeout time.Duration // HTTP client timeout for webhook POSTs (default 5s)
WebhookRetryBackoff time.Duration // initial retry backoff for webhook POSTs (default 1s)
// Trust
TrustAutoApprove bool // automatically approve all incoming handshake requests
// Fleet enrollment
AdminToken string // admin token for network operations (empty = disabled)
Networks []uint16 // network IDs to auto-join at startup (empty = none)
// Version
Version string // binary version string (injected via LDFLAGS at build time)
// Feature flags — ablation testing. All default false (current behavior).
BeaconRTTProbe bool // probe beacon RTT; override hash pick when >2× slower than best
// Tuning (zero = use defaults)
KeepaliveInterval time.Duration // default 60s
IdleTimeout time.Duration // default 120s
SYNRateLimit int // default 100
MaxConnectionsPerPort int // default 1024
MaxTotalConnections int // default 65536
TimeWaitDuration time.Duration // default 10s
}
type ConnReadWriter ¶ added in v1.10.0
type ConnReadWriter interface {
Read(p []byte) (int, error)
Write(p []byte) (int, error)
Close() error
}
ConnReadWriter is the public interface plugins/runtime sees for the io-side of a Connection. Matches the unexported connAdapter shape.
type ConnStats ¶
type ConnStats struct {
BytesSent uint64 // total user bytes sent
BytesRecv uint64 // total user bytes received
SegsSent uint64 // data segments sent
SegsRecv uint64 // data segments received
Retransmits uint64 // timeout-based retransmissions
FastRetx uint64 // fast retransmissions (3 dup ACKs)
SACKRecv uint64 // SACK blocks received from peer
SACKSent uint64 // SACK blocks sent to peer
DupACKs uint64 // duplicate ACKs received
}
ConnStats tracks per-connection traffic and reliability metrics.
type Connection ¶
type Connection struct {
Mu sync.Mutex // protects State, SendSeq, RecvAck, LastActivity, Stats
ID uint32
LocalAddr protocol.Addr // our virtual address
LocalPort uint16
RemoteAddr protocol.Addr
RemotePort uint16
State ConnState
LastActivity time.Time // updated on send/recv
// Reliable delivery
SendSeq uint32
RecvAck uint32
// SynAckSeq: sequence number used on the original outbound SYN-ACK.
// P1-001 fix: retransmitted SYNs must receive the same seq, not a
// stale `SendSeq-1` that has drifted forward once data flowed.
SynAckSeq uint32
SynAckSeqSet bool
SendBuf chan []byte
RecvBuf chan []byte
// Sliding window + retransmission (send side)
RetxMu sync.Mutex
Unacked []*retxEntry // ordered by seq
LastAck uint32 // highest cumulative ACK received
DupAckCount int // consecutive duplicate ACKs
RTO time.Duration // retransmission timeout
SRTT time.Duration // smoothed RTT
RTTVAR time.Duration // RTT variance (RFC 6298)
CongWin int // congestion window in bytes
SSThresh int // slow-start threshold
InRecovery bool // true during timeout loss recovery
FastRecovery bool // true when recovery was entered via fast retransmit (not timeout)
RecoveryPoint uint32 // highest seq sent when entering recovery
RetxStop chan struct{} // closed to stop retx goroutine
RetxSend func(*protocol.Packet) // callback to send retransmitted packets
WindowCh chan struct{} // signaled when window opens up
PeerRecvWin int // peer's advertised receive window (-1 = not yet received, 0 = explicit zero-window)
// Nagle algorithm (write coalescing)
NagleBuf []byte // pending small write data
NagleMu sync.Mutex // protects NagleBuf
NagleCh chan struct{} // signaled when Nagle should flush
NoDelay bool // if true, disable Nagle (send immediately)
// Receive window (reassembly)
RecvMu sync.Mutex
ExpectedSeq uint32 // next in-order seq expected
OOOBuf []*recvSegment // out-of-order buffer
// Delayed ACK
AckMu sync.Mutex // protects PendingACKs and ACKTimer
PendingACKs int // count of unacked received segments
ACKTimer *time.Timer // delayed ACK timer
// Keepalive dead-peer detection
KeepaliveUnacked int // consecutive unanswered keepalive probes
// Close
CloseOnce sync.Once // ensures RecvBuf is closed exactly once
RecvClosed bool // true after RecvBuf is closed (guarded by RecvMu)
// Retransmit state
LastRetxTime time.Time // when last RTO retransmission fired (prevents cascading)
// Per-connection statistics
Stats ConnStats
// contains filtered or unexported fields
}
func (*Connection) BytesInFlight ¶
func (c *Connection) BytesInFlight() int
BytesInFlight returns total unacknowledged bytes, excluding SACKed segments. SACKed segments remain in c.Unacked until a cumulative ACK removes them, but they are already at the peer so they must not consume congestion-window space.
func (*Connection) CloseRecvBuf ¶
func (c *Connection) CloseRecvBuf()
CloseRecvBuf safely closes RecvBuf exactly once. Sets RecvClosed first so new Phase 2 senders bail in DeliverInOrder, then waits for in-flight senders to drain (recvSenders WG), then closes the channel. This ordering keeps `go test -race` clean — concurrent close-vs-send was previously flagged even though the runtime made it safe via defer recover().
func (*Connection) DeliverInOrder ¶
func (c *Connection) DeliverInOrder(seq uint32, data []byte) uint32
DeliverInOrder handles an incoming data segment, buffering out-of-order segments and delivering in-order data to RecvBuf. Returns the cumulative ACK number (next expected seq).
Three-phase design to avoid both deadlock and sequence leaks:
Phase 1: Collect segments to deliver under RecvMu (don't advance ExpectedSeq).
Phase 2: Deliver outside lock (prevents routeLoop deadlock, C1 fix).
Phase 3: Re-acquire lock, advance ExpectedSeq only for delivered segments,
re-buffer undelivered OOO segments.
Safe because routeLoop is single-goroutine — no concurrent DeliverInOrder calls for the same connection between Phase 2 and Phase 3.
func (*Connection) EffectiveWindow ¶
func (c *Connection) EffectiveWindow() int
EffectiveWindow returns the effective send window (minimum of congestion window and peer's advertised receive window). Must be called with RetxMu held.
func (*Connection) ProcessAck ¶
func (c *Connection) ProcessAck(ack uint32, pureACK bool)
ProcessAck removes segments acknowledged by the given ack number, updates RTT estimate, detects duplicate ACKs for fast retransmit, and grows the congestion window. If pureACK is true, duplicate ACK detection is enabled. Data packets with piggybacked ACKs should pass pureACK=false to avoid false fast retransmits (RFC 5681 Section 3.2).
Lock-order note: Stats fields are guarded by Mu, but the rest of this function's state (Unacked, RTT, CongWin, etc.) is guarded by RetxMu. To avoid an Mu↔RetxMu inversion (a concurrent Stats() reader holds Mu then needs RetxMu, while we hold RetxMu and need Mu), we accumulate stats deltas locally and apply them under Mu AFTER RetxMu is released.
func (*Connection) ProcessSACK ¶
func (c *Connection) ProcessSACK(blocks []SACKBlock)
ProcessSACK marks unacked segments that are covered by SACK blocks. This prevents unnecessary retransmission of segments the peer already has.
Lock-order note: see ProcessAck. Stats writes are deferred out of the RetxMu hold to avoid an Mu↔RetxMu inversion.
func (*Connection) RecvWindow ¶
func (c *Connection) RecvWindow() uint16
RecvWindow returns the number of free segments in the receive buffer, used as the advertised receive window.
func (*Connection) SACKBlocks ¶
func (c *Connection) SACKBlocks() []SACKBlock
SACKBlocks returns SACK blocks describing out-of-order received segments. Must be called with RecvMu held.
func (*Connection) TrackSend ¶
func (c *Connection) TrackSend(seq uint32, data []byte)
TrackSend adds a sent data segment to the retransmission buffer.
func (*Connection) WindowAvailable ¶
func (c *Connection) WindowAvailable() bool
WindowAvailable returns true if the effective window allows more data. Must be called with RetxMu held.
type ConnectionInfo ¶
type ConnectionInfo struct {
ID uint32
LocalPort uint16
RemoteAddr string
RemotePort uint16
State string
SendSeq uint32
RecvAck uint32
CongWin int
SSThresh int
InFlight int
SRTT time.Duration
RTTVAR time.Duration
Unacked int
OOOBuf int
PeerRecvWin int
RecvWin int
InRecovery bool
Stats ConnStats
}
ConnectionInfo describes an active connection for diagnostics.
type Daemon ¶
type Daemon struct {
// AcceptQueueDrops counts SYNs that hit a full Listener.AcceptCh.
// Each drop sends a RST back to the dialer (so the peer learns
// immediately) and bumps this counter for operator visibility.
// Without this, accept-queue overflows are silent slow-downs that
// only surface as application-level connection failures.
AcceptQueueDrops uint64
// contains filtered or unexported fields
}
func (*Daemon) AddTunnelPeer ¶ added in v1.4.0
AddTunnelPeer registers a peer's address in the tunnel manager (for testing/manual setup).
func (*Daemon) AdminToken ¶ added in v1.10.0
func (*Daemon) BroadcastDatagram ¶ added in v1.9.0
func (d *Daemon) BroadcastDatagram(netID uint16, dstPort uint16, data []byte, adminToken string) error
BroadcastDatagram is the admin-gated entry point for sending a datagram to every member of a network. The admin token must be non-empty and equal to the daemon's configured Config.AdminToken (constant-time compare). With a valid token, broadcast is permitted on every network including the backbone (network 0); membership of the sender is NOT required — admin tokens are root-level. Per-recipient outbound port policy still applies.
func (*Daemon) Bus ¶ added in v1.10.0
func (d *Daemon) Bus() *inProcessBus
Bus returns the daemon's in-process pub/sub bus. Plugins/runtime uses it to construct a coreapi.EventBus adapter.
func (*Daemon) CachedEndpoint ¶ added in v1.5.1
CachedEndpoint returns a previously cached endpoint for a peer (exported for testing).
func (*Daemon) CloseConnection ¶
func (d *Daemon) CloseConnection(conn *Connection)
CloseConnection sends FIN and enters FIN_WAIT. The FIN is tracked in the retransmission buffer so it will be retried if lost — the existing retxLoop handles it. When FIN-ACK is received the connection moves to TIME_WAIT and is eventually reaped by idleSweepLoop.
func (*Daemon) DialConnection ¶
DialConnection initiates a connection to a remote address:port. Wraps DialConnectionContext with context.Background() for callers that don't have cancellation semantics (handshake bootstrap, internal probes, tests). New code should prefer DialConnectionContext.
func (*Daemon) DialConnectionContext ¶ added in v1.9.1
func (d *Daemon) DialConnectionContext(ctx context.Context, dstAddr protocol.Addr, dstPort uint16) (*Connection, error)
DialConnectionContext is the cancellable variant of DialConnection. Used by the IPC handler so a dial started on behalf of a pilotctl command is aborted (and its SYN_SENT entry removed) the moment the IPC client disconnects — no more orphan SYN_SENTs from Ctrl+C'd commands grinding through the full retry budget.
Each call returns a fresh, independent *Connection (its own ephemeral port + SYN handshake). This matches BSD-socket semantics where concurrent connect() to the same (peer, port) yield independent streams. Issue #105: the previous (v1.9.1) per-(peer,port) dedup returned the SAME *Connection to every concurrent caller; under fan-in workloads (e.g. 10 task submissions racing) all callers shared one underlying conn-ID, the IPC driver's recvCh map keyed on conn-ID was overwritten by each subsequent registerRecvCh, and frames from multiple callers got muxed into a single byte stream — producing JSON-decode errors on the receiver and indefinite hangs on the senders waiting for a response that never came back to their (orphaned) Conn. The shared resource that genuinely benefits from per-peer dedup is the tunnel/x25519 handshake; that is already idempotent inside TunnelManager.AddPeer/HasPeer.
func (*Daemon) GetManagedEngine ¶ added in v1.6.0
func (d *Daemon) GetManagedEngine(netID uint16) *ManagedEngine
GetManagedEngine returns the managed engine for a network, or nil.
func (*Daemon) GetMemberTags ¶ added in v1.6.0
GetMemberTags returns the cached member tags for the local node in a network.
func (*Daemon) GetPolicyRunner ¶ added in v1.6.0
func (d *Daemon) GetPolicyRunner(netID uint16) PolicyRunner
GetPolicyRunner returns the policy runner for a network, or nil. Delegates to the registered policy manager.
func (*Daemon) GetTrustChecker ¶ added in v1.10.0
func (d *Daemon) GetTrustChecker() TrustChecker
GetTrustChecker exposes the registered trust checker (or nil) so plugins/runtime can include it in the coreapi.Deps it constructs.
func (*Daemon) HandshakeRevokeTrust ¶ added in v1.10.0
func (*Daemon) HandshakeSendRequest ¶ added in v1.10.0
func (*Daemon) HandshakeService ¶ added in v1.10.0
func (d *Daemon) HandshakeService() HandshakeService
HandshakeService returns the registered HandshakeService (or nil if no handshake plugin has been registered). Mostly used by daemon internals; external callers prefer the typed wrappers above.
func (*Daemon) IdentityPath ¶ added in v1.10.0
IdentityPath returns the configured identity file path, or "" if the daemon runs without persistent identity. Exposed so the handshake plugin can derive its trust.json store path from the same directory.
func (*Daemon) NewConnReadWriter ¶ added in v1.10.0
func (d *Daemon) NewConnReadWriter(conn *Connection) ConnReadWriter
NewConnReadWriter wraps a Connection with the daemon's net.Conn adapter. plugins/runtime uses this to expose the conn as a coreapi.Stream without needing access to the unexported connAdapter.
func (*Daemon) Ports ¶ added in v1.10.0
func (d *Daemon) Ports() *PortManager
Ports returns the daemon's port manager. Plugins/runtime uses this to bind a coreapi.Listener for plugin services.
func (*Daemon) PublishEvent ¶ added in v1.10.0
func (*Daemon) RegConnListNodes ¶ added in v1.10.0
func (*Daemon) RegisterHandshakeService ¶ added in v1.10.0
func (d *Daemon) RegisterHandshakeService(svc HandshakeService)
RegisterHandshakeService installs the daemon-wide HandshakeService implementation provided by the handshake plugin (T3.3). Called from the composition root after constructing the plugin's Service. Replaces any previously-registered service.
func (*Daemon) RegisterPolicyManager ¶ added in v1.10.0
func (d *Daemon) RegisterPolicyManager(pm PolicyManager)
RegisterPolicyManager wires the L11 policy plugin into the daemon. cmd/daemon (composition root) constructs the plugin and calls this. Takes the daemon-local PolicyManager interface (primitive signatures only); the plugin's manager satisfies it via structural typing without daemon importing pkg/coreapi (T7.1).
func (*Daemon) RegisterTrustChecker ¶ added in v1.10.0
func (d *Daemon) RegisterTrustChecker(tc TrustChecker)
RegisterTrustChecker installs the daemon-wide TrustChecker used by the handshake handler to gate auto-accept. Replaces any previously registered checker. Takes the daemon-local interface (primitive signatures) — plugin types satisfy it via structural typing.
func (*Daemon) RegisterWebhookManager ¶ added in v1.10.0
func (d *Daemon) RegisterWebhookManager(wm WebhookManager)
RegisterWebhookManager wires the L11 webhook plugin into the daemon (T4.1). cmd/daemon (composition root) constructs the plugin and calls this with an adapter satisfying the daemon-local interface.
func (*Daemon) RegistryClient ¶ added in v1.10.0
RegistryClient returns the underlying L8 registry-side-channel client (or nil if no registry is configured). Exposed so the handshake plugin can issue Lookup / ReportTrust / RevokeTrust / RequestHandshake / RespondHandshake / PollHandshakes against the same client the daemon uses elsewhere — there is no separate connection or auth context.
func (*Daemon) RotateKey ¶ added in v1.9.0
RotateKey generates a new Ed25519 keypair, proves ownership of the current key to the registry via a `rotate:<node_id>` signature, atomically swaps the in-memory identity on success, and persists it to disk. Returns the registry response on success.
func (*Daemon) SendDatagram ¶
SendDatagram sends an unreliable unicast packet. Broadcast addresses (Node=0xFFFFFFFF) are rejected — use BroadcastDatagram, which requires an admin token.
func (*Daemon) SetMemberTags ¶ added in v1.6.0
SetMemberTags updates the cached member tags for the local node in a network.
func (*Daemon) SetWebhookURL ¶
SetWebhookURL hot-swaps the webhook URL by delegating to the registered WebhookManager (the plugins/webhook plugin). Called from IPC's set-webhook handler. No-op if no manager is registered (e.g. unit-test daemons that bypass cmd/daemon's plugin wiring).
func (*Daemon) StartManagedEngine ¶ added in v1.6.0
func (d *Daemon) StartManagedEngine(netID uint16, rules *registrywire.NetworkRules)
StartManagedEngine starts a managed engine for a newly joined network.
func (*Daemon) StartPolicyRunner ¶ added in v1.6.0
func (d *Daemon) StartPolicyRunner(netID uint16, policyJSON json.RawMessage) error
StartPolicyRunner compiles a policy JSON and registers a runner via the policy manager. Errors when no manager is registered.
func (*Daemon) StopManagedEngine ¶ added in v1.6.0
StopManagedEngine stops a managed engine (e.g., on network leave).
func (*Daemon) StopPolicyRunner ¶ added in v1.6.0
StopPolicyRunner stops the policy runner for a network. No-op if no manager is registered or no runner exists for netID.
func (*Daemon) TrustAutoApprove ¶ added in v1.10.0
TrustAutoApprove reports whether the daemon was started with the trust-auto-approve flag set. Exposed for the handshake plugin's auto-accept gate.
func (*Daemon) TrustedPeers ¶ added in v1.10.0
func (d *Daemon) TrustedPeers() []HandshakeTrustRecord
TrustedPeers returns the trust records held by the registered handshake service. Returns nil when no handshake plugin is wired (e.g. unit tests that bypass plugins/runtime).
func (*Daemon) TunnelAddr ¶ added in v1.4.0
TunnelAddr returns the local UDP address of the tunnel listener.
func (*Daemon) Tunnels ¶ added in v1.10.0
func (d *Daemon) Tunnels() *TunnelManager
Tunnels returns the daemon's tunnel manager. Mostly for in-tree plugins that want direct access (tests + unusual integrations).
type DaemonInfo ¶
type DaemonInfo struct {
NodeID uint32
Address string
Endpoint string // host:port registered with the rendezvous (proves beacon discover succeeded)
Hostname string
Uptime time.Duration
Connections int
Ports int
Peers int
EncryptedPeers int
AuthenticatedPeers int
Encrypt bool
Identity bool // true if identity is persisted
PublicKey string // base64 Ed25519 public key (empty if no identity)
Email string // email address for account identification and key recovery
BytesSent uint64
BytesRecv uint64
PktsSent uint64
PktsRecv uint64
EncryptOK uint64
EncryptFail uint64
HandshakePendingCount int
Version string
Networks []NetworkMembership
PeerList []PeerInfo
ConnList []ConnectionInfo
// v1.9.1: health metrics surfaced for operators / dashboards.
// These counters live elsewhere (Daemon.AcceptQueueDrops, the
// WebhookClient internals) but had no path to reach `pilotctl info`
// until now. Each is monotonic; operators compute rate by diffing
// two snapshots over time.
AcceptQueueDrops uint64
WebhookQueueDropped uint64
WebhookCircuitSkips uint64
RelayPeerCount int // peers currently on relay path (symmetric NAT)
BeaconAddr string // active beacon address
}
type Event ¶ added in v1.10.0
Event is the daemon-local view of a published event. plugins/runtime adapts this to coreapi.Event when wrapping the bus for plugin Deps. Defined here (instead of importing pkg/coreapi) to keep pkg/daemon free of L10 imports (T7.1).
type HandshakePendingRecord ¶ added in v1.10.0
type HandshakePendingRecord struct {
NodeID uint32
PublicKey string
Justification string
ReceivedAt time.Time
}
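HandshakePendingRecord mirrors plugins/handshake.PendingHandshake for the same reason that HandshakeTrustRecord mirrors plugins/handshake.TrustRecord: it keeps the daemon-local HandshakeService interface primitive-only, with no upward import of the plugin package.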
type HandshakeService ¶ added in v1.10.0
type HandshakeService interface {
IsTrusted(nodeID uint32) bool
TrustedPeers() []HandshakeTrustRecord
PendingRequests() []HandshakePendingRecord
PendingCount() int
SendRequest(peerNodeID uint32, justification string) error
ApproveHandshake(peerNodeID uint32) error
RejectHandshake(peerNodeID uint32, reason string) error
RevokeTrust(peerNodeID uint32) error
// WaitForTrust blocks until the peer transitions to trusted, or the
// timeout elapses. Returns true if trust was granted in time.
// Wired through SubHandshakeWait so callers (typically pilotctl
// before a first send to a trusted-list peer) can block bidirectional
// operations on trust establishment instead of racing the data send
// against the handshake reply.
WaitForTrust(peerNodeID uint32, timeout time.Duration) bool
// ProcessRelayedRequest / ProcessRelayedApproval / ProcessRelayedRejection
// are invoked from Daemon.pollRelayedHandshakes after parsing the
// registry-inbox payload.
ProcessRelayedRequest(fromNodeID uint32, justification string)
ProcessRelayedApproval(fromNodeID uint32)
ProcessRelayedRejection(fromNodeID uint32)
// Stop drains background RPCs and stops the replay reaper.
Stop()
}
HandshakeService is the daemon-facing surface of the manual trust-handshake plugin (port 444). The plugin's *Manager satisfies this via Go's structural typing — daemon never imports plugins/handshake (T3.3).
All trust-handshake operations route through this interface: IPC command dispatch (ipc.go), trust-gate checks on inbound SYN / datagrams, registry-relay polling, and trust-pair re-sync after reconnect.
type HandshakeTrustRecord ¶ added in v1.10.0
type HandshakeTrustRecord struct {
NodeID uint32
PublicKey string
ApprovedAt time.Time
Mutual bool
Network uint16
}
HandshakeTrustRecord mirrors plugins/handshake.TrustRecord so the daemon-local HandshakeService interface stays primitive-only (no upward import). Its field set is identical to the plugin's TrustRecord, but identical fields are not enough for Go's structural typing. Interface satisfaction requires exactly matching method signatures, and []HandshakeTrustRecord and []handshake.TrustRecord are distinct types even though their element fields align. A plugin method returning []handshake.TrustRecord therefore does not satisfy TrustedPeers() []HandshakeTrustRecord. Instead, the plugin's adapter returns a converted []HandshakeTrustRecord built from its own TrustRecord values.
type IPCServer ¶
type IPCServer struct {
// contains filtered or unexported fields
}
IPCServer handles connections from local drivers over Unix socket.
func NewIPCServer ¶
type Listener ¶
type Listener struct {
Port uint16
AcceptCh chan *Connection
// contains filtered or unexported fields
}
func (*Listener) TrySend ¶ added in v1.9.1
func (ln *Listener) TrySend(conn *Connection) bool
TrySend attempts a non-blocking push of conn to AcceptCh. Returns true if the connection was queued, false if the queue was full or the listener has been closed (via Unbind). Safe to call after Unbind — never panics even if AcceptCh is already closed.
v1.9.1 fix: the closed flag is checked under mu before the channel send, eliminating the window where Unbind()'s close(AcceptCh) races with handleStreamPacket's send. Without this guard, a concurrent Unbind could close AcceptCh between GetListener's return and the send, causing a "send on closed channel" panic that killed routeLoop.
type ManagedEngine ¶ added in v1.6.0
type ManagedEngine struct {
// contains filtered or unexported fields
}
ManagedEngine runs the managed network cycle for a single network. It maintains a local peer set and runs periodic prune/fill cycles. All state is daemon-local — the registry only stores the rules.
func NewManagedEngine ¶ added in v1.6.0
func NewManagedEngine(netID uint16, rules *registry.NetworkRules, d *Daemon) *ManagedEngine
NewManagedEngine creates a managed engine for a network. It loads persisted state if available, or bootstraps from the member list.
func (*ManagedEngine) Bootstrap ¶ added in v1.6.0
func (me *ManagedEngine) Bootstrap() error
Bootstrap populates the managed set from the network member list. Called on first join or when persisted state is empty.
func (*ManagedEngine) ForceCycle ¶ added in v1.6.0
func (me *ManagedEngine) ForceCycle() map[string]interface{}
ForceCycle runs a cycle immediately, outside the timer.
func (*ManagedEngine) Start ¶ added in v1.6.0
func (me *ManagedEngine) Start()
Start begins the cycle loop. Should be called after construction.
func (*ManagedEngine) Status ¶ added in v1.6.0
func (me *ManagedEngine) Status() map[string]interface{}
Status returns a summary of the managed engine state.
func (*ManagedEngine) Stop ¶ added in v1.6.0
func (me *ManagedEngine) Stop()
Stop signals the cycle loop to exit and waits for it.
type NetworkMembership ¶ added in v1.6.2
type NetworkMembership struct {
NetworkID uint16 `json:"network_id"`
Address string `json:"address"`
}
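NetworkMembership pairs a network ID with the address associated with that network. DaemonInfo.Networks lists one such entry per network. (DaemonInfo itself holds status information about the running daemon.)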
type PeerInfo ¶
type PeerInfo struct {
NodeID uint32
Endpoint string
Encrypted bool
Authenticated bool // true if peer proved Ed25519 identity
Relay bool // true if using beacon relay (symmetric NAT)
}
PeerInfo describes a known peer.
type PolicyEventType ¶ added in v1.10.0
type PolicyEventType = string
PolicyEventType is the kind of protocol event a policy is evaluated against. Type alias to string mirrors coreapi.PolicyEventType so plugin signatures match exactly via structural typing (T7.1).
type PolicyManager ¶ added in v1.10.0
type PolicyManager interface {
Start(netID uint16, policyJSON []byte) (PolicyRunner, error)
Stop(netID uint16)
Get(netID uint16) PolicyRunner
All() []PolicyRunner
StopAll()
LoadPersisted() error
}
PolicyManager owns the per-network registry of policy runners.
type PolicyRunner ¶ added in v1.6.0
type PolicyRunner interface {
NetworkID() uint16
HasMember(peerNodeID uint32) bool
// EvaluatePortGate takes a string event-type ("connect", "dial",
// "datagram", ...). plugins/policy.EventType is a type alias to
// coreapi.PolicyEventType which is itself a type alias to string —
// so plugin signatures match this exactly.
EvaluatePortGate(eventType string, port uint16, peerNodeID uint32, payloadSize int, direction string, localTags, nodeInfoTags []string) bool
EvaluateActions(eventType string, ctx map[string]any)
Status() map[string]any
PeerList() []map[string]interface{}
ForceCycle() map[string]any
ReconcileNow()
PolicyJSON() ([]byte, error)
Stop()
}
PolicyRunner is the daemon-facing surface of a single network's running policy. The plugin's concrete *PolicyRunner satisfies this via structural typing.
type PortManager ¶
type PortManager struct {
// contains filtered or unexported fields
}
PortManager handles virtual port binding and connection tracking.
func NewPortManager ¶
func NewPortManager() *PortManager
func (*PortManager) AllConnections ¶
func (pm *PortManager) AllConnections() []*Connection
AllConnections returns all active connections.
func (*PortManager) AllocEphemeralPort ¶
func (pm *PortManager) AllocEphemeralPort() uint16
AllocEphemeralPort returns an unused ephemeral port in [PortEphemeralMin, PortEphemeralMax], or 0 when all 16384 ports are simultaneously in use. Callers must treat 0 as ErrEphemeralExhausted.
v1.9.1 fix: the previous implementation incremented nextEphPort as uint16 and checked `> PortEphemeralMax` afterward. When nextEphPort was 65535, uint16++ silently wrapped to 0 before the check fired, so the loop would escape the ephemeral range and return port 0 via `portInUse(0) == false`. The fix uses a bounded int counter and checks >= PortEphemeralMax BEFORE the increment so the uint16 field never reaches 0 via overflow.
func (*PortManager) ConnectionCountForPort ¶
func (pm *PortManager) ConnectionCountForPort(port uint16) int
ConnectionCountForPort returns the number of active connections on a port.
func (*PortManager) ConnectionList ¶
func (pm *PortManager) ConnectionList() []ConnectionInfo
ConnectionList returns info about all active connections.
func (*PortManager) FindConnection ¶
func (pm *PortManager) FindConnection(localPort uint16, remoteAddr protocol.Addr, remotePort uint16) *Connection
func (*PortManager) GetConnection ¶
func (pm *PortManager) GetConnection(id uint32) *Connection
func (*PortManager) GetListener ¶
func (pm *PortManager) GetListener(port uint16) *Listener
func (*PortManager) IdleConnections ¶
func (pm *PortManager) IdleConnections(maxIdle time.Duration) []*Connection
IdleConnections returns connections that have been idle longer than the given duration.
func (*PortManager) NewConnection ¶
func (pm *PortManager) NewConnection(localPort uint16, remoteAddr protocol.Addr, remotePort uint16) *Connection
func (*PortManager) RemoveConnection ¶
func (pm *PortManager) RemoveConnection(id uint32)
func (*PortManager) StaleConnections ¶
func (pm *PortManager) StaleConnections(timeWaitDur time.Duration) []*Connection
StaleConnections returns connections in a terminal state that should be cleaned up. CLOSED, FIN_WAIT, CLOSE_WAIT are cleaned up immediately. TIME_WAIT connections are cleaned up after timeWaitDur.
func (*PortManager) TotalActiveConnections ¶
func (pm *PortManager) TotalActiveConnections() int
TotalActiveConnections returns the total number of non-closed connections.
func (*PortManager) Unbind ¶
func (pm *PortManager) Unbind(port uint16)
type SACKBlock ¶
SACKBlock represents a contiguous range of received bytes. Left is the seq of the first byte; Right is the seq of the first byte AFTER the range.
func DecodeSACK ¶
DecodeSACK parses SACK blocks from an ACK payload. Returns nil, false if the payload is not SACK data.
type TrustChecker ¶ added in v1.10.0
TrustChecker is the daemon-facing surface of the trustedagents plugin. The handshake handler consults this for auto-accept.
type TunnelManager ¶
type TunnelManager struct {
// Metrics
BytesSent uint64
BytesRecv uint64
PktsSent uint64
PktsRecv uint64
EncryptOK uint64
EncryptFail uint64
// P1-008: packets dropped from the per-peer pending queue while waiting
// for key exchange. Exposed so operators can tell a congested overlay
// apart from a silent crypto stall.
PendingDrops uint64
// contains filtered or unexported fields
}
TunnelManager manages real UDP tunnels to peer daemons.
func NewTunnelManager ¶
func NewTunnelManager() *TunnelManager
func (*TunnelManager) AddPeer ¶
func (tm *TunnelManager) AddPeer(nodeID uint32, addr *net.UDPAddr)
AddPeer registers a peer's real UDP endpoint.
func (*TunnelManager) Close ¶
func (tm *TunnelManager) Close() error
func (*TunnelManager) DropCrypto ¶ added in v1.9.1
func (tm *TunnelManager) DropCrypto(peerNodeID uint32)
DropCrypto removes the encryption context for a peer, forcing a fresh key exchange on the next encrypted send. Used by the dial-timeout exhausted path to recover from peer-side AEAD key divergence (older daemon versions can drift into a state where their derived key no longer matches ours despite stable X25519 pubkeys; the only recovery is a full re-handshake). Also clears any pending-rekey state so the retransmit loop doesn't immediately fire — the next dial's sendKeyExchangeToNode will re-arm it.
func (*TunnelManager) EnableEncryption ¶
func (tm *TunnelManager) EnableEncryption() error
EnableEncryption generates an X25519 keypair and enables tunnel encryption.
func (*TunnelManager) HasCrypto ¶
func (tm *TunnelManager) HasCrypto(nodeID uint32) bool
HasCrypto returns true if we have an encryption context for a peer (proving prior key exchange).
func (*TunnelManager) HasPeer ¶
func (tm *TunnelManager) HasPeer(nodeID uint32) bool
HasPeer checks if we have a tunnel to a node.
func (*TunnelManager) IsEncrypted ¶
func (tm *TunnelManager) IsEncrypted(nodeID uint32) bool
IsEncrypted returns true if the tunnel to a peer is encrypted.
func (*TunnelManager) IsRelayPeer ¶
func (tm *TunnelManager) IsRelayPeer(nodeID uint32) bool
IsRelayPeer returns true if the peer is in relay mode. Thin shim over routing.Manager.IsRelayPeer.
func (*TunnelManager) Listen ¶
func (tm *TunnelManager) Listen(addr string) error
Listen starts the UDP listener for incoming tunnel traffic.
func (*TunnelManager) LocalAddr ¶
func (tm *TunnelManager) LocalAddr() net.Addr
func (*TunnelManager) PeerCount ¶
func (tm *TunnelManager) PeerCount() int
PeerCount returns the number of known peers.
func (*TunnelManager) PeerList ¶
func (tm *TunnelManager) PeerList() []PeerInfo
PeerList returns all known peers and their endpoints.
func (*TunnelManager) RecvCh ¶
func (tm *TunnelManager) RecvCh() <-chan *IncomingPacket
RecvCh returns the channel for incoming packets.
func (*TunnelManager) RegisterWithBeacon ¶
func (tm *TunnelManager) RegisterWithBeacon()
RegisterWithBeacon sends a MsgDiscover to the beacon from the tunnel socket using the real nodeID, so the beacon knows our endpoint for punch coordination. Thin shim over routing.Manager.RegisterWithBeacon.
func (*TunnelManager) RelayPeerIDs ¶ added in v1.8.0
func (tm *TunnelManager) RelayPeerIDs() []uint32
RelayPeerIDs returns the node IDs of all relay-flagged peers. Thin shim over routing.Manager.RelayPeerIDs.
func (*TunnelManager) RemovePeer ¶
func (tm *TunnelManager) RemovePeer(nodeID uint32)
RemovePeer removes a peer and all per-peer metadata. Long-running daemons with peer churn (handshake revocations, network leaves) previously leaked entries in lastDirectRecv, blackholeMissCount, directClearCount, relayPeers, peerPubKeys, pendingRekey, and lastInboundDecrypt — none of which had any other deletion path. A reused nodeID would also inherit stale state (e.g. trip the relay flip on the third miss because blackholeMissCount=2 from the previous tenant).
func (*TunnelManager) RequestHolePunch ¶
func (tm *TunnelManager) RequestHolePunch(targetNodeID uint32)
RequestHolePunch asks the beacon to coordinate NAT hole-punching with a target peer. Thin shim over routing.Manager.RequestHolePunch.
func (*TunnelManager) Send ¶
func (tm *TunnelManager) Send(nodeID uint32, pkt *protocol.Packet) error
Send encapsulates and sends a packet to the given node.
func (*TunnelManager) SendDirectProbe ¶ added in v1.9.0
func (tm *TunnelManager) SendDirectProbe(nodeID uint32, pkt *protocol.Packet) error
SendDirectProbe sends an encrypted packet straight to the peer's last-known direct UDP endpoint, bypassing the relay wrapping that Send/SendTo would apply if relayPeers[nodeID] is true. Used by relayProbeLoop to test whether the direct path has recovered without tearing down the relay flag for concurrent traffic. Returns an error if no direct endpoint is known or if the peer's stored addr is the beacon (meaning we never learned a real direct addr for this peer).
P1-010 fix: previously relayProbeLoop temporarily flipped SetRelayPeer(nodeID, false), sent the probe via Send, then restored the flag after 2s. During that window every concurrent send — including key-exchange replies triggered by the peer's "no key" warnings — bypassed relay too. If the direct path was still dead (e.g. symmetric NAT + stale mapping), those replies were silently dropped, leaving crypto desynced indefinitely. SendDirectProbe isolates the probe without disturbing other traffic.
func (*TunnelManager) SetBeaconAddr ¶
func (tm *TunnelManager) SetBeaconAddr(addr string) error
SetBeaconAddr configures the beacon address for NAT hole-punching and relay. Thin shim over routing.Manager.SetBeaconAddr.
func (*TunnelManager) SetEventBus ¶ added in v1.10.0
func (tm *TunnelManager) SetEventBus(bus *inProcessBus)
SetEventBus wires the daemon's pub/sub bus into the tunnel layer. Replaces the inline tm.webhook.Emit pattern: core layers Publish, observability plugins (webhook, future eventstream broker) Subscribe via daemonEventBus.
func (*TunnelManager) SetIdentity ¶
func (tm *TunnelManager) SetIdentity(id *crypto.Identity)
SetIdentity sets our Ed25519 identity for signing authenticated key exchanges. Forwards to keyexchange.Manager (L5 owner of identity).
func (*TunnelManager) SetNodeID ¶
func (tm *TunnelManager) SetNodeID(id uint32)
SetNodeID sets our node ID (called after registration). Propagates to the envelope store so the encrypt-side AAD / frame-header use the same value.
func (*TunnelManager) SetPeerVerifyFunc ¶
func (tm *TunnelManager) SetPeerVerifyFunc(fn func(uint32) (ed25519.PublicKey, error))
SetPeerVerifyFunc sets a callback to fetch a peer's Ed25519 public key from the registry. Forwards to keyexchange.Manager.
func (*TunnelManager) SetRelayPeer ¶
func (tm *TunnelManager) SetRelayPeer(nodeID uint32, relay bool)
SetRelayPeer marks a peer as needing relay through the beacon (symmetric NAT). Thin shim over routing.Manager.SetRelayPeer.
func (*TunnelManager) SetRelayPeerPinned ¶ added in v1.9.1
func (tm *TunnelManager) SetRelayPeerPinned(nodeID uint32, relay bool)
SetRelayPeerPinned is like SetRelayPeer but also marks the relay flag as authoritative — ClearRelayOnDirect will never auto-flip a pinned peer back to direct based on observed packet sources. Thin shim over routing.Manager.SetRelayPeerPinned.
type WebhookManager ¶ added in v1.10.0
type WebhookManager interface {
// SetURL hot-swaps the active webhook URL. Empty URL disables
// delivery (no-op until set again).
SetURL(url string)
// Stats returns dispatcher counters for daemon Info. All-zero
// when no client is configured (nil-safe at the plugin level).
Stats() WebhookStats
}
WebhookManager is the daemon-local face of the webhook plugin (plugins/webhook). The plugin owns the HTTP client; daemon only needs to (a) hot-swap the URL when IPC's set-webhook fires and (b) read counters for the DaemonInfo health snapshot.
Defined as an interface so daemon stays free of any plugin import (T4.1 webhook-inversion). cmd/daemon/main.go registers an adapter that satisfies this interface from plugins/webhook.Service.
type WebhookStats ¶ added in v1.10.0
WebhookStats is the daemon-local mirror of plugins/webhook.Stats. Same shape, different package — daemon can hold the value type without importing the plugin.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| envelope | Package envelope is L6 — the per-peer AEAD framing layer. |
| keyexchange | Package keyexchange is L5 — establishes the per-peer AEAD session key (with optional Ed25519 authentication of identity). |
| routing | Package routing is L4 — peer discovery & routing. |
| udpio | Package udpio is the L2 datagram-I/O layer of the daemon: it owns the *net.UDPConn socket FD and provides "dumb" Send/Recv primitives. |