Documentation
¶
Index ¶
- Constants
- Variables
- func ParseAnalysisErrorBuckets(summary string) map[string]int
- func ParseAutoPrioritizeSkipReasonBuckets(summary string) map[string]int
- func PlaylistSyncSourceIDFromContext(ctx context.Context) (int64, bool)
- func WithPlaylistSyncSourceID(ctx context.Context, sourceID int64) context.Context
- func WithRunMetadata(ctx context.Context, runID int64, jobName, triggeredBy string) context.Context
- type AutoPrioritizeChannelStore
- type AutoPrioritizeJob
- type AutoPrioritizeMetricsStore
- type AutoPrioritizeOptions
- type AutoPrioritizeSettingsStore
- type DVRLineupReloader
- type JobFunc
- type PlaylistReconciler
- type PlaylistRefresher
- type PlaylistSettingsStore
- type PlaylistSyncJob
- type Run
- type RunContext
- func (r *RunContext) IncrementProgress(ctx context.Context, delta int) error
- func (r *RunContext) RunID() int64
- func (r *RunContext) SetProgress(ctx context.Context, progressCur, progressMax int) error
- func (r *RunContext) SetSummary(ctx context.Context, summary string) error
- func (r *RunContext) Snapshot() (progressCur, progressMax int, summary string)
- type RunMetadata
- type Runner
- func (r *Runner) Close()
- func (r *Runner) GetRun(ctx context.Context, runID int64) (Run, error)
- func (r *Runner) IsRunning(jobName string) bool
- func (r *Runner) ListRuns(ctx context.Context, jobName string, limit, offset int) ([]Run, error)
- func (r *Runner) SetGlobalLock(enabled bool)
- func (r *Runner) SetLogger(logger *slog.Logger)
- func (r *Runner) Start(ctx context.Context, jobName, triggeredBy string, fn JobFunc) (int64, error)
- type ScheduledCatchUpOptions
- type ScheduledJobCallback
- type ScheduledJobStarter
- type SourcePoolTunerUsageProvider
- type Store
- type StreamAnalyzer
- type StreamMetric
- type TunerUsageProvider
Constants ¶
const ( DefaultMetricsFreshness = 24 * time.Hour DefaultErrorRetry = 30 * time.Minute )
const ( JobPlaylistSync = "playlist_sync" JobAutoPrioritize = "auto_prioritize" JobDVRLineupSync = "dvr_lineup_sync" TriggerManual = "manual" TriggerSchedule = "schedule" StatusRunning = "running" StatusSuccess = "success" StatusError = "error" StatusCanceled = "canceled" )
Variables ¶
var ( ErrAlreadyRunning = errors.New("job already running") ErrRunnerClosed = errors.New("job runner is closed") )
var ( ErrRunNotRunning = errors.New("job run is not running") ErrRunFinalized = errors.New("job run is finalized") )
var ErrProbePreempted = stream.ErrProbePreempted
Functions ¶
func ParseAnalysisErrorBuckets ¶
ParseAnalysisErrorBuckets extracts categorized auto-prioritize error counts from a run summary string. It returns nil when no parseable buckets exist.
func ParseAutoPrioritizeSkipReasonBuckets ¶
ParseAutoPrioritizeSkipReasonBuckets extracts per-reason skipped channel counts from an auto-prioritize run summary string.
func PlaylistSyncSourceIDFromContext ¶ added in v1.1.0
PlaylistSyncSourceIDFromContext returns an optional playlist source scope.
func WithPlaylistSyncSourceID ¶ added in v1.1.0
WithPlaylistSyncSourceID annotates ctx with an optional source scope for playlist_sync runs triggered via the admin API.
Types ¶
type AutoPrioritizeChannelStore ¶
type AutoPrioritizeChannelStore interface {
ListEnabled(ctx context.Context) ([]channels.Channel, error)
ListSourcesByChannelIDs(ctx context.Context, channelIDs []int64, enabledOnly bool) (map[int64][]channels.Source, error)
ListSources(ctx context.Context, channelID int64, enabledOnly bool) ([]channels.Source, error)
ReorderSources(ctx context.Context, channelID int64, sourceIDs []int64) error
UpdateSourceProfile(ctx context.Context, sourceID int64, profile channels.SourceProfileUpdate) error
}
AutoPrioritizeChannelStore provides published channel/source operations.
type AutoPrioritizeJob ¶
type AutoPrioritizeJob struct {
// contains filtered or unexported fields
}
AutoPrioritizeJob analyzes source quality and rewrites per-channel source order.
func NewAutoPrioritizeJob ¶
func NewAutoPrioritizeJob( settings AutoPrioritizeSettingsStore, channelsStore AutoPrioritizeChannelStore, metricsStore AutoPrioritizeMetricsStore, streamAnalyzer StreamAnalyzer, opts AutoPrioritizeOptions, ) (*AutoPrioritizeJob, error)
func (*AutoPrioritizeJob) Run ¶
func (j *AutoPrioritizeJob) Run(ctx context.Context, run *RunContext) error
Run executes source analysis and channel source reordering.
type AutoPrioritizeMetricsStore ¶
type AutoPrioritizeMetricsStore interface {
GetStreamMetric(ctx context.Context, itemKey string) (StreamMetric, error)
UpsertStreamMetric(ctx context.Context, metric StreamMetric) error
}
AutoPrioritizeMetricsStore provides stream metrics cache operations.
type AutoPrioritizeOptions ¶
type AutoPrioritizeOptions struct {
SuccessFreshness time.Duration
ErrorRetry time.Duration
DefaultWorkers int
WorkerMode string
FixedWorkers int
TunerCount int
TunerUsage TunerUsageProvider
ProbeTuneDelay time.Duration
HTTP429Backoff time.Duration
}
AutoPrioritizeOptions customizes cache freshness behavior.
type AutoPrioritizeSettingsStore ¶
type AutoPrioritizeSettingsStore interface {
GetSetting(ctx context.Context, key string) (string, error)
}
AutoPrioritizeSettingsStore reads analyzer-related automation settings.
type DVRLineupReloader ¶
type DVRLineupReloader interface {
ReloadLineupForPlaylistSyncOutcome(ctx context.Context) (dvr.ReloadOutcome, error)
}
DVRLineupReloader refreshes downstream DVR lineup state after playlist sync and returns provider-aware non-fatal skip metadata.
type JobFunc ¶
type JobFunc func(ctx context.Context, run *RunContext) error
JobFunc executes one asynchronous job run.
type PlaylistReconciler ¶
type PlaylistReconciler interface {
CountChannels(ctx context.Context) (int, error)
Reconcile(ctx context.Context, onProgress func(cur, max int) error) (reconcile.Result, error)
}
PlaylistReconciler updates channel source mappings after catalog refresh.
type PlaylistRefresher ¶
type PlaylistRefresher interface {
RefreshForSource(ctx context.Context, source playlist.PlaylistSource) (int, error)
}
PlaylistRefresher performs playlist fetch+parse+catalog upsert.
type PlaylistSettingsStore ¶
type PlaylistSettingsStore interface {
GetSetting(ctx context.Context, key string) (string, error)
ListPlaylistSources(ctx context.Context) ([]playlist.PlaylistSource, error)
}
PlaylistSettingsStore reads automation settings required by playlist sync jobs.
type PlaylistSyncJob ¶
type PlaylistSyncJob struct {
// contains filtered or unexported fields
}
PlaylistSyncJob runs playlist refresh then reconcile.
func NewPlaylistSyncJob ¶
func NewPlaylistSyncJob( settings PlaylistSettingsStore, refresher PlaylistRefresher, reconciler PlaylistReconciler, ) (*PlaylistSyncJob, error)
func (*PlaylistSyncJob) Run ¶
func (j *PlaylistSyncJob) Run(ctx context.Context, run *RunContext) error
Run executes refresh + reconcile and updates job progress by channel.
func (*PlaylistSyncJob) SetPostSyncLineupReloader ¶
func (j *PlaylistSyncJob) SetPostSyncLineupReloader(reloader DVRLineupReloader)
SetPostSyncLineupReloader configures an optional DVR lineup reload hook executed after successful refresh+reconcile completion.
func (*PlaylistSyncJob) SetSourceRefreshConcurrency ¶ added in v1.1.0
func (j *PlaylistSyncJob) SetSourceRefreshConcurrency(concurrency int)
SetSourceRefreshConcurrency controls bounded source refresh parallelism for all-source runs. Values <= 1 keep sequential behavior. The value is clamped to [1, 16].
type Run ¶
type Run struct {
RunID int64 `json:"run_id"`
JobName string `json:"job_name"`
TriggeredBy string `json:"triggered_by"`
StartedAt int64 `json:"started_at"`
FinishedAt int64 `json:"finished_at,omitempty"`
Status string `json:"status"`
ProgressCur int `json:"progress_cur"`
ProgressMax int `json:"progress_max"`
Summary string `json:"summary,omitempty"`
ErrorMessage string `json:"error,omitempty"`
// AnalysisErrorBuckets is derived from auto-prioritize run summary text when present.
AnalysisErrorBuckets map[string]int `json:"analysis_error_buckets,omitempty"`
}
Run captures persisted job execution metadata.
type RunContext ¶
type RunContext struct {
// contains filtered or unexported fields
}
RunContext exposes progress and summary updates for a running job.
func (*RunContext) IncrementProgress ¶
func (r *RunContext) IncrementProgress(ctx context.Context, delta int) error
IncrementProgress increments progress_cur by delta.
func (*RunContext) RunID ¶
func (r *RunContext) RunID() int64
RunID returns the persisted run identifier.
func (*RunContext) SetProgress ¶
func (r *RunContext) SetProgress(ctx context.Context, progressCur, progressMax int) error
SetProgress persists run progress counters.
func (*RunContext) SetSummary ¶
func (r *RunContext) SetSummary(ctx context.Context, summary string) error
SetSummary persists an updated summary string.
func (*RunContext) Snapshot ¶
func (r *RunContext) Snapshot() (progressCur, progressMax int, summary string)
Snapshot returns the current in-memory progress and summary.
type RunMetadata ¶
RunMetadata identifies a persisted job run in downstream call chains.
func RunMetadataFromContext ¶
func RunMetadataFromContext(ctx context.Context) (RunMetadata, bool)
RunMetadataFromContext returns run correlation metadata if present.
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner coordinates asynchronous job execution with persisted run state.
func (*Runner) Close ¶
func (r *Runner) Close()
Close prevents new runs from being started, cancels active runs, and waits for all in-flight run goroutines to finish their terminal persistence (FinishRun) before returning.
func (*Runner) SetGlobalLock ¶
SetGlobalLock controls whether different jobs may run concurrently. When enabled (default), only one job run may be active at a time.
type ScheduledCatchUpOptions ¶ added in v1.1.0
type ScheduledCatchUpOptions struct {
InitialBackoff time.Duration
MaxBackoff time.Duration
FreshnessLimit int
}
ScheduledCatchUpOptions controls overlap catch-up behavior for scheduled callbacks.
type ScheduledJobCallback ¶ added in v1.1.0
type ScheduledJobCallback struct {
// contains filtered or unexported fields
}
ScheduledJobCallback applies overlap catch-up/backoff policy for scheduled job starts while preserving runner-level overlap enforcement.
func NewScheduledJobCallback ¶ added in v1.1.0
func NewScheduledJobCallback( starter ScheduledJobStarter, store Store, logger *slog.Logger, jobName string, jobFn JobFunc, options ScheduledCatchUpOptions, ) (*ScheduledJobCallback, error)
type ScheduledJobStarter ¶ added in v1.1.0
type ScheduledJobStarter interface {
Start(ctx context.Context, jobName, triggeredBy string, fn JobFunc) (int64, error)
}
ScheduledJobStarter starts asynchronous jobs from scheduler callbacks.
type SourcePoolTunerUsageProvider ¶ added in v1.1.0
type SourcePoolTunerUsageProvider interface {
CapacityForSource(sourceID int64) int
InUseCountForSource(sourceID int64) int
}
SourcePoolTunerUsageProvider exposes optional per-source pool accounting.
type Store ¶
type Store interface {
CreateRun(ctx context.Context, jobName, triggeredBy string, startedAt int64) (int64, error)
UpdateRunProgress(ctx context.Context, runID int64, progressCur, progressMax int, summary string) error
FinishRun(ctx context.Context, runID int64, status, errText, summary string, finishedAt int64) error
GetRun(ctx context.Context, runID int64) (Run, error)
ListRuns(ctx context.Context, jobName string, limit, offset int) ([]Run, error)
}
Store persists and queries job run lifecycle state.
type StreamAnalyzer ¶
type StreamAnalyzer interface {
Analyze(ctx context.Context, streamURL string) (analyzer.Metrics, error)
}
StreamAnalyzer probes one stream URL for quality metrics.
type StreamMetric ¶
type StreamMetric struct {
ItemKey string `json:"item_key"`
AnalyzedAt int64 `json:"analyzed_at"`
Width int `json:"width,omitempty"`
Height int `json:"height,omitempty"`
FPS float64 `json:"fps,omitempty"`
VideoCodec string `json:"video_codec,omitempty"`
AudioCodec string `json:"audio_codec,omitempty"`
BitrateBPS int64 `json:"bitrate_bps,omitempty"`
VariantBPS int64 `json:"variant_bps,omitempty"`
ScoreHint float64 `json:"score_hint,omitempty"`
Error string `json:"error,omitempty"`
}
StreamMetric stores cached probe metrics for one catalog source item.
func (StreamMetric) IsScorable ¶
func (m StreamMetric) IsScorable() bool
func (StreamMetric) NeedsRefresh ¶
NeedsRefresh returns whether this cache entry should be re-analyzed.
type TunerUsageProvider ¶
type TunerUsageProvider interface {
InUseCount() int
AcquireProbe(ctx context.Context, label string, cancel context.CancelCauseFunc) (*stream.Lease, error)
AcquireProbeForSource(ctx context.Context, sourceID int64, label string, cancel context.CancelCauseFunc) (*stream.Lease, error)
}
TunerUsageProvider reports active stream usage from the shared tuner pool.