Documentation ¶
Overview ¶
Package ingestor handles raw stream ingestion.
The ingestor accepts a plain URL from the user and automatically derives the protocol from the URL scheme — no protocol configuration is needed.
Pull mode (server connects to remote source). All pull paths yield domain.AVPacket via PacketReader / NewPacketReader:
- UDP MPEG-TS → TS demux → AVPacket
- HLS playlist → HTTP + M3U8 → TS demux → AVPacket
- Local file → resolved through the VOD registry → TS demux → AVPacket
- SRT pull → gosrt → TS demux → AVPacket
- RTMP pull → native RTMP → AVPacket
- RTSP pull → gortsplib → AVPacket
Push mode (external encoder connects to our server):
- RTMP → RTMPServer (gomedia RTMP server handle, native FLV→MPEG-TS)
- SRT → SRTServer (gosrt native, MPEG-TS passthrough)
Push mode is auto-detected: if the URL host is 0.0.0.0 / :: and the scheme is rtmp or srt, the ingestor starts/uses the shared push server instead of a pull worker.
Local file inputs MUST take the form file://<vod_mount>/<relative/path>; bare paths and absolute file:/// URLs are rejected so the VOD mount layer is the single point of policy.
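The accepted URL form can be illustrated with a small validation sketch. The helper name splitVODURL and the example mount "movies" are hypothetical and not part of the package; the sketch only checks the shape of the URL and does not consult any VOD registry.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// splitVODURL is a hypothetical helper, not part of the package API.
// It accepts only file://<vod_mount>/<relative/path> and returns both parts.
func splitVODURL(raw string) (mount, rel string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	if u.Scheme != "file" {
		return "", "", errors.New("not a file:// URL (bare paths are rejected)")
	}
	if u.Host == "" {
		return "", "", errors.New("missing VOD mount (absolute file:/// URLs are rejected)")
	}
	rel = strings.TrimPrefix(u.Path, "/")
	if rel == "" {
		return "", "", errors.New("missing relative path")
	}
	return u.Host, rel, nil
}

func main() {
	for _, raw := range []string{
		"file://movies/2024/clip.ts", // mount "movies", path "2024/clip.ts"
		"file:///var/media/clip.ts",  // absolute form: rejected
		"/var/media/clip.ts",         // bare path: rejected
	} {
		mount, rel, err := splitVODURL(raw)
		fmt.Printf("%-28s mount=%q rel=%q err=%v\n", raw, mount, rel, err)
	}
}
```

Parsing with net/url puts the mount name in the host component, so an empty host is enough to distinguish the rejected file:/// form from the accepted one.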
The ingestor accepts a plain URL and automatically derives everything from it:
- Protocol (RTMP, RTSP, SRT, UDP, HLS, file, …) → from URL scheme
- Mode (pull vs push-listen) → from URL host (wildcard = listen)
Pull workers are created per-stream and reconnect automatically. Push servers (RTMP, SRT) are global, started once, and route by stream key.
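The derivation rules above can be sketched as a single classification function. This is a minimal illustration that assumes only the rules stated in this overview (scheme names the protocol; a wildcard host on rtmp or srt means push-listen). The function name classify and the example URLs are hypothetical, and the package's real logic may handle more cases.

```go
package main

import (
	"fmt"
	"net/url"
)

// classify is a hypothetical helper that mirrors the documented rules.
// It is not the package's actual implementation.
func classify(raw string) (protocol string, pushListen bool, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", false, err
	}
	if u.Scheme == "" {
		return "", false, fmt.Errorf("no scheme in %q", raw)
	}
	// Hostname strips the port and the brackets around IPv6 literals.
	host := u.Hostname()
	wildcard := host == "0.0.0.0" || host == "::"
	listenCapable := u.Scheme == "rtmp" || u.Scheme == "srt"
	return u.Scheme, wildcard && listenCapable, nil
}

func main() {
	for _, raw := range []string{
		"rtmp://0.0.0.0:1935/live",
		"srt://[::]:9000",
		"rtsp://camera.example:554/stream",
		"udp://239.0.0.1:5000",
	} {
		proto, listen, err := classify(raw)
		fmt.Printf("%-36s protocol=%s pushListen=%v err=%v\n", raw, proto, listen, err)
	}
}
```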
Index ¶
- type PacketReader
- type Registry
- func (r *Registry) Acquire(key string) (bufferWriteID domain.StreamCode, streamID domain.StreamCode, ...)
- func (r *Registry) Lookup(key string) (bufferWriteID domain.StreamCode, streamID domain.StreamCode, ...)
- func (r *Registry) Register(key string, streamID domain.StreamCode, buf *buffer.Service, ...)
- func (r *Registry) Release(key string)
- func (r *Registry) Unregister(key string)
- type Service
- func (s *Service) Probe(ctx context.Context, input domain.Input) error
- func (s *Service) Run(ctx context.Context) error
- func (s *Service) SetInputErrorObserver(fn func(streamID domain.StreamCode, inputPriority int, err error))
- func (s *Service) SetPacketObserver(fn func(streamID domain.StreamCode, inputPriority int))
- func (s *Service) SetRTMPPlayHandler(fn push.PlayFunc)
- func (s *Service) SetStreamLookup(fn pull.StreamLookup)
- func (s *Service) Start(ctx context.Context, streamID domain.StreamCode, input domain.Input, ...) error
- func (s *Service) Stop(streamID domain.StreamCode)
- type VODResolver
Types ¶
type PacketReader ¶
type PacketReader interface {
	Open(ctx context.Context) error
	ReadPackets(ctx context.Context) ([]domain.AVPacket, error)
	Close() error
}
PacketReader is a pull source that yields elementary-stream access units (domain.AVPacket).
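A typical consumer opens the reader, reads batches in a loop, and closes it when done. The sketch below shows that loop against a local copy of the interface. AVPacket is a stand-in for domain.AVPacket, sliceReader is a hypothetical in-memory reader, and the use of io.EOF to signal end of input is an assumption; this documentation does not say how a real reader reports it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// AVPacket is a local stand-in for domain.AVPacket; the real fields differ.
type AVPacket struct{ Data []byte }

// PacketReader mirrors the interface documented above.
type PacketReader interface {
	Open(ctx context.Context) error
	ReadPackets(ctx context.Context) ([]AVPacket, error)
	Close() error
}

// sliceReader is a hypothetical reader that hands out one batch per call.
type sliceReader struct{ batches [][]AVPacket }

func (r *sliceReader) Open(context.Context) error { return nil }
func (r *sliceReader) Close() error               { return nil }

func (r *sliceReader) ReadPackets(ctx context.Context) ([]AVPacket, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if len(r.batches) == 0 {
		return nil, io.EOF // assumption: end of input is signalled with io.EOF
	}
	b := r.batches[0]
	r.batches = r.batches[1:]
	return b, nil
}

// drain opens the reader, consumes batches until EOF, and always closes it.
func drain(ctx context.Context, r PacketReader) (int, error) {
	if err := r.Open(ctx); err != nil {
		return 0, err
	}
	defer r.Close()
	n := 0
	for {
		pkts, err := r.ReadPackets(ctx)
		n += len(pkts)
		if errors.Is(err, io.EOF) {
			return n, nil
		}
		if err != nil {
			return n, err
		}
	}
}

// countPackets wires the stub reader into drain with a background context.
func countPackets(batches ...[]AVPacket) (int, error) {
	return drain(context.Background(), &sliceReader{batches: batches})
}

func main() {
	n, err := countPackets(
		[]AVPacket{{Data: []byte{1}}, {Data: []byte{2}}},
		[]AVPacket{{Data: []byte{3}}},
	)
	fmt.Println(n, err) // 3 <nil>
}
```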
func NewPacketReader ¶
func NewPacketReader(
	input domain.Input,
	cfg config.IngestorConfig,
	vods VODResolver,
	buf *buffer.Service,
	lookup pull.StreamLookup,
) (PacketReader, error)
NewPacketReader constructs the appropriate PacketReader for the given input URL. RTSP and RTMP pull emit native AVPackets; MPEG-TS transports are demuxed to AVPackets.
For file:// URLs the VODResolver translates the URL to an absolute host path; a nil resolver or an unknown mount both produce an error. Any other scheme ignores the resolver.
For copy:// URLs the buffer service + stream lookup are required:
- buf: provides the upstream subscriber
- lookup: resolves upstream stream config to find the buffer ID and verify the upstream is single-stream (ABR upstreams use a different coordinator path, never this factory)
Returns an error when:
- The URL scheme is unrecognised
- The URL describes a push-listen address (handled by the push servers)
- The URL is a file:// reference that cannot be resolved against a VOD mount
- The URL is a copy:// reference and buf or lookup is nil, the upstream is missing, or the upstream has an ABR ladder
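The copy:// preconditions listed above can be sketched as a sequence of checks. Everything in this example is hypothetical: the upstream struct, the lookupFunc type (a stand-in for pull.StreamLookup, whose real signature is not shown here), the hasBuf flag (standing in for buf != nil), and the error texts.

```go
package main

import (
	"errors"
	"fmt"
)

// upstream is a hypothetical view of an upstream stream's configuration.
type upstream struct {
	BufferID string
	ABR      bool // true when the stream has an ABR ladder
}

// lookupFunc is a stand-in for pull.StreamLookup; the real signature may differ.
type lookupFunc func(code string) (upstream, bool)

// checkCopySource applies the documented copy:// preconditions in order and
// returns the buffer ID to subscribe to when all of them hold.
func checkCopySource(code string, hasBuf bool, lookup lookupFunc) (string, error) {
	if !hasBuf || lookup == nil {
		return "", errors.New("copy://: buffer service and stream lookup are required")
	}
	up, ok := lookup(code)
	if !ok {
		return "", fmt.Errorf("copy://: upstream %q not found", code)
	}
	if up.ABR {
		return "", fmt.Errorf("copy://: upstream %q has an ABR ladder", code)
	}
	return up.BufferID, nil
}

func main() {
	table := map[string]upstream{
		"news":   {BufferID: "news"},
		"sports": {BufferID: "sports", ABR: true},
	}
	lookup := func(code string) (upstream, bool) {
		up, ok := table[code]
		return up, ok
	}
	for _, code := range []string{"news", "sports", "missing"} {
		id, err := checkCopySource(code, true, lookup)
		fmt.Printf("%s: id=%q err=%v\n", code, id, err)
	}
}
```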
type Registry ¶
type Registry struct {
	// contains filtered or unexported fields
}
Registry maps ingest credentials (stream code used as key) to the destination buffer for a specific stream. Push servers (RTMP, SRT) use this to route incoming connections to the correct Buffer Hub slot.
Only streams configured with a publish:// input URL are registered here. The registry key is always the stream code — encoders do not need to know an app name; the stream code alone is sufficient to locate the slot.
func (*Registry) Acquire ¶
func (r *Registry) Acquire(key string) (bufferWriteID domain.StreamCode, streamID domain.StreamCode, buf *buffer.Service, err error)
Acquire atomically marks the slot for key as occupied and returns its targets. Returns push.ErrStreamAlreadyActive if another pusher is currently connected. Returns an error if key is not registered.
func (*Registry) Lookup ¶
func (r *Registry) Lookup(key string) (bufferWriteID domain.StreamCode, streamID domain.StreamCode, buf *buffer.Service, err error)
Lookup returns the buffer write target, logical stream id, and buffer service for key without changing the active-pusher state. Used for pre-flight checks (e.g. SRT HandleConnect).
func (*Registry) Register ¶
func (r *Registry) Register(key string, streamID domain.StreamCode, buf *buffer.Service, bufferWriteID domain.StreamCode)
Register maps key to a stream's buffer. bufferWriteID is the slot used for buf.Write; if empty, streamID is used. Registering the same key twice overwrites the previous entry and clears any stale active-pusher mark (e.g. after a server restart).
func (*Registry) Release ¶
func (r *Registry) Release(key string)
Release clears the active-pusher mark for key so the next encoder can Acquire. It is a no-op if key is not registered or was never acquired.
func (*Registry) Unregister ¶
func (r *Registry) Unregister(key string)
Unregister removes a key from the registry.
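The Register, Acquire, Release, and Unregister semantics described above can be modelled with a mutex-guarded map. This is a simplified sketch, not the package's implementation: the lowercase registry type, the plain string keys and IDs (in place of domain.StreamCode), the omitted buffer service, and both error values are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	// errAlreadyActive stands in for push.ErrStreamAlreadyActive.
	errAlreadyActive = errors.New("stream already active")
	errNotRegistered = errors.New("stream key not registered")
)

type slot struct {
	streamID, writeID string
	active            bool
}

// registry is a simplified stand-in for Registry; the buffer service is omitted.
type registry struct {
	mu    sync.Mutex
	slots map[string]*slot
}

func newRegistry() *registry { return &registry{slots: make(map[string]*slot)} }

// Register overwrites any previous entry, which also clears a stale active mark.
func (r *registry) Register(key, streamID, writeID string) {
	if writeID == "" {
		writeID = streamID
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.slots[key] = &slot{streamID: streamID, writeID: writeID}
}

// Acquire checks and sets the active mark under one lock, so two encoders
// racing for the same key cannot both succeed.
func (r *registry) Acquire(key string) (writeID, streamID string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.slots[key]
	if !ok {
		return "", "", errNotRegistered
	}
	if s.active {
		return "", "", errAlreadyActive
	}
	s.active = true
	return s.writeID, s.streamID, nil
}

// Release is a no-op for unknown or idle keys.
func (r *registry) Release(key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if s, ok := r.slots[key]; ok {
		s.active = false
	}
}

// Unregister removes the key entirely.
func (r *registry) Unregister(key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.slots, key)
}

func main() {
	r := newRegistry()
	r.Register("cam1", "cam1", "")
	_, _, err := r.Acquire("cam1")
	fmt.Println(err) // <nil>
	_, _, err = r.Acquire("cam1")
	fmt.Println(err) // stream already active
	r.Release("cam1")
	_, _, err = r.Acquire("cam1")
	fmt.Println(err) // <nil>
}
```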
type Service ¶
type Service struct {
	// contains filtered or unexported fields
}
Service manages the full ingest layer.
- Pull inputs: one goroutine per active stream, auto-reconnects.
- Push inputs: stream key is registered in the push server routing table; the next encoder connection for that key is accepted and routed here.
func (*Service) Probe ¶
func (s *Service) Probe(ctx context.Context, input domain.Input) error
Probe performs a short pull-read health probe for one input. The manager uses it to verify degraded inputs before failback.
func (*Service) Run ¶
func (s *Service) Run(ctx context.Context) error
Run starts the shared push/play listeners (RTMP) and blocks until ctx is cancelled. Other shared listeners (SRT, RTSP) are owned by the publisher and run from runtime.Manager. The same network port serves both ingest and play because the listeners config is the single source of truth.
func (*Service) SetInputErrorObserver ¶
func (s *Service) SetInputErrorObserver(fn func(streamID domain.StreamCode, inputPriority int, err error))
SetInputErrorObserver configures a callback invoked on input read/open failures.
func (*Service) SetPacketObserver ¶
func (s *Service) SetPacketObserver(fn func(streamID domain.StreamCode, inputPriority int))
SetPacketObserver configures a callback invoked for each packet read.
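Because the packet observer fires once per packet, a callback will usually want to do very little work. The sketch below shows one possible shape, a mutex-guarded counter keyed by stream and input priority. The packetCounter type is hypothetical, string stands in for domain.StreamCode, and the locking assumes the callback may be invoked from several pull goroutines at once, which this documentation does not state explicitly.

```go
package main

import (
	"fmt"
	"sync"
)

// packetCounter is a hypothetical observer target; it is not part of the package.
type packetCounter struct {
	mu     sync.Mutex
	counts map[string]int // keyed by "streamID/priority"
}

// Observe has the same shape as the SetPacketObserver callback, with
// string standing in for domain.StreamCode.
func (c *packetCounter) Observe(streamID string, inputPriority int) {
	key := fmt.Sprintf("%s/%d", streamID, inputPriority)
	c.mu.Lock()
	c.counts[key]++
	c.mu.Unlock()
}

func main() {
	c := &packetCounter{counts: make(map[string]int)}

	// Simulate four pull goroutines reporting packets concurrently.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				c.Observe("cam1", 0)
			}
		}()
	}
	wg.Wait()
	fmt.Println(c.counts["cam1/0"]) // 400
}
```

With the real service, a method value such as c.Observe could be passed to SetPacketObserver once its parameter type is adjusted to domain.StreamCode.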
func (*Service) SetRTMPPlayHandler ¶
func (s *Service) SetRTMPPlayHandler(fn push.PlayFunc)
SetRTMPPlayHandler registers an external play handler on the RTMP server. It must be called before Run. If the RTMP server is not enabled, this is a no-op.
func (*Service) SetStreamLookup ¶ added in v0.0.20
func (s *Service) SetStreamLookup(fn pull.StreamLookup)
SetStreamLookup wires the upstream-stream resolver used by copy:// inputs. It is called once from main.go after the store is available; passing nil (or never calling it) means copy:// inputs fail at NewPacketReader with an explicit error message.
func (*Service) Start ¶
func (s *Service) Start(ctx context.Context, streamID domain.StreamCode, input domain.Input, bufferWriteID domain.StreamCode) error
Start activates ingest for the given input on streamID.
The mode is derived from the URL:
- publish:// → push-listen: register the stream code in the push server routing table so encoders can connect with just the stream code as key.
- rtmp://0.0.0.0:... / srt://0.0.0.0:... → legacy push-listen (same effect).
- Any other URL → pull: start a goroutine that connects to the URL.
Calling Start again for the same stream replaces the previous worker/registration. bufferWriteID is the Buffer Hub slot for MPEG-TS writes; if empty, streamID is used.
func (*Service) Stop ¶
func (s *Service) Stop(streamID domain.StreamCode)
Stop cancels the pull worker or unregisters the push key for streamID.
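The replace-on-restart behaviour of Start and the cancellation performed by Stop can be sketched with one cancel function per stream. This covers only the pull side and is not the package's implementation: the workers type, its start and stop methods, and the replaceDemo helper are hypothetical, and string stands in for domain.StreamCode.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// workers is a hypothetical table of per-stream pull goroutines.
type workers struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

// start replaces any existing worker for streamID and returns the buffer
// slot the new worker writes to (streamID when writeID is empty).
func (w *workers) start(parent context.Context, streamID, writeID string,
	run func(ctx context.Context, slot string)) string {
	if writeID == "" {
		writeID = streamID
	}
	ctx, cancel := context.WithCancel(parent)
	w.mu.Lock()
	if prev, ok := w.cancels[streamID]; ok {
		prev() // stop the worker being replaced
	}
	w.cancels[streamID] = cancel
	w.mu.Unlock()
	go run(ctx, writeID)
	return writeID
}

// stop cancels the worker for streamID, if any.
func (w *workers) stop(streamID string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if cancel, ok := w.cancels[streamID]; ok {
		cancel()
		delete(w.cancels, streamID)
	}
}

// replaceDemo starts two workers for the same stream and reports the slot
// used and the order in which the workers observed cancellation.
func replaceDemo() (slot string, order []string) {
	w := &workers{cancels: make(map[string]context.CancelFunc)}
	stopped := make(chan string, 2)
	worker := func(name string) func(context.Context, string) {
		return func(ctx context.Context, _ string) {
			<-ctx.Done() // a real worker would connect and read here
			stopped <- name
		}
	}
	ctx := context.Background()
	w.start(ctx, "cam1", "", worker("first"))
	slot = w.start(ctx, "cam1", "", worker("second")) // cancels "first"
	order = append(order, <-stopped)
	w.stop("cam1")
	order = append(order, <-stopped)
	return slot, order
}

func main() {
	slot, order := replaceDemo()
	fmt.Println(slot, order) // cam1 [first second]
}
```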
type VODResolver ¶
VODResolver resolves a file:// URL referencing a registered VOD mount to an absolute filesystem path. It is the contract between the ingestor and the internal/vod registry — kept as an interface so tests can stub it.
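Since the interface exists so that tests can stub it, a test double might look like the sketch below. The method set of VODResolver is not shown in this documentation, so the resolver interface and its Resolve method are assumed shapes; stubResolver and the example paths are likewise hypothetical.

```go
package main

import "fmt"

// resolver is an assumed shape for VODResolver; the real method set
// is not shown in this documentation and may differ.
type resolver interface {
	Resolve(rawURL string) (absPath string, err error)
}

// stubResolver maps whole file:// URLs to fixed paths for use in tests.
type stubResolver map[string]string

func (s stubResolver) Resolve(rawURL string) (string, error) {
	p, ok := s[rawURL]
	if !ok {
		return "", fmt.Errorf("unknown VOD mount or path: %s", rawURL)
	}
	return p, nil
}

func main() {
	var r resolver = stubResolver{
		"file://movies/clip.ts": "/srv/vod/movies/clip.ts",
	}
	fmt.Println(r.Resolve("file://movies/clip.ts"))
	fmt.Println(r.Resolve("file://unknown/clip.ts"))
}
```

A fixed map keeps the stub free of filesystem access, so tests that exercise file:// inputs stay hermetic.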