comms

package
v0.0.0-...-ce36b61

Published: May 16, 2026 License: GPL-3.0 Imports: 25 Imported by: 0

README

Comms (Push-to-Talk) internals

This directory implements a multicast PTT audio pipeline in Go using malgo (miniaudio), Opus, and the pion RTP/RTCP stack. It supports multiple parallel multicast talk groups, half-duplex enforcement, four selectable PTT control sources (OpenVLM HID dongle, ROIP analog-radio bridge, web/RPC, and Linux evdev keys), an adaptive Opus FEC controller, and a browser-driven web mode that bypasses the audio backend entirely. See ARCHITECTURE.md for the full internal walkthrough; this README is the field-operator reference.

Build with -tags omd_omit_comms to exclude the entire package from the binary; a stub doc.go, compiled only when that tag is set, keeps the package present so the import graph still resolves. The full build requires CGo (libopus, hidapi, libasound; miniaudio is vendored via malgo).


High-level flow

  1. Configuration is loaded via config (comms.* keys). McastPorts is sourced from config.GetMulticastTalkGroups() and carries one entry per multicast talk group; per-port send / receive direction can be toggled at runtime via Service.EnableTalkGroupSend / EnableTalkGroupReceive. Only the first talk group is active at startup — sockets are still opened for the rest so any port can be enabled later without a restart.
  2. Audio:
    • Opus encoder/decoder are created with the VoIP profile.
    • In hardware modes, one shared malgo capture stream feeds an audio.BroadcastEncoder whose dedicated encode goroutine runs Opus + UDP send off the audio callback thread. The capture device is opened once at StartHardware and stays open for the lifetime of the comms run; an atomic TX gate (BroadcastEncoder.SetTxEnabled) decides per-PTT whether captured frames reach the encoder. The VOX tap runs regardless of the gate.
    • One malgo playback stream is opened per port; each runs its own int16-native callback that drains a TX-beep side channel ahead of falling through to playoutOneFrame.
    • In web mode the malgo pipeline is skipped entirely; the browser is the I/O device and all audio crosses through webaudio.Bridge.
  3. Network:
    • Each McastPortConfig entry opens its own UDP sender (dialled from the interface IP), receiver (SO_REUSEPORT listener on 0.0.0.0:<port>), and RTCP sender on <port>+1. All three are wrapped in rtp.SwappableSender / rtp.SwappableReceiver so the underlying connections can be replaced without locking the hot path.
    • Each port also gets its own rtp.JitterBuffer and rtp.Session (one local SSRC per node).
  4. PTT control source (selected by controlSource, dispatched via the control.Register registry):
    • openvlm (default): GPIO3 on the OpenVLM USB HID dongle — PTTDown on press, PTTUp on release (hold-to-talk). Also emits AuxEvent values for VOL+/VOL− button transitions.
    • roip: same dongle, no manual button — squelch GPIO (COS) with VOX fallback for analog radio bridging.
    • web: events injected via RPC handlers; the browser owns audio I/O.
    • nanoptt: Linux evdev key press → PTTToggle (press-to-toggle).
  5. Adaptive FEC:
    • FECAdapter runs as a separate goroutine, observes each port's jitter-buffer gap-run histogram, and adjusts the Opus encoder's packet-loss-perc level (20 / 30 / 40) in response to the measured loss EWMA. The configured comms.packetLossPerc is the floor; the adapter only ramps up above it.

Audio/codec parameters

The shared audio constants live in audiopool/audiopool.go; the Opus encoder settings live in comms.go:

  • audiopool.SampleRate = 48 000 Hz
  • audiopool.Channels = 1 (mono)
  • audiopool.FrameSize = 960 samples (20 ms at 48 kHz)
  • audiopool.EncBufSize = 1450 bytes (sized so one encoded frame fits in a single UDP datagram)
  • Opus bitrate (comms.go targetBitrate, also exported as comms.TargetBitrate) = 32 000 bps
  • Opus complexity (comms.go encoderComplexity) = 5 (the libopus reference VoIP default), capped at 10 and overridable per CommsConfig.EncoderComplexity (1..10). The previous default of 10 was too expensive on linux/mipsle edge routers — at complexity 10, Opus encode at 48 kHz mono regularly took 20–30 ms per 20 ms frame, saturating the per-frame budget and dropping captured frames. Operators on faster CPUs can opt back in via comms.encoderComplexity.
  • Initial packet loss percent (comms.go packetLossPerc) = 20. Clamped to [10, 40] and used as the floor by FECAdapter.
  • In-band FEC: enabled
  • DTX: disabled

audiopool also exports the Float32Pool / Int16Pool / EncBufPool sync.Pools used by the capture / playback / encode hot paths to avoid per-frame allocations.
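A sketch of how a pool like audiopool.Int16Pool is typically used on the capture path. Whether audiopool stores []int16 or *[]int16 is not stated here; the sketch stores pointers, the usual way to avoid an allocation on every Put (the pattern staticcheck's SA6002 check flags). copyFrame and releaseFrame are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

const frameSize = 960 // audiopool.FrameSize

// int16Pool hands out *[]int16 so that Put does not allocate when the
// slice header is stored in the pool's interface value.
var int16Pool = sync.Pool{
	New: func() any {
		buf := make([]int16, frameSize)
		return &buf
	},
}

// copyFrame takes a pooled buffer and copies one captured frame into it.
func copyFrame(in []int16) *[]int16 {
	bp := int16Pool.Get().(*[]int16)
	n := copy(*bp, in)
	*bp = (*bp)[:n]
	return bp
}

// releaseFrame restores the full length and returns the buffer to the pool.
func releaseFrame(bp *[]int16) {
	*bp = (*bp)[:cap(*bp)]
	int16Pool.Put(bp)
}

func main() {
	captured := make([]int16, frameSize)
	captured[0] = 42
	bp := copyFrame(captured)
	fmt.Println(len(*bp), (*bp)[0]) // 960 42
	releaseFrame(bp)
}
```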

Playback device latency

Each port's malgo playback stream is opened with a DeviceConfig.PeriodSizeInFrames that audio.Init.BuildAudio derives from comms.playbackLatencyMs. When the config value is ≤ 0 the code uses audiopool.FrameSize (one Opus frame, 20 ms) as a safe default; otherwise it converts the value to the equivalent frame count at 48 kHz. The playback chunker re-aligns playoutOneFrame's 20 ms output onto whatever period ALSA ultimately picks, so the decode path never sees the discrepancy. Each port's playback stream runs on its own audio thread, and their state is fully independent.
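The latency-to-period conversion described above, as a minimal sketch. playbackPeriodFrames is a hypothetical stand-in for the logic inside audio.Init.BuildAudio and assumes integer truncation.

```go
package main

import "fmt"

const (
	sampleRate = 48000 // audiopool.SampleRate
	frameSize  = 960   // audiopool.FrameSize: one 20 ms Opus frame
)

// playbackPeriodFrames (hypothetical): <= 0 means "one Opus frame",
// anything else is converted from milliseconds to frames at 48 kHz.
func playbackPeriodFrames(latencyMs int) uint32 {
	if latencyMs <= 0 {
		return frameSize
	}
	return uint32(latencyMs * sampleRate / 1000)
}

func main() {
	fmt.Println(playbackPeriodFrames(0), playbackPeriodFrames(60)) // 960 2880
}
```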

This is the only layer of buffering that protects against playback-side OS scheduling stalls — the Go-side jitter buffer (pc.Jitter) sits upstream of the DAC and cannot help once the audio thread is preempted. The four layers absorb different classes of stutter:

| Class of stutter | Heard by | Mitigated by |
|---|---|---|
| Network arrival jitter (out-of-order, late packets) | Local listener | Go jitter prebuffer |
| Brief packet loss bursts | Local listener | Opus PLC + jitter buffer |
| Playback-side OS scheduling stalls | Local listener | malgo playback period buffer |
| Capture-side OS scheduling stalls | Remote listeners | malgo capture period buffer |

Tuning comms.playbackLatencyMs

Some hardware (e.g. the OpenVLM USB audio class device) wants a very small period, about 20 ms or one full Opus frame per callback. That leaves the audio thread effectively zero scheduling slack, so on-device stutter persists even with a healthy Go-side jitter buffer. The comms.playbackLatencyMs config knob lets you request a larger period directly (default: 60 ms, i.e. three Opus frames per callback, giving two frames of slack).

miniaudio (the C library behind malgo) may still round the period to a backend-preferred value at InitDevice time — for example ALSA's USB audio class driver typically rounds the request up to a power of two and gives us 1024 frames when we ask for 960. The TX path models that rounding via nextPow2 and multiplies by malgoLowLatencyPeriods (3, matching miniaudio's LowLatency profile) to produce Init.PlaybackOutputLatency — the value the beep-emergence settle wait anchors on. The requested period is logged at Info level on stream open as comms: playback stream opened with:

  • configured_latency_ms: the value from comms.playbackLatencyMs
  • requested_period_frames: what we passed to malgo.DeviceConfig.PeriodSizeInFrames
  • effective_period_frames: the same value rounded up to the next power of two (the model of what ALSA actually uses)
  • requested_period_duration: the equivalent duration at 48 kHz
  • periods: malgoLowLatencyPeriods (3), the ALSA ring depth in periods
  • ring_latency: effective_period_frames × periods converted to a duration; this is what PlaybackOutputLatency carries forward into transmitSettleWait
  • port: the multicast port the playback stream belongs to
  • device: the resolved malgo device name

miniaudio does not expose the negotiated runtime period after InitDevice, so we log what we requested (and what we model) rather than what we got. If audio-thread underruns persist after raising comms.playbackLatencyMs, the next knob is the per-period count in audio/malgo_playback.go (which would require restructuring playoutOneFrame to loop).

Capture device latency

The shared malgo capture stream is opened with its DeviceConfig.PeriodSizeInFrames derived from the comms.captureFramesPerBuffer / comms.captureLatencyMs pair by audio.Init.OpenBroadcastStream: PeriodSizeInFrames defaults to audiopool.FrameSize (960, one 20 ms Opus frame) unless an operator overrides it, and the ALSA periods count (the ring depth) is derived from captureLatencyMs via buildCapturePeriods (clamped to [3, 16]). The encode pipeline is unchanged because the captureChunker in audio/malgo_capture.go re-aligns whatever period ALSA picks back onto 20 ms chunks before frames reach the encoder.

This is the symmetric counterpart of comms.playbackLatencyMs for the capture side. The failure modes differ in whose speaker stutters:

  • Playback preemption: thread is late → DAC underruns → this device's local listener hears a click.
  • Capture preemption: thread is late → ADC device buffer overruns → samples are silently dropped → the RTP stream sent over the air has a gap → remote listeners hear stutter.

So unlike playback, the on-device user is not the one who hears capture-side overruns; the people you're talking to are. This makes capture-side stalls much harder to detect by ear: a transmitter can sound fine to itself while every receiver hears it stuttering.

Tuning comms.captureLatencyMs

Same logic as comms.playbackLatencyMs: hardware that needs a tiny period leaves the capture audio thread with effectively zero scheduling slack. Default is 60 ms of total ring depth (three 20 ms periods), which buildCapturePeriods translates into the matching Periods count handed to miniaudio.
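One plausible reading of how buildCapturePeriods turns a latency target into a ring depth: ceiling-divide the latency by the period and clamp to [3, 16]. The rounding rule is an assumption, and capturePeriods is a hypothetical name.

```go
package main

import "fmt"

const (
	sampleRate = 48000
	frameSize  = 960 // default capture period: one 20 ms Opus frame
	minPeriods = 3
	maxPeriods = 16
)

// capturePeriods (hypothetical) converts a total ring-latency target into an
// ALSA periods count: ceil(latency / period), clamped to [3, 16].
func capturePeriods(latencyMs, periodFrames int) int {
	if periodFrames <= 0 {
		periodFrames = frameSize
	}
	latencyFrames := latencyMs * sampleRate / 1000
	// Ceiling division for non-negative latency; negatives end up clamped.
	n := (latencyFrames + periodFrames - 1) / periodFrames
	if n < minPeriods {
		n = minPeriods
	}
	if n > maxPeriods {
		n = maxPeriods
	}
	return n
}

func main() {
	fmt.Println(capturePeriods(60, frameSize), capturePeriods(70, frameSize), capturePeriods(1000, frameSize)) // 3 4 16
}
```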

The requested period is logged at Info level on stream open as comms: broadcast stream opened with:

  • configured_latency_ms: the value from comms.captureLatencyMs
  • requested_period_frames: what we passed to malgo.DeviceConfig.PeriodSizeInFrames
  • requested_period_duration: the equivalent duration at 48 kHz
  • device: the resolved malgo capture device name
  • encode_chan_depth: broadcastEncoderChanDepth (currently 10)

If audio-thread overruns persist after raising comms.captureLatencyMs, the next knob is comms.captureFramesPerBuffer (the period size itself). A larger period means each callback delivers a multi-frame chunk, which the captureChunker already re-splits into 20 ms frames transparently, so the encoder is unaffected.

Always-on capture + TX gate

Under the unified design the capture device is opened once at StartHardware and stays open for the lifetime of the comms run. The malgo capture callback fires every 20 ms regardless of PTT state — what changes per PTT cycle is whether the callback's output reaches the Opus encoder.

The pivot is BroadcastEncoder.SetTxEnabled(bool), an atomic gate flipped by beginTransmission / endTransmission. When closed:

  • The audio callback still runs — framesCaptured and the inter-arrival gap stats still advance.
  • The VOX tap (if subscribed) still receives float32 frames so the ROIP control source can make PTT decisions while otherwise idle.
  • The Int16 → encCh hand-off is skipped; no Opus encode, no UDP send.

This eliminates the per-PTT device open/close that previously caused "first-cycle" capture stalls on USB audio class devices.
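A reduced sketch of the gate semantics above, assuming a callback that receives []int16. The VOX tap and the encoder hand-off are plain function fields here (hypothetical); the real code converts to float32 through audiopool.Float32Pool for the tap and sends a pooled copy on encCh.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gatedCapture is a hypothetical reduction of the BroadcastEncoder callback.
type gatedCapture struct {
	txEnabled      atomic.Bool
	framesCaptured atomic.Int64
	vox            func([]int16) // optional VOX tap; nil when nobody subscribed
	encode         func([]int16) // stands in for the copy + encCh hand-off
}

// SetTxEnabled opens or closes the gate; safe to call from any goroutine.
func (g *gatedCapture) SetTxEnabled(on bool) { g.txEnabled.Store(on) }

// onCapture is the per-callback work: counting and VOX always run,
// encoding only while the gate is open.
func (g *gatedCapture) onCapture(in []int16) {
	g.framesCaptured.Add(1)
	if g.vox != nil {
		g.vox(in)
	}
	if !g.txEnabled.Load() {
		return // gate closed: no Opus encode, no UDP send
	}
	g.encode(in)
}

func main() {
	var voxFrames, encodedFrames int
	g := &gatedCapture{
		vox:    func([]int16) { voxFrames++ },
		encode: func([]int16) { encodedFrames++ },
	}
	frame := make([]int16, 960)
	g.onCapture(frame) // idle: VOX sees it, the encoder does not
	g.SetTxEnabled(true)
	g.onCapture(frame) // transmitting
	g.SetTxEnabled(false)
	fmt.Println(g.framesCaptured.Load(), voxFrames, encodedFrames) // 2 2 1
}
```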

Encode + send goroutine

The malgo capture callback does not run the Opus encoder or sendToAllPorts. Both moved off the audio callback thread into a dedicated goroutine inside audio.BroadcastEncoder, mirroring how the receive side already isolates Opus decode from each port's malgo playback callback.

Why: the audio callback fires every 20 ms with no scheduling slack beyond the device buffer above. If the work it does ever takes longer than that — Opus encode at complexity 10 with FEC, blocking UDP multicast write to multiple ports, GC pause, cgo thread handoff — the next callback misses its deadline, the ADC ring buffer overruns, and samples are silently dropped at the device. Web mode (controlSource: web) sidesteps this because the browser does the encoding and the server only receives pre-encoded bytes; hardware modes did not, until this change.

The hot path is int16-native: malgo delivers samples as a byte slice which the wrapper interprets as []int16 directly, the encode goroutine calls EncodeS16 directly, and there is no float32↔int16 round-trip on mipsle softfloat targets.
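One zero-copy way to view a byte buffer of S16 samples as []int16 is unsafe.Slice (Go 1.17+). This assumes native little-endian byte order (true on amd64, arm64 and mipsle) and 2-byte alignment; how the package's malgo wrapper actually does it is not shown here, and bytesAsInt16 is a hypothetical name.

```go
package main

import (
	"fmt"
	"unsafe"
)

// bytesAsInt16 reinterprets a little-endian S16 buffer as []int16 without
// copying. The result aliases b and must not outlive it; an odd trailing
// byte is ignored.
func bytesAsInt16(b []byte) []int16 {
	if len(b) < 2 {
		return nil
	}
	return unsafe.Slice((*int16)(unsafe.Pointer(&b[0])), len(b)/2)
}

func main() {
	raw := []byte{0x01, 0x00, 0xff, 0xff, 0x00, 0x80}
	fmt.Println(bytesAsInt16(raw)) // [1 -1 -32768] on little-endian hosts
}
```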

Pipeline:

malgo capture callback (audio thread, every 20 ms) — func(in []int16)
  ├─ recordCaptureArrival: update captureGapMaxNs / captureLateCount
  ├─ Optional VOX tap (always; not gated by TX): float32 conversion +
  │     non-blocking send to atomic-pointer chan via audiopool.Float32Pool
  ├─ if !txEnabled.Load(): return  ← TX gate closed
  ├─ audiopool.Int16Pool.Get(); copy(in)               ← cheap
  └─ non-blocking send → encCh (cap = broadcastEncoderChanDepth = 10)
       on full: framesDropped++, return slice to pool

encodeLoop goroutine (separate goroutine)
  └─ for fp := range encCh:
       ├─ apply MicGain in int16 space, clamp to [-32768, 32767]
       ├─ recordEncodeDuration around deps.Encoder.EncodeS16(pcm, buf)
       │     on error: encodeErrors++, log Debug, drop frame
       │     on first over-budget cycle: one-shot Warn
       └─ deps.Send(buf[:n])                       ← cfg.sendToAllPorts
             walks rt.Ports; for each pc with SendEnabled and
             RTPSess != nil:
               pc.RTPSess.Send(payload)
                 ├─ pion Packetizer + RTCP SR interceptor
                 └─ rtp.SwappableSender.Write → net.UDPConn.Write

broadcastEncoderChanDepth = 10 frames = 200 ms of slack, sized to match the receive-side jitter prebuffer. Any encoder spike the receiver can absorb downstream the producer can absorb upstream too. The previous depth of 3 (60 ms) was too tight for slow MIPS targets where Opus encode plus GC pauses regularly crossed the per-frame budget. To raise the slack, edit broadcastEncoderChanDepth in audio/encoder.go and re-bench. Drops are counted, not silenced.

Per-cycle stats

At the end of every PTT cycle, when SetTxEnabled(false) is called, audio.BroadcastEncoder logs a Debug line summarizing what happened during the transmission:

comms: broadcast cycle stats captured=1500 encoded=1500 dropped=0
  encode_errors=0 encode_dur_max=8ms encode_dur_avg=4ms frame_budget=20ms
  capture_gap_max=21ms capture_late=0
  • captured — frames delivered by malgo to the capture callback (≈ 50 × seconds-of-PTT at audiopool.FrameSize / audiopool.SampleRate)
  • encoded — frames the consumer goroutine successfully Opus-encoded and shipped via sendToAllPorts
  • dropped — frames the producer dropped because encCh was full (the consumer fell more than 200 ms behind cumulatively)
  • encode_errors — frames where EncodeS16 returned an error; the underlying error is also logged at Debug level
  • encode_dur_max / encode_dur_avg — peak / mean libopus encode time during the cycle. Compare against frame_budget (20 ms).
  • frame_budget — the per-frame deadline (one Opus frame at 48 kHz mono). The first frame to cross it within a cycle also triggers a one-shot comms: opus encode exceeded per-frame budget Warn.
  • capture_gap_max — peak inter-arrival between successive callbacks. A value substantially above 20 ms indicates audio-thread preemption.
  • capture_late — count of callbacks whose arrival was ≥ 2 × frame_budget late.

Diagnostic decision tree from the cycle stats:

| Pattern | Meaning | Next lever |
|---|---|---|
| dropped=0 encode_errors=0, no stutter at remote | Pipeline healthy. | Done. |
| dropped=0 encode_errors=0, remote still stutters | Bottleneck is downstream of the encode goroutine. | Tune SO_SNDBUF on the multicast sockets; pcap on the wire to look for jitter or loss. |
| dropped > 0 | Encode-and-send loop occasionally takes >200 ms cumulative. | Lower EncoderComplexity (try 3), grow broadcastEncoderChanDepth, or check actual_input_latency vs. requested. |
| encode_dur_max ≈ frame_budget | libopus is starving the audio thread. | Lower EncoderComplexity. |
| capture_late > 0 or capture_gap_max ≫ 20 ms | Audio callback thread is being preempted. | Raise comms.captureLatencyMs; check CPU contention. |
| encode_errors > 0 | libopus is failing under pressure; the surfaced error message says why. | Address the specific error. |

The same counters are exported atomically via audio.AudioEncoderSnapshot and surface in the periodic instrumentation snapshot under comms.broadcast_encoder — see docs/instrumentation-snapshot.md.

Receive path
  1. One receiveLoop goroutine per receive-capable port reads UDP datagrams from pc.Receiver (a *rtp.SwappableReceiver).
  2. Parses them as RTP via rtp.ParseIncoming.
  3. Calls pc.MarkRemoteRx(rt) to stamp the per-port HalfDuplexGate and prime rt.RemoteRxActive.
  4. Pushes the payload via jitter.PushWithSSRC so an SSRC change (new talker) cleanly resets the buffer rather than silently dropping the new stream.
  5. Hardware mode: each port's malgo playback callback calls playoutOneFrame once per ~20 ms, decoding into an int16 buffer; the callback first drains the per-port PlaybackBuffer (TX-beep side channel) before falling through.
  6. Web mode: a webPlayoutLoop per port uses jitter.EnableNotify for edge-triggered wakeups and forwards raw Opus payloads to rt.WebBridge.PushRxFrame for the browser to decode and play.

halfDuplexDecayLoop runs alongside the receive loops on a 100 ms ticker (halfDuplexDecayInterval). It walks every send-enabled port's RxGate.Active() and clears rt.RemoteRxActive when no gate is within its window — so the PTT TX path can read the half-duplex flag in O(1) via isReceivingRemote(rt).

The PortChannel carries per-port atomic.Int64 counters (RxPkts, RxLoopback, RxParseErrs, RxPushed, RxPushRejected, PlaybackUnderruns, WebPoppedSkipped) used both by tests and by the periodic instrumentation snapshot — see snapshot.go.


Multicast UDP

For each McastPortConfig entry, buildSinglePortChannel opens:

  • RTP sender: net.DialUDP("udp4", localIP, mcastAddr:port). Dialling from the interface IP guarantees outbound multicast egresses the chosen interface. Wrapped in *rtp.SwappableSender. Multicast TTL = rtpMulticastTTL (currently 32), generous enough for any realistic mesh diameter through the full batman-adv + VXLAN + Tailscale path, where each hop and bridge decrements TTL. The prior value of 1 silently black-holed voice on multi-hop deployments.
  • RTCP sender: same address with port+1 (standard RTP port pairing). Wrapped in a separate *rtp.SwappableSender and configured with the same multicast TTL.
  • RTP receiver: net.ListenConfig{Control: SO_REUSEPORT} → ListenPacket("udp4", "0.0.0.0:port"), then device.JoinMulticastGroup. SO_REUSEPORT lets a replacement socket bind to the same port while the current receiver is still open, so swap plumbing can acquire the new socket before closing the old one. Wrapped in *rtp.SwappableReceiver. The socket requests SO_RCVBUF = 1 MiB (rxSocketBufBytes in network.go) and logs, at Debug level, the actual granted value plus the current per-socket kernel drop counter from /proc/net/udp. Because Linux clamps SO_RCVBUF at net.core.rmem_max, an undersized sysctl is visible in the startup log without external tools.
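Reading the drop counter needs nothing beyond the standard library: in /proc/net/udp the local address is hex ADDR:PORT and drops is the last column. This is a sketch; udpDrops is hypothetical, and summing every row on the port (several SO_REUSEPORT sockets can share it) is a choice the real code may not make, for example if it matches on the socket inode instead.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// sampleProcNetUDP is a trimmed /proc/net/udp: a header row plus one socket
// bound to port 0x1F90 (8080) whose drops column reads 7.
const sampleProcNetUDP = `  sl  local_address rem_address   st tx_queue rx_queue tr tm->when retrnsmt   uid  timeout inode ref pointer drops
  512: 00000000:1F90 00000000:0000 07 00000000:00000000 00:00000000 00000000     0        0 40123 2 0000000000000000 7
`

// udpDrops sums the drops column (the last field) of every row whose local
// port matches.
func udpDrops(procNetUDP string, port uint16) (drops uint64, found bool, err error) {
	sc := bufio.NewScanner(strings.NewReader(procNetUDP))
	header := true
	for sc.Scan() {
		if header { // first line holds column names
			header = false
			continue
		}
		f := strings.Fields(sc.Text())
		if len(f) < 13 {
			continue
		}
		local := f[1] // hex "ADDR:PORT"
		i := strings.LastIndexByte(local, ':')
		if i < 0 {
			continue
		}
		p, perr := strconv.ParseUint(local[i+1:], 16, 16)
		if perr != nil || uint16(p) != port {
			continue
		}
		d, derr := strconv.ParseUint(f[len(f)-1], 10, 64)
		if derr != nil {
			return 0, false, derr
		}
		drops += d
		found = true
	}
	return drops, found, sc.Err()
}

func main() {
	// In real use the input would come from os.ReadFile("/proc/net/udp").
	fmt.Println(udpDrops(sampleProcNetUDP, 8080)) // 7 true <nil>
}
```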

Loopback suppression:

  • If loopback is false, packets from any loopback address or from the local interface IP are silently dropped. The receive loop caches the parsed net.IP so the comparison is allocation-free per packet.

Trace logging:

  • When trace is true, each incoming RTP packet is logged with source address, sequence number, timestamp, SSRC, and payload size.

Runtime endpoint changes

Per-port direction toggling at runtime is exposed via the public Service API:

svc := comms.Default()
_ = svc.EnableTalkGroupSend(idx, true)
_ = svc.EnableTalkGroupReceive(idx, false)
states, _ := svc.TalkGroupStates()

The SendEnabled / ReceiveEnabled atomics on each PortChannel are checked by sendToAllPorts, receiveLoop, and halfDuplexDecayLoop, so direction changes take effect on the next packet without restarting any goroutine or socket.

The lower-level endpoint-swap plumbing (network.replaceNetwork + rtp.SwappableSender.SwapAndDeferClose) still exists for future use. The previous public UpdateMulticastEndpoint helper was removed during the refactor; if it returns, the SwapCloseGrace window in the swap path means callers do not have to synchronize against in-flight Write calls.


RTP / RTCP stack

The comms package uses pion for all RTP/RTCP work through the rtp sub-package:

  • rtp.Session wraps a pion Packetizer and an interceptor chain. One session per PortChannel (one local SSRC).
  • The interceptor chain registers report.SenderInterceptor (interval: 5 s) which generates outbound RTCP Sender Reports that give receivers clock reference and packet count.
  • Inbound RTCP is not processed: in a multicast PTT topology there is no single feedback path and each transmission may have many simultaneous receivers.
  • Inbound RTP packets are parsed by rtp.ParseIncoming (a thin wrapper around pionrtp.Packet.Unmarshal).

RTP encapsulation details:

0               1               2               3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P|X| CC=0 |M|  PT=111      |       sequence number         |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         timestamp                             |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            SSRC                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                       Opus payload…                           |

  • Payload type: rtp.PayloadTypeOpus = 111 (a dynamic payload type; 111 is the value conventionally used for Opus)
  • Clock rate: 48 000 Hz
  • MTU: rtp.MTU = 1400 bytes
  • Frame samples: rtp.FrameSamples = 960 (20 ms)
  • SSRC: rtp.SSRCFromID(RtpID) (FNV-1a 32-bit hash; falls back to hostname, then local IP)
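The fixed 12-byte header in the diagram can be built and parsed with encoding/binary. This sketch only handles the P=0, X=0, CC=0 layout shown; the package itself goes through pion (rtp.Packet.Unmarshal in github.com/pion/rtp), and rtpHeader, marshal and parse are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	rtpVersion      = 2
	payloadTypeOpus = 111 // rtp.PayloadTypeOpus
	frameSamples    = 960 // rtp.FrameSamples: timestamp step per 20 ms frame
	fixedHeaderLen  = 12
)

type rtpHeader struct {
	Marker         bool
	PayloadType    uint8
	SequenceNumber uint16
	Timestamp      uint32
	SSRC           uint32
}

// marshal writes V=2, P=0, X=0, CC=0, then M/PT, seq, timestamp, SSRC, payload.
func (h rtpHeader) marshal(payload []byte) []byte {
	b := make([]byte, fixedHeaderLen+len(payload))
	b[0] = rtpVersion << 6
	b[1] = h.PayloadType & 0x7f
	if h.Marker {
		b[1] |= 0x80
	}
	binary.BigEndian.PutUint16(b[2:4], h.SequenceNumber)
	binary.BigEndian.PutUint32(b[4:8], h.Timestamp)
	binary.BigEndian.PutUint32(b[8:12], h.SSRC)
	copy(b[fixedHeaderLen:], payload)
	return b
}

// parse accepts only the layout in the diagram; it rejects padding,
// extensions and CSRC lists rather than handling them.
func parse(b []byte) (rtpHeader, []byte, error) {
	if len(b) < fixedHeaderLen {
		return rtpHeader{}, nil, errors.New("rtp: packet shorter than fixed header")
	}
	if b[0]>>6 != rtpVersion {
		return rtpHeader{}, nil, errors.New("rtp: not version 2")
	}
	if b[0]&0x3f != 0 { // P (0x20), X (0x10) or CC (0x0f) set
		return rtpHeader{}, nil, errors.New("rtp: padding, extension or CSRCs not supported here")
	}
	h := rtpHeader{
		Marker:         b[1]&0x80 != 0,
		PayloadType:    b[1] & 0x7f,
		SequenceNumber: binary.BigEndian.Uint16(b[2:4]),
		Timestamp:      binary.BigEndian.Uint32(b[4:8]),
		SSRC:           binary.BigEndian.Uint32(b[8:12]),
	}
	return h, b[fixedHeaderLen:], nil
}

func main() {
	h := rtpHeader{PayloadType: payloadTypeOpus, SequenceNumber: 65535, SSRC: 0x1234abcd}
	got, payload, err := parse(h.marshal([]byte{0xde, 0xad}))
	fmt.Println(got == h, payload, err)

	// The next 20 ms frame: sequence wraps to 0, timestamp advances by 960.
	h.SequenceNumber++
	h.Timestamp += frameSamples
	fmt.Println(h.SequenceNumber, h.Timestamp) // 0 960
}
```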

RTP ID and SSRC

comms.rtpId controls SSRC derivation:

  1. Uses comms.rtpId if set.
  2. Otherwise uses the system hostname (filled in by applyDefaults).
  3. If neither is available, falls back to the local interface IP.

SSRC is computed as the FNV-1a 32-bit hash of the chosen string via rtp.SSRCFromID.
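The fallback chain and hash, sketched with the standard library's hash/fnv. It assumes the ID string is hashed as its raw bytes; resolveRTPID and ssrcFromID are hypothetical stand-ins for applyDefaults and rtp.SSRCFromID.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// resolveRTPID follows the documented order: comms.rtpId, then the
// hostname, then the local interface IP.
func resolveRTPID(rtpID, hostname, localIP string) string {
	switch {
	case rtpID != "":
		return rtpID
	case hostname != "":
		return hostname
	default:
		return localIP
	}
}

// ssrcFromID hashes the ID with 32-bit FNV-1a.
func ssrcFromID(id string) uint32 {
	h := fnv.New32a()
	_, _ = h.Write([]byte(id)) // hash.Hash writes never return an error
	return h.Sum32()
}

func main() {
	id := resolveRTPID("", "node-a", "10.0.0.1") // no comms.rtpId, hostname known
	fmt.Printf("id=%s ssrc=%#08x\n", id, ssrcFromID(id))
}
```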


RTP jitter buffer

rtp.JitterBuffer is a sequence-number-ordered buffer backed by a fixed-size ring of slots (no map allocations on the hot path). It smooths network reordering and provides Packet Loss Concealment.

  • Prebuffer: rtp.PrebufferPackets = 5 frames before playout begins (≈100 ms safety margin).
  • Max depth: rtp.MaxDepth = 24. Newly arriving packets that find a full buffer increment Overflows and are dropped.
  • Slot indexing: seq % maxDepth — duplicates and stale slots are detected without iterating the buffer.
  • Payload pool: a per-buffer sync.Pool of []byte sized to MaxOpusPayloadSize = 1275 (RFC 6716 §3.2.1 maximum) eliminates per-packet heap allocations. Consumers must call jitter.ReleasePayload(p) after DecodeS16 finishes with the buffer.
  • SSRC tracking: PushWithSSRC(ssrc, seq, payload, onChange) resets the buffer cleanly when a new talker arrives, calling the change callback for logging. Without this, a new talker whose starting sequence number happens to lie in the "past half" of the previous talker's frozen cursor would be silently rejected forever.
  • Idle reset: a 2 s gap (jitterIdleResetThreshold) since the last push triggers a fresh-stream reset on the next packet, catching edge cases the SSRC check cannot (sender resets seq without rotating SSRC, RFC 3550 §8.2 collision-driven SSRC rotation, …).
  • Gap detection: PopOrConceal(window) returns (payload, false) for an in-order frame, (nil, true) for a recent-stream gap that warrants PLC, or (nil, false) for genuine silence. The PLC concealment cap maxConsecutivePLC = 10 (≈200 ms) lives in receive.go.
  • Edge-triggered notify: EnableNotify() returns a coalesced wakeup channel for push-driven consumers (web mode); malgo-driven consumers do not call it and pay only a nil-check on the push hot path.
  • Gap-run histogram (atomic.Int64): GapRuns1, GapRuns2to5, GapRuns6to10, GapRuns11to20, GapRuns21to50, GapRunsOver50. Bucket counts of consecutive missing frames; consumed by FECAdapter to compute the loss EWMA (see Adaptive FEC below).
  • Counters (atomic.Int64): Overflows, SSRCResets, IdleResets.

Sequence numbers use uint16 wrap-around-aware comparison (seqLess) so streams that cross the 65 535 → 0 boundary are handled correctly.
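A common way to write a wrap-aware comparison is serial-number arithmetic in the style of RFC 1982: subtract in uint16 and read the result as signed. Whether seqLess is written exactly like this is an assumption.

```go
package main

import "fmt"

// seqLess reports whether a precedes b in 16-bit sequence space: b is
// "after" a when the forward distance from a to b is under half the range.
func seqLess(a, b uint16) bool {
	return int16(b-a) > 0
}

func main() {
	seqs := []uint16{65534, 65535, 0, 1} // crossing the 65535 -> 0 boundary
	for i := 1; i < len(seqs); i++ {
		fmt.Println(seqs[i-1], "<", seqs[i], seqLess(seqs[i-1], seqs[i]))
	}
}
```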

The playout clock for hardware mode is the per-port malgo playback callback — calling playoutOneFrame once per audio period (20 ms). One frame is produced per call regardless of whether a real payload or a PLC frame is used.


Adaptive FEC

FECAdapter is a damped control loop that observes the receive-side jitter-buffer gap-run histogram and adjusts the Opus encoder's packet-loss-perc level in response. It runs as a single goroutine per comms runtime, spawned during CommsConfig.Run, and exits on ctx.Done().

State machine: the controller moves through three discrete levels (fecLevel20 = 20, fecLevel30 = 30, fecLevel40 = 40), clamped at the configured floor (comms.packetLossPerc). The configured value is the lower bound only; the adapter is free to raise above it but never drops below.

  • Tick cadence: fecTickInterval = 2 s. On each tick the adapter reads each port's RxPushed and gap-run buckets, computes deltas against the previous tick, and converts the bucket midpoints (gapBucketMidpoints = [1, 3, 8, 15, 35, 75]) into an estimated count of missing frames.
  • EWMA: loss_ewma = 0.2 × raw + 0.8 × prev (fecEWMAAlpha = 0.2). After n ticks a step change has reached 1 − 0.8ⁿ of its final value, so the 63 % response takes about 4.5 ticks, roughly 9 s at 2 s ticks: long enough to ride out a single noisy window, short enough to catch sustained loss.
  • Upgrade thresholds (loss ratio): 20→30 at 0.08, 30→40 at 0.20. Requires fecUpgradeDwell = 2 consecutive ticks above the threshold before transitioning.
  • Downgrade thresholds: 40→30 at 0.10, 30→20 at 0.03. Requires fecDowngradeDwell = 15 consecutive ticks below — much longer than upgrade dwell so the adapter does not flap on a brief recovery window.
  • Idle stall reset: after fecSilentStallLimit = 30 ticks (≈ 60 s) with zero pushed and zero gap activity the EWMA is reset to zero. This prevents a stale loss estimate from carrying across a long inter-PTT gap.
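A single-port sketch of the controller above, useful for reasoning about how quickly it reacts. Assumptions: the raw ratio is missing / (pushed + missing), idle ticks leave the EWMA untouched until the stall limit, and the value handed to the encoder (OPUS_SET_PACKET_LOSS_PERC in libopus terms) is max(level, floor). fecController and its methods are hypothetical.

```go
package main

import "fmt"

// Constants taken from the Adaptive FEC description.
const (
	fecEWMAAlpha        = 0.2
	fecUpgradeDwell     = 2
	fecDowngradeDwell   = 15
	fecSilentStallLimit = 30
)

var gapBucketMidpoints = [6]float64{1, 3, 8, 15, 35, 75}

// estimateLoss turns one tick's deltas into a raw loss ratio (assumed formula).
func estimateLoss(pushed int64, gapRuns [6]int64) float64 {
	var missing float64
	for i, n := range gapRuns {
		missing += float64(n) * gapBucketMidpoints[i]
	}
	total := float64(pushed) + missing
	if total == 0 {
		return 0
	}
	return missing / total
}

// thresholds returns the upgrade and downgrade ratios for a level; 0 means none.
func thresholds(level int) (up, down float64) {
	switch level {
	case 20:
		return 0.08, 0
	case 30:
		return 0.20, 0.03
	default: // 40
		return 0, 0.10
	}
}

type fecController struct {
	floor, level          int // level is 20, 30 or 40
	ewma                  float64
	up, down, silentTicks int
}

// effective is the packet-loss-perc value the encoder would be given.
func (c *fecController) effective() int {
	if c.level < c.floor {
		return c.floor
	}
	return c.level
}

// tick consumes one 2 s window of RxPushed and gap-run deltas.
func (c *fecController) tick(pushed int64, gapRuns [6]int64) {
	idle := pushed == 0
	for _, n := range gapRuns {
		if n != 0 {
			idle = false
		}
	}
	if idle {
		c.silentTicks++
		if c.silentTicks >= fecSilentStallLimit {
			c.ewma, c.silentTicks = 0, 0 // forget stale loss across long gaps
		}
		return
	}
	c.silentTicks = 0
	c.ewma = fecEWMAAlpha*estimateLoss(pushed, gapRuns) + (1-fecEWMAAlpha)*c.ewma

	upAt, downAt := thresholds(c.level)
	switch {
	case upAt > 0 && c.ewma >= upAt:
		c.up, c.down = c.up+1, 0
		if c.up >= fecUpgradeDwell {
			c.level, c.up = c.level+10, 0
		}
	case downAt > 0 && c.ewma < downAt:
		c.down, c.up = c.down+1, 0
		if c.down >= fecDowngradeDwell {
			c.level, c.down = c.level-10, 0
		}
	default: // inside the hysteresis band: restart both dwell counters
		c.up, c.down = 0, 0
	}
}

func main() {
	c := &fecController{floor: 20, level: 20}
	lossy := [6]int64{50, 0, 0, 0, 0, 0} // 50 single-frame gaps per tick
	for i := 1; i <= 4; i++ {
		c.tick(50, lossy) // 50 frames pushed: raw loss 0.5
		fmt.Printf("tick %d: ewma=%.3f level=%d\n", i, c.ewma, c.effective())
	}
}
```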

Snapshot fields (FECAdapterSnapshot, exposed under comms.fec_adapter): current_level, loss_ewma, last_change_unix_nano, transitions, write_errors, floor. See docs/instrumentation-snapshot.md for interpretation heuristics.

The design assumes link symmetry — every node reads its own RX loss and applies the result to its own TX encoder. This is sound for the omnidirectional-antenna deployment but would need rethinking for asymmetric links.


PTT control handling

openvlm backend (default)

The OpenVLM (Open Voice Link Module) is a USB HID audio dongle widely used as a push-to-talk controller. control.OpenVLMSource reads HID input reports and decodes:

Source bits Transition Event emitted
IR1 bit 2 (GPIO3) LOW → HIGH Button pressed PTTDown (PTT channel)
IR1 bit 2 (GPIO3) HIGH → LOW Button released PTTUp (PTT channel)
IR0 bit 0 (OpenVLMVolUpMask) edge VOL+ press / release VolumeUpPressed / VolumeUpReleased (aux channel)
IR0 bit 1 (OpenVLMVolDnMask) edge VOL− press / release VolumeDownPressed / VolumeDownReleased (aux channel)

The HID report structure:

Byte 0: Report ID (prepended by OS — shifted by 1 when n ≥ 5)
Byte 1: IR0 (GPIO8–GPIO5, plus VOL+/VOL− on bits 0/1)
Byte 2: IR1 (GPIO4–GPIO1) ← bit 2 = GPIO3 = PTT, bit 0 = GPIO1 = OpenVLM strap
Byte 3: IR2
Byte 4: IR3
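Decoding a read into IR0/IR1 and turning bit changes into events might look like this. It assumes a set bit means pressed for the VOL buttons (the text only gives the polarity for GPIO3) and reads the n ≥ 5 note as a leading report ID; irBytes, edges and the event labels are hypothetical.

```go
package main

import "fmt"

const (
	gpio3PTTMask     = 0x04 // IR1 bit 2
	openVLMVolUpMask = 0x01 // IR0 bit 0
	openVLMVolDnMask = 0x02 // IR0 bit 1
)

// event labels are hypothetical; the control package defines its own types.
type event string

const (
	pttDown         event = "PTTDown"
	pttUp           event = "PTTUp"
	volUpPressed    event = "VolumeUpPressed"
	volUpReleased   event = "VolumeUpReleased"
	volDownPressed  event = "VolumeDownPressed"
	volDownReleased event = "VolumeDownReleased"
)

// irBytes picks IR0/IR1 out of a read of n bytes; reads of 5+ bytes are
// assumed to carry a leading report ID that shifts everything by one.
func irBytes(buf []byte, n int) (ir0, ir1 byte, ok bool) {
	off := 0
	if n >= 5 {
		off = 1
	}
	if n < off+2 || n > len(buf) {
		return 0, 0, false
	}
	return buf[off], buf[off+1], true
}

// edge appends onSet or onClear when the masked bit changed between reads.
func edge(out []event, prev, cur, mask byte, onSet, onClear event) []event {
	if (prev^cur)&mask == 0 {
		return out
	}
	if cur&mask != 0 {
		return append(out, onSet)
	}
	return append(out, onClear)
}

// edges compares two consecutive reads and emits one event per changed input.
func edges(prev0, prev1, cur0, cur1 byte) []event {
	var out []event
	out = edge(out, prev1, cur1, gpio3PTTMask, pttDown, pttUp) // LOW→HIGH = pressed
	out = edge(out, prev0, cur0, openVLMVolUpMask, volUpPressed, volUpReleased)
	out = edge(out, prev0, cur0, openVLMVolDnMask, volDownPressed, volDownReleased)
	return out
}

func main() {
	// A 5-byte read: report ID, IR0, IR1 with GPIO3 high, IR2, IR3.
	ir0, ir1, ok := irBytes([]byte{0x01, 0x00, 0x04, 0x00, 0x00}, 5)
	fmt.Println(ok, edges(0x00, 0x00, ir0, ir1)) // true [PTTDown]
}
```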

Aux events flow on a separate buffered channel (control.AuxEventSource) with non-blocking sends — a missing or slow consumer cannot stall the HID read loop. The comms run loop dispatches received aux events to CommsConfig.AuxHandler (typically alsa.Controller; see Volume control below).

OpenVLM identification and ALSA card detection

The 0x0D8C:0x0012 VID/PID is shared with generic CM108 USB audio dongles. To distinguish a real OpenVLM from a generic dongle that happens to be plugged in alongside one, the unified discovery scan in device.DiscoverCM108 walks /sys/bus/usb/devices/ and produces CM108Descriptor entries with the device's HID path, ALSA card index, USB serial, and SysPath. device.CheckOpenVLMIdentity then issues a HIDIOCGINPUT ioctl and inspects GPIO1 (bit 0 of HID_IR1): OpenVLM hardware wires GPIO1 high via a board strap, generic CM108 dongles leave it low. The CM108B datasheet (§7.4) requires IR0[7:6] == 0 for IR1[3:0] to reflect live GPIO state; the helper checks this and returns an error otherwise.
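The two checks described above reduce to a pair of bit tests on the IR bytes; issuing the HIDIOCGINPUT ioctl is out of scope here. isOpenVLM is a hypothetical name for the logic in device.CheckOpenVLMIdentity.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	ir0StateMask   = 0xC0 // IR0[7:6]: must be 0 for IR1[3:0] to reflect live GPIO
	gpio1StrapMask = 0x01 // IR1 bit 0: GPIO1, strapped high on OpenVLM boards
)

// isOpenVLM applies both checks to an IR0/IR1 snapshot.
func isOpenVLM(ir0, ir1 byte) (bool, error) {
	if ir0&ir0StateMask != 0 {
		return false, errors.New("IR0[7:6] != 0: IR1 does not reflect GPIO state")
	}
	return ir1&gpio1StrapMask != 0, nil
}

func main() {
	fmt.Println(isOpenVLM(0x00, 0x05)) // true <nil>: strap high
	fmt.Println(isOpenVLM(0x00, 0x04)) // false <nil>: generic CM108
}
```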

OpenVLMSource.Events runs the GPIO1 probe at startup and prefers a descriptor whose IsOpenVLM is set when opening the HID device, falling back to "any matching VID/PID" if no positively-identified unit is found.

ALSA card auto-detection runs before the malgo context is initialized when controlSource is openvlm or roip. It uses the same DiscoverCM108 walk to map the OpenVLM to an ALSA card index and sets ALSA_CARD so malgo selects the correct sound card. If ALSA_CARD is already set, it is left unchanged. A fallback path scans /proc/asound/card*/usbid for 0d8c:0012 when sysfs discovery fails.

PTTDown → beginTransmission; PTTUp → endTransmission.

roip backend

control.ROIPSource uses the same OpenVLM USB audio dongle but operates without a manual PTT button — it automatically bridges an analog handheld radio into the multicast comms network. Detection strategy (half-duplex enforced throughout):

  1. COS (Carrier-Operated Squelch): the radio squelch output is wired to an OpenVLM GPIO pin. The HID report is polled; ROIPCOSGPIOMask selects the IR1 bit. PTTDown on the HIGH→LOW squelch edge, PTTUp on LOW→HIGH.
  2. VOX fallback: if the HID device is unavailable or ROIPCOSGPIOMask is 0, an audio energy threshold (audiopool.RMSEnergy) is applied to the always-on broadcast capture stream's VOX tap. ROIPVOXOnsetFrames = 3 (60 ms) prevents false triggers. The tap pointer is published on rt.BroadcastTap (atomic.Pointer[chan []float32]) so the capture callback can forward float32 frames whenever a tap is registered, regardless of the TX gate.

ROIPMaxTXDuration caps a single transmission as a safety ceiling. The half-duplex callbacks (isReceiving / isBroadcasting) and the broadcast tap setters (setTap / clearTap) are passed in as plain function values from control_register.go so the control package never imports the parent.
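A sketch of the VOX onset rule, assuming audiopool.RMSEnergy is a plain root-mean-square over the float32 frame. The threshold value and the voxOnset type are hypothetical, and the release logic and ROIPMaxTXDuration cap are not modeled.

```go
package main

import (
	"fmt"
	"math"
)

const roipVOXOnsetFrames = 3 // 3 x 20 ms = 60 ms

// rmsEnergy computes the root-mean-square level of a float32 frame.
func rmsEnergy(frame []float32) float64 {
	if len(frame) == 0 {
		return 0
	}
	var sum float64
	for _, s := range frame {
		sum += float64(s) * float64(s)
	}
	return math.Sqrt(sum / float64(len(frame)))
}

// voxOnset reports true once roipVOXOnsetFrames consecutive frames exceed
// threshold; any quiet frame resets the run, which suppresses short clicks.
type voxOnset struct {
	threshold float64
	run       int
}

func (v *voxOnset) feed(frame []float32) bool {
	if rmsEnergy(frame) > v.threshold {
		v.run++
	} else {
		v.run = 0
	}
	return v.run >= roipVOXOnsetFrames
}

func main() {
	loud := make([]float32, 960)
	for i := range loud {
		loud[i] = 0.5
	}
	v := &voxOnset{threshold: 0.1} // hypothetical threshold
	for i := 1; i <= 3; i++ {
		fmt.Println("frame", i, "onset:", v.feed(loud))
	}
}
```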

web backend

control.WebEventSource is a lightweight buffered channel backend whose Push(ev PTTEvent) method is called from the RPC handler when a browser client presses or releases the on-screen PTT button. Service.WebEventSource() returns the live instance for the handler to inject events into.

In web mode the entire malgo pipeline is bypassed:

  • rt.BroadcastStream is left nil; beginTransmission and endTransmission short-circuit on rt.WebBridge != nil.
  • Per-port playback streams are not opened.
  • Inbound audio is forwarded raw via webaudio.Bridge.PushRxFrame → RxFrames() channel → RPC stream → browser decoder.
  • Outbound audio is injected via webaudio.Bridge.InjectTxFrame → bound SendFn → cfg.sendToAllPorts(rt, payload).

This makes web mode usable on hardware without a sound card — the malgo context never gets initialized, so device-open failures cannot stop the daemon from starting.

nanoptt backend

control.NanoPTTSource reads from a Linux evdev input device matched by NanoPTTDeviceName within the NanoPTTDevicePath glob (handled by device.FindEvdev).

  • If commKey is any, any key press emits PTTToggle.
  • Otherwise commKey is read as a decimal EV_KEY key code, and only that key matches.

On each matching key press (EV_KEY value = 1), a PTTToggle event is emitted. The Run loop checks current Broadcasting state and calls beginTransmission or endTransmission accordingly — a press-to-toggle model. Key releases are logged at debug level but produce no event.

Transmission lifecycle

beginTransmission (in transmit.go):

  1. Returns immediately if rt.Broadcasting is already true.
  2. Returns immediately if cfg.isReceivingRemote(rt) is true (half-duplex; reads rt.RemoteRxActive in O(1)).
  3. rt.Broadcasting.Store(true).
  4. Web mode short-circuit: if rt.WebBridge != nil, log "Begin web transmission" and return.
  5. drainPlaybackBuffer(rt) — discards stale beep frames in every port.
  6. Queues the 1 000 Hz start-tone (rt.BeepBufferStart, []int16, amplitude 0.2 * 32767) into every port's PlaybackBuffer.
  7. Sleeps cfg.transmitSettleWait(rt) — the greater of cfg.pttStartDelay() (default 50 ms; configurable via PttStartDelayMs; set negative to skip entirely) and rt.PlaybackOutputLatency + 20 ms beep + beepSettleMargin (40 ms). The second term is required so the start tone has fully emerged from the speaker before the TX gate opens; without it any acoustic or device sidetone path from speaker → mic captures the beep and the remote side hears it on the next transmission. The first term covers USB audio class capture devices that need extra time to commit their first DMA cycle. PlaybackOutputLatency is computed in audio.Init.BuildAudio from the rounded-up effective period (nextPow2(playbackPeriod)) multiplied by malgoLowLatencyPeriods (3) — modeling miniaudio's LowLatency profile so the wait covers the full ALSA ring, not just one period.
  8. Verifies rt.BroadcastStream is non-nil (the always-on capture stream opened at StartHardware); if it is unexpectedly nil, logs an Error, clears Broadcasting, and returns.
  9. Calls rt.BroadcastStream.SetTxEnabled(true) to open the TX gate. The capture device itself is already running; the gate just lets captured frames reach the Opus encoder.

endTransmission:

  1. Returns immediately if rt.Broadcasting is already false.
  2. Web mode short-circuit: clear Broadcasting, log, return.
  3. Calls rt.BroadcastStream.SetTxEnabled(false) to close the TX gate. The capture device keeps running so the VOX tap (if any) keeps observing the mic. The SetTxEnabled(false) call also emits the per-cycle stats Debug log.
  4. drainPlaybackBuffer(rt).
  5. Queues the 600 Hz stop-tone into every port's PlaybackBuffer.
  6. rt.Broadcasting.Store(false).
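The start and stop tones queued by the two functions above could be generated and queued roughly like this. `makeBeep` and `drainAndQueue` are hypothetical names. The 48 kHz rate and 20 ms length are assumptions taken from the settle-wait description. The non-blocking send is one reasonable way to avoid stalling the PTT path when a port's buffer is full.

```go
package main

import (
	"fmt"
	"math"
)

const sampleRate = 48000 // assumed

// makeBeep returns ms milliseconds of a freqHz sine at 0.2 of int16 full scale.
func makeBeep(freqHz float64, ms int) []int16 {
	out := make([]int16, sampleRate*ms/1000)
	amp := 0.2 * 32767
	for i := range out {
		out[i] = int16(amp * math.Sin(2*math.Pi*freqHz*float64(i)/sampleRate))
	}
	return out
}

// drainAndQueue discards anything already buffered (stale beeps), then
// queues tone without blocking. It reports whether the tone was queued.
func drainAndQueue(pb chan []int16, tone []int16) bool {
drain:
	for {
		select {
		case <-pb:
		default:
			break drain
		}
	}
	select {
	case pb <- tone:
		return true
	default:
		return false
	}
}

func main() {
	start := makeBeep(1000, 20)
	stop := makeBeep(600, 20)
	pb := make(chan []int16, 2)
	pb <- stop // stale tone left over from an earlier cycle
	fmt.Println(len(start), drainAndQueue(pb, start), len(pb))
}
```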

Volume control via ALSA

control/alsa.Controller is the AuxEventHandler wired into CommsConfig.AuxHandler by the manager. It adjusts a named ALSA mixer simple-control on VolumeUpPressed / VolumeDownPressed and ignores release events.

Key properties:

  • Pure-Go ALSA binding (github.com/gen2brain/alsa): no CGO, cross-compiles cleanly to linux/amd64, linux/arm64, and linux/mipsle. Does not depend on alsa-utils being installed on the target.
  • Card selection reads the ALSA_CARD environment variable set by DetectAndSetALSACard during startup. Volume events are silently ignored if ALSA_CARD is unset or non-numeric — this matches behaviour on systems with no OpenVLM connected.
  • Default control name: Master (alsa.DefaultControlName). On a CM108B Master control (38 raw steps from −37 dB to 0 dB) one raw step is approximately one dB; this approximation does not generalize to all cards. The control name and step size can be overridden via the Controller.ControlName / Controller.Step fields.
  • Errors are swallowed — a transient ALSA failure (mixer open, control not found, range query) is logged at Warn or Debug and never propagated back through the dispatch loop. The volume button must never crash the daemon.

The dispatch loop (runAuxPump in transmit.go) forwards each aux event from the source's AuxEvents() channel directly to the handler's Handle(ctx, ev). The handler is invoked synchronously on the aux pump goroutine, so long-running work inside Handle would block the pump — the ALSA controller's mixer transactions complete in microseconds.


Instrumentation snapshots

Every counter mentioned above also surfaces in the periodic JSON snapshot the daemon writes when instrumentation.enable: true. The comms section is emitted by CommsSnapshotter under comms and contains:

  • enabled, broadcasting, remote_rx_active, control_source
  • ports[] — per-talk-group with rx_pkts, rx_loopback, rx_parse_errs, rx_pushed, rx_push_rejected, playback_underruns, web_popped_skipped, send_enabled, receive_enabled, plus nested jitter and rx_gate snapshots.
  • broadcast_encoder → audio.AudioEncoderSnapshot (frames captured / encoded / dropped, encode-duration histogram, capture gap stats, TX gate state).
  • web_bridge → webaudio.BridgeSnapshot (RX frame queue depth, dropped frames, web TX inject counts).
  • fec_adapter → FECAdapterSnapshot (current level, loss EWMA, last change time, transitions, write errors, floor).

Field semantics, unit annotations, and triage heuristics live in docs/instrumentation-snapshot.md. That document MUST be updated in the same changeset as any addition, removal, or rename of a snapshot field — the framework's whole point is that operators and LLMs can read the JSON without reading Go source.


ALSA log filtering

The logProc callback passed to malgo.InitContext in audio.Init.StartHardware drops poll() failed and EPIPE lines emitted by miniaudio during USB audio class startup and under normal scheduling jitter — both are recovered internally via snd_pcm_recover and do not correspond to lost audio. Anything else from miniaudio still lands at Trace level prefixed with source=malgo.

The full build requires libasound for the rest of the malgo ALSA backend; the omd_omit_comms lite build skips the package entirely.


Config keys

The minimal example_config.yml shipped with the repo is intentionally terse — most operators rely on the in-code defaults. The full set of comms.* keys recognised by internal/config is:

comms:
  enable: false
  controlSource: openvlm     # openvlm | roip | web | nanoptt
  debug: false
  trace: false
  loopback: true
  micGain: 8.0               # float32; >1 amplifies, <1 attenuates
  encoderComplexity: 5       # 1..10 (defaults to 5)
  packetLossPerc: 20         # initial Opus FEC level; clamped [10,40].
                             # Used as the FEC adapter's lower bound; the
                             # adapter ramps up to 30 / 40 under loss.
  playbackLatencyMs: 60      # malgo playback period hint (ms);
                             # translated to PeriodSizeInFrames — default
                             # 20 ms when ≤ 0
  captureLatencyMs: 60       # malgo capture period hint (ms); translated
                             # to the ALSA periods count (ring depth)
                             # via buildCapturePeriods (clamped [3,16])
  captureFramesPerBuffer: 0  # malgo capture period frames; 0 → 960 (one
                             # 20 ms Opus frame), <0 → let miniaudio pick

  nanoPTT:
    enable: false
    devicePath: /dev/input/event*    # glob for evdev device enumeration
    deviceName: AllInOneCable        # exact evdev device name to match

  bluetoothPtt:
    enable: false
    BluetoothAudioDeviceHint: ""     # optional shared substring (e.g. "OpenVLM")
    BluetoothInputDevice: ""         # capture device substring or index
    BluetoothOutputDevice: ""        # playback device substring or index
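The defaulting rules in the comments above can be sketched as follows, assuming 48 kHz audio (48 frames per millisecond). The helper names are hypothetical. Mapping a negative capture value to 0 relies on miniaudio treating a zero period size as "use the backend default". buildCapturePeriods is not reproduced because its formula is not documented here.

```go
package main

import "fmt"

// framesPerMs assumes a 48 kHz stream.
const framesPerMs = 48

// playbackPeriodFrames: playbackLatencyMs <= 0 falls back to 20 ms.
func playbackPeriodFrames(latencyMs int) uint32 {
	if latencyMs <= 0 {
		latencyMs = 20
	}
	return uint32(latencyMs * framesPerMs)
}

// captureFrames: 0 -> 960 (one 20 ms Opus frame); negative -> 0, which
// miniaudio treats as "use the backend default"; otherwise unchanged.
func captureFrames(n int) uint32 {
	switch {
	case n == 0:
		return 960
	case n < 0:
		return 0
	default:
		return uint32(n)
	}
}

// clampPacketLoss applies the documented [10, 40] clamp to packetLossPerc.
func clampPacketLoss(p int) int {
	if p < 10 {
		return 10
	}
	if p > 40 {
		return 40
	}
	return p
}

func main() {
	fmt.Println(playbackPeriodFrames(0), playbackPeriodFrames(60))
	fmt.Println(captureFrames(0), captureFrames(-1), captureFrames(480))
	fmt.Println(clampPacketLoss(5), clampPacketLoss(20), clampPacketLoss(55))
}
```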

The interface comes from the global meshNet.interface key (read by CommsManager.buildCommsConfig via cfg.GetMeshNetInterface()), and multicast talk-group entries (McastPorts) are sourced from the global config.GetMulticastTalkGroups() helper rather than living under the comms: namespace, so a single config defines them once for all subsystems that need them.

nanoPTT.* keys are only relevant when controlSource: nanoptt. The bluetoothPtt.* keys carry the audio device names used by both openvlm and roip modes (the historical name predates the wider audio refactor). ALSA card auto-detection runs for both openvlm and roip. CommsConfig.AuxHandler is wired by the manager to a fresh alsa.Controller so VOL+/VOL− on the OpenVLM updates the system mixer.

Configurable Go fields without yaml keys (yet): CommKey, RtpID, HalfDuplexThreshold, PttStartDelayMs, and the entire ROIP* group are present on CommsConfig but not currently loaded by CommsManager.buildCommsConfig. They use their compile-time defaults today; wire them through internal/config/config.go if you need them externally tunable.


Source files

Top-level orchestration
File                 Responsibility
comms.go             buildCodec, sendToAllPorts, buildEventSource, TargetBitrate, package-level constants
config.go            CommsConfig, CommsRuntime, BroadcastCapture interface, NewComms, applyDefaults
lifecycle.go         Start, startHardwareAudio (audio.Init wiring)
manager.go           CommsManager (Enable/Disable/IsRunning); wires alsa.Controller as the default AuxHandler
service.go           Service + Default()/SetDefault() singleton; per-handler accessors
transmit.go          Run, beginTransmission, endTransmission, drainPlaybackBuffer, pttStartDelay, transmitSettleWait, runAuxPump
receive.go           receiveLoop, playoutOneFrame, webPlayoutLoop, halfDuplexDecayLoop, isReceivingRemote
network.go           buildNetwork, buildSinglePortChannel, replaceNetwork, listenRTPReceiver, multicast TTL + SO_RCVBUF setup
port_channel.go      McastPortConfig, McastPortState, PortChannel, MarkRemoteRx, closePartial
fec_adapter.go       FECAdapter, FECAdapterSnapshot, EWMA + state machine
snapshot.go          CommsSnapshot, PortSnapshot, CommsSnapshotter (instrumentation registry adapter)
control_register.go  init() registering the four backends; buildControlDeps; Validate
device.go            normalizeControlSource, findCommDevice
doc.go               Package doc + omd_omit_comms build stub
Sub-packages
Path           Responsibility
audio/         BroadcastEncoder (always-on malgo capture + dedicated encode goroutine + TX gate), Init (hardware startup), PortSlot, Deps, SendFn, AudioEncoderSnapshot
audiopool/     Audio constants (FrameSize, SampleRate, Channels, EncBufSize), buffer pools (Float32Pool, Int16Pool, EncBufPool), RMSEnergy
codec/         AudioEncoder/AudioDecoder interfaces, Opus implementation (NewOpusEncoder/NewOpusDecoder/EncodeS16/DecodeS16/DecodeFloat32/SetPacketLossPerc)
control/       EventSource, PTTEvent, four backends (OpenVLMSource, ROIPSource, NanoPTTSource, WebEventSource), AuxEvent/AuxEventSource/AuxEventHandler, HalfDuplexGate, registry (Register/Lookup/Factory/ControlDeps), HIDDevice/HIDOpener, DetectAndSetALSACard
control/alsa/  Controller: pure-Go ALSA mixer AuxEventHandler for VOL+/VOL− on the OpenVLM
device/        AudioStream interface + NewMalgoStream, DiscoverCM108 (unified sysfs walk) + CM108Descriptor, CheckOpenVLMIdentity (HID GPIO1 strap probe), Cache, FindEvdev, IfaceIPv4/JoinMulticastGroup, ResolveAudio/LogAudioDevices
rtp/           Session (pion Packetizer + RTCP SR), Sender, JitterBuffer (ring buffer + SSRC tracking + EnableNotify + gap-run histogram), JitterBufferSnapshot, PacketWriter/PacketReader, SwappableSender (lock-free + SwapAndDeferClose)/SwappableReceiver, SSRCFromID, ParseIncoming
webaudio/      Bridge (RPC ↔ comms runtime plumbing for web mode), NewBridge, SendFn, InjectTxFrame, PushRxFrame, RxFrames, BridgeSnapshot

Documentation

Overview

Package comms provides PTT (push-to-talk) communications using pion-based RTP/RTCP. Build with -tags omd_omit_comms to exclude this package.

Constants

const TargetBitrate int = 32000

TargetBitrate is the Opus encoder bitrate, in bits per second, used for all comms TX. It is exported so the RPC layer can report it to UI clients.

Variables

var ErrNotRunning = errors.New("comms: subsystem is not running")

ErrNotRunning is returned by Service methods when the comms subsystem has not been started (or has been stopped). It is exported so callers can use errors.Is to distinguish it from other failure modes.

Functions

func SetDefault

func SetDefault(svc *Service)

SetDefault publishes svc as the process-wide default Service. Start calls this after constructing the runtime; shutdown passes nil. Tests that build a Service directly use it the same way to wire up the lazy lookup.

Types

type BroadcastCapture

type BroadcastCapture interface {
	device.AudioStream
	SetTxEnabled(bool)
}

BroadcastCapture is the unified capture stream interface the runtime exposes to the TX path. It extends device.AudioStream (lifecycle Start/Stop/Close, called once per StartHardware cycle) with a per-PTT SetTxEnabled gate. Captured frames always flow to the optional VOX tap; they only flow to the Opus encoder + RTP send when SetTxEnabled(true) is the most recent call. The stream is opened once at StartHardware and stays open for the lifetime of the comms run.
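The gate can be sketched as a capture callback that always offers frames to the VOX tap but forwards them to the encoder only while the gate is open. `gatedCapture` is a hypothetical stand-in for BroadcastEncoder. The tap uses the same atomic.Pointer[chan []float32] shape as rt.BroadcastTap. The drop-on-full policy is an assumption, chosen so the audio thread never blocks.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gatedCapture is a hypothetical stand-in for BroadcastEncoder's TX gate.
type gatedCapture struct {
	txEnabled atomic.Bool
	tap       atomic.Pointer[chan []float32] // same shape as rt.BroadcastTap
	encodeQ   chan []float32                 // consumed by an encode goroutine
}

// SetTxEnabled opens or closes the gate; the capture device keeps running.
func (g *gatedCapture) SetTxEnabled(on bool) { g.txEnabled.Store(on) }

// onCapture runs on the audio callback thread and never blocks. A real
// callback would first copy frame into a pooled buffer, because the device
// reuses its buffer; that step is omitted here.
func (g *gatedCapture) onCapture(frame []float32) {
	if tp := g.tap.Load(); tp != nil {
		select {
		case *tp <- frame: // VOX sees every frame, gate open or not
		default: // tap consumer is behind: drop rather than stall audio
		}
	}
	if !g.txEnabled.Load() {
		return
	}
	select {
	case g.encodeQ <- frame:
	default: // encoder backlog: drop this frame
	}
}

func main() {
	g := &gatedCapture{encodeQ: make(chan []float32, 4)}
	tap := make(chan []float32, 4)
	g.tap.Store(&tap)

	g.onCapture([]float32{0.1})
	fmt.Println("gate closed:", len(tap), len(g.encodeQ))
	g.SetTxEnabled(true)
	g.onCapture([]float32{0.2})
	fmt.Println("gate open:", len(tap), len(g.encodeQ))
}
```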

type CommsConfig

type CommsConfig struct {
	Log        zerolog.Logger
	AuxHandler control.AuxEventHandler
	Interrupt  chan os.Signal

	CommKey                  string
	BluetoothInputDevice     string
	Iface                    string
	BluetoothAudioDeviceHint string
	ControlSource            string
	NanoPTTDeviceName        string
	RtpID                    string
	NanoPTTDevicePath        string
	BluetoothOutputDevice    string
	McastPorts               []McastPortConfig
	ROIPMaxTXDuration        time.Duration
	ROIPVOXHoldTime          time.Duration
	EncoderComplexity        int
	PttStartDelayMs          int
	CaptureFramesPerBuffer   int
	CaptureLatencyMs         int
	PlaybackLatencyMs        int
	PacketLossPerc           int
	HalfDuplexThreshold      time.Duration
	ROIPVOXThreshold         float32
	MicGain                  float32
	EnableNanoPTT            bool
	EnableBluetoothPtt       bool
	Enable                   bool
	Trace                    bool
	Loopback                 bool
	Debug                    bool
	ROIPCOSGPIOMask          byte
	// contains filtered or unexported fields
}

CommsConfig holds the static configuration for the comms subsystem. Allocate one with NewComms and call Start to begin operation. All exported fields must be set before Start is called.

CommsConfig is treated as immutable after Start: the live runtime is owned by *Service (published by Start via SetDefault), so the static config and the per-startup runtime have distinct lifetimes.

func NewComms

func NewComms(cfg CommsConfig) *CommsConfig

NewComms copies cfg and returns a pointer ready for Start.

func (*CommsConfig) Run

func (cfg *CommsConfig) Run(ctx context.Context, rt *CommsRuntime, src control.EventSource)

Run is the main event loop. It starts a receiveLoop goroutine for every Receive-capable port plus a single halfDuplexDecayLoop that clears the cached RemoteRxActive flag when every gate has gone quiet, then blocks dispatching PTT events until ctx is canceled.

func (*CommsConfig) Start

func (cfg *CommsConfig) Start(ctx context.Context) error

Start initializes all comms subsystems and blocks until ctx is canceled. Returns nil on clean shutdown, or an error if initialization fails. The caller is responsible for canceling ctx to stop the subsystem.

func (*CommsConfig) Validate

func (cfg *CommsConfig) Validate() error

Validate checks the comms configuration for self-consistency. Phase 2 of the comms refactor introduced this method specifically to verify that the configured ControlSource maps to a registered backend. Additional checks will be added by later phases. The method intentionally tolerates the empty string (treated as the openvlm default) so callers may invoke it either before or after normalizeControlSource has run.

type CommsLifecycle

type CommsLifecycle interface {
	Enable() error
	Disable()
	IsRunning() bool
}

CommsLifecycle defines the interface for managing comms runtime lifecycle. The handler depends on this interface so that tests can provide a mock.

type CommsManager

type CommsManager struct {
	// contains filtered or unexported fields
}

CommsManager owns the comms lifecycle and serializes enable/disable operations. It implements CommsLifecycle.

func NewCommsManager

func NewCommsManager(cfg *config.Config, logger zerolog.Logger) *CommsManager

NewCommsManager creates a new CommsManager. The manager is created at startup regardless of whether comms is enabled, so the API handler always has it.

func (*CommsManager) Disable

func (m *CommsManager) Disable()

Disable stops the comms subsystem and waits for cleanup to finish. It is idempotent: calling it when comms is not running is a no-op.

func (*CommsManager) Enable

func (m *CommsManager) Enable() error

Enable starts the comms subsystem. It is idempotent: if comms is already running it returns nil. The subsystem runs in a background goroutine and can be stopped with Disable. Validate is invoked synchronously so an invalid ControlSource is reported to the caller immediately rather than surfacing later as an asynchronous error inside the background goroutine.

func (*CommsManager) IsRunning

func (m *CommsManager) IsRunning() bool

IsRunning reports whether the comms subsystem is currently active.

type CommsRuntime

type CommsRuntime struct {
	Decoder         codec.AudioDecoder
	Encoder         codec.AudioEncoder
	BroadcastStream BroadcastCapture
	FECAdapter      *FECAdapter
	WebBridge       *webaudio.Bridge
	WebEvtSrc       *control.WebEventSource
	LocalIP         atomic.Pointer[string]
	BroadcastTap    atomic.Pointer[chan []float32]
	Ports           []*PortChannel
	BeepBufferStart []int16
	BeepBufferStop  []int16
	// PlaybackOutputLatency is the actual output latency the backend
	// granted when the per-port playback streams were opened. The TX
	// path uses it in beginTransmission to delay SetTxEnabled(true)
	// until the start-tone beep has fully emerged from the speaker so
	// an acoustic (or device sidetone) path from speaker → mic cannot
	// pick the beep up and transmit it.
	PlaybackOutputLatency time.Duration
	Broadcasting          atomic.Bool
	RemoteRxActive        atomic.Bool
}

CommsRuntime holds live resources allocated by Start. All audio/network fields are interfaces so that unit tests can inject fakes without hardware.

RemoteRxActive is the cached half-duplex receive flag. The PTT TX path reads it via isReceivingRemote in O(1) instead of walking every port's HalfDuplexGate. It is set immediately by receiveLoop on every inbound packet from a send-enabled port (no false negatives at the start of an incoming stream) and cleared by halfDuplexDecayLoop on a coarse 100 ms ticker once every gate's window has expired.

type CommsSnapshot

type CommsSnapshot struct {
	ControlSource    string                     `json:"control_source"`
	Ports            []PortSnapshot             `json:"ports"`
	BroadcastEncoder audio.AudioEncoderSnapshot `json:"broadcast_encoder"`
	WebBridge        webaudio.BridgeSnapshot    `json:"web_bridge"`
	FECAdapter       FECAdapterSnapshot         `json:"fec_adapter"`
	Enabled          bool                       `json:"enabled"`
	Broadcasting     bool                       `json:"broadcasting"`
	RemoteRxActive   bool                       `json:"remote_rx_active"`
}

CommsSnapshot is the full comms subsystem section that the instrumentation registry publishes under the "comms" name. Field semantics are documented in docs/instrumentation-snapshot.md — keep that file in sync when adding or renaming fields here.

type CommsSnapshotter

type CommsSnapshotter struct {
	// contains filtered or unexported fields
}

CommsSnapshotter is an instrumentation.Snapshotter adapter that wires the comms subsystem into the instrumentation registry. It holds an internal CommsSnapshot that is refreshed in place on every Refresh() call. The adapter looks up the live comms service on each call so it transparently handles enable/disable transitions.

func (*CommsSnapshotter) Data

func (c *CommsSnapshotter) Data() any

Data implements instrumentation.Snapshotter. Returns a pointer that is stable across Refresh calls.

func (*CommsSnapshotter) Refresh

func (c *CommsSnapshotter) Refresh()

Refresh implements instrumentation.Snapshotter. Zero-alloc after the first call that establishes the Ports slice capacity.

type FECAdapter

type FECAdapter struct {
	// contains filtered or unexported fields
}

FECAdapter is a local, damped control loop that observes the receive-side jitter-buffer gap-run histogram and adjusts the Opus encoder's packet-loss percentage in response. It runs as a single goroutine per comms runtime, spawned during CommsConfig.Run, and exits on ctx.Done().

The design assumes link symmetry — every node reads its own RX loss and applies the result to its own TX encoder. See the Round 7 plan for the omnidirectional-antenna justification of that assumption.

FECAdapter is safe for concurrent Snapshot calls alongside its own Run goroutine. All mutable state is guarded by a.mu, with four atomic mirrors for the Snapshot path so Snapshot never contends on the tick lock.
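The adapter's control loop might look like the sketch below: an EWMA over observed loss, stepping between the 20 / 30 / 40 levels. The smoothing factor and the up/down thresholds are hypothetical values, not the real constants. The gap between the two thresholds provides hysteresis so the level does not flap. The loop moves at most one level per tick and never drops below the floor.

```go
package main

import "fmt"

// fecLevels are the Opus packet-loss percentages named in the docs.
var fecLevels = []int{20, 30, 40}

// fecLoop is a hypothetical sketch of a damped FEC controller.
type fecLoop struct {
	alpha  float64 // EWMA weight of the newest sample (hypothetical)
	upAt   float64 // step up above this smoothed loss (hypothetical)
	downAt float64 // step down below this smoothed loss (hypothetical)
	floor  int
	ewma   float64
	idx    int
}

// newFECLoop starts at the lowest level that is >= both initial and floor.
func newFECLoop(initial, floor int) *fecLoop {
	f := &fecLoop{alpha: 0.3, upAt: 0.05, downAt: 0.01, floor: floor}
	target := initial
	if floor > target {
		target = floor
	}
	for f.idx < len(fecLevels)-1 && fecLevels[f.idx] < target {
		f.idx++
	}
	return f
}

func (f *fecLoop) level() int { return fecLevels[f.idx] }

// tick folds one interval's loss fraction (0..1) into the EWMA and moves at
// most one level per call, never below the floor.
func (f *fecLoop) tick(loss float64) int {
	f.ewma = f.alpha*loss + (1-f.alpha)*f.ewma
	switch {
	case f.ewma > f.upAt && f.idx < len(fecLevels)-1:
		f.idx++
	case f.ewma < f.downAt && f.idx > 0 && fecLevels[f.idx-1] >= f.floor:
		f.idx--
	}
	return f.level()
}

func main() {
	f := newFECLoop(20, 20)
	for _, loss := range []float64{0.2, 0.2, 0, 0, 0, 0, 0, 0, 0, 0} {
		lvl := f.tick(loss)
		fmt.Printf("loss=%.2f ewma=%.4f level=%d\n", loss, f.ewma, lvl)
	}
}
```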

func NewFECAdapter

func NewFECAdapter(rt *CommsRuntime, encoder codec.AudioEncoder, floor int, log zerolog.Logger) *FECAdapter

NewFECAdapter constructs an adapter bound to rt. The initial level is clamped to [fecLevel20, fecLevel40] and clamped up to the floor. SetPacketLossPerc(level) is called on the encoder immediately so a stale value from a previous enable cycle does not linger.

func (*FECAdapter) Run

func (a *FECAdapter) Run(ctx context.Context)

Run blocks until ctx is canceled, ticking every fecTickInterval. It is the entry point launched as a goroutine during CommsConfig.Run.

func (*FECAdapter) Snapshot

func (a *FECAdapter) Snapshot(dst *FECAdapterSnapshot)

Snapshot copies the adapter's current state into dst using atomic loads. Nil-safe on both receiver and dst. Zero-alloc: no heap allocations beyond what dst already holds.

type FECAdapterSnapshot

type FECAdapterSnapshot struct {
	CurrentLevel     int     `json:"current_level"`
	LossEWMA         float64 `json:"loss_ewma"`
	LastChangeUnixNs int64   `json:"last_change_unix_nano"`
	Transitions      int64   `json:"transitions"`
	WriteErrors      int64   `json:"write_errors"`
	Floor            int     `json:"floor"`
}

FECAdapterSnapshot is the publicly-observable state of the adapter. Populated by FECAdapter.Snapshot and carried on CommsSnapshot as the "fec_adapter" section. See docs/instrumentation-snapshot.md.

type McastPortConfig

type McastPortConfig struct {
	InitSendEnabled    *bool
	InitReceiveEnabled *bool
	Address            string
	Port               int
	Send               bool
	Receive            bool
}

McastPortConfig describes a single multicast endpoint that the comms subsystem listens and/or transmits on. Ports with Send=false will not open an RTP/RTCP sender; ports with Receive=false will not open an RTP receiver socket.

InitSendEnabled and InitReceiveEnabled seed the runtime atomic flags that EnableTalkGroupSend / EnableTalkGroupReceive toggle at runtime. When nil the values fall back to Send and Receive respectively, preserving backward compatibility for any caller that constructs McastPortConfig directly.
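The nil-fallback rule for the optional seeds reduces to one helper. `initialFlag` is hypothetical. The struct mirrors the documented McastPortConfig fields, and the address and port are example values.

```go
package main

import "fmt"

// McastPortConfig mirrors the documented fields.
type McastPortConfig struct {
	InitSendEnabled    *bool
	InitReceiveEnabled *bool
	Address            string
	Port               int
	Send               bool
	Receive            bool
}

// initialFlag returns *seed when set, otherwise the static capability.
func initialFlag(seed *bool, capability bool) bool {
	if seed != nil {
		return *seed
	}
	return capability
}

func main() {
	off := false
	cfg := McastPortConfig{Address: "239.0.0.1", Port: 5004, Send: true, Receive: true, InitSendEnabled: &off}
	fmt.Println("send:", initialFlag(cfg.InitSendEnabled, cfg.Send))
	fmt.Println("receive:", initialFlag(cfg.InitReceiveEnabled, cfg.Receive))
}
```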

type McastPortState

type McastPortState struct {
	Address        string
	Port           int
	SendEnabled    bool
	ReceiveEnabled bool
}

McastPortState is a read-only snapshot of the runtime direction-toggle state for a single port. Returned by Service.TalkGroupStates.

type PortChannel

type PortChannel struct {
	RTPSess        rtp.Sender
	PlaybackStream device.AudioStream
	Sender         *rtp.SwappableSender
	RTCPSend       *rtp.SwappableSender
	Receiver       *rtp.SwappableReceiver
	Jitter         *rtp.JitterBuffer
	PlaybackBuffer chan []int16

	ConsecutivePLC    int
	RxGate            control.HalfDuplexGate
	PlaybackUnderruns atomic.Int64

	// Diagnostic RX-path counters. All monotonic since startup; reporters
	// compute deltas across windows. RxPkts is the raw "kernel handed us a
	// packet" count from receiveLoop's ReadFromUDP; the remaining counters
	// segment that count by what happened next. RxPushed + RxPushRejected
	// only sum to RxPkts - RxLoopback - RxParseErrs (and only when the port
	// is receive-enabled). Used to localize RX stutter to one specific
	// stage of the per-port pipeline.
	RxPkts         atomic.Int64
	RxParseErrs    atomic.Int64
	RxLoopback     atomic.Int64
	RxPushed       atomic.Int64
	RxPushRejected atomic.Int64

	// WebPoppedSkipped is bumped by webPlayoutLoop when the jitter
	// buffer's PopReady returns skippedMissing=true (an out-of-order
	// sequence gap wide enough that the buffer advanced the cursor past
	// the hole). Diagnostic only; the audio path never reads this field
	// itself. Zero on the hardware playout path, which does not use
	// webPlayoutLoop.
	WebPoppedSkipped atomic.Int64

	SendEnabled    atomic.Bool
	ReceiveEnabled atomic.Bool
	// contains filtered or unexported fields
}

PortChannel holds all live resources for one McastPortConfig entry. SendEnabled and ReceiveEnabled are atomic bools that can be toggled at runtime via EnableTalkGroupSend / EnableTalkGroupReceive without restarting any goroutine or socket.

Jitter is the per-port RTP jitter buffer. It is allocated in buildSinglePortChannel for ports with a Receive socket and shared between receiveLoop (producer) and the malgo playback callback (consumer). For PortChannel values constructed directly in tests, callers must allocate it explicitly.

ConsecutivePLC is owned by the malgo playback callback for this port: each port has its own callback running on its own audio thread, so the field is single-writer and does not need atomic semantics. Tests that call playoutOneFrame directly are likewise single-threaded with respect to it.

PlaybackBuffer is retained as a one-shot side channel for TX beep tones (see beginTransmission/endTransmission in transmit.go); the malgo playback callback drains it before falling through to playoutOneFrame so beeps preempt one frame of jitter-buffered audio.
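The beep-first ordering of the playback callback can be sketched as a non-blocking select. `fillFrame` is a hypothetical name, and `playout` stands in for playoutOneFrame. The sketch assumes a beep fits in one output frame, truncates anything longer, and pads short beeps with silence.

```go
package main

import "fmt"

// fillFrame writes one output frame: a pending beep wins, otherwise playout
// (standing in for playoutOneFrame) fills it from the jitter buffer.
func fillFrame(out []int16, beeps <-chan []int16, playout func([]int16)) {
	select {
	case b := <-beeps:
		n := copy(out, b)
		for i := n; i < len(out); i++ {
			out[i] = 0 // pad a short beep with silence
		}
	default:
		playout(out)
	}
}

func main() {
	beeps := make(chan []int16, 1)
	beeps <- []int16{7, 7}
	playout := func(o []int16) {
		for i := range o {
			o[i] = 1 // pretend decoded audio
		}
	}
	out := make([]int16, 4)
	fillFrame(out, beeps, playout)
	fmt.Println(out)
	fillFrame(out, beeps, playout)
	fmt.Println(out)
}
```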

func (*PortChannel) MarkRemoteRx

func (pc *PortChannel) MarkRemoteRx(rt *CommsRuntime)

MarkRemoteRx records that a remote RTP packet has just been received on this port for half-duplex enforcement. It stamps the port's RxGate and, when this port is currently send-enabled, primes the runtime-wide RemoteRxActive cache so the PTT TX path observes a busy channel without waiting for the next halfDuplexDecayLoop tick. Receive-only ports never block our own transmissions, so the cache is left untouched in that case.

Production callers reach this from receiveLoop after a successful RTP parse; tests use it as the single canonical way to express "a remote packet arrived" so the cache invariant is exercised the same way it is in production.

func (*PortChannel) Snapshot

func (pc *PortChannel) Snapshot(dst *PortSnapshot)

Snapshot fills dst with the port's counter state. Nil-safe. Zero-alloc.

type PortSnapshot

type PortSnapshot struct {
	// Address is the multicast group address (e.g. "239.0.0.1").
	Address string `json:"address"`
	// Jitter is the per-port receive jitter buffer snapshot.
	Jitter rtp.JitterBufferSnapshot `json:"jitter"`
	// RxGate is the per-port half-duplex gate state.
	RxGate control.HalfDuplexGateSnapshot `json:"rx_gate"`
	// Port is the multicast UDP port.
	Port int `json:"port"`
	// PlaybackUnderruns counts playback-side decode failures that the
	// port audio callback had to recover from via PLC.
	PlaybackUnderruns int64 `json:"playback_underruns"`
	// RxPkts is the monotonic count of successful ReadFromUDP returns on
	// this port's receive socket (packets the kernel handed us).
	RxPkts int64 `json:"rx_pkts"`
	// RxLoopback counts packets dropped by the loopback filter (own-IP
	// suppression) before they reached the RTP parser.
	RxLoopback int64 `json:"rx_loopback"`
	// RxParseErrs counts packets that failed rtp.ParseIncoming.
	RxParseErrs int64 `json:"rx_parse_errs"`
	// RxPushed counts packets that PushWithSSRC accepted into the jitter
	// buffer. In a healthy stream, RxPushed ≈ RxPkts - RxLoopback -
	// RxParseErrs.
	RxPushed int64 `json:"rx_pushed"`
	// RxPushRejected counts packets that PushWithSSRC rejected as stale,
	// duplicate, or overflow. A sustained nonzero delta while
	// jitter.ssrc_resets stays flat indicates a consumer-side
	// cursor-advance bug or severe sender reordering. See
	// "Interpretation heuristics" in docs/instrumentation-snapshot.md.
	RxPushRejected int64 `json:"rx_push_rejected"`
	// WebPoppedSkipped counts PopReady skippedMissing=true returns from
	// webPlayoutLoop: the jitter buffer had enough queued out-of-order
	// packets to advance the cursor past a missing frame. Zero on the
	// hardware playout path.
	WebPoppedSkipped int64 `json:"web_popped_skipped"`
	// SendEnabled is the runtime send-direction toggle. A port with
	// SendEnabled=false will not open an encoder or publish TX frames.
	SendEnabled bool `json:"send_enabled"`
	// ReceiveEnabled is the runtime receive-direction toggle. A port
	// with ReceiveEnabled=false will not push incoming RTP frames into
	// its jitter buffer.
	ReceiveEnabled bool `json:"receive_enabled"`
}

PortSnapshot is the per-talk-group section of a CommsSnapshot.

type Service

type Service struct {
	Cfg *CommsConfig
	Rt  *CommsRuntime
}

Service is the live comms subsystem instance returned (conceptually) by Start. It carries an immutable *CommsConfig snapshot and the live *CommsRuntime, and exposes the accessor methods that the HTTP handlers need to read or mutate runtime state.

Cfg is set once at construction and never replaced; Rt is set once after Start finishes building the runtime and is cleared on shutdown via SetDefault(nil). Methods on *Service tolerate nil receivers and a nil Service.Rt so handlers can defensively call them before comms is enabled.

func Default

func Default() *Service

Default returns the Service most recently published by Start, or nil when comms has not been started (or has stopped).

func (*Service) ActiveMulticastAddr

func (s *Service) ActiveMulticastAddr() string

ActiveMulticastAddr returns the multicast group address of the first configured port. Returns "" when no ports are configured.

func (*Service) ActiveMulticastPort

func (s *Service) ActiveMulticastPort() int

ActiveMulticastPort returns the UDP port of the first configured port. Returns 0 when no ports are configured.

func (*Service) EnableTalkGroupReceive

func (s *Service) EnableTalkGroupReceive(portIdx int, enabled bool) error

EnableTalkGroupReceive toggles RTP reception on the port at portIdx.

func (*Service) EnableTalkGroupSend

func (s *Service) EnableTalkGroupSend(portIdx int, enabled bool) error

EnableTalkGroupSend toggles RTP transmission on the port at portIdx.

func (*Service) Snapshot

func (s *Service) Snapshot(dst *CommsSnapshot)

Snapshot fills dst with a consistent view of the comms runtime. The method is nil-safe on the receiver and dereferences s.Rt exactly once so it observes a single consistent runtime pointer across the call, even if SetDefault races with it.

dst.Ports is reused across captures: the slice capacity is retained and only resized when the number of ports changes. After the first call with the steady-state runtime present, Snapshot is allocation-free (verified by TestService_SnapshotZeroAlloc).
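The single-load and capacity-reuse pattern can be sketched as follows. The types are hypothetical miniatures of Service, CommsRuntime and CommsSnapshot. The sketch holds the runtime in an atomic.Pointer to make the single load explicit, whereas the real Service exposes Rt as a plain field.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type portSnap struct{ RxPkts int64 }

type commsSnap struct{ Ports []portSnap }

type port struct{ rxPkts atomic.Int64 }

type rtState struct{ ports []*port }

type service struct{ rt atomic.Pointer[rtState] }

// Snapshot loads the runtime pointer once and reuses dst.Ports' backing
// array whenever it is already large enough.
func (s *service) Snapshot(dst *commsSnap) {
	if s == nil || dst == nil {
		return
	}
	rt := s.rt.Load() // single load: a concurrent swap cannot tear this view
	if rt == nil {
		dst.Ports = dst.Ports[:0]
		return
	}
	n := len(rt.ports)
	if cap(dst.Ports) < n {
		dst.Ports = make([]portSnap, n)
	}
	dst.Ports = dst.Ports[:n]
	for i, p := range rt.ports {
		dst.Ports[i].RxPkts = p.rxPkts.Load()
	}
}

func main() {
	s := &service{}
	rt := &rtState{ports: []*port{{}, {}}}
	rt.ports[0].rxPkts.Store(5)
	s.rt.Store(rt)

	var snap commsSnap
	s.Snapshot(&snap)
	first := &snap.Ports[0]
	s.Snapshot(&snap)
	fmt.Println(len(snap.Ports), snap.Ports[0].RxPkts, first == &snap.Ports[0])
}
```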

func (*Service) TalkGroupStates

func (s *Service) TalkGroupStates() ([]McastPortState, error)

TalkGroupStates returns a snapshot of per-port direction-toggle state.

func (*Service) WebAudioBridge

func (s *Service) WebAudioBridge() *webaudio.Bridge

WebAudioBridge returns the web audio bridge if one was constructed, otherwise nil.

func (*Service) WebEventSource

func (s *Service) WebEventSource() *control.WebEventSource

WebEventSource returns the web control source if one was constructed, otherwise nil.

Directories

Path          Synopsis
audio         Package audio hosts the malgo (miniaudio) capture/playback wrappers and the Opus broadcast encoder used by the comms subsystem.
audiopool     Package audiopool holds audio numeric constants and float32 buffer pools shared by sibling sub-packages of internal/comms (audio/, control/).
codec         Package codec defines audio encoder/decoder interfaces and an Opus implementation.
control       Package control defines the PTT event source abstractions consumed by the comms package.
control/alsa  Package alsa provides an AuxEventHandler that adjusts the system ALSA playback volume in response to volume up/down aux events.
device        Package device provides hardware device discovery helpers for the comms subsystem.
webaudio      Package webaudio hosts the web-mode audio bridge that replaces the malgo pipeline when the comms control source is "web".
