jobs

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMetricsFreshness = 24 * time.Hour
	DefaultErrorRetry       = 30 * time.Minute
)
View Source
const (
	JobPlaylistSync   = "playlist_sync"
	JobAutoPrioritize = "auto_prioritize"
	JobDVRLineupSync  = "dvr_lineup_sync"

	TriggerManual   = "manual"
	TriggerSchedule = "schedule"

	StatusRunning  = "running"
	StatusSuccess  = "success"
	StatusError    = "error"
	StatusCanceled = "canceled"
)

Variables

View Source
var (
	ErrAlreadyRunning = errors.New("job already running")
	ErrRunnerClosed   = errors.New("job runner is closed")
)
View Source
var (
	ErrRunNotRunning = errors.New("job run is not running")
	ErrRunFinalized  = errors.New("job run is finalized")
)
View Source
var ErrProbePreempted = stream.ErrProbePreempted

Functions

func ParseAnalysisErrorBuckets

func ParseAnalysisErrorBuckets(summary string) map[string]int

ParseAnalysisErrorBuckets extracts categorized auto-prioritize error counts from a run summary string. It returns nil when no parseable buckets exist.

func ParseAutoPrioritizeSkipReasonBuckets

func ParseAutoPrioritizeSkipReasonBuckets(summary string) map[string]int

ParseAutoPrioritizeSkipReasonBuckets extracts per-reason skipped channel counts from an auto-prioritize run summary string.

func WithRunMetadata

func WithRunMetadata(ctx context.Context, runID int64, jobName, triggeredBy string) context.Context

WithRunMetadata annotates ctx with job-run correlation identifiers.

Types

type AutoPrioritizeChannelStore

type AutoPrioritizeChannelStore interface {
	ListEnabled(ctx context.Context) ([]channels.Channel, error)
	ListSourcesByChannelIDs(ctx context.Context, channelIDs []int64, enabledOnly bool) (map[int64][]channels.Source, error)
	ListSources(ctx context.Context, channelID int64, enabledOnly bool) ([]channels.Source, error)
	ReorderSources(ctx context.Context, channelID int64, sourceIDs []int64) error
	UpdateSourceProfile(ctx context.Context, sourceID int64, profile channels.SourceProfileUpdate) error
}

AutoPrioritizeChannelStore provides published channel/source operations.

type AutoPrioritizeJob

type AutoPrioritizeJob struct {
	// contains filtered or unexported fields
}

AutoPrioritizeJob analyzes source quality and rewrites per-channel source order.

func NewAutoPrioritizeJob

func NewAutoPrioritizeJob(
	settings AutoPrioritizeSettingsStore,
	channelsStore AutoPrioritizeChannelStore,
	metricsStore AutoPrioritizeMetricsStore,
	streamAnalyzer StreamAnalyzer,
	opts AutoPrioritizeOptions,
) (*AutoPrioritizeJob, error)

func (*AutoPrioritizeJob) Run

func (j *AutoPrioritizeJob) Run(ctx context.Context, run *RunContext) error

Run executes source analysis and channel source reordering.

type AutoPrioritizeMetricsStore

type AutoPrioritizeMetricsStore interface {
	GetStreamMetric(ctx context.Context, itemKey string) (StreamMetric, error)
	UpsertStreamMetric(ctx context.Context, metric StreamMetric) error
}

AutoPrioritizeMetricsStore provides stream metrics cache operations.

type AutoPrioritizeOptions

type AutoPrioritizeOptions struct {
	SuccessFreshness time.Duration
	ErrorRetry       time.Duration
	DefaultWorkers   int
	WorkerMode       string
	FixedWorkers     int
	TunerCount       int
	TunerUsage       TunerUsageProvider
	ProbeTuneDelay   time.Duration
	HTTP429Backoff   time.Duration
}

AutoPrioritizeOptions customizes cache freshness behavior.

type AutoPrioritizeSettingsStore

type AutoPrioritizeSettingsStore interface {
	GetSetting(ctx context.Context, key string) (string, error)
}

AutoPrioritizeSettingsStore reads analyzer-related automation settings.

type DVRLineupReloader

type DVRLineupReloader interface {
	ReloadLineup(ctx context.Context) error
}

DVRLineupReloader refreshes downstream DVR lineup state after playlist sync.

type DVRLineupReloaderWithStatus

type DVRLineupReloaderWithStatus interface {
	ReloadLineupForPlaylistSync(ctx context.Context) (reloaded bool, skipped bool, skipReason string, err error)
}

DVRLineupReloaderWithStatus optionally allows provider-aware non-fatal skip semantics for post-playlist-sync lineup reload paths.

type JobFunc

type JobFunc func(ctx context.Context, run *RunContext) error

JobFunc executes one asynchronous job run.

type PlaylistReconciler

type PlaylistReconciler interface {
	CountChannels(ctx context.Context) (int, error)
	Reconcile(ctx context.Context, onProgress func(cur, max int) error) (reconcile.Result, error)
}

PlaylistReconciler updates channel source mappings after catalog refresh.

type PlaylistRefresher

type PlaylistRefresher interface {
	Refresh(ctx context.Context, playlistURL string) (int, error)
}

PlaylistRefresher performs playlist fetch+parse+catalog upsert.

type PlaylistSettingsStore

type PlaylistSettingsStore interface {
	GetSetting(ctx context.Context, key string) (string, error)
}

PlaylistSettingsStore reads automation settings required by playlist sync jobs.

type PlaylistSyncJob

type PlaylistSyncJob struct {
	// contains filtered or unexported fields
}

PlaylistSyncJob runs playlist refresh then reconcile.

func NewPlaylistSyncJob

func NewPlaylistSyncJob(
	settings PlaylistSettingsStore,
	refresher PlaylistRefresher,
	reconciler PlaylistReconciler,
) (*PlaylistSyncJob, error)

func (*PlaylistSyncJob) Run

func (j *PlaylistSyncJob) Run(ctx context.Context, run *RunContext) error

Run executes refresh + reconcile and updates job progress by channel.

func (*PlaylistSyncJob) SetPostSyncLineupReloader

func (j *PlaylistSyncJob) SetPostSyncLineupReloader(reloader DVRLineupReloader)

SetPostSyncLineupReloader configures an optional DVR lineup reload hook executed after successful refresh+reconcile completion.

type Run

type Run struct {
	RunID        int64  `json:"run_id"`
	JobName      string `json:"job_name"`
	TriggeredBy  string `json:"triggered_by"`
	StartedAt    int64  `json:"started_at"`
	FinishedAt   int64  `json:"finished_at,omitempty"`
	Status       string `json:"status"`
	ProgressCur  int    `json:"progress_cur"`
	ProgressMax  int    `json:"progress_max"`
	Summary      string `json:"summary,omitempty"`
	ErrorMessage string `json:"error,omitempty"`
	// AnalysisErrorBuckets is derived from auto-prioritize run summary text when present.
	AnalysisErrorBuckets map[string]int `json:"analysis_error_buckets,omitempty"`
}

Run captures persisted job execution metadata.

type RunContext

type RunContext struct {
	// contains filtered or unexported fields
}

RunContext exposes progress and summary updates for a running job.

func (*RunContext) IncrementProgress

func (r *RunContext) IncrementProgress(ctx context.Context, delta int) error

IncrementProgress increments progress_cur by delta.

func (*RunContext) RunID

func (r *RunContext) RunID() int64

RunID returns the persisted run identifier.

func (*RunContext) SetProgress

func (r *RunContext) SetProgress(ctx context.Context, progressCur, progressMax int) error

SetProgress persists run progress counters.

func (*RunContext) SetSummary

func (r *RunContext) SetSummary(ctx context.Context, summary string) error

SetSummary persists an updated summary string.

func (*RunContext) Snapshot

func (r *RunContext) Snapshot() (progressCur, progressMax int, summary string)

Snapshot returns the current in-memory progress and summary.

type RunMetadata

type RunMetadata struct {
	RunID       int64
	JobName     string
	TriggeredBy string
}

RunMetadata identifies a persisted job run in downstream call chains.

func RunMetadataFromContext

func RunMetadataFromContext(ctx context.Context) (RunMetadata, bool)

RunMetadataFromContext returns run correlation metadata if present.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner coordinates asynchronous job execution with persisted run state.

func NewRunner

func NewRunner(store Store) (*Runner, error)

NewRunner builds a runner with global overlap locking enabled.

func (*Runner) Close

func (r *Runner) Close()

Close prevents new runs from being started, cancels active runs, and waits for all in-flight run goroutines to finish their terminal persistence (FinishRun) before returning.

func (*Runner) GetRun

func (r *Runner) GetRun(ctx context.Context, runID int64) (Run, error)

GetRun reads one run by ID.

func (*Runner) IsRunning

func (r *Runner) IsRunning(jobName string) bool

IsRunning returns whether a job currently has an active run.

func (*Runner) ListRuns

func (r *Runner) ListRuns(ctx context.Context, jobName string, limit, offset int) ([]Run, error)

ListRuns lists runs by optional job name.

func (*Runner) SetGlobalLock

func (r *Runner) SetGlobalLock(enabled bool)

SetGlobalLock controls whether different jobs may run concurrently. When enabled (default), only one job run may be active at a time.

func (*Runner) SetLogger

func (r *Runner) SetLogger(logger *slog.Logger)

func (*Runner) Start

func (r *Runner) Start(ctx context.Context, jobName, triggeredBy string, fn JobFunc) (int64, error)

Start creates a run row and executes fn asynchronously.

type Store

type Store interface {
	CreateRun(ctx context.Context, jobName, triggeredBy string, startedAt int64) (int64, error)
	UpdateRunProgress(ctx context.Context, runID int64, progressCur, progressMax int, summary string) error
	FinishRun(ctx context.Context, runID int64, status, errText, summary string, finishedAt int64) error
	GetRun(ctx context.Context, runID int64) (Run, error)
	ListRuns(ctx context.Context, jobName string, limit, offset int) ([]Run, error)
}

Store persists and queries job run lifecycle state.

type StreamAnalyzer

type StreamAnalyzer interface {
	Analyze(ctx context.Context, streamURL string) (analyzer.Metrics, error)
}

StreamAnalyzer probes one stream URL for quality metrics.

type StreamMetric

type StreamMetric struct {
	ItemKey    string  `json:"item_key"`
	AnalyzedAt int64   `json:"analyzed_at"`
	Width      int     `json:"width,omitempty"`
	Height     int     `json:"height,omitempty"`
	FPS        float64 `json:"fps,omitempty"`
	VideoCodec string  `json:"video_codec,omitempty"`
	AudioCodec string  `json:"audio_codec,omitempty"`
	BitrateBPS int64   `json:"bitrate_bps,omitempty"`
	VariantBPS int64   `json:"variant_bps,omitempty"`
	ScoreHint  float64 `json:"score_hint,omitempty"`
	Error      string  `json:"error,omitempty"`
}

StreamMetric stores cached probe metrics for one catalog source item.

func (StreamMetric) IsScorable

func (m StreamMetric) IsScorable() bool

func (StreamMetric) NeedsRefresh

func (m StreamMetric) NeedsRefresh(now time.Time, successFreshness, errorRetry time.Duration) bool

NeedsRefresh returns whether this cache entry should be re-analyzed.

type TunerUsageProvider

type TunerUsageProvider interface {
	InUseCount() int
	AcquireProbe(ctx context.Context, label string, cancel context.CancelCauseFunc) (*stream.Lease, error)
}

TunerUsageProvider reports active stream usage from the shared tuner pool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL