pull

package
v0.0.91 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2026 License: MIT Imports: 46 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CopyReader added in v0.0.20

type CopyReader struct {
	// contains filtered or unexported fields
}

CopyReader implements ingestor.PacketReader for `copy://<upstream>` URLs.

One of two modes is selected at construction based on the upstream shape:

  • sub != nil: single-stream mode — subscribe to upstream's main buffer (AVPackets); ReadPackets reads sub.Recv() directly.
  • abrInner != nil: ABR mode — best rendition buffer is wrapped through a TS demuxer; ReadPackets delegates to abrInner.ReadPackets.

func NewCopyReader added in v0.0.20

func NewCopyReader(input domain.Input, bufSvc *buffer.Service, lookup StreamLookup) (*CopyReader, error)

NewCopyReader constructs a CopyReader from a copy:// input. Errors:

  • URL malformed (`copy://` grammar violation) — propagates protocol error
  • upstream missing in lookup — runtime error; coordinator surfaces it as input degraded so the manager can switch to a fallback

Mode selection:

  • upstream is single-stream → subscribe upstream's main buffer (AVPackets)
  • upstream is ABR → subscribe upstream's BEST rendition buffer (TS bytes) and demux to AVPackets via TSDemuxPacketReader. Coordinator only routes this case here when downstream has its own transcoder; the no-transcoder ABR-copy mirror path is handled by Coordinator.startABRCopy.

func (*CopyReader) Close added in v0.0.20

func (r *CopyReader) Close() error

Close unsubscribes. Idempotent — safe to call from multiple goroutines.

func (*CopyReader) Open added in v0.0.20

func (r *CopyReader) Open(ctx context.Context) error

Open subscribes to the upstream buffer. Returns an error when the buffer doesn't exist yet (upstream not started, or torn down).

func (*CopyReader) ReadPackets added in v0.0.20

func (r *CopyReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until one upstream packet arrives or ctx is done. Returns io.EOF when upstream tears down (subscriber channel closes).

Single-stream mode: packets without an AV payload (TS-only) are skipped silently — they shouldn't normally appear in a main buffer. ABR mode delegates to the inner TS demuxer.

type FileReader

type FileReader struct {
	// contains filtered or unexported fields
}

FileReader is a pull source for local media files. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.

func NewFileReader

func NewFileReader(path string, loop bool) *FileReader

NewFileReader constructs a FileReader without opening the file. path must be a resolved absolute filesystem path. The caller (typically internal/ingestor.NewPacketReader) is responsible for resolving any file:// URL through the VOD registry first.

Logs the loop flag at construction so operators can verify the deployed binary's vod.Registry resolver hands the expected value (loop=true is the default for file:// URLs without an explicit ?loop=false override).

func (*FileReader) Close

func (r *FileReader) Close() error

Close releases the open file handle. Calling Close before Open, or calling it twice, is safe.

func (*FileReader) Open

func (r *FileReader) Open(_ context.Context) error

Open opens the file and selects the appropriate handler based on extension. Returns an error if the path does not exist or is a directory.

func (*FileReader) Read

func (r *FileReader) Read(ctx context.Context) ([]byte, error)

Read returns the next raw MPEG-TS chunk. Returns io.EOF when the file is exhausted (and loop is not set).

type HLSReader

type HLSReader struct {
	// contains filtered or unexported fields
}

HLSReader polls an HLS playlist and emits MPEG-TS segment bytes in order. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.

Expected URL format: http[s]://host/path/to/playlist.m3u8 Master playlists are transparently resolved to the highest-bandwidth variant.

Open/Close may be called multiple times (runPullWorker reconnects on error). Each Open() creates a fresh output channel so that the previous poll goroutine's deferred close does not affect the new one.

func NewHLSReader

func NewHLSReader(input domain.Input, cfg config.IngestorConfig) *HLSReader

NewHLSReader constructs an HLSReader without opening the connection.

func (*HLSReader) Close

func (r *HLSReader) Close() error

Close stops the poll goroutine. Safe to call before Open or more than once.

func (*HLSReader) Open

func (r *HLSReader) Open(ctx context.Context) error

Open starts the background polling goroutine. Returns immediately; segments arrive asynchronously via Read. Safe to call again after Close — each call creates a fresh output channel.

func (*HLSReader) Read

func (r *HLSReader) Read(ctx context.Context) ([]byte, error)

Read blocks until the next MPEG-TS segment chunk is available. Returns (nil, io.EOF) when a VOD stream is fully consumed or the source ends.

type HTTPTSReader added in v0.0.68

type HTTPTSReader struct {
	// contains filtered or unexported fields
}

HTTPTSReader pulls a continuous MPEG-TS stream over chunked HTTP.

func NewHTTPTSReader added in v0.0.68

func NewHTTPTSReader(input domain.Input) *HTTPTSReader

NewHTTPTSReader constructs a reader for the given input. Connection is deferred to Open so the constructor never blocks.

func (*HTTPTSReader) Close added in v0.0.68

func (r *HTTPTSReader) Close() error

Close releases the response body. Safe to call before Open or multiple times.

func (*HTTPTSReader) Open added in v0.0.68

func (r *HTTPTSReader) Open(ctx context.Context) error

Open issues the GET request and stores the response. The response body must be closed via Close().

func (*HTTPTSReader) Read added in v0.0.68

func (r *HTTPTSReader) Read(ctx context.Context) ([]byte, error)

Read blocks for the next chunk of bytes from the response body. Returns io.EOF when the upstream closes cleanly.

type MPTSProgramFilter added in v0.0.66

type MPTSProgramFilter struct {
	// contains filtered or unexported fields
}

MPTSProgramFilter wraps a TSChunkReader and emits chunks containing only TS packets belonging to one program from the source MPTS.

func NewMPTSProgramFilter added in v0.0.66

func NewMPTSProgramFilter(r TSChunkReader, program int) *MPTSProgramFilter

NewMPTSProgramFilter wraps r so each Read returns a chunk containing only the chosen program. program must be ≥ 1; passing 0 or less is a config error and reader.go is responsible for not constructing the filter then.

func (*MPTSProgramFilter) Close added in v0.0.66

func (f *MPTSProgramFilter) Close() error

Close delegates to the wrapped reader.

func (*MPTSProgramFilter) Open added in v0.0.66

func (f *MPTSProgramFilter) Open(ctx context.Context) error

Open delegates to the wrapped reader and resets PSI state so a reconnection picks up a possibly-changed PMT layout.

func (*MPTSProgramFilter) Read added in v0.0.66

func (f *MPTSProgramFilter) Read(ctx context.Context) ([]byte, error)

Read returns the next non-empty filtered chunk. If filtering produces an empty result for the upstream chunk (e.g. early datagrams that contain only other programs' packets), it loops to the next upstream chunk so the caller never sees a zero-length non-error read.

type MixerReader added in v0.0.22

type MixerReader struct {
	// contains filtered or unexported fields
}

MixerReader implements ingestor.PacketReader for `mixer://` URLs.

Each source (video, audio) selects one of two modes at construction:

  • direct mode: subscribe upstream's main buffer (single-stream upstream; packets arrive as AVPackets). Uses the *Sub fields below.
  • ABR mode: subscribe upstream's best rendition (TS bytes) and demux via TSDemuxPacketReader. Uses the *Inner / *Chunk fields below.

The two modes can mix freely: e.g. ABR video + single-stream audio is the common case for "replace audio of an ABR camera with internet radio".

func NewMixerReader added in v0.0.22

func NewMixerReader(input domain.Input, bufSvc *buffer.Service, lookup StreamLookup) (*MixerReader, error)

NewMixerReader constructs a MixerReader from a mixer:// input. Errors:

  • URL malformed → propagates protocol error
  • either upstream missing in lookup → runtime error; coordinator surfaces as input degraded so the manager can switch to a fallback (none allowed in v1, so effectively the stream stops)

Mode per source: ABR upstream → tap best rendition + TS demux; otherwise → subscribe main buffer directly. Modes are independent for video vs audio.

func (*MixerReader) Close added in v0.0.22

func (r *MixerReader) Close() error

Close stops all sources + pumps. Idempotent.

func (*MixerReader) Open added in v0.0.22

func (r *MixerReader) Open(ctx context.Context) error

Open subscribes to both upstream buffers and (for ABR sources) starts pump goroutines that bridge TS-demux output into select-friendly channels. On any failure, partial state is rolled back.

func (*MixerReader) ReadPackets added in v0.0.22

func (r *MixerReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until one upstream packet arrives or ctx is done.

Selection rules:

  • video source ends (sub close or pump EOF) → io.EOF (always; downstream stream stops)
  • audio source ends + audio_failure=down (default) → io.EOF
  • audio source ends + audio_failure=continue → drop the audio source and keep forwarding video-only on subsequent calls
  • packets whose codec doesn't match the source channel are dropped (e.g. video upstream emitting an audio AAC packet — we want video only from the video source)

type PIDFilter added in v0.0.67

type PIDFilter struct {
	// contains filtered or unexported fields
}

PIDFilter forwards only TS packets whose PID is in the configured set.

func NewPIDFilter added in v0.0.67

func NewPIDFilter(r TSChunkReader, pids []int) *PIDFilter

NewPIDFilter wraps r so each Read returns a chunk containing only packets whose PID is in pids. PIDs outside the valid TS range (0..8191) are silently ignored. An empty pids list is treated as "drop everything", which the caller's reader.go avoids by not constructing the filter then.

func (*PIDFilter) Close added in v0.0.67

func (f *PIDFilter) Close() error

Close delegates to the wrapped reader.

func (*PIDFilter) Open added in v0.0.67

func (f *PIDFilter) Open(ctx context.Context) error

Open delegates and resets the alignment carry.

func (*PIDFilter) Read added in v0.0.67

func (f *PIDFilter) Read(ctx context.Context) ([]byte, error)

Read returns the next non-empty filtered chunk. Loops over inner reads when filtering produces zero bytes (e.g. a UDP datagram of all-dropped PIDs) so the caller never sees a zero-length non-error read.

type RTMPMsgConverter added in v0.0.74

type RTMPMsgConverter struct {
	// contains filtered or unexported fields
}

RTMPMsgConverter holds the per-stream codec configuration state and converts incoming RTMP messages to domain AVPackets.

func NewRTMPMsgConverter added in v0.0.74

func NewRTMPMsgConverter() *RTMPMsgConverter

NewRTMPMsgConverter constructs an empty converter ready to receive sequence headers. AVPackets emitted before the first matching codec sequence header arrives are dropped (no codec config to attach).

func (*RTMPMsgConverter) Convert added in v0.0.74

func (c *RTMPMsgConverter) Convert(msg base.RtmpMsg) []domain.AVPacket

Convert processes one RtmpMsg and returns the resulting AVPacket(s). Sequence-header messages update internal state and emit no packet (returns nil). Unsupported codecs (legacy RTMP audio: PCM/G.711/Speex, non-AVC/HEVC video) are silently dropped.

type RTMPReader

type RTMPReader struct {
	// contains filtered or unexported fields
}

RTMPReader pulls a remote RTMP play stream and emits domain.AVPacket.

Owns one lal PullSession plus a goroutine that waits on its WaitChan to detect server disconnect. The OnReadRtmpAvMsg callback runs on lal's internal read goroutine; we convert each RtmpMsg into AVPacket(s) and push to the buffered pkts channel that ReadPackets drains.

func NewRTMPReader

func NewRTMPReader(input domain.Input) *RTMPReader

NewRTMPReader constructs a reader for input without opening any connections. URL parse / dial / handshake happens in Open.

func (*RTMPReader) Close

func (r *RTMPReader) Close() error

Close tears down the RTMP session and waits for the watcher goroutine to exit. Idempotent.

func (*RTMPReader) Open

func (r *RTMPReader) Open(ctx context.Context) error

Open dials the upstream, completes the RTMP handshake, sends the play command, and blocks until lal confirms play-start (or the configured timeout fires). After this returns nil, ReadPackets is ready to drain frames.

func (*RTMPReader) ReadPackets

func (r *RTMPReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until at least one AVPacket is available, then drains up to 256 more so the caller's per-call overhead amortises across a burst. Returns (nil, io.EOF) on stream end and (nil, ctx.Err()) on caller cancellation.

type RTSPReader

type RTSPReader struct {
	// contains filtered or unexported fields
}

RTSPReader connects to a remote RTSP server in play (pull) mode and emits domain.AVPacket. It uses gortsplib for proper RTCP-based A/V synchronisation and RTP jitter buffering.

func NewRTSPReader

func NewRTSPReader(input domain.Input) *RTSPReader

NewRTSPReader constructs an RTSPReader without opening a connection.

func (*RTSPReader) Close

func (r *RTSPReader) Close() error

Close closes the RTSP connection and waits for the read loop to stop.

func (*RTSPReader) Open

func (r *RTSPReader) Open(ctx context.Context) error

Open dials the RTSP source, negotiates tracks, and starts the background read loop. It blocks until DESCRIBE + SETUP + PLAY complete.

func (*RTSPReader) ReadPackets

func (r *RTSPReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until at least one AVPacket is available or the source ends.

type SRTReader

type SRTReader struct {
	// contains filtered or unexported fields
}

SRTReader dials an SRT server in caller mode and emits raw MPEG-TS chunks. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.

func NewSRTReader

func NewSRTReader(input domain.Input) *SRTReader

NewSRTReader constructs an SRTReader without opening a connection.

func (*SRTReader) Close

func (r *SRTReader) Close() error

Close stops the context watcher and closes the SRT socket. Safe to call before Open or more than once.

Wait for the watcher goroutine to exit BEFORE nilling out r.conn — the watcher may be inside `if r.conn != nil { r.conn.Close() }` when its callerCtx fires; mutating r.conn without that wait would race the read.

func (*SRTReader) Open

func (r *SRTReader) Open(ctx context.Context) error

Open dials the remote SRT endpoint in caller mode. All SRT options present in the URL query string or in input.Params are applied to the connection config via Config.UnmarshalURL.

func (*SRTReader) Read

func (r *SRTReader) Read(ctx context.Context) ([]byte, error)

Read returns the next raw MPEG-TS chunk from the SRT stream. Returns (nil, io.EOF) when the remote endpoint closes the connection.

type StatsDemuxer added in v0.0.69

type StatsDemuxer struct {
	// contains filtered or unexported fields
}

StatsDemuxer surfaces per-codec stats (codec, bitrate) for raw-TS sources by parsing PSI tables off the data path. Construct via NewStatsDemuxer; feed via Feed; tear down via Close. Safe to call Feed and Close concurrently from different goroutines.

func NewStatsDemuxer added in v0.0.69

func NewStatsDemuxer(onAV func(p *domain.AVPacket)) *StatsDemuxer

NewStatsDemuxer starts a goroutine that scans chunks fed via Feed and invokes onAV for every recognised TS packet. Returns nil for nil callback so callers don't need to guard the construction site.

func (*StatsDemuxer) Close added in v0.0.69

func (sd *StatsDemuxer) Close()

Close stops the scanner goroutine and waits for it to exit. Safe to call multiple times and from any goroutine.

func (*StatsDemuxer) Feed added in v0.0.69

func (sd *StatsDemuxer) Feed(chunk []byte)

Feed enqueues chunk for asynchronous PMT scanning. Non-blocking: if the channel is full the chunk is dropped (stats are best-effort). Empty chunks and post-Close calls are silently ignored. The slice is copied internally so the caller is free to reuse the underlying buffer.

type StreamLookup added in v0.0.20

type StreamLookup func(domain.StreamCode) (*domain.Stream, bool)

StreamLookup resolves an upstream stream by code. Plumbed from the API layer (or any layer holding the repo) so this package stays free of the store dependency.

type TSChunkReader

type TSChunkReader interface {
	Open(ctx context.Context) error
	Read(ctx context.Context) ([]byte, error)
	Close() error
}

TSChunkReader is an MPEG-TS byte source (UDP, HLS, file, SRT, …).

type TSDemuxOption

type TSDemuxOption func(*TSDemuxPacketReader)

TSDemuxOption configures a TSDemuxPacketReader at construction time.

func WithRealtimePacing

func WithRealtimePacing() TSDemuxOption

WithRealtimePacing enables DTS-based real-time throttling on AVPacket emission. Use it for chunk-based sources (HLS) that deliver bursts faster than playback rate; do not use it for live transports (UDP, SRT, RTMP) where the source already paces itself.

type TSDemuxPacketReader

type TSDemuxPacketReader struct {
	// contains filtered or unexported fields
}

TSDemuxPacketReader wraps a TSChunkReader and emits domain.AVPacket values.

func NewBufferTSDemuxReader added in v0.0.23

func NewBufferTSDemuxReader(bufSvc *buffer.Service, bufID domain.StreamCode) *TSDemuxPacketReader

NewBufferTSDemuxReader composes newBufferTSChunkReader with the TS demuxer to produce a *TSDemuxPacketReader that reads AVPackets straight from a Buffer Hub buffer carrying MPEG-TS bytes. Convenience for callers outside this package (coordinator's ABR-mixer pipeline) that need this combination without exposing the internal chunk-reader type.

func NewTSDemuxPacketReader

func NewTSDemuxPacketReader(r TSChunkReader, opts ...TSDemuxOption) *TSDemuxPacketReader

NewTSDemuxPacketReader wraps r and emits domain.AVPacket values.

func (*TSDemuxPacketReader) Close

func (d *TSDemuxPacketReader) Close() error

Close tears down both goroutines and the underlying reader. It is safe to call Close concurrently or multiple times.

func (*TSDemuxPacketReader) Open

Open starts the pump and demux goroutines.

func (*TSDemuxPacketReader) ReadPackets

func (d *TSDemuxPacketReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until at least one AVPacket is available, then drains as many as are ready (up to 256 at once) to amortise call overhead. Returns (nil, io.EOF) when the source is exhausted.

type TSPassthroughPacketReader added in v0.0.61

type TSPassthroughPacketReader struct {
	// contains filtered or unexported fields
}

TSPassthroughPacketReader adapts a TSChunkReader to the PacketReader interface by emitting one `AVPacket{Codec: AVCodecRawTSChunk}` per chunk.

func NewTSPassthroughPacketReader added in v0.0.61

func NewTSPassthroughPacketReader(r TSChunkReader) *TSPassthroughPacketReader

NewTSPassthroughPacketReader wraps r so each TS chunk surfaces as a raw-TS-marker AVPacket — see file header for why this beats demux/remux.

func (*TSPassthroughPacketReader) Close added in v0.0.61

func (p *TSPassthroughPacketReader) Close() error

Close delegates to the underlying chunk reader.

func (*TSPassthroughPacketReader) Open added in v0.0.61

Open delegates to the underlying chunk reader.

func (*TSPassthroughPacketReader) ReadPackets added in v0.0.61

func (p *TSPassthroughPacketReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets reads ONE chunk per call and surfaces it as a single raw-TS-marker AVPacket. Returning a one-element batch matches the existing PacketReader contract; the buffer-hub fan-out is the same regardless of batch size.

type UDPReader

type UDPReader struct {
	// contains filtered or unexported fields
}

UDPReader listens on a UDP port and emits raw MPEG-TS chunks. It implements TSChunkReader and is intended to be wrapped with NewTSDemuxPacketReader.

func NewUDPReader

func NewUDPReader(input domain.Input) *UDPReader

NewUDPReader constructs a UDPReader for the given input without opening a connection.

func (*UDPReader) Close

func (r *UDPReader) Close() error

Close stops the pump goroutine, closes the UDP socket, and waits for clean shutdown. Safe to call before Open or multiple times.

Lifetime ordering matters under -race: pump() reads r.conn for SetReadDeadline and ReadFromUDP, so r.conn = nil must NOT happen until wg.Wait() has returned. The same applies to r.done / r.chunks — pump may still be in flight when Close starts. We close the socket (which unblocks pump's syscall), wait for pump to exit, and only then null out the shared fields.

func (*UDPReader) LocalAddr

func (r *UDPReader) LocalAddr() net.Addr

LocalAddr returns the local address the socket is bound to. Returns nil if the socket has not been opened yet.

func (*UDPReader) Open

func (r *UDPReader) Open(_ context.Context) error

Open binds the UDP socket, optionally joins a multicast group, and starts the background pump goroutine.

Each Open() call creates a fresh socket so the reader can be reused after Close() by the ingestor reconnection loop.

func (*UDPReader) Read

func (r *UDPReader) Read(ctx context.Context) ([]byte, error)

Read returns the next validated MPEG-TS datagram (with any RTP header stripped).

When the pump goroutine exits due to a real network error, Read returns that error (not io.EOF) so the upstream pumpChunks goroutine can store it in TSDemuxPacketReader.readErr and the worker will reconnect instead of triggering permanent failover.

Returns (nil, io.EOF) only when Close() is called cleanly without a pump error. Returns (nil, ctx.Err()) when the caller's context is cancelled.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL