Documentation
¶
Overview ¶
Package manager implements the Stream Manager — the failover engine. It monitors input health (bitrate, FPS, packet loss, timeout) and seamlessly switches to the best available input when the active one degrades or fails. Failover is handled entirely in Go — FFmpeg is never restarted for this purpose.
Concurrency model ¶
Each stream has its own [streamState] protected by state.mu. The global Service.mu is an RWMutex that guards only the streams map and is never held while performing I/O or while acquiring state.mu (consistent lock ordering: Service.mu → state.mu prevents deadlocks).
RecordPacket (hot path) does a global RLock to locate the state pointer, then a per-stream Lock to update LastPacketAt and Status. At typical packet rates (< 100/s per stream) the per-packet mutex overhead is negligible while being completely race-free.
Failover state machine ¶
StatusIdle → StatusActive (first packet arrives on the active input) StatusActive → StatusDegraded (timeout detected by monitor, or error from ingestor) StatusDegraded → StatusIdle (background probe succeeds; input is a failback candidate) StatusIdle → StatusActive (ingestor switches to this input after tryFailover)
Index ¶
- type InputHealth
- type InputHealthSnapshot
- type RuntimeStatus
- type Service
- func (s *Service) IsRegistered(streamID domain.StreamCode) bool
- func (s *Service) RecordPacket(streamID domain.StreamCode, inputPriority int)
- func (s *Service) Register(ctx context.Context, stream *domain.Stream, bufferWriteID domain.StreamCode) error
- func (s *Service) ReportInputError(streamID domain.StreamCode, inputPriority int, err error)
- func (s *Service) RuntimeStatus(streamID domain.StreamCode) (RuntimeStatus, bool)
- func (s *Service) SetExhaustedCallback(fn func(domain.StreamCode))
- func (s *Service) SetRestoredCallback(fn func(domain.StreamCode))
- func (s *Service) SwitchInput(streamID domain.StreamCode, priority int) error
- func (s *Service) Unregister(streamID domain.StreamCode)
- func (s *Service) UpdateBufferWriteID(streamID domain.StreamCode, newBufID domain.StreamCode)
- func (s *Service) UpdateInputs(streamID domain.StreamCode, added, removed, updated []domain.Input)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type InputHealth ¶
type InputHealth struct {
Input domain.Input
LastPacketAt time.Time
Bitrate float64 // kbps (reserved for future bitrate estimator)
PacketLoss float64 // percent (reserved)
Status domain.StreamStatus
LastError string // last reason the input went degraded; cleared on recovery
LastErrorAt time.Time // when LastError was set; zero value when no error
}
InputHealth tracks the runtime health of one input source. All mutable fields are protected by the parent streamState.mu.
type InputHealthSnapshot ¶
type InputHealthSnapshot struct {
InputPriority int `json:"input_priority"`
LastPacketAt time.Time `json:"last_packet_at"`
BitrateKbps float64 `json:"bitrate_kbps"`
PacketLoss float64 `json:"packet_loss"`
Status domain.StreamStatus `json:"status"`
LastError string `json:"last_error,omitempty"`
LastErrorAt *time.Time `json:"last_error_at,omitempty"`
}
InputHealthSnapshot is a serialisable copy of one input's health. LastError carries a human-readable reason the input most recently went degraded (packet timeout, ingestor error, …); it is cleared when the input recovers. Frontend should treat it as a diagnostic string, not a code.
type RuntimeStatus ¶
type RuntimeStatus struct {
ActiveInputPriority int `json:"active_input_priority"`
OverrideInputPriority *int `json:"override_input_priority,omitempty"`
Exhausted bool `json:"exhausted"`
Inputs []InputHealthSnapshot `json:"inputs"`
}
RuntimeStatus is a JSON-safe snapshot of manager state for one stream. Exhausted is true when every input has degraded and no failover candidate remains — the stream is effectively offline at the source.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service monitors all streams and orchestrates source failover.
func (*Service) IsRegistered ¶
func (s *Service) IsRegistered(streamID domain.StreamCode) bool
IsRegistered reports whether streamID is under manager supervision.
func (*Service) RecordPacket ¶
func (s *Service) RecordPacket(streamID domain.StreamCode, inputPriority int)
RecordPacket updates the last-seen timestamp for an input. Called by the ingestor per packet — must be fast and contention-free.
func (*Service) Register ¶
func (s *Service) Register(ctx context.Context, stream *domain.Stream, bufferWriteID domain.StreamCode) error
Register starts health monitoring for a stream and begins ingesting on the best input. bufferWriteID is the Buffer Hub slot for ingest writes; empty defaults to stream.Code.
func (*Service) ReportInputError ¶
func (s *Service) ReportInputError(streamID domain.StreamCode, inputPriority int, err error)
ReportInputError marks an input degraded immediately and triggers failover. Called by the ingestor on non-retriable source errors.
func (*Service) RuntimeStatus ¶
func (s *Service) RuntimeStatus(streamID domain.StreamCode) (RuntimeStatus, bool)
RuntimeStatus returns a snapshot of runtime input health, or ok=false if not registered.
func (*Service) SetExhaustedCallback ¶
func (s *Service) SetExhaustedCallback(fn func(domain.StreamCode))
SetExhaustedCallback registers a function called when all inputs for a stream are degraded and no failover candidate is available.
func (*Service) SetRestoredCallback ¶
func (s *Service) SetRestoredCallback(fn func(domain.StreamCode))
SetRestoredCallback registers a function called when a failover succeeds after a period where all inputs were exhausted.
func (*Service) SwitchInput ¶
func (s *Service) SwitchInput(streamID domain.StreamCode, priority int) error
SwitchInput forces the stream to use the given input priority regardless of its configured priority value. The override persists until the input degrades permanently, at which point the manager reverts to normal priority-based selection. Switching to a different priority replaces any previous override.
func (*Service) Unregister ¶
func (s *Service) Unregister(streamID domain.StreamCode)
Unregister stops ingest and health monitoring for streamID.
func (*Service) UpdateBufferWriteID ¶
func (s *Service) UpdateBufferWriteID(streamID domain.StreamCode, newBufID domain.StreamCode)
UpdateBufferWriteID changes the buffer slot where the ingestor writes packets. Used when adding/removing the transcoder (buffer topology change). The active ingestor is restarted to write to the new buffer.
func (*Service) UpdateInputs ¶
func (s *Service) UpdateInputs( streamID domain.StreamCode, added, removed, updated []domain.Input, )
UpdateInputs patches the live input routing table while the monitor is running.
- removed: deleted from state.inputs; if the active input is removed, failover is triggered.
- added: inserted as StatusIdle; if a higher-priority input is added, failover is triggered.
- updated: Input field replaced; if the active input is updated, the ingestor is restarted.