coordinator

package
v0.0.28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package coordinator wires buffer, stream manager, transcoder, and publisher for a single stream lifecycle. Used by the HTTP API and server bootstrap.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BootstrapPersistedStreams

func BootstrapPersistedStreams(ctx context.Context, log *slog.Logger, repo store.StreamRepository, coord *Coordinator)

BootstrapPersistedStreams starts the pipeline for every non-disabled stream that has at least one input configured. Stream status is never persisted, so all eligible streams are started fresh on every boot regardless of their last known runtime state.

Types

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator starts and stops the full per-stream pipeline.

func New

func New(i do.Injector) (*Coordinator, error)

New registers a Coordinator with the DI injector.

func (*Coordinator) ABRCopyRuntimeStatus added in v0.0.23

func (c *Coordinator) ABRCopyRuntimeStatus(code domain.StreamCode) (manager.RuntimeStatus, bool)

ABRCopyRuntimeStatus returns a synthetic manager.RuntimeStatus for an ABR-copy stream (which doesn't go through the manager and therefore has no real RuntimeStatus). The returned snapshot lets the API surface a single "input 0" with status derived from tap activity:

  • lastPacketAt within abrCopyInputStaleAfter → StatusActive
  • older than the threshold (or no packet yet) → StatusDegraded

Returns ok=false when the stream isn't running as ABR-copy — caller should fall back to the normal manager.RuntimeStatus path.

func (*Coordinator) ABRMixerRuntimeStatus added in v0.0.23

func (c *Coordinator) ABRMixerRuntimeStatus(code domain.StreamCode) (manager.RuntimeStatus, bool)

ABRMixerRuntimeStatus mirrors ABRCopyRuntimeStatus for the mixer path. Returns ok=false when the stream isn't running as ABR-mixer.

func (*Coordinator) IsRunning

func (c *Coordinator) IsRunning(streamID domain.StreamCode) bool

IsRunning reports whether the stream pipeline is active in memory. Covers the normal pipeline (manager-registered) plus the in-process bypass paths (ABR copy, ABR mixer) that don't go through the manager.

func (*Coordinator) SetUpstreamLookupForTesting added in v0.0.21

func (c *Coordinator) SetUpstreamLookupForTesting(fn func(domain.StreamCode) (*domain.Stream, bool))

SetUpstreamLookupForTesting injects an upstream stream resolver — used by tests that exercise the ABR-copy branch without spinning up a store backend.

func (*Coordinator) Start

func (c *Coordinator) Start(ctx context.Context, stream *domain.Stream) error

Start creates the buffer, registers the stream with the manager (ingest + failover), starts publisher outputs, and optionally a transcoder worker.

When the stream's first input is `copy://X` and X has an ABR ladder the pipeline is wired through startABRCopy instead — no ingest worker, no transcoder, just N tap goroutines re-publishing upstream rendition packets.

func (*Coordinator) Stop

func (c *Coordinator) Stop(ctx context.Context, streamID domain.StreamCode)

Stop tears down publisher, transcoder, manager (ingest), and the buffer. ctx is used for the DVR stop and the EventStreamStopped publish; cleanup of in-memory state always proceeds even if ctx is cancelled.

func (*Coordinator) StreamStatus

func (c *Coordinator) StreamStatus(code domain.StreamCode) domain.StreamStatus

StreamStatus returns the runtime status of a stream pipeline. It is derived purely from in-memory state — never read from the store.

  • StatusStopped — pipeline is not registered (never started, or was stopped)
  • StatusActive — pipeline is running and at least one input is live
  • StatusDegraded — pipeline is running but all inputs are currently exhausted

func (*Coordinator) Update

func (c *Coordinator) Update(ctx context.Context, old, new *domain.Stream) error

Update hot-reloads only the components that changed between old and new stream configs. The caller must persist the new config to the store BEFORE calling Update. Returns early with c.Stop() when the stream transitions to Disabled.

type ProfileChange

type ProfileChange struct {
	Index int
	Old   *domain.VideoProfile // nil when added
	New   *domain.VideoProfile // nil when removed
}

ProfileChange describes one profile that was added, removed, or updated.

type ProfilesDiff

type ProfilesDiff struct {
	Added   []ProfileChange
	Removed []ProfileChange
	Updated []ProfileChange
}

ProfilesDiff holds per-profile changes when the transcoder topology stays the same.

func (*ProfilesDiff) HasAddedOrRemoved

func (d *ProfilesDiff) HasAddedOrRemoved() bool

HasAddedOrRemoved reports whether the profile count changed (master playlist rebuild needed).

func (*ProfilesDiff) HasProfileChanges

func (d *ProfilesDiff) HasProfileChanges() bool

HasProfileChanges reports whether there are any per-profile changes.

type StreamDiff

type StreamDiff struct {
	// Inputs
	InputsChanged bool
	AddedInputs   []domain.Input // priority exists in new but not old
	RemovedInputs []domain.Input // priority exists in old but not new
	UpdatedInputs []domain.Input // same priority, different config (carries NEW input)

	// Transcoder
	TranscoderChanged         bool          // any transcoder config change
	TranscoderTopologyChanged bool          // nil↔non-nil, mode change, video.copy change → buffer layout changes
	ProfilesDiff              *ProfilesDiff // per-profile granular diff; nil when topology changed

	// Publisher
	ProtocolsChanged bool
	PushChanged      bool

	// DVR
	DVRChanged bool

	// Lifecycle
	NowDisabled bool // was enabled → now disabled
}

StreamDiff describes what changed between two versions of a stream configuration. Only non-metadata fields are compared (name, description, tags, timestamps are ignored).

func ComputeDiff

func ComputeDiff(old, new *domain.Stream) StreamDiff

ComputeDiff compares two stream configurations and returns a diff.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL