rtp

package
v0.0.0-...-ce36b61 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2026 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// PrebufferPackets is the number of frames the jitter buffer must
	// hold before playout begins. Each frame is 20 ms, so 5 packets ≈ 100 ms.
	// This sets the minimum receive latency floor and the safety margin for
	// arrival jitter on the mesh. 100 ms tolerates roughly one full packet
	// of late arrival before the buffer empties and playout falls into PLC.
	PrebufferPackets = 5
	MaxDepth         = 24

	// MaxOpusPayloadSize is the RFC 6716 §3.2.1 maximum encoded frame size.
	// Payload pool buffers are sized to this capacity to eliminate per-packet
	// heap allocations in the jitter buffer hot path.
	MaxOpusPayloadSize = 1275
)
View Source
const (
	PayloadTypeOpus = uint8(111)

	FrameSamples = uint32(960) // 20 ms at 48 kHz
	MTU          = uint16(1400)
)
View Source
const SwapCloseGrace = 50 * time.Millisecond

SwapCloseGrace is how long a swap defers closing the previous PacketWriter so any in-flight lock-free Write calls against it can finish their single UDP syscall. Writes on the hot path take no lock at all (they only do an atomic Load), so using a WaitGroup would panic ("Add called concurrently with Wait"); instead we bound the grace period and rely on the fact that Write's critical path is a single non-blocking sendto(2).

Variables

This section is empty.

Functions

func ParseIncoming

func ParseIncoming(raw []byte) (*pionrtp.Packet, error)

ParseIncoming parses a raw UDP datagram into a pion RTP Packet. Returns an error if the bytes are not a valid RTP packet.

func SSRCFromID

func SSRCFromID(id string) uint32

SSRCFromID returns a deterministic uint32 SSRC derived from an ID string using the FNV-1a 32-bit hash.

Types

type JitterBuffer

type JitterBuffer struct {
	Overflows  atomic.Int64
	SSRCResets atomic.Int64
	IdleResets atomic.Int64
	// GapRuns{N} counts contiguous sequence-gap runs observed by the
	// skip-missing branch of popReadyLocked. Each gap is bucketed once
	// at the point the consumer first skips into it — subsequent
	// single-step cursor advances over the same run are not re-counted.
	// The buckets distinguish isolated loss (GapRuns1, GapRuns2to5)
	// from long contiguous bursts (GapRuns11to20 and up) so operators
	// can tell whether FEC/redundancy or masking is the right recovery
	// strategy. MaxDepth caps any single measured run at maxDepth, so
	// the larger buckets will only fire if MaxDepth is raised.
	GapRuns1      atomic.Int64
	GapRuns2to5   atomic.Int64
	GapRuns6to10  atomic.Int64
	GapRuns11to20 atomic.Int64
	GapRuns21to50 atomic.Int64
	GapRunsOver50 atomic.Int64
	// contains filtered or unexported fields
}

JitterBuffer is a sequence-number-ordered buffer for RTP audio payloads. It provides prebuffering, late-packet dropping, gap detection for PLC, and SSRC-change/idle-gap detection so a new talker is never silently dropped because their starting sequence number lies in the "past half" of the previous talker's frozen sequence cursor.

Internally, frames are stored in a fixed-size circular array indexed by (seq % maxDepth), eliminating all map allocations on the hot path.

notifyCh is the optional edge-triggered "frame available" wakeup used by consumers that prefer push-driven scheduling over a polling ticker (see EnableNotify and webPlayoutLoop). It is nil until a consumer opts in; malgo-driven consumers, which are clocked by the audio hardware, do not call EnableNotify and pay nothing on the push hot path beyond the nil-check.

func NewJitterBuffer

func NewJitterBuffer(prebuffer, maxDepth int) *JitterBuffer

func (*JitterBuffer) AdvancePast

func (jb *JitterBuffer) AdvancePast()

advancePast discards the current expected sequence number and advances the playout cursor by one.

func (*JitterBuffer) EnableNotify

func (jb *JitterBuffer) EnableNotify() <-chan struct{}

EnableNotify lazily allocates and returns the edge-triggered "frame available" channel. Consumers that prefer push-driven wakeup over a polling ticker call it once at startup and then select on the returned channel. Each successful Push that follows fires a non-blocking signal (depth-1 buffer, coalesced); the consumer should drain every available payload after each wake. Calling EnableNotify a second time returns the same channel.

func (*JitterBuffer) PopOrConceal

func (jb *JitterBuffer) PopOrConceal(recentWindow time.Duration) (payload []byte, conceal bool)

popOrConceal performs the full playout-tick logic under a single lock acquisition, eliminating the 3-lock-per-tick pattern previously used by playoutLoop (popReady + shouldConceal + advancePast).

Returns:

  • payload != nil: an in-order frame was available.
  • conceal == true: no frame was available but the stream is active and PLC should be applied. The playout cursor has already been advanced.
  • both nil/false: no frame available and no concealment needed.

func (*JitterBuffer) PopReady

func (jb *JitterBuffer) PopReady() (payload []byte, ready bool, skippedMissing bool)

popReady returns the next in-order payload when available.

skippedMissing is true when the buffer advances past a gap (caller should apply PLC for the skipped frame). ready is true when a payload is returned.

func (*JitterBuffer) Push deprecated

func (jb *JitterBuffer) Push(seq uint16, payload []byte) bool

push stores a received payload keyed by sequence number. Returns false if the packet is stale, a duplicate, or the buffer is full.

Deprecated: use pushWithSSRC. push is retained for tests that pre-date SSRC tracking; it treats every packet as belonging to a single anonymous stream.

func (*JitterBuffer) PushWithSSRC

func (jb *JitterBuffer) PushWithSSRC(ssrc uint32, seq uint16, payload []byte, onSSRCChange func(oldSSRC, newSSRC uint32)) bool

pushWithSSRC stores a received payload, tracking the SSRC of the source stream. When the SSRC changes mid-stream, the buffer is reset and re- initialized from the new packet — without this, a new talker whose starting sequence number happens to lie in the "past half" of the previous talker's frozen cursor would be silently rejected forever.

If onSSRCChange is non-nil, it is invoked (without holding jb.mu) when an SSRC change is detected, with the old and new SSRC values. Pass nil if you don't need notification (e.g. tests).

func (*JitterBuffer) ReleasePayload

func (jb *JitterBuffer) ReleasePayload(p []byte)

releasePayload returns a jitter-buffer payload slice back to the pool. Only pool-allocated slices (cap == MaxOpusPayloadSize) are accepted; anything else (test slices, nil) is silently ignored.

func (*JitterBuffer) Reset

func (jb *JitterBuffer) Reset()

reset clears all buffered state so the jitter buffer can be reused for a new RTP stream (e.g. after a talk-group switch). The next push will re-initialize the expected sequence number from the first arriving packet.

func (*JitterBuffer) ShouldConceal

func (jb *JitterBuffer) ShouldConceal(recentWindow time.Duration) bool

shouldConceal returns true when a packet arrived recently enough that PLC is appropriate for a missing frame (i.e. the stream is active but gapped).

func (*JitterBuffer) Snapshot

func (jb *JitterBuffer) Snapshot(dst *JitterBufferSnapshot)

Snapshot copies the current counter values into dst using atomic loads. Safe to call concurrently with pushWithSSRC and pop. Nil-safe on both receiver and dst. Allocation-free.

type JitterBufferSnapshot

type JitterBufferSnapshot struct {
	// Overflows is the monotonic count of incoming packets the jitter
	// buffer rejected because it was full. Sustained non-zero overflow
	// deltas suggest the receiver is behind the sender or the network
	// is bursting.
	Overflows int64 `json:"overflows"`
	// SSRCResets is the monotonic count of mid-stream SSRC transitions
	// the jitter buffer handled by resetting and re-initializing. A
	// high value suggests multiple talkers or a sender restart.
	SSRCResets int64 `json:"ssrc_resets"`
	// IdleResets is the monotonic count of gap-driven buffer resets
	// (the "same SSRC but the stream has been silent for longer than
	// the idle threshold" safety net).
	IdleResets int64 `json:"idle_resets"`
	// GapRuns{N} is the monotonic count of contiguous sequence-gap
	// runs observed at the jitter buffer's skip-missing branch,
	// bucketed by run length in frames (20 ms each). A single gap is
	// counted exactly once at the skip point, not once per skipped
	// frame. Distribution across buckets distinguishes isolated loss
	// (bucket 1, 2–5) from long contiguous bursts (11–20 and up) and
	// drives the choice of recovery strategy (FEC/RED vs masking).
	GapRuns1      int64 `json:"gap_runs_1"`
	GapRuns2to5   int64 `json:"gap_runs_2_5"`
	GapRuns6to10  int64 `json:"gap_runs_6_10"`
	GapRuns11to20 int64 `json:"gap_runs_11_20"`
	GapRuns21to50 int64 `json:"gap_runs_21_50"`
	GapRunsOver50 int64 `json:"gap_runs_over_50"`
}

JitterBufferSnapshot is a point-in-time copy of the per-port receive jitter buffer's monotonic counters. Field semantics are documented in docs/instrumentation-snapshot.md — keep that file in sync when adding or renaming fields here.

Note that depth/occupancy are intentionally omitted. Reading them would require acquiring jb.mu, which the snapshot path never holds. The three fields below are read via atomic loads and so observe producer writes without contention.

type PacketReader

type PacketReader interface {
	ReadFromUDP(b []byte) (int, *net.UDPAddr, error)
	Close() error
}

PacketReader abstracts the UDP receive path so receiveLoop can be exercised with pre-seeded byte sequences in tests.

type PacketWriter

type PacketWriter interface {
	Write(b []byte) (int, error)
}

PacketWriter abstracts the UDP send path so the broadcast callback can be tested without a live socket.

type Sender

type Sender interface {
	Send(payload []byte) error
}

Sender is the interface the broadcast encoder uses to ship an encoded Opus frame over the network. Backed by Session in production; tests inject mockRTPSender.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session wraps a pion Packetizer and an interceptor chain that generates periodic RTCP Sender Reports. One session represents one local SSRC (the node running this software).

The RTCP path is one-way outbound: the SR generator fires every 5 seconds and writes to the provided rtcpTransport. Inbound RTP is parsed externally with ParseIncoming; the session does not handle receive stats (each remote SSRC in a multicast group would require a separate receiver).

Concurrency: Send is the sole writer to packetizer and rtpWriter. In production each *Session is owned by exactly one PortChannel, and Send is only called from the per-encoder broadcastEncoder.encodeLoop goroutine (one writer per encodeLoop, one encodeLoop per process). The pion SenderInterceptor's RTCP timer runs on its own goroutine but only writes to the RTCP transport (bound separately at NewSession), not the RTP packetizer or rtpWriter — so there is no contention with Send. Adding a second concurrent Send caller without external synchronization would be a bug; the call invariant is enforced by code review, not a mutex.

func NewSession

func NewSession(
	ssrc uint32,
	rtpTransport PacketWriter,
	rtcpTransport PacketWriter,
	log zerolog.Logger,
) (*Session, error)

NewSession creates a Session that sends RTP via rtpTransport and RTCP Sender Reports via rtcpTransport.

The interceptor chain contains a single report.SenderInterceptor that generates an outbound RTCP SR every 5 seconds. Inbound RTCP (e.g. Receiver Reports from peers) is not processed — in a multicast PTT topology each transmission has multiple receivers and no single feedback path is meaningful.

func (*Session) Close

func (s *Session) Close() error

Close shuts down the interceptor chain, stopping internal timer goroutines.

func (*Session) Send

func (s *Session) Send(payload []byte) error

Send encodes payload into one or more RTP packets using the Packetizer and writes them through the interceptor chain (and ultimately the UDP socket). For Opus, Packetize always returns exactly one packet.

Send is single-writer; see Session's type comment for the call invariant.

type SwappableReceiver

type SwappableReceiver struct {
	// contains filtered or unexported fields
}

SwappableReceiver wraps a PacketReader so it can be atomically replaced at runtime without races with the blocking receive loop.

ReadFromUDP snapshots the current implementation under a read lock and then releases the lock before blocking, so a concurrent swap is never blocked by an in-flight I/O call. Closing the old connection after the swap unblocks any in-progress ReadFromUDP on the old socket, causing receiveLoop to loop back and immediately read from the new socket.

func NewSwappableReceiver

func NewSwappableReceiver(r PacketReader) *SwappableReceiver

func (*SwappableReceiver) Close

func (r *SwappableReceiver) Close() error

Close satisfies PacketReader and closes the current underlying reader.

func (*SwappableReceiver) ReadFromUDP

func (r *SwappableReceiver) ReadFromUDP(b []byte) (int, *net.UDPAddr, error)

ReadFromUDP satisfies PacketReader.

func (*SwappableReceiver) Swap

Swap atomically replaces the underlying PacketReader and returns the previous one so the caller can close it (which unblocks any in-flight ReadFromUDP on the old connection).

type SwappableSender

type SwappableSender struct {
	// contains filtered or unexported fields
}

SwappableSender wraps a PacketWriter so it can be atomically replaced at runtime without races with in-flight writes.

Concurrency model:

  • The hot path (Write) is lock-free: it atomically loads the current PacketWriter pointer and performs the single underlying UDP sendto(2). No mutex and no WaitGroup increment, so concurrent writers do not contend against each other or against swappers.
  • swap() serializes concurrent swappers via swapMu, publishes the new PacketWriter via atomic.Pointer.Store(), and returns the previous one. The caller is responsible for closing the previous PacketWriter.
  • Because the hot path holds no lock, swap cannot prove that writers are done with the previous pointer before it returns. Callers that close the returned PacketWriter synchronously risk closing a socket out from under an in-flight sendto(2). To avoid this, swap returns a deferred closer via swapAndDeferClose which schedules the underlying close after SwapCloseGrace — long enough for any in-flight write syscall to complete. This is the tradeoff the refactor plan explicitly allows as the fallback to a WaitGroup-based drain (the WaitGroup approach fails with "Add called concurrently with Wait" because writers do not take any lock that could be ordered against Wait).
  • Close takes a snapshot and closes it if it implements io.Closer.

func NewSwappableSender

func NewSwappableSender(w PacketWriter) *SwappableSender

func (*SwappableSender) Close

func (s *SwappableSender) Close() error

Close closes the current underlying PacketWriter if it implements io.Closer.

func (*SwappableSender) Swap

Swap atomically replaces the underlying PacketWriter and returns the previous one so the caller can close it. The caller must not close the returned PacketWriter synchronously — see swapAndDeferClose for the safe close path that honors the SwapCloseGrace window for in-flight writes.

func (*SwappableSender) SwapAndDeferClose

func (s *SwappableSender) SwapAndDeferClose(newW PacketWriter)

SwapAndDeferClose replaces the underlying PacketWriter with newW and schedules the previous one's Close (if it implements io.Closer) after SwapCloseGrace. The grace window lets any in-flight lock-free Write on the old impl finish its single sendto(2) before the underlying fd is closed.

func (*SwappableSender) Write

func (s *SwappableSender) Write(b []byte) (int, error)

Write satisfies PacketWriter. Fully lock-free on the hot path: a single atomic pointer load, then the underlying Write (one sendto(2) on the UDP fast path).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL