synchronizer

package
v2.16.4
Published: May 26, 2026 License: Apache-2.0 Imports: 18 Imported by: 3

Documentation

Index

Constants

const (
	DefaultMaxTsDiff                    = time.Minute
	DefaultMaxDriftAdjustment           = 5 * time.Millisecond
	DefaultDriftAdjustmentWindowPercent = 0.0 // 0 -> disables throttling
	DefaultOldPacketThreshold           = 500 * time.Millisecond
)

Variables

var (
	ErrPacketOutOfOrder = errors.New("packet out-of-order")
	ErrPacketTooOld     = errors.New("packet too old")
)

Functions

This section is empty.

Types

type NtpEstimator added in v2.16.4

type NtpEstimator struct {
	// contains filtered or unexported fields
}

NtpEstimator maintains a linear regression over a sliding window of RTCP sender report pairs to map RTP timestamps to NTP time. It is modeled after Chrome's RtpToNtpEstimator.
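
To make the regression concrete, here is a minimal sketch of an ordinary least-squares fit over (RTP timestamp, NTP seconds) pairs. It is not the package's implementation: the `observation` type and `fitLine` function are hypothetical names, the RTP timestamps are assumed to be already unwrapped across 32-bit rollover, and the sliding window and outlier rejection are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// observation pairs an RTP timestamp with the sender's NTP time.
// Hypothetical type for illustration; not part of the package API.
type observation struct {
	rtp float64 // RTP timestamp, assumed already unwrapped across rollover
	ntp float64 // sender NTP time in seconds
}

// fitLine returns the least-squares slope and intercept of ntp as a
// function of rtp. It needs at least two distinct RTP timestamps.
func fitLine(obs []observation) (slope, intercept float64, err error) {
	if len(obs) < 2 {
		return 0, 0, errors.New("need at least 2 observations")
	}
	n := float64(len(obs))
	var meanX, meanY float64
	for _, o := range obs {
		meanX += o.rtp
		meanY += o.ntp
	}
	meanX /= n
	meanY /= n

	var sxx, sxy float64
	for _, o := range obs {
		dx := o.rtp - meanX
		sxx += dx * dx
		sxy += dx * (o.ntp - meanY)
	}
	if sxx == 0 {
		return 0, 0, errors.New("all RTP timestamps are identical")
	}
	slope = sxy / sxx
	intercept = meanY - slope*meanX
	return slope, intercept, nil
}

func main() {
	// Three sender reports from a 90 kHz clock, one second apart.
	obs := []observation{{0, 100}, {90000, 101}, {180000, 102}}
	slope, intercept, err := fitLine(obs)
	if err != nil {
		panic(err)
	}
	// Map an RTP timestamp halfway between the last two reports.
	fmt.Printf("slope=%.9f s/tick ntp=%.3f s\n", slope, slope*135000+intercept)
}
```

With a perfect 90 kHz clock the fitted slope comes out at approximately 1/90000 seconds per tick; a sender whose clock runs fast or slow shows up as a slope that deviates from that value.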

func NewNtpEstimator added in v2.16.4

func NewNtpEstimator(clockRate uint32) *NtpEstimator

NewNtpEstimator creates an NtpEstimator for a codec with the given clock rate.

func (*NtpEstimator) IsReady added in v2.16.4

func (e *NtpEstimator) IsReady() bool

IsReady returns true once at least 2 sender reports have been processed and the regression is valid.

func (*NtpEstimator) OnSenderReport added in v2.16.4

func (e *NtpEstimator) OnSenderReport(ntpTime uint64, rtpTimestamp uint32, receivedAt time.Time) SRResult

OnSenderReport ingests a new RTCP sender report observation. ntpTime is the 64-bit NTP timestamp from the SR, rtpTimestamp is the corresponding RTP timestamp, and receivedAt is the local wall-clock time when the SR was received.
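
For readers unfamiliar with the 64-bit NTP format carried in sender reports, the following sketch converts between it and `time.Time`. The upper 32 bits hold seconds since 1900-01-01 and the lower 32 bits hold a binary fraction of a second. The helper names `ntpToTime` and `timeToNTP` are hypothetical and not part of this package, and NTP era rollover is ignored.

```go
package main

import (
	"fmt"
	"time"
)

// ntpEpochOffset is the number of seconds between the NTP epoch
// (1900-01-01) and the Unix epoch (1970-01-01).
const ntpEpochOffset = 2208988800

// ntpToTime converts a 64-bit NTP timestamp (32.32 fixed point) to a
// time.Time in UTC. Hypothetical helper for illustration.
func ntpToTime(ntp uint64) time.Time {
	secs := int64(ntp>>32) - ntpEpochOffset
	frac := ntp & 0xFFFFFFFF
	// frac < 2^32 and 1e9 < 2^30, so the product fits in a uint64.
	nanos := int64((frac * 1_000_000_000) >> 32)
	return time.Unix(secs, nanos).UTC()
}

// timeToNTP is the inverse conversion, truncating below the 32-bit
// fraction's resolution.
func timeToNTP(t time.Time) uint64 {
	secs := uint64(t.Unix() + ntpEpochOffset)
	frac := (uint64(t.Nanosecond()) << 32) / 1_000_000_000
	return secs<<32 | frac
}

func main() {
	// The Unix epoch plus half a second, expressed as an NTP timestamp.
	ntp := uint64(ntpEpochOffset)<<32 | 1<<31
	t := ntpToTime(ntp)
	fmt.Println(t.Format(time.RFC3339Nano), timeToNTP(t) == ntp)
}
```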

func (*NtpEstimator) Reset added in v2.16.4

func (e *NtpEstimator) Reset()

Reset clears all state, returning the estimator to its initial condition. Used when a stream discontinuity is detected (e.g., stream restart with a new RTP offset) and the old regression is no longer valid.

func (*NtpEstimator) RtpToNtp added in v2.16.4

func (e *NtpEstimator) RtpToNtp(rtpTimestamp uint32) (time.Time, error)

RtpToNtp maps an RTP timestamp to wall-clock time using the current regression.

func (*NtpEstimator) Slope added in v2.16.4

func (e *NtpEstimator) Slope() float64

Slope returns the regression slope: seconds of NTP time per RTP tick. For a perfect clock this equals 1/clockRate.

type ParticipantClock added in v2.16.4

type ParticipantClock struct {
	// contains filtered or unexported fields
}

ParticipantClock holds OWD and NTP estimation state for a single participant.

func NewParticipantClock added in v2.16.4

func NewParticipantClock(l logger.Logger) *ParticipantClock

NewParticipantClock creates a new ParticipantClock.

func (*ParticipantClock) HasTrack added in v2.16.4

func (pc *ParticipantClock) HasTrack(trackID string) bool

HasTrack returns true if the participant has a track with the given ID.

func (*ParticipantClock) OnSenderReport added in v2.16.4

func (pc *ParticipantClock) OnSenderReport(trackID string, clockRate uint32, ntpTime uint64, rtpTimestamp uint32, receivedAt time.Time)

OnSenderReport processes an RTCP sender report for a track. It updates the NTP estimator, OWD estimator, and records the NTP epoch.

func (*ParticipantClock) RemoveTrack added in v2.16.4

func (pc *ParticipantClock) RemoveTrack(trackID string)

RemoveTrack removes a track.

func (*ParticipantClock) ResetTrack added in v2.16.4

func (pc *ParticipantClock) ResetTrack(trackID string)

ResetTrack clears the NTP estimator for a track, forcing it to rebuild from new sender reports. Used when a stream discontinuity is detected.

func (*ParticipantClock) RtpToReceiverClock added in v2.16.4

func (pc *ParticipantClock) RtpToReceiverClock(trackID string, rtpTimestamp uint32) (time.Time, error)

RtpToReceiverClock maps an RTP timestamp to a time on the receiver's clock. The result is ntpTime + estimatedOWD, which places the sender's NTP time into the receiver's clock domain.

type SRResult added in v2.16.4

type SRResult int

SRResult indicates the outcome of processing a sender report.

const (
	SRAccepted SRResult = iota
	SRDuplicate
	SROutlier
)

type SenderReportSyncMode added in v2.16.2

type SenderReportSyncMode int
const (
	SenderReportSyncModeUnset SenderReportSyncMode = iota
	// SenderReportSyncModeWithoutRebase uses the legacy sender report path without
	// rebasing SR timestamps onto the local clock. This mode can still act on SR
	// drift unless combined with WithAudioPTSAdjustmentDisabled for audio tracks.
	SenderReportSyncModeWithoutRebase
	// SenderReportSyncModeRebase rebases sender reports onto the local clock and
	// applies drift using the gradual adjustment path.
	SenderReportSyncModeRebase
	// SenderReportSyncModeOneShot rebases sender reports onto the local clock but
	// applies drift only as threshold-triggered one-shot corrections.
	SenderReportSyncModeOneShot
)

func (SenderReportSyncMode) String added in v2.16.2

func (m SenderReportSyncMode) String() string

type SessionTimeline added in v2.16.4

type SessionTimeline struct {
	// contains filtered or unexported fields
}

SessionTimeline establishes a shared recording timeline and maps each participant's NTP clock domain onto it using OWD (one-way delay) normalization. This is the key component that fixes cross-participant misalignment.

Algorithm:

  1. Each SR provides a pair: (senderNtpTime, receivedAtWallClock). The difference is the one-way delay (OWD).
  2. Using the OWDEstimator, estimate each participant's OWD. The min observed OWD approximates true propagation delay.
  3. To map a participant's RTP timestamp to the session timeline: sessionPTS = ntpTime + estimatedOWD - sessionStart
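
The three steps above can be sketched as follows. This is an illustration under stated assumptions, not the package's code: `minOWD` is a hypothetical stand-in for the OWDEstimator that simply keeps the smallest observed delay, and `sessionPTS` applies the formula from step 3. Because sender and receiver clocks are not synchronized, the measured delay also absorbs any offset between the two clocks and may even be negative.

```go
package main

import (
	"fmt"
	"time"
)

// minOWD tracks the smallest observed one-way delay.
// Hypothetical stand-in for the package's OWD estimator.
type minOWD struct {
	min time.Duration
	ok  bool
}

// observe records receivedAt - senderNTP for one sender report.
func (m *minOWD) observe(senderNTP, receivedAt time.Time) {
	d := receivedAt.Sub(senderNTP)
	if !m.ok || d < m.min {
		m.min, m.ok = d, true
	}
}

// sessionPTS applies sessionPTS = ntpTime + estimatedOWD - sessionStart.
func sessionPTS(ntpTime time.Time, owd time.Duration, sessionStart time.Time) time.Duration {
	return ntpTime.Add(owd).Sub(sessionStart)
}

// example feeds three sender reports with different delays and maps a
// timestamp two seconds into the session.
func example() (owd, pts time.Duration) {
	base := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
	var est minOWD
	delays := []time.Duration{80 * time.Millisecond, 50 * time.Millisecond, 120 * time.Millisecond}
	for i, delay := range delays {
		sent := base.Add(time.Duration(i) * time.Second)
		est.observe(sent, sent.Add(delay))
	}
	// Sender NTP time of a frame, as produced by the RTP-to-NTP mapping.
	ntpTime := base.Add(2 * time.Second)
	return est.min, sessionPTS(ntpTime, est.min, base)
}

func main() {
	owd, pts := example()
	fmt.Println("estimated OWD:", owd, "session PTS:", pts)
}
```

Taking the minimum rather than the mean keeps queueing jitter out of the estimate, since congestion can only add to the delay of an individual report.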

func NewSessionTimeline added in v2.16.4

func NewSessionTimeline(l logger.Logger) *SessionTimeline

NewSessionTimeline creates a new SessionTimeline.

func (*SessionTimeline) AddParticipant added in v2.16.4

func (st *SessionTimeline) AddParticipant(participantID string) *ParticipantClock

AddParticipant registers a new participant with the given participantID.

func (*SessionTimeline) GetOrAddParticipant added in v2.16.4

func (st *SessionTimeline) GetOrAddParticipant(participantID string) *ParticipantClock

GetOrAddParticipant returns the ParticipantClock for the given participantID, creating one if it doesn't exist. This is safe for concurrent use.

func (*SessionTimeline) GetParticipantClock added in v2.16.4

func (st *SessionTimeline) GetParticipantClock(participantID string) *ParticipantClock

GetParticipantClock returns the ParticipantClock for a participant, or nil.

func (*SessionTimeline) GetSessionPTS added in v2.16.4

func (st *SessionTimeline) GetSessionPTS(participantID, trackID string, rtpTimestamp uint32) (time.Duration, error)

GetSessionPTS maps an RTP timestamp for a participant's track to a position on the shared session timeline.

The formula is: sessionPTS = ntpTime + estimatedOWD - sessionStart

func (*SessionTimeline) OnSenderReport added in v2.16.4

func (st *SessionTimeline) OnSenderReport(participantID, trackID string, clockRate uint32, ntpTime uint64, rtpTimestamp uint32, receivedAt time.Time)

OnSenderReport processes an RTCP sender report for a participant's track. It delegates to the ParticipantClock to update the NTP estimator, OWD estimator, and NTP epoch.

func (*SessionTimeline) RemoveParticipant added in v2.16.4

func (st *SessionTimeline) RemoveParticipant(participantID string)

RemoveParticipant removes the participant with the given participantID.

func (*SessionTimeline) ResetTrack added in v2.16.4

func (st *SessionTimeline) ResetTrack(participantID, trackID string)

ResetTrack clears the NTP estimator for a track, forcing it to rebuild from new sender reports. Used when a stream discontinuity is detected.

func (*SessionTimeline) SetSessionStart added in v2.16.4

func (st *SessionTimeline) SetSessionStart(t time.Time)

SetSessionStart sets the session start time (wall-clock time when the first packet of any track arrived at the receiver).

type Sync added in v2.16.4

type Sync interface {
	AddTrack(track TrackRemote, participantID string) TrackSync
	RemoveTrack(trackID string)
	OnRTCP(packet rtcp.Packet)
	End()
	GetStartedAt() int64
	GetEndedAt() int64
	SetMediaRunningTime(mediaRunningTime func() (time.Duration, bool))
}

Sync is the top-level synchronization interface. Implemented by both Synchronizer (legacy) and SyncEngine (new).

type SyncEngine added in v2.16.4

type SyncEngine struct {
	// contains filtered or unexported fields
}

SyncEngine orchestrates NtpEstimator, ParticipantClock, and SessionTimeline to provide cross-participant alignment and per-participant A/V lip sync. It implements the Sync interface.

func NewSyncEngine added in v2.16.4

func NewSyncEngine(opts ...SyncEngineOption) *SyncEngine

NewSyncEngine creates a new SyncEngine with the given options.

func (*SyncEngine) AddTrack added in v2.16.4

func (e *SyncEngine) AddTrack(track TrackRemote, participantID string) TrackSync

AddTrack registers a new track and returns a TrackSync handle.

func (*SyncEngine) End added in v2.16.4

func (e *SyncEngine) End()

End signals the end of the session and sets drain ceilings on all tracks.

func (*SyncEngine) GetEndedAt added in v2.16.4

func (e *SyncEngine) GetEndedAt() int64

GetEndedAt returns the end timestamp in nanoseconds, or 0 if not ended.

func (*SyncEngine) GetStartedAt added in v2.16.4

func (e *SyncEngine) GetStartedAt() int64

GetStartedAt returns the start timestamp in nanoseconds, or 0 if not started.

func (*SyncEngine) OnRTCP added in v2.16.4

func (e *SyncEngine) OnRTCP(packet rtcp.Packet)

OnRTCP processes an RTCP packet, dispatching sender reports to the appropriate track's NTP estimator and ParticipantClock.

func (*SyncEngine) RemoveTrack added in v2.16.4

func (e *SyncEngine) RemoveTrack(trackID string)

RemoveTrack removes a track by track ID.

func (*SyncEngine) SetMediaRunningTime added in v2.16.4

func (e *SyncEngine) SetMediaRunningTime(mediaRunningTime func() (time.Duration, bool))

SetMediaRunningTime sets the external media running time provider.

type SyncEngineOption added in v2.16.4

type SyncEngineOption func(*SyncEngine)

SyncEngineOption configures a SyncEngine.

func WithSyncEngineAudioDriftCompensated added in v2.16.4

func WithSyncEngineAudioDriftCompensated() SyncEngineOption

WithSyncEngineAudioDriftCompensated signals that audio drift is handled externally (e.g., by a tempo controller) and the sync engine should not apply NTP PTS corrections to audio tracks. NTP regression still runs for drift measurement and reporting.

func WithSyncEngineLogger added in v2.16.4

func WithSyncEngineLogger(l logger.Logger) SyncEngineOption

WithSyncEngineLogger sets the logger for the sync engine and all sub-components.

func WithSyncEngineMediaRunningTime added in v2.16.4

func WithSyncEngineMediaRunningTime(mediaRunningTime func() (time.Duration, bool), maxDelay time.Duration) SyncEngineOption

WithSyncEngineMediaRunningTime sets the initial media running time provider and max delay. If a track's PTS falls behind the deadline by more than maxDelay for >10s, PTS is force-corrected.

func WithSyncEngineOldPacketThreshold added in v2.16.4

func WithSyncEngineOldPacketThreshold(d time.Duration) SyncEngineOption

WithSyncEngineOldPacketThreshold sets the age after which packets are dropped. Zero disables the check.

func WithSyncEngineOnStarted added in v2.16.4

func WithSyncEngineOnStarted(f func()) SyncEngineOption

WithSyncEngineOnStarted sets a callback invoked once the first track is initialized.

func WithSyncEngineStartGate added in v2.16.4

func WithSyncEngineStartGate() SyncEngineOption

WithSyncEngineStartGate enables the burst-estimation start gate on all tracks.

type Synchronizer

type Synchronizer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

A single Synchronizer is shared between all audio and video writers.

func NewSynchronizer

func NewSynchronizer(onStarted func()) *Synchronizer

func NewSynchronizerWithOptions added in v2.10.0

func NewSynchronizerWithOptions(opts ...SynchronizerOption) *Synchronizer

func (*Synchronizer) AddTrack

func (s *Synchronizer) AddTrack(track TrackRemote, participantID string) *TrackSynchronizer

func (*Synchronizer) AsSyncInterface added in v2.16.4

func (s *Synchronizer) AsSyncInterface() Sync

AsSyncInterface returns a Sync-compatible wrapper around this Synchronizer.

func (*Synchronizer) End

func (s *Synchronizer) End()

func (*Synchronizer) GetEndedAt

func (s *Synchronizer) GetEndedAt() int64

func (*Synchronizer) GetStartedAt

func (s *Synchronizer) GetStartedAt() int64

func (*Synchronizer) OnRTCP

func (s *Synchronizer) OnRTCP(packet rtcp.Packet)

OnRTCP synchronizes audio and video using RTCP sender reports.

func (*Synchronizer) RemoveTrack

func (s *Synchronizer) RemoveTrack(trackID string)

func (*Synchronizer) SetMediaRunningTime added in v2.12.0

func (s *Synchronizer) SetMediaRunningTime(mediaRunningTime func() (time.Duration, bool))

SetMediaRunningTime updates the external media running time provider after the synchronizer has been created. Passing a nil provider clears the configuration.

type SynchronizerAdapter added in v2.16.4

type SynchronizerAdapter struct {
	*Synchronizer
}

SynchronizerAdapter wraps the legacy Synchronizer to implement the Sync interface. The Synchronizer's own AddTrack returns *TrackSynchronizer (concrete type); this adapter's AddTrack returns TrackSync so that *SynchronizerAdapter satisfies Sync.
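
The reason an adapter is needed at all is that Go requires method signatures to match exactly for interface satisfaction: a method returning a concrete pointer does not satisfy an interface method that returns an interface, even when the pointer type implements it. The sketch below reproduces the shape of the problem with hypothetical stand-in types (`syncer`, `legacy`, `adapter`, `trackSync`, `concreteTrack`); none of them are the package's own.

```go
package main

import "fmt"

// trackSync and concreteTrack are hypothetical stand-ins for a per-track
// interface and its legacy concrete implementation.
type trackSync interface{ Close() }

type concreteTrack struct{ id string }

func (c *concreteTrack) Close() {}

// syncer's AddTrack returns the interface type.
type syncer interface {
	AddTrack(id string) trackSync
}

// legacy returns the concrete type, so *legacy does not satisfy syncer.
type legacy struct{}

func (l *legacy) AddTrack(id string) *concreteTrack {
	return &concreteTrack{id: id}
}

// adapter embeds *legacy and shadows AddTrack with the interface-returning
// signature. All other methods of *legacy are promoted unchanged.
type adapter struct{ *legacy }

func (a *adapter) AddTrack(id string) trackSync {
	return a.legacy.AddTrack(id)
}

// Compile-time check that the adapter satisfies the interface.
var _ syncer = (*adapter)(nil)

func main() {
	var s syncer = &adapter{legacy: &legacy{}}
	t := s.AddTrack("audio-1")
	fmt.Printf("dynamic type behind the interface: %T\n", t)
	t.Close()
}
```

Embedding keeps the adapter small: only the one method whose signature differs has to be written by hand.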

func (*SynchronizerAdapter) AddTrack added in v2.16.4

func (a *SynchronizerAdapter) AddTrack(track TrackRemote, participantID string) TrackSync

type SynchronizerConfig added in v2.10.0

type SynchronizerConfig struct {
	MaxTsDiff                    time.Duration
	MaxDriftAdjustment           time.Duration
	DriftAdjustmentWindowPercent float64
	SenderReportSyncMode         SenderReportSyncMode
	// Legacy compatibility control for audio in SenderReportSyncModeWithoutRebase.
	AudioPTSAdjustmentDisabled bool
	// Deprecated: use SenderReportSyncMode instead.
	RTCPSenderReportRebaseEnabled bool
	OldPacketThreshold            time.Duration
	EnableStartGate               bool

	OneShotDriftCorrectionThreshold time.Duration

	OnStarted func()

	MediaRunningTime         func() (time.Duration, bool)
	MaxMediaRunningTimeDelay time.Duration
}

SynchronizerConfig holds configuration for the Synchronizer.

type SynchronizerOption added in v2.10.0

type SynchronizerOption func(*SynchronizerConfig)
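
SynchronizerOption follows the functional options pattern: each option is a function that mutates a config struct, and the constructor applies them in order. The sketch below shows the pattern with a hypothetical local `config` type that mirrors a few SynchronizerConfig fields. Seeding the struct with values equal to the documented Default* constants before applying options is an assumption about how defaults are handled, not something this page states.

```go
package main

import (
	"fmt"
	"time"
)

// config mirrors a few SynchronizerConfig fields.
// Hypothetical local type for illustration.
type config struct {
	MaxTsDiff          time.Duration
	MaxDriftAdjustment time.Duration
	OldPacketThreshold time.Duration
	EnableStartGate    bool
}

// option mutates a config, in the style of SynchronizerOption.
type option func(*config)

func withMaxTsDiff(d time.Duration) option {
	return func(c *config) { c.MaxTsDiff = d }
}

func withStartGate() option {
	return func(c *config) { c.EnableStartGate = true }
}

// newConfig starts from values equal to the documented defaults (an
// assumption, see the text above) and applies the options in order.
func newConfig(opts ...option) config {
	c := config{
		MaxTsDiff:          time.Minute,
		MaxDriftAdjustment: 5 * time.Millisecond,
		OldPacketThreshold: 500 * time.Millisecond,
	}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withMaxTsDiff(30*time.Second), withStartGate())
	fmt.Printf("%+v\n", c)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.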

func WithAudioPTSAdjustmentDisabled added in v2.11.3

func WithAudioPTSAdjustmentDisabled() SynchronizerOption

WithAudioPTSAdjustmentDisabled disables automatic PTS adjustments after sender reports. Use case: a media processing pipeline needs a stable, monotonically increasing PTS sequence, and small adjustments driven by RTCP sender reports could cause gaps in the audio. Such a pipeline can opt out of automatic PTS adjustments and handle the gap itself, e.g. by modifying tempo to compensate.

This option is still required if you want the legacy SenderReportSyncModeWithoutRebase path but do not want audio SR drift to drive gradual PTS offset changes.

func WithDriftAdjustmentWindowPercent added in v2.12.0

func WithDriftAdjustmentWindowPercent(driftAdjustmentWindowPercent float64) SynchronizerOption

WithDriftAdjustmentWindowPercent controls how often drift adjustments are applied by throttling PTS adjustment to a limited amount per time window. The setting determines how long a given amount of adjustment delays the next adjustment.

For example, if a 1ms adjustment is applied at 1%, then 1ms is 1% of the adjustment window, so the window is 100ms and the next adjustment is not applied until that time elapses.

With a 5ms adjustment at 5%, the result is a maximum adjustment of 5ms per 100ms.
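
The arithmetic in these examples can be written out as a small helper. This is a sketch of the calculation only, with a hypothetical function name `throttleWindow`; how the package tracks the window internally is not shown here. A percent of zero is treated as "no throttling", matching the comment on DefaultDriftAdjustmentWindowPercent.

```go
package main

import (
	"fmt"
	"time"
)

// throttleWindow returns how long an applied adjustment delays the next
// one, reading windowPercent as "the adjustment is this percent of the
// window". A non-positive percent disables throttling and returns 0.
// Hypothetical helper for illustration.
func throttleWindow(adjustment time.Duration, windowPercent float64) time.Duration {
	if windowPercent <= 0 {
		return 0
	}
	if adjustment < 0 {
		adjustment = -adjustment
	}
	return time.Duration(float64(adjustment) * 100 / windowPercent)
}

func main() {
	fmt.Println(throttleWindow(1*time.Millisecond, 1)) // 1ms is 1% of the window
	fmt.Println(throttleWindow(5*time.Millisecond, 5)) // 5ms is 5% of the window
	fmt.Println(throttleWindow(5*time.Millisecond, 0)) // throttling disabled
}
```

Both non-zero cases yield a 100ms window, which is why 5ms at 5% caps the correction rate at 5ms per 100ms.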

func WithMaxDriftAdjustment added in v2.12.0

func WithMaxDriftAdjustment(maxDriftAdjustment time.Duration) SynchronizerOption

WithMaxDriftAdjustment sets the maximum drift adjustment applied at a time.

func WithMaxTsDiff added in v2.10.0

func WithMaxTsDiff(maxTsDiff time.Duration) SynchronizerOption

WithMaxTsDiff sets the maximum acceptable difference between RTP packets. If it is exceeded, the synchronizer adjusts the PTS offset to keep the audio and video in sync.

func WithMediaRunningTime added in v2.12.0

func WithMediaRunningTime(mediaRunningTime func() (time.Duration, bool), maxMediaRunningTimeDelay time.Duration) SynchronizerOption

WithMediaRunningTime sets the callback used to get the media running time. maxMediaRunningTimeDelay is the maximum allowed latency a packet can be behind the media running time.

func WithOldPacketThreshold added in v2.12.0

func WithOldPacketThreshold(oldPacketThreshold time.Duration) SynchronizerOption

WithOldPacketThreshold sets the threshold at which a packet is considered old.

func WithOnStarted added in v2.10.0

func WithOnStarted(onStarted func()) SynchronizerOption

WithOnStarted sets the callback to be called when the synchronizer starts.

func WithOneShotDriftCorrectionThreshold added in v2.16.2

func WithOneShotDriftCorrectionThreshold(threshold time.Duration) SynchronizerOption

WithOneShotDriftCorrectionThreshold sets the threshold for one-shot PTS correction. It is used only when SenderReportSyncMode is SenderReportSyncModeOneShot. In that mode, OWD-normalized drift from RTCP Sender Reports is monitored, but gradual PTS adjustments are suppressed. Instead, a one-shot PTS correction is applied when the drift reaches this threshold.

This is useful for pipelines with, e.g., a live audio mixer, where:

  • Gradual PTS adjustments cause audible gaps (so they must be disabled)
  • But the input stream may have unsignalled timing drift (e.g. SIP silence suppression) that needs correction before the track falls out of the mixer's live window

The corrected PTS is sanity-checked against the media live window to reject bogus SRs.
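
The decision described above can be sketched as a pure function. Everything here is an assumption made for illustration: the name `oneShotCorrection`, the parameters, and in particular the sanity check, which is simplified to "reject any drift larger than the live window" rather than checking the corrected PTS itself. The package's actual check may differ.

```go
package main

import (
	"fmt"
	"time"
)

// abs returns the magnitude of d.
func abs(d time.Duration) time.Duration {
	if d < 0 {
		return -d
	}
	return d
}

// oneShotCorrection reports whether a single PTS correction should be
// applied and, if so, by how much. Hypothetical helper: small drift is
// only monitored, drift at or above threshold is corrected in one step,
// and drift beyond liveWindow is treated as a bogus sender report
// (a simplified stand-in for the sanity check).
func oneShotCorrection(drift, threshold, liveWindow time.Duration) (time.Duration, bool) {
	if threshold <= 0 || abs(drift) < threshold {
		return 0, false // below threshold: keep monitoring, no gradual nudges
	}
	if abs(drift) > liveWindow {
		return 0, false // implausibly large: ignore rather than correct
	}
	return drift, true
}

func main() {
	threshold, window := 100*time.Millisecond, time.Second
	for _, drift := range []time.Duration{40 * time.Millisecond, 150 * time.Millisecond, 5 * time.Second} {
		corr, apply := oneShotCorrection(drift, threshold, window)
		fmt.Printf("drift=%v apply=%v correction=%v\n", drift, apply, corr)
	}
}
```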

func WithRTCPSenderReportRebaseEnabled deprecated added in v2.12.0

func WithRTCPSenderReportRebaseEnabled() SynchronizerOption

Deprecated: use WithSenderReportSyncMode(SenderReportSyncModeRebase) instead.

WithRTCPSenderReportRebaseEnabled enables rebasing RTCP sender reports onto the local clock.

func WithSenderReportSyncMode added in v2.16.2

func WithSenderReportSyncMode(mode SenderReportSyncMode) SynchronizerOption

WithSenderReportSyncMode explicitly selects how RTCP sender report drift is measured and applied.

func WithStartGate added in v2.12.0

func WithStartGate() SynchronizerOption

WithStartGate enables the start gate, which buffers incoming packets until pacing stabilizes before initializing tracks.

type TrackRemote

type TrackRemote interface {
	ID() string
	Codec() webrtc.RTPCodecParameters
	Kind() webrtc.RTPCodecType
	SSRC() webrtc.SSRC
}

type TrackSync added in v2.16.4

type TrackSync interface {
	PrimeForStart(pkt jitter.ExtPacket) ([]jitter.ExtPacket, int, bool)
	GetPTS(pkt jitter.ExtPacket) (time.Duration, error)
	OnSenderReport(f func(drift time.Duration))
	LastPTSAdjusted() time.Duration
	Close()
}

TrackSync is the per-track synchronization interface. Implemented by both TrackSynchronizer (legacy) and syncEngineTrack (new).

type TrackSynchronizer

type TrackSynchronizer struct {
	sync.Mutex

	*rtputil.RTPConverter
	// contains filtered or unexported fields
}

func (*TrackSynchronizer) Close added in v2.12.0

func (t *TrackSynchronizer) Close()

func (*TrackSynchronizer) GetPTS

func (t *TrackSynchronizer) GetPTS(pkt jitter.ExtPacket) (time.Duration, error)

GetPTS adjusts PTS offsets if necessary. Packets are expected to be in order.

func (*TrackSynchronizer) Initialize

func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet)

Initialize should be called as soon as the first packet is received.

func (*TrackSynchronizer) LastPTSAdjusted added in v2.16.2

func (t *TrackSynchronizer) LastPTSAdjusted() time.Duration

func (*TrackSynchronizer) MarshalLogObject added in v2.12.0

func (t *TrackSynchronizer) MarshalLogObject(e zapcore.ObjectEncoder) error

func (*TrackSynchronizer) OnSenderReport added in v2.8.2

func (t *TrackSynchronizer) OnSenderReport(f func(drift time.Duration))

func (*TrackSynchronizer) PrimeForStart added in v2.12.0

func (t *TrackSynchronizer) PrimeForStart(pkt jitter.ExtPacket) ([]jitter.ExtPacket, int, bool)

PrimeForStart buffers incoming packets until pacing stabilizes, initializing the track synchronizer automatically once a suitable sequence has been observed. It returns the packets that should be forwarded, the number of packets dropped while waiting, and a boolean indicating whether the track is ready to process samples. After the gate finishes, there is no need to call the API again.

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.
