Documentation
¶
Overview ¶
Package coordinator wires buffer, stream manager, transcoder, and publisher for a single stream lifecycle. Used by the HTTP API and server bootstrap.
Index ¶
- func BootstrapPersistedStreams(ctx context.Context, log *slog.Logger, repo store.StreamRepository, ...)
- type Coordinator
- func (c *Coordinator) ABRCopyRuntimeStatus(code domain.StreamCode) (manager.RuntimeStatus, bool)
- func (c *Coordinator) ABRMixerRuntimeStatus(code domain.StreamCode) (manager.RuntimeStatus, bool)
- func (c *Coordinator) IsRunning(streamID domain.StreamCode) bool
- func (c *Coordinator) RunReconciler(ctx context.Context)
- func (c *Coordinator) RunningStreams() []domain.StreamCode
- func (c *Coordinator) SetUpstreamLookupForTesting(fn func(domain.StreamCode) (*domain.Stream, bool))
- func (c *Coordinator) Start(ctx context.Context, stream *domain.Stream) error
- func (c *Coordinator) Stop(ctx context.Context, streamID domain.StreamCode)
- func (c *Coordinator) StreamStatus(code domain.StreamCode) domain.StreamStatus
- func (c *Coordinator) Update(ctx context.Context, old, new *domain.Stream) error
- type ProfileChange
- type ProfilesDiff
- type StreamDiff
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BootstrapPersistedStreams ¶
func BootstrapPersistedStreams(ctx context.Context, log *slog.Logger, repo store.StreamRepository, coord *Coordinator)
BootstrapPersistedStreams starts the pipeline for every non-disabled stream that has at least one input configured. Stream status is never persisted, so all eligible streams are started fresh on every boot regardless of their last known runtime state.
Types ¶
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator starts and stops the full per-stream pipeline.
func New ¶
func New(i do.Injector) (*Coordinator, error)
New registers a Coordinator with the DI injector.
func (*Coordinator) ABRCopyRuntimeStatus ¶ added in v0.0.23
func (c *Coordinator) ABRCopyRuntimeStatus(code domain.StreamCode) (manager.RuntimeStatus, bool)
ABRCopyRuntimeStatus returns a synthetic manager.RuntimeStatus for an ABR-copy stream (which doesn't go through the manager and therefore has no real RuntimeStatus). The returned snapshot lets the API surface a single "input 0" with status derived from tap activity:
- lastPacketAt within abrCopyInputStaleAfter → StatusActive
- older than the threshold (or no packet yet) → StatusDegraded
Returns ok=false when the stream isn't running as ABR-copy — caller should fall back to the normal manager.RuntimeStatus path.
func (*Coordinator) ABRMixerRuntimeStatus ¶ added in v0.0.23
func (c *Coordinator) ABRMixerRuntimeStatus(code domain.StreamCode) (manager.RuntimeStatus, bool)
ABRMixerRuntimeStatus mirrors ABRCopyRuntimeStatus for the mixer path. Returns ok=false when the stream isn't running as ABR-mixer.
func (*Coordinator) IsRunning ¶
func (c *Coordinator) IsRunning(streamID domain.StreamCode) bool
IsRunning reports whether the stream pipeline is active in memory. Covers the normal pipeline (manager-registered) plus the in-process bypass paths (ABR copy, ABR mixer) that don't go through the manager.
func (*Coordinator) RunReconciler ¶ added in v0.0.90
func (c *Coordinator) RunReconciler(ctx context.Context)
RunReconciler enforces the invariant "every stream that is not disabled must be running". It re-lists the persisted streams every reconcileInterval and Start()s any enabled stream the coordinator is not currently running.
This is the safety net behind every code path that can leave a stream stopped against the operator's intent — e.g. a bootstrap Start that failed because the upstream HLS source was briefly unreachable, a restart that errored mid-flight, an API edge case where Start was never dispatched (POST /streams creating a brand-new stream and the handler only branched on "was previously disabled"). Without this loop those failures stick: the stream is saved as `disabled: false` but stays in StatusStopped until someone manually hits /restart.
Start is idempotent (it short-circuits when the pipeline is already running), and concurrent Update / Stop calls from the API path race safely against the reconciler — Start checks IsRunning under its own lock, and a Stop arriving after our IsRunning probe simply means the next tick will re-Start it (which is the desired behaviour: if the operator wants the stream stopped, they should toggle Disabled).
Call once at boot from the runtime Manager; the loop exits when ctx is cancelled (process shutdown).
func (*Coordinator) RunningStreams ¶ added in v0.0.31
func (c *Coordinator) RunningStreams() []domain.StreamCode
RunningStreams returns the codes of every stream whose pipeline is currently active. Used by runtime.diff to enumerate streams that need to be restarted when global config (e.g. transcoder.multi_output) changes.
Snapshot semantics: the returned slice is decoupled from the live maps, so a stream torn down between this call and the caller acting on it will safely no-op via Stop's existing "not running" branch.
func (*Coordinator) SetUpstreamLookupForTesting ¶ added in v0.0.21
func (c *Coordinator) SetUpstreamLookupForTesting(fn func(domain.StreamCode) (*domain.Stream, bool))
SetUpstreamLookupForTesting injects an upstream stream resolver — used by tests that exercise the ABR-copy branch without spinning up a store backend.
func (*Coordinator) Start ¶
Start creates the buffer, registers the stream with the manager (ingest + failover), starts publisher outputs, and optionally a transcoder worker.
When the stream's first input is `copy://X` and X has an ABR ladder the pipeline is wired through startABRCopy instead — no ingest worker, no transcoder, just N tap goroutines re-publishing upstream rendition packets.
func (*Coordinator) Stop ¶
func (c *Coordinator) Stop(ctx context.Context, streamID domain.StreamCode)
Stop tears down publisher, transcoder, manager (ingest), and the buffer. ctx is used for the DVR stop and the EventStreamStopped publish; cleanup of in-memory state always proceeds even if ctx is cancelled.
func (*Coordinator) StreamStatus ¶
func (c *Coordinator) StreamStatus(code domain.StreamCode) domain.StreamStatus
StreamStatus returns the runtime status of a stream pipeline. It is derived purely from in-memory state — never read from the store.
- StatusStopped — pipeline is not registered (never started, or was stopped)
- StatusActive — pipeline is running and no degradation source is active
- StatusDegraded — pipeline is running but at least one degradation source is active (all inputs exhausted, OR transcoder crashing in a loop)
type ProfileChange ¶
type ProfileChange struct {
Index int
Old *domain.VideoProfile // nil when added
New *domain.VideoProfile // nil when removed
}
ProfileChange describes one profile that was added, removed, or updated.
type ProfilesDiff ¶
type ProfilesDiff struct {
Added []ProfileChange
Removed []ProfileChange
Updated []ProfileChange
}
ProfilesDiff holds per-profile changes when the transcoder topology stays the same.
func (*ProfilesDiff) HasAddedOrRemoved ¶
func (d *ProfilesDiff) HasAddedOrRemoved() bool
HasAddedOrRemoved reports whether the profile count changed (master playlist rebuild needed).
func (*ProfilesDiff) HasProfileChanges ¶
func (d *ProfilesDiff) HasProfileChanges() bool
HasProfileChanges reports whether there are any per-profile changes.
type StreamDiff ¶
type StreamDiff struct {
// Inputs
InputsChanged bool
AddedInputs []domain.Input // priority exists in new but not old
RemovedInputs []domain.Input // priority exists in old but not new
UpdatedInputs []domain.Input // same priority, different config (carries NEW input)
// Transcoder
TranscoderChanged bool // any transcoder config change
TranscoderTopologyChanged bool // nil↔non-nil, mode change, video.copy change → buffer layout changes
ProfilesDiff *ProfilesDiff // per-profile granular diff; nil when topology changed
// Publisher
ProtocolsChanged bool
PushChanged bool
// DVR
DVRChanged bool
// Lifecycle
NowDisabled bool // was enabled → now disabled
}
StreamDiff describes what changed between two versions of a stream configuration. Only non-metadata fields are compared (name, description, tags, timestamps are ignored).
func ComputeDiff ¶
func ComputeDiff(old, new *domain.Stream) StreamDiff
ComputeDiff compares two stream configurations and returns a diff.