pull

package
v0.0.52 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2026 License: MIT Imports: 44 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CopyReader added in v0.0.20

type CopyReader struct {
	// contains filtered or unexported fields
}

CopyReader implements ingestor.PacketReader for `copy://<upstream>` URLs.

One of two modes is selected at construction based on the upstream shape:

  • sub != nil: single-stream mode — subscribe to upstream's main buffer (AVPackets); ReadPackets reads sub.Recv() directly.
  • abrInner != nil: ABR mode — best rendition buffer is wrapped through a TS demuxer; ReadPackets delegates to abrInner.ReadPackets.

func NewCopyReader added in v0.0.20

func NewCopyReader(input domain.Input, bufSvc *buffer.Service, lookup StreamLookup) (*CopyReader, error)

NewCopyReader constructs a CopyReader from a copy:// input. Errors:

  • URL malformed (`copy://` grammar violation) — propagates protocol error
  • upstream missing in lookup — runtime error; coordinator surfaces it as input degraded so the manager can switch to a fallback

Mode selection:

  • upstream is single-stream → subscribe upstream's main buffer (AVPackets)
  • upstream is ABR → subscribe upstream's BEST rendition buffer (TS bytes) and demux to AVPackets via TSDemuxPacketReader. Coordinator only routes this case here when downstream has its own transcoder; the no-transcoder ABR-copy mirror path is handled by Coordinator.startABRCopy.

func (*CopyReader) Close added in v0.0.20

func (r *CopyReader) Close() error

Close unsubscribes. Idempotent — safe to call from multiple goroutines.

func (*CopyReader) Open added in v0.0.20

func (r *CopyReader) Open(ctx context.Context) error

Open subscribes to the upstream buffer. Returns an error when the buffer doesn't exist yet (upstream not started, or torn down).

func (*CopyReader) ReadPackets added in v0.0.20

func (r *CopyReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until one upstream packet arrives or ctx is done. Returns io.EOF when upstream tears down (subscriber channel closes).

Single-stream mode: packets without an AV payload (TS-only) are skipped silently — they shouldn't normally appear in a main buffer. ABR mode delegates to the inner TS demuxer.

type FileReader

type FileReader struct {
	// contains filtered or unexported fields
}

FileReader is a pull source for local media files. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.

func NewFileReader

func NewFileReader(path string, loop bool) *FileReader

NewFileReader constructs a FileReader without opening the file. path must be a resolved absolute filesystem path. The caller (typically internal/ingestor.NewPacketReader) is responsible for resolving any file:// URL through the VOD registry first.

Logs the loop flag at construction so operators can verify the deployed binary's vod.Registry resolver hands the expected value (loop=true is the default for file:// URLs without an explicit ?loop=false override).

func (*FileReader) Close

func (r *FileReader) Close() error

Close releases the open file handle. Calling Close before Open, or calling it twice, is safe.

func (*FileReader) Open

func (r *FileReader) Open(_ context.Context) error

Open opens the file and selects the appropriate handler based on extension. Returns an error if the path does not exist or is a directory.

func (*FileReader) Read

func (r *FileReader) Read(ctx context.Context) ([]byte, error)

Read returns the next raw MPEG-TS chunk. Returns io.EOF when the file is exhausted (and loop is not set).

type HLSReader

type HLSReader struct {
	// contains filtered or unexported fields
}

HLSReader polls an HLS playlist and emits MPEG-TS segment bytes in order. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.

Expected URL format: http[s]://host/path/to/playlist.m3u8 Master playlists are transparently resolved to the highest-bandwidth variant.

Open/Close may be called multiple times (runPullWorker reconnects on error). Each Open() creates a fresh output channel so that the previous poll goroutine's deferred close does not affect the new one.

func NewHLSReader

func NewHLSReader(input domain.Input, cfg config.IngestorConfig) *HLSReader

NewHLSReader constructs an HLSReader without opening the connection.

func (*HLSReader) Close

func (r *HLSReader) Close() error

Close stops the poll goroutine. Safe to call before Open or more than once.

func (*HLSReader) Open

func (r *HLSReader) Open(ctx context.Context) error

Open starts the background polling goroutine. Returns immediately; segments arrive asynchronously via Read. Safe to call again after Close — each call creates a fresh output channel.

func (*HLSReader) Read

func (r *HLSReader) Read(ctx context.Context) ([]byte, error)

Read blocks until the next MPEG-TS segment chunk is available. Returns (nil, io.EOF) when a VOD stream is fully consumed or the source ends.

type MixerReader added in v0.0.22

type MixerReader struct {
	// contains filtered or unexported fields
}

MixerReader implements ingestor.PacketReader for `mixer://` URLs.

Each source (video, audio) selects one of two modes at construction:

  • direct mode: subscribe upstream's main buffer (single-stream upstream; packets arrive as AVPackets). Uses the *Sub fields below.
  • ABR mode: subscribe upstream's best rendition (TS bytes) and demux via TSDemuxPacketReader. Uses the *Inner / *Chunk fields below.

The two modes can mix freely: e.g. ABR video + single-stream audio is the common case for "replace audio of an ABR camera with internet radio".

func NewMixerReader added in v0.0.22

func NewMixerReader(input domain.Input, bufSvc *buffer.Service, lookup StreamLookup) (*MixerReader, error)

NewMixerReader constructs a MixerReader from a mixer:// input. Errors:

  • URL malformed → propagates protocol error
  • either upstream missing in lookup → runtime error; coordinator surfaces as input degraded so the manager can switch to a fallback (none allowed in v1, so effectively the stream stops)

Mode per source: ABR upstream → tap best rendition + TS demux; otherwise → subscribe main buffer directly. Modes are independent for video vs audio.

func (*MixerReader) Close added in v0.0.22

func (r *MixerReader) Close() error

Close stops all sources + pumps. Idempotent.

func (*MixerReader) Open added in v0.0.22

func (r *MixerReader) Open(ctx context.Context) error

Open subscribes to both upstream buffers and (for ABR sources) starts pump goroutines that bridge TS-demux output into select-friendly channels. On any failure, partial state is rolled back.

func (*MixerReader) ReadPackets added in v0.0.22

func (r *MixerReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until one upstream packet arrives or ctx is done.

Selection rules:

  • video source ends (sub close or pump EOF) → io.EOF (always; downstream stream stops)
  • audio source ends + audio_failure=down (default) → io.EOF
  • audio source ends + audio_failure=continue → drop the audio source and keep forwarding video-only on subsequent calls
  • packets whose codec doesn't match the source channel are dropped (e.g. video upstream emitting an audio AAC packet — we want video only from the video source)

type RTMPReader

type RTMPReader struct {
	// contains filtered or unexported fields
}

RTMPReader connects to a remote RTMP server in play (pull) mode and emits domain.AVPacket.

Life cycle: NewRTMPReader → Open (blocks on TCP+RTMP handshake) → ReadPackets loop → Close. The underlying read goroutine exits when the server closes the connection or Close is called; both paths close pkts so ReadPackets returns io.EOF.

func NewRTMPReader

func NewRTMPReader(input domain.Input) *RTMPReader

NewRTMPReader constructs an RTMPReader without opening a connection.

func (*RTMPReader) Close

func (r *RTMPReader) Close() error

Close closes the RTMP connection; the readLoop then closes the packet channels.

func (*RTMPReader) Open

func (r *RTMPReader) Open(ctx context.Context) error

Open dials the RTMP source, negotiates codec data, and starts the background read loop. Blocks until the TCP connection + RTMP handshake + codec probing complete.

func (*RTMPReader) ReadPackets

func (r *RTMPReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until at least one AVPacket is available or the source ends.

type RTSPReader

type RTSPReader struct {
	// contains filtered or unexported fields
}

RTSPReader connects to a remote RTSP server in play (pull) mode and emits domain.AVPacket. It uses gortsplib for proper RTCP-based A/V synchronisation and RTP jitter buffering.

func NewRTSPReader

func NewRTSPReader(input domain.Input) *RTSPReader

NewRTSPReader constructs an RTSPReader without opening a connection.

func (*RTSPReader) Close

func (r *RTSPReader) Close() error

Close closes the RTSP connection and waits for the read loop to stop.

func (*RTSPReader) Open

func (r *RTSPReader) Open(ctx context.Context) error

Open dials the RTSP source, negotiates tracks, and starts the background read loop. It blocks until DESCRIBE + SETUP + PLAY complete.

func (*RTSPReader) ReadPackets

func (r *RTSPReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until at least one AVPacket is available or the source ends.

type SRTReader

type SRTReader struct {
	// contains filtered or unexported fields
}

SRTReader dials an SRT server in caller mode and emits raw MPEG-TS chunks. It implements TSChunkReader and is intended to wrap with NewTSDemuxPacketReader.

func NewSRTReader

func NewSRTReader(input domain.Input) *SRTReader

NewSRTReader constructs an SRTReader without opening a connection.

func (*SRTReader) Close

func (r *SRTReader) Close() error

Close stops the context watcher and closes the SRT socket. Safe to call before Open or more than once.

Wait for the watcher goroutine to exit BEFORE nilling out r.conn — the watcher may be inside `if r.conn != nil { r.conn.Close() }` when its callerCtx fires; mutating r.conn without that wait would race the read.

func (*SRTReader) Open

func (r *SRTReader) Open(ctx context.Context) error

Open dials the remote SRT endpoint in caller mode. All SRT options present in the URL query string or in input.Params are applied to the connection config via Config.UnmarshalURL.

func (*SRTReader) Read

func (r *SRTReader) Read(ctx context.Context) ([]byte, error)

Read returns the next raw MPEG-TS chunk from the SRT stream. Returns (nil, io.EOF) when the remote endpoint closes the connection.

type StreamLookup added in v0.0.20

type StreamLookup func(domain.StreamCode) (*domain.Stream, bool)

StreamLookup resolves an upstream stream by code. Plumbed from the API layer (or any layer holding the repo) so this package stays free of the store dependency.

type TSChunkReader

type TSChunkReader interface {
	Open(ctx context.Context) error
	Read(ctx context.Context) ([]byte, error)
	Close() error
}

TSChunkReader is an MPEG-TS byte source (UDP, HLS, file, SRT, …).

type TSDemuxOption

type TSDemuxOption func(*TSDemuxPacketReader)

TSDemuxOption configures a TSDemuxPacketReader at construction time.

func WithRealtimePacing

func WithRealtimePacing() TSDemuxOption

WithRealtimePacing enables DTS-based real-time throttling on AVPacket emission. Use it for chunk-based sources (HLS) that deliver bursts faster than playback rate; do not use it for live transports (UDP, SRT, RTMP) where the source already paces itself.

type TSDemuxPacketReader

type TSDemuxPacketReader struct {
	// contains filtered or unexported fields
}

TSDemuxPacketReader wraps a TSChunkReader and emits domain.AVPacket values.

func NewBufferTSDemuxReader added in v0.0.23

func NewBufferTSDemuxReader(bufSvc *buffer.Service, bufID domain.StreamCode) *TSDemuxPacketReader

NewBufferTSDemuxReader composes newBufferTSChunkReader with the TS demuxer to produce a *TSDemuxPacketReader that reads AVPackets straight from a Buffer Hub buffer carrying MPEG-TS bytes. Convenience for callers outside this package (coordinator's ABR-mixer pipeline) that need this combination without exposing the internal chunk-reader type.

func NewTSDemuxPacketReader

func NewTSDemuxPacketReader(r TSChunkReader, opts ...TSDemuxOption) *TSDemuxPacketReader

NewTSDemuxPacketReader wraps r and emits domain.AVPacket values.

func (*TSDemuxPacketReader) Close

func (d *TSDemuxPacketReader) Close() error

Close tears down both goroutines and the underlying reader. It is safe to call Close concurrently or multiple times.

func (*TSDemuxPacketReader) Open

Open starts the pump and demux goroutines.

func (*TSDemuxPacketReader) ReadPackets

func (d *TSDemuxPacketReader) ReadPackets(ctx context.Context) ([]domain.AVPacket, error)

ReadPackets blocks until at least one AVPacket is available, then drains as many as are ready (up to 256 at once) to amortise call overhead. Returns (nil, io.EOF) when the source is exhausted.

type UDPReader

type UDPReader struct {
	// contains filtered or unexported fields
}

UDPReader listens on a UDP port and emits raw MPEG-TS chunks. It implements TSChunkReader and is intended to be wrapped with NewTSDemuxPacketReader.

func NewUDPReader

func NewUDPReader(input domain.Input) *UDPReader

NewUDPReader constructs a UDPReader for the given input without opening a connection.

func (*UDPReader) Close

func (r *UDPReader) Close() error

Close stops the pump goroutine, closes the UDP socket, and waits for clean shutdown. Safe to call before Open or multiple times.

Lifetime ordering matters under -race: pump() reads r.conn for SetReadDeadline and ReadFromUDP, so r.conn = nil must NOT happen until wg.Wait() has returned. The same applies to r.done / r.chunks — pump may still be in flight when Close starts. We close the socket (which unblocks pump's syscall), wait for pump to exit, and only then null out the shared fields.

func (*UDPReader) LocalAddr

func (r *UDPReader) LocalAddr() net.Addr

LocalAddr returns the local address the socket is bound to. Returns nil if the socket has not been opened yet.

func (*UDPReader) Open

func (r *UDPReader) Open(_ context.Context) error

Open binds the UDP socket, optionally joins a multicast group, and starts the background pump goroutine.

Each Open() call creates a fresh socket so the reader can be reused after Close() by the ingestor reconnection loop.

func (*UDPReader) Read

func (r *UDPReader) Read(ctx context.Context) ([]byte, error)

Read returns the next validated MPEG-TS datagram (with any RTP header stripped).

When the pump goroutine exits due to a real network error, Read returns that error (not io.EOF) so the upstream pumpChunks goroutine can store it in TSDemuxPacketReader.readErr and the worker will reconnect instead of triggering permanent failover.

Returns (nil, io.EOF) only when Close() is called cleanly without a pump error. Returns (nil, ctx.Err()) when the caller's context is cancelled.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL