Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CopyReader ¶ added in v0.0.20
type CopyReader struct {
// contains filtered or unexported fields
}
CopyReader implements ingestor.PacketReader for `copy://<upstream>` URLs.
One of two modes is selected at construction based on the upstream shape:
- sub != nil: single-stream mode — subscribe to upstream's main buffer (AVPackets); ReadPackets reads sub.Recv() directly.
- abrInner != nil: ABR mode — best rendition buffer is wrapped through a TS demuxer; ReadPackets delegates to abrInner.ReadPackets.
func NewCopyReader ¶ added in v0.0.20
func NewCopyReader(input domain.Input, bufSvc *buffer.Service, lookup StreamLookup) (*CopyReader, error)
NewCopyReader constructs a CopyReader from a copy:// input. Errors:
- URL malformed (`copy://` grammar violation) — propagates protocol error
- upstream missing in lookup — runtime error; coordinator surfaces it as input degraded so the manager can switch to a fallback
Mode selection:
- upstream is single-stream → subscribe upstream's main buffer (AVPackets)
- upstream is ABR → subscribe upstream's BEST rendition buffer (TS bytes) and demux to AVPackets via TSDemuxPacketReader. Coordinator only routes this case here when downstream has its own transcoder; the no-transcoder ABR-copy mirror path is handled by Coordinator.startABRCopy.
func (*CopyReader) Close ¶ added in v0.0.20
func (r *CopyReader) Close() error
Close unsubscribes. Idempotent — safe to call from multiple goroutines.
func (*CopyReader) Open ¶ added in v0.0.20
func (r *CopyReader) Open(ctx context.Context) error
Open subscribes to the upstream buffer. Returns an error when the buffer doesn't exist yet (upstream not started, or torn down).
func (*CopyReader) ReadPackets ¶ added in v0.0.20
ReadPackets blocks until one upstream packet arrives or ctx is done. Returns io.EOF when upstream tears down (subscriber channel closes).
Single-stream mode: packets without an AV payload (TS-only) are skipped silently — they shouldn't normally appear in a main buffer. ABR mode delegates to the inner TS demuxer.
type FileReader ¶
type FileReader struct {
// contains filtered or unexported fields
}
FileReader is a pull source for local media files. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.
func NewFileReader ¶
func NewFileReader(path string, loop bool) *FileReader
NewFileReader constructs a FileReader without opening the file. path must be a resolved absolute filesystem path. The caller (typically internal/ingestor.NewPacketReader) is responsible for resolving any file:// URL through the VOD registry first.
Logs the loop flag at construction so operators can verify the deployed binary's vod.Registry resolver hands the expected value (loop=true is the default for file:// URLs without an explicit ?loop=false override).
func (*FileReader) Close ¶
func (r *FileReader) Close() error
Close releases the open file handle. Calling Close before Open, or calling it twice, is safe.
type HLSReader ¶
type HLSReader struct {
// contains filtered or unexported fields
}
HLSReader polls an HLS playlist and emits MPEG-TS segment bytes in order. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.
Expected URL format: http[s]://host/path/to/playlist.m3u8 Master playlists are transparently resolved to the highest-bandwidth variant.
Open/Close may be called multiple times (runPullWorker reconnects on error). Each Open() creates a fresh output channel so that the previous poll goroutine's deferred close does not affect the new one.
func NewHLSReader ¶
func NewHLSReader(input domain.Input, cfg config.IngestorConfig) *HLSReader
NewHLSReader constructs an HLSReader without opening the connection.
func (*HLSReader) Close ¶
Close stops the poll goroutine. Safe to call before Open or more than once.
type MixerReader ¶ added in v0.0.22
type MixerReader struct {
// contains filtered or unexported fields
}
MixerReader implements ingestor.PacketReader for `mixer://` URLs.
Each source (video, audio) selects one of two modes at construction:
- direct mode: subscribe upstream's main buffer (single-stream upstream; packets arrive as AVPackets). Uses the *Sub fields below.
- ABR mode: subscribe upstream's best rendition (TS bytes) and demux via TSDemuxPacketReader. Uses the *Inner / *Chunk fields below.
The two modes can mix freely: e.g. ABR video + single-stream audio is the common case for "replace audio of an ABR camera with internet radio".
func NewMixerReader ¶ added in v0.0.22
func NewMixerReader(input domain.Input, bufSvc *buffer.Service, lookup StreamLookup) (*MixerReader, error)
NewMixerReader constructs a MixerReader from a mixer:// input. Errors:
- URL malformed → propagates protocol error
- either upstream missing in lookup → runtime error; coordinator surfaces as input degraded so the manager can switch to a fallback (none allowed in v1, so effectively the stream stops)
Mode per source: ABR upstream → tap best rendition + TS demux; otherwise → subscribe main buffer directly. Modes are independent for video vs audio.
func (*MixerReader) Close ¶ added in v0.0.22
func (r *MixerReader) Close() error
Close stops all sources + pumps. Idempotent.
func (*MixerReader) Open ¶ added in v0.0.22
func (r *MixerReader) Open(ctx context.Context) error
Open subscribes to both upstream buffers and (for ABR sources) starts pump goroutines that bridge TS-demux output into select-friendly channels. On any failure, partial state is rolled back.
func (*MixerReader) ReadPackets ¶ added in v0.0.22
ReadPackets blocks until one upstream packet arrives or ctx is done.
Selection rules:
- video source ends (sub close or pump EOF) → io.EOF (always; downstream stream stops)
- audio source ends + audio_failure=down (default) → io.EOF
- audio source ends + audio_failure=continue → drop the audio source and keep forwarding video-only on subsequent calls
- packets whose codec doesn't match the source channel are dropped (e.g. video upstream emitting an audio AAC packet — we want video only from the video source)
type RTMPReader ¶
type RTMPReader struct {
// contains filtered or unexported fields
}
RTMPReader connects to a remote RTMP server in play (pull) mode and emits domain.AVPacket.
Life cycle: NewRTMPReader → Open (blocks on TCP+RTMP handshake) → ReadPackets loop → Close. The underlying read goroutine exits when the server closes the connection or Close is called; both paths close pkts so ReadPackets returns io.EOF.
func NewRTMPReader ¶
func NewRTMPReader(input domain.Input) *RTMPReader
NewRTMPReader constructs an RTMPReader without opening a connection.
func (*RTMPReader) Close ¶
func (r *RTMPReader) Close() error
Close closes the RTMP connection; the readLoop then closes the packet channels.
func (*RTMPReader) Open ¶
func (r *RTMPReader) Open(ctx context.Context) error
Open dials the RTMP source, negotiates codec data, and starts the background read loop. Blocks until the TCP connection + RTMP handshake + codec probing complete.
func (*RTMPReader) ReadPackets ¶
ReadPackets blocks until at least one AVPacket is available or the source ends.
type RTSPReader ¶
type RTSPReader struct {
// contains filtered or unexported fields
}
RTSPReader connects to a remote RTSP server in play (pull) mode and emits domain.AVPacket. It uses gortsplib for proper RTCP-based A/V synchronisation and RTP jitter buffering.
func NewRTSPReader ¶
func NewRTSPReader(input domain.Input) *RTSPReader
NewRTSPReader constructs an RTSPReader without opening a connection.
func (*RTSPReader) Close ¶
func (r *RTSPReader) Close() error
Close closes the RTSP connection and waits for the read loop to stop.
func (*RTSPReader) Open ¶
func (r *RTSPReader) Open(ctx context.Context) error
Open dials the RTSP source, negotiates tracks, and starts the background read loop. It blocks until DESCRIBE + SETUP + PLAY complete.
func (*RTSPReader) ReadPackets ¶
ReadPackets blocks until at least one AVPacket is available or the source ends.
type SRTReader ¶
type SRTReader struct {
// contains filtered or unexported fields
}
SRTReader dials an SRT server in caller mode and emits raw MPEG-TS chunks. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.
func NewSRTReader ¶
NewSRTReader constructs an SRTReader without opening a connection.
func (*SRTReader) Close ¶
Close stops the context watcher and closes the SRT socket. Safe to call before Open or more than once.
Wait for the watcher goroutine to exit BEFORE nilling out r.conn — the watcher may be inside `if r.conn != nil { r.conn.Close() }` when its callerCtx fires; mutating r.conn without that wait would race the read.
type StreamLookup ¶ added in v0.0.20
type StreamLookup func(domain.StreamCode) (*domain.Stream, bool)
StreamLookup resolves an upstream stream by code. Plumbed from the API layer (or any layer holding the repo) so this package stays free of the store dependency.
type TSChunkReader ¶
type TSChunkReader interface {
Open(ctx context.Context) error
Read(ctx context.Context) ([]byte, error)
Close() error
}
TSChunkReader is an MPEG-TS byte source (UDP, HLS, file, SRT, …).
type TSDemuxOption ¶
type TSDemuxOption func(*TSDemuxPacketReader)
TSDemuxOption configures a TSDemuxPacketReader at construction time.
func WithRealtimePacing ¶
func WithRealtimePacing() TSDemuxOption
WithRealtimePacing enables DTS-based real-time throttling on AVPacket emission. Use it for chunk-based sources (HLS) that deliver bursts faster than playback rate; do not use it for live transports (UDP, SRT, RTMP) where the source already paces itself.
type TSDemuxPacketReader ¶
type TSDemuxPacketReader struct {
// contains filtered or unexported fields
}
TSDemuxPacketReader wraps a TSChunkReader and emits domain.AVPacket values.
func NewBufferTSDemuxReader ¶ added in v0.0.23
func NewBufferTSDemuxReader(bufSvc *buffer.Service, bufID domain.StreamCode) *TSDemuxPacketReader
NewBufferTSDemuxReader composes newBufferTSChunkReader with the TS demuxer to produce a *TSDemuxPacketReader that reads AVPackets straight from a Buffer Hub buffer carrying MPEG-TS bytes. Convenience for callers outside this package (coordinator's ABR-mixer pipeline) that need this combination without exposing the internal chunk-reader type.
func NewTSDemuxPacketReader ¶
func NewTSDemuxPacketReader(r TSChunkReader, opts ...TSDemuxOption) *TSDemuxPacketReader
NewTSDemuxPacketReader wraps r and emits domain.AVPacket values.
func (*TSDemuxPacketReader) Close ¶
func (d *TSDemuxPacketReader) Close() error
Close tears down both goroutines and the underlying reader. It is safe to call Close concurrently or multiple times.
func (*TSDemuxPacketReader) Open ¶
func (d *TSDemuxPacketReader) Open(ctx context.Context) error
Open starts the pump and demux goroutines.
func (*TSDemuxPacketReader) ReadPackets ¶
ReadPackets blocks until at least one AVPacket is available, then drains as many as are ready (up to 256 at once) to amortise call overhead. Returns (nil, io.EOF) when the source is exhausted.
type TSPassthroughPacketReader ¶ added in v0.0.61
type TSPassthroughPacketReader struct {
// contains filtered or unexported fields
}
TSPassthroughPacketReader adapts a TSChunkReader to the PacketReader interface by emitting one `AVPacket{Codec: AVCodecRawTSChunk}` per chunk.
func NewTSPassthroughPacketReader ¶ added in v0.0.61
func NewTSPassthroughPacketReader(r TSChunkReader) *TSPassthroughPacketReader
NewTSPassthroughPacketReader wraps r so each TS chunk surfaces as a raw-TS-marker AVPacket — see file header for why this beats demux/remux.
func (*TSPassthroughPacketReader) Close ¶ added in v0.0.61
func (p *TSPassthroughPacketReader) Close() error
Close delegates to the underlying chunk reader.
func (*TSPassthroughPacketReader) Open ¶ added in v0.0.61
func (p *TSPassthroughPacketReader) Open(ctx context.Context) error
Open delegates to the underlying chunk reader.
func (*TSPassthroughPacketReader) ReadPackets ¶ added in v0.0.61
ReadPackets reads ONE chunk per call and surfaces it as a single raw-TS-marker AVPacket. Returning a one-element batch matches the existing PacketReader contract; the buffer-hub fan-out is the same regardless of batch size.
type UDPReader ¶
type UDPReader struct {
// contains filtered or unexported fields
}
UDPReader listens on a UDP port and emits raw MPEG-TS chunks. It implements TSChunkReader and is intended to be wrapped with NewTSDemuxPacketReader.
func NewUDPReader ¶
NewUDPReader constructs a UDPReader for the given input without opening a connection.
func (*UDPReader) Close ¶
Close stops the pump goroutine, closes the UDP socket, and waits for clean shutdown. Safe to call before Open or multiple times.
Lifetime ordering matters under -race: pump() reads r.conn for SetReadDeadline and ReadFromUDP, so r.conn = nil must NOT happen until wg.Wait() has returned. The same applies to r.done / r.chunks — pump may still be in flight when Close starts. We close the socket (which unblocks pump's syscall), wait for pump to exit, and only then null out the shared fields.
func (*UDPReader) LocalAddr ¶
LocalAddr returns the local address the socket is bound to. Returns nil if the socket has not been opened yet.
func (*UDPReader) Open ¶
Open binds the UDP socket, optionally joins a multicast group, and starts the background pump goroutine.
Each Open() call creates a fresh socket so the reader can be reused after Close() by the ingestor reconnection loop.
func (*UDPReader) Read ¶
Read returns the next validated MPEG-TS datagram (with any RTP header stripped).
When the pump goroutine exits due to a real network error, Read returns that error (not io.EOF) so the upstream pumpChunks goroutine can store it in TSDemuxPacketReader.readErr and the worker will reconnect instead of triggering permanent failover.
Returns (nil, io.EOF) only when Close() is called cleanly without a pump error. Returns (nil, ctx.Err()) when the caller's context is cancelled.