publisher

package
v0.0.84 Latest
Published: May 5, 2026 License: MIT Imports: 49 Imported by: 0

Documentation

Overview

Package publisher delivers transcoded streams to all outputs.

It handles two distinct output types:

  • Serve endpoints (HLS, DASH, RTSP, RTMP listen, SRT listen): the server listens; clients connect and pull. HLS uses MPEG-TS segments plus an m3u8 playlist. DASH uses fMP4 (init segment plus .m4s media segments) with a dynamic MPD, built with Eyevinn mp4ff. RTSP, RTMP, and SRT use gortsplib, gomedia, and gosrt respectively; this package does not invoke FFmpeg. RTSP, RTMP play, and SRT listen each use one shared TCP/UDP port, configured under listeners.{rtsp,rtmp,srt}.port; the same port serves both ingest and play. Streams are selected by path (/live/<code>), RTMP app "live", or SRT streamid (live/<code>).
  • Push destinations: rtmp:// (plain TCP) and rtmps:// (TLS, default port 443) via gomedia go-rtmp client; other schemes return a clear error.
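The scheme rule for push destinations can be illustrated with a minimal sketch. Everything here (pushTarget, resolvePushTarget, the 1935 fallback for rtmp://) is an assumption made for illustration; only the rtmps default of 443 and the rejection of other schemes come from the overview above.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// pushTarget is a hypothetical helper type for this sketch; it is not part of
// the publisher package API.
type pushTarget struct {
	Addr   string // host:port to dial
	UseTLS bool   // true for rtmps://
}

// resolvePushTarget accepts only rtmp:// and rtmps:// URLs and fills in a
// default port when the URL omits one.
func resolvePushTarget(raw string) (pushTarget, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return pushTarget{}, fmt.Errorf("parse push URL: %w", err)
	}

	var defaultPort string
	useTLS := false
	switch u.Scheme {
	case "rtmp":
		defaultPort = "1935" // assumption: conventional RTMP port
	case "rtmps":
		defaultPort, useTLS = "443", true // default stated in the overview
	default:
		return pushTarget{}, fmt.Errorf("unsupported push scheme %q: want rtmp or rtmps", u.Scheme)
	}

	host := u.Hostname()
	if host == "" {
		return pushTarget{}, fmt.Errorf("push URL %q has no host", raw)
	}
	port := u.Port()
	if port == "" {
		port = defaultPort
	}
	return pushTarget{Addr: net.JoinHostPort(host, port), UseTLS: useTLS}, nil
}

func main() {
	for _, raw := range []string{
		"rtmp://example.invalid/live/key",
		"rtmps://example.invalid/live/key",
		"srt://example.invalid:9000",
	} {
		target, err := resolvePushTarget(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> dial %s (TLS=%v)\n", raw, target.Addr, target.UseTLS)
	}
}
```

Rejecting unknown schemes before dialing keeps a misconfigured destination from silently doing nothing.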

Types

type ABRRepMeta

type ABRRepMeta struct {
	Slug   string
	BwBps  int
	Width  int
	Height int
}

ABRRepMeta carries updated metadata for one ABR rendition used in in-place master playlist rewrites (e.g. after a profile bitrate/resolution change without an ABR ladder structure change).

type PushSnapshot added in v0.0.14

type PushSnapshot struct {
	URL         string              `json:"url"`
	Status      PushStatus          `json:"status"`
	Attempt     int                 `json:"attempt"`
	ConnectedAt *time.Time          `json:"connected_at,omitempty"`
	Errors      []domain.ErrorEntry `json:"errors,omitempty"`
}

PushSnapshot is the JSON-safe snapshot of one push destination's runtime state.

type PushStatus added in v0.0.14

type PushStatus string

PushStatus is the lifecycle state of one outbound push destination.

const (
	// PushStatusStarting — packager goroutine has spawned, no handshake attempt yet.
	PushStatusStarting PushStatus = "starting"
	// PushStatusActive — last connect succeeded; session is currently sending media.
	PushStatusActive PushStatus = "active"
	// PushStatusReconnecting — last session ended (server close / write error / handshake fail);
	// waiting RetryTimeoutSec before next attempt.
	PushStatusReconnecting PushStatus = "reconnecting"
	// PushStatusFailed — exhausted dest.Limit attempts; goroutine has given up.
	PushStatusFailed PushStatus = "failed"
)

Status values reported via RuntimeStatus.

type RuntimeStatus added in v0.0.14

type RuntimeStatus struct {
	Pushes []PushSnapshot `json:"pushes"`
}

RuntimeStatus is the JSON-safe snapshot of all push destinations for one stream.
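To make the JSON shape concrete, the sketch below mirrors the documented struct tags in local types and marshals a sample value. The Errors field is left out because the shape of domain.ErrorEntry is not shown here; the URLs and the sample and encode helpers are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type PushStatus string

// PushSnapshot mirrors the documented JSON tags. The Errors field is omitted
// in this sketch because domain.ErrorEntry is not defined here.
type PushSnapshot struct {
	URL         string     `json:"url"`
	Status      PushStatus `json:"status"`
	Attempt     int        `json:"attempt"`
	ConnectedAt *time.Time `json:"connected_at,omitempty"`
}

type RuntimeStatus struct {
	Pushes []PushSnapshot `json:"pushes"`
}

// sample builds one connected and one reconnecting destination.
func sample() RuntimeStatus {
	connected := time.Date(2026, 5, 5, 12, 0, 0, 0, time.UTC)
	return RuntimeStatus{Pushes: []PushSnapshot{
		{URL: "rtmp://a.example.invalid/live/key", Status: "active", Attempt: 1, ConnectedAt: &connected},
		{URL: "rtmps://b.example.invalid/live/key", Status: "reconnecting", Attempt: 3},
	}}
}

// encode returns the compact JSON form of a RuntimeStatus.
func encode(rs RuntimeStatus) (string, error) {
	b, err := json.Marshal(rs)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encode(sample())
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

Because connected_at is tagged omitempty on a pointer, a destination that has never connected simply has no connected_at key.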

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages all output workers for active streams.

func New

func New(i do.Injector) (*Service, error)

New creates a Service and registers it with the DI injector.

func NewServiceForTesting

func NewServiceForTesting(cfg config.PublisherConfig, buf *buffer.Service, bus events.Bus) *Service

NewServiceForTesting creates a Service without DI, for use in integration tests.

func (*Service) HandleMPEGTS added in v0.0.68

func (s *Service) HandleMPEGTS() http.HandlerFunc

HandleMPEGTS returns an http.HandlerFunc that streams raw MPEG-TS bytes from the playback buffer hub for the {code} URL parameter. Used by the API server to mount /<code>/mpegts in the same router group as the HLS / DASH file routes (so the sessions middleware tracks viewers uniformly across protocols).

Behaviour:

  • 404 if the stream is not running, or its MPEGTS protocol flag is off.
  • 500 if the buffer subscribe fails (transient — buffer was deleted between the runtime check and Subscribe).
  • On success, writes raw TS bytes with chunked transfer encoding until the client disconnects, the stream stops, or the request context is cancelled.

The handler intentionally does not flush every packet — Go's net/http chunked writer flushes on its own buffer threshold (~4 KiB) which is a good latency / syscall trade-off for typical TS bitrates. Operators that need per-packet flushing can lower the buffer-hub chunk size at the source (e.g. UDP datagram size) instead of churning syscalls here.

func (*Service) HandleRTMPPlay

func (s *Service) HandleRTMPPlay(
	ctx context.Context,
	key string,
	info push.PlayInfo,
	session *rtmp.ServerSession,
) error

HandleRTMPPlay is the play handler registered with the ingestor's RTMP server. It subscribes to the Buffer Hub for the given stream key, demuxes the TS stream, and writes H.264 / AAC frames into the lal RTMP session until ctx is cancelled. Returns an error when the stream is not active (caller closes the connection).

Signature matches push.PlayFunc — the info argument carries remote-peer metadata captured by the RTMP server at OnPlay handshake time and is forwarded into the play-sessions tracker for the API.

func (*Service) RestartHLSDASH

func (s *Service) RestartHLSDASH(ctx context.Context, stream *domain.Stream) error

RestartHLSDASH stops and restarts only the HLS and DASH goroutines with the new stream config. Used when the ABR ladder count changes (profile added/removed) so the master playlist and per-shard segmenters reflect the new rendition set. RTSP, RTMP, and SRT goroutines are unaffected.

func (*Service) RunRTSPPlayServer

func (s *Service) RunRTSPPlayServer(ctx context.Context) error

RunRTSPPlayServer starts the RTSP play listener. Returns nil immediately when listeners.rtsp is disabled.

func (*Service) RunSRTPlayServer

func (s *Service) RunSRTPlayServer(ctx context.Context) error

RunSRTPlayServer starts the SRT play listener. Returns nil immediately when listeners.srt is disabled.

func (*Service) RuntimeStatus added in v0.0.14

func (s *Service) RuntimeStatus(streamID domain.StreamCode) (RuntimeStatus, bool)

RuntimeStatus returns a snapshot of all push destinations for the given stream, sorted by URL for stable output. Returns ok=false when no push is registered for the stream (no destinations configured, or all torn down).
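The return contract (sorted by URL, ok=false when nothing is registered) might look like the following sketch. The registry type and the trimmed-down PushSnapshot are hypothetical; how the package actually stores per-stream push state is not documented here.

```go
package main

import (
	"fmt"
	"sort"
)

// Trimmed-down local copies of the documented types, for illustration only.
type PushSnapshot struct {
	URL    string
	Status string
}

type RuntimeStatus struct {
	Pushes []PushSnapshot
}

// registry is a hypothetical store: stream code -> push URL -> snapshot.
type registry map[string]map[string]PushSnapshot

// runtimeStatus returns the pushes for one stream sorted by URL, or ok=false
// when the stream has no registered push destinations.
func (r registry) runtimeStatus(code string) (RuntimeStatus, bool) {
	pushes := r[code]
	if len(pushes) == 0 {
		return RuntimeStatus{}, false
	}
	out := make([]PushSnapshot, 0, len(pushes))
	for _, p := range pushes {
		out = append(out, p)
	}
	// Map iteration order is random, so sort for stable output.
	sort.Slice(out, func(i, j int) bool { return out[i].URL < out[j].URL })
	return RuntimeStatus{Pushes: out}, true
}

func main() {
	reg := registry{
		"news": {
			"rtmps://b.example.invalid/live": {URL: "rtmps://b.example.invalid/live", Status: "active"},
			"rtmp://a.example.invalid/live":  {URL: "rtmp://a.example.invalid/live", Status: "starting"},
		},
	}
	if st, ok := reg.runtimeStatus("news"); ok {
		for _, p := range st.Pushes {
			fmt.Println(p.URL, p.Status)
		}
	}
	_, ok := reg.runtimeStatus("sports")
	fmt.Println("sports registered:", ok)
}
```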

func (*Service) SetListeners added in v0.0.53

func (s *Service) SetListeners(l config.ListenersConfig)

SetListeners hot-swaps the shared listeners config. The next invocation of RunRTSPPlayServer or RunSRTPlayServer will read the new value at the top of its run loop. Already-running RTSP / SRT servers keep their old bind address until runtime restarts them — see runtime.diff for the "SetListeners then restart" sequencing.
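The "read at the top of the run loop" behaviour can be illustrated with a mutex-guarded config. All names below (listenerConfig, listenersConfig, service, rtspBindAddr) are hypothetical local stand-ins, and whether the real Service uses a mutex or another mechanism is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical, simplified stand-ins for config.ListenersConfig.
type listenerConfig struct {
	Enabled bool
	Port    int
}

type listenersConfig struct {
	RTSP listenerConfig
	SRT  listenerConfig
}

type service struct {
	mu        sync.RWMutex
	listeners listenersConfig
}

// SetListeners swaps the shared config; it does not touch running servers.
func (s *service) SetListeners(l listenersConfig) {
	s.mu.Lock()
	s.listeners = l
	s.mu.Unlock()
}

// rtspBindAddr is what a run loop would call once, at its top, before binding.
// It reports enabled=false when the listener is disabled.
func (s *service) rtspBindAddr() (addr string, enabled bool) {
	s.mu.RLock()
	l := s.listeners.RTSP
	s.mu.RUnlock()
	if !l.Enabled {
		return "", false
	}
	return fmt.Sprintf(":%d", l.Port), true
}

func main() {
	s := &service{}
	s.SetListeners(listenersConfig{RTSP: listenerConfig{Enabled: true, Port: 8554}})

	running, _ := s.rtspBindAddr() // captured when the server started

	s.SetListeners(listenersConfig{RTSP: listenerConfig{Enabled: true, Port: 9554}})
	next, _ := s.rtspBindAddr() // what a restarted server would bind

	fmt.Println("running server keeps", running, "and the next run binds", next)
}
```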

func (*Service) Start

func (s *Service) Start(ctx context.Context, stream *domain.Stream) error

Start launches all serve-endpoint and push-destination workers for a stream.

func (*Service) Stop

func (s *Service) Stop(streamID domain.StreamCode)

Stop cancels all output workers for a stream, waits for them to finish, and removes the on-disk segment directories (HLS/DASH).

func (*Service) UpdateABRMasterMeta

func (s *Service) UpdateABRMasterMeta(streamCode domain.StreamCode, updates []ABRRepMeta)

UpdateABRMasterMeta applies updated bandwidth/resolution metadata to the running HLS ABR master playlist writer. This avoids a full publisher restart when only a profile's bitrate or resolution changes (but the ladder count stays the same).
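The split between RestartHLSDASH and UpdateABRMasterMeta suggests a decision like the one sketched below. The profile type, planABRChange, and matching renditions by index position are assumptions for illustration; the caller-side logic in the real module is not shown in this documentation.

```go
package main

import "fmt"

// profile is a hypothetical transcoding profile for this sketch.
type profile struct {
	Slug       string
	BitrateBps int
	Width      int
	Height     int
}

// ABRRepMeta mirrors the documented struct.
type ABRRepMeta struct {
	Slug   string
	BwBps  int
	Width  int
	Height int
}

// planABRChange reports restart=true when the ladder count changed. Otherwise
// it returns metadata updates for the renditions that differ.
// Assumption: renditions are matched by index position.
func planABRChange(prev, next []profile) (restart bool, updates []ABRRepMeta) {
	if len(prev) != len(next) {
		return true, nil
	}
	for i := range next {
		if prev[i] != next[i] {
			updates = append(updates, ABRRepMeta{
				Slug:   next[i].Slug,
				BwBps:  next[i].BitrateBps,
				Width:  next[i].Width,
				Height: next[i].Height,
			})
		}
	}
	return false, updates
}

func main() {
	prev := []profile{{"track_1", 5000000, 1920, 1080}, {"track_2", 2500000, 1280, 720}}
	next := []profile{{"track_1", 5000000, 1920, 1080}, {"track_2", 3000000, 1280, 720}}

	restart, updates := planABRChange(prev, next)
	fmt.Println("restart:", restart, "updates:", updates)

	restart, _ = planABRChange(prev, next[:1])
	fmt.Println("restart after removing a profile:", restart)
}
```

With restart=false and a non-empty update list, a caller would pass the updates to UpdateABRMasterMeta; with restart=true it would call RestartHLSDASH instead.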

func (*Service) UpdateProtocols

func (s *Service) UpdateProtocols(ctx context.Context, old, new *domain.Stream) error

UpdateProtocols surgically stops/starts only the protocol goroutines that changed between old and new stream configs. Goroutines for unchanged protocols keep running. Call this when diff.ProtocolsChanged || diff.PushChanged.
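A minimal sketch of the "only what changed" idea: compare two sets of protocol flags and derive which goroutines to stop and which to start. protocolFlags and diffProtocols are hypothetical; the sketch covers on/off flags only and ignores push destinations and per-protocol settings.

```go
package main

import (
	"fmt"
	"sort"
)

// protocolFlags is a hypothetical view of a stream's enabled protocols.
type protocolFlags map[string]bool

// diffProtocols returns the protocols to stop (on before, off now) and to
// start (off before, on now). Unchanged protocols appear in neither list.
func diffProtocols(prev, next protocolFlags) (stop, start []string) {
	for name, was := range prev {
		if was && !next[name] {
			stop = append(stop, name)
		}
	}
	for name, is := range next {
		if is && !prev[name] {
			start = append(start, name)
		}
	}
	sort.Strings(stop) // map iteration order is random; sort for stable output
	sort.Strings(start)
	return stop, start
}

func main() {
	prev := protocolFlags{"hls": true, "dash": true, "rtsp": false}
	next := protocolFlags{"hls": true, "dash": false, "rtsp": true}

	stop, start := diffProtocols(prev, next)
	fmt.Println("stop:", stop, "start:", start)
}
```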
