stream

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrSessionMaxSubscribers         = errors.New("shared session max subscribers reached")
	ErrSessionNoSources              = errors.New("channel has no enabled sources")
	ErrSessionNotFound               = errors.New("shared session not found")
	ErrSessionRecoveryAlreadyPending = errors.New(
		"shared session manual recovery is already pending",
	)
	ErrSlowClientLagged = errors.New("subscriber fell behind stream tail")
)
View Source
var ErrNoTunersAvailable = errors.New("no tuners available")
View Source
var ErrProbePreempted = errors.New("probe preempted by client stream")
View Source
var ErrRingClosed = errors.New("chunk ring is closed")

Functions

func SanitizeURLForLog

func SanitizeURLForLog(raw string) string

SanitizeURLForLog keeps URL routing shape while removing credentials and token-bearing components from log output.

Types

type BackgroundProber

type BackgroundProber struct {
	// contains filtered or unexported fields
}

BackgroundProber periodically checks source startup readiness to pre-populate health state.

func NewBackgroundProber

func NewBackgroundProber(cfg ProberConfig, provider ProbeChannelsProvider) *BackgroundProber

func (*BackgroundProber) Close

func (p *BackgroundProber) Close()

Close stops the background session-close worker.

func (*BackgroundProber) Enabled

func (p *BackgroundProber) Enabled() bool

func (*BackgroundProber) ProbeOnce

func (p *BackgroundProber) ProbeOnce(ctx context.Context) error

func (*BackgroundProber) Run

func (p *BackgroundProber) Run(ctx context.Context)

Run executes periodic probe ticks until ctx is canceled. Caller-owned lifecycle: Run does not call Close; the caller must invoke Close() during shutdown to drain queued session closes.

type ChannelsProvider

type ChannelsProvider interface {
	GetByGuideNumber(ctx context.Context, guideNumber string) (channels.Channel, error)
	ListSources(ctx context.Context, channelID int64, enabledOnly bool) ([]channels.Source, error)
	MarkSourceFailure(ctx context.Context, sourceID int64, reason string, failedAt time.Time) error
	MarkSourceSuccess(ctx context.Context, sourceID int64, succeededAt time.Time) error
	UpdateSourceProfile(ctx context.Context, sourceID int64, profile channels.SourceProfileUpdate) error
}

ChannelsProvider supplies channel and source operations required by stream tuning.

type ChunkPublisher

type ChunkPublisher interface {
	PublishChunk(data []byte, publishedAt time.Time) error
}

ChunkPublisher receives chunks emitted by the stream pump.

type ChunkPublisherCopyAware

type ChunkPublisherCopyAware interface {
	ChunkPublisher
	CopiesPublishedChunkData() bool
}

ChunkPublisherCopyAware advertises whether PublishChunk always deep-copies input bytes before returning.

type ChunkRing

type ChunkRing struct {
	// contains filtered or unexported fields
}

ChunkRing stores recently published chunks in an in-memory ring.

func NewChunkRing

func NewChunkRing(maxChunks int) *ChunkRing

NewChunkRing creates a bounded in-memory chunk ring.

func NewChunkRingWithLimits

func NewChunkRingWithLimits(maxChunks, maxBytes int) *ChunkRing

NewChunkRingWithLimits creates a bounded in-memory chunk ring with optional chunk-count and byte-budget eviction limits.

func NewChunkRingWithLimitsAndStartupHint

func NewChunkRingWithLimitsAndStartupHint(maxChunks, maxBytes, startupChunkHint int) *ChunkRing

NewChunkRingWithLimitsAndStartupHint creates a bounded in-memory chunk ring with optional chunk-count/byte-budget limits plus a startup payload hint used to prewarm a bounded number of slot buffers.

func (*ChunkRing) Close

func (r *ChunkRing) Close(err error)

Close stops the ring and wakes readers waiting for new chunks.

func (*ChunkRing) CopiesPublishedChunkData

func (r *ChunkRing) CopiesPublishedChunkData() bool

CopiesPublishedChunkData reports that PublishChunk deep-copies chunk bytes.

func (*ChunkRing) PublishChunk

func (r *ChunkRing) PublishChunk(data []byte, publishedAt time.Time) error

PublishChunk appends one chunk and evicts the oldest chunk when full.

func (*ChunkRing) ReadFrom

func (r *ChunkRing) ReadFrom(seq uint64) ReadResult

ReadFrom returns available chunks for the requested sequence. If no chunks are available yet, WaitSeq can be passed to WaitForChange.

func (*ChunkRing) Snapshot

func (r *ChunkRing) Snapshot() []RingChunk

Snapshot returns a deep copy of currently buffered chunks.

func (*ChunkRing) StartSeqByLagBytes

func (r *ChunkRing) StartSeqByLagBytes(lagBytes int) uint64

StartSeqByLagBytes returns the sequence to start reading from for a new subscriber. When lagBytes is zero or negative, subscribers start at the live tail.

func (*ChunkRing) WaitForChange

func (r *ChunkRing) WaitForChange(ctx context.Context, waitSeq uint64) error

WaitForChange blocks until ring state changes from waitSeq, ring closes, or ctx ends.

type ChurnSummary

type ChurnSummary struct {
	ReselectAlertThreshold        int64  `json:"reselect_alert_threshold"`
	SessionCount                  int    `json:"session_count"`
	RecoveringSessionCount        int    `json:"recovering_session_count"`
	SessionsWithReselectCount     int    `json:"sessions_with_reselect_count"`
	SessionsOverReselectThreshold int    `json:"sessions_over_reselect_threshold"`
	TotalRecoveryCycles           int64  `json:"total_recovery_cycles"`
	TotalSourceSelectCount        int64  `json:"total_source_select_count"`
	TotalSameSourceReselectCount  int64  `json:"total_same_source_reselect_count"`
	MaxSameSourceReselectCount    int64  `json:"max_same_source_reselect_count"`
	MaxReselectChannelID          int64  `json:"max_reselect_channel_id,omitempty"`
	MaxReselectGuideNumber        string `json:"max_reselect_guide_number,omitempty"`
}

ChurnSummary captures high-level recovery/reselection churn counters across active shared sessions.

type ClientStreamStatus

type ClientStreamStatus struct {
	TunerID            int       `json:"tuner_id"`
	PlaylistSourceID   int64     `json:"playlist_source_id"`
	PlaylistSourceName string    `json:"playlist_source_name"`
	VirtualTunerSlot   int       `json:"virtual_tuner_slot"`
	Kind               string    `json:"kind"`
	ChannelID          int64     `json:"channel_id,omitempty"`
	GuideNumber        string    `json:"guide_number,omitempty"`
	GuideName          string    `json:"guide_name,omitempty"`
	SourceID           int64     `json:"source_id,omitempty"`
	SourceItemKey      string    `json:"source_item_key,omitempty"`
	SourceStreamURL    string    `json:"source_stream_url,omitempty"`
	ClientHost         string    `json:"client_host,omitempty"`
	Resolution         string    `json:"resolution,omitempty"`
	FrameRate          float64   `json:"frame_rate,omitempty"`
	VideoCodec         string    `json:"video_codec,omitempty"`
	AudioCodec         string    `json:"audio_codec,omitempty"`
	CurrentBitrateBPS  int64     `json:"current_bitrate_bps,omitempty"`
	ProfileBitrateBPS  int64     `json:"profile_bitrate_bps,omitempty"`
	Producer           string    `json:"producer,omitempty"`
	SubscriberID       uint64    `json:"subscriber_id"`
	ClientAddr         string    `json:"client_addr,omitempty"`
	ConnectedAt        time.Time `json:"connected_at"`
}

ClientStreamStatus maps one connected client subscriber to its backing tuner session.

type Config

type Config struct {
	Mode                                 string
	FFmpegPath                           string
	FFprobePath                          string
	HTTPClient                           *http.Client
	Logger                               *slog.Logger
	StartupTimeout                       time.Duration
	StartupRandomAccessRecoveryOnly      bool
	MinProbeBytes                        int
	MaxFailovers                         int
	FailoverTotalTimeout                 time.Duration
	UpstreamOverlimitCooldown            time.Duration
	FFmpegReconnectEnabled               bool
	FFmpegReconnectDelayMax              time.Duration
	FFmpegReconnectMaxRetries            int
	FFmpegReconnectHTTPErrors            string
	FFmpegRWTimeout                      time.Duration
	FFmpegStartupProbeSize               int
	FFmpegStartupAnalyzeDelay            time.Duration
	FFmpegInputBufferSize                int
	FFmpegDiscardCorrupt                 bool
	FFmpegCopyRegenerateTimestamps       *bool
	FFmpegSourceLogLevel                 string
	FFmpegSourceStderrPassthroughEnabled *bool
	FFmpegSourceStderrLogLevel           string
	FFmpegSourceStderrMaxLineBytes       int

	ProducerReadRate        float64
	ProducerReadRateCatchup float64
	ProducerInitialBurst    int

	BufferChunkBytes           int
	BufferPublishFlushInterval time.Duration
	BufferTSAlign188           bool

	StallDetect               time.Duration
	StallHardDeadline         time.Duration
	StallPolicy               string
	StallMaxFailoversPerStall int
	CycleFailureMinHealth     time.Duration

	RecoveryFillerEnabled     bool
	RecoveryFillerMode        string
	RecoveryFillerInterval    time.Duration
	RecoveryFillerText        string
	RecoveryFillerEnableAudio bool

	SubscriberJoinLagBytes     int
	SubscriberSlowClientPolicy string
	SubscriberMaxBlockedWrite  time.Duration

	SessionIdleTimeout    time.Duration
	SessionDrainTimeout   time.Duration
	SessionMaxSubscribers int
	SessionHistoryLimit   int
	// SessionSourceHistoryLimit and SessionSubscriberHistoryLimit are optional
	// per-session timeline overrides. When non-positive, SessionManager falls
	// back to SessionHistoryLimit/defaults and applies min/max clamp guardrails.
	SessionSourceHistoryLimit     int
	SessionSubscriberHistoryLimit int
	// SourceHealthDrainTimeout bounds post-cancel source-health queue draining
	// during session teardown. Non-positive values normalize to defaults in
	// SessionManager config.
	SourceHealthDrainTimeout time.Duration

	TuneBackoffMaxTunes int
	TuneBackoffInterval time.Duration
	TuneBackoffCooldown time.Duration
}

Config controls stream delivery behavior.

type DrainWaitTelemetry

type DrainWaitTelemetry struct {
	OK             uint64 `json:"ok"`
	Error          uint64 `json:"error"`
	WaitDurationUS uint64 `json:"wait_duration_us"`
	WaitDurationMS uint64 `json:"wait_duration_ms"`
}

DrainWaitTelemetry captures process-lifetime WaitForDrain result counters.

type FFmpegProducer

type FFmpegProducer struct {
	// contains filtered or unexported fields
}

FFmpegProducer starts ffmpeg and exposes its stdout as MPEG-TS bytes.

func NewFFmpegProducer

func NewFFmpegProducer(cfg FFmpegProducerConfig) (*FFmpegProducer, error)

NewFFmpegProducer validates config and creates a producer.

func (*FFmpegProducer) Describe

func (p *FFmpegProducer) Describe() string

Describe returns a log-friendly producer descriptor.

func (*FFmpegProducer) Start

func (p *FFmpegProducer) Start(ctx context.Context) (io.ReadCloser, error)

Start launches ffmpeg and returns a read closer for stdout bytes.

type FFmpegProducerConfig

type FFmpegProducerConfig struct {
	FFmpegPath          string
	Mode                string
	StreamURL           string
	ReadRate            float64
	InitialBurstSeconds int
	LogLevel            string
}

FFmpegProducerConfig controls ffmpeg-backed producer behavior.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler serves /auto/v{guideNumber} requests.

func NewHandler

func NewHandler(cfg Config, tuners tunerUsage, channelsProvider ChannelsProvider) *Handler

func (*Handler) ClearAllSourceHealth

func (h *Handler) ClearAllSourceHealth() error

ClearAllSourceHealth clears all source-health convergence state.

func (*Handler) ClearSourceHealth

func (h *Handler) ClearSourceHealth(channelID int64) error

ClearSourceHealth clears source-health convergence state for a channel.

func (*Handler) Close

func (h *Handler) Close()

Close cancels all active sessions so they drain before store close.

func (*Handler) CloseWithContext

func (h *Handler) CloseWithContext(ctx context.Context) error

CloseWithContext cancels all active sessions and waits for teardown within the provided context budget.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Handler) TriggerSessionRecovery

func (h *Handler) TriggerSessionRecovery(channelID int64, reason string) error

TriggerSessionRecovery requests an in-session recovery cycle for an active shared session on the provided channel. Recovery routing is resolved from in-memory shared-session state and does not require channel/source store lookups.

func (*Handler) TunerStatusSnapshot

func (h *Handler) TunerStatusSnapshot() TunerStatusSnapshot

TunerStatusSnapshot returns a structured view of active tuner leases and subscribers.

type Lease

type Lease struct {
	ID int
	// PlaylistSourceID/Name identify which virtual source pool owns this lease.
	PlaylistSourceID   int64
	PlaylistSourceName string
	// VirtualTunerSlot is the slot index inside the source-local pool.
	VirtualTunerSlot int
	// contains filtered or unexported fields
}

Lease represents an acquired tuner slot.

func (*Lease) Release

func (l *Lease) Release()

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool tracks active tuner sessions and enforces a max concurrent stream count.

func NewPool

func NewPool(count int) *Pool

func (*Pool) Acquire

func (p *Pool) Acquire(ctx context.Context, guideNumber, clientAddr string) (*Lease, error)

func (*Pool) AcquireClient

func (p *Pool) AcquireClient(ctx context.Context, guideNumber, clientAddr string) (*Lease, error)

func (*Pool) AcquireClientForSource added in v1.1.0

func (p *Pool) AcquireClientForSource(
	ctx context.Context,
	_ int64,
	guideNumber,
	clientAddr string,
) (*Lease, error)

AcquireClientForSource keeps API parity with VirtualTunerManager. The base single-pool implementation ignores source selection and acquires from the shared pool.

func (*Pool) AcquireProbe

func (p *Pool) AcquireProbe(ctx context.Context, label string, cancel context.CancelCauseFunc) (*Lease, error)

func (*Pool) AcquireProbeForSource added in v1.1.0

func (p *Pool) AcquireProbeForSource(
	ctx context.Context,
	_ int64,
	label string,
	cancel context.CancelCauseFunc,
) (*Lease, error)

AcquireProbeForSource keeps API parity with VirtualTunerManager. The base single-pool implementation ignores source selection and acquires from the shared pool.

func (*Pool) Capacity

func (p *Pool) Capacity() int

func (*Pool) CapacityForSource added in v1.1.0

func (p *Pool) CapacityForSource(_ int64) int

CapacityForSource keeps API parity with VirtualTunerManager. The base single-pool implementation reports global capacity for any source.

func (*Pool) InUseCount

func (p *Pool) InUseCount() int

func (*Pool) InUseCountForSource added in v1.1.0

func (p *Pool) InUseCountForSource(_ int64) int

InUseCountForSource keeps API parity with VirtualTunerManager. The base single-pool implementation reports global in-use count for any source.

func (*Pool) SetPreemptSettleDelay

func (p *Pool) SetPreemptSettleDelay(delay time.Duration)

SetPreemptSettleDelay configures a delay after preemption before reusing the reclaimed tuner slot for a new lease.

func (*Pool) Snapshot

func (p *Pool) Snapshot() []Session

func (*Pool) VirtualTunerSnapshot added in v1.1.0

func (p *Pool) VirtualTunerSnapshot() []VirtualTunerPoolSnapshot

VirtualTunerSnapshot reports a single implicit pool for legacy one-pool mode.

type ProbeChannelsProvider

type ProbeChannelsProvider interface {
	ListEnabled(ctx context.Context) ([]channels.Channel, error)
	ListSources(ctx context.Context, channelID int64, enabledOnly bool) ([]channels.Source, error)
	MarkSourceFailure(ctx context.Context, sourceID int64, reason string, failedAt time.Time) error
	MarkSourceSuccess(ctx context.Context, sourceID int64, succeededAt time.Time) error
}

ProbeChannelsProvider supplies channel/source operations required by the background prober.

type ProbeCloseTelemetry

type ProbeCloseTelemetry struct {
	InlineCount    uint64 `json:"inline_count"`
	QueueFullCount uint64 `json:"queue_full_count"`
}

ProbeCloseTelemetry captures process-lifetime background-prober close fallback counters.

type ProbeTunerUsage

type ProbeTunerUsage interface {
	AcquireProbe(ctx context.Context, label string, cancel context.CancelCauseFunc) (*Lease, error)
	AcquireProbeForSource(ctx context.Context, sourceID int64, label string, cancel context.CancelCauseFunc) (*Lease, error)
}

ProbeTunerUsage exposes probe-slot acquisition for background probing.

type ProberConfig

type ProberConfig struct {
	Mode                           string
	FFmpegPath                     string
	HTTPClient                     *http.Client
	Logger                         *slog.Logger
	ProducerReadRate               float64
	ProducerReadRateCatchup        float64
	ProducerInitialBurst           int
	FFmpegReconnectEnabled         bool
	FFmpegReconnectDelayMax        time.Duration
	FFmpegReconnectMaxRetries      int
	FFmpegReconnectHTTPErrors      string
	FFmpegRWTimeout                time.Duration
	FFmpegStartupProbeSize         int
	FFmpegStartupAnalyzeDelay      time.Duration
	FFmpegInputBufferSize          int
	FFmpegDiscardCorrupt           bool
	FFmpegCopyRegenerateTimestamps *bool
	MinProbeBytes                  int
	ProbeInterval                  time.Duration
	ProbeTimeout                   time.Duration
	ProbeCloseQueueDepth           int
	TunerUsage                     ProbeTunerUsage
	ProbeTuneDelay                 time.Duration
}

ProberConfig controls optional background source probing.

type Producer

type Producer interface {
	Start(ctx context.Context) (io.ReadCloser, error)
	Describe() string
}

Producer starts an upstream MPEG-TS source for a shared channel session.

type Pump

type Pump struct {
	// contains filtered or unexported fields
}

Pump copies producer bytes into chunk publications using size-or-time flush.

func NewPump

func NewPump(cfg PumpConfig, publisher ChunkPublisher) *Pump

NewPump builds a new chunk pump for producer output.

func (*Pump) Run

func (p *Pump) Run(ctx context.Context, src io.ReadCloser) error

Run consumes producer output and publishes chunks until EOF or cancellation.

func (*Pump) Stats

func (p *Pump) Stats() PumpStats

Stats returns the latest telemetry snapshot.

type PumpConfig

type PumpConfig struct {
	ChunkBytes           int
	ReadBufferBytes      int
	PublishFlushInterval time.Duration
	TSAlign188           bool
}

PumpConfig controls chunking and flush behavior for producer output.

type PumpStats

type PumpStats struct {
	LastByteReadAt  time.Time
	LastPublishAt   time.Time
	BytesRead       int64
	BytesPublished  int64
	ChunksPublished int64
}

PumpStats exposes byte-flow telemetry used by stall detection.

type ReadResult

type ReadResult struct {
	Chunks         []RingChunk
	NextSeq        uint64
	Behind         bool
	Closed         bool
	Err            error
	WaitSeq        uint64
	RequestedSeq   uint64
	OldestSeq      uint64
	RingNextSeq    uint64
	BufferedChunks int
	BufferedBytes  int
	// contains filtered or unexported fields
}

ReadResult is one read pass over ring data starting at a requested sequence.

func (*ReadResult) Release

func (r *ReadResult) Release()

Release signals that callers are done reading chunk payload slices returned by ReadFrom.

type RingChunk

type RingChunk struct {
	Seq         uint64
	PublishedAt time.Time
	Data        []byte
}

RingChunk is one published chunk in a channel session ring.

type Session

type Session struct {
	ID          int
	GuideNumber string
	ClientAddr  string
	Kind        string
	StartedAt   time.Time
	// PlaylistSourceID/Name identify which virtual source pool owns this lease.
	PlaylistSourceID   int64
	PlaylistSourceName string
	// VirtualTunerSlot is the slot index inside the source-local pool.
	VirtualTunerSlot int
	// contains filtered or unexported fields
}

Session describes an active tuner lease.

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager coordinates shared channel stream sessions.

func NewSessionManager

func NewSessionManager(cfg SessionManagerConfig, tuners tunerUsage, channelsProvider ChannelsProvider) *SessionManager

NewSessionManager builds a new shared session manager.

func (*SessionManager) ClearAllSourceHealth

func (m *SessionManager) ClearAllSourceHealth() error

func (*SessionManager) ClearSourceHealth

func (m *SessionManager) ClearSourceHealth(channelID int64) error

func (*SessionManager) Close

func (m *SessionManager) Close()

Close cancels all active sessions and blocks until their goroutines (including source-health persistence drain) complete. This must be called before store.Close() to guarantee no session-scoped DB work races with database teardown.

func (*SessionManager) CloseWithContext

func (m *SessionManager) CloseWithContext(ctx context.Context) error

CloseWithContext cancels all active sessions and waits until their goroutines complete or the provided context expires.

func (*SessionManager) HasActiveOrPendingSession

func (m *SessionManager) HasActiveOrPendingSession(channelID int64) bool

HasActiveOrPendingSession reports whether a channel currently has an admissible shared session, or a shared session in creation.

func (*SessionManager) HistorySnapshot

func (m *SessionManager) HistorySnapshot() ([]SharedSessionHistory, int, int64)

HistorySnapshot returns bounded shared-session history for active and recently closed sessions. Results are newest-first.

func (*SessionManager) Snapshot

func (m *SessionManager) Snapshot() []SharedSessionStats

Snapshot returns all active shared sessions with per-session diagnostics.

func (*SessionManager) Subscribe

func (m *SessionManager) Subscribe(ctx context.Context, channel channels.Channel) (*SessionSubscription, error)

Subscribe returns a client subscription for the given channel.

func (*SessionManager) SubscribeImmediate

func (m *SessionManager) SubscribeImmediate(ctx context.Context, channel channels.Channel) (*SessionSubscription, error)

SubscribeImmediate returns a subscription without waiting for startup readiness. This is useful for clients that expect a prompt HTTP response while startup proceeds.

func (*SessionManager) TriggerRecovery

func (m *SessionManager) TriggerRecovery(channelID int64, reason string) error

TriggerRecovery requests an in-session recovery cycle for an active channel session without waiting for a real upstream stall.

func (*SessionManager) WaitForDrain

func (m *SessionManager) WaitForDrain(ctx context.Context) error

WaitForDrain blocks until the manager has no active or pending channel sessions.

type SessionManagerConfig

type SessionManagerConfig struct {
	Mode                                 string
	FFmpegPath                           string
	FFprobePath                          string
	HTTPClient                           *http.Client
	Logger                               *slog.Logger
	StartupTimeout                       time.Duration
	StartupRandomAccessRecoveryOnly      bool
	MinProbeBytes                        int
	MaxFailovers                         int
	FailoverTotalTimeout                 time.Duration
	UpstreamOverlimitCooldown            time.Duration
	FFmpegReconnectEnabled               bool
	FFmpegReconnectDelayMax              time.Duration
	FFmpegReconnectMaxRetries            int
	FFmpegReconnectHTTPErrors            string
	FFmpegRWTimeout                      time.Duration
	FFmpegStartupProbeSize               int
	FFmpegStartupAnalyzeDelay            time.Duration
	FFmpegInputBufferSize                int
	FFmpegDiscardCorrupt                 bool
	FFmpegCopyRegenerateTimestamps       *bool
	FFmpegSourceLogLevel                 string
	FFmpegSourceStderrPassthroughEnabled *bool
	FFmpegSourceStderrLogLevel           string
	FFmpegSourceStderrMaxLineBytes       int
	ProducerReadRate                     float64
	ProducerReadRateCatchup              float64
	ProducerInitialBurst                 int

	BufferChunkBytes           int
	BufferPublishFlushInterval time.Duration
	BufferTSAlign188           bool

	StallDetect               time.Duration
	StallHardDeadline         time.Duration
	StallPolicy               string
	StallMaxFailoversPerStall int
	CycleFailureMinHealth     time.Duration

	RecoveryFillerEnabled     bool
	RecoveryFillerMode        string
	RecoveryFillerInterval    time.Duration
	RecoveryFillerText        string
	RecoveryFillerEnableAudio bool

	SubscriberJoinLagBytes        int
	SubscriberSlowClientPolicy    string
	SubscriberMaxBlockedWrite     time.Duration
	SessionIdleTimeout            time.Duration
	SessionMaxSubscribers         int
	SessionHistoryLimit           int
	SessionSourceHistoryLimit     int
	SessionSubscriberHistoryLimit int
	SessionDrainTimeout           time.Duration
	SourceHealthDrainTimeout      time.Duration
}

SessionManagerConfig controls shared per-channel stream sessions.

type SessionSubscription

type SessionSubscription struct {
	// contains filtered or unexported fields
}

SessionSubscription represents one active client subscription to a shared channel session.

func (*SessionSubscription) Close

func (s *SessionSubscription) Close()

Close releases one subscriber slot from the shared session.

func (*SessionSubscription) Stats

Stats returns a snapshot of the current shared session state.

func (*SessionSubscription) Stream

Stream writes shared stream chunks to the provided response writer until completion/cancel.

type SharedSessionHistory

type SharedSessionHistory struct {
	SessionID                               uint64                           `json:"session_id"`
	ChannelID                               int64                            `json:"channel_id,omitempty"`
	GuideNumber                             string                           `json:"guide_number,omitempty"`
	GuideName                               string                           `json:"guide_name,omitempty"`
	TunerID                                 int                              `json:"tuner_id"`
	OpenedAt                                time.Time                        `json:"opened_at"`
	ClosedAt                                time.Time                        `json:"closed_at,omitempty"`
	Active                                  bool                             `json:"active"`
	TerminalStatus                          string                           `json:"terminal_status,omitempty"`
	TerminalError                           string                           `json:"terminal_error,omitempty"`
	PeakSubscribers                         int                              `json:"peak_subscribers"`
	TotalSubscribers                        int64                            `json:"total_subscribers"`
	CompletedSubscribers                    int64                            `json:"completed_subscribers"`
	SlowSkipEventsTotal                     uint64                           `json:"slow_skip_events_total"`
	SlowSkipLagChunksTotal                  uint64                           `json:"slow_skip_lag_chunks_total"`
	SlowSkipLagBytesTotal                   uint64                           `json:"slow_skip_lag_bytes_total"`
	SlowSkipMaxLagChunks                    uint64                           `json:"slow_skip_max_lag_chunks"`
	SubscriberWriteDeadlineUnsupportedTotal uint64                           `json:"subscriber_write_deadline_unsupported_total"`
	SubscriberWriteDeadlineTimeoutsTotal    uint64                           `json:"subscriber_write_deadline_timeouts_total"`
	SubscriberWriteShortWritesTotal         uint64                           `json:"subscriber_write_short_writes_total"`
	SubscriberWriteBlockedDurationUS        uint64                           `json:"subscriber_write_blocked_duration_us"`
	SubscriberWriteBlockedDurationMS        uint64                           `json:"subscriber_write_blocked_duration_ms"`
	RecoveryCycleCount                      int64                            `json:"recovery_cycle_count"`
	LastRecoveryReason                      string                           `json:"last_recovery_reason,omitempty"`
	SourceSelectCount                       int64                            `json:"source_select_count"`
	SameSourceReselectCount                 int64                            `json:"same_source_reselect_count"`
	LastSourceSelectedAt                    time.Time                        `json:"last_source_selected_at,omitempty"`
	LastSourceSelectReason                  string                           `json:"last_source_select_reason,omitempty"`
	LastError                               string                           `json:"last_error,omitempty"`
	Sources                                 []SharedSessionSourceHistory     `json:"sources,omitempty"`
	Subscribers                             []SharedSessionSubscriberHistory `json:"subscribers,omitempty"`
	SourceHistoryLimit                      int                              `json:"source_history_limit,omitempty"`
	SourceHistoryTruncated                  int64                            `json:"source_history_truncated_count,omitempty"`
	SubscriberHistoryLimit                  int                              `json:"subscriber_history_limit,omitempty"`
	SubscriberHistoryTruncated              int64                            `json:"subscriber_history_truncated_count,omitempty"`
}

SharedSessionHistory reports bounded historical shared-session lifecycle windows for both active and recently closed sessions.

type SharedSessionSourceHistory

type SharedSessionSourceHistory struct {
	SourceID                       int64     `json:"source_id,omitempty"`
	ItemKey                        string    `json:"item_key,omitempty"`
	StreamURL                      string    `json:"stream_url,omitempty"`
	SelectedAt                     time.Time `json:"selected_at"`
	DeselectedAt                   time.Time `json:"deselected_at,omitempty"`
	SelectionReason                string    `json:"selection_reason,omitempty"`
	StartupProbeRawBytes           int       `json:"startup_probe_raw_bytes,omitempty"`
	StartupProbeTrimmedBytes       int       `json:"startup_probe_trimmed_bytes,omitempty"`
	StartupProbeCutoverOffset      int       `json:"startup_probe_cutover_offset,omitempty"`
	StartupProbeDroppedBytes       int       `json:"startup_probe_dropped_bytes,omitempty"`
	StartupRandomAccessReady       bool      `json:"startup_random_access_ready,omitempty"`
	StartupRandomAccessCodec       string    `json:"startup_random_access_codec,omitempty"`
	StartupInventoryMethod         string    `json:"startup_inventory_method,omitempty"`
	StartupVideoStreams            int       `json:"startup_video_streams,omitempty"`
	StartupAudioStreams            int       `json:"startup_audio_streams,omitempty"`
	StartupVideoCodecs             string    `json:"startup_video_codecs,omitempty"`
	StartupAudioCodecs             string    `json:"startup_audio_codecs,omitempty"`
	StartupComponentState          string    `json:"startup_component_state,omitempty"`
	StartupRetryRelaxedProbe       bool      `json:"startup_retry_relaxed_probe,omitempty"`
	StartupRetryRelaxedProbeReason string    `json:"startup_retry_relaxed_probe_reason,omitempty"`
	Resolution                     string    `json:"resolution,omitempty"`
	FrameRate                      float64   `json:"frame_rate,omitempty"`
	VideoCodec                     string    `json:"video_codec,omitempty"`
	AudioCodec                     string    `json:"audio_codec,omitempty"`
	ProfileBitrateBPS              int64     `json:"profile_bitrate_bps,omitempty"`
	Producer                       string    `json:"producer,omitempty"`
}

SharedSessionSourceHistory reports one source-selection window for a shared session.

type SharedSessionStats

type SharedSessionStats struct {
	TunerID                                 int
	PlaylistSourceID                        int64
	PlaylistSourceName                      string
	VirtualTunerSlot                        int
	ChannelID                               int64
	GuideNumber                             string
	GuideName                               string
	SourceID                                int64
	SourceItemKey                           string
	SourceStreamURL                         string
	SourceStartupProbeRawBytes              int
	SourceStartupProbeTrimmedBytes          int
	SourceStartupProbeCutoverOffset         int
	SourceStartupProbeDroppedBytes          int
	SourceStartupProbeBytes                 int
	SourceStartupRandomAccessReady          bool
	SourceStartupRandomAccessCodec          string
	SourceStartupInventoryMethod            string
	SourceStartupVideoStreams               int
	SourceStartupAudioStreams               int
	SourceStartupVideoCodecs                string
	SourceStartupAudioCodecs                string
	SourceStartupComponentState             string
	SourceStartupRetryRelaxedProbe          bool
	SourceStartupRetryRelaxedProbeReason    string
	Resolution                              string
	FrameRate                               float64
	VideoCodec                              string
	AudioCodec                              string
	CurrentBitrateBPS                       int64
	ProfileBitrateBPS                       int64
	Producer                                string
	StartedAt                               time.Time
	LastByteAt                              time.Time
	LastPushAt                              time.Time
	BytesRead                               int64
	BytesPushed                             int64
	ChunksPushed                            int64
	Subscribers                             int
	SubscriberInfo                          []SubscriberStats
	SlowSkipEventsTotal                     uint64
	SlowSkipLagChunksTotal                  uint64
	SlowSkipLagBytesTotal                   uint64
	SlowSkipMaxLagChunks                    uint64
	SubscriberWriteDeadlineUnsupportedTotal uint64
	SubscriberWriteDeadlineTimeoutsTotal    uint64
	SubscriberWriteShortWritesTotal         uint64
	SubscriberWriteBlockedDurationUS        uint64
	SubscriberWriteBlockedDurationMS        uint64
	StallCount                              int64
	RecoveryCycle                           int64
	RecoveryReason                          string
	RecoveryTransitionMode                  string
	RecoveryTransitionEffectiveMode         string
	RecoveryTransitionSignalsApplied        string
	RecoveryTransitionSignalSkips           string
	RecoveryTransitionFallbackCount         int64
	RecoveryTransitionFallbackReason        string
	RecoveryTransitionStitchApplied         bool
	RecoveryKeepaliveMode                   string
	RecoveryKeepaliveFallbackCount          int64
	RecoveryKeepaliveFallbackReason         string
	RecoveryKeepaliveStartedAt              time.Time
	RecoveryKeepaliveStoppedAt              time.Time
	RecoveryKeepaliveDuration               string
	RecoveryKeepaliveBytes                  int64
	RecoveryKeepaliveChunks                 int64
	RecoveryKeepaliveRateBytesPerSecond     float64
	RecoveryKeepaliveExpectedRate           float64
	RecoveryKeepaliveRealtimeMultiplier     float64
	RecoveryKeepaliveGuardrailCount         int64
	RecoveryKeepaliveGuardrailReason        string
	SourceSelectCount                       int64
	SameSourceReselectCount                 int64
	LastSourceSelectedAt                    time.Time
	LastSourceSelectReason                  string
	SinceLastSourceSelect                   string
	LastError                               string
	SourceHealthPersistCoalescedTotal       int64
	SourceHealthPersistDroppedTotal         int64
	SourceHealthPersistCoalescedBySource    map[int64]int64
	SourceHealthPersistDroppedBySource      map[int64]int64
}

SharedSessionStats reports per-channel shared-session state for diagnostics.

type SharedSessionSubscriberHistory

type SharedSessionSubscriberHistory struct {
	SubscriberID uint64    `json:"subscriber_id"`
	ClientAddr   string    `json:"client_addr,omitempty"`
	ClientHost   string    `json:"client_host,omitempty"`
	ConnectedAt  time.Time `json:"connected_at"`
	ClosedAt     time.Time `json:"closed_at,omitempty"`
	CloseReason  string    `json:"close_reason,omitempty"`
}

SharedSessionSubscriberHistory reports one subscriber lifecycle window.

type SubscriberStats

type SubscriberStats struct {
	SubscriberID uint64
	ClientAddr   string
	StartedAt    time.Time
}

SubscriberStats reports one connected subscriber on a shared session.

type TunerStatus

type TunerStatus struct {
	TunerID                                 int               `json:"tuner_id"`
	PlaylistSourceID                        int64             `json:"playlist_source_id"`
	PlaylistSourceName                      string            `json:"playlist_source_name"`
	VirtualTunerSlot                        int               `json:"virtual_tuner_slot"`
	Kind                                    string            `json:"kind"`
	State                                   string            `json:"state"`
	GuideNumber                             string            `json:"guide_number,omitempty"`
	LeaseClientAddr                         string            `json:"lease_client_addr,omitempty"`
	LeaseStartedAt                          time.Time         `json:"lease_started_at"`
	ChannelID                               int64             `json:"channel_id,omitempty"`
	GuideName                               string            `json:"guide_name,omitempty"`
	SourceID                                int64             `json:"source_id,omitempty"`
	SourceItemKey                           string            `json:"source_item_key,omitempty"`
	SourceStreamURL                         string            `json:"source_stream_url,omitempty"`
	SourceStartupProbeRawBytes              int               `json:"source_startup_probe_raw_bytes,omitempty"`
	SourceStartupProbeTrimmedBytes          int               `json:"source_startup_probe_trimmed_bytes,omitempty"`
	SourceStartupProbeCutoverOffset         int               `json:"source_startup_probe_cutover_offset,omitempty"`
	SourceStartupProbeDroppedBytes          int               `json:"source_startup_probe_dropped_bytes,omitempty"`
	SourceStartupProbeBytes                 int               `json:"source_startup_probe_bytes,omitempty"`
	SourceStartupRandomAccessReady          bool              `json:"source_startup_random_access_ready,omitempty"`
	SourceStartupRandomAccessCodec          string            `json:"source_startup_random_access_codec,omitempty"`
	SourceStartupInventoryMethod            string            `json:"source_startup_inventory_method,omitempty"`
	SourceStartupVideoStreams               int               `json:"source_startup_video_streams,omitempty"`
	SourceStartupAudioStreams               int               `json:"source_startup_audio_streams,omitempty"`
	SourceStartupVideoCodecs                string            `json:"source_startup_video_codecs,omitempty"`
	SourceStartupAudioCodecs                string            `json:"source_startup_audio_codecs,omitempty"`
	SourceStartupComponentState             string            `json:"source_startup_component_state,omitempty"`
	SourceStartupRetryRelaxedProbe          bool              `json:"source_startup_retry_relaxed_probe,omitempty"`
	SourceStartupRetryRelaxedProbeReason    string            `json:"source_startup_retry_relaxed_probe_reason,omitempty"`
	Resolution                              string            `json:"resolution,omitempty"`
	FrameRate                               float64           `json:"frame_rate,omitempty"`
	VideoCodec                              string            `json:"video_codec,omitempty"`
	AudioCodec                              string            `json:"audio_codec,omitempty"`
	CurrentBitrateBPS                       int64             `json:"current_bitrate_bps,omitempty"`
	ProfileBitrateBPS                       int64             `json:"profile_bitrate_bps,omitempty"`
	Producer                                string            `json:"producer,omitempty"`
	SessionStartedAt                        time.Time         `json:"session_started_at"`
	LastByteAt                              time.Time         `json:"last_byte_at"`
	LastPushAt                              time.Time         `json:"last_push_at"`
	BytesRead                               int64             `json:"bytes_read"`
	BytesPushed                             int64             `json:"bytes_pushed"`
	ChunksPushed                            int64             `json:"chunks_pushed"`
	SlowSkipEventsTotal                     uint64            `json:"slow_skip_events_total"`
	SlowSkipLagChunksTotal                  uint64            `json:"slow_skip_lag_chunks_total"`
	SlowSkipLagBytesTotal                   uint64            `json:"slow_skip_lag_bytes_total"`
	SlowSkipMaxLagChunks                    uint64            `json:"slow_skip_max_lag_chunks"`
	SubscriberWriteDeadlineUnsupportedTotal uint64            `json:"subscriber_write_deadline_unsupported_total"`
	SubscriberWriteDeadlineTimeoutsTotal    uint64            `json:"subscriber_write_deadline_timeouts_total"`
	SubscriberWriteShortWritesTotal         uint64            `json:"subscriber_write_short_writes_total"`
	SubscriberWriteBlockedDurationUS        uint64            `json:"subscriber_write_blocked_duration_us"`
	SubscriberWriteBlockedDurationMS        uint64            `json:"subscriber_write_blocked_duration_ms"`
	StallCount                              int64             `json:"stall_count"`
	RecoveryCycle                           int64             `json:"recovery_cycle"`
	RecoveryReason                          string            `json:"recovery_reason,omitempty"`
	RecoveryTransitionMode                  string            `json:"recovery_transition_mode,omitempty"`
	RecoveryTransitionEffectiveMode         string            `json:"recovery_transition_effective_mode,omitempty"`
	RecoveryTransitionSignalsApplied        string            `json:"recovery_transition_signals_applied,omitempty"`
	RecoveryTransitionSignalSkips           string            `json:"recovery_transition_signal_skips,omitempty"`
	RecoveryTransitionFallbackCount         int64             `json:"recovery_transition_fallback_count,omitempty"`
	RecoveryTransitionFallbackReason        string            `json:"recovery_transition_fallback_reason,omitempty"`
	RecoveryTransitionStitchApplied         bool              `json:"recovery_transition_stitch_applied,omitempty"`
	RecoveryKeepaliveMode                   string            `json:"recovery_keepalive_mode,omitempty"`
	RecoveryKeepaliveFallbackCount          int64             `json:"recovery_keepalive_fallback_count,omitempty"`
	RecoveryKeepaliveFallbackReason         string            `json:"recovery_keepalive_fallback_reason,omitempty"`
	RecoveryKeepaliveStartedAt              time.Time         `json:"recovery_keepalive_started_at,omitempty"`
	RecoveryKeepaliveStoppedAt              time.Time         `json:"recovery_keepalive_stopped_at,omitempty"`
	RecoveryKeepaliveDuration               string            `json:"recovery_keepalive_duration,omitempty"`
	RecoveryKeepaliveBytes                  int64             `json:"recovery_keepalive_bytes,omitempty"`
	RecoveryKeepaliveChunks                 int64             `json:"recovery_keepalive_chunks,omitempty"`
	RecoveryKeepaliveRateBytesPerSecond     float64           `json:"recovery_keepalive_rate_bytes_per_second,omitempty"`
	RecoveryKeepaliveExpectedRate           float64           `json:"recovery_keepalive_expected_rate_bytes_per_second,omitempty"`
	RecoveryKeepaliveRealtimeMultiplier     float64           `json:"recovery_keepalive_realtime_multiplier,omitempty"`
	RecoveryKeepaliveGuardrailCount         int64             `json:"recovery_keepalive_guardrail_count,omitempty"`
	RecoveryKeepaliveGuardrailReason        string            `json:"recovery_keepalive_guardrail_reason,omitempty"`
	SourceSelectCount                       int64             `json:"source_select_count"`
	SameSourceReselectCount                 int64             `json:"same_source_reselect_count"`
	LastSourceSelectedAt                    time.Time         `json:"last_source_selected_at"`
	LastSourceSelectReason                  string            `json:"last_source_select_reason,omitempty"`
	SinceLastSourceSelect                   string            `json:"since_last_source_select,omitempty"`
	LastError                               string            `json:"last_error,omitempty"`
	SourceHealthPersistCoalescedTotal       int64             `json:"source_health_persist_coalesced_total,omitempty"`
	SourceHealthPersistDroppedTotal         int64             `json:"source_health_persist_dropped_total,omitempty"`
	SourceHealthPersistCoalescedBySource    map[int64]int64   `json:"source_health_persist_coalesced_by_source,omitempty"`
	SourceHealthPersistDroppedBySource      map[int64]int64   `json:"source_health_persist_dropped_by_source,omitempty"`
	Subscribers                             []SubscriberStats `json:"subscribers"`
}

TunerStatus describes one active tuner lease and linked shared-session state.

type TunerStatusSnapshot

type TunerStatusSnapshot struct {
	GeneratedAt                  time.Time              `json:"generated_at"`
	TunerCount                   int                    `json:"tuner_count"`
	InUseCount                   int                    `json:"in_use_count"`
	IdleCount                    int                    `json:"idle_count"`
	VirtualTuners                []VirtualTunerStatus   `json:"virtual_tuners"`
	Churn                        ChurnSummary           `json:"churn"`
	DrainWait                    DrainWaitTelemetry     `json:"drain_wait"`
	ProbeClose                   ProbeCloseTelemetry    `json:"probe_close"`
	Tuners                       []TunerStatus          `json:"tuners"`
	ClientStreams                []ClientStreamStatus   `json:"client_streams"`
	SessionHistory               []SharedSessionHistory `json:"session_history,omitempty"`
	SessionHistoryLimit          int                    `json:"session_history_limit,omitempty"`
	SessionHistoryTruncatedCount int64                  `json:"session_history_truncated_count,omitempty"`
}

TunerStatusSnapshot reports current tuner/session runtime state for admin diagnostics.

type VirtualTunerManager added in v1.1.0

type VirtualTunerManager struct {
	// contains filtered or unexported fields
}

VirtualTunerManager routes tuner leases to per-source pools while presenting a single aggregate tuner surface to existing callers.

func NewVirtualTunerManager added in v1.1.0

func NewVirtualTunerManager(sources []VirtualTunerSource) *VirtualTunerManager

func (*VirtualTunerManager) Acquire added in v1.1.0

func (m *VirtualTunerManager) Acquire(ctx context.Context, guideNumber, clientAddr string) (*Lease, error)

func (*VirtualTunerManager) AcquireClient added in v1.1.0

func (m *VirtualTunerManager) AcquireClient(ctx context.Context, guideNumber, clientAddr string) (*Lease, error)

func (*VirtualTunerManager) AcquireClientForSource added in v1.1.0

func (m *VirtualTunerManager) AcquireClientForSource(
	ctx context.Context,
	sourceID int64,
	guideNumber,
	clientAddr string,
) (*Lease, error)

func (*VirtualTunerManager) AcquireProbe added in v1.1.0

func (m *VirtualTunerManager) AcquireProbe(
	ctx context.Context,
	label string,
	cancel context.CancelCauseFunc,
) (*Lease, error)

func (*VirtualTunerManager) AcquireProbeForSource added in v1.1.0

func (m *VirtualTunerManager) AcquireProbeForSource(
	ctx context.Context,
	sourceID int64,
	label string,
	cancel context.CancelCauseFunc,
) (*Lease, error)

func (*VirtualTunerManager) Capacity added in v1.1.0

func (m *VirtualTunerManager) Capacity() int

func (*VirtualTunerManager) CapacityForSource added in v1.1.0

func (m *VirtualTunerManager) CapacityForSource(sourceID int64) int

func (*VirtualTunerManager) Close added in v1.1.0

func (m *VirtualTunerManager) Close()

func (*VirtualTunerManager) InUseCount added in v1.1.0

func (m *VirtualTunerManager) InUseCount() int

func (*VirtualTunerManager) InUseCountForSource added in v1.1.0

func (m *VirtualTunerManager) InUseCountForSource(sourceID int64) int

func (*VirtualTunerManager) Reconfigure added in v1.1.0

func (m *VirtualTunerManager) Reconfigure(sources []VirtualTunerSource)

Reconfigure updates source-level virtual pools at runtime so subsequent tuner acquisitions reflect the latest playlist source configuration.

func (*VirtualTunerManager) SetPreemptSettleDelay added in v1.1.0

func (m *VirtualTunerManager) SetPreemptSettleDelay(delay time.Duration)

func (*VirtualTunerManager) Snapshot added in v1.1.0

func (m *VirtualTunerManager) Snapshot() []Session

func (*VirtualTunerManager) VirtualTunerSnapshot added in v1.1.0

func (m *VirtualTunerManager) VirtualTunerSnapshot() []VirtualTunerPoolSnapshot

type VirtualTunerPoolSnapshot added in v1.1.0

type VirtualTunerPoolSnapshot struct {
	PlaylistSourceID    int64
	PlaylistSourceName  string
	PlaylistSourceOrder int
	TunerCount          int
	InUseCount          int
	IdleCount           int
}

VirtualTunerPoolSnapshot reports one source-level virtual tuner pool.

type VirtualTunerSource added in v1.1.0

type VirtualTunerSource struct {
	SourceID   int64
	Name       string
	TunerCount int
	Enabled    bool
	OrderIndex int
}

VirtualTunerSource describes one configured source-level tuner pool.

type VirtualTunerStatus added in v1.1.0

type VirtualTunerStatus struct {
	PlaylistSourceID    int64  `json:"playlist_source_id"`
	PlaylistSourceName  string `json:"playlist_source_name"`
	PlaylistSourceOrder int    `json:"playlist_source_order"`
	TunerCount          int    `json:"tuner_count"`
	InUseCount          int    `json:"in_use_count"`
	IdleCount           int    `json:"idle_count"`
	ActiveSessionCount  int    `json:"active_session_count"`
}

VirtualTunerStatus reports aggregate tuner usage for one playlist source pool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL