package daemon

v1.10.0

Warning: This package is not in the latest version of its module.

Published: May 12, 2026 License: AGPL-3.0 Imports: 39 Imported by: 0

Documentation

Constants

const (
	PolicyEventConnect  = "connect"
	PolicyEventDial     = "dial"
	PolicyEventDatagram = "datagram"
	PolicyEventJoin     = "join"
	PolicyEventLeave    = "leave"
	PolicyEventCycle    = "cycle"
)

PolicyEvent* are the event-type constants that daemon code passes to PolicyManager and PolicyRunner. They match the coreapi.PolicyEvent* values, since both are aliases of string.

const (
	DefaultKeepaliveInterval     = 60 * time.Second
	DefaultIdleTimeout           = 120 * time.Second
	DefaultIdleSweepInterval     = 15 * time.Second
	DefaultSYNRateLimit          = 100
	DefaultMaxConnectionsPerPort = 1024
	DefaultMaxTotalConnections   = 65536
	DefaultTimeWaitDuration      = 10 * time.Second
)

Default tuning constants (used when Config fields are zero).

const (
	DialDirectRetries    = 3                      // direct connection attempts before relay
	// DialMaxRetries is the total number of attempts (direct + relay):
	// 3 direct + 4 relay. With DialInitialRTO=250ms and exponential
	// backoff capped at DialMaxRTO=8s, the relay phase is ~7.75s. That
	// covers the cold-start handshake (key_exchange + flushPending +
	// SYN/SYN-ACK round trip) for typical peers while keeping bad dials
	// from blocking longer than the user's --timeout. The probe-and-adapt
	// machinery (see srttHistory) will let us shorten this for peers we've
	// successfully dialed before.
	DialMaxRetries       = 7
	// DialInitialRTO is the initial SYN retransmission timeout, lowered
	// from 1s: modern relay RTT is <200ms, and waiting a full second
	// before assuming loss makes cold dials feel like a stall. Three
	// direct retries with exponential backoff (250→500→1000ms) still
	// cover up to 1.75s of jitter before flipping to relay. That is
	// plenty for an unhealthy direct path while letting the common case
	// (peer is reachable, single retry needed) feel snappy.
	DialInitialRTO       = 250 * time.Millisecond
	DialMaxRTO           = 8 * time.Second        // max backoff for SYN retransmission
	DialCheckInterval    = 10 * time.Millisecond  // poll interval for state changes during dial
	RetxCheckInterval    = 100 * time.Millisecond // retransmission check ticker
	MaxRetxAttempts      = 8                      // abandon connection after this many retransmissions
	HeartbeatReregThresh = 3                      // heartbeat failures before re-registration
	SYNBucketAge         = 10 * time.Second       // stale per-source SYN bucket reap threshold
)

Dial and retransmission constants.

const (
	ZeroWinProbeInitial = 500 * time.Millisecond // initial zero-window probe interval
	ZeroWinProbeMax     = 30 * time.Second       // max zero-window probe backoff
	// P1-004: stop probing after this many attempts so a peer stuck at
	// window=0 (crashed, deadlocked, or adversarial) doesn't leak a
	// goroutine + timer + probe packets indefinitely. ~30 probes at the
	// 30s cap is ~15 minutes of trying, which matches the idle sweep.
	MaxZeroWindowProbes = 30
)

Zero-window probe constants.

const (
	CmdBind              byte = 0x01
	CmdBindOK            byte = 0x02
	CmdDial              byte = 0x03
	CmdDialOK            byte = 0x04
	CmdAccept            byte = 0x05
	CmdSend              byte = 0x06
	CmdRecv              byte = 0x07
	CmdClose             byte = 0x08
	CmdCloseOK           byte = 0x09
	CmdError             byte = 0x0A
	CmdSendTo            byte = 0x0B
	CmdRecvFrom          byte = 0x0C
	CmdInfo              byte = 0x0D
	CmdInfoOK            byte = 0x0E
	CmdHandshake         byte = 0x0F // driver → daemon: handshake request/approve/reject
	CmdHandshakeOK       byte = 0x10
	CmdResolveHostname   byte = 0x11
	CmdResolveHostnameOK byte = 0x12
	CmdSetHostname       byte = 0x13
	CmdSetHostnameOK     byte = 0x14
	CmdSetVisibility     byte = 0x15
	CmdSetVisibilityOK   byte = 0x16
	CmdDeregister        byte = 0x17
	CmdDeregisterOK      byte = 0x18
	CmdSetTags           byte = 0x19
	CmdSetTagsOK         byte = 0x1A
	CmdSetWebhook        byte = 0x1B
	CmdSetWebhookOK      byte = 0x1C
	CmdNetwork           byte = 0x1F
	CmdNetworkOK         byte = 0x20
	CmdHealth            byte = 0x21
	CmdHealthOK          byte = 0x22
	CmdManaged           byte = 0x23
	CmdManagedOK         byte = 0x24
	CmdRotateKey         byte = 0x25
	CmdRotateKeyOK       byte = 0x26
	CmdBroadcast         byte = 0x29
	CmdBroadcastOK       byte = 0x2A
	// CmdCancel: driver → daemon, "abandon the in-flight request that
	// I sent under reqID X". The reqID embedded in the envelope header
	// IS NOT the reqID being cancelled — that's encoded in the body
	// payload as a uint64. The envelope's own reqID is the new request
	// (CmdCancel); we don't reply, so it's effectively unused on the
	// wire (typically 0 from the driver's send() path).
	//
	// Issue #99: without this, a driver that timed out on a slow
	// daemon dial left the daemon's dispatch goroutine grinding for
	// the full 14-31 s retry budget, filling the per-conn dispatch
	// semaphore under burst load (§4.8 stress).
	CmdCancel byte = 0x2B
)

IPC commands (daemon ↔ driver)

const (
	SubNetworkList          byte = 0x01
	SubNetworkJoin          byte = 0x02
	SubNetworkLeave         byte = 0x03
	SubNetworkMembers       byte = 0x04
	SubNetworkInvite        byte = 0x05
	SubNetworkPollInvites   byte = 0x06
	SubNetworkRespondInvite byte = 0x07
)

Network sub-commands (second byte of CmdNetwork payload)

const (
	SubManagedStatus     byte = 0x02
	SubManagedCycle      byte = 0x04
	SubManagedPolicy     byte = 0x05 // get/set expr policy
	SubManagedMemberTags byte = 0x06 // get/set member tags
	SubManagedReconcile  byte = 0x07 // poll registry & refresh peer set
)

Managed sub-commands (second byte of CmdManaged payload)

const (
	SubHandshakeSend    byte = 0x01
	SubHandshakeApprove byte = 0x02
	SubHandshakeReject  byte = 0x03
	SubHandshakePending byte = 0x04
	SubHandshakeTrusted byte = 0x05
	SubHandshakeRevoke  byte = 0x06
	SubHandshakeWait    byte = 0x07
)

Handshake IPC sub-commands

const (
	// TopicNetworkJoined fires when reconciliation observes a network
	// in the registry membership list that was not present in the
	// prior tick's known set.
	//
	// Payload schema:
	//   network_id    uint16        — registry network ID (>0; 0 is the
	//                                  default/backbone and is skipped)
	//   rules         any (json)    — registrywire.NetworkRules raw form
	//                                  if the network has managed-engine
	//                                  rules; nil otherwise. Same format
	//                                  the registry returns from
	//                                  ListNetworks().
	//   expr_policy   any (json)    — compiled expr policy in the same
	//                                  format GetExprPolicy returns
	//                                  (string OR object); nil if the
	//                                  network has no expr policy.
	//   member_tags   []string      — local-node tags for this network
	//                                  at the time of join (already
	//                                  populated in d.memberTags).
	TopicNetworkJoined = "network.joined"

	// TopicNetworkLeft fires when reconciliation observes that a
	// previously-known network is no longer in the registry membership
	// list.
	//
	// Payload schema:
	//   network_id    uint16        — registry network ID
	TopicNetworkLeft = "network.left"

	// TopicNetworkTagsChanged fires when reconciliation observes a
	// non-empty change to the local node's tag list inside a network.
	// Re-emitted on each tick where the registry's authoritative tags
	// differ from the previously-cached tags.
	//
	// Payload schema:
	//   network_id    uint16        — registry network ID
	//   tags          []string      — the new authoritative tag list
	TopicNetworkTagsChanged = "network.tags_changed"
)

const (
	InitialCongWin = 10 * MaxSegmentSize          // 40 KB initial congestion window (IW10, RFC 6928)
	MaxCongWin     = 1024 * 1024                  // 1 MB max congestion window
	MaxSegmentSize = 4096                         // MTU for virtual segments
	RecvBufSize    = 512                          // receive buffer channel capacity (segments)
	MaxRecvWin     = RecvBufSize * MaxSegmentSize // 2 MB max receive window
	MaxOOOBuf      = 128                          // max out-of-order segments buffered per connection
	AcceptQueueLen = 64                           // listener accept channel capacity
	SendBufLen     = 256                          // send buffer channel capacity (segments)

	// MaxNagleBuf caps the per-connection NagleBuf at 64 segments
	// (256 KB). v1.9.1 fix: SendData previously appended without bound,
	// so an application writing faster than the network could drain
	// (slow peer, full cwnd, packet loss) leaked memory linearly with
	// offered-but-undeliverable load. With many connections in that
	// state, daemon RSS climbed until OOM. SendData now returns
	// ErrSendBufFull when len(NagleBuf) + len(write) would exceed
	// this cap; callers must retry with backpressure.
	// 256 KB accommodates the largest single data-exchange frame
	// (64 KB) with headroom, while still bounding per-connection memory.
	MaxNagleBuf = 64 * MaxSegmentSize
)

Default window parameters

const (
	ClockGranularity = 10 * time.Millisecond  // minimum RTTVAR for RTO calculation
	RTOMin           = 200 * time.Millisecond // minimum retransmission timeout
	RTOMax           = 10 * time.Second       // maximum retransmission timeout
	InitialRTO       = 1 * time.Second        // initial retransmission timeout
)

RTO parameters (RFC 6298)

const DefaultNetworkSyncInterval = 5 * time.Minute

DefaultNetworkSyncInterval is how often the daemon refreshes network memberships, port policies, and member tags from the registry.

const DelayedACKThreshold = 2

DelayedACKThreshold is the number of segments to receive before sending an ACK immediately.

const DelayedACKTimeout = 40 * time.Millisecond

DelayedACKTimeout is the maximum time an ACK is delayed. RFC 1122 requires the delay to be less than 0.5 s; the daemon uses 40 ms.

const EndpointCacheTTL = 5 * time.Minute

EndpointCacheTTL is how long a cached endpoint is considered fresh. After this, the entry is stale but still usable as a fallback.

const IPCEnvelopeHeaderSize = 1

IPCEnvelopeHeaderSize is the size of the per-message header that sits inside the ipcutil length-framed envelope: 1 byte cmd.

const MaxConnsPerIPCClient = 4096

MaxConnsPerIPCClient caps how many overlay connections a single IPC client may own at once. P2-002: without this, a rogue or buggy client could exhaust the 65536-slot connection table and DoS the daemon.

const MaxIPCClients = 1024

MaxIPCClients caps the total number of concurrent IPC socket connections to the daemon (P2-002). Without this cap a misbehaving or malicious local client could exhaust the daemon's FD table by opening thousands of IPC sockets, breaking the dashboard, webhooks, and tunnel manager.

const NagleTimeout = 40 * time.Millisecond

NagleTimeout is the maximum time to buffer small writes before flushing.

const RecvChSize = 8192

RecvChSize is the capacity of the incoming packet channel. Increased from 1024 to 8192 for 1M-node scale to prevent drops during bursts (e.g., many peers sending simultaneously after a cron trigger).

const RelayProbeInterval = 5 * time.Minute

RelayProbeInterval is how often we probe relay-flagged peers for direct connectivity.

const ResolveCacheTTL = 60 * time.Second

ResolveCacheTTL is how long a registry resolve response is cached. During cron bursts, agents resolve the same peers repeatedly — this avoids hitting the registry for the same node within the TTL window.

Variables

var ErrEphemeralExhausted = errors.New("ephemeral ports exhausted")

ErrEphemeralExhausted is returned by callers of AllocEphemeralPort when all 16384 ephemeral ports (49152..65535) are simultaneously in use.

var ErrIPCBackpressure = errors.New("ipc: backpressure (client too slow)")

ErrIPCBackpressure is retained for historical compatibility with tests that import it. Since issue #99, ipcWrite no longer returns it; instead it blocks until the message is enqueued or the connection is closed. The variable is unused in production code paths.

var ErrIPCClosed = errors.New("ipc: connection closed")

ErrIPCClosed is returned when ipcWrite is called after Close.

var ErrPendingDropped = errors.New("pending queue full: oldest queued packet dropped while key exchange pending")

ErrPendingDropped is returned by sendEncryptedToNode when the per-peer pending queue was already at maxPendingPerPeer and the oldest queued packet had to be dropped to make room for the new one. The CALLER's packet is still queued — it will be sent as soon as key exchange finishes — but an older packet was lost to back-pressure.

Callers that distinguish this error from a hard failure can choose to retry (the dial path does this; one of the SYN retransmits will land after the queue drains). Surfacing it as a typed error also lets pilotctl render a "tunnel handshaking" hint instead of an opaque "send SYN: pending queue full" message.

var ErrSendBufFull = errors.New("send buffer full")

ErrSendBufFull is returned by SendData when appending the caller's write would push the per-connection NagleBuf past MaxNagleBuf. Callers must back off and retry, typically by waiting for a webhook or polling the connection's send-buffer state.

This error replaces the silent unbounded-growth behavior that could OOM the daemon when an application wrote faster than the network drained. Pinned by TestSendDataNagleBufGrowsUnbounded.

SendData sends data over an established connection and implements Nagle's algorithm: small writes are coalesced into MSS-sized segments unless NoDelay is set, while large writes (>= MSS) are sent immediately.

var TunnelKeepaliveInterval = 25 * time.Second

TunnelKeepaliveInterval is the minimum gap between successive sends to a peer before keepaliveSweep enqueues a NAT-keepalive frame. It is set below 30 s, the lower bound of consumer-NAT UDP idle timeouts. Tunable for tests via direct assignment.

Functions

func DiscoverEndpoint

func DiscoverEndpoint(beaconAddr string, nodeID uint32, conn *net.UDPConn) (*net.UDPAddr, error)

DiscoverEndpoint sends a STUN discover to the beacon and returns the observed public endpoint. Thin shim over routing.DiscoverEndpoint — the canonical implementation lives at pkg/daemon/routing/discover.go.

func EncodeSACK

func EncodeSACK(blocks []SACKBlock) []byte

EncodeSACK encodes SACK blocks into a byte slice. Format: "SACK" (4 bytes) + count (1 byte) + N * 8 bytes (Left:4, Right:4).
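A round-trip sketch of the SACK wire format. The byte order is not stated here, so big-endian is an assumption, as is the sackBlock type with Left and Right uint32 fields. A one-byte count also implies at most 255 blocks per message, which this sketch enforces by truncation.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
)

// sackBlock mirrors the Left/Right fields named in the format description.
type sackBlock struct {
	Left, Right uint32
}

var sackMagic = []byte("SACK")

// encodeSACK writes "SACK" + count + Left/Right pairs (big-endian assumed).
func encodeSACK(blocks []sackBlock) []byte {
	if len(blocks) > 255 {
		blocks = blocks[:255] // the count field is a single byte
	}
	out := make([]byte, 0, 5+8*len(blocks))
	out = append(out, sackMagic...)
	out = append(out, byte(len(blocks)))
	for _, b := range blocks {
		out = binary.BigEndian.AppendUint32(out, b.Left)
		out = binary.BigEndian.AppendUint32(out, b.Right)
	}
	return out
}

// decodeSACK reverses encodeSACK and rejects malformed input.
func decodeSACK(p []byte) ([]sackBlock, error) {
	if len(p) < 5 || !bytes.Equal(p[:4], sackMagic) {
		return nil, errors.New("sack: bad header")
	}
	n := int(p[4])
	if len(p) != 5+8*n {
		return nil, errors.New("sack: length mismatch")
	}
	blocks := make([]sackBlock, n)
	for i := range blocks {
		off := 5 + 8*i
		blocks[i] = sackBlock{
			Left:  binary.BigEndian.Uint32(p[off:]),
			Right: binary.BigEndian.Uint32(p[off+4:]),
		}
	}
	return blocks, nil
}
```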

func RecoveredPanicCount added in v1.10.0

func RecoveredPanicCount() uint64

RecoveredPanicCount returns the total number of panics swallowed by recoverLayer since process start.

func ValidateWebhookURL added in v1.7.0

func ValidateWebhookURL(rawURL string) error

ValidateWebhookURL is a thin shim over urlvalidate.Validate. Lives in pkg/daemon (rather than plugins/webhook) so the IPC handler in ipc.go and external test packages can validate without importing the plugin (which would be an upward L7 → L11 edge).

The canonical implementation is pkg/urlvalidate; this shim exists only as a stable API surface for callers that pre-date T4.1's webhook-inversion.

Types

type Config

type Config struct {
	RegistryAddr        string
	BeaconAddr          string
	ListenAddr          string // UDP listen address for tunnel traffic
	SocketPath          string // Unix socket path for IPC
	Encrypt             bool   // enable tunnel-layer encryption (X25519 + AES-256-GCM)
	RegistryTLS         bool   // use TLS for registry connection
	RegistryFingerprint string // hex SHA-256 fingerprint for TLS cert pinning
	IdentityPath        string // path to persist Ed25519 identity (empty = no persistence)
	Email               string // email address for account identification and key recovery
	Owner               string // deprecated: use Email instead

	Endpoint string // fixed public endpoint (host:port) — skips STUN discovery (for cloud VMs)
	Public   bool   // make this node's endpoint publicly discoverable
	Hostname string // hostname for discovery (empty = none)

	// FakeListenAddr overrides the listen_addr advertised to the
	// registry. Real socket binding still uses ListenAddr; only the
	// registered metadata is rewritten. Intended for synthetic-fleet
	// scenarios where a daemon should appear at a country-residential
	// address while still operating on a real OS socket. Endpoint
	// learning on the data path is packet-driven, so peer responses
	// continue to reach the real socket regardless of the advertised
	// addr (see pkg/daemon/tunnel.go peerAddr learning).
	FakeListenAddr string

	// RelayOnly hides this node's real_addr from peer resolve/lookup
	// responses. Peers reach this node only via the beacon-relay path,
	// so this node's public IP is never exposed to other daemons.
	// Default false (current behavior). Task 32 — once the fleet is on a
	// new-enough binary, the operator default will flip to true.
	RelayOnly bool

	// Built-in services
	DisableEcho         bool // disable built-in echo service (port 7)
	DisableDataExchange bool // disable built-in data exchange service (port 1001)
	DisableEventStream  bool // disable built-in event stream service (port 1002)
	DisablePolicyRunner bool // skip loading expr policy runners and joining networks at startup

	// Webhook
	WebhookURL          string        // HTTP(S) endpoint for event notifications (empty = disabled)
	WebhookHTTPTimeout  time.Duration // HTTP client timeout for webhook POSTs (default 5s)
	WebhookRetryBackoff time.Duration // initial retry backoff for webhook POSTs (default 1s)

	// Trust
	TrustAutoApprove bool // automatically approve all incoming handshake requests

	// Fleet enrollment
	AdminToken string   // admin token for network operations (empty = disabled)
	Networks   []uint16 // network IDs to auto-join at startup (empty = none)

	// Version
	Version string // binary version string (injected via LDFLAGS at build time)

	// Feature flags — ablation testing. All default false (current behavior).
	BeaconRTTProbe bool // probe beacon RTT; override hash pick when >2× slower than best

	// Tuning (zero = use defaults)
	KeepaliveInterval     time.Duration // default 60s
	IdleTimeout           time.Duration // default 120s
	SYNRateLimit          int           // default 100
	MaxConnectionsPerPort int           // default 1024
	MaxTotalConnections   int           // default 65536
	TimeWaitDuration      time.Duration // default 10s
}

type ConnReadWriter added in v1.10.0

type ConnReadWriter interface {
	Read(p []byte) (int, error)
	Write(p []byte) (int, error)
	Close() error
}

ConnReadWriter is the public interface plugins/runtime sees for the io-side of a Connection. Matches the unexported connAdapter shape.

type ConnState

type ConnState uint8
const (
	StateClosed ConnState = iota
	StateListen
	StateSynSent
	StateSynReceived
	StateEstablished
	StateFinWait
	StateCloseWait
	StateTimeWait
)

func (ConnState) String

func (s ConnState) String() string

type ConnStats

type ConnStats struct {
	BytesSent   uint64 // total user bytes sent
	BytesRecv   uint64 // total user bytes received
	SegsSent    uint64 // data segments sent
	SegsRecv    uint64 // data segments received
	Retransmits uint64 // timeout-based retransmissions
	FastRetx    uint64 // fast retransmissions (3 dup ACKs)
	SACKRecv    uint64 // SACK blocks received from peer
	SACKSent    uint64 // SACK blocks sent to peer
	DupACKs     uint64 // duplicate ACKs received
}

ConnStats tracks per-connection traffic and reliability metrics.

type Connection

type Connection struct {
	Mu           sync.Mutex // protects State, SendSeq, RecvAck, LastActivity, Stats
	ID           uint32
	LocalAddr    protocol.Addr // our virtual address
	LocalPort    uint16
	RemoteAddr   protocol.Addr
	RemotePort   uint16
	State        ConnState
	LastActivity time.Time // updated on send/recv
	// Reliable delivery
	SendSeq uint32
	RecvAck uint32
	// SynAckSeq: sequence number used on the original outbound SYN-ACK.
	// P1-001 fix: retransmitted SYNs must receive the same seq, not a
	// stale `SendSeq-1` that has drifted forward once data flowed.
	SynAckSeq    uint32
	SynAckSeqSet bool
	SendBuf      chan []byte
	RecvBuf      chan []byte
	// Sliding window + retransmission (send side)
	RetxMu        sync.Mutex
	Unacked       []*retxEntry           // ordered by seq
	LastAck       uint32                 // highest cumulative ACK received
	DupAckCount   int                    // consecutive duplicate ACKs
	RTO           time.Duration          // retransmission timeout
	SRTT          time.Duration          // smoothed RTT
	RTTVAR        time.Duration          // RTT variance (RFC 6298)
	CongWin       int                    // congestion window in bytes
	SSThresh      int                    // slow-start threshold
	InRecovery    bool                   // true during timeout loss recovery
	FastRecovery  bool                   // true when recovery was entered via fast retransmit (not timeout)
	RecoveryPoint uint32                 // highest seq sent when entering recovery
	RetxStop      chan struct{}          // closed to stop retx goroutine
	RetxSend      func(*protocol.Packet) // callback to send retransmitted packets
	WindowCh      chan struct{}          // signaled when window opens up
	PeerRecvWin   int                    // peer's advertised receive window (-1 = not yet received, 0 = explicit zero-window)
	// Nagle algorithm (write coalescing)
	NagleBuf []byte        // pending small write data
	NagleMu  sync.Mutex    // protects NagleBuf
	NagleCh  chan struct{} // signaled when Nagle should flush
	NoDelay  bool          // if true, disable Nagle (send immediately)
	// Receive window (reassembly)
	RecvMu      sync.Mutex
	ExpectedSeq uint32         // next in-order seq expected
	OOOBuf      []*recvSegment // out-of-order buffer
	// Delayed ACK
	AckMu       sync.Mutex  // protects PendingACKs and ACKTimer
	PendingACKs int         // count of unacked received segments
	ACKTimer    *time.Timer // delayed ACK timer
	// Keepalive dead-peer detection
	KeepaliveUnacked int // consecutive unanswered keepalive probes
	// Close
	CloseOnce  sync.Once // ensures RecvBuf is closed exactly once
	RecvClosed bool      // true after RecvBuf is closed (guarded by RecvMu)

	// Retransmit state
	LastRetxTime time.Time // when last RTO retransmission fired (prevents cascading)
	// Per-connection statistics
	Stats ConnStats
	// contains filtered or unexported fields
}

func (*Connection) BytesInFlight

func (c *Connection) BytesInFlight() int

BytesInFlight returns total unacknowledged bytes, excluding SACKed segments. SACKed segments remain in c.Unacked until a cumulative ACK removes them, but they are already at the peer so they must not consume congestion-window space.
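A minimal sketch of the computation, assuming a hypothetical retxEntry with a sacked flag (the real retxEntry is unexported and its fields are not documented here).

```go
package main

// retxEntry is a hypothetical stand-in for the unexported retransmission entry.
type retxEntry struct {
	seq    uint32
	data   []byte
	sacked bool // set when a SACK block from the peer covers this segment
}

// bytesInFlight sums unacknowledged payload, skipping SACKed segments: they
// stay queued until a cumulative ACK removes them, but they already reached
// the peer, so they should not use congestion-window space.
func bytesInFlight(unacked []*retxEntry) int {
	n := 0
	for _, e := range unacked {
		if !e.sacked {
			n += len(e.data)
		}
	}
	return n
}
```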

func (*Connection) CloseRecvBuf

func (c *Connection) CloseRecvBuf()

CloseRecvBuf safely closes RecvBuf exactly once. Sets RecvClosed first so new Phase 2 senders bail in DeliverInOrder, then waits for in-flight senders to drain (recvSenders WG), then closes the channel. This ordering keeps `go test -race` clean — concurrent close-vs-send was previously flagged even though the runtime made it safe via defer recover().
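The close ordering can be sketched with a mutex, a WaitGroup and a sync.Once. The recvQueue type is hypothetical and uses a non-blocking send; the daemon's version works on Connection's RecvMu, RecvClosed and CloseOnce plus the unexported recvSenders WaitGroup. Registering with the WaitGroup while still holding the mutex is what guarantees the closer never misses a sender that already passed the closed check.

```go
package main

import "sync"

// recvQueue models the RecvBuf close protocol: mark closed under the lock,
// wait for in-flight senders, then close the channel exactly once.
type recvQueue struct {
	mu      sync.Mutex
	closed  bool
	senders sync.WaitGroup
	once    sync.Once
	ch      chan []byte
}

func newRecvQueue(capacity int) *recvQueue {
	return &recvQueue{ch: make(chan []byte, capacity)}
}

// trySend delivers without blocking. It returns false if the queue is closed
// or full, leaving the caller to keep (re-buffer) the data.
func (q *recvQueue) trySend(data []byte) bool {
	q.mu.Lock()
	if q.closed {
		q.mu.Unlock()
		return false
	}
	q.senders.Add(1) // registered under mu, so close cannot miss this sender
	q.mu.Unlock()
	defer q.senders.Done()

	select {
	case q.ch <- data:
		return true
	default:
		return false
	}
}

// close is safe to call repeatedly and concurrently with trySend.
func (q *recvQueue) close() {
	q.once.Do(func() {
		q.mu.Lock()
		q.closed = true // new senders bail out from here on
		q.mu.Unlock()
		q.senders.Wait() // let in-flight senders finish
		close(q.ch)      // no send can race with this close
	})
}
```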

func (*Connection) DeliverInOrder

func (c *Connection) DeliverInOrder(seq uint32, data []byte) uint32

DeliverInOrder handles an incoming data segment, buffering out-of-order segments and delivering in-order data to RecvBuf. Returns the cumulative ACK number (next expected seq).

Three-phase design to avoid both deadlock and sequence leaks:

Phase 1: Collect segments to deliver under RecvMu (don't advance ExpectedSeq).
Phase 2: Deliver outside lock (prevents routeLoop deadlock, C1 fix).
Phase 3: Re-acquire lock, advance ExpectedSeq only for delivered segments,
         re-buffer undelivered OOO segments.

Safe because routeLoop is single-goroutine — no concurrent DeliverInOrder calls for the same connection between Phase 2 and Phase 3.
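A simplified model of the three phases, assuming byte-counted sequence numbers, non-overlapping segments, no 32-bit wraparound, and a non-blocking channel send as the Phase 2 delivery (one way undelivered segments can arise). The reassembler type is hypothetical. Like the daemon, it relies on a single caller goroutine, so nothing changes between Phase 2 and Phase 3.

```go
package main

import (
	"sort"
	"sync"
)

// segment is one received data segment.
type segment struct {
	seq  uint32
	data []byte
}

// reassembler is a hypothetical, simplified model of DeliverInOrder.
type reassembler struct {
	mu       sync.Mutex // plays the role of RecvMu
	expected uint32     // plays the role of ExpectedSeq
	ooo      []segment  // plays the role of OOOBuf
	out      chan []byte
}

// deliver returns the cumulative ACK (next expected seq). It must only be
// called from one goroutine.
func (r *reassembler) deliver(seq uint32, data []byte) uint32 {
	// Phase 1: under the lock, buffer the segment and pull out the
	// contiguous run starting at r.expected. r.expected is NOT advanced.
	r.mu.Lock()
	if seq >= r.expected {
		r.ooo = append(r.ooo, segment{seq, data})
	} // older segments are duplicates and are dropped
	sort.Slice(r.ooo, func(i, j int) bool { return r.ooo[i].seq < r.ooo[j].seq })
	var run []segment
	next := r.expected
	kept := r.ooo[:0]
	for _, s := range r.ooo {
		switch {
		case s.seq == next:
			run = append(run, s)
			next += uint32(len(s.data))
		case s.seq > next:
			kept = append(kept, s) // still out of order
		}
		// s.seq < next: already covered, so it is dropped.
	}
	r.ooo = kept
	r.mu.Unlock()

	// Phase 2: deliver outside the lock; a full channel stops delivery.
	delivered := 0
sendLoop:
	for _, s := range run {
		select {
		case r.out <- s.data:
			delivered++
		default:
			break sendLoop
		}
	}

	// Phase 3: advance only past delivered data and re-buffer the rest.
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, s := range run[:delivered] {
		r.expected += uint32(len(s.data))
	}
	r.ooo = append(r.ooo, run[delivered:]...)
	return r.expected
}
```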

func (*Connection) EffectiveWindow

func (c *Connection) EffectiveWindow() int

EffectiveWindow returns the effective send window (minimum of congestion window and peer's advertised receive window). Must be called with RetxMu held.

func (*Connection) ProcessAck

func (c *Connection) ProcessAck(ack uint32, pureACK bool)

ProcessAck removes segments acknowledged by the given ack number, updates RTT estimate, detects duplicate ACKs for fast retransmit, and grows the congestion window. If pureACK is true, duplicate ACK detection is enabled. Data packets with piggybacked ACKs should pass pureACK=false to avoid false fast retransmits (RFC 5681 Section 3.2).

Lock-order note: Stats fields are guarded by Mu, but the rest of this function's state (Unacked, RTT, CongWin, etc.) is guarded by RetxMu. To avoid an Mu↔RetxMu inversion (a concurrent Stats() reader holds Mu then needs RetxMu, while we hold RetxMu and need Mu), we accumulate stats deltas locally and apply them under Mu AFTER RetxMu is released.
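A minimal sketch of the deferred-stats pattern, assuming a trimmed-down hypothetical conn type. It keeps only duplicate-ACK counting (three duplicates trigger a fast retransmit, as the FastRetx field of ConnStats describes); segment removal, RTT sampling and congestion-window growth are omitted. The snapshot method takes the locks in the opposite order, the way the note describes a Stats() reader doing, and cannot deadlock against processAck because processAck never holds both locks at once.

```go
package main

import "sync"

type stats struct {
	DupACKs  uint64
	FastRetx uint64
}

// conn is hypothetical: mu guards stats; retxMu guards lastAck and dupAcks.
type conn struct {
	mu      sync.Mutex
	stats   stats
	retxMu  sync.Mutex
	lastAck uint32
	dupAcks int
}

// seqGT compares sequence numbers modulo 2^32.
func seqGT(a, b uint32) bool { return int32(a-b) > 0 }

// processAck updates retransmission state under retxMu and only records stats
// deltas locally; they are applied under mu after retxMu is released.
func (c *conn) processAck(ack uint32, pureACK bool) (fastRetransmit bool) {
	var dDup, dFast uint64

	c.retxMu.Lock()
	switch {
	case seqGT(ack, c.lastAck):
		c.lastAck = ack // new data acknowledged
		c.dupAcks = 0
	case ack == c.lastAck && pureACK:
		// Piggybacked ACKs on data packets are not counted as duplicates.
		c.dupAcks++
		dDup++
		if c.dupAcks == 3 {
			fastRetransmit = true
			dFast++
		}
	}
	c.retxMu.Unlock()

	c.mu.Lock()
	c.stats.DupACKs += dDup
	c.stats.FastRetx += dFast
	c.mu.Unlock()
	return fastRetransmit
}

// snapshot locks mu, then retxMu, like a Stats() reader would.
func (c *conn) snapshot() (stats, uint32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.retxMu.Lock()
	defer c.retxMu.Unlock()
	return c.stats, c.lastAck
}
```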

func (*Connection) ProcessSACK

func (c *Connection) ProcessSACK(blocks []SACKBlock)

ProcessSACK marks unacked segments that are covered by SACK blocks. This prevents unnecessary retransmission of segments the peer already has.

Lock-order note: see ProcessAck. Stats writes are deferred out of the RetxMu hold to avoid an Mu↔RetxMu inversion.

func (*Connection) RecvWindow

func (c *Connection) RecvWindow() uint16

RecvWindow returns the number of free segments in the receive buffer, used as the advertised receive window.

func (*Connection) SACKBlocks

func (c *Connection) SACKBlocks() []SACKBlock

SACKBlocks returns SACK blocks describing out-of-order received segments. Must be called with RecvMu held.
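The mapping from out-of-order segments to SACK blocks can be sketched as an interval merge. The oooSegment type, byte-counted sequence numbers and an exclusive Right edge (one past the last byte) are assumptions; wraparound is ignored.

```go
package main

import "sort"

type sackBlock struct{ Left, Right uint32 }

// oooSegment is a hypothetical view of one buffered out-of-order segment.
type oooSegment struct {
	seq  uint32
	size uint32
}

// sackBlocks merges buffered segments into contiguous [Left, Right) ranges.
func sackBlocks(ooo []oooSegment) []sackBlock {
	segs := append([]oooSegment(nil), ooo...) // do not reorder the caller's slice
	sort.Slice(segs, func(i, j int) bool { return segs[i].seq < segs[j].seq })
	var blocks []sackBlock
	for _, s := range segs {
		end := s.seq + s.size
		if n := len(blocks); n > 0 && s.seq <= blocks[n-1].Right {
			// Touches or overlaps the previous block: extend it.
			if end > blocks[n-1].Right {
				blocks[n-1].Right = end
			}
			continue
		}
		blocks = append(blocks, sackBlock{Left: s.seq, Right: end})
	}
	return blocks
}
```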

func (*Connection) TrackSend

func (c *Connection) TrackSend(seq uint32, data []byte)

TrackSend adds a sent data segment to the retransmission buffer.

func (*Connection) WindowAvailable

func (c *Connection) WindowAvailable() bool

WindowAvailable returns true if the effective window allows more data. Must be called with RetxMu held.
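EffectiveWindow and WindowAvailable reduce to a minimum and a comparison. In this sketch the peer window is assumed to arrive in segments (RecvWindow advertises free segments) and is converted to bytes with MaxSegmentSize; whether the daemon stores PeerRecvWin in segments or bytes is not stated here, so treat that conversion as an assumption. A value of -1 (nothing advertised yet) falls back to the congestion window alone, and an explicit 0 blocks sending.

```go
package main

const maxSegmentSize = 4096 // copied from MaxSegmentSize

// effectiveWindow returns min(cwnd, peer window in bytes). peerWinSegs == -1
// means the peer has not advertised a window yet.
func effectiveWindow(congWin, peerWinSegs int) int {
	if peerWinSegs < 0 {
		return congWin
	}
	peerBytes := peerWinSegs * maxSegmentSize
	if peerBytes < congWin {
		return peerBytes
	}
	return congWin
}

// windowAvailable reports whether more data may be sent right now.
func windowAvailable(bytesInFlight, congWin, peerWinSegs int) bool {
	return bytesInFlight < effectiveWindow(congWin, peerWinSegs)
}
```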

type ConnectionInfo

type ConnectionInfo struct {
	ID          uint32
	LocalPort   uint16
	RemoteAddr  string
	RemotePort  uint16
	State       string
	SendSeq     uint32
	RecvAck     uint32
	CongWin     int
	SSThresh    int
	InFlight    int
	SRTT        time.Duration
	RTTVAR      time.Duration
	Unacked     int
	OOOBuf      int
	PeerRecvWin int
	RecvWin     int
	InRecovery  bool
	Stats       ConnStats
}

ConnectionInfo describes an active connection for diagnostics.

type Daemon

type Daemon struct {

	// AcceptQueueDrops counts SYNs that hit a full Listener.AcceptCh.
	// Each drop sends a RST back to the dialer (so the peer learns
	// immediately) and bumps this counter for operator visibility.
	// Without this, accept-queue overflows are silent slow-downs that
	// only surface as application-level connection failures.
	AcceptQueueDrops uint64
	// contains filtered or unexported fields
}

func New

func New(cfg Config) *Daemon

func (*Daemon) AddTunnelPeer added in v1.4.0

func (d *Daemon) AddTunnelPeer(nodeID uint32, addr *net.UDPAddr)

AddTunnelPeer registers a peer's address in the tunnel manager (for testing/manual setup).

func (*Daemon) Addr

func (d *Daemon) Addr() protocol.Addr

func (*Daemon) AdminToken added in v1.10.0

func (d *Daemon) AdminToken() string

func (*Daemon) BroadcastDatagram added in v1.9.0

func (d *Daemon) BroadcastDatagram(netID uint16, dstPort uint16, data []byte, adminToken string) error

BroadcastDatagram is the admin-gated entry point for sending a datagram to every member of a network. The admin token must be non-empty and equal to the daemon's configured Config.AdminToken (constant-time compare). With a valid token, broadcast is permitted on every network including the backbone (network 0); membership of the sender is NOT required — admin tokens are root-level. Per-recipient outbound port policy still applies.

func (*Daemon) Bus added in v1.10.0

func (d *Daemon) Bus() *inProcessBus

Bus returns the daemon's in-process pub/sub bus. Plugins/runtime uses it to construct a coreapi.EventBus adapter.

func (*Daemon) CachedEndpoint added in v1.5.1

func (d *Daemon) CachedEndpoint(nodeID uint32) (string, bool)

CachedEndpoint returns a previously cached endpoint for a peer (exported for testing).

func (*Daemon) CloseConnection

func (d *Daemon) CloseConnection(conn *Connection)

CloseConnection sends FIN and enters FIN_WAIT. The FIN is tracked in the retransmission buffer so it will be retried if lost — the existing retxLoop handles it. When FIN-ACK is received the connection moves to TIME_WAIT and is eventually reaped by idleSweepLoop.

func (*Daemon) DialConnection

func (d *Daemon) DialConnection(dstAddr protocol.Addr, dstPort uint16) (*Connection, error)

DialConnection initiates a connection to a remote address:port. Wraps DialConnectionContext with context.Background() for callers that don't have cancellation semantics (handshake bootstrap, internal probes, tests). New code should prefer DialConnectionContext.

func (*Daemon) DialConnectionContext added in v1.9.1

func (d *Daemon) DialConnectionContext(ctx context.Context, dstAddr protocol.Addr, dstPort uint16) (*Connection, error)

DialConnectionContext is the cancellable variant of DialConnection. Used by the IPC handler so a dial started on behalf of a pilotctl command is aborted (and its SYN_SENT entry removed) the moment the IPC client disconnects — no more orphan SYN_SENTs from Ctrl+C'd commands grinding through the full retry budget.

Each call returns a fresh, independent *Connection (its own ephemeral port + SYN handshake). This matches BSD-socket semantics, where concurrent connect() calls to the same (peer, port) yield independent streams.

Issue #105: the previous (v1.9.1) per-(peer,port) dedup returned the SAME *Connection to every concurrent caller. Under fan-in workloads (e.g. 10 task submissions racing), all callers shared one underlying conn-ID, the IPC driver's recvCh map keyed on conn-ID was overwritten by each subsequent registerRecvCh, and frames from multiple callers got muxed into a single byte stream. The result was JSON-decode errors on the receiver and indefinite hangs on the senders, which waited for a response that never came back to their (orphaned) Conn.

The shared resource that genuinely benefits from per-peer dedup is the tunnel/x25519 handshake, and that is already idempotent inside TunnelManager.AddPeer/HasPeer.

func (*Daemon) GetManagedEngine added in v1.6.0

func (d *Daemon) GetManagedEngine(netID uint16) *ManagedEngine

GetManagedEngine returns the managed engine for a network, or nil.

func (*Daemon) GetMemberTags added in v1.6.0

func (d *Daemon) GetMemberTags(netID uint16) []string

GetMemberTags returns the cached member tags for the local node in a network.

func (*Daemon) GetPolicyRunner added in v1.6.0

func (d *Daemon) GetPolicyRunner(netID uint16) PolicyRunner

GetPolicyRunner returns the policy runner for a network, or nil. Delegates to the registered policy manager.

func (*Daemon) GetTrustChecker added in v1.10.0

func (d *Daemon) GetTrustChecker() TrustChecker

GetTrustChecker exposes the registered trust checker (or nil) so plugins/runtime can include it in the coreapi.Deps it constructs.

func (*Daemon) HandshakeRevokeTrust added in v1.10.0

func (d *Daemon) HandshakeRevokeTrust(nodeID uint32) error

func (*Daemon) HandshakeSendRequest added in v1.10.0

func (d *Daemon) HandshakeSendRequest(nodeID uint32, reason string) error

func (*Daemon) HandshakeService added in v1.10.0

func (d *Daemon) HandshakeService() HandshakeService

HandshakeService returns the registered HandshakeService (or nil if no handshake plugin has been registered). Mostly used by daemon internals; external callers prefer the typed wrappers above.

func (*Daemon) Identity

func (d *Daemon) Identity() *crypto.Identity

Identity returns the daemon's Ed25519 identity (may be nil if unset).

func (*Daemon) IdentityPath added in v1.10.0

func (d *Daemon) IdentityPath() string

IdentityPath returns the configured identity file path, or "" if the daemon runs without persistent identity. Exposed so the handshake plugin can derive its trust.json store path from the same directory.

func (*Daemon) Info

func (d *Daemon) Info() *DaemonInfo

Info returns current daemon status.

func (*Daemon) NewConnReadWriter added in v1.10.0

func (d *Daemon) NewConnReadWriter(conn *Connection) ConnReadWriter

NewConnReadWriter wraps a Connection with the daemon's net.Conn adapter. plugins/runtime uses this to expose the conn as a coreapi.Stream without needing access to the unexported connAdapter.

func (*Daemon) NodeID

func (d *Daemon) NodeID() uint32

func (*Daemon) Ports added in v1.10.0

func (d *Daemon) Ports() *PortManager

Ports returns the daemon's port manager. Plugins/runtime uses this to bind a coreapi.Listener for plugin services.

func (*Daemon) PublishEvent added in v1.10.0

func (d *Daemon) PublishEvent(topic string, payload map[string]any)

func (*Daemon) RegConnListNodes added in v1.10.0

func (d *Daemon) RegConnListNodes(netID uint16, token string) (map[string]any, error)

func (*Daemon) RegisterHandshakeService added in v1.10.0

func (d *Daemon) RegisterHandshakeService(svc HandshakeService)

RegisterHandshakeService installs the daemon-wide HandshakeService implementation provided by the handshake plugin (T3.3). Called from the composition root after constructing the plugin's Service. Replaces any previously-registered service.

func (*Daemon) RegisterPolicyManager added in v1.10.0

func (d *Daemon) RegisterPolicyManager(pm PolicyManager)

RegisterPolicyManager wires the L11 policy plugin into the daemon. cmd/daemon (composition root) constructs the plugin and calls this. Takes the daemon-local PolicyManager interface (primitive signatures only); the plugin's manager satisfies it via structural typing without daemon importing pkg/coreapi (T7.1).

func (*Daemon) RegisterTrustChecker added in v1.10.0

func (d *Daemon) RegisterTrustChecker(tc TrustChecker)

RegisterTrustChecker installs the daemon-wide TrustChecker used by the handshake handler to gate auto-accept. Replaces any previously registered checker. Takes the daemon-local interface (primitive signatures only); plugin types satisfy it via structural typing.

func (*Daemon) RegisterWebhookManager added in v1.10.0

func (d *Daemon) RegisterWebhookManager(wm WebhookManager)

RegisterWebhookManager wires the L11 webhook plugin into the daemon (T4.1). cmd/daemon (composition root) constructs the plugin and calls this with an adapter satisfying the daemon-local interface.

func (*Daemon) RegistryClient added in v1.10.0

func (d *Daemon) RegistryClient() *registry.Client

RegistryClient returns the underlying L8 registry-side-channel client (or nil if no registry is configured). Exposed so the handshake plugin can issue Lookup, ReportTrust, RevokeTrust, RequestHandshake, RespondHandshake, and PollHandshakes against the same client the daemon uses elsewhere; there is no separate connection or auth context.

func (*Daemon) RotateKey added in v1.9.0

func (d *Daemon) RotateKey() (map[string]interface{}, error)

RotateKey generates a new Ed25519 keypair, proves ownership of the current key to the registry via a `rotate:<node_id>` signature, atomically swaps the in-memory identity on success, and persists it to disk. Returns the registry response on success.

func (*Daemon) SendData

func (d *Daemon) SendData(conn *Connection, data []byte) error

func (*Daemon) SendDatagram

func (d *Daemon) SendDatagram(dstAddr protocol.Addr, dstPort uint16, data []byte) error

SendDatagram sends an unreliable unicast packet. Broadcast addresses (Node=0xFFFFFFFF) are rejected — use BroadcastDatagram, which requires an admin token.

func (*Daemon) SetMemberTags added in v1.6.0

func (d *Daemon) SetMemberTags(netID uint16, tags []string)

SetMemberTags updates the cached member tags for the local node in a network.

func (*Daemon) SetWebhookURL

func (d *Daemon) SetWebhookURL(url string)

SetWebhookURL hot-swaps the webhook URL by delegating to the registered WebhookManager (the plugins/webhook plugin). Called from IPC's set-webhook handler. No-op if no manager is registered (e.g. unit-test daemons that bypass cmd/daemon's plugin wiring).

func (*Daemon) Start

func (d *Daemon) Start() error

func (*Daemon) StartManagedEngine added in v1.6.0

func (d *Daemon) StartManagedEngine(netID uint16, rules *registrywire.NetworkRules)

StartManagedEngine starts a managed engine for a newly joined network.

func (*Daemon) StartPolicyRunner added in v1.6.0

func (d *Daemon) StartPolicyRunner(netID uint16, policyJSON json.RawMessage) error

StartPolicyRunner compiles a policy JSON and registers a runner via the policy manager. Errors when no manager is registered.

func (*Daemon) Stop

func (d *Daemon) Stop() error

func (*Daemon) StopManagedEngine added in v1.6.0

func (d *Daemon) StopManagedEngine(netID uint16)

StopManagedEngine stops a managed engine (e.g., on network leave).

func (*Daemon) StopPolicyRunner added in v1.6.0

func (d *Daemon) StopPolicyRunner(netID uint16)

StopPolicyRunner stops the policy runner for a network. No-op if no manager is registered or no runner exists for netID.

func (*Daemon) TrustAutoApprove added in v1.10.0

func (d *Daemon) TrustAutoApprove() bool

TrustAutoApprove reports whether the daemon was started with the trust-auto-approve flag set. Exposed for the handshake plugin's auto-accept gate.

func (*Daemon) TrustedPeers added in v1.10.0

func (d *Daemon) TrustedPeers() []HandshakeTrustRecord

TrustedPeers returns the trust records held by the registered handshake service. Returns nil when no handshake plugin is wired (e.g. unit tests that bypass plugins/runtime).

func (*Daemon) TunnelAddr added in v1.4.0

func (d *Daemon) TunnelAddr() net.Addr

TunnelAddr returns the local UDP address of the tunnel listener.

func (*Daemon) Tunnels added in v1.10.0

func (d *Daemon) Tunnels() *TunnelManager

Tunnels returns the daemon's tunnel manager. Mostly for in-tree plugins that want direct access (tests and unusual integrations).

type DaemonInfo

type DaemonInfo struct {
	NodeID                uint32
	Address               string
	Endpoint              string // host:port registered with the rendezvous (proves beacon discover succeeded)
	Hostname              string
	Uptime                time.Duration
	Connections           int
	Ports                 int
	Peers                 int
	EncryptedPeers        int
	AuthenticatedPeers    int
	Encrypt               bool
	Identity              bool   // true if identity is persisted
	PublicKey             string // base64 Ed25519 public key (empty if no identity)
	Email                 string // email address for account identification and key recovery
	BytesSent             uint64
	BytesRecv             uint64
	PktsSent              uint64
	PktsRecv              uint64
	EncryptOK             uint64
	EncryptFail           uint64
	HandshakePendingCount int
	Version               string
	Networks              []NetworkMembership
	PeerList              []PeerInfo
	ConnList              []ConnectionInfo

	// v1.9.1: health metrics surfaced for operators / dashboards.
	// These counters live elsewhere (Daemon.AcceptQueueDrops, the
	// WebhookClient internals) but had no path to reach `pilotctl info`
	// until now. Each is monotonic; operators compute rate by diffing
	// two snapshots over time.
	AcceptQueueDrops    uint64
	WebhookQueueDropped uint64
	WebhookCircuitSkips uint64

	RelayPeerCount int    // peers currently on relay path (symmetric NAT)
	BeaconAddr     string // active beacon address
}

DaemonInfo holds status information about the running daemon.

type Event added in v1.10.0

type Event struct {
	Topic   string
	NodeID  uint32
	Time    time.Time
	Payload map[string]any
}

Event is the daemon-local view of a published event. plugins/runtime adapts this to coreapi.Event when wrapping the bus for plugin Deps. Defined here (instead of importing pkg/coreapi) to keep pkg/daemon free of L10 imports (T7.1).

type HandshakePendingRecord added in v1.10.0

type HandshakePendingRecord struct {
	NodeID        uint32
	PublicKey     string
	Justification string
	ReceivedAt    time.Time
}

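HandshakePendingRecord mirrors plugins/handshake.PendingHandshake for the same reason as HandshakeTrustRecord: it keeps the daemon-local HandshakeService interface primitive-only, with no upward import of the plugin package.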

type HandshakeService added in v1.10.0

type HandshakeService interface {
	IsTrusted(nodeID uint32) bool
	TrustedPeers() []HandshakeTrustRecord
	PendingRequests() []HandshakePendingRecord
	PendingCount() int
	SendRequest(peerNodeID uint32, justification string) error
	ApproveHandshake(peerNodeID uint32) error
	RejectHandshake(peerNodeID uint32, reason string) error
	RevokeTrust(peerNodeID uint32) error

	// WaitForTrust blocks until the peer transitions to trusted, or the
	// timeout elapses. Returns true if trust was granted in time.
	// Wired through SubHandshakeWait so callers (typically pilotctl
	// before a first send to a trusted-list peer) can block bidirectional
	// operations on trust establishment instead of racing the data send
	// against the handshake reply.
	WaitForTrust(peerNodeID uint32, timeout time.Duration) bool

	// ProcessRelayedRequest / ProcessRelayedApproval / ProcessRelayedRejection
	// are invoked from Daemon.pollRelayedHandshakes after parsing the
	// registry-inbox payload.
	ProcessRelayedRequest(fromNodeID uint32, justification string)
	ProcessRelayedApproval(fromNodeID uint32)
	ProcessRelayedRejection(fromNodeID uint32)

	// Stop drains background RPCs and stops the replay reaper.
	Stop()
}

HandshakeService is the daemon-facing surface of the manual trust-handshake plugin (port 444). The plugin's *Manager satisfies this via Go's structural typing — daemon never imports plugins/handshake (T3.3).

All trust-handshake operations route through this interface: IPC command dispatch (ipc.go), trust-gate checks on inbound SYN / datagrams, registry-relay polling, and trust-pair re-sync after reconnect.

type HandshakeTrustRecord added in v1.10.0

type HandshakeTrustRecord struct {
	NodeID     uint32
	PublicKey  string
	ApprovedAt time.Time
	Mutual     bool
	Network    uint16
}

HandshakeTrustRecord mirrors plugins/handshake.TrustRecord so the daemon-local HandshakeService interface stays primitive-only (no upward import). The field set is identical to the plugin's TrustRecord, so each handshake.TrustRecord value converts directly to a HandshakeTrustRecord. That is not enough for interface satisfaction, though: Go requires method signatures to match exactly, and []handshake.TrustRecord and []HandshakeTrustRecord are distinct types that cannot be converted into one another. The plugin's adapter therefore returns a []HandshakeTrustRecord built element by element from its own TrustRecord values.
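The element-wise conversion can be sketched as follows. `TrustRecord` here is a local stand-in for plugins/handshake.TrustRecord, and `toDaemonRecords` is a hypothetical adapter helper; the plugin's real adapter may be structured differently.

```go
package main

import (
	"fmt"
	"time"
)

// TrustRecord is a local stand-in for plugins/handshake.TrustRecord
// (same field set as HandshakeTrustRecord, per the doc).
type TrustRecord struct {
	NodeID     uint32
	PublicKey  string
	ApprovedAt time.Time
	Mutual     bool
	Network    uint16
}

// HandshakeTrustRecord is the daemon-local mirror.
type HandshakeTrustRecord struct {
	NodeID     uint32
	PublicKey  string
	ApprovedAt time.Time
	Mutual     bool
	Network    uint16
}

// toDaemonRecords (hypothetical) converts element by element. Each value
// converts directly because both struct types share an identical underlying
// type; the slice types themselves are not convertible.
func toDaemonRecords(in []TrustRecord) []HandshakeTrustRecord {
	if in == nil {
		return nil
	}
	out := make([]HandshakeTrustRecord, len(in))
	for i, r := range in {
		out[i] = HandshakeTrustRecord(r)
	}
	return out
}

func main() {
	recs := []TrustRecord{{NodeID: 42, PublicKey: "base64key", Mutual: true, Network: 1}}
	fmt.Println(toDaemonRecords(recs)[0].NodeID) // 42
}
```

Writing `[]HandshakeTrustRecord(in)` instead of the loop would not compile, which is why the adapter has to copy.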

type IPCServer

type IPCServer struct {
	// contains filtered or unexported fields
}

IPCServer handles connections from local drivers over Unix socket.

func NewIPCServer

func NewIPCServer(socketPath string, d *Daemon) *IPCServer

func (*IPCServer) Close

func (s *IPCServer) Close() error

func (*IPCServer) DeliverDatagram

func (s *IPCServer) DeliverDatagram(srcAddr protocol.Addr, srcPort uint16, dstPort uint16, data []byte)

DeliverDatagram delivers a datagram to any listening IPC client.

func (*IPCServer) Start

func (s *IPCServer) Start() error

type IncomingPacket

type IncomingPacket struct {
	Packet *protocol.Packet
	From   *net.UDPAddr
}

type Listener

type Listener struct {
	Port     uint16
	AcceptCh chan *Connection
	// contains filtered or unexported fields
}

func (*Listener) TrySend added in v1.9.1

func (ln *Listener) TrySend(conn *Connection) bool

TrySend attempts a non-blocking push of conn to AcceptCh. Returns true if the connection was queued, false if the queue was full or the listener has been closed (via Unbind). Safe to call after Unbind — never panics even if AcceptCh is already closed.

v1.9.1 fix: the closed flag is checked under mu before the channel send, eliminating the window where Unbind()'s close(AcceptCh) races with handleStreamPacket's send. Without this guard, a concurrent Unbind could close AcceptCh between GetListener's return and the send, causing a "send on closed channel" panic that killed routeLoop.

type ManagedEngine added in v1.6.0

type ManagedEngine struct {
	// contains filtered or unexported fields
}

ManagedEngine runs the managed network cycle for a single network. It maintains a local peer set and runs periodic prune/fill cycles. All state is daemon-local — the registry only stores the rules.

func NewManagedEngine added in v1.6.0

func NewManagedEngine(netID uint16, rules *registry.NetworkRules, d *Daemon) *ManagedEngine

NewManagedEngine creates a managed engine for a network. It loads persisted state if available, or bootstraps from the member list.

func (*ManagedEngine) Bootstrap added in v1.6.0

func (me *ManagedEngine) Bootstrap() error

Bootstrap populates the managed set from the network member list. Called on first join or when persisted state is empty.

func (*ManagedEngine) ForceCycle added in v1.6.0

func (me *ManagedEngine) ForceCycle() map[string]interface{}

ForceCycle runs a cycle immediately, outside the timer.

func (*ManagedEngine) Start added in v1.6.0

func (me *ManagedEngine) Start()

Start begins the cycle loop. Should be called after construction.

func (*ManagedEngine) Status added in v1.6.0

func (me *ManagedEngine) Status() map[string]interface{}

Status returns a summary of the managed engine state.

func (*ManagedEngine) Stop added in v1.6.0

func (me *ManagedEngine) Stop()

Stop signals the cycle loop to exit and waits for it.

type NetworkMembership added in v1.6.2

type NetworkMembership struct {
	NetworkID uint16 `json:"network_id"`
	Address   string `json:"address"`
}

NetworkMembership pairs a network ID with the node's address in that network (one entry per membership in DaemonInfo.Networks).

type PeerInfo

type PeerInfo struct {
	NodeID        uint32
	Endpoint      string
	Encrypted     bool
	Authenticated bool // true if peer proved Ed25519 identity
	Relay         bool // true if using beacon relay (symmetric NAT)
}

PeerInfo describes a known peer.

type PolicyEventType added in v1.10.0

type PolicyEventType = string

PolicyEventType is the kind of protocol event a policy is evaluated against. Type alias to string mirrors coreapi.PolicyEventType so plugin signatures match exactly via structural typing (T7.1).
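Why an alias rather than a defined type matters can be shown with a reduced interface. `portGate` below is a hypothetical two-parameter cut-down of PolicyRunner.EvaluatePortGate, and `coreapiEventType` stands in for coreapi.PolicyEventType. Because an alias is the same type as string, a method declared with the alias has exactly the signature the interface asks for.

```go
package main

import "fmt"

// Hypothetical stand-ins: both are aliases, so both ARE string.
type coreapiEventType = string
type PolicyEventType = string

// definedEventType is a defined (non-alias) type, for contrast.
type definedEventType string

// portGate is a cut-down, hypothetical version of the daemon-side method.
type portGate interface {
	EvaluatePortGate(eventType string, port uint16) bool
}

// pluginRunner declares its method with the alias; the signature is
// identical to portGate's, so it satisfies the interface.
type pluginRunner struct{}

func (pluginRunner) EvaluatePortGate(eventType coreapiEventType, port uint16) bool {
	return eventType == "connect" && port == 80
}

func main() {
	var g portGate = pluginRunner{} // compiles: identical signatures
	var ev PolicyEventType = "connect"
	fmt.Println(g.EvaluatePortGate(ev, 80)) // true

	// A defined type needs an explicit conversion at each call site.
	d := definedEventType("dial")
	fmt.Println(g.EvaluatePortGate(string(d), 80)) // false
}
```

Had PolicyEventType been declared as `type PolicyEventType string`, a plugin method taking it would not satisfy an interface method taking string, and the daemon would need an adapter.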

type PolicyManager added in v1.10.0

type PolicyManager interface {
	Start(netID uint16, policyJSON []byte) (PolicyRunner, error)
	Stop(netID uint16)
	Get(netID uint16) PolicyRunner
	All() []PolicyRunner
	StopAll()
	LoadPersisted() error
}

PolicyManager owns the per-network registry of policy runners.

type PolicyRunner added in v1.6.0

type PolicyRunner interface {
	NetworkID() uint16
	HasMember(peerNodeID uint32) bool

	// EvaluatePortGate takes a string event-type ("connect", "dial",
	// "datagram", ...). plugins/policy.EventType is a type alias to
	// coreapi.PolicyEventType which is itself a type alias to string —
	// so plugin signatures match this exactly.
	EvaluatePortGate(eventType string, port uint16, peerNodeID uint32, payloadSize int, direction string, localTags, nodeInfoTags []string) bool

	EvaluateActions(eventType string, ctx map[string]any)
	Status() map[string]any
	PeerList() []map[string]interface{}
	ForceCycle() map[string]any
	ReconcileNow()
	PolicyJSON() ([]byte, error)
	Stop()
}

PolicyRunner is the daemon-facing surface of a single network's running policy. The plugin's concrete *PolicyRunner satisfies this via structural typing.

type PortManager

type PortManager struct {
	// contains filtered or unexported fields
}

PortManager handles virtual port binding and connection tracking.

func NewPortManager

func NewPortManager() *PortManager

func (*PortManager) AllConnections

func (pm *PortManager) AllConnections() []*Connection

AllConnections returns all active connections.

func (*PortManager) AllocEphemeralPort

func (pm *PortManager) AllocEphemeralPort() uint16

AllocEphemeralPort returns an unused ephemeral port in [PortEphemeralMin, PortEphemeralMax], or 0 when all 16384 ports are simultaneously in use. Callers must treat 0 as ErrEphemeralExhausted.

v1.9.1 fix: the previous implementation incremented nextEphPort as uint16 and checked `> PortEphemeralMax` afterward. When nextEphPort was 65535, uint16++ silently wrapped to 0 before the check fired, so the loop would escape the ephemeral range and return port 0 via `portInUse(0) == false`. The fix uses a bounded int counter and checks >= PortEphemeralMax BEFORE the increment so the uint16 field never reaches 0 via overflow.
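A sketch of the bounded scan described above. The constants are assumptions inferred from the text (a 16384-port range ending at 65535 implies 49152 to 65535), and `portAlloc` is a hypothetical reduction of PortManager's unexported state.

```go
package main

import "fmt"

const (
	PortEphemeralMin = 49152 // assumed: 65535 - 16384 + 1
	PortEphemeralMax = 65535 // assumed
)

// portAlloc is a hypothetical stand-in for PortManager's allocator state.
type portAlloc struct {
	nextEphPort uint16
	inUse       map[uint16]bool
}

func newPortAlloc() *portAlloc {
	return &portAlloc{nextEphPort: PortEphemeralMin, inUse: make(map[uint16]bool)}
}

// alloc scans at most one full cycle of the range using an int counter.
// The wrap check runs BEFORE the increment, so nextEphPort is reset to
// PortEphemeralMin instead of being incremented past 65535 to 0.
func (a *portAlloc) alloc() uint16 {
	const span = PortEphemeralMax - PortEphemeralMin + 1
	for i := 0; i < span; i++ {
		p := a.nextEphPort
		if int(a.nextEphPort) >= PortEphemeralMax {
			a.nextEphPort = PortEphemeralMin
		} else {
			a.nextEphPort++
		}
		if !a.inUse[p] {
			a.inUse[p] = true
			return p
		}
	}
	return 0 // exhausted: caller maps this to ErrEphemeralExhausted
}

func main() {
	a := newPortAlloc()
	a.nextEphPort = PortEphemeralMax
	fmt.Println(a.alloc(), a.alloc()) // 65535 49152
}
```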

func (*PortManager) Bind

func (pm *PortManager) Bind(port uint16) (*Listener, error)

func (*PortManager) ConnectionCountForPort

func (pm *PortManager) ConnectionCountForPort(port uint16) int

ConnectionCountForPort returns the number of active connections on a port.

func (*PortManager) ConnectionList

func (pm *PortManager) ConnectionList() []ConnectionInfo

ConnectionList returns info about all active connections.

func (*PortManager) FindConnection

func (pm *PortManager) FindConnection(localPort uint16, remoteAddr protocol.Addr, remotePort uint16) *Connection

func (*PortManager) GetConnection

func (pm *PortManager) GetConnection(id uint32) *Connection

func (*PortManager) GetListener

func (pm *PortManager) GetListener(port uint16) *Listener

func (*PortManager) IdleConnections

func (pm *PortManager) IdleConnections(maxIdle time.Duration) []*Connection

IdleConnections returns connections that have been idle longer than the given duration.

func (*PortManager) NewConnection

func (pm *PortManager) NewConnection(localPort uint16, remoteAddr protocol.Addr, remotePort uint16) *Connection

func (*PortManager) RemoveConnection

func (pm *PortManager) RemoveConnection(id uint32)

func (*PortManager) StaleConnections

func (pm *PortManager) StaleConnections(timeWaitDur time.Duration) []*Connection

StaleConnections returns connections in a terminal state that should be cleaned up. CLOSED, FIN_WAIT, CLOSE_WAIT are cleaned up immediately. TIME_WAIT connections are cleaned up after timeWaitDur.

func (*PortManager) TotalActiveConnections

func (pm *PortManager) TotalActiveConnections() int

TotalActiveConnections returns the total number of non-closed connections.

func (*PortManager) Unbind

func (pm *PortManager) Unbind(port uint16)

type SACKBlock

type SACKBlock struct {
	Left  uint32
	Right uint32
}

SACKBlock represents a contiguous range of received bytes. Left is the seq of the first byte; Right is the seq of the first byte AFTER the range.

func DecodeSACK

func DecodeSACK(data []byte) ([]SACKBlock, bool)

DecodeSACK parses SACK blocks from an ACK payload. Returns nil, false if the payload is not SACK data.

type TrustChecker added in v1.10.0

type TrustChecker interface {
	IsTrusted(nodeID uint32) (string, bool)
}

TrustChecker is the daemon-facing surface of the trustedagents plugin. The handshake handler consults this for auto-accept.
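A sketch of how a plugin type can satisfy TrustChecker without either package importing the other. `allowlist` is hypothetical, and the meaning of the returned string is not documented here; this example assumes it is a human-readable label for the trusted peer.

```go
package main

import "fmt"

// TrustChecker copies the daemon-local interface.
type TrustChecker interface {
	IsTrusted(nodeID uint32) (string, bool)
}

// allowlist is a hypothetical plugin-side type. It never names
// TrustChecker; it satisfies it because its method set matches.
type allowlist struct {
	labels map[uint32]string
}

func (a *allowlist) IsTrusted(nodeID uint32) (string, bool) {
	label, ok := a.labels[nodeID]
	return label, ok
}

// Compile-time check, e.g. at the composition root, which may import both sides.
var _ TrustChecker = (*allowlist)(nil)

func main() {
	var tc TrustChecker = &allowlist{labels: map[uint32]string{7: "ci-runner"}}
	fmt.Println(tc.IsTrusted(7)) // ci-runner true
	_, ok := tc.IsTrusted(8)
	fmt.Println(ok) // false
}
```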

type TunnelManager

type TunnelManager struct {

	// Metrics
	BytesSent   uint64
	BytesRecv   uint64
	PktsSent    uint64
	PktsRecv    uint64
	EncryptOK   uint64
	EncryptFail uint64
	// P1-008: packets dropped from the per-peer pending queue while waiting
	// for key exchange. Exposed so operators can tell a congested overlay
	// apart from a silent crypto stall.
	PendingDrops uint64
	// contains filtered or unexported fields
}

TunnelManager manages real UDP tunnels to peer daemons.

func NewTunnelManager

func NewTunnelManager() *TunnelManager

func (*TunnelManager) AddPeer

func (tm *TunnelManager) AddPeer(nodeID uint32, addr *net.UDPAddr)

AddPeer registers a peer's real UDP endpoint.

func (*TunnelManager) Close

func (tm *TunnelManager) Close() error

func (*TunnelManager) DropCrypto added in v1.9.1

func (tm *TunnelManager) DropCrypto(peerNodeID uint32)

DropCrypto removes the encryption context for a peer, forcing a fresh key exchange on the next encrypted send. Used by the dial-timeout exhausted path to recover from peer-side AEAD key divergence (older daemon versions can drift into a state where their derived key no longer matches ours despite stable X25519 pubkeys; the only recovery is a full re-handshake). Also clears any pending-rekey state so the retransmit loop doesn't immediately fire — the next dial's sendKeyExchangeToNode will re-arm it.

func (*TunnelManager) EnableEncryption

func (tm *TunnelManager) EnableEncryption() error

EnableEncryption generates an X25519 keypair and enables tunnel encryption.

func (*TunnelManager) HasCrypto

func (tm *TunnelManager) HasCrypto(nodeID uint32) bool

HasCrypto returns true if we have an encryption context for a peer (proving prior key exchange).

func (*TunnelManager) HasPeer

func (tm *TunnelManager) HasPeer(nodeID uint32) bool

HasPeer checks if we have a tunnel to a node.

func (*TunnelManager) IsEncrypted

func (tm *TunnelManager) IsEncrypted(nodeID uint32) bool

IsEncrypted returns true if the tunnel to a peer is encrypted.

func (*TunnelManager) IsRelayPeer

func (tm *TunnelManager) IsRelayPeer(nodeID uint32) bool

IsRelayPeer returns true if the peer is in relay mode. Thin shim over routing.Manager.IsRelayPeer.

func (*TunnelManager) Listen

func (tm *TunnelManager) Listen(addr string) error

Listen starts the UDP listener for incoming tunnel traffic.

func (*TunnelManager) LocalAddr

func (tm *TunnelManager) LocalAddr() net.Addr

func (*TunnelManager) PeerCount

func (tm *TunnelManager) PeerCount() int

PeerCount returns the number of known peers.

func (*TunnelManager) PeerList

func (tm *TunnelManager) PeerList() []PeerInfo

PeerList returns all known peers and their endpoints.

func (*TunnelManager) RecvCh

func (tm *TunnelManager) RecvCh() <-chan *IncomingPacket

RecvCh returns the channel for incoming packets.

func (*TunnelManager) RegisterWithBeacon

func (tm *TunnelManager) RegisterWithBeacon()

RegisterWithBeacon sends a MsgDiscover to the beacon from the tunnel socket using the real nodeID, so the beacon knows our endpoint for punch coordination. Thin shim over routing.Manager.RegisterWithBeacon.

func (*TunnelManager) RelayPeerIDs added in v1.8.0

func (tm *TunnelManager) RelayPeerIDs() []uint32

RelayPeerIDs returns the node IDs of all relay-flagged peers. Thin shim over routing.Manager.RelayPeerIDs.

func (*TunnelManager) RemovePeer

func (tm *TunnelManager) RemovePeer(nodeID uint32)

RemovePeer removes a peer and all per-peer metadata. Long-running daemons with peer churn (handshake revocations, network leaves) previously leaked entries in lastDirectRecv, blackholeMissCount, directClearCount, relayPeers, peerPubKeys, pendingRekey, and lastInboundDecrypt — none of which had any other deletion path. A reused nodeID would also inherit stale state (e.g. trip the relay flip on the third miss because blackholeMissCount=2 from the previous tenant).

func (*TunnelManager) RequestHolePunch

func (tm *TunnelManager) RequestHolePunch(targetNodeID uint32)

RequestHolePunch asks the beacon to coordinate NAT hole-punching with a target peer. Thin shim over routing.Manager.RequestHolePunch.

func (*TunnelManager) Send

func (tm *TunnelManager) Send(nodeID uint32, pkt *protocol.Packet) error

Send encapsulates and sends a packet to the given node.

func (*TunnelManager) SendDirectProbe added in v1.9.0

func (tm *TunnelManager) SendDirectProbe(nodeID uint32, pkt *protocol.Packet) error

SendDirectProbe sends an encrypted packet straight to the peer's last-known direct UDP endpoint, bypassing the relay wrapping that Send/SendTo would apply if relayPeers[nodeID] is true. Used by relayProbeLoop to test whether the direct path has recovered without tearing down the relay flag for concurrent traffic. Returns an error if no direct endpoint is known or if the peer's stored addr is the beacon (meaning we never learned a real direct addr for this peer).

P1-010 fix: previously, relayProbeLoop temporarily flipped SetRelayPeer(nodeID, false), sent the probe via Send, then restored the flag after 2s. During that window every concurrent send, including key-exchange replies triggered by the peer's "no key" warnings, bypassed relay too. If the direct path was still dead (e.g. symmetric NAT plus a stale mapping), those replies were silently dropped, leaving crypto desynced indefinitely. SendDirectProbe isolates the probe without disturbing other traffic.

func (*TunnelManager) SendTo

func (tm *TunnelManager) SendTo(addr *net.UDPAddr, nodeID uint32, pkt *protocol.Packet) error

SendTo sends a packet to a specific UDP address (relay-aware).

func (*TunnelManager) SetBeaconAddr

func (tm *TunnelManager) SetBeaconAddr(addr string) error

SetBeaconAddr configures the beacon address for NAT hole-punching and relay. Thin shim over routing.Manager.SetBeaconAddr.

func (*TunnelManager) SetEventBus added in v1.10.0

func (tm *TunnelManager) SetEventBus(bus *inProcessBus)

SetEventBus wires the daemon's pub/sub bus into the tunnel layer. It replaces the inline tm.webhook.Emit pattern: core layers Publish, and observability plugins (webhook, a future eventstream broker) Subscribe via daemonEventBus.
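A minimal in-process pub/sub sketch in that spirit. The daemon's inProcessBus API is unexported and not documented here, so `bus`, `Subscribe`, `Publish`, and the topic name are all hypothetical; only the Event field set comes from this page. The non-blocking send is one common design choice: a slow subscriber loses events rather than stalling the core layer that published them.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Event struct {
	Topic   string
	NodeID  uint32
	Time    time.Time
	Payload map[string]any
}

// bus is a hypothetical in-process pub/sub.
type bus struct {
	mu   sync.RWMutex
	subs map[string][]chan Event
}

func newBus() *bus { return &bus{subs: make(map[string][]chan Event)} }

// Subscribe returns a buffered channel that receives events for topic.
func (b *bus) Subscribe(topic string, buf int) <-chan Event {
	ch := make(chan Event, buf)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish fans out without blocking and returns how many subscribers
// dropped the event because their buffer was full.
func (b *bus) Publish(topic string, nodeID uint32, payload map[string]any) int {
	ev := Event{Topic: topic, NodeID: nodeID, Time: time.Now(), Payload: payload}
	b.mu.RLock()
	defer b.mu.RUnlock()
	dropped := 0
	for _, ch := range b.subs[topic] {
		select {
		case ch <- ev:
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	b := newBus()
	webhook := b.Subscribe("tunnel.relay", 1)
	b.Publish("tunnel.relay", 42, map[string]any{"relay": true})
	ev := <-webhook
	fmt.Println(ev.Topic, ev.NodeID) // tunnel.relay 42
}
```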

func (*TunnelManager) SetIdentity

func (tm *TunnelManager) SetIdentity(id *crypto.Identity)

SetIdentity sets our Ed25519 identity for signing authenticated key exchanges. Forwards to keyexchange.Manager (L5 owner of identity).

func (*TunnelManager) SetNodeID

func (tm *TunnelManager) SetNodeID(id uint32)

SetNodeID sets our node ID (called after registration). Propagates to the envelope store so the encrypt-side AAD / frame-header use the same value.

func (*TunnelManager) SetPeerVerifyFunc

func (tm *TunnelManager) SetPeerVerifyFunc(fn func(uint32) (ed25519.PublicKey, error))

SetPeerVerifyFunc sets a callback to fetch a peer's Ed25519 public key from the registry. Forwards to keyexchange.Manager.

func (*TunnelManager) SetRelayPeer

func (tm *TunnelManager) SetRelayPeer(nodeID uint32, relay bool)

SetRelayPeer marks a peer as needing relay through the beacon (symmetric NAT). Thin shim over routing.Manager.SetRelayPeer.

func (*TunnelManager) SetRelayPeerPinned added in v1.9.1

func (tm *TunnelManager) SetRelayPeerPinned(nodeID uint32, relay bool)

SetRelayPeerPinned is like SetRelayPeer but also marks the relay flag as authoritative — ClearRelayOnDirect will never auto-flip a pinned peer back to direct based on observed packet sources. Thin shim over routing.Manager.SetRelayPeerPinned.
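The pinning rule can be sketched with two maps. `relayTable` is a hypothetical reduction of routing.Manager's state, and the sketch assumes that an unpinned SetRelayPeer leaves any existing pin in place; the real behavior is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// relayTable is a hypothetical reduction of the relay flags.
type relayTable struct {
	mu     sync.Mutex
	relay  map[uint32]bool
	pinned map[uint32]bool
}

func newRelayTable() *relayTable {
	return &relayTable{relay: map[uint32]bool{}, pinned: map[uint32]bool{}}
}

// SetRelayPeer sets the flag without touching pin state (assumption).
func (t *relayTable) SetRelayPeer(id uint32, relay bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.relay[id] = relay
}

// SetRelayPeerPinned sets the flag and marks it authoritative.
func (t *relayTable) SetRelayPeerPinned(id uint32, relay bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.relay[id] = relay
	t.pinned[id] = true
}

// ClearRelayOnDirect models a packet arriving from the peer's direct
// endpoint: unpinned relay peers flip back to direct, pinned ones stay.
// It reports whether the flag was flipped.
func (t *relayTable) ClearRelayOnDirect(id uint32) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.relay[id] || t.pinned[id] {
		return false
	}
	t.relay[id] = false
	return true
}

func main() {
	rt := newRelayTable()
	rt.SetRelayPeer(1, true)       // relay chosen heuristically
	rt.SetRelayPeerPinned(2, true) // relay chosen authoritatively
	fmt.Println(rt.ClearRelayOnDirect(1), rt.ClearRelayOnDirect(2)) // true false
}
```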

type WebhookManager added in v1.10.0

type WebhookManager interface {
	// SetURL hot-swaps the active webhook URL. Empty URL disables
	// delivery (no-op until set again).
	SetURL(url string)

	// Stats returns dispatcher counters for daemon Info. All-zero
	// when no client is configured (nil-safe at the plugin level).
	Stats() WebhookStats
}

WebhookManager is the daemon-local face of the webhook plugin (plugins/webhook). The plugin owns the HTTP client; daemon only needs to (a) hot-swap the URL when IPC's set-webhook fires and (b) read counters for the DaemonInfo health snapshot.

Defined as an interface so daemon stays free of any plugin import (T4.1 webhook-inversion). cmd/daemon/main.go registers an adapter that satisfies this interface from plugins/webhook.Service.
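A sketch of such an adapter. `pluginService` and `pluginStats` stand in for plugins/webhook.Service and plugins/webhook.Stats, and their method names are assumptions. Because the plugin's Stats returns its own type, the service cannot satisfy WebhookManager directly, so the adapter converts the value (the two structs share a field set, per WebhookStats below).

```go
package main

import "fmt"

// Daemon-local types, copied from this page.
type WebhookStats struct {
	Dropped      uint64
	CircuitSkips uint64
}

type WebhookManager interface {
	SetURL(url string)
	Stats() WebhookStats
}

// Hypothetical stand-ins for the plugin's types.
type pluginStats struct {
	Dropped      uint64
	CircuitSkips uint64
}

type pluginService struct {
	url   string
	stats pluginStats
}

func (s *pluginService) SetURL(url string)  { s.url = url }
func (s *pluginService) Stats() pluginStats { return s.stats }

// webhookAdapter would live at the composition root, the only place
// that imports both the daemon and the plugin.
type webhookAdapter struct{ svc *pluginService }

func (a webhookAdapter) SetURL(url string) {
	if a.svc != nil {
		a.svc.SetURL(url)
	}
}

func (a webhookAdapter) Stats() WebhookStats {
	if a.svc == nil {
		return WebhookStats{} // all-zero when no client is configured
	}
	return WebhookStats(a.svc.Stats()) // identical fields: direct conversion
}

var _ WebhookManager = webhookAdapter{}

func main() {
	svc := &pluginService{stats: pluginStats{Dropped: 3}}
	var wm WebhookManager = webhookAdapter{svc: svc}
	wm.SetURL("https://example.invalid/hook")
	fmt.Println(svc.url, wm.Stats().Dropped) // https://example.invalid/hook 3
}
```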

type WebhookStats added in v1.10.0

type WebhookStats struct {
	Dropped      uint64
	CircuitSkips uint64
}

WebhookStats is the daemon-local mirror of plugins/webhook.Stats. Same shape, different package — daemon can hold the value type without importing the plugin.

Directories

Path Synopsis
Package envelope is L6 — the per-peer AEAD framing layer.
Package keyexchange is L5 — establishes the per-peer AEAD session key (with optional Ed25519 authentication of identity).
Package routing is L4 — peer discovery & routing.
Package udpio is the L2 datagram-I/O layer of the daemon: it owns the *net.UDPConn socket FD and provides "dumb" Send/Recv primitives.
