manager

package
v0.0.78 Latest
Published: May 5, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package manager implements the Stream Manager — the failover engine. It monitors input health (bitrate, FPS, packet loss, timeout) and seamlessly switches to the best available input when the active one degrades or fails. Failover is handled entirely in Go — FFmpeg is never restarted for this purpose.

Concurrency model

Each stream has its own [streamState] protected by state.mu. The global Service.mu is an RWMutex that guards only the streams map. It is never held while performing I/O, and it is released before state.mu is acquired, so the two locks are never nested. Any path that needs both takes Service.mu first and state.mu second; this consistent ordering prevents deadlocks.

RecordPacket (hot path) takes a global RLock to locate the state pointer, then a per-stream Lock to update LastPacketAt and Status. At typical packet rates (< 100/s per stream) the per-packet mutex overhead is negligible, and the path stays free of data races.
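The two-level locking described above can be sketched as follows. This is a minimal illustration, not the package's code: `registry`, `newRegistry`, `recordPacket`, and `seen` are hypothetical stand-ins, and the per-input timestamp map is an assumed layout for the unexported state.

```go
package main

import (
	"sync"
	"time"
)

// streamState is a stand-in for the package's unexported per-stream state.
type streamState struct {
	mu           sync.Mutex
	lastPacketAt map[int]time.Time // keyed by input priority (assumed layout)
}

// registry is a hypothetical stand-in for Service.
type registry struct {
	mu      sync.RWMutex // guards only the streams map
	streams map[string]*streamState
}

func newRegistry(ids ...string) *registry {
	r := &registry{streams: make(map[string]*streamState)}
	for _, id := range ids {
		r.streams[id] = &streamState{lastPacketAt: make(map[int]time.Time)}
	}
	return r
}

// recordPacket looks up the state under the global read lock, releases it,
// and only then takes the per-stream lock, so the locks are never nested.
func (r *registry) recordPacket(streamID string, priority int) bool {
	r.mu.RLock()
	st, ok := r.streams[streamID]
	r.mu.RUnlock()
	if !ok {
		return false // unknown stream: skip silently
	}
	st.mu.Lock()
	st.lastPacketAt[priority] = time.Now()
	st.mu.Unlock()
	return true
}

// seen reports whether any packet was recorded for the given input.
func (r *registry) seen(streamID string, priority int) bool {
	r.mu.RLock()
	st, ok := r.streams[streamID]
	r.mu.RUnlock()
	if !ok {
		return false
	}
	st.mu.Lock()
	defer st.mu.Unlock()
	_, ok = st.lastPacketAt[priority]
	return ok
}
```

Because the read lock is dropped before the per-stream lock is taken, packets for different streams contend only briefly on the shared RLock and never on each other's state.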

Failover state machine

StatusIdle → StatusActive   (first packet arrives on the active input)
StatusActive → StatusDegraded (timeout detected by monitor, or error from ingestor)
StatusDegraded → StatusIdle   (background probe succeeds; input is a failback candidate)
StatusIdle → StatusActive   (ingestor switches to this input after tryFailover)
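The transitions listed above can be expressed as a small lookup table. The sketch below is illustrative only: the `status` type and its string values are assumptions, since the real domain.StreamStatus values are not shown on this page, and `canTransition` is a hypothetical helper.

```go
package main

// status is a stand-in for domain.StreamStatus; the string values are assumed.
type status string

const (
	statusIdle     status = "idle"
	statusActive   status = "active"
	statusDegraded status = "degraded"
)

// allowed maps each state to the states it may move to, following the
// failover state machine listed in the package overview.
var allowed = map[status][]status{
	statusIdle:     {statusActive},   // first packet, or switch after tryFailover
	statusActive:   {statusDegraded}, // timeout or ingestor error
	statusDegraded: {statusIdle},     // background probe succeeded
}

// canTransition reports whether moving from one state to another is listed.
func canTransition(from, to status) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}
```

Note that a degraded input never becomes active directly; it first returns to idle as a failback candidate.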

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type InputHealth

type InputHealth struct {
	Input        domain.Input
	LastPacketAt time.Time
	PacketLoss   float64 // percent (reserved)
	Status       domain.StreamStatus

	// Errors is a bounded rolling history (newest at index 0, max maxInputErrorHistory).
	// Persists for the lifetime of the manager registration — cleared only when
	// the stream pipeline is stopped/restarted (Unregister drops the whole state).
	Errors []domain.ErrorEntry
	// contains filtered or unexported fields
}

InputHealth tracks the runtime health of one input source. All mutable fields are protected by the parent streamState.mu, EXCEPT `tracks` which has its own internal mutex so the per-packet hot path can update without contending on the broader state lock.
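The Errors field comment describes a bounded history kept newest-first. One way such a history might be maintained is sketched below. The cap of 5 and the helper name `pushNewestFirst` are hypothetical; the actual value of maxInputErrorHistory is not shown here, and plain strings stand in for domain.ErrorEntry.

```go
package main

// maxHistory is a hypothetical cap standing in for maxInputErrorHistory.
const maxHistory = 5

// pushNewestFirst prepends entry and trims the slice to maxHistory items,
// so index 0 always holds the most recent entry.
func pushNewestFirst(history []string, entry string) []string {
	history = append([]string{entry}, history...)
	if len(history) > maxHistory {
		history = history[:maxHistory]
	}
	return history
}
```

Prepending allocates on every push, which is acceptable for an error path that runs far less often than the per-packet path.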

type InputHealthSnapshot

type InputHealthSnapshot struct {
	InputPriority int                     `json:"input_priority"`
	LastPacketAt  time.Time               `json:"last_packet_at"`
	BitrateKbps   int                     `json:"bitrate_kbps"`
	PacketLoss    float64                 `json:"packet_loss"`
	Status        domain.StreamStatus     `json:"status"`
	Tracks        []domain.MediaTrackInfo `json:"tracks,omitempty"`
	Errors        []domain.ErrorEntry     `json:"errors,omitempty"`
}

InputHealthSnapshot is a serialisable copy of one input's health. Errors is a bounded rolling history (newest first, max maxInputErrorHistory) of human-readable failure reasons (packet timeout, ingestor error, …). History persists for the lifetime of the manager registration and is only cleared when the stream pipeline is stopped. The frontend should treat each message as a diagnostic string, not as an error code.

BitrateKbps is the aggregate input bitrate (sum of per-codec track bitrates). Tracks lists each detected elementary stream (codec, resolution, kbps) and is nil/empty until the first AVPackets arrive.
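The aggregate bitrate is described as a sum over per-codec tracks. A minimal sketch of that computation follows, assuming a stand-in `trackInfo` type; the real field names of domain.MediaTrackInfo are not shown on this page.

```go
package main

// trackInfo is a hypothetical stand-in for domain.MediaTrackInfo.
type trackInfo struct {
	Codec       string
	BitrateKbps int
}

// aggregateKbps sums the per-track bitrates; a nil or empty slice yields 0,
// matching the "empty until the first packets arrive" case.
func aggregateKbps(tracks []trackInfo) int {
	total := 0
	for _, t := range tracks {
		total += t.BitrateKbps
	}
	return total
}
```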

type MediaSummary added in v0.0.47

type MediaSummary struct {
	InputBitrateKbps  int                     `json:"input_bitrate_kbps"`
	OutputBitrateKbps int                     `json:"output_bitrate_kbps"`
	Inputs            []domain.MediaTrackInfo `json:"inputs,omitempty"`
	Outputs           []domain.MediaTrackInfo `json:"outputs,omitempty"`
}

MediaSummary aggregates the input + output media-track info for one stream. Numbers are convenience aggregates over Inputs/Outputs and may be recomputed by clients if they want different rounding.

type RuntimeStatus

type RuntimeStatus struct {
	Status                domain.StreamStatus   `json:"status"`
	PipelineActive        bool                  `json:"pipeline_active"`
	ActiveInputPriority   int                   `json:"active_input_priority"`
	OverrideInputPriority *int                  `json:"override_input_priority,omitempty"`
	Exhausted             bool                  `json:"exhausted"`
	Inputs                []InputHealthSnapshot `json:"inputs"`
	// Switches is the rolling history of active-input changes (newest at
	// index 0, capped at maxSwitchHistory). Stream-level — switches happen
	// BETWEEN inputs, so this lives next to Inputs rather than inside one.
	Switches   []SwitchEvent             `json:"switches,omitempty"`
	Transcoder *transcoder.RuntimeStatus `json:"transcoder,omitempty"`
	Publisher  *publisher.RuntimeStatus  `json:"publisher,omitempty"`

	// Media is the UI-friendly summary of the current input → output track
	// shape (what the dashboard renders as "Input media info / Output media
	// info / 954kbit/s -> 2577kbit/s"). Populated by the API handler — the
	// manager doesn't know the output config.
	Media *MediaSummary `json:"media,omitempty"`
}

RuntimeStatus is the single "runtime" envelope returned by the API for a stream. Sub-systems contribute their own sections — manager owns input health; transcoder owns per-profile state — but the API exposes one shape so clients have a single root for all live data.

Status and PipelineActive are populated by the API handler from the coordinator (not by manager itself). Transcoder and Publisher are also handler-populated and named after the subsystem they wrap, so the frontend reads `runtime.transcoder.profiles[]` and `runtime.publisher.pushes[]`. This avoids a collision with the persisted `transcoder` / `push` config fields on domain.Stream.

Exhausted is true when every input has degraded and no failover candidate remains — the stream is effectively offline at the source.
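A client consuming this envelope might decode it as sketched below. The mirror struct copies only a subset of the JSON tags listed above; representing `status` as a plain string is an assumption, and `decodeStatus` is a hypothetical helper.

```go
package main

import "encoding/json"

// runtimeStatus mirrors a subset of the RuntimeStatus JSON shape.
// Status is assumed to serialise as a string.
type runtimeStatus struct {
	Status                string `json:"status"`
	PipelineActive        bool   `json:"pipeline_active"`
	ActiveInputPriority   int    `json:"active_input_priority"`
	OverrideInputPriority *int   `json:"override_input_priority,omitempty"`
	Exhausted             bool   `json:"exhausted"`
}

// decodeStatus parses one runtime envelope. A nil OverrideInputPriority
// means the key was absent, i.e. no manual override is in effect.
func decodeStatus(data []byte) (runtimeStatus, error) {
	var rs runtimeStatus
	err := json.Unmarshal(data, &rs)
	return rs, err
}
```

The pointer type lets a client distinguish "no override" from an override pointing at priority 0, which a plain int with omitempty could not express.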

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service monitors all streams and orchestrates source failover.

func New

func New(i do.Injector) (*Service, error)

New creates a Service and registers it with the DI injector.

func NewForTesting added in v0.0.23

func NewForTesting(bus events.Bus, ing ingestorDep, m *metrics.Metrics, packetTimeoutSec int) *Service

NewForTesting builds a Service from pre-constructed deps. Used by unit tests that stub the ingestor (no real pull workers needed) and skip the DI plumbing. packetTimeoutSec mirrors ManagerConfig.InputPacketTimeoutSec (defaults to 30s when ≤ 0).

func (*Service) IsRegistered

func (s *Service) IsRegistered(streamID domain.StreamCode) bool

IsRegistered reports whether streamID is under manager supervision.

func (*Service) RecordMediaPacket added in v0.0.47

func (s *Service) RecordMediaPacket(streamID domain.StreamCode, inputPriority int, p *domain.AVPacket)

RecordMediaPacket folds one decoded AVPacket into the per-input track stats (codec-bucketed bitrate, SPS-derived resolution). Hot path: one map lookup + one int add per packet, plus an SPS parse on the first keyframe of each codec.

Skips silently when the stream is unknown (typical of a race between teardown and a still-flushing reader). Does NOT update LastPacketAt / Status — those live in RecordPacket which the ingestor already calls per packet.

func (*Service) RecordPacket

func (s *Service) RecordPacket(streamID domain.StreamCode, inputPriority int)

RecordPacket updates the last-seen timestamp for an input. Called by the ingestor per packet — must be fast and contention-free.

func (*Service) Register

func (s *Service) Register(ctx context.Context, stream *domain.Stream, bufferWriteID domain.StreamCode) error

Register starts health monitoring for a stream and begins ingesting on the best input. bufferWriteID is the Buffer Hub slot for ingest writes; empty defaults to stream.Code.

func (*Service) RegisteredStreams added in v0.0.31

func (s *Service) RegisteredStreams() []domain.StreamCode

RegisteredStreams returns the codes of every stream currently under manager supervision (snapshot — caller is free to mutate the slice).

func (*Service) ReportInputError

func (s *Service) ReportInputError(streamID domain.StreamCode, inputPriority int, err error)

ReportInputError marks an input degraded immediately and triggers failover. Called by the ingestor on non-retriable source errors.

func (*Service) RuntimeStatus

func (s *Service) RuntimeStatus(streamID domain.StreamCode) (RuntimeStatus, bool)

RuntimeStatus returns a snapshot of runtime input health, or ok=false if not registered.

func (*Service) SetConfig added in v0.0.51

func (s *Service) SetConfig(cfg config.ManagerConfig)

SetConfig hot-swaps the manager's runtime parameters. Currently only InputPacketTimeoutSec is exposed for live updates; future fields can be threaded through here as they get added. The new value takes effect on the next monitor tick (≤ monitorInterval = 2s).

Does NOT restart the monitor goroutine, ingestor workers, or any downstream pipeline — only the threshold value is replaced. Streams that were degraded under the old threshold stay degraded; the new threshold applies to subsequent silence-gap evaluations.
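A threshold that can be replaced without restarting anything could be held in an atomic value that the monitor reads on each tick. The sketch below is one possible approach, not the package's implementation: the `monitor` type and its methods are hypothetical, and applying the 30s fallback for values ≤ 0 on live updates is an assumption carried over from the NewForTesting description.

```go
package main

import (
	"sync/atomic"
	"time"
)

// defaultTimeoutSec mirrors the documented 30s default.
const defaultTimeoutSec = 30

// monitor is a hypothetical holder for the live packet-timeout threshold.
type monitor struct {
	timeoutSec atomic.Int64
}

func newMonitor(sec int) *monitor {
	m := &monitor{}
	m.setTimeoutSec(sec)
	return m
}

// setTimeoutSec swaps the threshold in place; nothing is restarted.
func (m *monitor) setTimeoutSec(sec int) {
	if sec <= 0 {
		sec = defaultTimeoutSec // assumed fallback
	}
	m.timeoutSec.Store(int64(sec))
}

// timedOut is what a monitor tick would evaluate for one input's silence gap.
func (m *monitor) timedOut(silence time.Duration) bool {
	return silence > time.Duration(m.timeoutSec.Load())*time.Second
}

// seconds is a small helper for building durations.
func seconds(n int) time.Duration {
	return time.Duration(n) * time.Second
}
```

Because each tick loads the value afresh, a new threshold is picked up within one monitor interval without any coordination with running goroutines.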

func (*Service) SetExhaustedCallback

func (s *Service) SetExhaustedCallback(fn func(domain.StreamCode))

SetExhaustedCallback registers a function called when all inputs for a stream are degraded and no failover candidate is available.

func (*Service) SetRestoredCallback

func (s *Service) SetRestoredCallback(fn func(domain.StreamCode))

SetRestoredCallback registers a function called when a failover succeeds after a period where all inputs were exhausted.
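The exhausted and restored callbacks form a pair that fires on state edges. The following sketch shows one way such edge detection might work. The `tracker` type is hypothetical, streams are identified by plain strings instead of domain.StreamCode, and firing each callback only once per transition is an assumption.

```go
package main

// tracker is a hypothetical stand-in that remembers whether a stream is
// currently exhausted and fires callbacks only when that state changes.
type tracker struct {
	exhausted   bool
	onExhausted func(streamID string)
	onRestored  func(streamID string)
}

// update is called after each failover attempt with whether any input
// remains usable.
func (t *tracker) update(streamID string, anyHealthy bool) {
	switch {
	case !anyHealthy && !t.exhausted:
		t.exhausted = true
		if t.onExhausted != nil {
			t.onExhausted(streamID)
		}
	case anyHealthy && t.exhausted:
		t.exhausted = false
		if t.onRestored != nil {
			t.onRestored(streamID)
		}
	}
}
```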

func (*Service) SwitchInput

func (s *Service) SwitchInput(streamID domain.StreamCode, priority int) error

SwitchInput forces the stream to use the given input priority regardless of its configured priority value. The override persists until the input degrades permanently, at which point the manager reverts to normal priority-based selection. Switching to a different priority replaces any previous override.
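The interaction between a manual override and normal priority-based selection can be sketched as below. This is a simplified illustration: `input` and `pickInput` are hypothetical, a lower priority number is treated as higher precedence (as the SwitchReasonInputAdded comment indicates), and "degrades permanently" is reduced to a single boolean.

```go
package main

// input is a hypothetical stand-in for one configured input.
type input struct {
	Priority int
	Degraded bool
}

// pickInput returns the priority to activate and whether any candidate exists.
// A healthy override wins; otherwise the lowest-numbered healthy input is used.
func pickInput(inputs []input, override *int) (int, bool) {
	if override != nil {
		for _, in := range inputs {
			if in.Priority == *override && !in.Degraded {
				return in.Priority, true
			}
		}
	}
	best, found := 0, false
	for _, in := range inputs {
		if in.Degraded {
			continue
		}
		if !found || in.Priority < best {
			best, found = in.Priority, true
		}
	}
	return best, found
}
```

When the function returns found == false, every input is degraded, which corresponds to the exhausted state.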

func (*Service) Unregister

func (s *Service) Unregister(streamID domain.StreamCode)

Unregister stops ingest and health monitoring for streamID.

func (*Service) UpdateBufferWriteID

func (s *Service) UpdateBufferWriteID(streamID domain.StreamCode, newBufID domain.StreamCode)

UpdateBufferWriteID changes the buffer slot where the ingestor writes packets. Used when adding/removing the transcoder (buffer topology change). The active ingestor is restarted to write to the new buffer.

func (*Service) UpdateInputs

func (s *Service) UpdateInputs(
	streamID domain.StreamCode,
	added, removed, updated []domain.Input,
)

UpdateInputs patches the live input routing table while the monitor is running.

  • removed: deleted from state.inputs; if the active input is removed, failover is triggered.
  • added: inserted as StatusIdle; if a higher-priority input is added, failover is triggered.
  • updated: Input field replaced; if the active input is updated, the ingestor is restarted.
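The three rules above can be condensed into a small decision function. The sketch is hypothetical: inputs are reduced to their priority numbers, and the order in which the rules are checked when several apply at once is an assumption.

```go
package main

// action names what a patch requires of the running pipeline.
type action string

const (
	actionNone     action = "none"
	actionFailover action = "failover"
	actionRestart  action = "restart_ingestor"
)

func contains(list []int, v int) bool {
	for _, x := range list {
		if x == v {
			return true
		}
	}
	return false
}

// decide applies the UpdateInputs rules to a patch, given the priority of
// the currently active input. Lower number means higher precedence.
func decide(active int, added, removed, updated []int) action {
	if contains(removed, active) {
		return actionFailover // active input was removed
	}
	for _, p := range added {
		if p < active {
			return actionFailover // a higher-priority input appeared
		}
	}
	if contains(updated, active) {
		return actionRestart // active input's definition changed
	}
	return actionNone
}
```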

type SwitchEvent added in v0.0.31

type SwitchEvent struct {
	At     time.Time    `json:"at"`
	From   int          `json:"from"`
	To     int          `json:"to"`
	Reason SwitchReason `json:"reason"`
	// Detail is human-readable extra context (error message, timeout
	// duration, …). Empty for reasons that have no extra context (manual,
	// failback, input_added, input_removed).
	Detail string `json:"detail,omitempty"`
}

SwitchEvent records one active-input switch for the rolling history. From = -1 means "no previous active" — used for the initial activation recorded by Register so the UI has a baseline event for fresh streams.

type SwitchReason added in v0.0.31

type SwitchReason string

SwitchReason names why the active input changed. The set is closed — every callsite of tryFailover must pick one so the rolling history is always interpretable in the UI.

const (
	// SwitchReasonInitial — the very first activation of a stream when
	// Register picks the best-priority input. From=-1 (no previous active);
	// recorded so the UI history shows a baseline event for streams that
	// haven't switched yet.
	SwitchReasonInitial SwitchReason = "initial"
	// SwitchReasonError — ingestor reported a non-recoverable error on
	// the active input (RTMP disconnect, HLS playlist 404, SRT broken …).
	SwitchReasonError SwitchReason = "error"
	// SwitchReasonTimeout — no packet from the active input for the full
	// packet-timeout window (default 30s).
	SwitchReasonTimeout SwitchReason = "timeout"
	// SwitchReasonManual — operator forced the switch via the
	// /streams/{code}/inputs/switch API.
	SwitchReasonManual SwitchReason = "manual"
	// SwitchReasonFailback — a higher-priority input recovered after
	// having been degraded; manager switched back to honour priority.
	SwitchReasonFailback SwitchReason = "failback"
	// SwitchReasonRecovery — every input had degraded (exhausted state)
	// and one probed clean; pipeline restarted on it. May land on the
	// same priority as before in the single-input case.
	SwitchReasonRecovery SwitchReason = "recovery"
	// SwitchReasonInputAdded — UpdateInputs added an input with a
	// priority lower (= higher precedence) than the current active one.
	SwitchReasonInputAdded SwitchReason = "input_added"
	// SwitchReasonInputRemoved — UpdateInputs removed the active input;
	// manager promoted the next-best candidate.
	SwitchReasonInputRemoved SwitchReason = "input_removed"
)

SwitchReason values. Add a new constant before introducing a new trigger path so the UI legend and frontend filter dropdowns can stay in sync (currently maintained manually).
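Since the set of reasons is closed, a consumer of the switch history may want to reject unknown values. The sketch below uses a local `switchReason` type with the string values listed above; the `valid` method is a hypothetical helper, not part of the package.

```go
package main

// switchReason mirrors the documented SwitchReason string values.
type switchReason string

const (
	reasonInitial      switchReason = "initial"
	reasonError        switchReason = "error"
	reasonTimeout      switchReason = "timeout"
	reasonManual       switchReason = "manual"
	reasonFailback     switchReason = "failback"
	reasonRecovery     switchReason = "recovery"
	reasonInputAdded   switchReason = "input_added"
	reasonInputRemoved switchReason = "input_removed"
)

// valid reports whether r is one of the known reasons.
func (r switchReason) valid() bool {
	switch r {
	case reasonInitial, reasonError, reasonTimeout, reasonManual,
		reasonFailback, reasonRecovery, reasonInputAdded, reasonInputRemoved:
		return true
	}
	return false
}
```

A check like this has to be updated by hand whenever a new reason is added, which matches the note above that the UI legend is currently kept in sync manually.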
