database

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AutomaticHealthCheckRecord

// AutomaticHealthCheckRecord represents a batch insert record.
//
// It is the element type accepted by HealthRepository.BatchAddAutomaticHealthChecks
// and returned by HealthRepository.GetAllHealthCheckRecords. Every field has a
// same-named counterpart on FileHealth.
type AutomaticHealthCheckRecord struct {
	FilePath         string
	// NOTE(review): the pointer fields presumably map nil to SQL NULL — confirm.
	LibraryPath      *string
	ReleaseDate      *time.Time
	ScheduledCheckAt *time.Time
	SourceNzbPath    *string
	Status           HealthStatus
	MaxRetries       int
	MaxRepairRetries int
}

AutomaticHealthCheckRecord represents a single record in a batch insert of automatic health checks (see BatchAddAutomaticHealthChecks).

type BackfillRecord

// BackfillRecord represents a record used for metadata backfilling.
//
// It is the element type returned by HealthRepository.GetFilesMissingReleaseDate.
type BackfillRecord struct {
	// NOTE(review): presumably the same identifier as FileHealth.ID — confirm.
	ID       int64
	FilePath string
}

BackfillRecord represents a record used for metadata backfilling

type BackfillUpdate

// BackfillUpdate represents an update for release date backfilling.
//
// A slice of these is passed to HealthRepository.BackfillReleaseDates, which
// updates health records with their release dates and next check times.
type BackfillUpdate struct {
	// NOTE(review): presumably matches BackfillRecord.ID / FileHealth.ID — confirm.
	ID               int64
	ReleaseDate      time.Time
	ScheduledCheckAt time.Time
}

BackfillUpdate represents an update for release date backfilling

type BulkOperationResult

// BulkOperationResult represents the result of a bulk queue operation.
//
// It is returned by QueueRepository.RemoveFromQueueBulk.
type BulkOperationResult struct {
	DeletedCount    int
	// NOTE(review): presumably the number of items left untouched because they
	// were still in the processing state — confirm against the implementation.
	ProcessingCount int
	FailedIDs       []int64
}

BulkOperationResult represents the result of a bulk queue operation

type Config

// Config holds database configuration. It is the sole argument to NewDB.
type Config struct {
	// Type selects the backend: "sqlite" (default) or "postgres".
	// These strings are the same as the DialectSQLite and DialectPostgres values.
	Type         string
	DatabasePath string // SQLite only
	DSN          string // PostgreSQL only
}

Config holds database configuration.

type DB

// DB wraps the database connection and provides access to operations.
//
// Obtain one with NewDB and release it with Close. Connection exposes the
// underlying *sql.DB and Dialect reports the active backend.
type DB struct {

	// Repository is kept for backwards-compat; prefer using Connection() directly.
	Repository *QueueRepository
	// contains filtered or unexported fields
}

DB wraps the database connection and provides access to operations.

func NewDB

func NewDB(config Config) (*DB, error)

NewDB creates a new database connection and runs migrations.

func (*DB) Close

func (db *DB) Close() error

Close closes the database connection.

func (*DB) Connection

func (db *DB) Connection() *sql.DB

Connection returns the underlying database connection.

func (*DB) Dialect

func (db *DB) Dialect() Dialect

Dialect returns the dialect helper for this database.

func (*DB) UpdateConnectionPool

func (db *DB) UpdateConnectionPool(workerCount int)

UpdateConnectionPool adjusts the database connection pool settings based on worker count.

type DBQuerier

// DBQuerier defines the interface for database query operations.
// Both *sql.DB and *sql.Tx implement this interface, so code written against
// it can run either directly on the connection or inside a transaction.
type DBQuerier interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

DBQuerier defines the interface for database query operations. Both *sql.DB and *sql.Tx implement this interface.

type Dialect

// Dialect identifies the database backend. It is returned by DB.Dialect and
// passed to the NewHealthRepository, NewQueueRepository and NewRepository
// constructors.
type Dialect string

Dialect identifies the database backend.

// Supported Dialect values. The string forms are the same as the backend
// names documented for Config.Type ("sqlite" and "postgres").
const (
	DialectSQLite   Dialect = "sqlite"
	DialectPostgres Dialect = "postgres"
)

type FileHealth

// FileHealth represents the health tracking of files in the filesystem.
//
// Each field carries a `db` struct tag naming its column. Pointer-typed fields
// are optional; GetFilesWithoutLibraryPath, for example, selects records whose
// library_path is NULL.
type FileHealth struct {
	ID               int64        `db:"id"`
	FilePath         string       `db:"file_path"`
	LibraryPath      *string      `db:"library_path"` // Path to file in library directory (symlink or .strm file)
	Status           HealthStatus `db:"status"`
	LastChecked      *time.Time   `db:"last_checked"`
	LastError        *string      `db:"last_error"`
	RetryCount       int          `db:"retry_count"`        // Health check retry count
	MaxRetries       int          `db:"max_retries"`        // Max health check retries
	RepairRetryCount int          `db:"repair_retry_count"` // Repair retry count
	MaxRepairRetries int          `db:"max_repair_retries"` // Max repair retries
	SourceNzbPath    *string      `db:"source_nzb_path"`
	ErrorDetails     *string      `db:"error_details"` // JSON error details
	CreatedAt        time.Time    `db:"created_at"`
	UpdatedAt        time.Time    `db:"updated_at"`
	// Health check scheduling fields
	ReleaseDate      *time.Time     `db:"release_date"`       // Cached from metadata for scheduling
	ScheduledCheckAt *time.Time     `db:"scheduled_check_at"` // Next check time
	Priority         HealthPriority `db:"priority"`           // Priority level for health checks
	// Failure masking fields
	// StreamingFailureCount is incremented by IncrementStreamingFailureCount;
	// UnmaskFile removes the mask and resets the failure count.
	StreamingFailureCount int  `db:"streaming_failure_count"`
	IsMasked              bool `db:"is_masked"`
}

FileHealth represents the health tracking of files in the filesystem

type HealthPriority

// HealthPriority represents the priority level of a health check.
// It is the type of FileHealth.Priority (column "priority") and is set
// through HealthRepository.SetPriority.
type HealthPriority int

HealthPriority represents the priority level of a health check

// HealthPriority levels. Normal is the zero value.
// NOTE(review): numbering here rises from Normal (0) to Next (2), whereas
// QueuePriority uses 1 for High and 3 for Low — confirm the sort direction
// wherever these values are ordered.
const (
	HealthPriorityNormal HealthPriority = 0
	HealthPriorityHigh   HealthPriority = 1
	HealthPriorityNext   HealthPriority = 2
)

type HealthRepository

// HealthRepository handles file health database operations.
// Create one with NewHealthRepository, which takes a *sql.DB and a Dialect.
type HealthRepository struct {
	// contains filtered or unexported fields
}

HealthRepository handles file health database operations

func NewHealthRepository

func NewHealthRepository(db *sql.DB, d Dialect) *HealthRepository

NewHealthRepository creates a new health repository

func (*HealthRepository) AddFileToHealthCheck

func (r *HealthRepository) AddFileToHealthCheck(ctx context.Context, filePath string, libraryPath *string, maxRetries int, maxRepairRetries int, sourceNzbPath *string, priority HealthPriority) error

AddFileToHealthCheck adds a file to the health database for checking

func (*HealthRepository) AddFileToHealthCheckWithMetadata

func (r *HealthRepository) AddFileToHealthCheckWithMetadata(ctx context.Context, filePath string, libraryPath *string, maxRetries int, maxRepairRetries int, sourceNzbPath *string, priority HealthPriority, releaseDate *time.Time) error

AddFileToHealthCheckWithMetadata adds a file to the health database for checking with metadata

func (*HealthRepository) AddHealthCheck

func (r *HealthRepository) AddHealthCheck(
	ctx context.Context,
	filePath string,
	releaseDate time.Time,
	scheduledCheckAt time.Time,
	sourceNzbPath *string,
) error

AddHealthCheck adds or updates a health check record

func (*HealthRepository) BackfillReleaseDates

func (r *HealthRepository) BackfillReleaseDates(ctx context.Context, updates []BackfillUpdate) error

BackfillReleaseDates updates multiple health records with their release dates and next check times

func (*HealthRepository) BatchAddAutomaticHealthChecks

func (r *HealthRepository) BatchAddAutomaticHealthChecks(ctx context.Context, records []AutomaticHealthCheckRecord) error

BatchAddAutomaticHealthChecks inserts multiple automatic health checks efficiently

func (*HealthRepository) CleanupHealthRecords

func (r *HealthRepository) CleanupHealthRecords(ctx context.Context, existingFiles []string) error

CleanupHealthRecords removes health records for files that no longer exist

func (*HealthRepository) CountHealthItems

func (r *HealthRepository) CountHealthItems(ctx context.Context, statusFilter *HealthStatus, sinceFilter *time.Time, search string) (int, error)

CountHealthItems returns the total count of health records with optional filtering

func (*HealthRepository) DeleteHealthRecord

func (r *HealthRepository) DeleteHealthRecord(ctx context.Context, filePath string) error

DeleteHealthRecord removes a specific health record from the database

func (*HealthRepository) DeleteHealthRecordByID

func (r *HealthRepository) DeleteHealthRecordByID(ctx context.Context, id int64) error

DeleteHealthRecordByID removes a specific health record from the database by ID

func (*HealthRepository) DeleteHealthRecordByLibraryPath

func (r *HealthRepository) DeleteHealthRecordByLibraryPath(ctx context.Context, libraryPath string) (string, error)

DeleteHealthRecordByLibraryPath deletes the health record matching the given absolute library path. Returns the file_path of the deleted record so the caller can use it for metadata cleanup.

func (*HealthRepository) DeleteHealthRecordsBulk

func (r *HealthRepository) DeleteHealthRecordsBulk(ctx context.Context, filePaths []string) error

DeleteHealthRecordsBulk removes multiple health records from the database

func (*HealthRepository) DeleteHealthRecordsByDate

func (r *HealthRepository) DeleteHealthRecordsByDate(ctx context.Context, olderThan time.Time, statusFilter *HealthStatus) (int, error)

DeleteHealthRecordsByDate deletes health records older than the specified date with optional status filter

func (*HealthRepository) DeleteHealthRecordsByLibraryPathPrefix

func (r *HealthRepository) DeleteHealthRecordsByLibraryPathPrefix(ctx context.Context, libraryPathPrefix string) ([]string, int64, error)

DeleteHealthRecordsByLibraryPathPrefix deletes health records where library_path matches the given prefix. Returns the file_paths of deleted records for metadata cleanup, plus the count.

func (*HealthRepository) DeleteHealthRecordsByPrefix

func (r *HealthRepository) DeleteHealthRecordsByPrefix(ctx context.Context, prefix string) (int64, error)

DeleteHealthRecordsByPrefix removes health records that start with the given prefix

func (*HealthRepository) GetAllHealthCheckPaths

func (r *HealthRepository) GetAllHealthCheckPaths(ctx context.Context) ([]string, error)

GetAllHealthCheckPaths returns all health check file paths (memory optimized)

func (*HealthRepository) GetAllHealthCheckRecords

func (r *HealthRepository) GetAllHealthCheckRecords(ctx context.Context) ([]AutomaticHealthCheckRecord, error)

GetAllHealthCheckRecords returns all health check records tracked in the health system

func (*HealthRepository) GetFileHealth

func (r *HealthRepository) GetFileHealth(ctx context.Context, filePath string) (*FileHealth, error)

GetFileHealth retrieves health record for a specific file

func (*HealthRepository) GetFileHealthByID

func (r *HealthRepository) GetFileHealthByID(ctx context.Context, id int64) (*FileHealth, error)

GetFileHealthByID retrieves health record for a specific file by ID

func (*HealthRepository) GetFilesByPaths

func (r *HealthRepository) GetFilesByPaths(ctx context.Context, filePaths []string) ([]*FileHealth, error)

GetFilesByPaths returns health records for the specified file paths

func (*HealthRepository) GetFilesForLibrarySync

func (r *HealthRepository) GetFilesForLibrarySync(ctx context.Context) ([]*FileHealth, error)

GetFilesForLibrarySync returns all health records to verify their physical presence in the library

func (*HealthRepository) GetFilesForRepairNotification

func (r *HealthRepository) GetFilesForRepairNotification(ctx context.Context, limit int) ([]*FileHealth, error)

GetFilesForRepairNotification returns files that need repair notification (repair_triggered status)

func (*HealthRepository) GetFilesMissingReleaseDate

func (r *HealthRepository) GetFilesMissingReleaseDate(ctx context.Context, limit int) ([]BackfillRecord, error)

GetFilesMissingReleaseDate returns a list of files that don't have a release date cached

func (*HealthRepository) GetFilesWithoutLibraryPath

func (r *HealthRepository) GetFilesWithoutLibraryPath(ctx context.Context) ([]*FileHealth, error)

GetFilesWithoutLibraryPath returns all health records where library_path is NULL

func (*HealthRepository) GetHealthStats

func (r *HealthRepository) GetHealthStats(ctx context.Context) (map[HealthStatus]int, error)

GetHealthStats returns statistics about file health

func (*HealthRepository) GetSystemState

func (r *HealthRepository) GetSystemState(ctx context.Context, key string) (string, error)

GetSystemState retrieves a persistent state value

func (*HealthRepository) GetUnhealthyFiles

func (r *HealthRepository) GetUnhealthyFiles(ctx context.Context, limit int, strategy string, libraryDir string, maxRetries int) ([]*FileHealth, error)

GetUnhealthyFiles returns files that need health checks.

func (*HealthRepository) HasImportHistoryForPath

func (r *HealthRepository) HasImportHistoryForPath(ctx context.Context, virtualPath string) (bool, error)

HasImportHistoryForPath checks if any import history record exists for the given virtual path. Used to protect symlinks from deletion when an import has been recorded by AltMount, regardless of current metadata state.

func (*HealthRepository) IncrementRepairRetryCount

func (r *HealthRepository) IncrementRepairRetryCount(ctx context.Context, filePath string, errorMessage *string, errorDetails *string) error

IncrementRepairRetryCount increments the repair retry count

func (*HealthRepository) IncrementRetryCount

func (r *HealthRepository) IncrementRetryCount(ctx context.Context, filePath string, errorMessage *string, errorDetails *string, nextCheck time.Time) error

IncrementRetryCount increments the retry count and schedules next check

func (*HealthRepository) IncrementStreamingFailureCount

func (r *HealthRepository) IncrementStreamingFailureCount(ctx context.Context, filePath string, threshold int) (bool, bool, error)

IncrementStreamingFailureCount increments the streaming failure count and returns whether masking/repair threshold was reached

func (*HealthRepository) ListHealthItems

func (r *HealthRepository) ListHealthItems(ctx context.Context, statusFilter *HealthStatus, limit, offset int, sinceFilter *time.Time, search string, sortBy string, sortOrder string) ([]*FileHealth, error)

ListHealthItems returns all health records with optional filtering, sorting and pagination

func (*HealthRepository) MarkAsCorrupted

func (r *HealthRepository) MarkAsCorrupted(ctx context.Context, filePath string, finalError *string, errorDetails *string) error

MarkAsCorrupted permanently marks a file as corrupted after all retries are exhausted

func (*HealthRepository) MarkAsHealthy

func (r *HealthRepository) MarkAsHealthy(ctx context.Context, filePath string, nextCheckTime time.Time) error

MarkAsHealthy marks a file as healthy and clears all retry/error state

func (*HealthRepository) RegisterCorruptedFile

func (r *HealthRepository) RegisterCorruptedFile(ctx context.Context, filePath string, libraryPath *string, errorMessage string) error

RegisterCorruptedFile adds or updates a file as corrupted and schedules it for immediate check/repair

func (*HealthRepository) RelinkFileByFilename

func (r *HealthRepository) RelinkFileByFilename(ctx context.Context, filename, filePath, libraryPath string) (bool, error)

RelinkFileByFilename updates the file_path and library_path for a record that matches by filename. This is typically called by webhooks during renames or downloads to provide a definitive library path.

func (*HealthRepository) RenameHealthRecord

func (r *HealthRepository) RenameHealthRecord(ctx context.Context, oldPath, newPath string) error

RenameHealthRecord updates the file_path of a health record or records under a directory after a MOVE operation

func (*HealthRepository) ResetAllHealthChecks

func (r *HealthRepository) ResetAllHealthChecks(ctx context.Context) (int, error)

ResetAllHealthChecks resets all health records to pending status

func (*HealthRepository) ResetFileAllChecking

func (r *HealthRepository) ResetFileAllChecking(ctx context.Context) error

func (*HealthRepository) ResetHealthChecksBulk

func (r *HealthRepository) ResetHealthChecksBulk(ctx context.Context, filePaths []string) (int, error)

ResetHealthChecksBulk resets multiple health records to pending status

func (*HealthRepository) ResetStalePendingFiles

func (r *HealthRepository) ResetStalePendingFiles(ctx context.Context) error

ResetStalePendingFiles resets pending files that have exhausted retries back to retry_count=0 so they can be re-checked in the next health cycle. Called during worker startup.

func (*HealthRepository) ResolvePendingRepairsInDirectory

func (r *HealthRepository) ResolvePendingRepairsInDirectory(ctx context.Context, dirPath string) (int64, error)

ResolvePendingRepairsInDirectory removes health records with repair_triggered or corrupted status that exist in the specified directory. This is used when a new file is imported into a directory, implying it is a replacement for the broken file.

func (*HealthRepository) SetCorrupted

func (r *HealthRepository) SetCorrupted(ctx context.Context, filePath string, errorMessage *string, errorDetails *string) error

SetCorrupted sets a file's status to corrupted

func (*HealthRepository) SetFileChecking

func (r *HealthRepository) SetFileChecking(ctx context.Context, filePath string) error

SetFileChecking sets a file's status to 'checking'

func (*HealthRepository) SetFileCheckingByID

func (r *HealthRepository) SetFileCheckingByID(ctx context.Context, id int64) error

SetFileCheckingByID sets a file's status to 'checking' by ID

func (*HealthRepository) SetPriority

func (r *HealthRepository) SetPriority(ctx context.Context, id int64, priority HealthPriority) error

SetPriority sets the priority for a file health record

func (*HealthRepository) SetRepairTriggered

func (r *HealthRepository) SetRepairTriggered(ctx context.Context, filePath string, errorMessage *string, errorDetails *string) error

SetRepairTriggered sets a file's status to repair_triggered

func (*HealthRepository) SetRepairTriggeredByID

func (r *HealthRepository) SetRepairTriggeredByID(ctx context.Context, id int64, errorMessage *string, errorDetails *string) error

SetRepairTriggeredByID sets a file's status to repair_triggered by ID

func (*HealthRepository) UnmaskFile

func (r *HealthRepository) UnmaskFile(ctx context.Context, filePath string) error

UnmaskFile removes the mask from a file and resets the failure count

func (*HealthRepository) UpdateFileHealth

func (r *HealthRepository) UpdateFileHealth(ctx context.Context, filePath string, status HealthStatus, errorMessage *string, sourceNzbPath *string, errorDetails *string, noRetry bool) error

UpdateFileHealth updates or inserts a file health record

func (*HealthRepository) UpdateFileHealthScheduled

func (r *HealthRepository) UpdateFileHealthScheduled(ctx context.Context, filePath string, status HealthStatus, errorMessage *string, sourceNzbPath *string, errorDetails *string, noRetry bool, scheduledAt time.Time) error

UpdateFileHealthScheduled is like UpdateFileHealth but uses an explicit scheduledAt time instead of datetime('now') for the scheduled_check_at column.

func (*HealthRepository) UpdateHealthStatusBulk

func (r *HealthRepository) UpdateHealthStatusBulk(ctx context.Context, updates []HealthStatusUpdate) error

UpdateHealthStatusBulk updates multiple health records in a single transaction

func (*HealthRepository) UpdateLibraryPath

func (r *HealthRepository) UpdateLibraryPath(ctx context.Context, filePath string, libraryPath string) error

UpdateLibraryPath updates the library_path for a specific file

func (*HealthRepository) UpdateScheduledCheckTime

func (r *HealthRepository) UpdateScheduledCheckTime(ctx context.Context, filePath string, nextCheckTime time.Time) error

UpdateScheduledCheckTime updates the scheduled check time for a file

func (*HealthRepository) UpdateSystemState

func (r *HealthRepository) UpdateSystemState(ctx context.Context, key string, value string) error

UpdateSystemState updates or inserts a persistent state value

type HealthStatus

// HealthStatus represents the health status of a file.
// It is the type of FileHealth.Status (column "status") and the key type of
// the map returned by HealthRepository.GetHealthStats.
type HealthStatus string

HealthStatus represents the health status of a file

// Known HealthStatus values.
const (
	HealthStatusPending         HealthStatus = "pending"          // File has not been checked yet
	HealthStatusChecking        HealthStatus = "checking"         // File is currently being checked
	HealthStatusHealthy         HealthStatus = "healthy"          // File passed health check
	HealthStatusRepairTriggered HealthStatus = "repair_triggered" // File repair has been triggered in Arrs
	HealthStatusCorrupted       HealthStatus = "corrupted"        // File has missing segments or is corrupted
)

type HealthStatusUpdate

// HealthStatusUpdate represents a single update request for batch processing.
//
// A slice of these is passed to HealthRepository.UpdateHealthStatusBulk, which
// applies them in a single transaction.
type HealthStatusUpdate struct {
	// NOTE(review): UpdateType is not declared in the visible part of this
	// page; it presumably selects which kind of update to apply — confirm.
	Type             UpdateType
	FilePath         string
	Status           HealthStatus
	ErrorMessage     *string
	ErrorDetails     *string
	ScheduledCheckAt time.Time
	Skip             bool // if true, skip this record in the bulk update (e.g. record already deleted)
}

HealthStatusUpdate represents a single update request for batch processing

type ImportDailyStat

// ImportDailyStat represents historical import statistics for a specific day.
//
// It is the element type returned by QueueRepository.GetImportDailyStats and
// its alias QueueRepository.GetImportHistory. The field set parallels
// ImportHourlyStat, with Day in place of Hour.
type ImportDailyStat struct {
	Day             time.Time `db:"day"`
	CompletedCount  int       `db:"completed_count"`
	FailedCount     int       `db:"failed_count"`
	BytesDownloaded int64     `db:"bytes_downloaded"`
	UpdatedAt       time.Time `db:"updated_at"`
}

ImportDailyStat represents historical import statistics for a specific day

type ImportHistory

// ImportHistory represents a persistent record of a single imported file.
//
// Records are written with AddImportHistory and read back with
// QueueRepository.ListImportHistory.
type ImportHistory struct {
	ID          int64     `db:"id"`
	DownloadID  *string   `db:"download_id"`
	NzbID       *int64    `db:"nzb_id"` // Nullable if queue item deleted
	NzbName     string    `db:"nzb_name"`
	FileName    string    `db:"file_name"`
	FileSize    int64     `db:"file_size"`
	// NOTE(review): presumably the path that HasImportHistoryForPath matches
	// against — confirm.
	VirtualPath string    `db:"virtual_path"`
	LibraryPath *string   `db:"library_path"` // Added to show final location from file_health
	Category    *string   `db:"category"`
	Metadata    *string   `db:"metadata"`
	CompletedAt time.Time `db:"completed_at"`
}

ImportHistory represents a persistent record of a single imported file

type ImportHourlyStat

// ImportHourlyStat represents historical import statistics for a specific hour.
//
// It is the element type returned by QueueRepository.GetImportHourlyStats.
// The field set parallels ImportDailyStat, with Hour in place of Day.
type ImportHourlyStat struct {
	Hour            time.Time `db:"hour"`
	CompletedCount  int       `db:"completed_count"`
	FailedCount     int       `db:"failed_count"`
	BytesDownloaded int64     `db:"bytes_downloaded"`
	UpdatedAt       time.Time `db:"updated_at"`
}

ImportHourlyStat represents historical import statistics for a specific hour

type ImportQueueItem

// ImportQueueItem represents a queued NZB file waiting for import.
//
// Items are added with AddToQueue / AddBatchToQueue, handed to workers by
// ClaimNextQueueItem, and looked up by ID with GetQueueItem. Pointer-typed
// fields are optional.
type ImportQueueItem struct {
	ID           int64         `db:"id"`
	DownloadID   *string       `db:"download_id"` // GUID/String ID for external tracking (e.g. Sonarr/Radarr)
	NzbPath      string        `db:"nzb_path"`
	RelativePath *string       `db:"relative_path"`
	StoragePath  *string       `db:"storage_path"`
	Category     *string       `db:"category"` // SABnzbd-compatible category
	Priority     QueuePriority `db:"priority"`
	Status       QueueStatus   `db:"status"`
	CreatedAt    time.Time     `db:"created_at"`
	UpdatedAt    time.Time     `db:"updated_at"`
	StartedAt    *time.Time    `db:"started_at"`
	CompletedAt  *time.Time    `db:"completed_at"`
	RetryCount   int           `db:"retry_count"`
	MaxRetries   int           `db:"max_retries"`
	ErrorMessage *string       `db:"error_message"`
	BatchID      *string       `db:"batch_id"`
	Metadata     *string       `db:"metadata"`    // JSON metadata
	FileSize     *int64        `db:"file_size"`   // Total size in bytes calculated from segments
	TargetPath   *string       `db:"target_path"` // Optional forced symlink destination path
}

ImportQueueItem represents a queued NZB file waiting for import

type QueuePriority

// QueuePriority represents the priority level of a queued import.
// It is the type of ImportQueueItem.Priority (column "priority") and is
// changed through UpdateQueueItemPriority.
type QueuePriority int

QueuePriority represents the priority level of a queued import

// QueuePriority levels. High has the smallest value (1) and Low the largest
// (3); no constant is defined for the zero value.
// NOTE(review): HealthPriority numbers its levels the other way round
// (Normal=0, High=1) — confirm the sort direction wherever priorities are ordered.
const (
	QueuePriorityHigh   QueuePriority = 1
	QueuePriorityNormal QueuePriority = 2
	QueuePriorityLow    QueuePriority = 3
)

type QueueRepository

// QueueRepository handles queue-specific database operations.
// Create one with NewQueueRepository, which takes a *sql.DB and a Dialect.
// DB also exposes an instance through its Repository field.
type QueueRepository struct {
	// contains filtered or unexported fields
}

QueueRepository handles queue-specific database operations

func NewQueueRepository

func NewQueueRepository(db *sql.DB, d Dialect) *QueueRepository

NewQueueRepository creates a new queue repository

func (*QueueRepository) AddBatchToQueue

func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQueueItem) error

AddBatchToQueue adds multiple items to the queue in a single transaction

func (*QueueRepository) AddImportHistory

func (r *QueueRepository) AddImportHistory(ctx context.Context, history *ImportHistory) error

AddImportHistory records a successful file import in the persistent history table

func (*QueueRepository) AddStoragePath

func (r *QueueRepository) AddStoragePath(ctx context.Context, itemID int64, storagePath string) error

func (*QueueRepository) AddToQueue

func (r *QueueRepository) AddToQueue(ctx context.Context, item *ImportQueueItem) error

AddToQueue adds a new NZB file to the import queue

func (*QueueRepository) ClaimNextQueueItem

func (r *QueueRepository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueItem, error)

ClaimNextQueueItem atomically claims and returns the next available queue item

func (*QueueRepository) ClearDailyStats

func (r *QueueRepository) ClearDailyStats(ctx context.Context) error

ClearDailyStats deletes all records from the import_daily_stats, import_hourly_stats, and provider_hourly_stats tables

func (*QueueRepository) ClearHourlyStats

func (r *QueueRepository) ClearHourlyStats(ctx context.Context) error

ClearHourlyStats deletes all records from the import_hourly_stats table

func (*QueueRepository) ClearImportHistory

func (r *QueueRepository) ClearImportHistory(ctx context.Context) error

ClearImportHistory deletes all records from the import_history and import_daily_stats tables

func (*QueueRepository) ClearImportHistorySince

func (r *QueueRepository) ClearImportHistorySince(ctx context.Context, since time.Time) error

ClearImportHistorySince deletes records from the import_history and import_queue tables since the specified time, and adjusts the import_daily_stats and import_hourly_stats accordingly.

func (*QueueRepository) ClearProviderHourlyStats added in v0.2.0

func (r *QueueRepository) ClearProviderHourlyStats(ctx context.Context) error

ClearProviderHourlyStats deletes all records from the provider_hourly_stats table

func (*QueueRepository) DeleteFailedItemsOlderThan

func (r *QueueRepository) DeleteFailedItemsOlderThan(ctx context.Context, olderThan time.Time) ([]*ImportQueueItem, error)

DeleteFailedItemsOlderThan deletes failed queue items older than the given time. Returns the deleted items so the caller can clean up associated NZB files.

func (*QueueRepository) FilterExistingNzbdavIds

func (r *QueueRepository) FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error)

FilterExistingNzbdavIds checks a list of nzbdav IDs and returns those that already exist in the queue

func (*QueueRepository) GetImportDailyStats

func (r *QueueRepository) GetImportDailyStats(ctx context.Context, days int) ([]*ImportDailyStat, error)

GetImportDailyStats retrieves historical import statistics for the last N days

func (*QueueRepository) GetImportHistory

func (r *QueueRepository) GetImportHistory(ctx context.Context, days int) ([]*ImportDailyStat, error)

GetImportHistory retrieves historical import statistics for the last N days (Alias for GetImportDailyStats)

func (*QueueRepository) GetImportHourlyStats

func (r *QueueRepository) GetImportHourlyStats(ctx context.Context, hours int) ([]*ImportHourlyStat, error)

GetImportHourlyStats retrieves import statistics for the specified number of hours

func (*QueueRepository) GetQueueItem

func (r *QueueRepository) GetQueueItem(ctx context.Context, id int64) (*ImportQueueItem, error)

GetQueueItem retrieves a specific queue item by ID

func (*QueueRepository) GetQueueStats

func (r *QueueRepository) GetQueueStats(ctx context.Context) (*QueueStats, error)

GetQueueStats returns current queue statistics

func (*QueueRepository) IncrementDailyStat

func (r *QueueRepository) IncrementDailyStat(ctx context.Context, statType string) error

IncrementDailyStat increments the completed or failed count for the current day

func (*QueueRepository) IncrementHourlyStat

func (r *QueueRepository) IncrementHourlyStat(ctx context.Context, statType string) error

IncrementHourlyStat increments the completed or failed count for the current hour

func (*QueueRepository) IncrementRetryCountAndResetStatus

func (r *QueueRepository) IncrementRetryCountAndResetStatus(ctx context.Context, id int64, errorMessage *string) (bool, error)

IncrementRetryCountAndResetStatus increments the retry count and resets the status to pending

func (*QueueRepository) IsFileInQueue

func (r *QueueRepository) IsFileInQueue(ctx context.Context, filePath string) (bool, error)

IsFileInQueue checks if a file is already in the queue (pending or processing)

func (*QueueRepository) ListImportHistory

func (r *QueueRepository) ListImportHistory(ctx context.Context, limit int) ([]*ImportHistory, error)

ListImportHistory retrieves the last N successful imports from the persistent history

func (*QueueRepository) RemoveFromQueue

func (r *QueueRepository) RemoveFromQueue(ctx context.Context, id int64) error

RemoveFromQueue removes an item from the queue

func (*QueueRepository) RemoveFromQueueBulk

func (r *QueueRepository) RemoveFromQueueBulk(ctx context.Context, ids []int64) (*BulkOperationResult, error)

RemoveFromQueueBulk removes multiple items from the queue in bulk

func (*QueueRepository) ResetStaleItems

func (r *QueueRepository) ResetStaleItems(ctx context.Context) error

ResetStaleItems resets processing items back to pending on service startup

func (*QueueRepository) RestartQueueItemsBulk

func (r *QueueRepository) RestartQueueItemsBulk(ctx context.Context, ids []int64) error

RestartQueueItemsBulk resets multiple queue items back to pending status

func (*QueueRepository) UpdateQueueItemNzbPath

func (r *QueueRepository) UpdateQueueItemNzbPath(ctx context.Context, id int64, nzbPath string) error

UpdateQueueItemNzbPath updates the NZB path of a queue item

func (*QueueRepository) UpdateQueueItemPriority

func (r *QueueRepository) UpdateQueueItemPriority(ctx context.Context, id int64, priority QueuePriority) error

UpdateQueueItemPriority updates the priority of a queue item

func (*QueueRepository) UpdateQueueItemStatus

func (r *QueueRepository) UpdateQueueItemStatus(ctx context.Context, id int64, status QueueStatus, errorMessage *string) error

UpdateQueueItemStatus updates the status of a queue item

type QueueStats

// QueueStats represents statistics about the import queue.
//
// It is returned by QueueRepository.GetQueueStats.
type QueueStats struct {
	ID                  int64     `db:"id"`
	TotalQueued         int       `db:"total_queued"`
	TotalProcessing     int       `db:"total_processing"`
	TotalCompleted      int       `db:"total_completed"`
	TotalFailed         int       `db:"total_failed"`
	// NOTE(review): presumably nil when no average is available — confirm.
	AvgProcessingTimeMs *int      `db:"avg_processing_time_ms"`
	LastUpdated         time.Time `db:"last_updated"`
}

QueueStats represents statistics about the import queue

type QueueStatus

// QueueStatus represents the status of a queued import.
// It is the type of ImportQueueItem.Status (column "status") and is changed
// through UpdateQueueItemStatus.
type QueueStatus string

QueueStatus represents the status of a queued import

// Known QueueStatus values. IsFileInQueue treats pending and processing items
// as "in the queue"; ResetStaleItems moves processing items back to pending
// on service startup.
const (
	QueueStatusPending    QueueStatus = "pending"
	QueueStatusProcessing QueueStatus = "processing"
	QueueStatusCompleted  QueueStatus = "completed"
	QueueStatusFailed     QueueStatus = "failed"
	QueueStatusPaused     QueueStatus = "paused"
	QueueStatusFallback   QueueStatus = "fallback" // Sent to external SABnzbd as fallback
)

type Repository

// Repository provides database operations for NZB and file management.
// Create one with NewRepository, which takes a *sql.DB and a Dialect.
type Repository struct {
	// contains filtered or unexported fields
}

Repository provides database operations for NZB and file management

func NewRepository

func NewRepository(db *sql.DB, d Dialect) *Repository

NewRepository creates a new repository instance

func (*Repository) AddBatchToQueue

func (r *Repository) AddBatchToQueue(ctx context.Context, items []*ImportQueueItem) error

AddBatchToQueue adds multiple items to the queue in a single transaction for better performance

func (*Repository) AddBytesDownloadedToDailyStat

func (r *Repository) AddBytesDownloadedToDailyStat(ctx context.Context, bytes int64) error

AddBytesDownloadedToDailyStat increments the bytes_downloaded counter for the current day

func (*Repository) AddBytesDownloadedToHourlyStat

func (r *Repository) AddBytesDownloadedToHourlyStat(ctx context.Context, bytes int64) error

AddBytesDownloadedToHourlyStat increments the bytes_downloaded counter for the current hour

func (*Repository) AddImportHistory

func (r *Repository) AddImportHistory(ctx context.Context, history *ImportHistory) error

AddImportHistory records a successful file import in the persistent history table

func (*Repository) AddProviderBytesToHourlyStat added in v0.2.0

func (r *Repository) AddProviderBytesToHourlyStat(ctx context.Context, providerID string, bytes int64) error

AddProviderBytesToHourlyStat adds bytes downloaded to the current hour's stat for a specific provider

func (*Repository) AddToQueue

func (r *Repository) AddToQueue(ctx context.Context, item *ImportQueueItem) error

AddToQueue adds an NZB file to the import queue with optimized concurrency

func (*Repository) BatchUpdateSystemStats

func (r *Repository) BatchUpdateSystemStats(ctx context.Context, stats map[string]int64) error

BatchUpdateSystemStats updates multiple system statistics in a single transaction

func (*Repository) ClaimNextQueueItem

func (r *Repository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueItem, error)

ClaimNextQueueItem atomically claims and returns the next available queue item. This prevents multiple workers from processing the same item. It uses a single atomic UPDATE...RETURNING query to eliminate race conditions.

func (*Repository) ClearCompletedQueueItems

func (r *Repository) ClearCompletedQueueItems(ctx context.Context) (int, error)

ClearCompletedQueueItems removes completed and failed items from the queue

func (*Repository) ClearDailyStats

func (r *Repository) ClearDailyStats(ctx context.Context) error

ClearDailyStats deletes all records from the import_daily_stats, import_hourly_stats, and provider_hourly_stats tables

func (*Repository) ClearDailyStatsSince

func (r *Repository) ClearDailyStatsSince(ctx context.Context, since time.Time) error

ClearDailyStatsSince deletes records from the import_daily_stats table since the specified time

func (*Repository) ClearFailedQueueItems

func (r *Repository) ClearFailedQueueItems(ctx context.Context) (int, error)

ClearFailedQueueItems removes failed items from the queue

func (*Repository) ClearHourlyStats

func (r *Repository) ClearHourlyStats(ctx context.Context) error

ClearHourlyStats deletes all records from the import_hourly_stats table

func (*Repository) ClearHourlyStatsSince

func (r *Repository) ClearHourlyStatsSince(ctx context.Context, since time.Time) error

ClearHourlyStatsSince deletes records from the import_hourly_stats table since the specified time

func (*Repository) ClearImportHistory

func (r *Repository) ClearImportHistory(ctx context.Context) error

ClearImportHistory deletes all records from the import_history and import_daily_stats tables

func (*Repository) ClearImportHistorySince

func (r *Repository) ClearImportHistorySince(ctx context.Context, since time.Time) error

ClearImportHistorySince deletes records from the import_history and import_queue tables since the specified time, and adjusts the import_daily_stats accordingly.

func (*Repository) ClearPendingQueueItems

func (r *Repository) ClearPendingQueueItems(ctx context.Context) (int, error)

ClearPendingQueueItems removes pending items from the queue

func (*Repository) ClearProviderHourlyStats added in v0.2.0

func (r *Repository) ClearProviderHourlyStats(ctx context.Context) error

ClearProviderHourlyStats deletes all records from the provider_hourly_stats table

func (*Repository) CountActiveQueueItems

func (r *Repository) CountActiveQueueItems(ctx context.Context, search string, category string) (int, error)

CountActiveQueueItems counts the total number of pending and processing queue items

func (*Repository) CountQueueItems

func (r *Repository) CountQueueItems(ctx context.Context, status *QueueStatus, search string, category string) (int, error)

CountQueueItems counts the total number of queue items matching the given filters

func (*Repository) FilterExistingNzbdavIds

func (r *Repository) FilterExistingNzbdavIds(ctx context.Context, ids []string) ([]string, error)

FilterExistingNzbdavIds checks a list of nzbdav IDs and returns those that already exist in the queue. This is used for deduplication during import.

func (*Repository) GetExpiredStremioQueueItems

func (r *Repository) GetExpiredStremioQueueItems(ctx context.Context, ttlHours int, tempUploadDir string) ([]*ImportQueueItem, error)

GetExpiredStremioQueueItems returns completed Stremio queue items whose completed_at is older than ttlHours. Items are identified as Stremio-originated by their nzb_path containing tempUploadDir (typically os.TempDir()+"/altmount-uploads").

func (*Repository) GetImportDailyStats

func (r *Repository) GetImportDailyStats(ctx context.Context, days int) ([]*ImportDailyStat, error)

GetImportDailyStats retrieves import statistics for the specified number of days

func (*Repository) GetImportHistory

func (r *Repository) GetImportHistory(ctx context.Context, days int) ([]*ImportDailyStat, error)

GetImportHistory retrieves historical import statistics for the last N days (Alias for GetImportDailyStats)

func (*Repository) GetImportHistoryByDownloadID added in v0.2.0

func (r *Repository) GetImportHistoryByDownloadID(ctx context.Context, downloadID string) (*ImportHistory, error)

GetImportHistoryByDownloadID retrieves an import history item by its DownloadID

func (*Repository) GetImportHistoryByPath

func (r *Repository) GetImportHistoryByPath(ctx context.Context, virtualPath string) (*ImportHistory, error)

GetImportHistoryByPath retrieves an import history item by its virtual path

func (*Repository) GetImportHistoryItem added in v0.2.0

func (r *Repository) GetImportHistoryItem(ctx context.Context, id int64) (*ImportHistory, error)

GetImportHistoryItem retrieves a specific import history item by ID

func (*Repository) GetImportHourlyStats

func (r *Repository) GetImportHourlyStats(ctx context.Context, hours int) ([]*ImportHourlyStat, error)

GetImportHourlyStats retrieves import statistics for the specified number of hours

func (*Repository) GetProviderHourlyStats added in v0.2.0

func (r *Repository) GetProviderHourlyStats(ctx context.Context, hours int) (map[string]int64, error)

GetProviderHourlyStats retrieves provider download statistics for the last N hours

func (*Repository) GetQueueItem

func (r *Repository) GetQueueItem(ctx context.Context, id int64) (*ImportQueueItem, error)

GetQueueItem retrieves a specific queue item by ID

func (*Repository) GetQueueItemByDownloadID added in v0.2.0

func (r *Repository) GetQueueItemByDownloadID(ctx context.Context, downloadID string) (*ImportQueueItem, error)

GetQueueItemByDownloadID retrieves a queue item by its DownloadID

func (*Repository) GetQueueStats

func (r *Repository) GetQueueStats(ctx context.Context) (*QueueStats, error)

GetQueueStats retrieves current queue statistics

func (*Repository) GetSystemState

func (r *Repository) GetSystemState(ctx context.Context, key string) (string, error)

GetSystemState retrieves a system state string by key

func (*Repository) GetSystemStats

func (r *Repository) GetSystemStats(ctx context.Context) (map[string]int64, error)

GetSystemStats retrieves all system statistics as a map

func (*Repository) IncrementDailyStat

func (r *Repository) IncrementDailyStat(ctx context.Context, statType string) error

IncrementDailyStat increments the completed or failed count for the current day

func (*Repository) IncrementHourlyStat

func (r *Repository) IncrementHourlyStat(ctx context.Context, statType string) error

IncrementHourlyStat increments the completed or failed count for the current hour

func (*Repository) IsFileInQueue

func (r *Repository) IsFileInQueue(ctx context.Context, filePath string) (bool, error)

IsFileInQueue checks if a file is already in the queue (pending or processing)

func (*Repository) ListActiveQueueItems

func (r *Repository) ListActiveQueueItems(ctx context.Context, search string, category string, limit, offset int, sortBy, sortOrder string) ([]*ImportQueueItem, error)

ListActiveQueueItems retrieves pending and processing queue items

func (*Repository) ListImportHistory

func (r *Repository) ListImportHistory(ctx context.Context, limit, offset int, search string, category string) ([]*ImportHistory, error)

ListImportHistory retrieves import history items with optional filtering and pagination

func (*Repository) ListQueueItems

func (r *Repository) ListQueueItems(ctx context.Context, status *QueueStatus, search string, category string, limit, offset int, sortBy, sortOrder string) ([]*ImportQueueItem, error)

ListQueueItems retrieves queue items with optional filtering

func (*Repository) ListRecentImportHistory

func (r *Repository) ListRecentImportHistory(ctx context.Context, minutes int, category string) ([]*ImportHistory, error)

ListRecentImportHistory retrieves import history items completed within the last N minutes.

func (*Repository) RemoveFromHistory

func (r *Repository) RemoveFromHistory(ctx context.Context, id int64) (int64, error)

RemoveFromHistory removes a record from import_history by its own ID

func (*Repository) RemoveFromHistoryByDownloadID added in v0.2.0

func (r *Repository) RemoveFromHistoryByDownloadID(ctx context.Context, downloadID string) (int64, error)

RemoveFromHistoryByDownloadID removes a record from import_history by its DownloadID

func (*Repository) RemoveFromHistoryByNzbID

func (r *Repository) RemoveFromHistoryByNzbID(ctx context.Context, nzbID int64) (int64, error)

RemoveFromHistoryByNzbID removes a record from import_history by its original NZB ID

func (*Repository) RemoveFromQueue

func (r *Repository) RemoveFromQueue(ctx context.Context, id int64) error

RemoveFromQueue removes an item from the queue

func (*Repository) RemoveFromQueueBulk

func (r *Repository) RemoveFromQueueBulk(ctx context.Context, ids []int64) (*BulkOperationResult, error)

RemoveFromQueueBulk removes multiple items from the queue, excluding those currently being processed

func (*Repository) RemoveFromQueueByDownloadID added in v0.2.0

func (r *Repository) RemoveFromQueueByDownloadID(ctx context.Context, downloadID string) error

RemoveFromQueueByDownloadID removes an item from the queue by its DownloadID

func (*Repository) RestartQueueItemsBulk

func (r *Repository) RestartQueueItemsBulk(ctx context.Context, ids []int64) error

RestartQueueItemsBulk resets multiple queue items to pending status for reprocessing

func (*Repository) UpdateQueueItemPriority

func (r *Repository) UpdateQueueItemPriority(ctx context.Context, id int64, priority QueuePriority) error

UpdateQueueItemPriority updates the priority of a queue item

func (*Repository) UpdateQueueItemStatus

func (r *Repository) UpdateQueueItemStatus(ctx context.Context, id int64, status QueueStatus, errorMessage *string) error

UpdateQueueItemStatus updates the status of a queue item

func (*Repository) UpdateQueueStats

func (r *Repository) UpdateQueueStats(ctx context.Context) error

UpdateQueueStats updates queue statistics based on current queue state

func (*Repository) UpdateSystemStat

func (r *Repository) UpdateSystemStat(ctx context.Context, key string, value int64) error

UpdateSystemStat updates or inserts a single system statistic

func (*Repository) UpdateSystemState

func (r *Repository) UpdateSystemState(ctx context.Context, key string, value string) error

UpdateSystemState updates a system state string (JSON) by key

func (*Repository) WithImmediateTransaction

func (r *Repository) WithImmediateTransaction(ctx context.Context, fn func(*Repository) error) error

WithImmediateTransaction executes a function within an immediate database transaction. This reduces lock contention for queue operations by acquiring write locks immediately. It uses SQLite's IMMEDIATE transaction mode via BeginTx with Serializable isolation.

func (*Repository) WithTransaction

func (r *Repository) WithTransaction(ctx context.Context, fn func(*Repository) error) error

WithTransaction executes a function within a database transaction

type SystemStat

// SystemStat represents a persistent system statistic: one key with an
// integer value, as written by UpdateSystemStat(key, value).
type SystemStat struct {
	Key       string    `db:"key"`        // Identifier of the statistic; maps to the "key" column
	Value     int64     `db:"value"`      // Integer value of the statistic; maps to the "value" column
	UpdatedAt time.Time `db:"updated_at"` // Maps to "updated_at"; presumably the last write time — confirm against the repository code
}

SystemStat represents a persistent system statistic

type UpdateType

// UpdateType represents the type of health update. It is an int-backed
// enumeration; the defined values are the UpdateType* constants.
type UpdateType int

UpdateType represents the type of health update

// Defined UpdateType values. The two repair variants are distinguished by
// whether repair_retry_count is incremented, as noted on each constant.
const (
	UpdateTypeHealthy       UpdateType = 1
	UpdateTypeRetry         UpdateType = 2
	UpdateTypeRepairRetry   UpdateType = 3 // re-check of an already-triggered repair; increments repair_retry_count
	UpdateTypeCorrupted     UpdateType = 4
	UpdateTypeRepairTrigger UpdateType = 5 // first-time trigger; does not increment repair_retry_count
)

type User

// User represents a user account in the system. Each field maps to the
// column named in its db tag; pointer-typed fields are nullable, as noted
// on the individual fields.
type User struct {
	ID           int64      `db:"id"`
	UserID       string     `db:"user_id"`       // Unique identifier from auth provider
	Email        *string    `db:"email"`         // User email address (nullable)
	Name         *string    `db:"name"`          // User display name (nullable)
	AvatarURL    *string    `db:"avatar_url"`    // User avatar image URL (nullable)
	Provider     string     `db:"provider"`      // Auth provider (direct, github, google, dev, etc.)
	ProviderID   *string    `db:"provider_id"`   // Provider-specific user ID (nullable)
	PasswordHash *string    `db:"password_hash"` // Bcrypt password hash for direct auth (nullable)
	APIKey       *string    `db:"api_key"`       // API key for user authentication (nullable)
	IsAdmin      bool       `db:"is_admin"`      // Admin privileges flag
	CreatedAt    time.Time  `db:"created_at"`    // Account creation timestamp
	UpdatedAt    time.Time  `db:"updated_at"`    // Last profile update timestamp
	LastLogin    *time.Time `db:"last_login"`    // Last login timestamp (nullable)
}

User represents a user account in the system

type UserRepository

// UserRepository handles user database operations. Instances are created
// with NewUserRepository, which takes a *sql.DB and a Dialect.
type UserRepository struct {
	// contains filtered or unexported fields
}

UserRepository handles user database operations

func NewUserRepository

func NewUserRepository(db *sql.DB, d Dialect) *UserRepository

NewUserRepository creates a new user repository

func (*UserRepository) CreateUser

func (r *UserRepository) CreateUser(ctx context.Context, user *User) error

CreateUser creates a new user account

func (*UserRepository) GetAllUsers

func (r *UserRepository) GetAllUsers(ctx context.Context) ([]*User, error)

GetAllUsers retrieves all users with API keys for authentication purposes

func (*UserRepository) GetUserByAPIKey

func (r *UserRepository) GetUserByAPIKey(ctx context.Context, apiKey string) (*User, error)

GetUserByAPIKey retrieves a user by their API key

func (*UserRepository) GetUserByEmail

func (r *UserRepository) GetUserByEmail(ctx context.Context, email string) (*User, error)

GetUserByEmail retrieves a user by their email address for direct authentication

func (*UserRepository) GetUserByID

func (r *UserRepository) GetUserByID(ctx context.Context, userID string) (*User, error)

GetUserByID retrieves a user by their unique user ID

func (*UserRepository) GetUserByProvider

func (r *UserRepository) GetUserByProvider(ctx context.Context, provider, providerID string) (*User, error)

GetUserByProvider retrieves a user by provider and provider ID

func (*UserRepository) GetUserByUsername

func (r *UserRepository) GetUserByUsername(ctx context.Context, username string) (*User, error)

GetUserByUsername retrieves a user by their username (user_id) for direct authentication

func (*UserRepository) GetUserCount

func (r *UserRepository) GetUserCount(ctx context.Context) (int, error)

GetUserCount returns the total number of users

func (*UserRepository) RegenerateAPIKey

func (r *UserRepository) RegenerateAPIKey(ctx context.Context, userID string) (string, error)

RegenerateAPIKey generates and updates a new API key for the user

func (*UserRepository) UpdateLastLogin

func (r *UserRepository) UpdateLastLogin(ctx context.Context, userID string) error

UpdateLastLogin updates the user's last login timestamp

func (*UserRepository) UpdatePassword

func (r *UserRepository) UpdatePassword(ctx context.Context, userID string, passwordHash string) error

UpdatePassword updates a user's password hash

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL