stream

package
v1.0.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrSessionMaxSubscribers         = errors.New("shared session max subscribers reached")
	ErrSessionNoSources              = errors.New("channel has no enabled sources")
	ErrSessionNotFound               = errors.New("shared session not found")
	ErrSessionRecoveryAlreadyPending = errors.New(
		"shared session manual recovery is already pending",
	)
	ErrSlowClientLagged = errors.New("subscriber fell behind stream tail")
)
View Source
var ErrNoTunersAvailable = errors.New("no tuners available")
View Source
var ErrProbePreempted = errors.New("probe preempted by client stream")
View Source
var ErrRingClosed = errors.New("chunk ring is closed")

Functions

func SanitizeURLForLog

func SanitizeURLForLog(raw string) string

SanitizeURLForLog returns a form of raw that is suitable for log output: it keeps the URL's routing shape while removing credentials and token-bearing components.

Types

type BackgroundProber

type BackgroundProber struct {
	// contains filtered or unexported fields
}

BackgroundProber periodically checks source startup readiness to pre-populate health state.

func NewBackgroundProber

func NewBackgroundProber(cfg ProberConfig, provider ProbeChannelsProvider) *BackgroundProber

func (*BackgroundProber) Close

func (p *BackgroundProber) Close()

Close stops the background session-close worker.

func (*BackgroundProber) Enabled

func (p *BackgroundProber) Enabled() bool

func (*BackgroundProber) ProbeOnce

func (p *BackgroundProber) ProbeOnce(ctx context.Context) error

func (*BackgroundProber) Run

func (p *BackgroundProber) Run(ctx context.Context)

Run executes periodic probe ticks until ctx is canceled. Caller-owned lifecycle: Run does not call Close; the caller must invoke Close() during shutdown to drain queued session closes.

type ChannelsProvider

type ChannelsProvider interface {
	GetByGuideNumber(ctx context.Context, guideNumber string) (channels.Channel, error)
	ListSources(ctx context.Context, channelID int64, enabledOnly bool) ([]channels.Source, error)
	MarkSourceFailure(ctx context.Context, sourceID int64, reason string, failedAt time.Time) error
	MarkSourceSuccess(ctx context.Context, sourceID int64, succeededAt time.Time) error
	UpdateSourceProfile(ctx context.Context, sourceID int64, profile channels.SourceProfileUpdate) error
}

ChannelsProvider supplies channel and source operations required by stream tuning.

type ChunkPublisher

type ChunkPublisher interface {
	PublishChunk(data []byte, publishedAt time.Time) error
}

ChunkPublisher receives chunks emitted by the stream pump.

type ChunkPublisherCopyAware

type ChunkPublisherCopyAware interface {
	ChunkPublisher
	CopiesPublishedChunkData() bool
}

ChunkPublisherCopyAware advertises whether PublishChunk always deep-copies input bytes before returning.

type ChunkRing

type ChunkRing struct {
	// contains filtered or unexported fields
}

ChunkRing stores recently published chunks in an in-memory ring.

func NewChunkRing

func NewChunkRing(maxChunks int) *ChunkRing

NewChunkRing creates a bounded in-memory chunk ring.

func NewChunkRingWithLimits

func NewChunkRingWithLimits(maxChunks, maxBytes int) *ChunkRing

NewChunkRingWithLimits creates a bounded in-memory chunk ring with optional chunk-count and byte-budget eviction limits.

func NewChunkRingWithLimitsAndStartupHint

func NewChunkRingWithLimitsAndStartupHint(maxChunks, maxBytes, startupChunkHint int) *ChunkRing

NewChunkRingWithLimitsAndStartupHint creates a bounded in-memory chunk ring with optional chunk-count/byte-budget limits plus a startup payload hint used to prewarm a bounded number of slot buffers.

func (*ChunkRing) Close

func (r *ChunkRing) Close(err error)

Close stops the ring and wakes readers waiting for new chunks.

func (*ChunkRing) CopiesPublishedChunkData

func (r *ChunkRing) CopiesPublishedChunkData() bool

CopiesPublishedChunkData reports that PublishChunk deep-copies chunk bytes.

func (*ChunkRing) PublishChunk

func (r *ChunkRing) PublishChunk(data []byte, publishedAt time.Time) error

PublishChunk appends one chunk and evicts the oldest chunk when full.

func (*ChunkRing) ReadFrom

func (r *ChunkRing) ReadFrom(seq uint64) ReadResult

ReadFrom returns available chunks for the requested sequence. If no chunks are available yet, the WaitSeq field of the returned ReadResult can be passed to WaitForChange to block until the ring state changes.

func (*ChunkRing) Snapshot

func (r *ChunkRing) Snapshot() []RingChunk

Snapshot returns a deep copy of currently buffered chunks.

func (*ChunkRing) StartSeqByLagBytes

func (r *ChunkRing) StartSeqByLagBytes(lagBytes int) uint64

StartSeqByLagBytes returns the sequence to start reading from for a new subscriber. When lagBytes is zero or negative, subscribers start at the live tail.

func (*ChunkRing) WaitForChange

func (r *ChunkRing) WaitForChange(ctx context.Context, waitSeq uint64) error

WaitForChange blocks until the ring state changes from waitSeq, the ring closes, or ctx ends.

type ChurnSummary

type ChurnSummary struct {
	ReselectAlertThreshold        int64  `json:"reselect_alert_threshold"`
	SessionCount                  int    `json:"session_count"`
	RecoveringSessionCount        int    `json:"recovering_session_count"`
	SessionsWithReselectCount     int    `json:"sessions_with_reselect_count"`
	SessionsOverReselectThreshold int    `json:"sessions_over_reselect_threshold"`
	TotalRecoveryCycles           int64  `json:"total_recovery_cycles"`
	TotalSourceSelectCount        int64  `json:"total_source_select_count"`
	TotalSameSourceReselectCount  int64  `json:"total_same_source_reselect_count"`
	MaxSameSourceReselectCount    int64  `json:"max_same_source_reselect_count"`
	MaxReselectChannelID          int64  `json:"max_reselect_channel_id,omitempty"`
	MaxReselectGuideNumber        string `json:"max_reselect_guide_number,omitempty"`
}

ChurnSummary captures high-level recovery/reselection churn counters across active shared sessions.

type ClientStreamStatus

type ClientStreamStatus struct {
	TunerID           int       `json:"tuner_id"`
	Kind              string    `json:"kind"`
	ChannelID         int64     `json:"channel_id,omitempty"`
	GuideNumber       string    `json:"guide_number,omitempty"`
	GuideName         string    `json:"guide_name,omitempty"`
	SourceID          int64     `json:"source_id,omitempty"`
	SourceItemKey     string    `json:"source_item_key,omitempty"`
	SourceStreamURL   string    `json:"source_stream_url,omitempty"`
	Resolution        string    `json:"resolution,omitempty"`
	FrameRate         float64   `json:"frame_rate,omitempty"`
	VideoCodec        string    `json:"video_codec,omitempty"`
	AudioCodec        string    `json:"audio_codec,omitempty"`
	CurrentBitrateBPS int64     `json:"current_bitrate_bps,omitempty"`
	ProfileBitrateBPS int64     `json:"profile_bitrate_bps,omitempty"`
	Producer          string    `json:"producer,omitempty"`
	SubscriberID      uint64    `json:"subscriber_id"`
	ClientAddr        string    `json:"client_addr,omitempty"`
	ConnectedAt       time.Time `json:"connected_at"`
}

ClientStreamStatus maps one connected client subscriber to its backing tuner session.

type Config

type Config struct {
	Mode                            string
	FFmpegPath                      string
	HTTPClient                      *http.Client
	Logger                          *slog.Logger
	StartupTimeout                  time.Duration
	StartupRandomAccessRecoveryOnly bool
	MinProbeBytes                   int
	MaxFailovers                    int
	FailoverTotalTimeout            time.Duration
	UpstreamOverlimitCooldown       time.Duration
	FFmpegReconnectEnabled          bool
	FFmpegReconnectDelayMax         time.Duration
	FFmpegReconnectMaxRetries       int
	FFmpegReconnectHTTPErrors       string
	FFmpegStartupProbeSize          int
	FFmpegStartupAnalyzeDelay       time.Duration
	FFmpegCopyRegenerateTimestamps  *bool

	ProducerReadRate     float64
	ProducerInitialBurst int

	BufferChunkBytes           int
	BufferPublishFlushInterval time.Duration
	BufferTSAlign188           bool

	StallDetect               time.Duration
	StallHardDeadline         time.Duration
	StallPolicy               string
	StallMaxFailoversPerStall int
	CycleFailureMinHealth     time.Duration

	RecoveryFillerEnabled     bool
	RecoveryFillerMode        string
	RecoveryFillerInterval    time.Duration
	RecoveryFillerText        string
	RecoveryFillerEnableAudio bool

	SubscriberJoinLagBytes     int
	SubscriberSlowClientPolicy string
	SubscriberMaxBlockedWrite  time.Duration

	SessionIdleTimeout    time.Duration
	SessionDrainTimeout   time.Duration
	SessionMaxSubscribers int
	SessionHistoryLimit   int
	// SessionSourceHistoryLimit and SessionSubscriberHistoryLimit are optional
	// per-session timeline overrides. When non-positive, SessionManager falls
	// back to SessionHistoryLimit/defaults and applies min/max clamp guardrails.
	SessionSourceHistoryLimit     int
	SessionSubscriberHistoryLimit int
	// SourceHealthDrainTimeout bounds post-cancel source-health queue draining
	// during session teardown. Non-positive values normalize to defaults in
	// SessionManager config.
	SourceHealthDrainTimeout time.Duration

	TuneBackoffMaxTunes int
	TuneBackoffInterval time.Duration
	TuneBackoffCooldown time.Duration
}

Config controls stream delivery behavior.

type DrainWaitTelemetry

type DrainWaitTelemetry struct {
	OK             uint64 `json:"ok"`
	Error          uint64 `json:"error"`
	WaitDurationUS uint64 `json:"wait_duration_us"`
	WaitDurationMS uint64 `json:"wait_duration_ms"`
}

DrainWaitTelemetry captures process-lifetime WaitForDrain result counters.

type FFmpegProducer

type FFmpegProducer struct {
	// contains filtered or unexported fields
}

FFmpegProducer starts ffmpeg and exposes its stdout as MPEG-TS bytes.

func NewFFmpegProducer

func NewFFmpegProducer(cfg FFmpegProducerConfig) (*FFmpegProducer, error)

NewFFmpegProducer validates config and creates a producer.

func (*FFmpegProducer) Describe

func (p *FFmpegProducer) Describe() string

Describe returns a log-friendly producer descriptor.

func (*FFmpegProducer) Start

func (p *FFmpegProducer) Start(ctx context.Context) (io.ReadCloser, error)

Start launches ffmpeg and returns a read closer for stdout bytes.

type FFmpegProducerConfig

type FFmpegProducerConfig struct {
	FFmpegPath          string
	Mode                string
	StreamURL           string
	ReadRate            float64
	InitialBurstSeconds int
	LogLevel            string
}

FFmpegProducerConfig controls ffmpeg-backed producer behavior.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler serves /auto/v{guideNumber} requests.

func NewHandler

func NewHandler(cfg Config, tuners *Pool, channelsProvider ChannelsProvider) *Handler

func (*Handler) ClearAllSourceHealth

func (h *Handler) ClearAllSourceHealth() error

ClearAllSourceHealth clears all source-health convergence state.

func (*Handler) ClearSourceHealth

func (h *Handler) ClearSourceHealth(channelID int64) error

ClearSourceHealth clears source-health convergence state for a channel.

func (*Handler) Close

func (h *Handler) Close()

Close cancels all active sessions so they drain before store close.

func (*Handler) CloseWithContext

func (h *Handler) CloseWithContext(ctx context.Context) error

CloseWithContext cancels all active sessions and waits for teardown within the provided context budget.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Handler) TriggerSessionRecovery

func (h *Handler) TriggerSessionRecovery(channelID int64, reason string) error

TriggerSessionRecovery requests an in-session recovery cycle for an active shared session on the provided channel. Recovery routing is resolved from in-memory shared-session state and does not require channel/source store lookups.

func (*Handler) TunerStatusSnapshot

func (h *Handler) TunerStatusSnapshot() TunerStatusSnapshot

TunerStatusSnapshot returns a structured view of active tuner leases and subscribers.

type Lease

type Lease struct {
	ID int
	// contains filtered or unexported fields
}

Lease represents an acquired tuner slot.

func (*Lease) Release

func (l *Lease) Release()

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool tracks active tuner sessions and enforces a max concurrent stream count.

func NewPool

func NewPool(count int) *Pool

func (*Pool) Acquire

func (p *Pool) Acquire(ctx context.Context, guideNumber, clientAddr string) (*Lease, error)

func (*Pool) AcquireClient

func (p *Pool) AcquireClient(ctx context.Context, guideNumber, clientAddr string) (*Lease, error)

func (*Pool) AcquireProbe

func (p *Pool) AcquireProbe(ctx context.Context, label string, cancel context.CancelCauseFunc) (*Lease, error)

func (*Pool) Capacity

func (p *Pool) Capacity() int

func (*Pool) InUseCount

func (p *Pool) InUseCount() int

func (*Pool) SetPreemptSettleDelay

func (p *Pool) SetPreemptSettleDelay(delay time.Duration)

SetPreemptSettleDelay configures a delay after preemption before reusing the reclaimed tuner slot for a new lease.

func (*Pool) Snapshot

func (p *Pool) Snapshot() []Session

type ProbeChannelsProvider

type ProbeChannelsProvider interface {
	ListEnabled(ctx context.Context) ([]channels.Channel, error)
	ListSources(ctx context.Context, channelID int64, enabledOnly bool) ([]channels.Source, error)
	MarkSourceFailure(ctx context.Context, sourceID int64, reason string, failedAt time.Time) error
	MarkSourceSuccess(ctx context.Context, sourceID int64, succeededAt time.Time) error
}

ProbeChannelsProvider supplies channel/source operations required by the background prober.

type ProbeCloseTelemetry

type ProbeCloseTelemetry struct {
	InlineCount    uint64 `json:"inline_count"`
	QueueFullCount uint64 `json:"queue_full_count"`
}

ProbeCloseTelemetry captures process-lifetime background-prober close fallback counters.

type ProbeTunerUsage

type ProbeTunerUsage interface {
	AcquireProbe(ctx context.Context, label string, cancel context.CancelCauseFunc) (*Lease, error)
}

ProbeTunerUsage exposes probe-slot acquisition for background probing.

type ProberConfig

type ProberConfig struct {
	Mode                           string
	FFmpegPath                     string
	HTTPClient                     *http.Client
	Logger                         *slog.Logger
	ProducerReadRate               float64
	ProducerInitialBurst           int
	FFmpegReconnectEnabled         bool
	FFmpegReconnectDelayMax        time.Duration
	FFmpegReconnectMaxRetries      int
	FFmpegReconnectHTTPErrors      string
	FFmpegStartupProbeSize         int
	FFmpegStartupAnalyzeDelay      time.Duration
	FFmpegCopyRegenerateTimestamps *bool
	MinProbeBytes                  int
	ProbeInterval                  time.Duration
	ProbeTimeout                   time.Duration
	ProbeCloseQueueDepth           int
	TunerUsage                     ProbeTunerUsage
	ProbeTuneDelay                 time.Duration
}

ProberConfig controls optional background source probing.

type Producer

type Producer interface {
	Start(ctx context.Context) (io.ReadCloser, error)
	Describe() string
}

Producer starts an upstream MPEG-TS source for a shared channel session.

type Pump

type Pump struct {
	// contains filtered or unexported fields
}

Pump copies producer bytes into chunk publications using size-or-time flush.

func NewPump

func NewPump(cfg PumpConfig, publisher ChunkPublisher) *Pump

NewPump builds a new chunk pump for producer output.

func (*Pump) Run

func (p *Pump) Run(ctx context.Context, src io.ReadCloser) error

Run consumes producer output and publishes chunks until EOF or cancellation.

func (*Pump) Stats

func (p *Pump) Stats() PumpStats

Stats returns the latest telemetry snapshot.

type PumpConfig

type PumpConfig struct {
	ChunkBytes           int
	ReadBufferBytes      int
	PublishFlushInterval time.Duration
	TSAlign188           bool
}

PumpConfig controls chunking and flush behavior for producer output.

type PumpStats

type PumpStats struct {
	LastByteReadAt  time.Time
	LastPublishAt   time.Time
	BytesRead       int64
	BytesPublished  int64
	ChunksPublished int64
}

PumpStats exposes byte-flow telemetry used by stall detection.

type ReadResult

type ReadResult struct {
	Chunks         []RingChunk
	NextSeq        uint64
	Behind         bool
	Closed         bool
	Err            error
	WaitSeq        uint64
	RequestedSeq   uint64
	OldestSeq      uint64
	RingNextSeq    uint64
	BufferedChunks int
	BufferedBytes  int
	// contains filtered or unexported fields
}

ReadResult is one read pass over ring data starting at a requested sequence.

func (*ReadResult) Release

func (r *ReadResult) Release()

Release signals that callers are done reading chunk payload slices returned by ReadFrom.

type RingChunk

type RingChunk struct {
	Seq         uint64
	PublishedAt time.Time
	Data        []byte
}

RingChunk is one published chunk in a channel session ring.

type Session

type Session struct {
	ID          int
	GuideNumber string
	ClientAddr  string
	Kind        string
	StartedAt   time.Time
	// contains filtered or unexported fields
}

Session describes an active tuner lease.

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager coordinates shared channel stream sessions.

func NewSessionManager

func NewSessionManager(cfg SessionManagerConfig, tuners *Pool, channelsProvider ChannelsProvider) *SessionManager

NewSessionManager builds a new shared session manager.

func (*SessionManager) ClearAllSourceHealth

func (m *SessionManager) ClearAllSourceHealth() error

func (*SessionManager) ClearSourceHealth

func (m *SessionManager) ClearSourceHealth(channelID int64) error

func (*SessionManager) Close

func (m *SessionManager) Close()

Close cancels all active sessions and blocks until their goroutines (including source-health persistence drain) complete. This must be called before store.Close() to guarantee no session-scoped DB work races with database teardown.

func (*SessionManager) CloseWithContext

func (m *SessionManager) CloseWithContext(ctx context.Context) error

CloseWithContext cancels all active sessions and waits until their goroutines complete or the provided context expires.

func (*SessionManager) HasActiveOrPendingSession

func (m *SessionManager) HasActiveOrPendingSession(channelID int64) bool

HasActiveOrPendingSession reports whether a channel currently has an admissible shared session, or a shared session in creation.

func (*SessionManager) HistorySnapshot

func (m *SessionManager) HistorySnapshot() ([]SharedSessionHistory, int, int64)

HistorySnapshot returns bounded shared-session history for active and recently closed sessions. Results are newest-first.

func (*SessionManager) Snapshot

func (m *SessionManager) Snapshot() []SharedSessionStats

Snapshot returns all active shared sessions with per-session diagnostics.

func (*SessionManager) Subscribe

func (m *SessionManager) Subscribe(ctx context.Context, channel channels.Channel) (*SessionSubscription, error)

Subscribe returns a client subscription for the given channel.

func (*SessionManager) SubscribeImmediate

func (m *SessionManager) SubscribeImmediate(ctx context.Context, channel channels.Channel) (*SessionSubscription, error)

SubscribeImmediate returns a subscription without waiting for startup readiness. This is useful for clients that expect a prompt HTTP response while startup proceeds.

func (*SessionManager) TriggerRecovery

func (m *SessionManager) TriggerRecovery(channelID int64, reason string) error

TriggerRecovery requests an in-session recovery cycle for an active channel session without waiting for a real upstream stall.

func (*SessionManager) WaitForDrain

func (m *SessionManager) WaitForDrain(ctx context.Context) error

WaitForDrain blocks until the manager has no active or pending channel sessions.

type SessionManagerConfig

type SessionManagerConfig struct {
	Mode                            string
	FFmpegPath                      string
	HTTPClient                      *http.Client
	Logger                          *slog.Logger
	StartupTimeout                  time.Duration
	StartupRandomAccessRecoveryOnly bool
	MinProbeBytes                   int
	MaxFailovers                    int
	FailoverTotalTimeout            time.Duration
	UpstreamOverlimitCooldown       time.Duration
	FFmpegReconnectEnabled          bool
	FFmpegReconnectDelayMax         time.Duration
	FFmpegReconnectMaxRetries       int
	FFmpegReconnectHTTPErrors       string
	FFmpegStartupProbeSize          int
	FFmpegStartupAnalyzeDelay       time.Duration
	FFmpegCopyRegenerateTimestamps  *bool
	ProducerReadRate                float64
	ProducerInitialBurst            int

	BufferChunkBytes           int
	BufferPublishFlushInterval time.Duration
	BufferTSAlign188           bool

	StallDetect               time.Duration
	StallHardDeadline         time.Duration
	StallPolicy               string
	StallMaxFailoversPerStall int
	CycleFailureMinHealth     time.Duration

	RecoveryFillerEnabled     bool
	RecoveryFillerMode        string
	RecoveryFillerInterval    time.Duration
	RecoveryFillerText        string
	RecoveryFillerEnableAudio bool

	SubscriberJoinLagBytes        int
	SubscriberSlowClientPolicy    string
	SubscriberMaxBlockedWrite     time.Duration
	SessionIdleTimeout            time.Duration
	SessionMaxSubscribers         int
	SessionHistoryLimit           int
	SessionSourceHistoryLimit     int
	SessionSubscriberHistoryLimit int
	SessionDrainTimeout           time.Duration
	SourceHealthDrainTimeout      time.Duration
}

SessionManagerConfig controls shared per-channel stream sessions.

type SessionSubscription

type SessionSubscription struct {
	// contains filtered or unexported fields
}

SessionSubscription represents one active client subscription to a shared channel session.

func (*SessionSubscription) Close

func (s *SessionSubscription) Close()

Close releases one subscriber slot from the shared session.

func (*SessionSubscription) Stats

Stats returns a snapshot of the current shared session state.

func (*SessionSubscription) Stream

Stream writes shared stream chunks to the provided response writer until completion or cancellation.

type SharedSessionHistory

type SharedSessionHistory struct {
	SessionID                            uint64                           `json:"session_id"`
	ChannelID                            int64                            `json:"channel_id,omitempty"`
	GuideNumber                          string                           `json:"guide_number,omitempty"`
	GuideName                            string                           `json:"guide_name,omitempty"`
	TunerID                              int                              `json:"tuner_id"`
	OpenedAt                             time.Time                        `json:"opened_at"`
	ClosedAt                             time.Time                        `json:"closed_at,omitempty"`
	Active                               bool                             `json:"active"`
	TerminalStatus                       string                           `json:"terminal_status,omitempty"`
	TerminalError                        string                           `json:"terminal_error,omitempty"`
	PeakSubscribers                      int                              `json:"peak_subscribers"`
	TotalSubscribers                     int64                            `json:"total_subscribers"`
	CompletedSubscribers                 int64                            `json:"completed_subscribers"`
	SlowSkipEventsTotal                  uint64                           `json:"slow_skip_events_total"`
	SlowSkipLagChunksTotal               uint64                           `json:"slow_skip_lag_chunks_total"`
	SlowSkipLagBytesTotal                uint64                           `json:"slow_skip_lag_bytes_total"`
	SlowSkipMaxLagChunks                 uint64                           `json:"slow_skip_max_lag_chunks"`
	SubscriberWriteDeadlineTimeoutsTotal uint64                           `json:"subscriber_write_deadline_timeouts_total"`
	SubscriberWriteShortWritesTotal      uint64                           `json:"subscriber_write_short_writes_total"`
	SubscriberWriteBlockedDurationUS     uint64                           `json:"subscriber_write_blocked_duration_us"`
	SubscriberWriteBlockedDurationMS     uint64                           `json:"subscriber_write_blocked_duration_ms"`
	RecoveryCycleCount                   int64                            `json:"recovery_cycle_count"`
	LastRecoveryReason                   string                           `json:"last_recovery_reason,omitempty"`
	SourceSelectCount                    int64                            `json:"source_select_count"`
	SameSourceReselectCount              int64                            `json:"same_source_reselect_count"`
	LastSourceSelectedAt                 time.Time                        `json:"last_source_selected_at,omitempty"`
	LastSourceSelectReason               string                           `json:"last_source_select_reason,omitempty"`
	LastError                            string                           `json:"last_error,omitempty"`
	Sources                              []SharedSessionSourceHistory     `json:"sources,omitempty"`
	Subscribers                          []SharedSessionSubscriberHistory `json:"subscribers,omitempty"`
	SourceHistoryLimit                   int                              `json:"source_history_limit,omitempty"`
	SourceHistoryTruncated               int64                            `json:"source_history_truncated_count,omitempty"`
	SubscriberHistoryLimit               int                              `json:"subscriber_history_limit,omitempty"`
	SubscriberHistoryTruncated           int64                            `json:"subscriber_history_truncated_count,omitempty"`
}

SharedSessionHistory reports bounded historical shared-session lifecycle windows for both active and recently closed sessions.

type SharedSessionSourceHistory

type SharedSessionSourceHistory struct {
	SourceID                       int64     `json:"source_id,omitempty"`
	ItemKey                        string    `json:"item_key,omitempty"`
	StreamURL                      string    `json:"stream_url,omitempty"`
	SelectedAt                     time.Time `json:"selected_at"`
	DeselectedAt                   time.Time `json:"deselected_at,omitempty"`
	SelectionReason                string    `json:"selection_reason,omitempty"`
	StartupProbeRawBytes           int       `json:"startup_probe_raw_bytes,omitempty"`
	StartupProbeTrimmedBytes       int       `json:"startup_probe_trimmed_bytes,omitempty"`
	StartupProbeCutoverOffset      int       `json:"startup_probe_cutover_offset,omitempty"`
	StartupProbeDroppedBytes       int       `json:"startup_probe_dropped_bytes,omitempty"`
	StartupRandomAccessReady       bool      `json:"startup_random_access_ready,omitempty"`
	StartupRandomAccessCodec       string    `json:"startup_random_access_codec,omitempty"`
	StartupInventoryMethod         string    `json:"startup_inventory_method,omitempty"`
	StartupVideoStreams            int       `json:"startup_video_streams,omitempty"`
	StartupAudioStreams            int       `json:"startup_audio_streams,omitempty"`
	StartupVideoCodecs             string    `json:"startup_video_codecs,omitempty"`
	StartupAudioCodecs             string    `json:"startup_audio_codecs,omitempty"`
	StartupComponentState          string    `json:"startup_component_state,omitempty"`
	StartupRetryRelaxedProbe       bool      `json:"startup_retry_relaxed_probe,omitempty"`
	StartupRetryRelaxedProbeReason string    `json:"startup_retry_relaxed_probe_reason,omitempty"`
	Resolution                     string    `json:"resolution,omitempty"`
	FrameRate                      float64   `json:"frame_rate,omitempty"`
	VideoCodec                     string    `json:"video_codec,omitempty"`
	AudioCodec                     string    `json:"audio_codec,omitempty"`
	ProfileBitrateBPS              int64     `json:"profile_bitrate_bps,omitempty"`
	Producer                       string    `json:"producer,omitempty"`
}

SharedSessionSourceHistory reports one source-selection window for a shared session.

type SharedSessionStats

type SharedSessionStats struct {
	TunerID                              int
	ChannelID                            int64
	GuideNumber                          string
	GuideName                            string
	SourceID                             int64
	SourceItemKey                        string
	SourceStreamURL                      string
	SourceStartupProbeRawBytes           int
	SourceStartupProbeTrimmedBytes       int
	SourceStartupProbeCutoverOffset      int
	SourceStartupProbeDroppedBytes       int
	SourceStartupProbeBytes              int
	SourceStartupRandomAccessReady       bool
	SourceStartupRandomAccessCodec       string
	SourceStartupInventoryMethod         string
	SourceStartupVideoStreams            int
	SourceStartupAudioStreams            int
	SourceStartupVideoCodecs             string
	SourceStartupAudioCodecs             string
	SourceStartupComponentState          string
	SourceStartupRetryRelaxedProbe       bool
	SourceStartupRetryRelaxedProbeReason string
	Resolution                           string
	FrameRate                            float64
	VideoCodec                           string
	AudioCodec                           string
	CurrentBitrateBPS                    int64
	ProfileBitrateBPS                    int64
	Producer                             string
	StartedAt                            time.Time
	LastByteAt                           time.Time
	LastPushAt                           time.Time
	BytesRead                            int64
	BytesPushed                          int64
	ChunksPushed                         int64
	Subscribers                          int
	SubscriberInfo                       []SubscriberStats
	SlowSkipEventsTotal                  uint64
	SlowSkipLagChunksTotal               uint64
	SlowSkipLagBytesTotal                uint64
	SlowSkipMaxLagChunks                 uint64
	SubscriberWriteDeadlineTimeoutsTotal uint64
	SubscriberWriteShortWritesTotal      uint64
	SubscriberWriteBlockedDurationUS     uint64
	SubscriberWriteBlockedDurationMS     uint64
	StallCount                           int64
	RecoveryCycle                        int64
	RecoveryReason                       string
	RecoveryTransitionMode               string
	RecoveryTransitionEffectiveMode      string
	RecoveryTransitionSignalsApplied     string
	RecoveryTransitionSignalSkips        string
	RecoveryTransitionFallbackCount      int64
	RecoveryTransitionFallbackReason     string
	RecoveryTransitionStitchApplied      bool
	RecoveryKeepaliveMode                string
	RecoveryKeepaliveFallbackCount       int64
	RecoveryKeepaliveFallbackReason      string
	RecoveryKeepaliveStartedAt           time.Time
	RecoveryKeepaliveStoppedAt           time.Time
	RecoveryKeepaliveDuration            string
	RecoveryKeepaliveBytes               int64
	RecoveryKeepaliveChunks              int64
	RecoveryKeepaliveRateBytesPerSecond  float64
	RecoveryKeepaliveExpectedRate        float64
	RecoveryKeepaliveRealtimeMultiplier  float64
	RecoveryKeepaliveGuardrailCount      int64
	RecoveryKeepaliveGuardrailReason     string
	SourceSelectCount                    int64
	SameSourceReselectCount              int64
	LastSourceSelectedAt                 time.Time
	LastSourceSelectReason               string
	SinceLastSourceSelect                string
	LastError                            string
	SourceHealthPersistCoalescedTotal    int64
	SourceHealthPersistDroppedTotal      int64
	SourceHealthPersistCoalescedBySource map[int64]int64
	SourceHealthPersistDroppedBySource   map[int64]int64
}

SharedSessionStats reports per-channel shared-session state for diagnostics.

type SharedSessionSubscriberHistory

// SharedSessionSubscriberHistory reports one subscriber lifecycle window.
//
// JSON encoding notes:
//   - SubscriberID and ConnectedAt are always emitted.
//   - ClientAddr and CloseReason are omitted when they are the empty string.
//   - ClosedAt is tagged omitzero rather than omitempty. encoding/json never
//     considers a struct value such as time.Time "empty", so omitempty had no
//     effect and a zero timestamp was always serialized. omitzero (Go 1.24+)
//     consults time.Time.IsZero and drops the key when ClosedAt is unset;
//     older toolchains ignore the unknown option and keep the prior output.
type SharedSessionSubscriberHistory struct {
	SubscriberID uint64    `json:"subscriber_id"`
	ClientAddr   string    `json:"client_addr,omitempty"`
	ConnectedAt  time.Time `json:"connected_at"`
	ClosedAt     time.Time `json:"closed_at,omitzero"`
	CloseReason  string    `json:"close_reason,omitempty"`
}

SharedSessionSubscriberHistory reports one subscriber lifecycle window.

type SubscriberStats

// SubscriberStats reports one connected subscriber on a shared session.
//
// NOTE(review): the fields carry no json tags, while this type is embedded in
// TunerStatus under the snake_case key "subscribers". Unless a MarshalJSON
// method defined elsewhere overrides it, encoding/json would emit these fields
// under their Go names (SubscriberID, ClientAddr, StartedAt) — confirm whether
// that mixed casing in the payload is intended.
type SubscriberStats struct {
	SubscriberID uint64
	ClientAddr   string
	StartedAt    time.Time
}

SubscriberStats reports one connected subscriber on a shared session.

type TunerStatus

// TunerStatus describes one active tuner lease and linked shared-session state.
//
// JSON encoding notes:
//   - Fields tagged omitempty are dropped when they hold their empty value
//     (0, "", false, nil or empty map); untagged counters and timestamps are
//     always emitted.
//   - RecoveryKeepaliveStartedAt and RecoveryKeepaliveStoppedAt are tagged
//     omitzero rather than omitempty. encoding/json never considers a struct
//     value such as time.Time "empty", so omitempty had no effect and a zero
//     timestamp was always serialized. omitzero (Go 1.24+) consults
//     time.Time.IsZero and drops the key when the timestamp is unset; older
//     toolchains ignore the unknown option and keep the prior output.
//   - Subscribers has no omitempty, so a nil slice encodes as null.
type TunerStatus struct {
	TunerID                              int               `json:"tuner_id"`
	Kind                                 string            `json:"kind"`
	State                                string            `json:"state"`
	GuideNumber                          string            `json:"guide_number,omitempty"`
	LeaseClientAddr                      string            `json:"lease_client_addr,omitempty"`
	LeaseStartedAt                       time.Time         `json:"lease_started_at"`
	ChannelID                            int64             `json:"channel_id,omitempty"`
	GuideName                            string            `json:"guide_name,omitempty"`
	SourceID                             int64             `json:"source_id,omitempty"`
	SourceItemKey                        string            `json:"source_item_key,omitempty"`
	SourceStreamURL                      string            `json:"source_stream_url,omitempty"`
	SourceStartupProbeRawBytes           int               `json:"source_startup_probe_raw_bytes,omitempty"`
	SourceStartupProbeTrimmedBytes       int               `json:"source_startup_probe_trimmed_bytes,omitempty"`
	SourceStartupProbeCutoverOffset      int               `json:"source_startup_probe_cutover_offset,omitempty"`
	SourceStartupProbeDroppedBytes       int               `json:"source_startup_probe_dropped_bytes,omitempty"`
	SourceStartupProbeBytes              int               `json:"source_startup_probe_bytes,omitempty"`
	SourceStartupRandomAccessReady       bool              `json:"source_startup_random_access_ready,omitempty"`
	SourceStartupRandomAccessCodec       string            `json:"source_startup_random_access_codec,omitempty"`
	SourceStartupInventoryMethod         string            `json:"source_startup_inventory_method,omitempty"`
	SourceStartupVideoStreams            int               `json:"source_startup_video_streams,omitempty"`
	SourceStartupAudioStreams            int               `json:"source_startup_audio_streams,omitempty"`
	SourceStartupVideoCodecs             string            `json:"source_startup_video_codecs,omitempty"`
	SourceStartupAudioCodecs             string            `json:"source_startup_audio_codecs,omitempty"`
	SourceStartupComponentState          string            `json:"source_startup_component_state,omitempty"`
	SourceStartupRetryRelaxedProbe       bool              `json:"source_startup_retry_relaxed_probe,omitempty"`
	SourceStartupRetryRelaxedProbeReason string            `json:"source_startup_retry_relaxed_probe_reason,omitempty"`
	Resolution                           string            `json:"resolution,omitempty"`
	FrameRate                            float64           `json:"frame_rate,omitempty"`
	VideoCodec                           string            `json:"video_codec,omitempty"`
	AudioCodec                           string            `json:"audio_codec,omitempty"`
	CurrentBitrateBPS                    int64             `json:"current_bitrate_bps,omitempty"`
	ProfileBitrateBPS                    int64             `json:"profile_bitrate_bps,omitempty"`
	Producer                             string            `json:"producer,omitempty"`
	SessionStartedAt                     time.Time         `json:"session_started_at"`
	LastByteAt                           time.Time         `json:"last_byte_at"`
	LastPushAt                           time.Time         `json:"last_push_at"`
	BytesRead                            int64             `json:"bytes_read"`
	BytesPushed                          int64             `json:"bytes_pushed"`
	ChunksPushed                         int64             `json:"chunks_pushed"`
	SlowSkipEventsTotal                  uint64            `json:"slow_skip_events_total"`
	SlowSkipLagChunksTotal               uint64            `json:"slow_skip_lag_chunks_total"`
	SlowSkipLagBytesTotal                uint64            `json:"slow_skip_lag_bytes_total"`
	SlowSkipMaxLagChunks                 uint64            `json:"slow_skip_max_lag_chunks"`
	SubscriberWriteDeadlineTimeoutsTotal uint64            `json:"subscriber_write_deadline_timeouts_total"`
	SubscriberWriteShortWritesTotal      uint64            `json:"subscriber_write_short_writes_total"`
	SubscriberWriteBlockedDurationUS     uint64            `json:"subscriber_write_blocked_duration_us"`
	SubscriberWriteBlockedDurationMS     uint64            `json:"subscriber_write_blocked_duration_ms"`
	StallCount                           int64             `json:"stall_count"`
	RecoveryCycle                        int64             `json:"recovery_cycle"`
	RecoveryReason                       string            `json:"recovery_reason,omitempty"`
	RecoveryTransitionMode               string            `json:"recovery_transition_mode,omitempty"`
	RecoveryTransitionEffectiveMode      string            `json:"recovery_transition_effective_mode,omitempty"`
	RecoveryTransitionSignalsApplied     string            `json:"recovery_transition_signals_applied,omitempty"`
	RecoveryTransitionSignalSkips        string            `json:"recovery_transition_signal_skips,omitempty"`
	RecoveryTransitionFallbackCount      int64             `json:"recovery_transition_fallback_count,omitempty"`
	RecoveryTransitionFallbackReason     string            `json:"recovery_transition_fallback_reason,omitempty"`
	RecoveryTransitionStitchApplied      bool              `json:"recovery_transition_stitch_applied,omitempty"`
	RecoveryKeepaliveMode                string            `json:"recovery_keepalive_mode,omitempty"`
	RecoveryKeepaliveFallbackCount       int64             `json:"recovery_keepalive_fallback_count,omitempty"`
	RecoveryKeepaliveFallbackReason      string            `json:"recovery_keepalive_fallback_reason,omitempty"`
	RecoveryKeepaliveStartedAt           time.Time         `json:"recovery_keepalive_started_at,omitzero"`
	RecoveryKeepaliveStoppedAt           time.Time         `json:"recovery_keepalive_stopped_at,omitzero"`
	RecoveryKeepaliveDuration            string            `json:"recovery_keepalive_duration,omitempty"`
	RecoveryKeepaliveBytes               int64             `json:"recovery_keepalive_bytes,omitempty"`
	RecoveryKeepaliveChunks              int64             `json:"recovery_keepalive_chunks,omitempty"`
	RecoveryKeepaliveRateBytesPerSecond  float64           `json:"recovery_keepalive_rate_bytes_per_second,omitempty"`
	RecoveryKeepaliveExpectedRate        float64           `json:"recovery_keepalive_expected_rate_bytes_per_second,omitempty"`
	RecoveryKeepaliveRealtimeMultiplier  float64           `json:"recovery_keepalive_realtime_multiplier,omitempty"`
	RecoveryKeepaliveGuardrailCount      int64             `json:"recovery_keepalive_guardrail_count,omitempty"`
	RecoveryKeepaliveGuardrailReason     string            `json:"recovery_keepalive_guardrail_reason,omitempty"`
	SourceSelectCount                    int64             `json:"source_select_count"`
	SameSourceReselectCount              int64             `json:"same_source_reselect_count"`
	LastSourceSelectedAt                 time.Time         `json:"last_source_selected_at"`
	LastSourceSelectReason               string            `json:"last_source_select_reason,omitempty"`
	SinceLastSourceSelect                string            `json:"since_last_source_select,omitempty"`
	LastError                            string            `json:"last_error,omitempty"`
	SourceHealthPersistCoalescedTotal    int64             `json:"source_health_persist_coalesced_total,omitempty"`
	SourceHealthPersistDroppedTotal      int64             `json:"source_health_persist_dropped_total,omitempty"`
	SourceHealthPersistCoalescedBySource map[int64]int64   `json:"source_health_persist_coalesced_by_source,omitempty"`
	SourceHealthPersistDroppedBySource   map[int64]int64   `json:"source_health_persist_dropped_by_source,omitempty"`
	Subscribers                          []SubscriberStats `json:"subscribers"`
}

TunerStatus describes one active tuner lease and linked shared-session state.

type TunerStatusSnapshot

// TunerStatusSnapshot reports current tuner/session runtime state for admin
// diagnostics.
//
// JSON encoding notes: only the three SessionHistory* fields are tagged
// omitempty and are dropped when empty/zero; every other field is always
// emitted. Tuners and ClientStreams have no omitempty, so a nil slice encodes
// as null rather than [].
//
// NOTE(review): TunerCount, InUseCount and IdleCount presumably satisfy
// TunerCount == InUseCount + IdleCount — nothing in this declaration enforces
// it; verify against the code that builds the snapshot.
type TunerStatusSnapshot struct {
	GeneratedAt                  time.Time              `json:"generated_at"`
	TunerCount                   int                    `json:"tuner_count"`
	InUseCount                   int                    `json:"in_use_count"`
	IdleCount                    int                    `json:"idle_count"`
	Churn                        ChurnSummary           `json:"churn"`
	DrainWait                    DrainWaitTelemetry     `json:"drain_wait"`
	ProbeClose                   ProbeCloseTelemetry    `json:"probe_close"`
	Tuners                       []TunerStatus          `json:"tuners"`
	ClientStreams                []ClientStreamStatus   `json:"client_streams"`
	SessionHistory               []SharedSessionHistory `json:"session_history,omitempty"`
	SessionHistoryLimit          int                    `json:"session_history_limit,omitempty"`
	SessionHistoryTruncatedCount int64                  `json:"session_history_truncated_count,omitempty"`
}

TunerStatusSnapshot reports current tuner/session runtime state for admin diagnostics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL