Documentation
Index
- Constants
- Variables
- func ListenUDP(bindIP string, port int) (*udpService, error)
- func NewManager(ctx context.Context, cfg *ManagerConfig) (*manager, error)
- func NewRouteReaderWriter(lm Manager, rrw RouteReaderWriter, iface string) *routeReaderWriter
- func NewUDPService(raw *net.UDPConn) (*udpService, error)
- type ControlPacket
- type DownReason
- type EventQueue
- type HandleRxFunc
- type Manager
- type ManagerConfig
- type Metrics
- type Peer
- type Receiver
- type RouteKey
- type RouteReaderWriter
- type Scheduler
- type Session
- func (s *Session) ArmDetect(now time.Time) (time.Time, bool)
- func (s *Session) ComputeNextTx(now time.Time, rnd *rand.Rand) time.Time
- func (s *Session) ExpireIfDue(now time.Time) (expired bool)
- func (s *Session) GetState() State
- func (s *Session) HandleRx(now time.Time, ctrl *ControlPacket) (changed bool)
- func (s *Session) Snapshot() SessionSnapshot
- type SessionDownFunc
- type SessionSnapshot
- type State
- type UDPService
Constants
const (
	// Labels.
	LabelIface     = "iface"
	LabelLocalIP   = "local_ip"
	LabelPeerIP    = "peer_ip"
	LabelState     = "state"
	LabelStateTo   = "state_to"
	LabelStateFrom = "state_from"
	LabelReason    = "reason"
	LabelOperation = "operation"
)
const (
	// DefaultLivenessPort is the default port for liveness probes.
	// This is expected to be the same across all clients.
	DefaultLivenessPort = 44880
)
Variables
var (
	ErrShortPacket   = errors.New("short packet")
	ErrInvalidLength = errors.New("invalid length")
)
Functions
func ListenUDP
func ListenUDP(bindIP string, port int) (*udpService, error)
ListenUDP binds an IPv4 UDP socket to bindIP:port and returns a configured UDPService. The returned connection is ready to read/write with control message support enabled.
func NewManager
func NewManager(ctx context.Context, cfg *ManagerConfig) (*manager, error)
NewManager constructs a Manager, opens the UDP socket, and launches the receiver and scheduler loops. The context governs their lifetime.
func NewRouteReaderWriter
func NewRouteReaderWriter(lm Manager, rrw RouteReaderWriter, iface string) *routeReaderWriter
NewRouteReaderWriter creates an interface-scoped RouteReaderWriter that wraps the liveness Manager and a concrete routing backend. This allows the BGP plugin to use standard routing calls while the Manager tracks route liveness on a per-interface basis.
func NewUDPService
func NewUDPService(raw *net.UDPConn) (*udpService, error)
NewUDPService wraps an existing *net.UDPConn and enables IPv4 control messages (IP_PKTINFO-like). On RX we obtain the destination IP and interface index; on TX we can set source IP and interface.
Types
type ControlPacket
type ControlPacket struct {
	Version         uint8  // protocol version; expected to be 1
	State           State  // sender's current session state
	DetectMult      uint8  // detection multiplier (used by peer for detect timeout)
	Length          uint8  // total length, always 40 for this fixed-size implementation
	LocalDiscr      uint32 // sender's discriminator (unique session ID)
	PeerDiscr       uint32 // discriminator of the remote session (echo back)
	DesiredMinTxUs  uint32 // minimum TX interval desired by sender (microseconds)
	RequiredMinRxUs uint32 // minimum RX interval the sender can handle (microseconds)
}
ControlPacket represents the wire format of a minimal BFD control packet. Fields mirror RFC 5880 §4.1 in a compact form using microsecond units for timers.
func UnmarshalControlPacket
func UnmarshalControlPacket(b []byte) (*ControlPacket, error)
UnmarshalControlPacket parses a 40-byte control message from the wire into a ControlPacket. It validates the version and length fields and extracts all header values using big-endian order.
func (*ControlPacket) Marshal
func (c *ControlPacket) Marshal() []byte
Marshal serializes a ControlPacket into its fixed 40-byte wire format.
Field layout (big-endian, byte offsets):
	0:      Version (3 high bits) | 5 bits unused (zero)
	1:      State (2 high bits) | 6 bits unused (zero)
	2:      DetectMult
	3:      Length (always 40)
	4–7:    LocalDiscr
	8–11:   PeerDiscr
	12–15:  DesiredMinTxUs
	16–19:  RequiredMinRxUs
	20–39:  zero padding (unused / reserved)
Only a subset of the full BFD header is implemented; authentication and optional fields are omitted for simplicity.
type DownReason
type DownReason uint8
const (
	DownReasonNone DownReason = iota
	DownReasonTimeout
	DownReasonRemoteAdmin
	DownReasonLocalAdmin
	DownReasonRemoteDown
)
func (DownReason) String
func (d DownReason) String() string
type EventQueue
type EventQueue struct {
	// contains filtered or unexported fields
}
EventQueue is a thread-safe priority queue of scheduled events, ordered by scheduled time. It supports pushing events and popping either the earliest event (Pop) or only an event that is already due (PopIfDue).
func NewEventQueue
func NewEventQueue() *EventQueue
NewEventQueue constructs an empty, initialized, heap-based event queue.
func (*EventQueue) CountFor
func (q *EventQueue) CountFor(iface, localIP string) int
CountFor returns the number of events in the queue for a given interface and local IP.
func (*EventQueue) Len
func (q *EventQueue) Len() int
Len returns the total number of events in the queue.
func (*EventQueue) Pop
func (q *EventQueue) Pop() *event
Pop removes and returns the next (earliest) event from the queue, or nil if empty.
func (*EventQueue) PopIfDue
func (q *EventQueue) PopIfDue(now time.Time) (*event, time.Duration)
PopIfDue returns the next event if its scheduled time is due (<= now). Otherwise, it returns nil and the duration until the next event’s time, allowing the caller to sleep until that deadline.
func (*EventQueue) Push
func (q *EventQueue) Push(e *event)
Push inserts a new event into the queue and assigns it a sequence number, so events with identical timestamps are popped in insertion order.
type HandleRxFunc
type HandleRxFunc func(pkt *ControlPacket, peer Peer)
HandleRxFunc defines the handler signature for received control packets. The callback is invoked for every successfully decoded ControlPacket, along with a Peer descriptor identifying interface and IP context.
type ManagerConfig
type ManagerConfig struct {
	Logger          *slog.Logger
	Netlinker       RouteReaderWriter
	UDP             UDPService
	MetricsRegistry *prometheus.Registry
	BindIP          string // local bind address for the UDP socket (IPv4)
	Port            int    // UDP port to listen/transmit on
	// PassiveMode: if true, Manager does NOT manage kernel routes automatically.
	// Instead it defers to Netlinker calls made by the caller. This enables
	// incremental rollout (observe liveness without changing the dataplane).
	PassiveMode bool
	// Local desired probe intervals and detection multiplier for new sessions.
	TxMin      time.Duration
	RxMin      time.Duration
	DetectMult uint8
	// Global bounds for interval clamping and exponential backoff.
	MinTxFloor time.Duration
	MaxTxCeil  time.Duration
	BackoffMax time.Duration
	// Enable per-peer metrics for route liveness (high cardinality).
	EnablePeerMetrics bool
	// Maximum number of events to keep in the scheduler queue.
	// This is an upper bound for safety to prevent unbounded
	// memory usage in the event of regressions.
	// suggested: 4 * expected number of sessions
	// default: 10,240
	MaxEvents int
}
ManagerConfig controls Manager behavior, routing integration, and liveness timings.
func (*ManagerConfig) Validate
func (c *ManagerConfig) Validate() error
Validate fills defaults and enforces constraints for ManagerConfig. It returns a descriptive error when required fields are missing or invalid.
type Metrics
type Metrics struct {
	Sessions                 *prometheus.GaugeVec
	SessionTransitions       *prometheus.CounterVec
	RoutesInstalled          *prometheus.GaugeVec
	RouteInstalls            *prometheus.CounterVec
	RouteWithdraws           *prometheus.CounterVec
	ConvergenceToUp          *prometheus.HistogramVec
	ConvergenceToDown        *prometheus.HistogramVec
	SchedulerServiceQueueLen *prometheus.GaugeVec
	SchedulerEventsDropped   *prometheus.CounterVec
	SchedulerTotalQueueLen   prometheus.Gauge
	RouteInstallFailures     *prometheus.CounterVec
	RouteUninstallFailures   *prometheus.CounterVec
	HandleRxDuration         *prometheus.HistogramVec
	ControlPacketsTX         *prometheus.CounterVec
	ControlPacketsRX         *prometheus.CounterVec
	ControlPacketsRxInvalid  *prometheus.CounterVec
	UnknownPeerPackets       *prometheus.CounterVec
	ReadSocketErrors         *prometheus.CounterVec
	WriteSocketErrors        *prometheus.CounterVec
	PeerSessions             *prometheus.GaugeVec
	PeerDetectTime           *prometheus.GaugeVec
}
func (*Metrics) Register
func (m *Metrics) Register(r prometheus.Registerer)
Register registers all metrics with the provided prometheus.Registerer.
type Peer
Peer identifies a remote endpoint and the local interface context used to reach it. LocalIP is the IP on which we send/receive; PeerIP is the peer’s address.
type Receiver
type Receiver struct {
	// contains filtered or unexported fields
}
Receiver implements a long-lived read loop, run as a goroutine, that continuously reads UDP control packets from the shared transport socket and passes valid ones to a handler.
It encapsulates read-loop robustness: it manages read deadlines, throttles noisy logs, detects fatal network conditions, and exits cleanly on context cancellation.
func NewReceiver
func NewReceiver(log *slog.Logger, udp UDPService, handleRx HandleRxFunc, metrics *Metrics) *Receiver
NewReceiver constructs a new Receiver bound to the given UDPService and handler. By default, it throttles repeated read errors to once every 5 seconds.
type RouteKey
RouteKey uniquely identifies a desired/installed route in the kernel. This is used as a stable key in Manager maps across lifecycle events.
type RouteReaderWriter
type RouteReaderWriter interface {
	RouteAdd(*routing.Route) error
	RouteDelete(*routing.Route) error
	RouteByProtocol(int) ([]*routing.Route, error)
}
RouteReaderWriter is the minimal interface for interacting with the routing backend. It allows adding, deleting, and listing routes by protocol. The BGP plugin uses this to interact with the kernel routing table through the liveness subsystem, without depending on its internal implementation.
type Scheduler
type Scheduler struct {
	// contains filtered or unexported fields
}
Scheduler drives session state progression and control message exchange. It runs a single event loop that processes transmit (TX) and detect events across sessions. New sessions schedule TX immediately; detect is armed/re-armed after valid RX during Init/Up.
func NewScheduler
func NewScheduler(log *slog.Logger, udp UDPService, onSessionDown SessionDownFunc, maxEvents int, enablePeerMetrics bool, metrics *Metrics) *Scheduler
NewScheduler constructs a Scheduler bound to a UDP transport and logger. onSessionDown is called asynchronously whenever a session is detected as failed.
func (*Scheduler) Run
Run executes the scheduler’s main loop until ctx is canceled. It continuously pops and processes due events, sleeping until the next one if necessary. Each TX event sends a control packet and re-schedules the next TX; each Detect event checks for timeout and invokes onSessionDown if expired.
type Session
type Session struct {
	// contains filtered or unexported fields
}
Session models a single bidirectional liveness relationship with a peer, maintaining BFD-like state, timers, and randomized transmission scheduling.
func (*Session) ArmDetect
func (s *Session) ArmDetect(now time.Time) (time.Time, bool)
ArmDetect ensures the detection timer is active and not stale. If the timer has expired, it re-arms it; if it is uninitialized, it returns false. It returns the deadline and whether a detect event should be (re)scheduled.
func (*Session) ComputeNextTx
func (s *Session) ComputeNextTx(now time.Time, rnd *rand.Rand) time.Time
ComputeNextTx picks the next transmit time based on current state, applies exponential backoff when Down, adds ±10% jitter, persists it to s.nextTx, and returns the chosen timestamp.
func (*Session) ExpireIfDue
func (s *Session) ExpireIfDue(now time.Time) (expired bool)
ExpireIfDue transitions an active session to Down if its detect timer has elapsed. It returns true if the state changed (Up/Init → Down).
func (*Session) HandleRx
func (s *Session) HandleRx(now time.Time, ctrl *ControlPacket) (changed bool)
HandleRx processes one control packet:
- Ignore all packets while in AdminDown.
- Drop the packet if PeerDiscr is non-zero and not equal to our localDiscr.
- If ctrl.State==AdminDown: transition → Down, clear detect, reset backoff.
- If ctrl.State==Down while we are Up/Init: transition → Down (peer signaled failure).
- Learn peerDiscr if unset. Update peer timers. Refresh detectDeadline.
- Down: if peerDiscr is known and the peer echoes our localDiscr with State≥Init → Up; else → Init.
- Init: promote to Up when the peer echoes our localDiscr with State≥Init.
- Up: refresh detect; allow LocalDiscr changes (peer restart).
- Timeout handling occurs in ExpireIfDue().
func (*Session) Snapshot
func (s *Session) Snapshot() SessionSnapshot
type SessionDownFunc
type SessionDownFunc func(s *Session)