Documentation ¶
Overview ¶
Package comms provides PTT (push-to-talk) communications using pion-based RTP/RTCP. Build with -tags omd_omit_comms to exclude this package.
Index ¶
- Constants
- Variables
- func SetDefault(svc *Service)
- type BroadcastCapture
- type CommsConfig
- type CommsLifecycle
- type CommsManager
- type CommsRuntime
- type CommsSnapshot
- type CommsSnapshotter
- type FECAdapter
- type FECAdapterSnapshot
- type McastPortConfig
- type McastPortState
- type PortChannel
- type PortSnapshot
- type Service
- func (s *Service) ActiveMulticastAddr() string
- func (s *Service) ActiveMulticastPort() int
- func (s *Service) EnableTalkGroupReceive(portIdx int, enabled bool) error
- func (s *Service) EnableTalkGroupSend(portIdx int, enabled bool) error
- func (s *Service) Snapshot(dst *CommsSnapshot)
- func (s *Service) TalkGroupStates() ([]McastPortState, error)
- func (s *Service) WebAudioBridge() *webaudio.Bridge
- func (s *Service) WebEventSource() *control.WebEventSource
Constants ¶
const TargetBitrate int = 32000
TargetBitrate is the Opus encoder bitrate in bits-per-second used for all comms TX. Exported so the RPC layer can report it to UI clients.
Variables ¶
var ErrNotRunning = errors.New("comms: subsystem is not running")
ErrNotRunning is returned by Service methods when the comms subsystem has not been started (or has been stopped). It is exported so callers can use errors.Is to distinguish it from other failure modes.
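A caller-side check might look like the sketch below. It assumes the sentinel may arrive wrapped with %w; errNotRunning, talkGroupStates, and classify are hypothetical local stand-ins so the example compiles without the comms package.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotRunning is a local stand-in for comms.ErrNotRunning.
var errNotRunning = errors.New("comms: subsystem is not running")

// talkGroupStates is a hypothetical stub that fails the way a Service
// method might when comms is stopped, wrapping the sentinel with %w.
func talkGroupStates(running bool) ([]string, error) {
	if !running {
		return nil, fmt.Errorf("talk group states: %w", errNotRunning)
	}
	return []string{"239.0.0.1"}, nil
}

// classify maps an error to a coarse status a handler could report.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errNotRunning):
		return "comms-disabled"
	default:
		return "internal-error"
	}
}

func main() {
	_, err := talkGroupStates(false)
	fmt.Println(classify(err)) // prints "comms-disabled"
}
```

errors.Is walks the wrap chain, so the check keeps working even if an intermediate layer adds context to the error.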
Functions ¶
func SetDefault ¶
func SetDefault(svc *Service)
SetDefault publishes svc as the process-wide default Service. Start calls this after constructing the runtime; shutdown passes nil. Tests that build a Service directly use it the same way to wire up the lazy lookup.
Types ¶
type BroadcastCapture ¶
type BroadcastCapture interface {
device.AudioStream
SetTxEnabled(bool)
}
BroadcastCapture is the unified capture stream interface the runtime exposes to the TX path. It extends device.AudioStream (lifecycle Start/Stop/Close, called once per StartHardware cycle) with a per-PTT SetTxEnabled gate. Captured frames always flow to the optional VOX tap; they only flow to the Opus encoder + RTP send when SetTxEnabled(true) is the most recent call. The stream is opened once at StartHardware and stays open for the lifetime of the comms run.
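The gating rule can be illustrated with a minimal sketch. gatedCapture is a hypothetical type, not the package's implementation; it models only the SetTxEnabled gate and counts frames instead of encoding them, and it assumes frames arrive on a single goroutine.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gatedCapture is a hypothetical stand-in for a BroadcastCapture
// implementation; the device.AudioStream lifecycle is omitted.
type gatedCapture struct {
	txEnabled atomic.Bool
	voxFrames int // frames delivered to the VOX tap
	txFrames  int // frames delivered to the encoder/RTP path
}

// SetTxEnabled opens or closes the per-PTT gate.
func (g *gatedCapture) SetTxEnabled(on bool) { g.txEnabled.Store(on) }

// onFrame models the capture callback: the VOX tap sees every frame,
// the TX path only sees frames while the gate is open.
func (g *gatedCapture) onFrame(pcm []float32) {
	g.voxFrames++
	if g.txEnabled.Load() {
		g.txFrames++
	}
}

func main() {
	g := &gatedCapture{}
	frame := make([]float32, 960) // frame size is an arbitrary example value
	g.onFrame(frame)              // gate closed: VOX only
	g.SetTxEnabled(true)
	g.onFrame(frame) // gate open: VOX and TX
	g.SetTxEnabled(false)
	g.onFrame(frame) // gate closed again
	fmt.Println(g.voxFrames, g.txFrames) // prints "3 1"
}
```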
type CommsConfig ¶
type CommsConfig struct {
Log zerolog.Logger
AuxHandler control.AuxEventHandler
Interrupt chan os.Signal
CommKey string
BluetoothInputDevice string
Iface string
BluetoothAudioDeviceHint string
ControlSource string
NanoPTTDeviceName string
RtpID string
NanoPTTDevicePath string
BluetoothOutputDevice string
McastPorts []McastPortConfig
ROIPMaxTXDuration time.Duration
ROIPVOXHoldTime time.Duration
EncoderComplexity int
PttStartDelayMs int
CaptureFramesPerBuffer int
CaptureLatencyMs int
PlaybackLatencyMs int
PacketLossPerc int
HalfDuplexThreshold time.Duration
ROIPVOXThreshold float32
MicGain float32
EnableNanoPTT bool
EnableBluetoothPtt bool
Enable bool
Trace bool
Loopback bool
Debug bool
ROIPCOSGPIOMask byte
// contains filtered or unexported fields
}
CommsConfig holds the static configuration for the comms subsystem. Allocate one with NewComms and call Start to begin operation. All exported fields must be set before Start is called.
CommsConfig is treated as immutable after Start. The live runtime is owned by *Service, which Start publishes via SetDefault, so the static config and the per-startup runtime have distinct lifetimes.
func NewComms ¶
func NewComms(cfg CommsConfig) *CommsConfig
NewComms copies cfg and returns a pointer ready for Start.
func (*CommsConfig) Run ¶
func (cfg *CommsConfig) Run(ctx context.Context, rt *CommsRuntime, src control.EventSource)
Run is the main event loop. It starts a receiveLoop goroutine for every Receive-capable port plus a single halfDuplexDecayLoop that clears the cached RemoteRxActive flag when every gate has gone quiet, then blocks dispatching PTT events until ctx is canceled.
func (*CommsConfig) Start ¶
func (cfg *CommsConfig) Start(ctx context.Context) error
Start initializes all comms subsystems and blocks until ctx is canceled. Returns nil on clean shutdown, or an error if initialization fails. The caller is responsible for canceling ctx to stop the subsystem.
func (*CommsConfig) Validate ¶
func (cfg *CommsConfig) Validate() error
Validate checks the comms configuration for self-consistency. Phase 2 of the comms refactor introduced this method specifically to verify that the configured ControlSource maps to a registered backend. Additional checks will be added by later phases. The method intentionally tolerates the empty string (treated as the openvlm default) so callers may invoke it either before or after normalizeControlSource has run.
type CommsLifecycle ¶
CommsLifecycle defines the interface for managing comms runtime lifecycle. The handler depends on this interface so that tests can provide a mock.
type CommsManager ¶
type CommsManager struct {
// contains filtered or unexported fields
}
CommsManager owns the comms lifecycle and serializes enable/disable operations. It implements CommsLifecycle.
func NewCommsManager ¶
func NewCommsManager(cfg *config.Config, logger zerolog.Logger) *CommsManager
NewCommsManager creates a new CommsManager. The manager is created at startup regardless of whether comms is enabled, so the API handler always has it.
func (*CommsManager) Disable ¶
func (m *CommsManager) Disable()
Disable stops the comms subsystem and waits for cleanup to finish. It is idempotent: no-op if not running.
func (*CommsManager) Enable ¶
func (m *CommsManager) Enable() error
Enable starts the comms subsystem. It is idempotent: if comms is already running it returns nil. The subsystem runs in a background goroutine and can be stopped with Disable. Validate is invoked synchronously so an invalid ControlSource is reported to the caller immediately rather than surfacing later as an asynchronous error inside the background goroutine.
func (*CommsManager) IsRunning ¶
func (m *CommsManager) IsRunning() bool
IsRunning reports whether the comms subsystem is currently active.
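The idempotent enable/disable contract described above can be sketched as follows. This is an illustration under assumptions, not CommsManager's actual code: the manager type, its fields, and the starts counter are hypothetical, and the background goroutine simply waits on its context where the real one would run Start.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// manager is a hypothetical stand-in for CommsManager.
type manager struct {
	mu     sync.Mutex
	cancel context.CancelFunc // non-nil while running
	done   chan struct{}      // closed when the background goroutine exits
	starts int                // number of real starts, for demonstration
}

// Enable starts the background goroutine unless it is already running.
func (m *manager) Enable() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.cancel != nil {
		return nil // already running: no-op
	}
	// A real manager would validate the config here, synchronously.
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	m.cancel, m.done = cancel, done
	m.starts++
	go func() {
		defer close(done)
		<-ctx.Done() // stand-in for the blocking Start call
	}()
	return nil
}

// Disable cancels the goroutine and waits for it to finish.
func (m *manager) Disable() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.cancel == nil {
		return // not running: no-op
	}
	m.cancel()
	<-m.done
	m.cancel, m.done = nil, nil
}

// IsRunning reports whether a background goroutine is active.
func (m *manager) IsRunning() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.cancel != nil
}

func main() {
	m := &manager{}
	_ = m.Enable()
	_ = m.Enable() // second call is a no-op
	fmt.Println(m.starts, m.IsRunning()) // prints "1 true"
	m.Disable()
	m.Disable() // second call is a no-op
	fmt.Println(m.IsRunning()) // prints "false"
}
```

Holding the mutex across the wait in Disable is what serializes enable and disable in this sketch; it is safe here only because the background goroutine never takes the same lock.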
type CommsRuntime ¶
type CommsRuntime struct {
Decoder codec.AudioDecoder
Encoder codec.AudioEncoder
BroadcastStream BroadcastCapture
FECAdapter *FECAdapter
WebBridge *webaudio.Bridge
WebEvtSrc *control.WebEventSource
LocalIP atomic.Pointer[string]
BroadcastTap atomic.Pointer[chan []float32]
Ports []*PortChannel
BeepBufferStart []int16
BeepBufferStop []int16
// PlaybackOutputLatency is the actual output latency the backend
// granted when the per-port playback streams were opened. The TX
// path uses it in beginTransmission to delay SetTxEnabled(true)
// until the start-tone beep has fully emerged from the speaker so
// an acoustic (or device sidetone) path from speaker → mic cannot
// pick the beep up and transmit it.
PlaybackOutputLatency time.Duration
Broadcasting atomic.Bool
RemoteRxActive atomic.Bool
}
CommsRuntime holds live resources allocated by Start. All audio/network fields are interfaces so that unit tests can inject fakes without hardware.
RemoteRxActive is the cached half-duplex receive flag. The PTT TX path reads it via isReceivingRemote in O(1) instead of walking every port's HalfDuplexGate. It is set immediately by receiveLoop on every inbound packet from a send-enabled port (no false negatives at the start of an incoming stream) and cleared by halfDuplexDecayLoop on a coarse 100 ms ticker once every gate's window has expired.
type CommsSnapshot ¶
type CommsSnapshot struct {
ControlSource string `json:"control_source"`
Ports []PortSnapshot `json:"ports"`
BroadcastEncoder audio.AudioEncoderSnapshot `json:"broadcast_encoder"`
WebBridge webaudio.BridgeSnapshot `json:"web_bridge"`
FECAdapter FECAdapterSnapshot `json:"fec_adapter"`
Enabled bool `json:"enabled"`
Broadcasting bool `json:"broadcasting"`
RemoteRxActive bool `json:"remote_rx_active"`
}
CommsSnapshot is the full comms subsystem section that the instrumentation registry publishes under the "comms" name. Field semantics are documented in docs/instrumentation-snapshot.md — keep that file in sync when adding or renaming fields here.
type CommsSnapshotter ¶
type CommsSnapshotter struct {
// contains filtered or unexported fields
}
CommsSnapshotter is an instrumentation.Snapshotter adapter that wires the comms subsystem into the instrumentation registry. It holds an internal CommsSnapshot that is refreshed in place on every Refresh() call. The adapter looks up the live comms service on each call so it transparently handles enable/disable transitions.
func (*CommsSnapshotter) Data ¶
func (c *CommsSnapshotter) Data() any
Data implements instrumentation.Snapshotter. Returns a pointer that is stable across Refresh calls.
func (*CommsSnapshotter) Refresh ¶
func (c *CommsSnapshotter) Refresh()
Refresh implements instrumentation.Snapshotter. Zero-alloc after the first call that establishes the Ports slice capacity.
type FECAdapter ¶
type FECAdapter struct {
// contains filtered or unexported fields
}
FECAdapter is a local, damped control loop that observes the receive-side jitter-buffer gap-run histogram and adjusts the Opus encoder's packet-loss percentage in response. It runs as a single goroutine per comms runtime, spawned during CommsConfig.Run, and exits on ctx.Done().
The design assumes link symmetry — every node reads its own RX loss and applies the result to its own TX encoder. See the Round 7 plan for the omnidirectional-antenna justification of that assumption.
FECAdapter is safe for concurrent Snapshot calls alongside its own Run goroutine. All mutable state is guarded by a.mu, with four atomic mirrors for the Snapshot path so Snapshot never contends on the tick lock.
func NewFECAdapter ¶
func NewFECAdapter(rt *CommsRuntime, encoder codec.AudioEncoder, floor int, log zerolog.Logger) *FECAdapter
NewFECAdapter constructs an adapter bound to rt. The initial level is clamped to [fecLevel20, fecLevel40] and then raised to floor if it is lower. SetPacketLossPerc(level) is called on the encoder immediately so a stale value from a previous enable cycle does not linger.
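The clamping order can be shown with a small sketch. The numeric bounds are assumptions inferred from the names fecLevel20 and fecLevel40, and the configured parameter is hypothetical, since the documentation does not say where the initial level comes from. The sketch also assumes the floor wins when it exceeds the upper bound.

```go
package main

import "fmt"

const (
	fecLevelMin = 20 // assumed value of fecLevel20
	fecLevelMax = 40 // assumed value of fecLevel40
)

// initialLevel clamps configured into [fecLevelMin, fecLevelMax] and then
// raises the result to floor if it is still lower.
func initialLevel(configured, floor int) int {
	level := configured
	if level < fecLevelMin {
		level = fecLevelMin
	}
	if level > fecLevelMax {
		level = fecLevelMax
	}
	if level < floor {
		level = floor
	}
	return level
}

func main() {
	fmt.Println(initialLevel(10, 0))  // prints "20"
	fmt.Println(initialLevel(50, 0))  // prints "40"
	fmt.Println(initialLevel(25, 30)) // prints "30"
}
```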
func (*FECAdapter) Run ¶
func (a *FECAdapter) Run(ctx context.Context)
Run blocks until ctx is canceled, ticking every fecTickInterval. This is the entry point launched as a goroutine during CommsConfig.Run.
func (*FECAdapter) Snapshot ¶
func (a *FECAdapter) Snapshot(dst *FECAdapterSnapshot)
Snapshot copies the adapter's current state into dst using atomic loads. Nil-safe on both receiver and dst. Zero-alloc: no heap allocations beyond what dst already holds.
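A nil-safe snapshot built on atomic mirrors might look like the sketch below. The adapter and adapterSnapshot types are hypothetical and carry only two of the mirrored values; the real adapter's field layout is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// adapterSnapshot is a trimmed, hypothetical version of FECAdapterSnapshot.
type adapterSnapshot struct {
	CurrentLevel int
	Transitions  int64
}

// adapter keeps atomic mirrors of the values a snapshot needs, so readers
// never have to take the lock the control loop uses.
type adapter struct {
	level       atomic.Int64
	transitions atomic.Int64
}

// setLevel is what the control loop would call when it changes the level.
func (a *adapter) setLevel(l int) {
	a.level.Store(int64(l))
	a.transitions.Add(1)
}

// Snapshot copies the mirrors into dst. It tolerates a nil receiver and a
// nil dst, and writes only into memory the caller already owns.
func (a *adapter) Snapshot(dst *adapterSnapshot) {
	if a == nil || dst == nil {
		return
	}
	dst.CurrentLevel = int(a.level.Load())
	dst.Transitions = a.transitions.Load()
}

func main() {
	var missing *adapter
	var s adapterSnapshot
	missing.Snapshot(&s) // nil receiver: no panic, s unchanged

	a := &adapter{}
	a.setLevel(30)
	a.Snapshot(&s)
	fmt.Println(s.CurrentLevel, s.Transitions) // prints "30 1"
}
```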
type FECAdapterSnapshot ¶
type FECAdapterSnapshot struct {
CurrentLevel int `json:"current_level"`
LossEWMA float64 `json:"loss_ewma"`
LastChangeUnixNs int64 `json:"last_change_unix_nano"`
Transitions int64 `json:"transitions"`
WriteErrors int64 `json:"write_errors"`
Floor int `json:"floor"`
}
FECAdapterSnapshot is the publicly-observable state of the adapter. Populated by FECAdapter.Snapshot and carried on CommsSnapshot as the "fec_adapter" section. See docs/instrumentation-snapshot.md.
type McastPortConfig ¶
type McastPortConfig struct {
InitSendEnabled *bool
InitReceiveEnabled *bool
Address string
Port int
Send bool
Receive bool
}
McastPortConfig describes a single multicast endpoint that the comms subsystem listens and/or transmits on. Ports with Send=false will not open an RTP/RTCP sender; ports with Receive=false will not open an RTP receiver socket.
InitSendEnabled and InitReceiveEnabled seed the runtime atomic flags that EnableTalkGroupSend / EnableTalkGroupReceive toggle at runtime. When nil the values fall back to Send and Receive respectively, preserving backward compatibility for any caller that constructs McastPortConfig directly.
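The nil fallback can be expressed as a small helper. portConfig mirrors only the relevant McastPortConfig fields, and initialFlag is a hypothetical name for the resolution step.

```go
package main

import "fmt"

// portConfig is a trimmed stand-in for McastPortConfig.
type portConfig struct {
	InitSendEnabled    *bool
	InitReceiveEnabled *bool
	Send               bool
	Receive            bool
}

// initialFlag returns *override when it is set and fallback otherwise.
func initialFlag(override *bool, fallback bool) bool {
	if override != nil {
		return *override
	}
	return fallback
}

func main() {
	off := false
	// Send-capable port that should start muted; receive uses the fallback.
	cfg := portConfig{Send: true, Receive: true, InitSendEnabled: &off}

	fmt.Println(initialFlag(cfg.InitSendEnabled, cfg.Send))       // prints "false"
	fmt.Println(initialFlag(cfg.InitReceiveEnabled, cfg.Receive)) // prints "true"
}
```

Using *bool rather than bool is what lets the config distinguish "not specified" from an explicit false.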
type McastPortState ¶
McastPortState is a read-only snapshot of the runtime direction-toggle state for a single port. Returned by Service.TalkGroupStates.
type PortChannel ¶
type PortChannel struct {
RTPSess rtp.Sender
PlaybackStream device.AudioStream
Sender *rtp.SwappableSender
RTCPSend *rtp.SwappableSender
Receiver *rtp.SwappableReceiver
Jitter *rtp.JitterBuffer
PlaybackBuffer chan []int16
ConsecutivePLC int
RxGate control.HalfDuplexGate
PlaybackUnderruns atomic.Int64
// Diagnostic RX-path counters. All monotonic since startup; reporters
// compute deltas across windows. RxPkts is the raw "kernel handed us a
// packet" count from receiveLoop's ReadFromUDP; the remaining counters
// segment that count by what happened next. RxPushed + RxPushRejected
// equals RxPkts - RxLoopback - RxParseErrs only while the port is
// receive-enabled. Used to localize RX stutter to one specific stage
// of the per-port pipeline.
RxPkts atomic.Int64
RxParseErrs atomic.Int64
RxLoopback atomic.Int64
RxPushed atomic.Int64
RxPushRejected atomic.Int64
// WebPoppedSkipped is bumped by webPlayoutLoop when the jitter
// buffer's PopReady returns skippedMissing=true (an out-of-order
// sequence gap wide enough that the buffer advanced the cursor past
// the hole). Diagnostic only; the audio path never reads this field
// itself. Zero on the hardware playout path, which does not use
// webPlayoutLoop.
WebPoppedSkipped atomic.Int64
SendEnabled atomic.Bool
ReceiveEnabled atomic.Bool
// contains filtered or unexported fields
}
PortChannel holds all live resources for one McastPortConfig entry. SendEnabled and ReceiveEnabled are atomic bools that can be toggled at runtime via EnableTalkGroupSend / EnableTalkGroupReceive without restarting any goroutine or socket.
Jitter is the per-port RTP jitter buffer. It is allocated in buildSinglePortChannel for ports with a Receive socket and shared between receiveLoop (producer) and the malgo playback callback (consumer). For PortChannels constructed directly in tests, callers must allocate it explicitly.
ConsecutivePLC is owned by the malgo playback callback for this port: each port has its own callback running on its own audio thread, so the field is single-writer and does not need atomic semantics. Tests that call playoutOneFrame directly are likewise single-threaded with respect to it.
PlaybackBuffer is retained as a one-shot side channel for TX beep tones (see transmit.go beginTransmission/endTransmission); the malgo playback callback drains it before falling through to playoutOneFrame, so a beep preempts one frame of jitter-buffered audio.
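The "drain the beep channel first, then fall through to normal playout" rule can be sketched with a non-blocking select. nextFrame and the playout callback are hypothetical names; the real callback's signature is not part of this documentation.

```go
package main

import "fmt"

// nextFrame returns a queued beep frame if one is waiting, otherwise the
// next frame from the regular playout path. The select never blocks, which
// matters inside an audio callback.
func nextFrame(beeps <-chan []int16, playout func() []int16) []int16 {
	select {
	case b := <-beeps:
		return b
	default:
		return playout()
	}
}

func main() {
	beeps := make(chan []int16, 1)
	playout := func() []int16 { return []int16{0, 0} } // stand-in for jitter-buffered audio

	beeps <- []int16{1000, -1000}            // a beep frame is queued
	fmt.Println(nextFrame(beeps, playout)) // prints "[1000 -1000]"
	fmt.Println(nextFrame(beeps, playout)) // prints "[0 0]"
}
```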
func (*PortChannel) MarkRemoteRx ¶
func (pc *PortChannel) MarkRemoteRx(rt *CommsRuntime)
MarkRemoteRx records that a remote RTP packet has just been received on this port for half-duplex enforcement. It stamps the port's RxGate and, when this port is currently send-enabled, primes the runtime-wide RemoteRxActive cache so the PTT TX path observes a busy channel without waiting for the next halfDuplexDecayLoop tick. Receive-only ports never block our own transmissions, so the cache is left untouched in that case.
Production callers reach this from receiveLoop after a successful RTP parse; tests use it as the single canonical way to express "a remote packet arrived" so the cache invariant is exercised the same way it is in production.
func (*PortChannel) Snapshot ¶
func (pc *PortChannel) Snapshot(dst *PortSnapshot)
Snapshot fills dst with the port's counter state. Nil-safe. Zero-alloc.
type PortSnapshot ¶
type PortSnapshot struct {
// Address is the multicast group address (e.g. "239.0.0.1").
Address string `json:"address"`
// Jitter is the per-port receive jitter buffer snapshot.
Jitter rtp.JitterBufferSnapshot `json:"jitter"`
// RxGate is the per-port half-duplex gate state.
RxGate control.HalfDuplexGateSnapshot `json:"rx_gate"`
// Port is the multicast UDP port.
Port int `json:"port"`
// PlaybackUnderruns counts playback-side decode failures that the
// port audio callback had to recover from via PLC.
PlaybackUnderruns int64 `json:"playback_underruns"`
// RxPkts is the monotonic count of successful ReadFromUDP returns on
// this port's receive socket (packets the kernel handed us).
RxPkts int64 `json:"rx_pkts"`
// RxLoopback counts packets dropped by the loopback filter (own-IP
// suppression) before they reached the RTP parser.
RxLoopback int64 `json:"rx_loopback"`
// RxParseErrs counts packets that failed rtp.ParseIncoming.
RxParseErrs int64 `json:"rx_parse_errs"`
// RxPushed counts packets that PushWithSSRC accepted into the jitter
// buffer. In a healthy stream, RxPushed ≈ RxPkts - RxLoopback -
// RxParseErrs.
RxPushed int64 `json:"rx_pushed"`
// RxPushRejected counts packets that PushWithSSRC rejected as stale,
// duplicate, or overflow. A sustained nonzero delta while
// jitter.ssrc_resets stays flat indicates a consumer-side
// cursor-advance bug or severe sender reordering. See
// "Interpretation heuristics" in docs/instrumentation-snapshot.md.
RxPushRejected int64 `json:"rx_push_rejected"`
// WebPoppedSkipped counts PopReady skippedMissing=true returns from
// webPlayoutLoop: the jitter buffer had enough queued out-of-order
// packets to advance the cursor past a missing frame. Zero on the
// hardware playout path.
WebPoppedSkipped int64 `json:"web_popped_skipped"`
// SendEnabled is the runtime send-direction toggle. A port with
// SendEnabled=false will not open an encoder or publish TX frames.
SendEnabled bool `json:"send_enabled"`
// ReceiveEnabled is the runtime receive-direction toggle. A port
// with ReceiveEnabled=false will not push incoming RTP frames into
// its jitter buffer.
ReceiveEnabled bool `json:"receive_enabled"`
}
PortSnapshot is the per-talk-group section of a CommsSnapshot.
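Because the counters are monotonic, a reporter works on deltas between two snapshots. The sketch below uses a trimmed, hypothetical rxCounters struct and sample numbers chosen for illustration; it computes the per-window delta and the number of packets that passed the loopback filter and the parser but were neither pushed nor rejected.

```go
package main

import "fmt"

// rxCounters is a trimmed stand-in for the RX counters on PortSnapshot.
type rxCounters struct {
	RxPkts, RxLoopback, RxParseErrs, RxPushed, RxPushRejected int64
}

// delta returns the per-window change between two monotonic snapshots.
func delta(prev, cur rxCounters) rxCounters {
	return rxCounters{
		RxPkts:         cur.RxPkts - prev.RxPkts,
		RxLoopback:     cur.RxLoopback - prev.RxLoopback,
		RxParseErrs:    cur.RxParseErrs - prev.RxParseErrs,
		RxPushed:       cur.RxPushed - prev.RxPushed,
		RxPushRejected: cur.RxPushRejected - prev.RxPushRejected,
	}
}

// unaccounted returns packets that survived the loopback filter and the
// parser but were neither pushed nor rejected in the window.
func unaccounted(d rxCounters) int64 {
	return d.RxPkts - d.RxLoopback - d.RxParseErrs - d.RxPushed - d.RxPushRejected
}

func main() {
	prev := rxCounters{RxPkts: 100, RxLoopback: 5, RxParseErrs: 1, RxPushed: 90, RxPushRejected: 4}
	cur := rxCounters{RxPkts: 200, RxLoopback: 10, RxParseErrs: 2, RxPushed: 180, RxPushRejected: 8}

	d := delta(prev, cur)
	fmt.Println(d.RxPkts, unaccounted(d)) // prints "100 0"
}
```

A nonzero result is expected while the port has receive disabled, since those packets are counted in RxPkts but never reach the jitter buffer.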
type Service ¶
type Service struct {
Cfg *CommsConfig
Rt *CommsRuntime
}
Service is the live comms subsystem instance returned (conceptually) by Start. It carries an immutable *CommsConfig snapshot and the live *CommsRuntime, and exposes the accessor methods that the HTTP handlers need to read or mutate runtime state.
Cfg is set once at construction and never replaced; Rt is set once after Start finishes building the runtime and is cleared on shutdown via SetDefault(nil). Methods on *Service tolerate nil receivers and a nil Service.Rt so handlers can defensively call them before comms is enabled.
func Default ¶
func Default() *Service
Default returns the Service most recently published by Start, or nil when comms has not been started (or has stopped).
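One way to get a process-wide default that handlers can call before comms is enabled is an atomic pointer combined with nil-tolerant methods. The sketch below assumes that approach; the documentation does not say how the package stores the default, and service, setDefault, getDefault, and activeAddr are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// service is a trimmed stand-in for Service.
type service struct {
	addrs []string // multicast group addresses, first one is "active"
}

// defaultSvc holds the process-wide default; nil means "not running".
var defaultSvc atomic.Pointer[service]

// setDefault publishes s, or clears the default when s is nil.
func setDefault(s *service) { defaultSvc.Store(s) }

// getDefault returns the most recently published service, or nil.
func getDefault() *service { return defaultSvc.Load() }

// activeAddr tolerates a nil receiver so callers need no nil check.
func (s *service) activeAddr() string {
	if s == nil || len(s.addrs) == 0 {
		return ""
	}
	return s.addrs[0]
}

func main() {
	fmt.Printf("%q\n", getDefault().activeAddr()) // prints ""

	setDefault(&service{addrs: []string{"239.0.0.1"}})
	fmt.Printf("%q\n", getDefault().activeAddr()) // prints "239.0.0.1"

	setDefault(nil) // shutdown
	fmt.Printf("%q\n", getDefault().activeAddr()) // prints ""
}
```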
func (*Service) ActiveMulticastAddr ¶
func (s *Service) ActiveMulticastAddr() string
ActiveMulticastAddr returns the multicast group address of the first configured port. Returns "" when no ports are configured.
func (*Service) ActiveMulticastPort ¶
func (s *Service) ActiveMulticastPort() int
ActiveMulticastPort returns the UDP port of the first configured port. Returns 0 when no ports are configured.
func (*Service) EnableTalkGroupReceive ¶
func (s *Service) EnableTalkGroupReceive(portIdx int, enabled bool) error
EnableTalkGroupReceive toggles RTP reception on the port at portIdx.
func (*Service) EnableTalkGroupSend ¶
func (s *Service) EnableTalkGroupSend(portIdx int, enabled bool) error
EnableTalkGroupSend toggles RTP transmission on the port at portIdx.
func (*Service) Snapshot ¶
func (s *Service) Snapshot(dst *CommsSnapshot)
Snapshot fills dst with a consistent view of the comms runtime. The method is nil-safe on the receiver and dereferences s.Rt exactly once so it observes a single consistent runtime pointer across the call, even if SetDefault races with it.
dst.Ports is reused across captures: the slice capacity is retained and only resized when the number of ports changes. After the first call with the steady-state runtime present, Snapshot is allocation-free (verified by TestService_SnapshotZeroAlloc).
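The slice-reuse pattern behind that guarantee can be sketched as follows. snap, portSnap, and fill are hypothetical; the sketch only shows how a destination slice is resliced when its capacity suffices and reallocated when it does not.

```go
package main

import "fmt"

// portSnap and snap are trimmed stand-ins for PortSnapshot and CommsSnapshot.
type portSnap struct {
	Port   int
	RxPkts int64
}

type snap struct {
	Ports []portSnap
}

// fill writes one entry per port into dst.Ports, reusing the existing
// backing array whenever its capacity is large enough.
func fill(dst *snap, ports []int) {
	if cap(dst.Ports) < len(ports) {
		dst.Ports = make([]portSnap, len(ports)) // first call, or port count grew
	} else {
		dst.Ports = dst.Ports[:len(ports)] // steady state: no allocation
	}
	for i, p := range ports {
		dst.Ports[i] = portSnap{Port: p}
	}
}

func main() {
	var dst snap
	ports := []int{5004, 5006} // example port numbers

	fill(&dst, ports)
	first := &dst.Ports[0]
	fill(&dst, ports)

	// The second call reused the same backing array.
	fmt.Println(first == &dst.Ports[0], len(dst.Ports)) // prints "true 2"
}
```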
func (*Service) TalkGroupStates ¶
func (s *Service) TalkGroupStates() ([]McastPortState, error)
TalkGroupStates returns a snapshot of per-port direction-toggle state.
func (*Service) WebAudioBridge ¶
func (s *Service) WebAudioBridge() *webaudio.Bridge
WebAudioBridge returns the web audio bridge if one was constructed, otherwise nil.
func (*Service) WebEventSource ¶
func (s *Service) WebEventSource() *control.WebEventSource
WebEventSource returns the web control source if one was constructed, otherwise nil.
Directories ¶
| Path | Synopsis |
|---|---|
| audio | Package audio hosts the malgo (miniaudio) capture/playback wrappers and the Opus broadcast encoder used by the comms subsystem. |
| audiopool | Package audiopool holds audio numeric constants and float32 buffer pools shared by sibling sub-packages of internal/comms (audio/, control/). |
| codec | Package codec defines audio encoder/decoder interfaces and an Opus implementation. |
| control | Package control defines the PTT event source abstractions consumed by the comms package. |
| alsa | Package alsa provides an AuxEventHandler that adjusts the system ALSA playback volume in response to volume up/down aux events. |
| device | Package device provides hardware device discovery helpers for the comms subsystem. |
| webaudio | Package webaudio hosts the web-mode audio bridge that replaces the malgo pipeline when the comms control source is "web". |