Documentation ¶
Overview ¶
Design of Prober
Probing is used to check for the existence of excess channel capacity. This is especially useful in the downstream direction of an SFU. An SFU forwards audio/video streams from one or more publishers to all the subscribers. But the downstream channel of a subscriber may not be big enough to carry all the streams, and its capacity varies over time.
When there is not enough capacity, some streams will be paused. To resume a stream, SFU would need to know that the channel has enough capacity. That's where probing comes in. When conditions are favorable, SFU can send probe packets so that the bandwidth estimator has more data to estimate available channel capacity better. NOTE: What defines `favorable conditions` is implementation dependent.
There are two options for probing:
- Use padding only RTP packets: This one is preferable as the probe rate can be controlled more tightly.
- Resume a paused stream or forward a higher spatial layer: This requires finding a stream that matches the probing rate. Also, a stream could get a key frame, unexpectedly boosting the rate in the probing window.
The strategy used depends on stream allocator implementation. This module can be used if the stream allocator decides to use padding only RTP packets for probing purposes.
Implementation: There are a couple of options
- Check prober in the forwarding path (pull from prober). This is preferred for scalability reasons. But this suffers from not being able to probe when all streams are paused (could be due to downstream bandwidth constraints or the corresponding upstream tracks may have paused due to upstream bandwidth constraints). Another issue is not being able to have tight control on the probing window boundary, as the packet forwarding path may not have a packet to forward. But it should not be a major concern as long as some stream(s) is/are forwarded, as there should be a packet at least every 60 ms or so (forwarding only one stream at 15 fps). Usually, it will be serviced much more frequently when there are multiple streams getting forwarded.
- Run it in a goroutine. But that would have to wake up very often to prevent bunching up of probe packets. So there is a scalability concern, as there is one prober per subscriber peer connection. But probe windows should be very short (of the order of 100s of ms). So this approach might be fine.
The implementation here follows the second approach of using a goroutine.
Pacing:
-------
Ideally, the subscriber peer connection should have a pacer which trickles data out at the estimated channel capacity rate (and estimated channel capacity + probing rate when actively probing).
But there are a few significant challenges:
- Pacer will require buffering of forwarded packets. That means more memory, more CPU (have to make copy of packets) and more latency in the media stream.
- Scalability concern as SFU may be handling hundreds of subscriber peer connections and each one processing the pacing loop at 5ms interval will add up.
So, this module assumes that pacing is inherently provided by the publishers for media streams. That is a reasonable assumption given that publishing clients will run their own pacer and pace data out at a steady rate.
A further assumption is that if there are multiple publishers for a subscriber peer connection, the publishers are not pacing in sync, i.e. each publisher's pacer is completely independent and the SFU will be receiving the media packets with a good spread and not clumped together.
Given those assumptions, this module monitors media send rate and adjusts probing packet sends accordingly. Although the probing may have a high enough wake up frequency, it is for short windows. For example, probing at 5 Mbps for 1/2 second and sending 1000 byte probe per iteration will wake up every 1.6 ms. That is very high, but should last for 1/2 second or so.
5 Mbps over 1/2 second = 2.5 Mb
2.5 Mb = 312500 bytes = 313 probes at 1000 bytes per probe
313 probes over 1/2 second = 1.6 ms between probes
A few things to note
- When a probe cluster is added, the expected media rate is provided. So, the wake up interval takes that into account. For example, if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected to be provided by media traffic, the wake up interval becomes 8 ms.
- The amount of probing should actually be capped at some value to avoid too much self-induced congestion. It may be something like 500 kbps. That will increase the wake up interval to 16 ms in the above example.
- In practice, the probing interval may also be shorter. Typically, it can be run for 2 - 3 RTTs to get a good measurement. For the longest hauls, RTT could be 250 ms or so leading to the probing window being long(ish). But, RTT should be much shorter especially if the subscriber peer connection of the client is able to connect to the nearest data center.
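The wake up arithmetic above can be reproduced with a small calculation. The following is a minimal sketch, not the module's actual code: `probeInterval` and its parameters are hypothetical names, and it uses a fractional probe count rather than rounding up to whole probes, which still lands on the figures quoted above.

```go
package main

import (
	"fmt"
	"time"
)

// probeInterval is a hypothetical helper (not part of this package).
// It returns the wake up interval needed so that probes of probeSize bytes,
// together with the expected media traffic, reach targetBps over window.
// A maxProbeBps of 0 means "no cap". A result of 0 means no probing is
// needed because media alone already covers the target.
func probeInterval(targetBps, expectedMediaBps, maxProbeBps int64, window time.Duration, probeSize int64) time.Duration {
	probeBps := targetBps - expectedMediaBps
	if probeBps <= 0 {
		return 0
	}
	if maxProbeBps > 0 && probeBps > maxProbeBps {
		probeBps = maxProbeBps
	}
	// Bits to send in the window, converted to bytes.
	probeBytes := float64(probeBps) * window.Seconds() / 8
	numProbes := probeBytes / float64(probeSize)
	if numProbes < 1 {
		numProbes = 1
	}
	return time.Duration(float64(window) / numProbes)
}

func main() {
	window := 500 * time.Millisecond
	fmt.Println(probeInterval(5000000, 0, 0, window, 1000))            // 1.6ms
	fmt.Println(probeInterval(5000000, 4000000, 0, window, 1000))      // 8ms
	fmt.Println(probeInterval(5000000, 4000000, 500000, window, 1000)) // 16ms
}
```

The three calls correspond to the uncapped 5 Mbps example, the case where 4 Mbps is expected from media, and the same case with a 500 kbps probing cap.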
Design of StreamAllocator ¶
Each participant uses one peer connection for all downstream traffic. It is possible that the downstream peer connection gets congested. In such an event, the SFU (sender on that peer connection) should take measures to mitigate the media loss and latency that would result from such congestion.
This module is supposed to aggregate downstream tracks and drive bandwidth allocation with the goals of:
- Try and send highest quality media
- React as quickly as possible to mitigate congestion
Setup:
------
The following should be done to set up a stream allocator:
- There will be one of these per subscriber peer connection. Created in livekit-server/transport.go for subscriber type peer connections.
- In `AddSubscribedTrack` of livekit-server/participant.go, the created downTrack is added to the stream allocator.
- In `RemoveSubscribedTrack` of livekit-server/participant.go, the downTrack is removed from the stream allocator.
- Both video and audio tracks are added to this module. Although the stream allocator does not act on audio track forwarding, audio track information like loss rate may be used to adjust available bandwidth.
Callbacks:
----------
StreamAllocator registers the following callbacks on all registered down tracks:
- OnREMB: called when a down track receives RTCP REMB. Note that REMB is a peer connection level aggregate metric. But it contains all the SSRCs used in the calculation of that REMB. So, there could be multiple callbacks per RTCP REMB received (one from each down track pertaining to the contained SSRCs) with the same estimated channel capacity.
- AddReceiverReportListener: called when a down track receives RTCP RR (Receiver Report).
- OnAvailableLayersChanged: called when the feeding track changes its layers. This could happen when the publisher throttles layers because of upstream congestion in its path.
- OnSubscriptionChanged: called when a down track's settings are changed as a result of client side requests (muting/pausing a video or limiting maximum layer).
- OnPacketSent: called when a media packet is forwarded by the down track. As this happens once per forwarded packet, processing in this callback should be kept to a minimum.
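Because OnPacketSent fires once per forwarded packet, one way to keep the callback cheap is to do a single atomic add and leave all heavier processing to a periodic consumer. The sketch below illustrates that pattern only. `downTrack` is a hypothetical stand-in that mirrors the shape of the `OnPacketSent` registration listed in the index, and `sendCounter` is an assumed helper, not a type from this package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// downTrack is a hypothetical stand-in for DownTrack; only the
// OnPacketSent hook is modelled here.
type downTrack struct {
	onPacketSent func(dt *downTrack, size int)
}

// OnPacketSent registers the per-packet callback.
func (d *downTrack) OnPacketSent(fn func(dt *downTrack, size int)) {
	d.onPacketSent = fn
}

// forward simulates the forwarding path sending one packet of size bytes.
func (d *downTrack) forward(size int) {
	if d.onPacketSent != nil {
		d.onPacketSent(d, size)
	}
}

// sendCounter (assumed helper) accumulates bytes sent since the last drain.
type sendCounter struct {
	bytes int64
}

// onPacketSent does the minimum possible work: one atomic add.
func (c *sendCounter) onPacketSent(_ *downTrack, size int) {
	atomic.AddInt64(&c.bytes, int64(size))
}

// drain returns the accumulated byte count and resets it. It is meant to be
// called from a slower, periodic path rather than per packet.
func (c *sendCounter) drain() int64 {
	return atomic.SwapInt64(&c.bytes, 0)
}

func main() {
	dt := &downTrack{}
	counter := &sendCounter{}
	dt.OnPacketSent(counter.onPacketSent)

	dt.forward(1200)
	dt.forward(800)
	fmt.Println(counter.drain()) // 2000
	fmt.Println(counter.drain()) // 0
}
```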
The following may be needed depending on the StreamAllocator algorithm
- OnBitrateUpdate: called periodically to update the bit rate at which a down track is forwarding. This can be used to measure any overshoot and adjust allocations accordingly. This may have granular information like primary bitrate, retransmitted bitrate and padding bitrate.
State machine:
--------------
The most critical component. It should monitor the current state of the channel and take actions to provide the best user experience by striving to achieve the goals outlined earlier.
States:
------
- State_PRE_COMMIT: Before the first estimate is committed. Estimated channel capacity is initialized to some arbitrarily high value to start streaming immediately. Serves two purposes:
   1. Gives the bandwidth estimation algorithms data.
   2. Starts streaming as soon as a user joins. Imagine a user joining a room with 10 participants already in it. That user should start receiving streams from everybody as soon as possible.
- State_STABLE: When all streams are forwarded at their optimal requested layers.
- State_DEFICIENT: When at least one stream is not able to forward optimal requested layers.
- State_GRATUITOUS_PROBING: When all streams are forwarded at their optimal requested layers, but probing for extra capacity to be prepared for cases like a new participant joining and streaming OR an existing participant starting a new stream like enabling camera or screen share.
Signals:
-------
Each state should take action based on these signals and advance the state machine based on the result of the action.
- Signal_ADD_TRACK: A new track has been added.
- Signal_REMOVE_TRACK: An existing track has been removed.
- Signal_ESTIMATE_INCREASE: Estimated channel capacity is increasing.
- Signal_ESTIMATE_DECREASE: Estimated channel capacity is decreasing. Note that when the channel gets congested, it is possible to get several of these in a very short time window.
- Signal_RECEIVER_REPORT: An RTCP Receiver Report received from some down track.
- Signal_AVAILABLE_LAYERS_ADD: Available layers of publisher changed, new layer(s) available.
- Signal_AVAILABLE_LAYERS_REMOVE: Available layers of publisher changed, some previously available layer(s) not available anymore.
- Signal_SUBSCRIPTION_CHANGE: Subscription changed (mute/requested layers changed).
- Signal_PERIODIC_PING: Periodic ping.
There are several interesting challenges which are documented in relevant code below.
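The state definitions above can be read as a classification over the current allocation. The sketch below derives a state directly from those definitions. It is an illustration under stated assumptions, not the package's state machine: `classify`, `trackStatus`, and the lowercase constants are hypothetical names, and how each signal drives the transitions is left out because the text above does not spell it out.

```go
package main

import "fmt"

// state mirrors the four states described above (hypothetical type).
type state int

const (
	statePreCommit state = iota
	stateStable
	stateDeficient
	stateGratuitousProbing
)

var stateNames = [...]string{
	"State_PRE_COMMIT",
	"State_STABLE",
	"State_DEFICIENT",
	"State_GRATUITOUS_PROBING",
}

func (s state) String() string { return stateNames[s] }

// trackStatus is an assumed summary of one down track's allocation.
type trackStatus struct {
	currentLayer int32 // layer currently being forwarded
	optimalLayer int32 // optimal requested layer
}

// classify maps the definitions above onto a single decision:
//   - no committed estimate yet           -> PRE_COMMIT
//   - any track below its optimal layer   -> DEFICIENT
//   - all optimal and a probe is running  -> GRATUITOUS_PROBING
//   - all optimal otherwise               -> STABLE
func classify(estimateCommitted, probing bool, tracks []trackStatus) state {
	if !estimateCommitted {
		return statePreCommit
	}
	for _, t := range tracks {
		if t.currentLayer < t.optimalLayer {
			return stateDeficient
		}
	}
	if probing {
		return stateGratuitousProbing
	}
	return stateStable
}

func main() {
	tracks := []trackStatus{{currentLayer: 2, optimalLayer: 2}, {currentLayer: 0, optimalLayer: 1}}
	fmt.Println(classify(false, false, tracks)) // State_PRE_COMMIT
	fmt.Println(classify(true, false, tracks))  // State_DEFICIENT
	tracks[1].currentLayer = 1
	fmt.Println(classify(true, false, tracks)) // State_STABLE
	fmt.Println(classify(true, true, tracks))  // State_GRATUITOUS_PROBING
}
```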
Index ¶
- Constants
- Variables
- type BoostMode
- type Cluster
- type DownTrack
- func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener)
- func (d *DownTrack) AdjustAllocation(availableChannelCapacity uint64) (uint64, uint64)
- func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error)
- func (d *DownTrack) Close()
- func (d *DownTrack) Codec() webrtc.RTPCodecCapability
- func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport
- func (d *DownTrack) CreateSourceDescriptionChunks() []rtcp.SourceDescriptionChunk
- func (d *DownTrack) CurrentMaxLossFraction() uint8
- func (d *DownTrack) CurrentSpatialLayer() int32
- func (d *DownTrack) DebugInfo() map[string]interface{}
- func (d *DownTrack) Enabled() bool
- func (d *DownTrack) ID() string
- func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64)
- func (d *DownTrack) Kind() webrtc.RTPCodecType
- func (d *DownTrack) MaxSpatialLayer() int32
- func (d *DownTrack) MaybeTranslateVP8(pkt *rtp.Packet, meta packetMeta) error
- func (d *DownTrack) Mute(val bool)
- func (d *DownTrack) OnAvailableLayersChanged(fn func(dt *DownTrack, layerAdded bool))
- func (d *DownTrack) OnBind(fn func())
- func (d *DownTrack) OnCloseHandler(fn func())
- func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int))
- func (d *DownTrack) OnREMB(fn func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate))
- func (d *DownTrack) OnRTCP(fn func([]rtcp.Packet))
- func (d *DownTrack) OnSubscriptionChanged(fn func(dt *DownTrack))
- func (d *DownTrack) SSRC() uint32
- func (d *DownTrack) SetInitialLayers(spatialLayer, temporalLayer int32)
- func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter)
- func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver)
- func (d *DownTrack) Stop() error
- func (d *DownTrack) StreamID() string
- func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error
- func (d *DownTrack) SwitchSpatialLayerDone(layer int32)
- func (d *DownTrack) SwitchTemporalLayer(targetLayer int32, setAsMax bool)
- func (d *DownTrack) TargetSpatialLayer() int32
- func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error
- func (d *DownTrack) UpdateStats(packetLen uint32)
- func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded bool) (int32, error)
- func (d *DownTrack) WritePaddingRTP(bytesToSend int) int
- func (d *DownTrack) WriteRTP(p *buffer.ExtPacket, layer int32) error
- func (d *DownTrack) WriteRTPHeaderExtensions(hdr *rtp.Header) error
- type DownTrackType
- type Event
- type Munger
- func (m *Munger) IsOnFrameBoundary() bool
- func (m *Munger) PacketDropped(extPkt *buffer.ExtPacket)
- func (m *Munger) SetLastSnTs(extPkt *buffer.ExtPacket)
- func (m *Munger) UpdateAndGetPaddingSnTs(forceMarker bool) (uint16, uint32, error)
- func (m *Munger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (uint16, uint32, SequenceNumberOrdering, error)
- func (m *Munger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32)
- type MungerParams
- type Prober
- type Receiver
- type ReceiverOpts
- type ReceiverReportListener
- type SequenceNumberOrdering
- type Signal
- type State
- type StreamAllocator
- type StreamStatus
- type StreamTracker
- type Track
- func (t *Track) AdjustAllocation(availableChannelCapacity uint64) (uint64, uint64)
- func (t *Track) BandwidthOptimal() uint64
- func (t *Track) BandwidthRequested() uint64
- func (t *Track) GetPacketStats() (webrtc.RTPCodecType, uint32, uint32)
- func (t *Track) IncreaseAllocation() (bool, uint64)
- func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport)
- func (t *Track) WritePaddingRTP(bytesToSend int) int
- type VP8Munger
- func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket)
- func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, ...) (*buffer.VP8, error)
- func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) (*buffer.VP8, error)
- func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket)
- type VP8MungerParams
- type VP8PictureIdWrapHandler
- type WebRTCReceiver
- func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool)
- func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer, bestQualityFirst bool)
- func (w *WebRTCReceiver) Codec() webrtc.RTPCodecParameters
- func (w *WebRTCReceiver) DebugInfo() map[string]interface{}
- func (w *WebRTCReceiver) DeleteDownTrack(peerID string)
- func (w *WebRTCReceiver) GetBitrate() [3]uint64
- func (w *WebRTCReceiver) GetBitrateTemporal() [3][4]uint64
- func (w *WebRTCReceiver) GetBitrateTemporalCumulative() [3][4]uint64
- func (w *WebRTCReceiver) GetMaxTemporalLayer() [3]int32
- func (w *WebRTCReceiver) GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
- func (w *WebRTCReceiver) HasSpatialLayer(layer int32) bool
- func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType
- func (w *WebRTCReceiver) OnCloseHandler(fn func())
- func (w *WebRTCReceiver) RetransmitPackets(track *DownTrack, packets []packetMeta) error
- func (w *WebRTCReceiver) SSRC(layer int) uint32
- func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet)
- func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet)
- func (w *WebRTCReceiver) SetTrackMeta(trackID, streamID string)
- func (w *WebRTCReceiver) SetUpTrackPaused(paused bool)
- func (w *WebRTCReceiver) StreamID() string
- func (w *WebRTCReceiver) TrackID() string
Constants ¶
const (
	RTPPaddingMaxPayloadSize      = 255
	RTPPaddingEstimatedHeaderSize = 20
	RTPBlankFramesMax             = 6
)
const (
	InitialChannelCapacity = 100 * 1000 * 1000 // 100 Mbps
	EstimateEpsilon = 2000 // 2 kbps
	BoostPct = 8
	BoostMinBps = 20 * 1000 // 20 kbps
	BoostMaxBps = 60 * 1000 // 60 kbps
	GratuitousProbeHeadroomBps = 1 * 1000 * 1000 // if headroom is more than 1 Mbps, don't probe
	GratuitousProbePct = 10
	GratuitousProbeMaxBps = 300 * 1000 // 300 kbps
	GratuitousProbeMinDurationMs = 500
	GratuitousProbeMaxDurationMs = 600
	AudioLossWeight = 0.75
	VideoLossWeight = 0.25
)
Variables ¶
var (
	ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
	ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded")
	ErrDuplicatePacket = errors.New("duplicate packet")
	ErrPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary")
	ErrNotVP8 = errors.New("not VP8")
	ErrOutOfOrderVP8PictureIdCacheMiss = errors.New("out-of-order VP8 picture id not found in cache")
	ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer")
	ErrNoRequiredBuff = errors.New("buff size if less than required")
)
var (
	VP8KeyFrame1x1 = []byte{0x10, 0x02, 0x00, 0x9d, 0x01, 0x2a, 0x01, 0x00, 0x01, 0x00, 0x0b, 0xc7, 0x08, 0x85, 0x85, 0x88, 0x85, 0x84, 0x88, 0x3f, 0x82, 0x00, 0x0c, 0x0d, 0x60, 0x00, 0xfe, 0xe6, 0xb5, 0x00}
	H264KeyFrame2x2SPS = []byte{0x67, 0x42, 0xc0, 0x1f, 0x0f, 0xd9, 0x1f, 0x88, 0x88, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xc8, 0x3c, 0x60, 0xc9, 0x20}
	H264KeyFrame2x2PPS = []byte{0x68, 0x87, 0xcb, 0x83, 0xcb, 0x20}
	H264KeyFrame2x2IDR = []byte{0x65, 0x88, 0x84, 0x0a, 0xf2, 0x62, 0x80, 0x00, 0xa7, 0xbe}
	H264KeyFrame2x2 = [][]byte{H264KeyFrame2x2SPS, H264KeyFrame2x2PPS, H264KeyFrame2x2IDR}
)
var (
	ErrSpatialNotSupported = errors.New("current track does not support simulcast/SVC")
	ErrSpatialLayerNotFound = errors.New("the requested layer does not exist")
)
var (
	// LK-TODO-START
	// These constants will definitely require more tweaking.
	// In fact, simple time thresholded rules most probably will not be enough.
	// LK-TODO-END
	EstimateCommitMs = 2 * 1000 * time.Millisecond // 2 seconds
	ProbeWaitMs = 5 * 1000 * time.Millisecond // 5 seconds
	GratuitousProbeWaitMs = 8 * 1000 * time.Millisecond // 8 seconds
	BoostWaitMs = 3 * 1000 * time.Millisecond // 3 seconds
)
var Logger logr.Logger = logr.Discard()
Logger is an implementation of logr.Logger. If it is not provided, logging is turned off.
Functions ¶
This section is empty.
Types ¶
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
func NewCluster ¶
func (*Cluster) GetSleepDuration ¶
func (*Cluster) PacketSent ¶
type DownTrack ¶
type DownTrack struct {
// contains filtered or unexported fields
}
DownTrack implements TrackLocal. It is the track used to write packets to an SFU subscriber, and it handles packets for simple, simulcast and SVC publishers.
func NewDownTrack ¶
func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, peerID string, mt int) (*DownTrack, error)
NewDownTrack returns a DownTrack.
func (*DownTrack) AddReceiverReportListener ¶
func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener)
func (*DownTrack) AdjustAllocation ¶
func (*DownTrack) Bind ¶
func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error)
Bind is called by the PeerConnection after negotiation is complete. This asserts that the codec requested is supported by the remote peer. If so, it sets up all the state (SSRC and PayloadType) to have a call.
func (*DownTrack) Codec ¶
func (d *DownTrack) Codec() webrtc.RTPCodecCapability
Codec returns current track codec capability
func (*DownTrack) CreateSenderReport ¶
func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport
func (*DownTrack) CreateSourceDescriptionChunks ¶
func (d *DownTrack) CreateSourceDescriptionChunks() []rtcp.SourceDescriptionChunk
func (*DownTrack) CurrentMaxLossFraction ¶
func (*DownTrack) CurrentSpatialLayer ¶
func (*DownTrack) ID ¶
ID is the unique identifier for this Track. This should be unique for the stream, but doesn't have to be globally unique. A common example would be 'audio' or 'video' and StreamID would be 'desktop' or 'webcam'.
func (*DownTrack) IncreaseAllocation ¶
func (*DownTrack) Kind ¶
func (d *DownTrack) Kind() webrtc.RTPCodecType
Kind controls if this TrackLocal is audio or video
func (*DownTrack) MaxSpatialLayer ¶
func (*DownTrack) MaybeTranslateVP8 ¶
func (*DownTrack) OnAvailableLayersChanged ¶
func (*DownTrack) OnCloseHandler ¶
func (d *DownTrack) OnCloseHandler(fn func())
OnCloseHandler sets the method to be called when the remote track is removed.
func (*DownTrack) OnPacketSent ¶
func (*DownTrack) OnREMB ¶
func (d *DownTrack) OnREMB(fn func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate))
func (*DownTrack) OnSubscriptionChanged ¶
func (*DownTrack) SetInitialLayers ¶
func (*DownTrack) SetRTPHeaderExtensions ¶
func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter)
Sets RTP header extensions for this track
func (*DownTrack) SetTransceiver ¶
func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver)
func (*DownTrack) SwitchSpatialLayer ¶
SwitchSpatialLayer switches the current layer
func (*DownTrack) SwitchSpatialLayerDone ¶
func (*DownTrack) SwitchTemporalLayer ¶
func (*DownTrack) TargetSpatialLayer ¶
func (*DownTrack) Unbind ¶
func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error
Unbind implements the teardown logic when the track is no longer needed. This happens because a track has been stopped.
func (*DownTrack) UpdateStats ¶
func (*DownTrack) UptrackLayersChange ¶
func (*DownTrack) WritePaddingRTP ¶
WritePaddingRTP tries to write as many padding only RTP packets as necessary to satisfy given size to the DownTrack
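As a rough illustration of how a byte budget could map to padding only packets, the sketch below divides the requested size by the per-packet cost implied by the constants RTPPaddingMaxPayloadSize and RTPPaddingEstimatedHeaderSize. The 255 byte limit follows from RTP storing the padding length in a single trailing octet. `paddingPacketCount` is a hypothetical helper, and the rounding-up policy is an assumption rather than the documented behavior of WritePaddingRTP.

```go
package main

import "fmt"

// Values copied from the package constants listed above.
const (
	RTPPaddingMaxPayloadSize      = 255
	RTPPaddingEstimatedHeaderSize = 20
)

// paddingPacketCount (hypothetical) returns how many maximum-size padding
// only packets are needed to cover bytesToSend, rounding up. Each packet is
// assumed to cost the maximum padding payload plus the estimated header.
func paddingPacketCount(bytesToSend int) int {
	if bytesToSend <= 0 {
		return 0
	}
	perPacket := RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize
	return (bytesToSend + perPacket - 1) / perPacket
}

func main() {
	fmt.Println(paddingPacketCount(1000)) // 4
	fmt.Println(paddingPacketCount(275))  // 1
	fmt.Println(paddingPacketCount(276))  // 2
}
```

With these constants, a 1000 byte probe needs four padding packets, which is one reason a prober may prefer to count bytes actually sent rather than assume one packet per wake up.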
type DownTrackType ¶
type DownTrackType int
DownTrackType determines the type of track
const (
	SimpleDownTrack DownTrackType = iota + 1
	SimulcastDownTrack
)
type Munger ¶
type Munger struct {
MungerParams
// contains filtered or unexported fields
}
func (*Munger) IsOnFrameBoundary ¶
func (*Munger) PacketDropped ¶
func (*Munger) SetLastSnTs ¶
func (*Munger) UpdateAndGetPaddingSnTs ¶
func (*Munger) UpdateAndGetSnTs ¶
type Prober ¶
type Prober struct {
// contains filtered or unexported fields
}
func (*Prober) AddCluster ¶
func (*Prober) OnSendProbe ¶
func (*Prober) PacketSent ¶
type Receiver ¶
type Receiver interface {
TrackID() string
StreamID() string
Codec() webrtc.RTPCodecParameters
Kind() webrtc.RTPCodecType
SSRC(layer int) uint32
SetTrackMeta(trackID, streamID string)
AddUpTrack(track *webrtc.TrackRemote, buffer *buffer.Buffer, bestQualityFirst bool)
AddDownTrack(track *DownTrack, bestQualityFirst bool)
SetUpTrackPaused(paused bool)
HasSpatialLayer(layer int32) bool
GetBitrate() [3]uint64
GetBitrateTemporal() [3][4]uint64
GetBitrateTemporalCumulative() [3][4]uint64
GetMaxTemporalLayer() [3]int32
RetransmitPackets(track *DownTrack, packets []packetMeta) error
DeleteDownTrack(peerID string)
OnCloseHandler(fn func())
SendRTCP(p []rtcp.Packet)
SetRTCPCh(ch chan []rtcp.Packet)
GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
DebugInfo() map[string]interface{}
}
Receiver defines an interface for track receivers.
func NewWebRTCReceiver ¶
func NewWebRTCReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, pid string, opts ...ReceiverOpts) Receiver
NewWebRTCReceiver creates a new WebRTC track receiver.
type ReceiverOpts ¶
type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver
func WithLoadBalanceThreshold ¶
func WithLoadBalanceThreshold(downTracks int) ReceiverOpts
WithLoadBalanceThreshold enables parallelization of packet writes when downTracks exceeds the threshold. The value should be between 3 and 150. For a server handling a few large rooms, use a smaller value (required to handle very large (250+ participant) rooms). For a server handling many small rooms, use a larger value or disable. Set to 0 (disabled) by default.
func WithPliThrottle ¶
func WithPliThrottle(period int64) ReceiverOpts
WithPliThrottle indicates the minimum time (ms) between sending PLIs.
func WithStreamTrackers ¶
func WithStreamTrackers() ReceiverOpts
WithStreamTrackers enables StreamTracker use for simulcast
type ReceiverReportListener ¶
type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)
type SequenceNumberOrdering ¶
type SequenceNumberOrdering int
const (
	SequenceNumberOrderingContiguous SequenceNumberOrdering = iota
	SequenceNumberOrderingOutOfOrder
	SequenceNumberOrderingGap
	SequenceNumberOrderingUnknown
)
type StreamAllocator ¶
type StreamAllocator struct {
// contains filtered or unexported fields
}
LK-TODO add logger and log interesting events
func NewStreamAllocator ¶
func NewStreamAllocator() *StreamAllocator
func (*StreamAllocator) AddTrack ¶
func (s *StreamAllocator) AddTrack(downTrack *DownTrack)
func (*StreamAllocator) RemoveTrack ¶
func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack)
func (*StreamAllocator) Start ¶
func (s *StreamAllocator) Start()
func (*StreamAllocator) Stop ¶
func (s *StreamAllocator) Stop()
type StreamStatus ¶
type StreamStatus int32
const (
	StreamStatusStopped StreamStatus = 0
	StreamStatusActive StreamStatus = 1
)
func (StreamStatus) String ¶
func (s StreamStatus) String() string
type StreamTracker ¶
type StreamTracker struct {
// number of samples needed per cycle
SamplesRequired uint32
// number of cycles needed to be active
CyclesRequired uint64
CycleDuration time.Duration
OnStatusChanged func(StreamStatus)
// contains filtered or unexported fields
}
StreamTracker keeps track of packet flow and ensures a particular uptrack is consistently producing. It runs its own goroutine for detection, and fires the OnStatusChanged callback.
func NewStreamTracker ¶
func NewStreamTracker() *StreamTracker
func (*StreamTracker) Observe ¶
func (s *StreamTracker) Observe(sn uint16)
Observe a packet that's received
func (*StreamTracker) SetPaused ¶
func (s *StreamTracker) SetPaused(paused bool)
func (*StreamTracker) Start ¶
func (s *StreamTracker) Start()
func (*StreamTracker) Status ¶
func (s *StreamTracker) Status() StreamStatus
func (*StreamTracker) Stop ¶
func (s *StreamTracker) Stop()
type Track ¶
type Track struct {
// contains filtered or unexported fields
}
func (*Track) AdjustAllocation ¶
func (*Track) BandwidthOptimal ¶
func (*Track) BandwidthRequested ¶
func (*Track) GetPacketStats ¶
func (t *Track) GetPacketStats() (webrtc.RTPCodecType, uint32, uint32)
func (*Track) IncreaseAllocation ¶
func (*Track) UpdatePacketStats ¶
func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport)
LK-TODO this should probably be maintained in downTrack and this module can query what it needs
func (*Track) WritePaddingRTP ¶
type VP8Munger ¶
type VP8Munger struct {
VP8MungerParams
// contains filtered or unexported fields
}
func NewVP8Munger ¶
func NewVP8Munger() *VP8Munger
func (*VP8Munger) UpdateAndGet ¶
func (*VP8Munger) UpdateAndGetPadding ¶
func (*VP8Munger) UpdateOffsets ¶
type VP8MungerParams ¶
type VP8MungerParams struct {
// contains filtered or unexported fields
}
VP8 munger
type VP8PictureIdWrapHandler ¶
type VP8PictureIdWrapHandler struct {
// contains filtered or unexported fields
}
func (*VP8PictureIdWrapHandler) Init ¶
func (v *VP8PictureIdWrapHandler) Init(extPictureId int32, mBit bool)
func (*VP8PictureIdWrapHandler) MaxPictureId ¶
func (v *VP8PictureIdWrapHandler) MaxPictureId() int32
func (*VP8PictureIdWrapHandler) Unwrap ¶
func (v *VP8PictureIdWrapHandler) Unwrap(pictureId uint16, mBit bool) (int32, bool)
Unwrap unwraps the picture id and updates the maxPictureId. It returns the unwrapped value and whether the picture id is newer.
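VP8 picture ids are 7 bits wide, or 15 bits when the M bit is set, so they wrap around frequently and have to be extended to a monotonically increasing value. The sketch below shows one common way to do that: pick the candidate closest to the highest extended id seen so far. `unwrapPictureID` is a hypothetical free function written for illustration. It is not the method above, it does not update any stored maximum, and it assumes maxExt is non-negative.

```go
package main

import "fmt"

// unwrapPictureID (hypothetical) extends a wrapped picture id relative to
// maxExt, the highest extended picture id seen so far (assumed >= 0).
// mBit selects the 15-bit form; otherwise the id is 7 bits wide.
// It returns the extended id and whether it is newer than maxExt.
func unwrapPictureID(maxExt int32, pictureID uint16, mBit bool) (int32, bool) {
	mod := int32(1 << 7)
	if mBit {
		mod = 1 << 15
	}

	// Distance between the new id and the wrapped form of maxExt.
	diff := int32(pictureID)%mod - maxExt%mod

	// Fold the distance into (-mod/2, mod/2] so that the nearest
	// interpretation wins: a large negative jump is a forward wrap,
	// a large positive jump is an old packet from before the wrap.
	if diff > mod/2 {
		diff -= mod
	} else if diff < -mod/2 {
		diff += mod
	}

	ext := maxExt + diff
	return ext, ext > maxExt
}

func main() {
	fmt.Println(unwrapPictureID(127, 0, false))  // 128 true: 7-bit wrap forward
	fmt.Println(unwrapPictureID(128, 127, false)) // 127 false: late packet from before the wrap
	fmt.Println(unwrapPictureID(32767, 2, true))  // 32770 true: 15-bit wrap forward
}
```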
func (*VP8PictureIdWrapHandler) UpdateMaxPictureId ¶
func (v *VP8PictureIdWrapHandler) UpdateMaxPictureId(extPictureId int32, mBit bool)
type WebRTCReceiver ¶
type WebRTCReceiver struct {
// contains filtered or unexported fields
}
WebRTCReceiver receives a video track
func (*WebRTCReceiver) AddDownTrack ¶
func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool)
func (*WebRTCReceiver) AddUpTrack ¶
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer, bestQualityFirst bool)
func (*WebRTCReceiver) Codec ¶
func (w *WebRTCReceiver) Codec() webrtc.RTPCodecParameters
func (*WebRTCReceiver) DebugInfo ¶
func (w *WebRTCReceiver) DebugInfo() map[string]interface{}
func (*WebRTCReceiver) DeleteDownTrack ¶
func (w *WebRTCReceiver) DeleteDownTrack(peerID string)
DeleteDownTrack removes a DownTrack from a Receiver
func (*WebRTCReceiver) GetBitrate ¶
func (w *WebRTCReceiver) GetBitrate() [3]uint64
func (*WebRTCReceiver) GetBitrateTemporal ¶
func (w *WebRTCReceiver) GetBitrateTemporal() [3][4]uint64
func (*WebRTCReceiver) GetBitrateTemporalCumulative ¶
func (w *WebRTCReceiver) GetBitrateTemporalCumulative() [3][4]uint64
func (*WebRTCReceiver) GetMaxTemporalLayer ¶
func (w *WebRTCReceiver) GetMaxTemporalLayer() [3]int32
func (*WebRTCReceiver) GetSenderReportTime ¶
func (w *WebRTCReceiver) GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
func (*WebRTCReceiver) HasSpatialLayer ¶
func (w *WebRTCReceiver) HasSpatialLayer(layer int32) bool
func (*WebRTCReceiver) Kind ¶
func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType
func (*WebRTCReceiver) OnCloseHandler ¶
func (w *WebRTCReceiver) OnCloseHandler(fn func())
OnCloseHandler sets the method to be called when the remote track is removed.
func (*WebRTCReceiver) RetransmitPackets ¶
func (w *WebRTCReceiver) RetransmitPackets(track *DownTrack, packets []packetMeta) error
func (*WebRTCReceiver) SSRC ¶
func (w *WebRTCReceiver) SSRC(layer int) uint32
func (*WebRTCReceiver) SendRTCP ¶
func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet)
func (*WebRTCReceiver) SetRTCPCh ¶
func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet)
func (*WebRTCReceiver) SetTrackMeta ¶
func (w *WebRTCReceiver) SetTrackMeta(trackID, streamID string)
func (*WebRTCReceiver) SetUpTrackPaused ¶
func (w *WebRTCReceiver) SetUpTrackPaused(paused bool)
SetUpTrackPaused indicates upstream will not be sending any data. This will reflect the "muted" status and will pause the StreamTracker to ensure we don't turn off the layer.
func (*WebRTCReceiver) StreamID ¶
func (w *WebRTCReceiver) StreamID() string
func (*WebRTCReceiver) TrackID ¶
func (w *WebRTCReceiver) TrackID() string