Documentation
¶
Overview ¶
Package publisher delivers transcoded streams to all outputs.
It handles two distinct output types:
- Serve endpoints (HLS, DASH, RTSP, RTMP listen, SRT listen): the server listens, clients connect and pull. HLS uses MPEG-TS segments + m3u8; DASH uses fMP4 (init + .m4s) + dynamic MPD (Eyevinn mp4ff); RTSP/RTMP/SRT use gortsplib/gomedia/gosrt — no FFmpeg in this package. RTSP, RTMP play, and SRT listen each use one shared TCP/UDP port (configured under listeners.{rtsp,rtmp,srt}.port — same port serves both ingest and play); streams are selected by path (/live/<code>), RTMP app "live", or SRT streamid (live/<code>).
- Push destinations: rtmp:// (plain TCP) and rtmps:// (TLS, default port 443) via gomedia go-rtmp client; other schemes return a clear error.
Index ¶
- type ABRRepMeta
- type PushSnapshot
- type PushStatus
- type RuntimeStatus
- type Service
- func (s *Service) HandleMPEGTS() http.HandlerFunc
- func (s *Service) HandleRTMPPlay(ctx context.Context, key string, info push.PlayInfo, ...) error
- func (s *Service) RestartHLSDASH(ctx context.Context, stream *domain.Stream) error
- func (s *Service) RunRTSPPlayServer(ctx context.Context) error
- func (s *Service) RunSRTPlayServer(ctx context.Context) error
- func (s *Service) RuntimeStatus(streamID domain.StreamCode) (RuntimeStatus, bool)
- func (s *Service) SetListeners(l config.ListenersConfig)
- func (s *Service) Start(ctx context.Context, stream *domain.Stream) error
- func (s *Service) Stop(streamID domain.StreamCode)
- func (s *Service) UpdateABRMasterMeta(streamCode domain.StreamCode, updates []ABRRepMeta)
- func (s *Service) UpdateProtocols(ctx context.Context, old, new *domain.Stream) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ABRRepMeta ¶
ABRRepMeta carries updated metadata for one ABR rendition used in in-place master playlist rewrites (e.g. after a profile bitrate/resolution change without an ABR ladder structure change).
type PushSnapshot ¶ added in v0.0.14
type PushSnapshot struct {
URL string `json:"url"`
Status PushStatus `json:"status"`
Attempt int `json:"attempt"`
ConnectedAt *time.Time `json:"connected_at,omitempty"`
Errors []domain.ErrorEntry `json:"errors,omitempty"`
}
PushSnapshot is the JSON-safe snapshot of one push destination's runtime state.
type PushStatus ¶ added in v0.0.14
type PushStatus string
PushStatus is the lifecycle state of one outbound push destination.
const ( // PushStatusStarting — packager goroutine has spawned, no handshake attempt yet. PushStatusStarting PushStatus = "starting" // PushStatusActive — last connect succeeded; session is currently sending media. PushStatusActive PushStatus = "active" // PushStatusReconnecting — last session ended (server close / write error / handshake fail); // waiting RetryTimeoutSec before next attempt. PushStatusReconnecting PushStatus = "reconnecting" // PushStatusFailed — exhausted dest.Limit attempts; goroutine has given up. PushStatusFailed PushStatus = "failed" )
Status values reported via RuntimeStatus.
type RuntimeStatus ¶ added in v0.0.14
type RuntimeStatus struct {
Pushes []PushSnapshot `json:"pushes"`
}
RuntimeStatus is the JSON-safe snapshot of all push destinations for one stream.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages all output workers for active streams.
func NewServiceForTesting ¶
NewServiceForTesting creates a Service without DI, for use in integration tests.
func (*Service) HandleMPEGTS ¶ added in v0.0.68
func (s *Service) HandleMPEGTS() http.HandlerFunc
HandleMPEGTS returns an http.HandlerFunc that streams raw MPEG-TS bytes from the playback buffer hub for the {code} URL parameter. Used by the API server to mount /<code>/mpegts in the same router group as the HLS / DASH file routes (so the sessions middleware tracks viewers uniformly across protocols).
Behaviour:
- 404 if the stream is not running, or its MPEGTS protocol flag is off.
- 500 if the buffer subscribe fails (transient — buffer was deleted between the runtime check and Subscribe).
- On success, writes raw TS bytes with chunked transfer encoding until the client disconnects, the stream stops, or the request context is cancelled.
The handler intentionally does not flush every packet — Go's net/http chunked writer flushes on its own buffer threshold (~4 KiB) which is a good latency / syscall trade-off for typical TS bitrates. Operators that need per-packet flushing can lower the buffer-hub chunk size at the source (e.g. UDP datagram size) instead of churning syscalls here.
func (*Service) HandleRTMPPlay ¶
func (s *Service) HandleRTMPPlay( ctx context.Context, key string, info push.PlayInfo, session *rtmp.ServerSession, ) error
HandleRTMPPlay is the play handler registered with the ingestor's RTMP server. It subscribes to the Buffer Hub for the given stream key, demuxes the TS stream, and writes H.264 / AAC frames into the lal RTMP session until ctx is cancelled. Returns an error when the stream is not active (caller closes the connection).
Signature matches push.PlayFunc — the info argument carries remote-peer metadata captured by the RTMP server at OnPlay handshake time and is forwarded into the play-sessions tracker for the API.
func (*Service) RestartHLSDASH ¶
RestartHLSDASH stops and restarts only the HLS and DASH goroutines with the new stream config. Used when the ABR ladder count changes (profile added/removed) so the master playlist and per-shard segmenters reflect the new rendition set. RTSP, RTMP, and SRT goroutines are unaffected.
func (*Service) RunRTSPPlayServer ¶
RunRTSPPlayServer starts the RTSP play listener. Returns nil immediately when listeners.rtsp is disabled.
func (*Service) RunSRTPlayServer ¶
RunSRTPlayServer starts the SRT play listener. Returns nil immediately when listeners.srt is disabled.
func (*Service) RuntimeStatus ¶ added in v0.0.14
func (s *Service) RuntimeStatus(streamID domain.StreamCode) (RuntimeStatus, bool)
RuntimeStatus returns a snapshot of all push destinations for the given stream, sorted by URL for stable output. Returns ok=false when no push is registered for the stream (no destinations configured, or all torn down).
func (*Service) SetListeners ¶ added in v0.0.53
func (s *Service) SetListeners(l config.ListenersConfig)
SetListeners hot-swaps the shared listeners config. The next invocation of RunRTSPPlayServer or RunSRTPlayServer will read the new value at the top of its run loop. Already-running RTSP / SRT servers keep their old bind address until runtime restarts them — see runtime.diff for the "SetListeners then restart" sequencing.
func (*Service) Start ¶
Start launches all serve-endpoints and push-destination workers for a stream.
func (*Service) Stop ¶
func (s *Service) Stop(streamID domain.StreamCode)
Stop cancels all output workers for a stream, waits for them to finish, and removes the on-disk segment directories (HLS/DASH).
func (*Service) UpdateABRMasterMeta ¶
func (s *Service) UpdateABRMasterMeta(streamCode domain.StreamCode, updates []ABRRepMeta)
UpdateABRMasterMeta applies updated bandwidth/resolution metadata to the running HLS ABR master playlist writer. This avoids a full publisher restart when only a profile's bitrate or resolution changes (but the ladder count stays the same).
func (*Service) UpdateProtocols ¶
UpdateProtocols surgically stops/starts only the protocol goroutines that changed between old and new stream configs. Goroutines for unchanged protocols keep running. Call this when diff.ProtocolsChanged || diff.PushChanged.