Documentation ¶
Overview ¶
Design of Prober
Probing is used to check whether the channel has excess capacity. This is especially useful in the downstream direction of an SFU. The SFU forwards audio/video streams from one or more publishers to all the subscribers. But the downstream channel of a subscriber may not be big enough to carry all the streams, and its capacity varies over time.
When there is not enough capacity, some streams will be paused. To resume a stream, the SFU needs to know that the channel has enough capacity. That's where probing comes in. When conditions are favorable, the SFU can send probe packets so that the bandwidth estimator has more data and can estimate the available channel capacity better. NOTE: What defines `favorable conditions` is implementation dependent.
There are two options for probing:
- Use padding only RTP packets: This one is preferable as the probe rate can be controlled more tightly.
- Resume a paused stream or forward a higher spatial layer: This requires finding a stream whose bitrate matches the probing rate. Also, a stream could produce a key frame unexpectedly, boosting the rate within the probing window.
The strategy used depends on stream allocator implementation. This module can be used if the stream allocator decides to use padding only RTP packets for probing purposes.
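To make "padding only RTP packets" concrete, here is a minimal sketch that builds one by hand from the RFC 3550 wire format: a 12-byte fixed header with the padding bit set, followed by padding whose last octet holds the padding count. The function name `buildPaddingPacket` and its parameters are hypothetical and are not part of this package; the package's own `WritePaddingRTP` may be implemented quite differently.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// buildPaddingPacket is a hypothetical helper (not part of this package).
// It returns an RTP packet that carries no media payload, only padding:
// a 12-byte fixed header with the P bit set, then paddingLen bytes whose
// last octet is the padding count (RFC 3550, section 5.1).
// paddingLen must be in [1, 255] because the count is a single octet.
func buildPaddingPacket(payloadType uint8, seq uint16, ts, ssrc uint32, paddingLen int) ([]byte, error) {
	if paddingLen < 1 || paddingLen > 255 {
		return nil, fmt.Errorf("padding length %d out of range [1, 255]", paddingLen)
	}
	pkt := make([]byte, 12+paddingLen)
	pkt[0] = 0x80 | 0x20        // version 2, padding bit set, no extension, no CSRCs
	pkt[1] = payloadType & 0x7F // marker bit clear
	binary.BigEndian.PutUint16(pkt[2:4], seq)
	binary.BigEndian.PutUint32(pkt[4:8], ts)
	binary.BigEndian.PutUint32(pkt[8:12], ssrc)
	pkt[len(pkt)-1] = byte(paddingLen) // count includes this last octet
	return pkt, nil
}

func main() {
	pkt, err := buildPaddingPacket(96, 1, 90000, 0x1234, 255)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes, first byte %#x, padding count %d\n", len(pkt), pkt[0], pkt[len(pkt)-1])
}
```

Because the padding count is one octet, a single packet carries at most 255 bytes of padding, so a larger probe has to be split across several packets.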
Implementation: There are a couple of options
- Check prober in the forwarding path (pull from prober). This is preferred for scalability reasons. But this suffers from not being able to probe when all streams are paused (could be due to downstream bandwidth constraints or the corresponding upstream tracks may have paused due to upstream bandwidth constraints). Another issue is not being able to have tight control on the probing window boundary as the packet forwarding path may not have a packet to forward. But it should not be a major concern as long as some stream(s) is/are forwarded as there should be a packet at least every 60 ms or so (forwarding only one stream at 15 fps). Usually, it will be serviced much more frequently when there are multiple streams getting forwarded.
- Run it in a goroutine. But that would have to wake up very often to prevent bunching up of probe packets. So, it is a scalability concern as there is one prober per subscriber peer connection. But probe windows should be very short (of the order of 100s of ms). So, this approach might be fine.
The implementation here follows the second approach of using a goroutine.
Pacing: Ideally, the subscriber peer connection should have a pacer which trickles data out at the estimated channel capacity rate (and estimated channel capacity + probing rate when actively probing).
But there are a few significant challenges:
- Pacer will require buffering of forwarded packets. That means more memory, more CPU (have to make copy of packets) and more latency in the media stream.
- Scalability concern as SFU may be handling hundreds of subscriber peer connections and each one processing the pacing loop at 5ms interval will add up.
So, this module assumes that pacing is inherently provided by the publishers for media streams. That is a reasonable assumption given that publishing clients will run their own pacer and pace data out at a steady rate.
A further assumption is that if there are multiple publishers for a subscriber peer connection, all the publishers are not pacing in sync, i.e. each publisher's pacer is completely independent and SFU will be receiving the media packets with a good spread and not clumped together.
Given those assumptions, this module monitors the media send rate and adjusts probing packet sends accordingly. Although the probing may have a high wake-up frequency, it is for short windows. For example, probing at 5 Mbps for 1/2 second and sending a 1000-byte probe per iteration will wake up every 1.6 ms. That is very high, but should last for 1/2 second or so.
5 Mbps over 1/2 second = 2.5 Mbits
2.5 Mbits = 312500 bytes = 313 probes at 1000 bytes per probe
313 probes over 1/2 second = 1.6 ms between probes
A few things to note
- When a probe cluster is added, the expected media rate is provided. So, the wake-up interval takes that into account. For example, if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected to be provided by media traffic, the wake-up interval becomes 8 ms.
- The amount of probing should actually be capped at some value to avoid too much self-induced congestion. It may be something like 500 kbps. That will increase the wake-up interval to 16 ms in the above example.
- In practice, the probing window may also be shorter. Typically, a probe can be run for 2 - 3 RTTs to get a good measurement. For the longest hauls, RTT could be 250 ms or so, leading to the probing window being long(ish). But RTT should be much shorter, especially if the subscriber peer connection of the client is able to connect to the nearest data center.
Index ¶
- Constants
- Variables
- type AddTrackParams
- type ChannelCongestionReason
- type ChannelObserver
- func (c *ChannelObserver) AddEstimate(estimate int64)
- func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32)
- func (c *ChannelObserver) GetHighestEstimate() int64
- func (c *ChannelObserver) GetLowestEstimate() int64
- func (c *ChannelObserver) GetNackRatio() (uint32, uint32, float64)
- func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason)
- func (c *ChannelObserver) SeedEstimate(estimate int64)
- func (c *ChannelObserver) SeedNack(packets uint32, repeatedNacks uint32)
- type ChannelObserverParams
- type ChannelTrend
- type Cluster
- func (c *Cluster) GetInfo() ProbeClusterInfo
- func (c *Cluster) GetSleepDuration() time.Duration
- func (c *Cluster) IsFinished() bool
- func (c *Cluster) PacketsSent(size int)
- func (c *Cluster) ProbeSent(size int)
- func (c *Cluster) Process(pl ProberListener)
- func (c *Cluster) Start()
- func (c *Cluster) String() string
- type Event
- type MaxDistanceSorter
- type MinDistanceSorter
- type ProbeClusterId
- type ProbeClusterInfo
- type Prober
- func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration time.Duration, ...) ProbeClusterId
- func (p *Prober) IsRunning() bool
- func (p *Prober) PacketsSent(size int)
- func (p *Prober) ProbeSent(size int)
- func (p *Prober) Reset()
- func (p *Prober) SetProberListener(listener ProberListener)
- type ProberListener
- type ProberParams
- type StreamAllocator
- func (s *StreamAllocator) AddTrack(downTrack *sfu.DownTrack, params AddTrackParams)
- func (s *StreamAllocator) OnActiveChanged(isActive bool)
- func (s *StreamAllocator) OnAvailableLayersChanged(downTrack *sfu.DownTrack)
- func (s *StreamAllocator) OnBitrateAvailabilityChanged(downTrack *sfu.DownTrack)
- func (s *StreamAllocator) OnMaxPublishedLayerChanged(downTrack *sfu.DownTrack)
- func (s *StreamAllocator) OnPacketsSent(downTrack *sfu.DownTrack, size int)
- func (s *StreamAllocator) OnProbeClusterDone(info ProbeClusterInfo)
- func (s *StreamAllocator) OnREMB(downTrack *sfu.DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)
- func (s *StreamAllocator) OnSendProbe(bytesToSend int)
- func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error)
- func (s *StreamAllocator) OnSubscribedLayersChanged(downTrack *sfu.DownTrack, layers buffer.VideoLayer)
- func (s *StreamAllocator) OnSubscriptionChanged(downTrack *sfu.DownTrack)
- func (s *StreamAllocator) OnTargetLayerReached(downTrack *sfu.DownTrack)
- func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rtcp.TransportLayerCC)
- func (s *StreamAllocator) RemoveTrack(downTrack *sfu.DownTrack)
- func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator)
- func (s *StreamAllocator) SetTrackPriority(downTrack *sfu.DownTrack, priority uint8)
- func (s *StreamAllocator) Start()
- func (s *StreamAllocator) Stop()
- type StreamAllocatorParams
- type StreamState
- type StreamStateInfo
- type StreamStateUpdate
- type Track
- func (t *Track) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (sfu.VideoAllocation, bool)
- func (t *Track) AllocateOptimal(allowOvershoot bool) sfu.VideoAllocation
- func (t *Track) BandwidthRequested() int64
- func (t *Track) DistanceToDesired() float64
- func (t *Track) DownTrack() *sfu.DownTrack
- func (t *Track) GetNackDelta() (uint32, uint32)
- func (t *Track) GetNextHigherTransition(allowOvershoot bool) (sfu.VideoTransition, bool)
- func (t *Track) ID() livekit.TrackID
- func (t *Track) IsDeficient() bool
- func (t *Track) IsManaged() bool
- func (t *Track) Pause() sfu.VideoAllocation
- func (t *Track) Priority() uint8
- func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, ...) int64
- func (t *Track) ProvisionalAllocateCommit() sfu.VideoAllocation
- func (t *Track) ProvisionalAllocateGetBestWeightedTransition() sfu.VideoTransition
- func (t *Track) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) sfu.VideoTransition
- func (t *Track) ProvisionalAllocatePrepare()
- func (t *Track) PublisherID() livekit.ParticipantID
- func (t *Track) SetDirty(isDirty bool) bool
- func (t *Track) SetMaxLayers(layers buffer.VideoLayer) bool
- func (t *Track) SetPaused(isPaused bool) bool
- func (t *Track) SetPriority(priority uint8) bool
- func (t *Track) WritePaddingRTP(bytesToSend int) int
- type TrackSorter
- type TrendDetector
- func (t *TrendDetector) AddValue(value int64)
- func (t *TrendDetector) GetDirection() TrendDirection
- func (t *TrendDetector) GetHighest() int64
- func (t *TrendDetector) GetLowest() int64
- func (t *TrendDetector) GetValues() []int64
- func (t *TrendDetector) Seed(value int64)
- func (t *TrendDetector) ToString() string
- type TrendDetectorParams
- type TrendDirection
Constants ¶
const (
	ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps

	NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate

	ProbeWaitBase      = 5 * time.Second
	ProbeBackoffFactor = 1.5
	ProbeWaitMax       = 30 * time.Second
	ProbeSettleWait    = 250
	ProbeTrendWait     = 2 * time.Second

	ProbePct         = 120
	ProbeMinBps      = 200 * 1000 // 200 kbps
	ProbeMinDuration = 20 * time.Second
	ProbeMaxDuration = 21 * time.Second

	PriorityMin          = uint8(1)
	PriorityMax          = uint8(255)
	PriorityDefaultVideo = PriorityMin

	FlagAllowOvershootWhileOptimal              = true
	FlagAllowOvershootWhileDeficient            = false
	FlagAllowOvershootExemptTrackWhileDeficient = true
	FlagAllowOvershootInProbe                   = true
	FlagAllowOvershootInCatchup                 = true
)
Variables ¶
var (
	ChannelObserverParamsProbe = ChannelObserverParams{
		Name:                           "probe",
		EstimateRequiredSamples:        3,
		EstimateDownwardTrendThreshold: 0.0,
		EstimateCollapseValues:         false,
		NackWindowMinDuration:          500 * time.Millisecond,
		NackWindowMaxDuration:          1 * time.Second,
		NackRatioThreshold:             0.04,
	}

	ChannelObserverParamsNonProbe = ChannelObserverParams{
		Name:                           "non-probe",
		EstimateRequiredSamples:        8,
		EstimateDownwardTrendThreshold: -0.5,
		EstimateCollapseValues:         true,
		NackWindowMinDuration:          1 * time.Second,
		NackWindowMaxDuration:          2 * time.Second,
		NackRatioThreshold:             0.08,
	}
)
Functions ¶
This section is empty.
Types ¶
type AddTrackParams ¶
type AddTrackParams struct {
Source livekit.TrackSource
Priority uint8
IsSimulcast bool
PublisherID livekit.ParticipantID
}
type ChannelCongestionReason ¶
type ChannelCongestionReason int
const (
	ChannelCongestionReasonNone ChannelCongestionReason = iota
	ChannelCongestionReasonEstimate
	ChannelCongestionReasonLoss
)
func (ChannelCongestionReason) String ¶
func (c ChannelCongestionReason) String() string
type ChannelObserver ¶
type ChannelObserver struct {
// contains filtered or unexported fields
}
func NewChannelObserver ¶
func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver
func (*ChannelObserver) AddEstimate ¶
func (c *ChannelObserver) AddEstimate(estimate int64)
func (*ChannelObserver) AddNack ¶
func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32)
func (*ChannelObserver) GetHighestEstimate ¶
func (c *ChannelObserver) GetHighestEstimate() int64
func (*ChannelObserver) GetLowestEstimate ¶
func (c *ChannelObserver) GetLowestEstimate() int64
func (*ChannelObserver) GetNackRatio ¶
func (c *ChannelObserver) GetNackRatio() (uint32, uint32, float64)
func (*ChannelObserver) GetTrend ¶
func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason)
func (*ChannelObserver) SeedEstimate ¶
func (c *ChannelObserver) SeedEstimate(estimate int64)
func (*ChannelObserver) SeedNack ¶
func (c *ChannelObserver) SeedNack(packets uint32, repeatedNacks uint32)
type ChannelObserverParams ¶
type ChannelTrend ¶
type ChannelTrend int
const (
	ChannelTrendNeutral ChannelTrend = iota
	ChannelTrendClearing
	ChannelTrendCongesting
)
func (ChannelTrend) String ¶
func (c ChannelTrend) String() string
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
func NewCluster ¶
func (*Cluster) GetInfo ¶
func (c *Cluster) GetInfo() ProbeClusterInfo
func (*Cluster) GetSleepDuration ¶
func (*Cluster) IsFinished ¶
func (*Cluster) PacketsSent ¶
func (*Cluster) Process ¶
func (c *Cluster) Process(pl ProberListener)
type MaxDistanceSorter ¶
type MaxDistanceSorter []*Track
func (MaxDistanceSorter) Len ¶
func (m MaxDistanceSorter) Len() int
func (MaxDistanceSorter) Less ¶
func (m MaxDistanceSorter) Less(i, j int) bool
func (MaxDistanceSorter) Swap ¶
func (m MaxDistanceSorter) Swap(i, j int)
type MinDistanceSorter ¶
type MinDistanceSorter []*Track
func (MinDistanceSorter) Len ¶
func (m MinDistanceSorter) Len() int
func (MinDistanceSorter) Less ¶
func (m MinDistanceSorter) Less(i, j int) bool
func (MinDistanceSorter) Swap ¶
func (m MinDistanceSorter) Swap(i, j int)
type ProbeClusterInfo ¶
type ProbeClusterInfo struct {
Id ProbeClusterId
BytesSent int
Duration time.Duration
}
type Prober ¶
type Prober struct {
// contains filtered or unexported fields
}
func NewProber ¶
func NewProber(params ProberParams) *Prober
func (*Prober) AddCluster ¶
func (*Prober) PacketsSent ¶
func (*Prober) SetProberListener ¶
func (p *Prober) SetProberListener(listener ProberListener)
type ProberListener ¶
type ProberListener interface {
OnSendProbe(bytesToSend int)
OnProbeClusterDone(info ProbeClusterInfo)
OnActiveChanged(isActive bool)
}
type ProberParams ¶
type StreamAllocator ¶
type StreamAllocator struct {
// contains filtered or unexported fields
}
func NewStreamAllocator ¶
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator
func (*StreamAllocator) AddTrack ¶
func (s *StreamAllocator) AddTrack(downTrack *sfu.DownTrack, params AddTrackParams)
func (*StreamAllocator) OnActiveChanged ¶
func (s *StreamAllocator) OnActiveChanged(isActive bool)
called when prober active state changes
func (*StreamAllocator) OnAvailableLayersChanged ¶
func (s *StreamAllocator) OnAvailableLayersChanged(downTrack *sfu.DownTrack)
called when feeding track's layer availability changes
func (*StreamAllocator) OnBitrateAvailabilityChanged ¶
func (s *StreamAllocator) OnBitrateAvailabilityChanged(downTrack *sfu.DownTrack)
called when feeding track's bitrate measurement of any layer is available
func (*StreamAllocator) OnMaxPublishedLayerChanged ¶
func (s *StreamAllocator) OnMaxPublishedLayerChanged(downTrack *sfu.DownTrack)
called when feeding track's max publisher layer changes
func (*StreamAllocator) OnPacketsSent ¶
func (s *StreamAllocator) OnPacketsSent(downTrack *sfu.DownTrack, size int)
called when a video DownTrack sends a packet
func (*StreamAllocator) OnProbeClusterDone ¶
func (s *StreamAllocator) OnProbeClusterDone(info ProbeClusterInfo)
called when prober finishes a probe cluster
func (*StreamAllocator) OnREMB ¶
func (s *StreamAllocator) OnREMB(downTrack *sfu.DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)
called when a new REMB is received (receive side bandwidth estimation)
func (*StreamAllocator) OnSendProbe ¶
func (s *StreamAllocator) OnSendProbe(bytesToSend int)
called when prober wants to send packet(s)
func (*StreamAllocator) OnStreamStateChange ¶
func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error)
func (*StreamAllocator) OnSubscribedLayersChanged ¶
func (s *StreamAllocator) OnSubscribedLayersChanged(downTrack *sfu.DownTrack, layers buffer.VideoLayer)
called when subscribed layers change (limiting max layers)
func (*StreamAllocator) OnSubscriptionChanged ¶
func (s *StreamAllocator) OnSubscriptionChanged(downTrack *sfu.DownTrack)
called when subscription settings change (muting/unmuting of track)
func (*StreamAllocator) OnTargetLayerReached ¶
func (s *StreamAllocator) OnTargetLayerReached(downTrack *sfu.DownTrack)
called when forwarder finds a target layer
func (*StreamAllocator) OnTransportCCFeedback ¶
func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rtcp.TransportLayerCC)
called when a new transport-cc feedback is received
func (*StreamAllocator) RemoveTrack ¶
func (s *StreamAllocator) RemoveTrack(downTrack *sfu.DownTrack)
func (*StreamAllocator) SetBandwidthEstimator ¶
func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator)
func (*StreamAllocator) SetTrackPriority ¶
func (s *StreamAllocator) SetTrackPriority(downTrack *sfu.DownTrack, priority uint8)
func (*StreamAllocator) Start ¶
func (s *StreamAllocator) Start()
func (*StreamAllocator) Stop ¶
func (s *StreamAllocator) Stop()
type StreamAllocatorParams ¶
type StreamAllocatorParams struct {
Config config.CongestionControlConfig
Logger logger.Logger
}
type StreamState ¶
type StreamState int
const (
	StreamStateActive StreamState = iota
	StreamStatePaused
)
func (StreamState) String ¶
func (s StreamState) String() string
type StreamStateInfo ¶
type StreamStateInfo struct {
ParticipantID livekit.ParticipantID
TrackID livekit.TrackID
State StreamState
}
type StreamStateUpdate ¶
type StreamStateUpdate struct {
StreamStates []*StreamStateInfo
}
func NewStreamStateUpdate ¶
func NewStreamStateUpdate() *StreamStateUpdate
func (*StreamStateUpdate) Empty ¶
func (s *StreamStateUpdate) Empty() bool
func (*StreamStateUpdate) HandleStreamingChange ¶
func (s *StreamStateUpdate) HandleStreamingChange(isPaused bool, track *Track)
type Track ¶
type Track struct {
// contains filtered or unexported fields
}
func NewTrack ¶
func NewTrack(
	downTrack *sfu.DownTrack,
	source livekit.TrackSource,
	isSimulcast bool,
	publisherID livekit.ParticipantID,
	logger logger.Logger,
) *Track
func (*Track) AllocateNextHigher ¶
func (*Track) AllocateOptimal ¶
func (t *Track) AllocateOptimal(allowOvershoot bool) sfu.VideoAllocation
func (*Track) BandwidthRequested ¶
func (*Track) DistanceToDesired ¶
func (*Track) GetNackDelta ¶
func (*Track) GetNextHigherTransition ¶
func (t *Track) GetNextHigherTransition(allowOvershoot bool) (sfu.VideoTransition, bool)
func (*Track) IsDeficient ¶
func (*Track) Pause ¶
func (t *Track) Pause() sfu.VideoAllocation
func (*Track) ProvisionalAllocate ¶
func (*Track) ProvisionalAllocateCommit ¶
func (t *Track) ProvisionalAllocateCommit() sfu.VideoAllocation
func (*Track) ProvisionalAllocateGetBestWeightedTransition ¶
func (t *Track) ProvisionalAllocateGetBestWeightedTransition() sfu.VideoTransition
func (*Track) ProvisionalAllocateGetCooperativeTransition ¶
func (t *Track) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) sfu.VideoTransition
func (*Track) ProvisionalAllocatePrepare ¶
func (t *Track) ProvisionalAllocatePrepare()
func (*Track) PublisherID ¶
func (t *Track) PublisherID() livekit.ParticipantID
func (*Track) SetMaxLayers ¶
func (t *Track) SetMaxLayers(layers buffer.VideoLayer) bool
func (*Track) SetPriority ¶
func (*Track) WritePaddingRTP ¶
type TrackSorter ¶
type TrackSorter []*Track
func (TrackSorter) Len ¶
func (t TrackSorter) Len() int
func (TrackSorter) Less ¶
func (t TrackSorter) Less(i, j int) bool
func (TrackSorter) Swap ¶
func (t TrackSorter) Swap(i, j int)
type TrendDetector ¶
type TrendDetector struct {
// contains filtered or unexported fields
}
func NewTrendDetector ¶
func NewTrendDetector(params TrendDetectorParams) *TrendDetector
func (*TrendDetector) AddValue ¶
func (t *TrendDetector) AddValue(value int64)
func (*TrendDetector) GetDirection ¶
func (t *TrendDetector) GetDirection() TrendDirection
func (*TrendDetector) GetHighest ¶
func (t *TrendDetector) GetHighest() int64
func (*TrendDetector) GetLowest ¶
func (t *TrendDetector) GetLowest() int64
func (*TrendDetector) GetValues ¶
func (t *TrendDetector) GetValues() []int64
func (*TrendDetector) Seed ¶
func (t *TrendDetector) Seed(value int64)
func (*TrendDetector) ToString ¶
func (t *TrendDetector) ToString() string
type TrendDetectorParams ¶
type TrendDirection ¶
type TrendDirection int
const (
	TrendDirectionNeutral TrendDirection = iota
	TrendDirectionUpward
	TrendDirectionDownward
)
func (TrendDirection) String ¶
func (t TrendDirection) String() string