Documentation
¶
Index ¶
- type CopyReader
- type FileReader
- type HLSReader
- type HTTPTSReader
- type MPTSProgramFilter
- type MixerReader
- type PIDFilter
- type RTMPMsgConverter
- type RTMPReader
- type RTSPReader
- type SRTReader
- type StatsDemuxer
- type StreamLookup
- type TSChunkReader
- type TSDemuxOption
- type TSDemuxPacketReader
- type TSPassthroughPacketReader
- type UDPReader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CopyReader ¶ added in v0.0.20
type CopyReader struct {
// contains filtered or unexported fields
}
CopyReader implements ingestor.PacketReader for `copy://<upstream>` URLs.
One of two modes is selected at construction based on the upstream shape:
- sub != nil: single-stream mode — subscribe to upstream's main buffer (AVPackets); ReadPackets reads sub.Recv() directly.
- abrInner != nil: ABR mode — best rendition buffer is wrapped through a TS demuxer; ReadPackets delegates to abrInner.ReadPackets.
func NewCopyReader ¶ added in v0.0.20
func NewCopyReader(input domain.Input, bufSvc *buffer.Service, lookup StreamLookup) (*CopyReader, error)
NewCopyReader constructs a CopyReader from a copy:// input. Errors:
- URL malformed (`copy://` grammar violation) — propagates protocol error
- upstream missing in lookup — runtime error; coordinator surfaces it as input degraded so the manager can switch to a fallback
Mode selection:
- upstream is single-stream → subscribe upstream's main buffer (AVPackets)
- upstream is ABR → subscribe upstream's BEST rendition buffer (TS bytes) and demux to AVPackets via TSDemuxPacketReader. Coordinator only routes this case here when downstream has its own transcoder; the no-transcoder ABR-copy mirror path is handled by Coordinator.startABRCopy.
func (*CopyReader) Close ¶ added in v0.0.20
func (r *CopyReader) Close() error
Close unsubscribes. Idempotent — safe to call from multiple goroutines.
func (*CopyReader) Open ¶ added in v0.0.20
func (r *CopyReader) Open(ctx context.Context) error
Open subscribes to the upstream buffer. Returns an error when the buffer doesn't exist yet (upstream not started, or torn down).
func (*CopyReader) ReadPackets ¶ added in v0.0.20
ReadPackets blocks until one upstream packet arrives or ctx is done. Returns io.EOF when upstream tears down (subscriber channel closes).
Single-stream mode: packets without an AV payload (TS-only) are skipped silently — they shouldn't normally appear in a main buffer. ABR mode delegates to the inner TS demuxer.
type FileReader ¶
type FileReader struct {
// contains filtered or unexported fields
}
FileReader is a pull source for local media files. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.
func NewFileReader ¶
func NewFileReader(path string, loop bool) *FileReader
NewFileReader constructs a FileReader without opening the file. path must be a resolved absolute filesystem path. The caller (typically internal/ingestor.NewPacketReader) is responsible for resolving any file:// URL through the VOD registry first.
Logs the loop flag at construction so operators can verify the deployed binary's vod.Registry resolver hands the expected value (loop=true is the default for file:// URLs without an explicit ?loop=false override).
func (*FileReader) Close ¶
func (r *FileReader) Close() error
Close releases the open file handle. Calling Close before Open, or calling it twice, is safe.
type HLSReader ¶
type HLSReader struct {
// contains filtered or unexported fields
}
HLSReader polls an HLS playlist and emits MPEG-TS segment bytes in order. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.
Expected URL format: http[s]://host/path/to/playlist.m3u8 Master playlists are transparently resolved to the highest-bandwidth variant.
Open/Close may be called multiple times (runPullWorker reconnects on error). Each Open() creates a fresh output channel so that the previous poll goroutine's deferred close does not affect the new one.
func NewHLSReader ¶
func NewHLSReader(input domain.Input, cfg config.IngestorConfig) *HLSReader
NewHLSReader constructs an HLSReader without opening the connection.
func (*HLSReader) Close ¶
Close stops the poll goroutine. Safe to call before Open or more than once.
type HTTPTSReader ¶ added in v0.0.68
type HTTPTSReader struct {
// contains filtered or unexported fields
}
HTTPTSReader pulls a continuous MPEG-TS stream over chunked HTTP.
func NewHTTPTSReader ¶ added in v0.0.68
func NewHTTPTSReader(input domain.Input) *HTTPTSReader
NewHTTPTSReader constructs a reader for the given input. Connection is deferred to Open so the constructor never blocks.
func (*HTTPTSReader) Close ¶ added in v0.0.68
func (r *HTTPTSReader) Close() error
Close releases the response body. Safe to call before Open or multiple times.
type MPTSProgramFilter ¶ added in v0.0.66
type MPTSProgramFilter struct {
// contains filtered or unexported fields
}
MPTSProgramFilter wraps a TSChunkReader and emits chunks containing only TS packets belonging to one program from the source MPTS.
func NewMPTSProgramFilter ¶ added in v0.0.66
func NewMPTSProgramFilter(r TSChunkReader, program int) *MPTSProgramFilter
NewMPTSProgramFilter wraps r so each Read returns a chunk containing only the chosen program. program must be ≥ 1; passing 0 or less is a config error and reader.go is responsible for not constructing the filter then.
func (*MPTSProgramFilter) Close ¶ added in v0.0.66
func (f *MPTSProgramFilter) Close() error
Close delegates to the wrapped reader.
func (*MPTSProgramFilter) Open ¶ added in v0.0.66
func (f *MPTSProgramFilter) Open(ctx context.Context) error
Open delegates to the wrapped reader and resets PSI state so a reconnection picks up a possibly-changed PMT layout.
func (*MPTSProgramFilter) Read ¶ added in v0.0.66
func (f *MPTSProgramFilter) Read(ctx context.Context) ([]byte, error)
Read returns the next non-empty filtered chunk. If filtering produces an empty result for the upstream chunk (e.g. early datagrams that contain only other programs' packets), it loops to the next upstream chunk so the caller never sees a zero-length non-error read.
type MixerReader ¶ added in v0.0.22
type MixerReader struct {
// contains filtered or unexported fields
}
MixerReader implements ingestor.PacketReader for `mixer://` URLs.
Each source (video, audio) selects one of two modes at construction:
- direct mode: subscribe upstream's main buffer (single-stream upstream; packets arrive as AVPackets). Uses the *Sub fields below.
- ABR mode: subscribe upstream's best rendition (TS bytes) and demux via TSDemuxPacketReader. Uses the *Inner / *Chunk fields below.
The two modes can mix freely: e.g. ABR video + single-stream audio is the common case for "replace audio of an ABR camera with internet radio".
func NewMixerReader ¶ added in v0.0.22
func NewMixerReader(input domain.Input, bufSvc *buffer.Service, lookup StreamLookup) (*MixerReader, error)
NewMixerReader constructs a MixerReader from a mixer:// input. Errors:
- URL malformed → propagates protocol error
- either upstream missing in lookup → runtime error; coordinator surfaces as input degraded so the manager can switch to a fallback (none allowed in v1, so effectively the stream stops)
Mode per source: ABR upstream → tap best rendition + TS demux; otherwise → subscribe main buffer directly. Modes are independent for video vs audio.
func (*MixerReader) Close ¶ added in v0.0.22
func (r *MixerReader) Close() error
Close stops all sources + pumps. Idempotent.
func (*MixerReader) Open ¶ added in v0.0.22
func (r *MixerReader) Open(ctx context.Context) error
Open subscribes to both upstream buffers and (for ABR sources) starts pump goroutines that bridge TS-demux output into select-friendly channels. On any failure, partial state is rolled back.
func (*MixerReader) ReadPackets ¶ added in v0.0.22
ReadPackets blocks until one upstream packet arrives or ctx is done.
Selection rules:
- video source ends (sub close or pump EOF) → io.EOF (always; downstream stream stops)
- audio source ends + audio_failure=down (default) → io.EOF
- audio source ends + audio_failure=continue → drop the audio source and keep forwarding video-only on subsequent calls
- packets whose codec doesn't match the source channel are dropped (e.g. video upstream emitting an audio AAC packet — we want video only from the video source)
type PIDFilter ¶ added in v0.0.67
type PIDFilter struct {
// contains filtered or unexported fields
}
PIDFilter forwards only TS packets whose PID is in the configured set.
func NewPIDFilter ¶ added in v0.0.67
func NewPIDFilter(r TSChunkReader, pids []int) *PIDFilter
NewPIDFilter wraps r so each Read returns a chunk containing only packets whose PID is in pids. PIDs outside the valid TS range (0..8191) are silently ignored. An empty pids list is treated as "drop everything", which the caller's reader.go avoids by not constructing the filter then.
type RTMPMsgConverter ¶ added in v0.0.74
type RTMPMsgConverter struct {
// contains filtered or unexported fields
}
RTMPMsgConverter holds the per-stream codec configuration state and converts incoming RTMP messages to domain AVPackets.
func NewRTMPMsgConverter ¶ added in v0.0.74
func NewRTMPMsgConverter() *RTMPMsgConverter
NewRTMPMsgConverter constructs an empty converter ready to receive sequence headers. AVPackets emitted before the first matching codec sequence header arrives are dropped (no codec config to attach).
func (*RTMPMsgConverter) Convert ¶ added in v0.0.74
func (c *RTMPMsgConverter) Convert(msg base.RtmpMsg) []domain.AVPacket
Convert processes one RtmpMsg and returns the resulting AVPacket(s). Sequence-header messages update internal state and emit no packet (returns nil). Unsupported codecs (legacy RTMP audio: PCM/G.711/Speex, non-AVC/HEVC video) are silently dropped.
type RTMPReader ¶
type RTMPReader struct {
// contains filtered or unexported fields
}
RTMPReader pulls a remote RTMP play stream and emits domain.AVPacket.
Owns one lal PullSession plus a goroutine that waits on its WaitChan to detect server disconnect. The OnReadRtmpAvMsg callback runs on lal's internal read goroutine; we convert each RtmpMsg into AVPacket(s) and push to the buffered pkts channel that ReadPackets drains.
func NewRTMPReader ¶
func NewRTMPReader(input domain.Input) *RTMPReader
NewRTMPReader constructs a reader for input without opening any connections. URL parse / dial / handshake happens in Open.
func (*RTMPReader) Close ¶
func (r *RTMPReader) Close() error
Close tears down the RTMP session and waits for the watcher goroutine to exit. Idempotent.
func (*RTMPReader) Open ¶
func (r *RTMPReader) Open(ctx context.Context) error
Open dials the upstream, completes the RTMP handshake, sends the play command, and blocks until lal confirms play-start (or the configured timeout fires). After this returns nil, ReadPackets is ready to drain frames.
func (*RTMPReader) ReadPackets ¶
ReadPackets blocks until at least one AVPacket is available, then drains up to 256 more so the caller's per-call overhead amortises across a burst. Returns (nil, io.EOF) on stream end and (nil, ctx.Err()) on caller cancellation.
type RTSPReader ¶
type RTSPReader struct {
// contains filtered or unexported fields
}
RTSPReader connects to a remote RTSP server in play (pull) mode and emits domain.AVPacket. It uses gortsplib for proper RTCP-based A/V synchronisation and RTP jitter buffering.
func NewRTSPReader ¶
func NewRTSPReader(input domain.Input) *RTSPReader
NewRTSPReader constructs an RTSPReader without opening a connection.
func (*RTSPReader) Close ¶
func (r *RTSPReader) Close() error
Close closes the RTSP connection and waits for the read loop to stop.
func (*RTSPReader) Open ¶
func (r *RTSPReader) Open(ctx context.Context) error
Open dials the RTSP source, negotiates tracks, and starts the background read loop. It blocks until DESCRIBE + SETUP + PLAY complete.
func (*RTSPReader) ReadPackets ¶
ReadPackets blocks until at least one AVPacket is available or the source ends.
type SRTReader ¶
type SRTReader struct {
// contains filtered or unexported fields
}
SRTReader dials an SRT server in caller mode and emits raw MPEG-TS chunks. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.
func NewSRTReader ¶
NewSRTReader constructs an SRTReader without opening a connection.
func (*SRTReader) Close ¶
Close stops the context watcher and closes the SRT socket. Safe to call before Open or more than once.
Wait for the watcher goroutine to exit BEFORE nilling out r.conn — the watcher may be inside `if r.conn != nil { r.conn.Close() }` when its callerCtx fires; mutating r.conn without that wait would race the read.
type StatsDemuxer ¶ added in v0.0.69
type StatsDemuxer struct {
// contains filtered or unexported fields
}
StatsDemuxer surfaces per-codec stats (codec, bitrate) for raw-TS sources by parsing PSI tables off the data path. Construct via NewStatsDemuxer; feed via Feed; tear down via Close. Safe to call Feed and Close concurrently from different goroutines.
func NewStatsDemuxer ¶ added in v0.0.69
func NewStatsDemuxer(onAV func(p *domain.AVPacket)) *StatsDemuxer
NewStatsDemuxer starts a goroutine that scans chunks fed via Feed and invokes onAV for every recognised TS packet. Returns nil for nil callback so callers don't need to guard the construction site.
func (*StatsDemuxer) Close ¶ added in v0.0.69
func (sd *StatsDemuxer) Close()
Close stops the scanner goroutine and waits for it to exit. Safe to call multiple times and from any goroutine.
func (*StatsDemuxer) Feed ¶ added in v0.0.69
func (sd *StatsDemuxer) Feed(chunk []byte)
Feed enqueues chunk for asynchronous PMT scanning. Non-blocking: if the channel is full the chunk is dropped (stats are best-effort). Empty chunks and post-Close calls are silently ignored. The slice is copied internally so the caller is free to reuse the underlying buffer.
type StreamLookup ¶ added in v0.0.20
type StreamLookup func(domain.StreamCode) (*domain.Stream, bool)
StreamLookup resolves an upstream stream by code. Plumbed from the API layer (or any layer holding the repo) so this package stays free of the store dependency.
type TSChunkReader ¶
type TSChunkReader interface {
Open(ctx context.Context) error
Read(ctx context.Context) ([]byte, error)
Close() error
}
TSChunkReader is an MPEG-TS byte source (UDP, HLS, file, SRT, …).
type TSDemuxOption ¶
type TSDemuxOption func(*TSDemuxPacketReader)
TSDemuxOption configures a TSDemuxPacketReader at construction time.
func WithRealtimePacing ¶
func WithRealtimePacing() TSDemuxOption
WithRealtimePacing enables DTS-based real-time throttling on AVPacket emission. Use it for chunk-based sources (HLS) that deliver bursts faster than playback rate; do not use it for live transports (UDP, SRT, RTMP) where the source already paces itself.
type TSDemuxPacketReader ¶
type TSDemuxPacketReader struct {
// contains filtered or unexported fields
}
TSDemuxPacketReader wraps a TSChunkReader and emits domain.AVPacket values.
func NewBufferTSDemuxReader ¶ added in v0.0.23
func NewBufferTSDemuxReader(bufSvc *buffer.Service, bufID domain.StreamCode) *TSDemuxPacketReader
NewBufferTSDemuxReader composes newBufferTSChunkReader with the TS demuxer to produce a *TSDemuxPacketReader that reads AVPackets straight from a Buffer Hub buffer carrying MPEG-TS bytes. Convenience for callers outside this package (coordinator's ABR-mixer pipeline) that need this combination without exposing the internal chunk-reader type.
func NewTSDemuxPacketReader ¶
func NewTSDemuxPacketReader(r TSChunkReader, opts ...TSDemuxOption) *TSDemuxPacketReader
NewTSDemuxPacketReader wraps r and emits domain.AVPacket values.
func (*TSDemuxPacketReader) Close ¶
func (d *TSDemuxPacketReader) Close() error
Close tears down both goroutines and the underlying reader. It is safe to call Close concurrently or multiple times.
func (*TSDemuxPacketReader) Open ¶
func (d *TSDemuxPacketReader) Open(ctx context.Context) error
Open starts the pump and demux goroutines.
func (*TSDemuxPacketReader) ReadPackets ¶
ReadPackets blocks until at least one AVPacket is available, then drains as many as are ready (up to 256 at once) to amortise call overhead. Returns (nil, io.EOF) when the source is exhausted.
type TSPassthroughPacketReader ¶ added in v0.0.61
type TSPassthroughPacketReader struct {
// contains filtered or unexported fields
}
TSPassthroughPacketReader adapts a TSChunkReader to the PacketReader interface by emitting one `AVPacket{Codec: AVCodecRawTSChunk}` per chunk.
func NewTSPassthroughPacketReader ¶ added in v0.0.61
func NewTSPassthroughPacketReader(r TSChunkReader) *TSPassthroughPacketReader
NewTSPassthroughPacketReader wraps r so each TS chunk surfaces as a raw-TS-marker AVPacket — see file header for why this beats demux/remux.
func (*TSPassthroughPacketReader) Close ¶ added in v0.0.61
func (p *TSPassthroughPacketReader) Close() error
Close delegates to the underlying chunk reader.
func (*TSPassthroughPacketReader) Open ¶ added in v0.0.61
func (p *TSPassthroughPacketReader) Open(ctx context.Context) error
Open delegates to the underlying chunk reader.
func (*TSPassthroughPacketReader) ReadPackets ¶ added in v0.0.61
ReadPackets reads ONE chunk per call and surfaces it as a single raw-TS-marker AVPacket. Returning a one-element batch matches the existing PacketReader contract; the buffer-hub fan-out is the same regardless of batch size.
type UDPReader ¶
type UDPReader struct {
// contains filtered or unexported fields
}
UDPReader listens on a UDP port and emits raw MPEG-TS chunks. It implements TSChunkReader and is intended to be wrapped with NewTSDemuxPacketReader.
func NewUDPReader ¶
NewUDPReader constructs a UDPReader for the given input without opening a connection.
func (*UDPReader) Close ¶
Close stops the pump goroutine, closes the UDP socket, and waits for clean shutdown. Safe to call before Open or multiple times.
Lifetime ordering matters under -race: pump() reads r.conn for SetReadDeadline and ReadFromUDP, so r.conn = nil must NOT happen until wg.Wait() has returned. The same applies to r.done / r.chunks — pump may still be in flight when Close starts. We close the socket (which unblocks pump's syscall), wait for pump to exit, and only then null out the shared fields.
func (*UDPReader) LocalAddr ¶
LocalAddr returns the local address the socket is bound to. Returns nil if the socket has not been opened yet.
func (*UDPReader) Open ¶
Open binds the UDP socket, optionally joins a multicast group, and starts the background pump goroutine.
Each Open() call creates a fresh socket so the reader can be reused after Close() by the ingestor reconnection loop.
func (*UDPReader) Read ¶
Read returns the next validated MPEG-TS datagram (with any RTP header stripped).
When the pump goroutine exits due to a real network error, Read returns that error (not io.EOF) so the upstream pumpChunks goroutine can store it in TSDemuxPacketReader.readErr and the worker will reconnect instead of triggering permanent failover.
Returns (nil, io.EOF) only when Close() is called cleanly without a pump error. Returns (nil, ctx.Err()) when the caller's context is cancelled.