Documentation ¶
Overview ¶
Package publisher delivers transcoded streams to all outputs.
It handles two distinct output types:
- Serve endpoints (HLS, DASH, RTSP, RTMP listen, SRT listen): the server listens; clients connect and pull. HLS uses MPEG-TS segments with an m3u8 playlist. DASH uses fMP4 (init segment plus .m4s media segments) with a dynamic MPD, built with Eyevinn mp4ff. RTSP, RTMP, and SRT use gortsplib, gomedia, and gosrt respectively; this package does not use FFmpeg. RTSP, RTMP play, and SRT listen each use one shared TCP/UDP port, configured under listeners.{rtsp,rtmp,srt}.port; the same port serves both ingest and play. Streams are selected by path (/live/<code>), RTMP app "live", or SRT streamid (live/<code>).
- Push destinations: rtmp:// (plain TCP) and rtmps:// (TLS, default port 443) via the gomedia go-rtmp client; any other scheme is rejected with an explicit error.
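The scheme rules for push destinations can be illustrated with a small, self-contained sketch. This is not the package's code: resolvePushTarget and pushTarget are hypothetical names, and the fallback port 1935 for rtmp:// is the conventional RTMP port, assumed here because the overview only states the rtmps:// default.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// pushTarget is a hypothetical helper type, not part of the publisher API.
type pushTarget struct {
	Addr   string // host:port to dial
	UseTLS bool   // true for rtmps://
}

// resolvePushTarget maps a push URL to a dial address and rejects
// every scheme other than rtmp and rtmps.
func resolvePushTarget(raw string) (pushTarget, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return pushTarget{}, fmt.Errorf("parse push URL: %w", err)
	}

	var defaultPort string
	var useTLS bool
	switch u.Scheme {
	case "rtmp":
		defaultPort = "1935" // assumption: conventional RTMP port
	case "rtmps":
		defaultPort, useTLS = "443", true // default stated in the overview
	default:
		return pushTarget{}, fmt.Errorf("unsupported push scheme %q (want rtmp or rtmps)", u.Scheme)
	}

	host := u.Hostname()
	if host == "" {
		return pushTarget{}, fmt.Errorf("push URL %q has no host", raw)
	}
	port := u.Port()
	if port == "" {
		port = defaultPort
	}
	return pushTarget{Addr: net.JoinHostPort(host, port), UseTLS: useTLS}, nil
}
```

With this sketch, resolvePushTarget("rtmps://a.example.com/live/key") yields the address a.example.com:443 with UseTLS set, while an srt:// URL returns an error instead of a target.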
Index ¶
- type ABRRepMeta
- type PushSnapshot
- type PushStatus
- type RuntimeStatus
- type Service
- func (s *Service) HandleRTMPPlay(ctx context.Context, key string, info push.PlayInfo, ...) error
- func (s *Service) RestartHLSDASH(ctx context.Context, stream *domain.Stream) error
- func (s *Service) RunRTSPPlayServer(ctx context.Context) error
- func (s *Service) RunSRTPlayServer(ctx context.Context) error
- func (s *Service) RuntimeStatus(streamID domain.StreamCode) (RuntimeStatus, bool)
- func (s *Service) SetListenersForTesting(l config.ListenersConfig)
- func (s *Service) Start(ctx context.Context, stream *domain.Stream) error
- func (s *Service) Stop(streamID domain.StreamCode)
- func (s *Service) UpdateABRMasterMeta(streamCode domain.StreamCode, updates []ABRRepMeta)
- func (s *Service) UpdateProtocols(ctx context.Context, old, new *domain.Stream) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ABRRepMeta ¶
ABRRepMeta carries updated metadata for one ABR rendition used in in-place master playlist rewrites (e.g. after a profile bitrate/resolution change without an ABR ladder structure change).
type PushSnapshot ¶ added in v0.0.14
type PushSnapshot struct {
	URL         string              `json:"url"`
	Status      PushStatus          `json:"status"`
	Attempt     int                 `json:"attempt"`
	ConnectedAt *time.Time          `json:"connected_at,omitempty"`
	Errors      []domain.ErrorEntry `json:"errors,omitempty"`
}
PushSnapshot is the JSON-safe snapshot of one push destination's runtime state.
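To show what the struct tags imply for API consumers, the sketch below mirrors PushSnapshot locally and encodes it with encoding/json. The ErrorEntry type is a stand-in with a made-up Message field, since the fields of domain.ErrorEntry are not shown here; encodeSnapshot is likewise a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"time"
)

// PushStatus mirrors the documented string type.
type PushStatus string

// ErrorEntry is a placeholder for domain.ErrorEntry (fields assumed).
type ErrorEntry struct {
	Message string `json:"message"`
}

// PushSnapshot is a local copy of the documented struct, using the same JSON tags.
type PushSnapshot struct {
	URL         string       `json:"url"`
	Status      PushStatus   `json:"status"`
	Attempt     int          `json:"attempt"`
	ConnectedAt *time.Time   `json:"connected_at,omitempty"`
	Errors      []ErrorEntry `json:"errors,omitempty"`
}

// encodeSnapshot returns the JSON form of a snapshot, or "" on error.
func encodeSnapshot(s PushSnapshot) string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}
```

A snapshot with only URL and Status set encodes as {"url":"rtmp://example.invalid/live/a","status":"starting","attempt":0}: the nil ConnectedAt pointer and the empty Errors slice are dropped by omitempty, while attempt is always present.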
type PushStatus ¶ added in v0.0.14
type PushStatus string
PushStatus is the lifecycle state of one outbound push destination.
const (
	// PushStatusStarting: packager goroutine has spawned, no handshake attempt yet.
	PushStatusStarting PushStatus = "starting"
	// PushStatusActive: last connect succeeded; session is currently sending media.
	PushStatusActive PushStatus = "active"
	// PushStatusReconnecting: last session ended (server close / write error / handshake fail);
	// waiting RetryTimeoutSec before next attempt.
	PushStatusReconnecting PushStatus = "reconnecting"
	// PushStatusFailed: exhausted dest.Limit attempts; goroutine has given up.
	PushStatusFailed PushStatus = "failed"
)
Status values reported via RuntimeStatus.
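The lifecycle described by these constants can be sketched as a retry loop. This is only an illustration under stated assumptions: runPush, its session/wait/report callbacks, and errSessionEnded are hypothetical; the sketch assumes limit is positive and that the attempt counter never resets after a successful connect, neither of which is confirmed by this documentation.

```go
package main

import "errors"

type PushStatus string

const (
	PushStatusStarting     PushStatus = "starting"
	PushStatusActive       PushStatus = "active"
	PushStatusReconnecting PushStatus = "reconnecting"
	PushStatusFailed       PushStatus = "failed"
)

// errSessionEnded is a placeholder error for callers of this sketch.
var errSessionEnded = errors.New("session ended")

// runPush runs up to limit sessions and reports every status change.
// session blocks for the lifetime of one connection and calls connected
// once the handshake succeeds. wait stands in for sleeping RetryTimeoutSec.
func runPush(
	limit int,
	session func(attempt int, connected func()) error,
	wait func(),
	report func(PushStatus),
) error {
	report(PushStatusStarting)
	var lastErr error
	for attempt := 1; attempt <= limit; attempt++ {
		lastErr = session(attempt, func() { report(PushStatusActive) })
		if attempt < limit {
			// The session ended; pause before the next attempt.
			report(PushStatusReconnecting)
			wait()
		}
	}
	// All attempts are used up; the worker gives up.
	report(PushStatusFailed)
	return lastErr
}
```

With limit set to 2, a session that connects on the first attempt and fails the handshake on the second would report starting, active, reconnecting, failed, in that order.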
type RuntimeStatus ¶ added in v0.0.14
type RuntimeStatus struct {
	Pushes []PushSnapshot `json:"pushes"`
}
RuntimeStatus is the JSON-safe snapshot of all push destinations for one stream.
type Service ¶
type Service struct {
	// contains filtered or unexported fields
}
Service manages all output workers for active streams.
func NewServiceForTesting ¶
NewServiceForTesting creates a Service without DI, for use in integration tests.
func (*Service) HandleRTMPPlay ¶
func (s *Service) HandleRTMPPlay(
	ctx context.Context,
	key string,
	info push.PlayInfo,
	writeFrame func(cid gocodec.CodecID, data []byte, pts, dts uint32) error,
) error
HandleRTMPPlay is the play handler registered with the ingestor's RTMP server. It subscribes to the Buffer Hub for the given stream key, demuxes the TS stream, and calls writeFrame for each H.264/AAC frame until ctx is cancelled. Returns an error when the stream is not active (caller closes the connection).
Signature matches push.PlayFunc — the info argument carries remote-peer metadata captured by the RTMP server at OnPlay handshake time and is forwarded into the play-sessions tracker for the API.
func (*Service) RestartHLSDASH ¶
RestartHLSDASH stops and restarts only the HLS and DASH goroutines with the new stream config. Used when the ABR ladder count changes (profile added/removed) so the master playlist and per-shard segmenters reflect the new rendition set. RTSP, RTMP, and SRT goroutines are unaffected.
func (*Service) RunRTSPPlayServer ¶
RunRTSPPlayServer starts the RTSP play listener. Returns nil immediately when listeners.rtsp is disabled.
func (*Service) RunSRTPlayServer ¶
RunSRTPlayServer starts the SRT play listener. Returns nil immediately when listeners.srt is disabled.
func (*Service) RuntimeStatus ¶ added in v0.0.14
func (s *Service) RuntimeStatus(streamID domain.StreamCode) (RuntimeStatus, bool)
RuntimeStatus returns a snapshot of all push destinations for the given stream, sorted by URL for stable output. Returns ok=false when no push is registered for the stream (no destinations configured, or all torn down).
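The lookup contract (sorted by URL, ok=false when nothing is registered) could be implemented along these lines. The registry type, its map layout, and the set helper are assumptions made for illustration; the real Service keeps its state in unexported fields that are not documented here.

```go
package main

import (
	"sort"
	"sync"
)

type PushStatus string

type PushSnapshot struct {
	URL     string
	Status  PushStatus
	Attempt int
}

type RuntimeStatus struct {
	Pushes []PushSnapshot
}

// registry is a hypothetical store: stream code -> push URL -> state.
type registry struct {
	mu     sync.Mutex
	pushes map[string]map[string]PushSnapshot
}

// set records the latest state of one push destination.
func (r *registry) set(stream string, snap PushSnapshot) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.pushes == nil {
		r.pushes = make(map[string]map[string]PushSnapshot)
	}
	if r.pushes[stream] == nil {
		r.pushes[stream] = make(map[string]PushSnapshot)
	}
	r.pushes[stream][snap.URL] = snap
}

// runtimeStatus copies the stream's push states into a slice sorted by URL.
// It returns ok=false when the stream has no registered pushes.
func (r *registry) runtimeStatus(stream string) (RuntimeStatus, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	byURL := r.pushes[stream]
	if len(byURL) == 0 {
		return RuntimeStatus{}, false
	}
	out := make([]PushSnapshot, 0, len(byURL))
	for _, s := range byURL {
		out = append(out, s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].URL < out[j].URL })
	return RuntimeStatus{Pushes: out}, true
}
```

Sorting matters because Go map iteration order is randomized; without it, two consecutive API responses for the same stream could list destinations in different orders.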
func (*Service) SetListenersForTesting ¶
func (s *Service) SetListenersForTesting(l config.ListenersConfig)
SetListenersForTesting overrides the shared listeners config without going through DI. Use only from tests that exercise RunRTSPPlayServer / RunSRTPlayServer.
func (*Service) Start ¶
Start launches all serve-endpoint and push-destination workers for a stream.
func (*Service) Stop ¶
func (s *Service) Stop(streamID domain.StreamCode)
Stop cancels all output workers for a stream, waits for them to finish, and removes the on-disk segment directories (HLS/DASH).
func (*Service) UpdateABRMasterMeta ¶
func (s *Service) UpdateABRMasterMeta(streamCode domain.StreamCode, updates []ABRRepMeta)
UpdateABRMasterMeta applies updated bandwidth/resolution metadata to the running HLS ABR master playlist writer. This avoids a full publisher restart when only a profile's bitrate or resolution changes (but the ladder count stays the same).
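The choice between an in-place metadata update and a restart of the HLS/DASH workers can be expressed as a small decision function. The rendition type, the action names, and chooseABRAction are hypothetical, and the sketch assumes renditions are compared position by position in ladder order.

```go
package main

// rendition is a hypothetical summary of one ABR ladder entry.
type rendition struct {
	Bandwidth     int
	Width, Height int
}

// abrAction names the publisher call a config change would need.
type abrAction string

const (
	actionNone             abrAction = "none"
	actionUpdateMasterMeta abrAction = "update-master-meta" // UpdateABRMasterMeta
	actionRestartHLSDASH   abrAction = "restart-hls-dash"   // RestartHLSDASH
)

// chooseABRAction picks the cheapest action that reflects the new ladder.
func chooseABRAction(prev, next []rendition) abrAction {
	if len(prev) != len(next) {
		// Ladder count changed: playlist and segmenters must be rebuilt.
		return actionRestartHLSDASH
	}
	for i := range prev {
		if prev[i] != next[i] {
			// Same count, different bitrate or resolution: rewrite in place.
			return actionUpdateMasterMeta
		}
	}
	return actionNone
}
```

For example, changing one rendition from 1280x720 at 2.5 Mbit/s to 3 Mbit/s would select the in-place update, while adding a third rendition would select the restart.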
func (*Service) UpdateProtocols ¶
UpdateProtocols surgically stops/starts only the protocol goroutines that changed between old and new stream configs. Goroutines for unchanged protocols keep running. Call this when diff.ProtocolsChanged || diff.PushChanged.
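One way to picture the selective restart is as a set difference between the protocols enabled in the old and new configs. The sketch below is an assumption-laden illustration: diffProtocols and its map-of-flags input are hypothetical, and it only covers protocols being enabled or disabled, not a protocol whose settings changed while it stayed enabled.

```go
package main

import "sort"

// diffProtocols compares two sets of enabled protocols and returns the
// names to stop and the names to start. Protocols enabled in both sets
// appear in neither list, so their workers keep running.
func diffProtocols(prev, next map[string]bool) (stop, start []string) {
	for p, on := range prev {
		if on && !next[p] {
			stop = append(stop, p)
		}
	}
	for p, on := range next {
		if on && !prev[p] {
			start = append(start, p)
		}
	}
	// Sort for deterministic output; map iteration order is random.
	sort.Strings(stop)
	sort.Strings(start)
	return stop, start
}
```

Going from {hls, rtsp} to {hls, dash, srt} would stop only rtsp and start dash and srt, leaving the hls worker untouched.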