ingestor

package
v0.0.8 Latest
Published: Apr 23, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package ingestor handles raw stream ingestion.

The ingestor accepts a plain URL from the user and automatically derives the protocol from the URL scheme — no protocol configuration is needed.

Pull mode (server connects to remote source). All pull paths yield domain.AVPacket via PacketReader / NewPacketReader:

  • UDP MPEG-TS → TS demux → AVPacket
  • HLS playlist → HTTP + M3U8 → TS demux → AVPacket
  • Local file → resolved through the VOD registry → TS demux → AVPacket
  • SRT pull → gosrt → TS demux → AVPacket
  • RTMP pull → native RTMP → AVPacket
  • RTSP pull → gortsplib → AVPacket

Push mode (external encoder connects to our server):

  • RTMP → RTMPServer (gomedia RTMP server handle, native FLV→MPEG-TS)
  • SRT → SRTServer (gosrt native, MPEG-TS passthrough)

Push mode is auto-detected: if the URL host is a wildcard address (0.0.0.0 or ::) and the scheme is rtmp or srt, the ingestor starts or reuses the shared push server instead of creating a pull worker.

Local file inputs MUST take the form file://<vod_mount>/<relative/path>; bare paths and absolute file:/// URLs are rejected so the VOD mount layer is the single point of policy.
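
The file URL rule above can be illustrated with a small validator. This is a minimal sketch, not the package's implementation: parseVODFileURL and its error messages are hypothetical, and it only checks the URL shape (scheme, mount, relative path), not whether the mount exists.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// parseVODFileURL is a hypothetical helper that splits
// file://<vod_mount>/<relative/path> into its mount and relative path.
func parseVODFileURL(raw string) (mount, rel string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", fmt.Errorf("parse %q: %w", raw, err)
	}
	// A bare path such as /var/media/a.ts has no scheme at all.
	if u.Scheme != "file" {
		return "", "", errors.New("not a file:// URL (bare paths are rejected)")
	}
	// file:///abs/path parses with an empty host, i.e. no mount name.
	if u.Host == "" {
		return "", "", errors.New("missing VOD mount (absolute file:/// URLs are rejected)")
	}
	rel = strings.TrimPrefix(u.Path, "/")
	if rel == "" {
		return "", "", errors.New("missing relative path")
	}
	return u.Host, rel, nil
}
```

With this sketch, file://movies/2024/a.ts yields the mount "movies" and the relative path "2024/a.ts", while file:///var/media/a.ts and /var/media/a.ts both return an error.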

The ingestor accepts a plain URL and automatically derives everything from it:

  • Protocol (RTMP, RTSP, SRT, UDP, HLS, file, …) → from URL scheme
  • Mode (pull vs push-listen) → from URL host (wildcard = listen)

Pull workers are created per-stream and reconnect automatically. Push servers (RTMP, SRT) are global, started once, and route by stream key.

Types

type PacketReader

type PacketReader interface {
	Open(ctx context.Context) error
	ReadPackets(ctx context.Context) ([]domain.AVPacket, error)
	Close() error
}

PacketReader is a pull source that yields elementary-stream access units (domain.AVPacket).

func NewPacketReader

func NewPacketReader(input domain.Input, cfg config.IngestorConfig, vods VODResolver) (PacketReader, error)

NewPacketReader constructs the appropriate PacketReader for the given input URL. RTSP and RTMP pull emit native AVPackets; MPEG-TS transports are demuxed to AVPackets.

For file:// URLs the VODResolver translates the URL to an absolute host path; a nil resolver or an unknown mount both produce an error. Any other scheme ignores the resolver.

Returns an error when:

  • The URL scheme is unrecognised
  • The URL describes a push-listen address (handled by the push servers)
  • The URL is a file:// reference that cannot be resolved against a VOD mount

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps ingest credentials (stream code used as key) to the destination buffer for a specific stream. Push servers (RTMP, SRT) use this to route incoming connections to the correct Buffer Hub slot.

Only streams configured with a publish:// input URL are registered here. The registry key is always the stream code — encoders do not need to know an app name; the stream code alone is sufficient to locate the slot.

func NewRegistry

func NewRegistry() *Registry

NewRegistry constructs an empty Registry.

func (*Registry) Acquire

func (r *Registry) Acquire(key string) (bufferWriteID domain.StreamCode, streamID domain.StreamCode, buf *buffer.Service, err error)

Acquire atomically marks the slot for key as occupied and returns its targets. Returns push.ErrStreamAlreadyActive if another pusher is currently connected. Returns an error if key is not registered.

func (*Registry) Lookup

func (r *Registry) Lookup(key string) (bufferWriteID domain.StreamCode, streamID domain.StreamCode, buf *buffer.Service, err error)

Lookup returns the buffer write target, logical stream id, and buffer service for key without changing the active-pusher state. Used for pre-flight checks (e.g. SRT HandleConnect).

func (*Registry) Register

func (r *Registry) Register(key string, streamID domain.StreamCode, buf *buffer.Service, bufferWriteID domain.StreamCode)

Register maps key to a stream's buffer. bufferWriteID is the slot used for buf.Write; if empty, streamID is used. Registering the same key twice overwrites the previous entry and clears any stale active-pusher mark (e.g. after a server restart).

func (*Registry) Release

func (r *Registry) Release(key string)

Release clears the active-pusher mark for key so the next encoder can Acquire. It is a no-op if key is not registered or was never acquired.

func (*Registry) Unregister

func (r *Registry) Unregister(key string)

Unregister removes a key from the registry.
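
The Register / Acquire / Release semantics described above can be sketched with a mutex-guarded map. This is an illustrative model only, not the package's code: miniRegistry, slot, and errNotRegistered are hypothetical names, errStreamAlreadyActive stands in for push.ErrStreamAlreadyActive, and the *buffer.Service target is omitted.

```go
package main

import (
	"errors"
	"sync"
)

// errStreamAlreadyActive stands in for push.ErrStreamAlreadyActive.
var errStreamAlreadyActive = errors.New("stream already active")

// errNotRegistered is a hypothetical error for unknown keys.
var errNotRegistered = errors.New("stream key not registered")

// slot is a simplified entry; the real Registry also holds a *buffer.Service.
type slot struct {
	streamID      string
	bufferWriteID string
	active        bool
}

type miniRegistry struct {
	mu    sync.Mutex
	slots map[string]*slot
}

func newMiniRegistry() *miniRegistry {
	return &miniRegistry{slots: make(map[string]*slot)}
}

// Register overwrites any previous entry, which also clears a stale active mark.
func (r *miniRegistry) Register(key, streamID, bufferWriteID string) {
	if bufferWriteID == "" {
		bufferWriteID = streamID // documented fallback
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.slots[key] = &slot{streamID: streamID, bufferWriteID: bufferWriteID}
}

// Acquire checks and sets the active mark under a single lock, so two
// concurrent pushers cannot both succeed.
func (r *miniRegistry) Acquire(key string) (bufferWriteID, streamID string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.slots[key]
	if !ok {
		return "", "", errNotRegistered
	}
	if s.active {
		return "", "", errStreamAlreadyActive
	}
	s.active = true
	return s.bufferWriteID, s.streamID, nil
}

// Release clears the active mark; it is a no-op for unknown or idle keys.
func (r *miniRegistry) Release(key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if s, ok := r.slots[key]; ok {
		s.active = false
	}
}
```

A push server following this pattern would call Acquire when an encoder connects and pair it with a deferred Release, so a dropped connection frees the slot for the next encoder.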

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages the full ingest layer.

  • Pull inputs: one goroutine per active stream, auto-reconnects.
  • Push inputs: stream key is registered in the push server routing table; the next encoder connection for that key is accepted and routed here.

func New

func New(i do.Injector) (*Service, error)

New creates a Service and registers it with the DI injector.

func (*Service) Probe

func (s *Service) Probe(ctx context.Context, input domain.Input) error

Probe performs a short pull-read health probe for one input. It is used by the manager to verify degraded inputs before failback.

func (*Service) Run

func (s *Service) Run(ctx context.Context) error

Run starts the shared push/play listeners (RTMP) and blocks until ctx is cancelled. Other shared listeners (SRT, RTSP) are owned by the publisher and run from runtime.Manager; the same network port serves both ingest and play because the listeners config is the single source of truth.

func (*Service) SetInputErrorObserver

func (s *Service) SetInputErrorObserver(fn func(streamID domain.StreamCode, inputPriority int, err error))

SetInputErrorObserver configures a callback invoked on input read/open failures.

func (*Service) SetPacketObserver

func (s *Service) SetPacketObserver(fn func(streamID domain.StreamCode, inputPriority int))

SetPacketObserver configures a callback invoked for each packet read.

func (*Service) SetRTMPPlayHandler

func (s *Service) SetRTMPPlayHandler(fn push.PlayFunc)

SetRTMPPlayHandler registers an external play handler on the RTMP server. Must be called before Run. If the RTMP server is not enabled this is a no-op.

func (*Service) Start

func (s *Service) Start(ctx context.Context, streamID domain.StreamCode, input domain.Input, bufferWriteID domain.StreamCode) error

Start activates ingest for the given input on streamID.

The mode is derived from the URL:

  • publish:// → push-listen: register the stream code in the push server routing table so encoders can connect with just the stream code as key.
  • rtmp://0.0.0.0:... / srt://0.0.0.0:... → legacy push-listen (same effect).
  • Any other URL → pull: start a goroutine that connects to the URL.

Calling Start again for the same stream replaces the previous worker/registration. bufferWriteID is the Buffer Hub slot for MPEG-TS writes; if empty, streamID is used.
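
The mode rules listed above can be expressed as a small classifier over the parsed URL. This is a sketch of the decision only, under stated assumptions: classify and ingestMode are hypothetical names, the example URLs are illustrative, and the sketch does not check whether the scheme is one the ingestor recognises.

```go
package main

import (
	"fmt"
	"net/url"
)

// ingestMode is a hypothetical label for the two modes named in the docs.
type ingestMode string

const (
	modePull       ingestMode = "pull"
	modePushListen ingestMode = "push-listen"
)

// classify derives the ingest mode from the URL scheme and host.
func classify(raw string) (ingestMode, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", fmt.Errorf("parse %q: %w", raw, err)
	}
	if u.Scheme == "" {
		return "", fmt.Errorf("%q has no scheme", raw)
	}
	switch u.Scheme {
	case "publish":
		return modePushListen, nil
	case "rtmp", "srt":
		// Legacy form: a wildcard host means "listen", not "connect".
		// Hostname strips the port and the brackets around IPv6 literals.
		if h := u.Hostname(); h == "0.0.0.0" || h == "::" {
			return modePushListen, nil
		}
	}
	return modePull, nil
}
```

Under this sketch, rtmp://0.0.0.0:1935/live and srt://[::]:9000 classify as push-listen, while rtmp://encoder.example.com/live/cam1 and udp://239.0.0.1:5000 classify as pull.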

func (*Service) Stop

func (s *Service) Stop(streamID domain.StreamCode)

Stop cancels the pull worker or unregisters the push key for streamID.

type VODResolver

type VODResolver interface {
	Resolve(rawURL string) (path string, loop bool, err error)
}

VODResolver resolves a file:// URL referencing a registered VOD mount to an absolute filesystem path. It is the contract between the ingestor and the internal/vod registry — kept as an interface so tests can stub it.
