Documentation
¶
Index ¶
- Constants
- Variables
- func ParseAnalysisErrorBuckets(summary string) map[string]int
- func ParseAutoPrioritizeSkipReasonBuckets(summary string) map[string]int
- func WithRunMetadata(ctx context.Context, runID int64, jobName, triggeredBy string) context.Context
- type AutoPrioritizeChannelStore
- type AutoPrioritizeJob
- type AutoPrioritizeMetricsStore
- type AutoPrioritizeOptions
- type AutoPrioritizeSettingsStore
- type DVRLineupReloader
- type DVRLineupReloaderWithStatus
- type JobFunc
- type PlaylistReconciler
- type PlaylistRefresher
- type PlaylistSettingsStore
- type PlaylistSyncJob
- type Run
- type RunContext
- func (r *RunContext) IncrementProgress(ctx context.Context, delta int) error
- func (r *RunContext) RunID() int64
- func (r *RunContext) SetProgress(ctx context.Context, progressCur, progressMax int) error
- func (r *RunContext) SetSummary(ctx context.Context, summary string) error
- func (r *RunContext) Snapshot() (progressCur, progressMax int, summary string)
- type RunMetadata
- type Runner
- func (r *Runner) Close()
- func (r *Runner) GetRun(ctx context.Context, runID int64) (Run, error)
- func (r *Runner) IsRunning(jobName string) bool
- func (r *Runner) ListRuns(ctx context.Context, jobName string, limit, offset int) ([]Run, error)
- func (r *Runner) SetGlobalLock(enabled bool)
- func (r *Runner) SetLogger(logger *slog.Logger)
- func (r *Runner) Start(ctx context.Context, jobName, triggeredBy string, fn JobFunc) (int64, error)
- type Store
- type StreamAnalyzer
- type StreamMetric
- type TunerUsageProvider
Constants ¶
const (
	DefaultMetricsFreshness = 24 * time.Hour
	DefaultErrorRetry       = 30 * time.Minute
)
const (
	JobPlaylistSync   = "playlist_sync"
	JobAutoPrioritize = "auto_prioritize"
	JobDVRLineupSync  = "dvr_lineup_sync"
	TriggerManual     = "manual"
	TriggerSchedule   = "schedule"
	StatusRunning     = "running"
	StatusSuccess     = "success"
	StatusError       = "error"
	StatusCanceled    = "canceled"
)
Variables ¶
var (
	ErrAlreadyRunning = errors.New("job already running")
	ErrRunnerClosed   = errors.New("job runner is closed")
)
var (
	ErrRunNotRunning = errors.New("job run is not running")
	ErrRunFinalized  = errors.New("job run is finalized")
)
var ErrProbePreempted = stream.ErrProbePreempted
Functions ¶
func ParseAnalysisErrorBuckets ¶
ParseAnalysisErrorBuckets extracts categorized auto-prioritize error counts from a run summary string. It returns nil when no parseable buckets exist.
func ParseAutoPrioritizeSkipReasonBuckets ¶
ParseAutoPrioritizeSkipReasonBuckets extracts per-reason skipped channel counts from an auto-prioritize run summary string.
Types ¶
type AutoPrioritizeChannelStore ¶
type AutoPrioritizeChannelStore interface {
ListEnabled(ctx context.Context) ([]channels.Channel, error)
ListSourcesByChannelIDs(ctx context.Context, channelIDs []int64, enabledOnly bool) (map[int64][]channels.Source, error)
ListSources(ctx context.Context, channelID int64, enabledOnly bool) ([]channels.Source, error)
ReorderSources(ctx context.Context, channelID int64, sourceIDs []int64) error
UpdateSourceProfile(ctx context.Context, sourceID int64, profile channels.SourceProfileUpdate) error
}
AutoPrioritizeChannelStore provides published channel/source operations.
type AutoPrioritizeJob ¶
type AutoPrioritizeJob struct {
// contains filtered or unexported fields
}
AutoPrioritizeJob analyzes source quality and rewrites per-channel source order.
func NewAutoPrioritizeJob ¶
func NewAutoPrioritizeJob(
	settings AutoPrioritizeSettingsStore,
	channelsStore AutoPrioritizeChannelStore,
	metricsStore AutoPrioritizeMetricsStore,
	streamAnalyzer StreamAnalyzer,
	opts AutoPrioritizeOptions,
) (*AutoPrioritizeJob, error)
func (*AutoPrioritizeJob) Run ¶
func (j *AutoPrioritizeJob) Run(ctx context.Context, run *RunContext) error
Run executes source analysis and channel source reordering.
type AutoPrioritizeMetricsStore ¶
type AutoPrioritizeMetricsStore interface {
GetStreamMetric(ctx context.Context, itemKey string) (StreamMetric, error)
UpsertStreamMetric(ctx context.Context, metric StreamMetric) error
}
AutoPrioritizeMetricsStore provides stream metrics cache operations.
type AutoPrioritizeOptions ¶
type AutoPrioritizeOptions struct {
SuccessFreshness time.Duration
ErrorRetry time.Duration
DefaultWorkers int
WorkerMode string
FixedWorkers int
TunerCount int
TunerUsage TunerUsageProvider
ProbeTuneDelay time.Duration
HTTP429Backoff time.Duration
}
AutoPrioritizeOptions customizes cache freshness behavior, along with the worker, tuner, probe-delay, and HTTP 429 backoff settings carried by its fields.
type AutoPrioritizeSettingsStore ¶
type AutoPrioritizeSettingsStore interface {
GetSetting(ctx context.Context, key string) (string, error)
}
AutoPrioritizeSettingsStore reads analyzer-related automation settings.
type DVRLineupReloader ¶
DVRLineupReloader refreshes downstream DVR lineup state after playlist sync.
type DVRLineupReloaderWithStatus ¶
type DVRLineupReloaderWithStatus interface {
ReloadLineupForPlaylistSync(ctx context.Context) (reloaded bool, skipped bool, skipReason string, err error)
}
DVRLineupReloaderWithStatus is an optional interface for post-playlist-sync lineup reloads. It lets an implementation report a provider-aware, non-fatal skip (with a reason) separately from a reload error.
type JobFunc ¶
type JobFunc func(ctx context.Context, run *RunContext) error
JobFunc executes one asynchronous job run.
type PlaylistReconciler ¶
type PlaylistReconciler interface {
CountChannels(ctx context.Context) (int, error)
Reconcile(ctx context.Context, onProgress func(cur, max int) error) (reconcile.Result, error)
}
PlaylistReconciler updates channel source mappings after catalog refresh.
type PlaylistRefresher ¶
PlaylistRefresher performs playlist fetch+parse+catalog upsert.
type PlaylistSettingsStore ¶
type PlaylistSettingsStore interface {
GetSetting(ctx context.Context, key string) (string, error)
}
PlaylistSettingsStore reads automation settings required by playlist sync jobs.
type PlaylistSyncJob ¶
type PlaylistSyncJob struct {
// contains filtered or unexported fields
}
PlaylistSyncJob runs playlist refresh then reconcile.
func NewPlaylistSyncJob ¶
func NewPlaylistSyncJob(
	settings PlaylistSettingsStore,
	refresher PlaylistRefresher,
	reconciler PlaylistReconciler,
) (*PlaylistSyncJob, error)
func (*PlaylistSyncJob) Run ¶
func (j *PlaylistSyncJob) Run(ctx context.Context, run *RunContext) error
Run executes refresh + reconcile and updates job progress by channel.
func (*PlaylistSyncJob) SetPostSyncLineupReloader ¶
func (j *PlaylistSyncJob) SetPostSyncLineupReloader(reloader DVRLineupReloader)
SetPostSyncLineupReloader configures an optional DVR lineup reload hook executed after successful refresh+reconcile completion.
type Run ¶
type Run struct {
RunID int64 `json:"run_id"`
JobName string `json:"job_name"`
TriggeredBy string `json:"triggered_by"`
StartedAt int64 `json:"started_at"`
FinishedAt int64 `json:"finished_at,omitempty"`
Status string `json:"status"`
ProgressCur int `json:"progress_cur"`
ProgressMax int `json:"progress_max"`
Summary string `json:"summary,omitempty"`
ErrorMessage string `json:"error,omitempty"`
// AnalysisErrorBuckets is derived from auto-prioritize run summary text when present.
AnalysisErrorBuckets map[string]int `json:"analysis_error_buckets,omitempty"`
}
Run captures persisted job execution metadata.
type RunContext ¶
type RunContext struct {
// contains filtered or unexported fields
}
RunContext exposes progress and summary updates for a running job.
func (*RunContext) IncrementProgress ¶
func (r *RunContext) IncrementProgress(ctx context.Context, delta int) error
IncrementProgress increments progress_cur by delta.
func (*RunContext) RunID ¶
func (r *RunContext) RunID() int64
RunID returns the persisted run identifier.
func (*RunContext) SetProgress ¶
func (r *RunContext) SetProgress(ctx context.Context, progressCur, progressMax int) error
SetProgress persists run progress counters.
func (*RunContext) SetSummary ¶
func (r *RunContext) SetSummary(ctx context.Context, summary string) error
SetSummary persists an updated summary string.
func (*RunContext) Snapshot ¶
func (r *RunContext) Snapshot() (progressCur, progressMax int, summary string)
Snapshot returns the current in-memory progress and summary.
type RunMetadata ¶
RunMetadata identifies a persisted job run in downstream call chains.
func RunMetadataFromContext ¶
func RunMetadataFromContext(ctx context.Context) (RunMetadata, bool)
RunMetadataFromContext returns run correlation metadata if present.
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner coordinates asynchronous job execution with persisted run state.
func (*Runner) Close ¶
func (r *Runner) Close()
Close prevents new runs from being started, cancels active runs, and waits for all in-flight run goroutines to finish their terminal persistence (FinishRun) before returning.
func (*Runner) SetGlobalLock ¶
SetGlobalLock controls whether different jobs may run concurrently. When enabled (the default), only one job run may be active at a time; when disabled, different jobs may run concurrently.
type Store ¶
type Store interface {
CreateRun(ctx context.Context, jobName, triggeredBy string, startedAt int64) (int64, error)
UpdateRunProgress(ctx context.Context, runID int64, progressCur, progressMax int, summary string) error
FinishRun(ctx context.Context, runID int64, status, errText, summary string, finishedAt int64) error
GetRun(ctx context.Context, runID int64) (Run, error)
ListRuns(ctx context.Context, jobName string, limit, offset int) ([]Run, error)
}
Store persists and queries job run lifecycle state.
type StreamAnalyzer ¶
type StreamAnalyzer interface {
Analyze(ctx context.Context, streamURL string) (analyzer.Metrics, error)
}
StreamAnalyzer probes one stream URL for quality metrics.
type StreamMetric ¶
type StreamMetric struct {
ItemKey string `json:"item_key"`
AnalyzedAt int64 `json:"analyzed_at"`
Width int `json:"width,omitempty"`
Height int `json:"height,omitempty"`
FPS float64 `json:"fps,omitempty"`
VideoCodec string `json:"video_codec,omitempty"`
AudioCodec string `json:"audio_codec,omitempty"`
BitrateBPS int64 `json:"bitrate_bps,omitempty"`
VariantBPS int64 `json:"variant_bps,omitempty"`
ScoreHint float64 `json:"score_hint,omitempty"`
Error string `json:"error,omitempty"`
}
StreamMetric stores cached probe metrics for one catalog source item.
func (StreamMetric) IsScorable ¶
func (m StreamMetric) IsScorable() bool
func (StreamMetric) NeedsRefresh ¶
NeedsRefresh returns whether this cache entry should be re-analyzed.