liveness

package
v0.7.2
Warning

This package is not in the latest version of its module.

Published: Dec 2, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

const (
	// Labels.
	LabelIface     = "iface"
	LabelLocalIP   = "local_ip"
	LabelPeerIP    = "peer_ip"
	LabelState     = "state"
	LabelStateTo   = "state_to"
	LabelStateFrom = "state_from"
	LabelReason    = "reason"
	LabelOperation = "operation"
)
const (
	// DefaultLivenessPort is the default port for liveness probes.
	// This is expected to be the same across all clients.
	DefaultLivenessPort = 44880
)

Variables

var (
	ErrShortPacket   = errors.New("short packet")
	ErrInvalidLength = errors.New("invalid length")
)

Functions

func ListenUDP

func ListenUDP(bindIP string, port int) (*udpService, error)

ListenUDP binds an IPv4 UDP socket to bindIP:port and returns a configured UDPService. The returned connection is ready to read/write with control message support enabled.

func NewManager

func NewManager(ctx context.Context, cfg *ManagerConfig) (*manager, error)

NewManager constructs a Manager, opens the UDP socket, and launches the receiver and scheduler loops. The context governs their lifetime.

func NewRouteReaderWriter

func NewRouteReaderWriter(lm Manager, rrw RouteReaderWriter, iface string) *routeReaderWriter

NewRouteReaderWriter creates an interface-scoped RouteReaderWriter that wraps the liveness Manager and a concrete routing backend. This allows the BGP plugin to use standard routing calls while the Manager tracks route liveness on a per-interface basis.

func NewUDPService

func NewUDPService(raw *net.UDPConn) (*udpService, error)

NewUDPService wraps an existing *net.UDPConn and enables IPv4 control messages (IP_PKTINFO-like). On RX we obtain the destination IP and interface index; on TX we can set source IP and interface.

Types

type ControlPacket

type ControlPacket struct {
	Version         uint8  // protocol version; expected to be 1
	State           State  // sender's current session state
	DetectMult      uint8  // detection multiplier (used by peer for detect timeout)
	Length          uint8  // total length, always 40 for this fixed-size implementation
	LocalDiscr      uint32 // sender's discriminator (unique session ID)
	PeerDiscr       uint32 // discriminator of the remote session (echo back)
	DesiredMinTxUs  uint32 // minimum TX interval desired by sender (microseconds)
	RequiredMinRxUs uint32 // minimum RX interval the sender can handle (microseconds)
}

ControlPacket represents the wire format of a minimal BFD control packet. Fields mirror RFC 5880 §4.1 in a compact form using microsecond units for timers.

func UnmarshalControlPacket

func UnmarshalControlPacket(b []byte) (*ControlPacket, error)

UnmarshalControlPacket parses a 40-byte control message from the wire into a ControlPacket. It validates the version and length fields and extracts all header values using big-endian order.

func (*ControlPacket) Marshal

func (c *ControlPacket) Marshal() []byte

Marshal serializes a ControlPacket into its fixed 40-byte wire format.

Field layout (big-endian):

	0:      Version (3 high bits) | 5 bits unused (zero)
	1:      State (2 high bits)   | 6 bits unused (zero)
	2:      DetectMult
	3:      Length (always 40)
	4–7:    LocalDiscr
	8–11:   PeerDiscr
	12–15:  DesiredMinTxUs
	16–19:  RequiredMinRxUs
	20–39:  zero padding (unused / reserved)

Only a subset of the full BFD header is implemented; authentication and optional fields are omitted for simplicity.
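To make the layout above concrete, here is a minimal, self-contained sketch of encoding and decoding the 40-byte packet with encoding/binary. It mirrors the documented fields in a local controlPacket type (State is a plain uint8 here) and local copies of the short-packet and invalid-length errors; the lowercase names and the "unsupported version" error text are assumptions for illustration, not the package's actual code.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// packetLen is the fixed wire size described in the layout.
const packetLen = 40

// Local stand-ins for the package's ErrShortPacket / ErrInvalidLength.
var (
	errShortPacket   = errors.New("short packet")
	errInvalidLength = errors.New("invalid length")
)

// controlPacket mirrors the documented ControlPacket fields (State as a plain uint8).
type controlPacket struct {
	Version         uint8
	State           uint8
	DetectMult      uint8
	Length          uint8
	LocalDiscr      uint32
	PeerDiscr       uint32
	DesiredMinTxUs  uint32
	RequiredMinRxUs uint32
}

// marshal packs the fields big-endian; bytes 20-39 stay zero as padding.
func (c *controlPacket) marshal() []byte {
	b := make([]byte, packetLen)
	b[0] = (c.Version & 0x07) << 5 // version in the 3 high bits
	b[1] = (c.State & 0x03) << 6   // state in the 2 high bits
	b[2] = c.DetectMult
	b[3] = packetLen // always 40, regardless of c.Length
	binary.BigEndian.PutUint32(b[4:8], c.LocalDiscr)
	binary.BigEndian.PutUint32(b[8:12], c.PeerDiscr)
	binary.BigEndian.PutUint32(b[12:16], c.DesiredMinTxUs)
	binary.BigEndian.PutUint32(b[16:20], c.RequiredMinRxUs)
	return b
}

// unmarshal checks size, version, and length before returning the decoded header.
func unmarshal(b []byte) (*controlPacket, error) {
	if len(b) < packetLen {
		return nil, errShortPacket
	}
	c := &controlPacket{
		Version:         b[0] >> 5,
		State:           b[1] >> 6,
		DetectMult:      b[2],
		Length:          b[3],
		LocalDiscr:      binary.BigEndian.Uint32(b[4:8]),
		PeerDiscr:       binary.BigEndian.Uint32(b[8:12]),
		DesiredMinTxUs:  binary.BigEndian.Uint32(b[12:16]),
		RequiredMinRxUs: binary.BigEndian.Uint32(b[16:20]),
	}
	if c.Version != 1 {
		return nil, fmt.Errorf("unsupported version %d", c.Version)
	}
	if c.Length != packetLen {
		return nil, errInvalidLength
	}
	return c, nil
}

func main() {
	p := &controlPacket{Version: 1, State: 3, DetectMult: 3, Length: packetLen,
		LocalDiscr: 0x11223344, DesiredMinTxUs: 300000, RequiredMinRxUs: 300000}
	wire := p.marshal()
	got, err := unmarshal(wire)
	fmt.Println(len(wire), err, *got == *p) // 40 <nil> true
}
```

Because the size is fixed, decoding needs no framing logic: a single length check up front guarantees every slice index below it is in bounds.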

type DownReason

type DownReason uint8
const (
	DownReasonNone DownReason = iota
	DownReasonTimeout
	DownReasonRemoteAdmin
	DownReasonLocalAdmin
	DownReasonRemoteDown
)

func (DownReason) String

func (d DownReason) String() string

type EventQueue

type EventQueue struct {
	// contains filtered or unexported fields
}

EventQueue is a thread-safe priority queue of scheduled events. It supports pushing events and popping those whose time has come (or is nearest).

func NewEventQueue

func NewEventQueue() *EventQueue

NewEventQueue constructs an initialized empty heap-based event queue.

func (*EventQueue) CountFor

func (q *EventQueue) CountFor(iface, localIP string) int

CountFor returns the number of events in the queue for a given interface and local IP.

func (*EventQueue) Len

func (q *EventQueue) Len() int

Len returns the total number of events in the queue.

func (*EventQueue) Pop

func (q *EventQueue) Pop() *event

Pop removes and returns the next (earliest) event from the queue, or nil if empty.

func (*EventQueue) PopIfDue

func (q *EventQueue) PopIfDue(now time.Time) (*event, time.Duration)

PopIfDue returns the next event if its scheduled time is due (<= now). Otherwise, it returns nil and the duration until the next event’s time, allowing the caller to sleep until that deadline.

func (*EventQueue) Push

func (q *EventQueue) Push(e *event)

Push inserts a new event into the queue and assigns it a monotonically increasing sequence number, so events with identical timestamps are popped in insertion (FIFO) order.

type HandleRxFunc

type HandleRxFunc func(pkt *ControlPacket, peer Peer)

HandleRxFunc defines the handler signature for received control packets. The callback is invoked for every successfully decoded ControlPacket, along with a Peer descriptor identifying interface and IP context.

type Manager

type Manager interface {
	RegisterRoute(r *routing.Route, iface string, port int) error
	WithdrawRoute(r *routing.Route, iface string) error
	LocalAddr() *net.UDPAddr
	GetSessions() []SessionSnapshot
	GetSession(peer Peer) (*Session, bool)
	Close() error
	Err() <-chan error
}

type ManagerConfig

type ManagerConfig struct {
	Logger          *slog.Logger
	Netlinker       RouteReaderWriter
	UDP             UDPService
	MetricsRegistry *prometheus.Registry

	BindIP string // local bind address for the UDP socket (IPv4)
	Port   int    // UDP port to listen/transmit on

	// PassiveMode: if true, Manager does NOT manage kernel routes automatically.
	// Instead it defers to Netlinker calls made by the caller. This enables
	// incremental rollout (observe liveness without changing dataplane).
	PassiveMode bool

	// Local desired probe intervals and detection multiplier for new sessions.
	TxMin      time.Duration
	RxMin      time.Duration
	DetectMult uint8

	// Global bounds for interval clamping and exponential backoff.
	MinTxFloor time.Duration
	MaxTxCeil  time.Duration
	BackoffMax time.Duration

	// Enable per peer metrics for route liveness (high cardinality).
	EnablePeerMetrics bool

	// Maximum number of events to keep in the scheduler queue.
	// This is an upper bound for safety to prevent unbounded
	// memory usage in the event of regressions.
	// suggested: 4 * expected number of sessions
	// default: 10,240
	MaxEvents int
}

ManagerConfig controls Manager behavior, routing integration, and liveness timings.

func (*ManagerConfig) Validate

func (c *ManagerConfig) Validate() error

Validate fills in defaults and enforces constraints on ManagerConfig. It returns a descriptive error when a required field is missing or invalid.

type Metrics

type Metrics struct {
	Sessions                 *prometheus.GaugeVec
	SessionTransitions       *prometheus.CounterVec
	RoutesInstalled          *prometheus.GaugeVec
	RouteInstalls            *prometheus.CounterVec
	RouteWithdraws           *prometheus.CounterVec
	ConvergenceToUp          *prometheus.HistogramVec
	ConvergenceToDown        *prometheus.HistogramVec
	SchedulerServiceQueueLen *prometheus.GaugeVec
	SchedulerEventsDropped   *prometheus.CounterVec
	SchedulerTotalQueueLen   prometheus.Gauge
	RouteInstallFailures     *prometheus.CounterVec
	RouteUninstallFailures   *prometheus.CounterVec
	HandleRxDuration         *prometheus.HistogramVec
	ControlPacketsTX         *prometheus.CounterVec
	ControlPacketsRX         *prometheus.CounterVec
	ControlPacketsRxInvalid  *prometheus.CounterVec
	UnknownPeerPackets       *prometheus.CounterVec
	ReadSocketErrors         *prometheus.CounterVec
	WriteSocketErrors        *prometheus.CounterVec
	PeerSessions             *prometheus.GaugeVec
	PeerDetectTime           *prometheus.GaugeVec
}

func (*Metrics) Register

func (m *Metrics) Register(r prometheus.Registerer)

Register registers all metrics with the provided Registerer.

type Peer

type Peer struct {
	Interface string
	LocalIP   string
	PeerIP    string
}

Peer identifies a remote endpoint and the local interface context used to reach it. LocalIP is the IP on which we send/receive; PeerIP is the peer’s address.

func (*Peer) String

func (p *Peer) String() string

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver is a long-lived goroutine that continuously reads UDP control packets from the shared transport socket and passes valid ones to a handler.

It abstracts read-loop robustness: manages deadlines, throttles noisy logs, detects fatal network conditions, and honors context cancellation cleanly.

func NewReceiver

func NewReceiver(log *slog.Logger, udp UDPService, handleRx HandleRxFunc, metrics *Metrics) *Receiver

NewReceiver constructs a new Receiver bound to the given UDPService and handler. By default, it throttles repeated read errors to once every 5 seconds.

func (*Receiver) Run

func (r *Receiver) Run(ctx context.Context) error

Run executes the main receive loop until ctx is canceled or the socket fails. It continually reads packets, unmarshals them into ControlPackets, and passes them to handleRx. Errors are rate-limited and fatal errors terminate the loop.

type RouteKey

type RouteKey struct {
	Interface string
	SrcIP     string
	Table     int
	DstPrefix string
	NextHop   string
}

RouteKey uniquely identifies a desired/installed route in the kernel. This is used as a stable key in Manager maps across lifecycle events.

type RouteReaderWriter

type RouteReaderWriter interface {
	RouteAdd(*routing.Route) error
	RouteDelete(*routing.Route) error
	RouteByProtocol(int) ([]*routing.Route, error)
}

RouteReaderWriter is the minimal interface for interacting with the routing backend. It allows adding, deleting, and listing routes by protocol. The BGP plugin uses this to interact with the kernel routing table through the liveness subsystem, without depending on its internal implementation.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler drives session state progression and control message exchange. It runs a single event loop that processes transmit (TX) and detect events across sessions. New sessions schedule TX immediately; detect is armed/re-armed after valid RX during Init/Up.

func NewScheduler

func NewScheduler(log *slog.Logger, udp UDPService, onSessionDown SessionDownFunc, maxEvents int, enablePeerMetrics bool, metrics *Metrics) *Scheduler

NewScheduler constructs a Scheduler bound to a UDP transport and logger. onSessionDown is called asynchronously whenever a session is detected as failed.

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context) error

Run executes the scheduler’s main loop until ctx is canceled. It continuously pops and processes due events, sleeping until the next one if necessary. Each TX event sends a control packet and re-schedules the next TX; each Detect event checks for timeout and invokes onSessionDown if expired.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session models a single bidirectional liveness relationship with a peer, maintaining BFD-like state, timers, and randomized transmission scheduling.

func (*Session) ArmDetect

func (s *Session) ArmDetect(now time.Time) (time.Time, bool)

ArmDetect ensures the detection timer is active and not stale: if the deadline has expired, it re-arms it; if the timer was never initialized, it returns false. It returns the deadline and whether detect should be (re)scheduled.

func (*Session) ComputeNextTx

func (s *Session) ComputeNextTx(now time.Time, rnd *rand.Rand) time.Time

ComputeNextTx picks the next transmit time based on current state, applies exponential backoff when Down, adds ±10% jitter, persists it to s.nextTx, and returns the chosen timestamp.

func (*Session) ExpireIfDue

func (s *Session) ExpireIfDue(now time.Time) (expired bool)

ExpireIfDue transitions an active session to Down if its detect timer has elapsed. Returns true if state changed (Up/Init → Down).

func (*Session) GetState

func (s *Session) GetState() State

GetState returns the current state of the session.

func (*Session) HandleRx

func (s *Session) HandleRx(now time.Time, ctrl *ControlPacket) (changed bool)

HandleRx processes one control packet:

- Ignore all packets while in AdminDown.
- Drop if PeerDiscr is non-zero and not equal to our localDiscr.
- If ctrl.State==AdminDown: transition → Down, clear detect, reset backoff.
- If ctrl.State==Down while we are Up/Init: transition → Down (peer signaled failure).
- Learn peerDiscr if unset. Update peer timers. Refresh detectDeadline.
- Down: if peerDiscr known and peer echoes our localDiscr with State≥Init → Up; else → Init.
- Init: promote to Up when peer echoes our localDiscr with State≥Init.
- Up: refresh detect; allow LocalDiscr changes (peer restart).
- Timeout handling occurs in ExpireIfDue().
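The transition rules listed for HandleRx can be expressed as a small pure function, sketched below. It keeps only the discriminators and states the rules read; timer updates, detect refresh, and backoff reset are deliberately omitted, and the lowercase session/packet types are hypothetical stand-ins for Session and ControlPacket.

```go
package main

import "fmt"

type state uint8

// Same ordering as the documented State constants.
const (
	stateAdminDown state = iota
	stateDown
	stateInit
	stateUp
)

// session and packet keep only the fields the transition rules read.
type session struct {
	state      state
	localDiscr uint32
	peerDiscr  uint32
}

type packet struct {
	state      state
	localDiscr uint32 // sender's discriminator
	peerDiscr  uint32 // sender's view of our discriminator (0 if unknown)
}

// handleRx applies the listed rules; timers, detect refresh and backoff are omitted.
func (s *session) handleRx(p packet) (changed bool) {
	prev := s.state
	switch {
	case s.state == stateAdminDown:
		return false // ignore everything while administratively down
	case p.peerDiscr != 0 && p.peerDiscr != s.localDiscr:
		return false // addressed to a different session
	case p.state == stateAdminDown:
		s.state = stateDown
		return s.state != prev
	case p.state == stateDown && (prev == stateUp || prev == stateInit):
		s.state = stateDown // peer signaled failure
		return true
	}
	if s.peerDiscr == 0 {
		s.peerDiscr = p.localDiscr // learn the peer's discriminator
	}
	echoed := p.peerDiscr == s.localDiscr && p.state >= stateInit
	switch s.state {
	case stateDown:
		if s.peerDiscr != 0 && echoed {
			s.state = stateUp
		} else {
			s.state = stateInit
		}
	case stateInit:
		if echoed {
			s.state = stateUp
		}
	case stateUp:
		if p.localDiscr != 0 {
			s.peerDiscr = p.localDiscr // tolerate a peer restart with a new discriminator
		}
	}
	return s.state != prev
}

func main() {
	s := &session{state: stateDown, localDiscr: 7}
	fmt.Println(s.handleRx(packet{state: stateDown, localDiscr: 9}), s.state)              // Down -> Init
	fmt.Println(s.handleRx(packet{state: stateInit, localDiscr: 9, peerDiscr: 7}), s.state) // Init -> Up
}
```

Checking the discriminator before any state logic means a packet meant for another session can never move this one, which is the point of echoing PeerDiscr.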

func (*Session) Snapshot

func (s *Session) Snapshot() SessionSnapshot

type SessionDownFunc

type SessionDownFunc func(s *Session)

type SessionSnapshot

type SessionSnapshot struct {
	Peer                Peer
	Route               routing.Route
	State               State
	LocalDiscr          uint32
	PeerDiscr           uint32
	ConvUpStart         time.Time
	ConvDownStart       time.Time
	UpSince             time.Time
	DownSince           time.Time
	LastDownReason      DownReason
	DetectDeadline      time.Time
	NextDetectScheduled time.Time
	LastUpdated         time.Time
}

type State

type State uint8

State encodes the finite-state machine for a BFD-like session. The progression follows AdminDown → Down → Init → Up, with transitions driven by control messages and detect timeouts.

const (
	StateAdminDown State = iota // administratively disabled, no detection
	StateDown                   // no session detected or timed out
	StateInit                   // attempting to establish connectivity
	StateUp                     // session fully established
)

func (State) String

func (s State) String() string

String returns a human-readable string representation of the state.

Returns unknown(<state>) for unknown states.

type UDPService

type UDPService interface {
	ReadFrom(buf []byte) (n int, remoteAddr *net.UDPAddr, localIP net.IP, ifname string, err error)
	WriteTo(pkt []byte, dst *net.UDPAddr, iface string, src net.IP) (int, error)
	SetReadDeadline(t time.Time) error
	LocalAddr() net.Addr
	Close() error
}
