Documentation
¶
Index ¶
- Constants
- func ParseIncoming(raw []byte) (*pionrtp.Packet, error)
- func SSRCFromID(id string) uint32
- type JitterBuffer
- func (jb *JitterBuffer) AdvancePast()
- func (jb *JitterBuffer) EnableNotify() <-chan struct{}
- func (jb *JitterBuffer) PopOrConceal(recentWindow time.Duration) (payload []byte, conceal bool)
- func (jb *JitterBuffer) PopReady() (payload []byte, ready bool, skippedMissing bool)
- func (jb *JitterBuffer) Push(seq uint16, payload []byte) booldeprecated
- func (jb *JitterBuffer) PushWithSSRC(ssrc uint32, seq uint16, payload []byte, ...) bool
- func (jb *JitterBuffer) ReleasePayload(p []byte)
- func (jb *JitterBuffer) Reset()
- func (jb *JitterBuffer) ShouldConceal(recentWindow time.Duration) bool
- func (jb *JitterBuffer) Snapshot(dst *JitterBufferSnapshot)
- type JitterBufferSnapshot
- type PacketReader
- type PacketWriter
- type Sender
- type Session
- type SwappableReceiver
- type SwappableSender
Constants ¶
const ( // PrebufferPackets is the number of frames the jitter buffer must // hold before playout begins. Each frame is 20 ms, so 5 packets ≈ 100 ms. // This sets the minimum receive latency floor and the safety margin for // arrival jitter on the mesh. 100 ms tolerates roughly one full packet // of late arrival before the buffer empties and playout falls into PLC. PrebufferPackets = 5 MaxDepth = 24 // MaxOpusPayloadSize is the RFC 6716 §3.2.1 maximum encoded frame size. // Payload pool buffers are sized to this capacity to eliminate per-packet // heap allocations in the jitter buffer hot path. MaxOpusPayloadSize = 1275 )
const ( PayloadTypeOpus = uint8(111) FrameSamples = uint32(960) // 20 ms at 48 kHz MTU = uint16(1400) )
const SwapCloseGrace = 50 * time.Millisecond
SwapCloseGrace is how long a swap defers closing the previous PacketWriter so any in-flight lock-free Write calls against it can finish their single UDP syscall. Writes on the hot path take no lock at all (they only do an atomic Load), so using a WaitGroup would panic ("Add called concurrently with Wait"); instead we bound the grace period and rely on the fact that Write's critical path is a single non-blocking sendto(2).
Variables ¶
This section is empty.
Functions ¶
func ParseIncoming ¶
ParseIncoming parses a raw UDP datagram into a pion RTP Packet. Returns an error if the bytes are not a valid RTP packet.
func SSRCFromID ¶
SSRCFromID returns a deterministic uint32 SSRC derived from an ID string using the FNV-1a 32-bit hash.
Types ¶
type JitterBuffer ¶
type JitterBuffer struct {
Overflows atomic.Int64
SSRCResets atomic.Int64
IdleResets atomic.Int64
// GapRuns{N} counts contiguous sequence-gap runs observed by the
// skip-missing branch of popReadyLocked. Each gap is bucketed once
// at the point the consumer first skips into it — subsequent
// single-step cursor advances over the same run are not re-counted.
// The buckets distinguish isolated loss (GapRuns1, GapRuns2to5)
// from long contiguous bursts (GapRuns11to20 and up) so operators
// can tell whether FEC/redundancy or masking is the right recovery
// strategy. MaxDepth caps any single measured run at maxDepth, so
// the larger buckets will only fire if MaxDepth is raised.
GapRuns1 atomic.Int64
GapRuns2to5 atomic.Int64
GapRuns6to10 atomic.Int64
GapRuns11to20 atomic.Int64
GapRuns21to50 atomic.Int64
GapRunsOver50 atomic.Int64
// contains filtered or unexported fields
}
JitterBuffer is a sequence-number-ordered buffer for RTP audio payloads. It provides prebuffering, late-packet dropping, gap detection for PLC, and SSRC-change/idle-gap detection so a new talker is never silently dropped because their starting sequence number lies in the "past half" of the previous talker's frozen sequence cursor.
Internally, frames are stored in a fixed-size circular array indexed by (seq % maxDepth), eliminating all map allocations on the hot path.
notifyCh is the optional edge-triggered "frame available" wakeup used by consumers that prefer push-driven scheduling over a polling ticker (see EnableNotify and webPlayoutLoop). It is nil until a consumer opts in; malgo-driven consumers, which are clocked by the audio hardware, do not call EnableNotify and pay nothing on the push hot path beyond the nil-check.
func NewJitterBuffer ¶
func NewJitterBuffer(prebuffer, maxDepth int) *JitterBuffer
func (*JitterBuffer) AdvancePast ¶
func (jb *JitterBuffer) AdvancePast()
advancePast discards the current expected sequence number and advances the playout cursor by one.
func (*JitterBuffer) EnableNotify ¶
func (jb *JitterBuffer) EnableNotify() <-chan struct{}
EnableNotify lazily allocates and returns the edge-triggered "frame available" channel. Consumers that prefer push-driven wakeup over a polling ticker call it once at startup and then select on the returned channel. Each successful Push that follows fires a non-blocking signal (depth-1 buffer, coalesced); the consumer should drain every available payload after each wake. Calling EnableNotify a second time returns the same channel.
func (*JitterBuffer) PopOrConceal ¶
func (jb *JitterBuffer) PopOrConceal(recentWindow time.Duration) (payload []byte, conceal bool)
popOrConceal performs the full playout-tick logic under a single lock acquisition, eliminating the 3-lock-per-tick pattern previously used by playoutLoop (popReady + shouldConceal + advancePast).
Returns:
- payload != nil: an in-order frame was available.
- conceal == true: no frame was available but the stream is active and PLC should be applied. The playout cursor has already been advanced.
- both nil/false: no frame available and no concealment needed.
func (*JitterBuffer) PopReady ¶
func (jb *JitterBuffer) PopReady() (payload []byte, ready bool, skippedMissing bool)
popReady returns the next in-order payload when available.
skippedMissing is true when the buffer advances past a gap (caller should apply PLC for the skipped frame). ready is true when a payload is returned.
func (*JitterBuffer) Push
deprecated
func (jb *JitterBuffer) Push(seq uint16, payload []byte) bool
push stores a received payload keyed by sequence number. Returns false if the packet is stale, a duplicate, or the buffer is full.
Deprecated: use pushWithSSRC. push is retained for tests that pre-date SSRC tracking; it treats every packet as belonging to a single anonymous stream.
func (*JitterBuffer) PushWithSSRC ¶
func (jb *JitterBuffer) PushWithSSRC(ssrc uint32, seq uint16, payload []byte, onSSRCChange func(oldSSRC, newSSRC uint32)) bool
pushWithSSRC stores a received payload, tracking the SSRC of the source stream. When the SSRC changes mid-stream, the buffer is reset and re- initialized from the new packet — without this, a new talker whose starting sequence number happens to lie in the "past half" of the previous talker's frozen cursor would be silently rejected forever.
If onSSRCChange is non-nil, it is invoked (without holding jb.mu) when an SSRC change is detected, with the old and new SSRC values. Pass nil if you don't need notification (e.g. tests).
func (*JitterBuffer) ReleasePayload ¶
func (jb *JitterBuffer) ReleasePayload(p []byte)
releasePayload returns a jitter-buffer payload slice back to the pool. Only pool-allocated slices (cap == MaxOpusPayloadSize) are accepted; anything else (test slices, nil) is silently ignored.
func (*JitterBuffer) Reset ¶
func (jb *JitterBuffer) Reset()
reset clears all buffered state so the jitter buffer can be reused for a new RTP stream (e.g. after a talk-group switch). The next push will re-initialize the expected sequence number from the first arriving packet.
func (*JitterBuffer) ShouldConceal ¶
func (jb *JitterBuffer) ShouldConceal(recentWindow time.Duration) bool
shouldConceal returns true when a packet arrived recently enough that PLC is appropriate for a missing frame (i.e. the stream is active but gapped).
func (*JitterBuffer) Snapshot ¶
func (jb *JitterBuffer) Snapshot(dst *JitterBufferSnapshot)
Snapshot copies the current counter values into dst using atomic loads. Safe to call concurrently with pushWithSSRC and pop. Nil-safe on both receiver and dst. Allocation-free.
type JitterBufferSnapshot ¶
type JitterBufferSnapshot struct {
// Overflows is the monotonic count of incoming packets the jitter
// buffer rejected because it was full. Sustained non-zero overflow
// deltas suggest the receiver is behind the sender or the network
// is bursting.
Overflows int64 `json:"overflows"`
// SSRCResets is the monotonic count of mid-stream SSRC transitions
// the jitter buffer handled by resetting and re-initializing. A
// high value suggests multiple talkers or a sender restart.
SSRCResets int64 `json:"ssrc_resets"`
// IdleResets is the monotonic count of gap-driven buffer resets
// (the "same SSRC but the stream has been silent for longer than
// the idle threshold" safety net).
IdleResets int64 `json:"idle_resets"`
// GapRuns{N} is the monotonic count of contiguous sequence-gap
// runs observed at the jitter buffer's skip-missing branch,
// bucketed by run length in frames (20 ms each). A single gap is
// counted exactly once at the skip point, not once per skipped
// frame. Distribution across buckets distinguishes isolated loss
// (bucket 1, 2–5) from long contiguous bursts (11–20 and up) and
// drives the choice of recovery strategy (FEC/RED vs masking).
GapRuns1 int64 `json:"gap_runs_1"`
GapRuns2to5 int64 `json:"gap_runs_2_5"`
GapRuns6to10 int64 `json:"gap_runs_6_10"`
GapRuns11to20 int64 `json:"gap_runs_11_20"`
GapRuns21to50 int64 `json:"gap_runs_21_50"`
GapRunsOver50 int64 `json:"gap_runs_over_50"`
}
JitterBufferSnapshot is a point-in-time copy of the per-port receive jitter buffer's monotonic counters. Field semantics are documented in docs/instrumentation-snapshot.md — keep that file in sync when adding or renaming fields here.
Note that depth/occupancy are intentionally omitted. Reading them would require acquiring jb.mu, which the snapshot path never holds. The three fields below are read via atomic loads and so observe producer writes without contention.
type PacketReader ¶
PacketReader abstracts the UDP receive path so receiveLoop can be exercised with pre-seeded byte sequences in tests.
type PacketWriter ¶
PacketWriter abstracts the UDP send path so the broadcast callback can be tested without a live socket.
type Sender ¶
Sender is the interface the broadcast encoder uses to ship an encoded Opus frame over the network. Backed by Session in production; tests inject mockRTPSender.
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session wraps a pion Packetizer and an interceptor chain that generates periodic RTCP Sender Reports. One session represents one local SSRC (the node running this software).
The RTCP path is one-way outbound: the SR generator fires every 5 seconds and writes to the provided rtcpTransport. Inbound RTP is parsed externally with ParseIncoming; the session does not handle receive stats (each remote SSRC in a multicast group would require a separate receiver).
Concurrency: Send is the sole writer to packetizer and rtpWriter. In production each *Session is owned by exactly one PortChannel, and Send is only called from the per-encoder broadcastEncoder.encodeLoop goroutine (one writer per encodeLoop, one encodeLoop per process). The pion SenderInterceptor's RTCP timer runs on its own goroutine but only writes to the RTCP transport (bound separately at NewSession), not the RTP packetizer or rtpWriter — so there is no contention with Send. Adding a second concurrent Send caller without external synchronization would be a bug; the call invariant is enforced by code review, not a mutex.
func NewSession ¶
func NewSession( ssrc uint32, rtpTransport PacketWriter, rtcpTransport PacketWriter, log zerolog.Logger, ) (*Session, error)
NewSession creates a Session that sends RTP via rtpTransport and RTCP Sender Reports via rtcpTransport.
The interceptor chain contains a single report.SenderInterceptor that generates an outbound RTCP SR every 5 seconds. Inbound RTCP (e.g. Receiver Reports from peers) is not processed — in a multicast PTT topology each transmission has multiple receivers and no single feedback path is meaningful.
func (*Session) Send ¶
Send encodes payload into one or more RTP packets using the Packetizer and writes them through the interceptor chain (and ultimately the UDP socket). For Opus, Packetize always returns exactly one packet.
Send is single-writer; see Session's type comment for the call invariant.
type SwappableReceiver ¶
type SwappableReceiver struct {
// contains filtered or unexported fields
}
SwappableReceiver wraps a PacketReader so it can be atomically replaced at runtime without races with the blocking receive loop.
ReadFromUDP snapshots the current implementation under a read lock and then releases the lock before blocking, so a concurrent swap is never blocked by an in-flight I/O call. Closing the old connection after the swap unblocks any in-progress ReadFromUDP on the old socket, causing receiveLoop to loop back and immediately read from the new socket.
func NewSwappableReceiver ¶
func NewSwappableReceiver(r PacketReader) *SwappableReceiver
func (*SwappableReceiver) Close ¶
func (r *SwappableReceiver) Close() error
Close satisfies PacketReader and closes the current underlying reader.
func (*SwappableReceiver) ReadFromUDP ¶
ReadFromUDP satisfies PacketReader.
func (*SwappableReceiver) Swap ¶
func (r *SwappableReceiver) Swap(newR PacketReader) PacketReader
Swap atomically replaces the underlying PacketReader and returns the previous one so the caller can close it (which unblocks any in-flight ReadFromUDP on the old connection).
type SwappableSender ¶
type SwappableSender struct {
// contains filtered or unexported fields
}
SwappableSender wraps a PacketWriter so it can be atomically replaced at runtime without races with in-flight writes.
Concurrency model:
- The hot path (Write) is lock-free: it atomically loads the current PacketWriter pointer and performs the single underlying UDP sendto(2). No mutex and no WaitGroup increment, so concurrent writers do not contend against each other or against swappers.
- swap() serializes concurrent swappers via swapMu, publishes the new PacketWriter via atomic.Pointer.Store(), and returns the previous one. The caller is responsible for closing the previous PacketWriter.
- Because the hot path holds no lock, swap cannot prove that writers are done with the previous pointer before it returns. Callers that close the returned PacketWriter synchronously risk closing a socket out from under an in-flight sendto(2). To avoid this, swap returns a deferred closer via swapAndDeferClose which schedules the underlying close after SwapCloseGrace — long enough for any in-flight write syscall to complete. This is the tradeoff the refactor plan explicitly allows as the fallback to a WaitGroup-based drain (the WaitGroup approach fails with "Add called concurrently with Wait" because writers do not take any lock that could be ordered against Wait).
- Close takes a snapshot and closes it if it implements io.Closer.
func NewSwappableSender ¶
func NewSwappableSender(w PacketWriter) *SwappableSender
func (*SwappableSender) Close ¶
func (s *SwappableSender) Close() error
Close closes the current underlying PacketWriter if it implements io.Closer.
func (*SwappableSender) Swap ¶
func (s *SwappableSender) Swap(newW PacketWriter) PacketWriter
Swap atomically replaces the underlying PacketWriter and returns the previous one so the caller can close it. The caller must not close the returned PacketWriter synchronously — see swapAndDeferClose for the safe close path that honors the SwapCloseGrace window for in-flight writes.
func (*SwappableSender) SwapAndDeferClose ¶
func (s *SwappableSender) SwapAndDeferClose(newW PacketWriter)
SwapAndDeferClose replaces the underlying PacketWriter with newW and schedules the previous one's Close (if it implements io.Closer) after SwapCloseGrace. The grace window lets any in-flight lock-free Write on the old impl finish its single sendto(2) before the underlying fd is closed.