worker

package
v0.1.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2026 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateCroppedPoster

func GenerateCroppedPoster(
	ctx context.Context,
	movie *models.Movie,
	httpClient httpclientiface.HTTPClient,
	userAgent string,
	referer string,
	refererResolver RefererResolver,
) (croppedURL string, err error)

GenerateCroppedPoster downloads and crops a poster, then persists it to disk. It returns the API URL for the persisted cropped poster and sets movie.ShouldCropPoster to false, since the image is already cropped.

Parameters:

  • ctx: Context for cancellation support
  • movie: Movie model containing poster/cover URLs
  • httpClient: Pre-configured HTTP client with proxy and timeout settings
  • userAgent: User-Agent header value from config
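  • referer: Referer header value from config (for CDN compatibility)
  • refererResolver: Callback that resolves the effective Referer for a download URL (see RefererResolver)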

Returns:

  • croppedURL: API URL path like "/api/v1/posters/{movieID}.jpg"
  • error: Any error encountered during download, cropping, or persistence

Note: Caller is responsible for updating the database with the returned croppedURL

func GenerateTempPoster

func GenerateTempPoster(
	ctx context.Context,
	jobID string,
	movie *models.Movie,
	httpClient httpclientiface.HTTPClient,
	userAgent string,
	referer string,
	refererResolver RefererResolver,
) (tempRelativeURL string, err error)

GenerateTempPoster downloads and crops a poster temporarily for the review page. It returns the relative API URL path for the temp poster and sets movie.ShouldCropPoster to false, since the temp image is already cropped.

Parameters:

  • ctx: Context for cancellation support
  • jobID: Batch job ID for organizing temp files by job
  • movie: Movie model containing poster/cover URLs
  • httpClient: Pre-configured HTTP client with proxy and timeout settings
  • userAgent: User-Agent header value from config
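  • referer: Referer header value from config (for CDN compatibility)
  • refererResolver: Callback that resolves the effective Referer for a download URL (see RefererResolver)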

Returns:

  • tempRelativeURL: API URL path like "/api/v1/temp/posters/{jobID}/{movieID}.jpg"
  • error: Any error encountered during download or cropping

Types

type BaseTask

type BaseTask struct {
	// contains filtered or unexported fields
}

BaseTask provides common task functionality

func (*BaseTask) Description

func (t *BaseTask) Description() string

func (*BaseTask) ID

func (t *BaseTask) ID() string

func (*BaseTask) Type

func (t *BaseTask) Type() TaskType

type BatchJob

type BatchJob struct {
	ID          string                 `json:"id"`
	Status      JobStatus              `json:"status"`
	TotalFiles  int                    `json:"total_files"`
	Completed   int                    `json:"completed"`
	Failed      int                    `json:"failed"`
	Excluded    map[string]bool        `json:"excluded"` // Files excluded from organization (keyed by file path)
	Files       []string               `json:"files"`
	Results     map[string]*FileResult `json:"results"` // keyed by file path
	Progress    float64                `json:"progress"`
	StartedAt   time.Time              `json:"started_at"`
	CompletedAt *time.Time             `json:"completed_at,omitempty"`
	CancelFunc  context.CancelFunc     `json:"-"`
	Done        chan struct{}          `json:"-"` // closed when job fully finishes
	// contains filtered or unexported fields
}

BatchJob represents a batch processing job

func (*BatchJob) AtomicUpdateFileResult

func (job *BatchJob) AtomicUpdateFileResult(filePath string, updateFn func(*FileResult) (*FileResult, error)) error

AtomicUpdateFileResult performs an atomic read-modify-write on a FileResult. The updateFn receives a deep copy of the current FileResult and must return the updated version. This prevents lost-update races by ensuring all modifications happen under the job's lock.

func (*BatchJob) Cancel

func (job *BatchJob) Cancel()

Cancel cancels the job

func (*BatchJob) ExcludeFile

func (job *BatchJob) ExcludeFile(filePath string)

ExcludeFile marks a file as excluded from organization (thread-safe)

func (*BatchJob) GetProgress

func (job *BatchJob) GetProgress() float64

GetProgress returns the current progress percentage (thread-safe). This is a lightweight accessor that avoids copying the entire job state.

func (*BatchJob) GetStatus

func (job *BatchJob) GetStatus() *BatchJob

GetStatus returns a thread-safe copy of the job status

func (*BatchJob) IsExcluded

func (job *BatchJob) IsExcluded(filePath string) bool

IsExcluded checks if a file is excluded from organization (thread-safe)

func (*BatchJob) MarkCancelled

func (job *BatchJob) MarkCancelled()

MarkCancelled marks the job as cancelled

func (*BatchJob) MarkCompleted

func (job *BatchJob) MarkCompleted()

MarkCompleted marks the job as completed

func (*BatchJob) MarkFailed

func (job *BatchJob) MarkFailed()

MarkFailed marks the job as failed

func (*BatchJob) MarkStarted

func (job *BatchJob) MarkStarted()

MarkStarted marks the job as started

func (*BatchJob) SetCancelFunc

func (job *BatchJob) SetCancelFunc(cancelFunc context.CancelFunc)

SetCancelFunc sets the cancel function for the job (thread-safe)

func (*BatchJob) UpdateFileResult

func (job *BatchJob) UpdateFileResult(filePath string, result *FileResult)

UpdateFileResult updates the result for a specific file in the job

type BatchScrapeTask

type BatchScrapeTask struct {
	BaseTask
	// contains filtered or unexported fields
}

BatchScrapeTask represents a task for scraping metadata for a single file in a batch operation

func NewBatchScrapeTask

func NewBatchScrapeTask(
	taskID string,
	filePath string,
	fileIndex int,
	job *BatchJob,
	registry *models.ScraperRegistry,
	agg *aggregator.Aggregator,
	movieRepo *database.MovieRepository,
	mat *matcher.Matcher,
	progressTracker *ProgressTracker,
	force bool,
	updateMode bool,
	selectedScrapers []string,
	httpClient httpclientiface.HTTPClient,
	userAgent string,
	referer string,
	processedMovieIDs map[string]bool,
	cfg *config.Config,
	scalarStrategy string,
	arrayStrategy string,
) *BatchScrapeTask

NewBatchScrapeTask creates a new batch scrape task

func (*BatchScrapeTask) Execute

func (t *BatchScrapeTask) Execute(ctx context.Context) error

Execute implements the Task interface

type DownloadTask

type DownloadTask struct {
	BaseTask
	// contains filtered or unexported fields
}

DownloadTask downloads media for a movie

func NewDownloadTask

func NewDownloadTask(
	movie *models.Movie,
	targetDir string,
	dl *downloader.Downloader,
	progressTracker *ProgressTracker,
	dryRun bool,
	multipart *downloader.MultipartInfo,
) *DownloadTask

NewDownloadTask creates a new download task

func (*DownloadTask) Execute

func (t *DownloadTask) Execute(ctx context.Context) error

type FileResult

type FileResult struct {
	FilePath       string            `json:"file_path"`
	MovieID        string            `json:"movie_id"`
	Status         JobStatus         `json:"status"`
	Error          string            `json:"error,omitempty"`
	PosterError    *string           `json:"poster_error,omitempty"`    // Optional error from poster generation
	FieldSources   map[string]string `json:"field_sources,omitempty"`   // Field -> scraper/NFO source
	ActressSources map[string]string `json:"actress_sources,omitempty"` // Actress-key -> scraper/NFO source
	Data           interface{}       `json:"data,omitempty"`
	StartedAt      time.Time         `json:"started_at"`
	EndedAt        *time.Time        `json:"ended_at,omitempty"`
	IsMultiPart    bool              `json:"is_multi_part,omitempty"`
	PartNumber     int               `json:"part_number,omitempty"`
	PartSuffix     string            `json:"part_suffix,omitempty"`
}

FileResult represents the result of processing a single file

func RunBatchScrapeOnce

func RunBatchScrapeOnce(
	ctx context.Context,
	job *BatchJob,
	filePath string,
	fileIndex int,
	queryOverride string,
	registry *models.ScraperRegistry,
	agg *aggregator.Aggregator,
	movieRepo *database.MovieRepository,
	fileMatcher *matcher.Matcher,
	httpClient httpclientiface.HTTPClient,
	userAgent string,
	referer string,
	force bool,
	updateMode bool,
	selectedScrapers []string,
	processedMovieIDs map[string]bool,
	cfg *config.Config,
	scalarStrategy string,
	arrayStrategy string,
) (*models.Movie, *FileResult, error)

RunBatchScrapeOnce performs a single scrape operation for a file within a batch job context. This function extracts the core scraping logic so that it can be reused for both initial batch scraping and rescraping operations.

Parameters:

  • ctx: Context for cancellation support
  • job: Batch job for logging and state tracking
  • filePath: Path to the video file being scraped
  • fileIndex: Index of file in batch (for logging, can be 0 for rescrape)
  • queryOverride: If non-empty, use this as the search query instead of extracting from filename
  • registry: Scraper registry for querying scrapers
  • agg: Aggregator for merging scraper results
  • movieRepo: Movie repository for database operations
  • fileMatcher: Matcher for extracting IDs from filenames
  • httpClient: Pre-configured HTTP client with proxy settings
  • userAgent: User-Agent header value from config
  • referer: Referer header value from config
  • force: If true, skip cache and delete existing data
  • updateMode: If true, merge scraped data with existing NFO file
  • selectedScrapers: If non-empty, use these scrapers instead of defaults
  • processedMovieIDs: Map to track which movie IDs have already had posters generated (pass nil to disable tracking)
  • cfg: Config for NFO path construction (required if updateMode is true)

Returns:

  • movie: Successfully scraped and saved movie metadata
  • fileResult: FileResult object for updating job status
  • error: Any error encountered during scraping

Note: This function does NOT call job.UpdateFileResult(); the caller should do that, which allows for custom timing or additional processing before updating the job state.

type JobQueue

type JobQueue struct {
	// contains filtered or unexported fields
}

JobQueue manages batch jobs

func NewJobQueue

func NewJobQueue() *JobQueue

NewJobQueue creates a new job queue

func (*JobQueue) CreateJob

func (jq *JobQueue) CreateJob(files []string) *BatchJob

CreateJob creates a new batch job

func (*JobQueue) DeleteJob

func (jq *JobQueue) DeleteJob(id string)

DeleteJob removes a job from the queue and cleans up associated temp files. It cancels the job first and waits for it to fully finish before removing files.

func (*JobQueue) GetJob

func (jq *JobQueue) GetJob(id string) (*BatchJob, bool)

GetJob retrieves a thread-safe copy of a job by ID. It returns a deep copy to prevent external mutations of internal state.

func (*JobQueue) GetJobPointer

func (jq *JobQueue) GetJobPointer(id string) (*BatchJob, bool)

GetJobPointer retrieves the actual job pointer for internal mutations. WARNING: This exposes the internal job; use it only when mutations are required. Callers must respect the job's internal mutex (job.mu) when modifying state.

func (*JobQueue) ListJobs

func (jq *JobQueue) ListJobs() []*BatchJob

ListJobs returns thread-safe copies of all jobs. The copies are deep copies, to prevent external mutations of internal state.

type JobStatus

type JobStatus string

JobStatus represents the status of a job

const (
	JobStatusPending   JobStatus = "pending"
	JobStatusRunning   JobStatus = "running"
	JobStatusCompleted JobStatus = "completed"
	JobStatusFailed    JobStatus = "failed"
	JobStatusCancelled JobStatus = "cancelled"
)

type NFOTask

type NFOTask struct {
	BaseTask
	// contains filtered or unexported fields
}

NFOTask generates an NFO file

func NewNFOTask

func NewNFOTask(
	movie *models.Movie,
	targetDir string,
	gen *nfo.Generator,
	progressTracker *ProgressTracker,
	dryRun bool,
	partSuffix string,
	videoFilePath string,
) *NFOTask

NewNFOTask creates a new NFO generation task

func (*NFOTask) Execute

func (t *NFOTask) Execute(ctx context.Context) error

type OrganizeTask

type OrganizeTask struct {
	BaseTask
	// contains filtered or unexported fields
}

OrganizeTask organizes a video file

func NewOrganizeTask

func NewOrganizeTask(
	match matcher.MatchResult,
	movie *models.Movie,
	destPath string,
	moveFiles bool,
	forceUpdate bool,
	org *organizer.Organizer,
	progressTracker *ProgressTracker,
	dryRun bool,
	linkModes ...organizer.LinkMode,
) *OrganizeTask

NewOrganizeTask creates a new organize task

func (*OrganizeTask) Execute

func (t *OrganizeTask) Execute(ctx context.Context) error

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a pool of workers that execute tasks concurrently

func NewPool

func NewPool(maxWorkers int, timeout time.Duration, progress *ProgressTracker) *Pool

NewPool creates a new worker pool.

func NewPoolWithContext

func NewPoolWithContext(parentCtx context.Context, maxWorkers int, timeout time.Duration, progress *ProgressTracker) *Pool

NewPoolWithContext creates a new worker pool with a parent context. The pool will be cancelled when the parent context is cancelled.

func (*Pool) ActiveWorkers

func (p *Pool) ActiveWorkers() int

ActiveWorkers returns the number of currently active workers. Note: This is an approximation based on the running tasks in the progress tracker.

func (*Pool) Errors

func (p *Pool) Errors() []error

Errors returns all errors encountered during task execution

func (*Pool) Stats

func (p *Pool) Stats() *PoolStats

Stats returns statistics about the pool

func (*Pool) Stop

func (p *Pool) Stop()

Stop cancels all running tasks and waits for them to finish

func (*Pool) Submit

func (p *Pool) Submit(task Task) error

Submit submits a task to the pool for execution. It blocks if the pool is at capacity.

func (*Pool) Wait

func (p *Pool) Wait() error

Wait waits for all tasks to complete

type PoolStats

type PoolStats struct {
	MaxWorkers      int
	Timeout         time.Duration
	TotalTasks      int
	Pending         int
	Running         int
	Success         int
	Failed          int
	Canceled        int
	Errors          int
	OverallProgress float64
}

PoolStats holds statistics about the worker pool

type ProcessFileOption

type ProcessFileOption func(*ProcessFileOptions)

ProcessFileOption configures optional behavior for a process file task.

func WithLinkMode

func WithLinkMode(mode organizer.LinkMode) ProcessFileOption

WithLinkMode sets copy link behavior for organize operations.

func WithUpdateMerge

func WithUpdateMerge(enabled bool, scalarStrategy, arrayStrategy string, cfg *config.Config) ProcessFileOption

WithUpdateMerge enables update-mode merge logic and merge strategy options.

type ProcessFileOptions

type ProcessFileOptions struct {
	LinkMode       organizer.LinkMode
	UpdateMode     bool
	ScalarStrategy string
	ArrayStrategy  string
	Config         *config.Config
}

ProcessFileOptions holds optional settings for process file tasks.

type ProcessFileTask

type ProcessFileTask struct {
	BaseTask
	// contains filtered or unexported fields
}

ProcessFileTask is a composite task that processes a single file. It executes scrape, download, organize, and NFO tasks sequentially.

func NewProcessFileTask

func NewProcessFileTask(
	match matcher.MatchResult,
	registry *models.ScraperRegistry,
	agg *aggregator.Aggregator,
	movieRepo *database.MovieRepository,
	dl *downloader.Downloader,
	org *organizer.Organizer,
	nfoGen *nfo.Generator,
	destPath string,
	moveFiles bool,
	forceUpdate bool,
	forceRefresh bool,
	progressTracker *ProgressTracker,
	dryRun bool,
	scrapeEnabled bool,
	downloadEnabled bool,
	organizeEnabled bool,
	nfoEnabled bool,
	customScraperPriority []string,
	options ...ProcessFileOption,
) *ProcessFileTask

NewProcessFileTask creates a new composite task for processing a file

func (*ProcessFileTask) Execute

func (t *ProcessFileTask) Execute(ctx context.Context) error

type ProgressStats

type ProgressStats struct {
	Total           int
	Pending         int
	Running         int
	Success         int
	Failed          int
	Canceled        int
	TotalBytes      int64
	DoneBytes       int64
	OverallProgress float64
}

ProgressStats holds statistics about all tasks

type ProgressTracker

type ProgressTracker struct {
	// contains filtered or unexported fields
}

ProgressTracker tracks progress for all tasks

func NewProgressTracker

func NewProgressTracker(notify chan<- ProgressUpdate) *ProgressTracker

NewProgressTracker creates a new progress tracker

func (*ProgressTracker) Cancel

func (pt *ProgressTracker) Cancel(taskID string)

Cancel marks a task as canceled

func (*ProgressTracker) Clear

func (pt *ProgressTracker) Clear()

Clear removes all task progress

func (*ProgressTracker) Complete

func (pt *ProgressTracker) Complete(taskID string, message string)

Complete marks a task as completed successfully

func (*ProgressTracker) Fail

func (pt *ProgressTracker) Fail(taskID string, err error)

Fail marks a task as failed

func (*ProgressTracker) Get

func (pt *ProgressTracker) Get(taskID string) (*TaskProgress, bool)

Get retrieves the progress for a task

func (*ProgressTracker) GetAll

func (pt *ProgressTracker) GetAll() []*TaskProgress

GetAll retrieves all task progress

func (*ProgressTracker) GetByStatus

func (pt *ProgressTracker) GetByStatus(status TaskStatus) []*TaskProgress

GetByStatus retrieves tasks with a specific status

func (*ProgressTracker) GetByType

func (pt *ProgressTracker) GetByType(taskType TaskType) []*TaskProgress

GetByType retrieves tasks of a specific type

func (*ProgressTracker) Remove

func (pt *ProgressTracker) Remove(taskID string)

Remove removes a specific task

func (*ProgressTracker) SetTotal

func (pt *ProgressTracker) SetTotal(taskID string, bytesTotal int64)

SetTotal sets the total bytes for a task

func (*ProgressTracker) Start

func (pt *ProgressTracker) Start(taskID string, taskType TaskType, message string)

Start marks a task as started

func (*ProgressTracker) Stats

func (pt *ProgressTracker) Stats() ProgressStats

Stats returns statistics about all tasks

func (*ProgressTracker) Update

func (pt *ProgressTracker) Update(taskID string, progress float64, message string, bytesDone int64)

Update updates the progress of a task

type ProgressUpdate

type ProgressUpdate struct {
	TaskID    string
	Type      TaskType
	Status    TaskStatus
	Progress  float64
	Message   string
	BytesDone int64
	Error     error
	Timestamp time.Time
}

ProgressUpdate represents a progress update event

type RefererResolver

type RefererResolver func(downloadURL, configuredReferer string) string

RefererResolver resolves an effective Referer for a download URL. This is injected by callers so poster generation does not hardcode host rules.

type ScrapeTask

type ScrapeTask struct {
	BaseTask
	// contains filtered or unexported fields
}

ScrapeTask scrapes metadata for a JAV ID

func NewScrapeTask

func NewScrapeTask(
	javID string,
	registry *models.ScraperRegistry,
	agg *aggregator.Aggregator,
	movieRepo *database.MovieRepository,
	progressTracker *ProgressTracker,
	dryRun bool,
	forceRefresh bool,
	customScraperPriority []string,
) *ScrapeTask

NewScrapeTask creates a new scrape task

func (*ScrapeTask) Execute

func (t *ScrapeTask) Execute(ctx context.Context) error

type Task

type Task interface {
	// ID returns a unique identifier for this task
	ID() string

	// Type returns the type of task
	Type() TaskType

	// Execute runs the task and returns an error if it fails
	Execute(ctx context.Context) error

	// Description returns a human-readable description
	Description() string
}

Task represents a unit of work to be executed

type TaskProgress

type TaskProgress struct {
	ID         string
	Type       TaskType
	Status     TaskStatus
	Progress   float64 // 0.0 to 1.0
	Message    string
	BytesTotal int64
	BytesDone  int64
	StartTime  time.Time
	UpdatedAt  time.Time
	Error      error
}

TaskProgress tracks the progress of a single task

type TaskResult

type TaskResult struct {
	TaskID      string
	Type        TaskType
	Status      TaskStatus
	Error       error
	StartTime   time.Time
	EndTime     time.Time
	Duration    time.Duration
	BytesTotal  int64
	BytesDone   int64
	Description string
}

TaskResult holds the result of a completed task

type TaskStatus

type TaskStatus string

TaskStatus represents the status of a task

const (
	TaskStatusPending  TaskStatus = "pending"
	TaskStatusRunning  TaskStatus = "running"
	TaskStatusSuccess  TaskStatus = "success"
	TaskStatusFailed   TaskStatus = "failed"
	TaskStatusCanceled TaskStatus = "canceled"
)

type TaskType

type TaskType string

TaskType represents the type of task

const (
	TaskTypeScan        TaskType = "scan"
	TaskTypeScrape      TaskType = "scrape"
	TaskTypeBatchScrape TaskType = "batch_scrape"
	TaskTypeDownload    TaskType = "download"
	TaskTypeOrganize    TaskType = "organize"
	TaskTypeNFO         TaskType = "nfo"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL