publisher

package
v0.0.67 Latest
Warning

This package is not in the latest version of its module.

Published: May 5, 2026 License: MIT Imports: 47 Imported by: 0

Documentation

Overview

Package publisher delivers transcoded streams to all outputs.

It handles two distinct output types:

  • Serve endpoints (HLS, DASH, RTSP, RTMP listen, SRT listen): the server listens; clients connect and pull. HLS uses MPEG-TS segments plus an m3u8 playlist. DASH uses fMP4 (init segment plus .m4s media segments) and a dynamic MPD, built with Eyevinn mp4ff. RTSP, RTMP, and SRT use gortsplib, gomedia, and gosrt; this package does not invoke FFmpeg. RTSP, RTMP play, and SRT listen each use a single TCP/UDP port, configured under listeners.{rtsp,rtmp,srt}.port, and that port serves both ingest and play. Streams are selected by path (/live/<code>), RTMP app "live", or SRT streamid (live/<code>).
  • Push destinations: rtmp:// (plain TCP) and rtmps:// (TLS, default port 443) via the gomedia go-rtmp client. Any other scheme returns a clear error.
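The scheme rule for push destinations can be illustrated with a small sketch. This is not the package's code: parsePushURL and pushTarget are hypothetical names, and the 1935 fallback for rtmp:// is the conventional RTMP port, assumed here because the overview only states the rtmps:// default.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// pushTarget is a hypothetical helper type, not part of the package API.
type pushTarget struct {
	Addr   string // host:port to dial
	UseTLS bool   // true for rtmps://
}

// parsePushURL accepts only rtmp:// and rtmps:// URLs and fills in a
// default port when the URL omits one.
func parsePushURL(raw string) (pushTarget, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return pushTarget{}, fmt.Errorf("parse push url: %w", err)
	}
	var defPort string
	var useTLS bool
	switch u.Scheme {
	case "rtmp":
		defPort = "1935" // conventional RTMP port (assumption in this sketch)
	case "rtmps":
		defPort, useTLS = "443", true // default stated in the overview
	default:
		return pushTarget{}, fmt.Errorf("unsupported push scheme %q (want rtmp or rtmps)", u.Scheme)
	}
	host := u.Hostname()
	if host == "" {
		return pushTarget{}, fmt.Errorf("push url %q has no host", raw)
	}
	port := u.Port()
	if port == "" {
		port = defPort
	}
	return pushTarget{Addr: net.JoinHostPort(host, port), UseTLS: useTLS}, nil
}

func main() {
	for _, raw := range []string{
		"rtmps://live.example.com/app/key",
		"rtmp://10.0.0.5:1940/live/x",
		"srt://host:9000",
	} {
		t, err := parsePushURL(raw)
		fmt.Printf("%s -> %+v, err=%v\n", raw, t, err)
	}
}
```

Rejecting unknown schemes before dialing keeps the failure visible at configuration time instead of surfacing as a connection error later.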

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ABRRepMeta

type ABRRepMeta struct {
	Slug   string
	BwBps  int
	Width  int
	Height int
}

ABRRepMeta carries updated metadata for one ABR rendition. It is used for in-place rewrites of the master playlist, for example after a profile's bitrate or resolution changes while the ABR ladder structure stays the same.
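To show how these fields map onto an HLS master playlist, here is a minimal sketch. The ABRRepMeta struct is copied from above; masterPlaylist and the "<slug>/index.m3u8" URI layout are assumptions made for illustration, not the package's actual writer or directory layout.

```go
package main

import (
	"fmt"
	"strings"
)

// ABRRepMeta mirrors the documented struct.
type ABRRepMeta struct {
	Slug   string
	BwBps  int
	Width  int
	Height int
}

// masterPlaylist renders one EXT-X-STREAM-INF entry per rendition.
// The "<slug>/index.m3u8" media playlist path is hypothetical.
func masterPlaylist(reps []ABRRepMeta) string {
	var b strings.Builder
	b.WriteString("#EXTM3U\n")
	for _, r := range reps {
		fmt.Fprintf(&b, "#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d\n", r.BwBps, r.Width, r.Height)
		fmt.Fprintf(&b, "%s/index.m3u8\n", r.Slug)
	}
	return b.String()
}

func main() {
	fmt.Print(masterPlaylist([]ABRRepMeta{
		{Slug: "track_1", BwBps: 2500000, Width: 1280, Height: 720},
		{Slug: "track_2", BwBps: 800000, Width: 640, Height: 360},
	}))
}
```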

type PushSnapshot added in v0.0.14

type PushSnapshot struct {
	URL         string              `json:"url"`
	Status      PushStatus          `json:"status"`
	Attempt     int                 `json:"attempt"`
	ConnectedAt *time.Time          `json:"connected_at,omitempty"`
	Errors      []domain.ErrorEntry `json:"errors,omitempty"`
}

PushSnapshot is the JSON-safe snapshot of one push destination's runtime state.

type PushStatus added in v0.0.14

type PushStatus string

PushStatus is the lifecycle state of one outbound push destination.

const (
	// PushStatusStarting — packager goroutine has spawned, no handshake attempt yet.
	PushStatusStarting PushStatus = "starting"
	// PushStatusActive — last connect succeeded; session is currently sending media.
	PushStatusActive PushStatus = "active"
	// PushStatusReconnecting — last session ended (server close / write error / handshake fail);
	// waiting RetryTimeoutSec before next attempt.
	PushStatusReconnecting PushStatus = "reconnecting"
	// PushStatusFailed — exhausted dest.Limit attempts; goroutine has given up.
	PushStatusFailed PushStatus = "failed"
)

Status values reported via RuntimeStatus.

type RuntimeStatus added in v0.0.14

type RuntimeStatus struct {
	Pushes []PushSnapshot `json:"pushes"`
}

RuntimeStatus is the JSON-safe snapshot of all push destinations for one stream.
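The JSON shape implied by the struct tags can be checked with a short sketch. The types are local mirrors of the documented ones; ErrorEntry is a stand-in for domain.ErrorEntry, whose real fields are not shown on this page, and encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ErrorEntry is a placeholder for domain.ErrorEntry (fields assumed).
type ErrorEntry struct {
	Message string `json:"message"`
}

type PushStatus string

type PushSnapshot struct {
	URL         string       `json:"url"`
	Status      PushStatus   `json:"status"`
	Attempt     int          `json:"attempt"`
	ConnectedAt *time.Time   `json:"connected_at,omitempty"`
	Errors      []ErrorEntry `json:"errors,omitempty"`
}

type RuntimeStatus struct {
	Pushes []PushSnapshot `json:"pushes"`
}

// encode marshals a status snapshot to a JSON string.
func encode(rs RuntimeStatus) (string, error) {
	b, err := json.Marshal(rs)
	return string(b), err
}

func main() {
	now := time.Now().UTC()
	out, err := encode(RuntimeStatus{Pushes: []PushSnapshot{
		{URL: "rtmp://a.example/live/x", Status: "active", Attempt: 1, ConnectedAt: &now},
		{URL: "rtmp://b.example/live/x", Status: "reconnecting", Attempt: 3,
			Errors: []ErrorEntry{{Message: "write: broken pipe"}}},
	}})
	fmt.Println(out, err)
}
```

Because of omitempty, connected_at and errors are absent from the output when a destination has never connected or has no recorded errors.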

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages all output workers for active streams.

func New

func New(i do.Injector) (*Service, error)

New creates a Service and registers it with the DI injector.

func NewServiceForTesting

func NewServiceForTesting(cfg config.PublisherConfig, buf *buffer.Service, bus events.Bus) *Service

NewServiceForTesting creates a Service without DI, for use in integration tests.

func (*Service) HandleRTMPPlay

func (s *Service) HandleRTMPPlay(
	ctx context.Context,
	key string,
	info push.PlayInfo,
	writeFrame func(cid gocodec.CodecID, data []byte, pts, dts uint32) error,
) error

HandleRTMPPlay is the play handler registered with the ingestor's RTMP server. It subscribes to the Buffer Hub for the given stream key, demuxes the TS stream, and calls writeFrame for each H.264/AAC frame until ctx is cancelled. Returns an error when the stream is not active (caller closes the connection).

Signature matches push.PlayFunc — the info argument carries remote-peer metadata captured by the RTMP server at OnPlay handshake time and is forwarded into the play-sessions tracker for the API.

func (*Service) RestartHLSDASH

func (s *Service) RestartHLSDASH(ctx context.Context, stream *domain.Stream) error

RestartHLSDASH stops and restarts only the HLS and DASH goroutines with the new stream config. Used when the ABR ladder count changes (profile added/removed) so the master playlist and per-shard segmenters reflect the new rendition set. RTSP, RTMP, and SRT goroutines are unaffected.

func (*Service) RunRTSPPlayServer

func (s *Service) RunRTSPPlayServer(ctx context.Context) error

RunRTSPPlayServer starts the RTSP play listener. Returns nil immediately when listeners.rtsp is disabled.

func (*Service) RunSRTPlayServer

func (s *Service) RunSRTPlayServer(ctx context.Context) error

RunSRTPlayServer starts the SRT play listener. Returns nil immediately when listeners.srt is disabled.

func (*Service) RuntimeStatus added in v0.0.14

func (s *Service) RuntimeStatus(streamID domain.StreamCode) (RuntimeStatus, bool)

RuntimeStatus returns a snapshot of all push destinations for the given stream, sorted by URL for stable output. Returns ok=false when no push is registered for the stream (no destinations configured, or all torn down).
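The contract above (sorted by URL, ok=false when nothing is registered) might look like the following. The nested-map registry and the runtimeStatus function are assumptions about internal layout made only for this sketch, and the types are trimmed-down local mirrors.

```go
package main

import (
	"fmt"
	"sort"
)

type PushSnapshot struct {
	URL    string
	Status string
}

type RuntimeStatus struct {
	Pushes []PushSnapshot
}

// runtimeStatus looks up a stream in a hypothetical registry keyed by
// stream code, then by destination URL.
func runtimeStatus(reg map[string]map[string]PushSnapshot, code string) (RuntimeStatus, bool) {
	dests := reg[code]
	if len(dests) == 0 {
		return RuntimeStatus{}, false // nothing configured, or all torn down
	}
	out := make([]PushSnapshot, 0, len(dests))
	for _, s := range dests {
		out = append(out, s)
	}
	// Map iteration order is random, so sort for stable output.
	sort.Slice(out, func(i, j int) bool { return out[i].URL < out[j].URL })
	return RuntimeStatus{Pushes: out}, true
}

func main() {
	reg := map[string]map[string]PushSnapshot{
		"news": {
			"rtmp://b.example/live": {URL: "rtmp://b.example/live", Status: "active"},
			"rtmp://a.example/live": {URL: "rtmp://a.example/live", Status: "reconnecting"},
		},
	}
	fmt.Println(runtimeStatus(reg, "news"))
	fmt.Println(runtimeStatus(reg, "missing"))
}
```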

func (*Service) SetListeners added in v0.0.53

func (s *Service) SetListeners(l config.ListenersConfig)

SetListeners hot-swaps the shared listeners config. The next invocation of RunRTSPPlayServer or RunSRTPlayServer reads the new value at the top of its run loop. RTSP and SRT servers that are already running keep their old bind address until the runtime restarts them; see runtime.diff for the "SetListeners then restart" sequencing.
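One way to picture the hot-swap behaviour is a mutex-guarded config that each run loop copies once on entry. This is a sketch under assumptions: the service type, the bindAddr helper, and the ListenersConfig fields shown here are invented for illustration and may not match config.ListenersConfig.

```go
package main

import (
	"fmt"
	"sync"
)

// ListenersConfig is a hypothetical, reduced stand-in for config.ListenersConfig.
type ListenersConfig struct {
	RTSPEnabled bool
	RTSPPort    int
}

type service struct {
	mu        sync.RWMutex
	listeners ListenersConfig
}

// SetListeners replaces the stored config; running servers are not touched.
func (s *service) SetListeners(l ListenersConfig) {
	s.mu.Lock()
	s.listeners = l
	s.mu.Unlock()
}

// bindAddr models the top of a run loop: it copies the config once and
// reports ok=false when the listener is disabled.
func (s *service) bindAddr() (addr string, ok bool) {
	s.mu.RLock()
	l := s.listeners
	s.mu.RUnlock()
	if !l.RTSPEnabled {
		return "", false
	}
	return fmt.Sprintf(":%d", l.RTSPPort), true
}

func main() {
	s := &service{}
	fmt.Println(s.bindAddr()) // disabled by default in this sketch

	s.SetListeners(ListenersConfig{RTSPEnabled: true, RTSPPort: 8554})
	running, _ := s.bindAddr() // a server started now binds this address

	s.SetListeners(ListenersConfig{RTSPEnabled: true, RTSPPort: 9554})
	next, _ := s.bindAddr() // only a restarted server sees the new port
	fmt.Println(running, next)
}
```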

func (*Service) Start

func (s *Service) Start(ctx context.Context, stream *domain.Stream) error

Start launches all serve-endpoint and push-destination workers for a stream.

func (*Service) Stop

func (s *Service) Stop(streamID domain.StreamCode)

Stop cancels all output workers for a stream, waits for them to finish, and removes the on-disk segment directories (HLS/DASH).

func (*Service) UpdateABRMasterMeta

func (s *Service) UpdateABRMasterMeta(streamCode domain.StreamCode, updates []ABRRepMeta)

UpdateABRMasterMeta applies updated bandwidth/resolution metadata to the running HLS ABR master playlist writer. This avoids a full publisher restart when only a profile's bitrate or resolution changes and the ladder count stays the same.
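A minimal sketch of an in-place metadata update follows. It assumes renditions are matched by Slug and that unknown slugs are ignored, since a changed ladder is handled by RestartHLSDASH; applyUpdates is a hypothetical helper, not the package's implementation.

```go
package main

import "fmt"

// ABRRepMeta mirrors the documented struct.
type ABRRepMeta struct {
	Slug   string
	BwBps  int
	Width  int
	Height int
}

// applyUpdates overwrites entries in current whose Slug matches an update
// and returns how many entries actually changed.
func applyUpdates(current []ABRRepMeta, updates []ABRRepMeta) int {
	idx := make(map[string]int, len(current))
	for i, r := range current {
		idx[r.Slug] = i
	}
	changed := 0
	for _, u := range updates {
		i, ok := idx[u.Slug]
		if !ok {
			continue // unknown slug: not an in-place change
		}
		if current[i] != u {
			current[i] = u
			changed++
		}
	}
	return changed
}

func main() {
	ladder := []ABRRepMeta{
		{Slug: "track_1", BwBps: 2500000, Width: 1280, Height: 720},
		{Slug: "track_2", BwBps: 800000, Width: 640, Height: 360},
	}
	n := applyUpdates(ladder, []ABRRepMeta{
		{Slug: "track_2", BwBps: 1200000, Width: 854, Height: 480},
	})
	fmt.Println(n, ladder)
}
```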

func (*Service) UpdateProtocols

func (s *Service) UpdateProtocols(ctx context.Context, old, new *domain.Stream) error

UpdateProtocols surgically stops/starts only the protocol goroutines that changed between old and new stream configs. Goroutines for unchanged protocols keep running. Call this when diff.ProtocolsChanged || diff.PushChanged.
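The selective stop/start can be pictured as a set difference over enabled protocols. This is a sketch only: diffProtocols and the map-of-bools representation are assumptions, and it does not model a protocol that stays enabled while its settings change.

```go
package main

import (
	"fmt"
	"sort"
)

// diffProtocols compares two sets of enabled protocols. Protocols enabled
// in both sets appear in neither result, so their goroutines keep running.
func diffProtocols(oldSet, newSet map[string]bool) (stop, start []string) {
	for p, on := range oldSet {
		if on && !newSet[p] {
			stop = append(stop, p)
		}
	}
	for p, on := range newSet {
		if on && !oldSet[p] {
			start = append(start, p)
		}
	}
	// Sort so the result does not depend on map iteration order.
	sort.Strings(stop)
	sort.Strings(start)
	return stop, start
}

func main() {
	oldSet := map[string]bool{"hls": true, "dash": true, "rtsp": true}
	newSet := map[string]bool{"hls": true, "srt": true, "rtmp": true}
	stop, start := diffProtocols(oldSet, newSet)
	fmt.Println("stop:", stop)
	fmt.Println("start:", start)
}
```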
