importer

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2026 License: MIT Imports: 35 Imported by: 0

Documentation

Overview

Package importer provides import queue processing for NZB files.

Index

Constants

View Source
const (
	ScanStatusIdle      = scanner.ScanStatusIdle
	ScanStatusScanning  = scanner.ScanStatusScanning
	ScanStatusCanceling = scanner.ScanStatusCanceling
	ImportStatusIdle    = scanner.ImportStatusIdle
	ImportStatusRunning = scanner.ImportStatusRunning
)

Re-export scanner status constants for backward compatibility

Variables

View Source
var (
	// NewNonRetryableError creates a new non-retryable error with a message and optional cause.
	NewNonRetryableError = sharedErrors.NewNonRetryableError

	// WrapNonRetryable wraps an existing error as non-retryable.
	WrapNonRetryable = sharedErrors.WrapNonRetryable

	// IsNonRetryable checks if an error is non-retryable.
	IsNonRetryable = sharedErrors.IsNonRetryable

	// ErrFallbackNotConfigured indicates that SABnzbd fallback is not enabled or configured.
	ErrFallbackNotConfigured = sharedErrors.ErrFallbackNotConfigured

	// ErrArticlesNotFound indicates that one or more NZB segments could not be found on any provider.
	ErrArticlesNotFound = sharedErrors.ErrArticlesNotFound
)

Functions

This section is empty.

Types

type ARRNotifier

type ARRNotifier interface {
	// NotifyARR notifies ARR applications about imported content
	NotifyARR(ctx context.Context, item *database.ImportQueueItem, resultingPath string) error
}

ARRNotifier handles notifications to ARR applications (Sonarr/Radarr)

type DirectoryScanner

type DirectoryScanner interface {
	// StartManualScan begins scanning a directory for NZB files
	StartManualScan(scanPath string) error
	// GetScanStatus returns the current scan status
	GetScanStatus() ScanInfo
	// CancelScan cancels an in-progress scan
	CancelScan() error
}

DirectoryScanner provides manual directory scanning functionality

type FileSizeCalculator

type FileSizeCalculator interface {
	// CalculateFileSizeOnly calculates the size of a file without full processing
	CalculateFileSizeOnly(filePath string) (int64, error)
}

FileSizeCalculator calculates file sizes for different file types

type HealthScheduler

type HealthScheduler interface {
	// ScheduleHealthCheck schedules a health check for an imported file
	ScheduleHealthCheck(ctx context.Context, filePath string, sourceNzb string, priority database.HealthPriority) error
}

HealthScheduler handles health check scheduling for imported files

type HistoryRecorder

type HistoryRecorder interface {
	// AddImportHistory records a successful file import
	AddImportHistory(ctx context.Context, history *database.ImportHistory) error
}

HistoryRecorder records successful import events in persistent storage

type IDMetadataLinker

type IDMetadataLinker interface {
	// HandleIDMetadataLinks creates ID-based metadata links
	HandleIDMetadataLinks(item *database.ImportQueueItem, resultingPath string)
}

IDMetadataLinker handles NzbDav ID metadata linking

type ImportInfo

type ImportInfo = scanner.ImportInfo

Type aliases from scanner package for backward compatibility

type ImportJobStatus

type ImportJobStatus = scanner.ImportJobStatus

Type aliases from scanner package for backward compatibility

type ImportService

type ImportService interface {
	QueueManager
	DirectoryScanner
	NzbDavImporter
	QueueOperations

	// Close releases all resources
	Close() error
	// SetRcloneClient sets the rclone client for VFS notifications
	SetRcloneClient(client any)
	// SetArrsService sets the ARRs service for notifications
	SetArrsService(service any)
	// RegisterConfigChangeHandler registers a handler for configuration changes
	RegisterConfigChangeHandler(configManager any)
	// RegenerateMetadata attempts to rebuild metadata for a file by finding its original NZB
	RegenerateMetadata(ctx context.Context, mountRelativePath string) error
}

ImportService is the main interface combining all importer capabilities

type NonRetryableError

type NonRetryableError = sharedErrors.NonRetryableError

Re-export error types and functions from shared errors package for backward compatibility with existing code.

type NzbDavImporter

type NzbDavImporter interface {
	// StartNzbdavImport begins importing from an NzbDav database
	StartNzbdavImport(dbPath string, blobsPath string, rootFolder string, cleanupFile bool) error
	// GetImportStatus returns the current import status
	GetImportStatus() ImportInfo
	// CancelImport cancels an in-progress import
	CancelImport() error
}

NzbDavImporter handles bulk import from NzbDav databases

type PostProcessor

type PostProcessor interface {
	SymlinkCreator
	StrmGenerator
	VFSNotifier
	HealthScheduler
	ARRNotifier
	SABnzbdFallback
	IDMetadataLinker

	// HandleSuccess performs all post-processing for successful imports
	HandleSuccess(ctx context.Context, item *database.ImportQueueItem, resultingPath string) error
	// HandleFailure performs all cleanup for failed imports
	HandleFailure(ctx context.Context, item *database.ImportQueueItem, processingErr error) error
}

PostProcessor coordinates all post-import processing steps

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor handles the processing and storage of parsed NZB files using metadata storage

func NewProcessor

func NewProcessor(metadataService *metadata.MetadataService, poolManager pool.Manager, broadcaster *progress.ProgressBroadcaster, configGetter config.ConfigGetter, recorder HistoryRecorder) *Processor

NewProcessor creates a new NZB processor using metadata storage

func (*Processor) ProcessNzbFile

func (proc *Processor) ProcessNzbFile(ctx context.Context, filePath, relativePath string, queueID int, allowedExtensionsOverride *[]string, virtualDirOverride *string, extractedFiles []parser.ExtractedFileInfo, category *string, metadata *string) (string, []string, error)

ProcessNzbFile processes an NZB or STRM file maintaining the folder structure relative to relative path. Returns (resultPath, writtenMetadataPaths, error). writtenMetadataPaths contains all virtual paths of metadata files written to disk; it is populated even on partial failure so callers can clean up. Paths prefixed with "DIR:" indicate a metadata directory that should be removed entirely.

func (*Processor) SetRecorder

func (proc *Processor) SetRecorder(recorder HistoryRecorder)

type QueueManager

type QueueManager interface {
	// Start begins processing queue items with the configured number of workers
	Start(ctx context.Context) error
	// Stop gracefully stops all workers and waits for completion
	Stop(ctx context.Context) error
	// Pause temporarily stops processing new items (workers remain active)
	Pause()
	// Resume continues processing after a pause
	Resume()
	// IsPaused returns whether the queue is currently paused
	IsPaused() bool
	// IsRunning returns whether the queue manager is active
	IsRunning() bool
	// CancelProcessing cancels processing of a specific item
	CancelProcessing(itemID int64) error
	// ProcessItemInBackground starts processing a specific item in the background
	ProcessItemInBackground(ctx context.Context, itemID int64)
}

QueueManager manages the import queue worker lifecycle

type QueueOperations

type QueueOperations interface {
	// AddToQueue adds an item to the import queue
	AddToQueue(ctx context.Context, filePath string, relativePath *string, category *string, priority *database.QueuePriority, metadata *string, downloadID *string) (*database.ImportQueueItem, error)
	// GetQueueStats returns queue statistics
	GetQueueStats(ctx context.Context) (*database.QueueStats, error)
}

QueueOperations provides queue manipulation operations

type SABnzbdFallback

type SABnzbdFallback interface {
	// AttemptFallback tries to send a failed import to external SABnzbd
	AttemptFallback(ctx context.Context, item *database.ImportQueueItem) error
}

SABnzbdFallback handles fallback to external SABnzbd for failed imports

type ScanInfo

type ScanInfo = scanner.ScanInfo

Type aliases from scanner package for backward compatibility

type ScanStatus

type ScanStatus = scanner.ScanStatus

Type aliases from scanner package for backward compatibility

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service provides NZB import functionality with manual directory scanning and queue-based processing

func NewService

func NewService(config ServiceConfig, metadataService *metadata.MetadataService, database *database.DB, poolManager pool.Manager, rcloneClient rclonecli.RcloneRcClient, configGetter config.ConfigGetter, healthRepo *database.HealthRepository, broadcaster *progress.ProgressBroadcaster, userRepo *database.UserRepository) (*Service, error)

NewService creates a new NZB import service with manual scanning and queue processing capabilities

func (*Service) AddImportHistory

func (s *Service) AddImportHistory(ctx context.Context, history *database.ImportHistory) error

AddImportHistory records a successful file import in persistent history

func (*Service) AddToQueue

func (s *Service) AddToQueue(ctx context.Context, filePath string, relativePath *string, category *string, priority *database.QueuePriority, metadata *string, downloadID *string) (*database.ImportQueueItem, error)

AddToQueue adds a new NZB file to the import queue with optional category and priority

func (*Service) CalculateFileSizeOnly

func (s *Service) CalculateFileSizeOnly(filePath string) (int64, error)

CalculateFileSizeOnly calculates the total file size from NZB/STRM segments This is a lightweight parser that only extracts size information without full processing

func (*Service) CancelImport

func (s *Service) CancelImport() error

CancelImport cancels the current import operation

func (*Service) CancelProcessing

func (s *Service) CancelProcessing(itemID int64) error

CancelProcessing cancels a processing queue item by cancelling its context

func (*Service) CancelScan

func (s *Service) CancelScan() error

CancelScan cancels the current scan operation

func (*Service) Close

func (s *Service) Close() error

Close closes the NZB import service and releases all resources

func (*Service) GetFailedNzbFolder

func (s *Service) GetFailedNzbFolder() string

GetFailedNzbFolder returns the path to the directory for failed NZB files

func (*Service) GetImportStatus

func (s *Service) GetImportStatus() ImportInfo

GetImportStatus returns the current import status

func (*Service) GetNzbFolder

func (s *Service) GetNzbFolder() string

GetNzbFolder returns the path to the persistent NZB storage directory

func (*Service) GetPostProcessor

func (s *Service) GetPostProcessor() *postprocessor.Coordinator

GetPostProcessor returns the post-processor coordinator

func (*Service) GetQueueStats

func (s *Service) GetQueueStats(ctx context.Context) (*database.QueueStats, error)

GetQueueStats returns current queue statistics from database

func (*Service) GetScanStatus

func (s *Service) GetScanStatus() ScanInfo

GetScanStatus returns the current scan status

func (*Service) HandleFailure

func (s *Service) HandleFailure(ctx context.Context, item *database.ImportQueueItem, err error)

HandleFailure implements queue.ItemProcessor - handles failed processing

func (*Service) HandleSuccess

func (s *Service) HandleSuccess(ctx context.Context, item *database.ImportQueueItem, resultingPath string) error

HandleSuccess implements queue.ItemProcessor - handles successful processing

func (*Service) IsFileInQueue

func (s *Service) IsFileInQueue(ctx context.Context, filePath string) (bool, error)

IsFileInQueue checks if a file is already in the queue (pending or processing)

func (*Service) IsPaused

func (s *Service) IsPaused() bool

IsPaused returns whether the service is paused

func (*Service) IsRunning

func (s *Service) IsRunning() bool

IsRunning returns whether the service is running

func (*Service) MoveToFailedFolder

func (s *Service) MoveToFailedFolder(ctx context.Context, item *database.ImportQueueItem) error

MoveToFailedFolder moves a failed NZB file to the failed directory

func (*Service) OnItemClaimed

func (s *Service) OnItemClaimed(ctx context.Context, item *database.ImportQueueItem)

OnItemClaimed implements queue.QueueEventListener. It broadcasts a queue-changed notification whenever a worker claims a pending item (pending → processing transition).

func (*Service) Pause

func (s *Service) Pause()

Pause pauses the queue processing

func (*Service) ProcessItem

func (s *Service) ProcessItem(ctx context.Context, item *database.ImportQueueItem) (string, error)

ProcessItem implements queue.ItemProcessor - processes a single queue item

func (*Service) ProcessItemInBackground

func (s *Service) ProcessItemInBackground(ctx context.Context, itemID int64)

ProcessItemInBackground processes a specific queue item in the background. NOTE: This intentionally runs outside the worker pool — it is used for manual retries of specific items and should not compete with the normal import queue workers.

func (*Service) RegenerateMetadata

func (s *Service) RegenerateMetadata(ctx context.Context, mountRelativePath string) error

RegenerateMetadata attempts to find the original NZB for a file and re-process it to fix corrupted metadata.

func (*Service) RegisterConfigChangeHandler

func (s *Service) RegisterConfigChangeHandler(configManager any)

func (*Service) ResetNzbdavImportStatus

func (s *Service) ResetNzbdavImportStatus()

ResetNzbdavImportStatus resets the import status to Idle

func (*Service) Resume

func (s *Service) Resume()

Resume resumes the queue processing

func (*Service) SetArrsService

func (s *Service) SetArrsService(service any)

SetArrsService sets or updates the ARRs service

func (*Service) SetRcloneClient

func (s *Service) SetRcloneClient(client any)

SetRcloneClient sets or updates the RClone client for VFS notifications

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start starts the NZB import service (queue workers only, manual scanning available via API)

func (*Service) StartManualScan

func (s *Service) StartManualScan(scanPath string) error

StartManualScan starts a manual scan of the specified directory

func (*Service) StartNzbdavImport

func (s *Service) StartNzbdavImport(dbPath string, blobsPath string, rootFolder string, cleanupFile bool) error

StartNzbdavImport starts an asynchronous import from an NZBDav database

func (*Service) Stop

func (s *Service) Stop(ctx context.Context) error

Stop stops the NZB import service and all queue workers

type ServiceConfig

type ServiceConfig struct {
	Workers int // Number of parallel queue workers (default: 2)
}

ServiceConfig holds configuration for the NZB import service

type StrmGenerator

type StrmGenerator interface {
	// CreateStrmFiles creates STRM files for an imported item
	CreateStrmFiles(item *database.ImportQueueItem, resultingPath string) error
}

StrmGenerator handles STRM file generation

type SymlinkCreator

type SymlinkCreator interface {
	// CreateSymlinks creates symlinks for an imported item
	CreateSymlinks(item *database.ImportQueueItem, resultingPath string) error
}

SymlinkCreator handles symlink creation for imported files

type VFSNotifier

type VFSNotifier interface {
	// NotifyVFS notifies rclone VFS about file changes
	NotifyVFS(ctx context.Context, resultingPath string, async bool)
	// RefreshMountPathIfNeeded refreshes the mount path cache if required
	RefreshMountPathIfNeeded(ctx context.Context, resultingPath string, itemID int64)
}

VFSNotifier handles rclone VFS cache notifications

Directories

Path Synopsis
iso
rar
Package postprocessor handles all post-import processing steps including symlink creation, STRM file generation, VFS notifications, health check scheduling, and ARR notifications.
Package postprocessor handles all post-import processing steps including symlink creation, STRM file generation, VFS notifications, health check scheduling, and ARR notifications.
Package queue provides queue management for the NZB import service.
Package queue provides queue management for the NZB import service.
Package scanner provides directory scanning and NZBDav import functionality.
Package scanner provides directory scanning and NZBDav import functionality.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL