database

package
v0.0.1-alpha3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BulkDeleteResult

type BulkDeleteResult struct {
	DeletedCount    int64
	ProcessingCount int64
	RequestedCount  int64
}

BulkDeleteResult contains the result of a bulk delete operation

type Config

type Config struct {
	DatabasePath string
}

Config holds database configuration

type DB

type DB struct {
	Repository *QueueRepository
	// contains filtered or unexported fields
}

DB wraps the database connection and provides access to operations

func NewDB

func NewDB(config Config) (*DB, error)

NewDB creates a new database connection and runs migrations

func (*DB) Close

func (db *DB) Close() error

Close closes the database connection

func (*DB) Connection

func (db *DB) Connection() *sql.DB

Connection returns the underlying database connection

type FileHealth

type FileHealth struct {
	ID               int64        `db:"id"`
	FilePath         string       `db:"file_path"`
	Status           HealthStatus `db:"status"`
	LastChecked      time.Time    `db:"last_checked"`
	LastError        *string      `db:"last_error"`
	RetryCount       int          `db:"retry_count"`        // Health check retry count
	MaxRetries       int          `db:"max_retries"`        // Max health check retries
	RepairRetryCount int          `db:"repair_retry_count"` // Repair retry count
	MaxRepairRetries int          `db:"max_repair_retries"` // Max repair retries
	NextRetryAt      *time.Time   `db:"next_retry_at"`
	SourceNzbPath    *string      `db:"source_nzb_path"`
	ErrorDetails     *string      `db:"error_details"` // JSON error details
	CreatedAt        time.Time    `db:"created_at"`
	UpdatedAt        time.Time    `db:"updated_at"`
}

FileHealth represents the health tracking of files in the filesystem

type HealthRepository

type HealthRepository struct {
	// contains filtered or unexported fields
}

HealthRepository handles file health database operations

func NewHealthRepository

func NewHealthRepository(db interface {
	Exec(query string, args ...interface{}) (sql.Result, error)
	Query(query string, args ...interface{}) (*sql.Rows, error)
	QueryRow(query string, args ...interface{}) *sql.Row
}) *HealthRepository

NewHealthRepository creates a new health repository

func (*HealthRepository) AddFileToHealthCheck

func (r *HealthRepository) AddFileToHealthCheck(filePath string, maxRetries int, sourceNzbPath *string) error

AddFileToHealthCheck adds a file to the health database for checking

func (*HealthRepository) CleanupHealthRecords

func (r *HealthRepository) CleanupHealthRecords(existingFiles []string) error

CleanupHealthRecords removes health records for files that no longer exist

func (*HealthRepository) CountHealthItems

func (r *HealthRepository) CountHealthItems(statusFilter *HealthStatus, sinceFilter *time.Time, search string) (int, error)

CountHealthItems returns the total count of health records with optional filtering

func (*HealthRepository) DeleteHealthRecord

func (r *HealthRepository) DeleteHealthRecord(filePath string) error

DeleteHealthRecord removes a specific health record from the database

func (*HealthRepository) DeleteHealthRecordByID

func (r *HealthRepository) DeleteHealthRecordByID(id int64) error

DeleteHealthRecordByID removes a specific health record from the database by ID

func (*HealthRepository) DeleteHealthRecordsBulk

func (r *HealthRepository) DeleteHealthRecordsBulk(filePaths []string) error

DeleteHealthRecordsBulk removes multiple health records from the database

func (*HealthRepository) GetFileHealth

func (r *HealthRepository) GetFileHealth(filePath string) (*FileHealth, error)

GetFileHealth retrieves health record for a specific file

func (*HealthRepository) GetFileHealthByID

func (r *HealthRepository) GetFileHealthByID(id int64) (*FileHealth, error)

GetFileHealthByID retrieves health record for a specific file by ID

func (*HealthRepository) GetFilesForRepairNotification

func (r *HealthRepository) GetFilesForRepairNotification(limit int) ([]*FileHealth, error)

GetFilesForRepairNotification returns files that need repair notification (repair_triggered status)

func (*HealthRepository) GetHealthStats

func (r *HealthRepository) GetHealthStats() (map[HealthStatus]int, error)

GetHealthStats returns statistics about file health

func (*HealthRepository) GetUnhealthyFiles

func (r *HealthRepository) GetUnhealthyFiles(limit int) ([]*FileHealth, error)

GetUnhealthyFiles returns files that need health checks (excluding repair_triggered files)

func (*HealthRepository) IncrementRepairRetryCount

func (r *HealthRepository) IncrementRepairRetryCount(filePath string, errorMessage *string) error

IncrementRepairRetryCount increments the repair retry count and calculates next retry time

func (*HealthRepository) IncrementRetryCount

func (r *HealthRepository) IncrementRetryCount(filePath string, errorMessage *string) error

IncrementRetryCount increments the retry count and calculates next retry time

func (*HealthRepository) ListHealthItems

func (r *HealthRepository) ListHealthItems(statusFilter *HealthStatus, limit, offset int, sinceFilter *time.Time, search string) ([]*FileHealth, error)

ListHealthItems returns all health records with optional filtering and pagination

func (*HealthRepository) MarkAsCorrupted

func (r *HealthRepository) MarkAsCorrupted(filePath string, finalError *string) error

MarkAsCorrupted permanently marks a file as corrupted after all retries are exhausted

func (*HealthRepository) ResetFileAllChecking

func (r *HealthRepository) ResetFileAllChecking() error

func (*HealthRepository) SetCorrupted

func (r *HealthRepository) SetCorrupted(filePath string, errorMessage *string) error

SetCorrupted sets a file's status to corrupted

func (*HealthRepository) SetFileChecking

func (r *HealthRepository) SetFileChecking(filePath string) error

SetFileChecking sets a file's status to 'checking'

func (*HealthRepository) SetFileCheckingByID

func (r *HealthRepository) SetFileCheckingByID(id int64) error

SetFileCheckingByID sets a file's status to 'checking' by ID

func (*HealthRepository) SetRepairTriggered

func (r *HealthRepository) SetRepairTriggered(filePath string, errorMessage *string) error

SetRepairTriggered sets a file's status to repair_triggered

func (*HealthRepository) SetRepairTriggeredByID

func (r *HealthRepository) SetRepairTriggeredByID(id int64, errorMessage *string) error

SetRepairTriggeredByID sets a file's status to repair_triggered by ID

func (*HealthRepository) UpdateFileHealth

func (r *HealthRepository) UpdateFileHealth(filePath string, status HealthStatus, errorMessage *string, sourceNzbPath *string, errorDetails *string) error

UpdateFileHealth updates or inserts a file health record

type HealthStatus

type HealthStatus string

HealthStatus represents the health status of a file

const (
	HealthStatusPending         HealthStatus = "pending"          // File has not been checked yet
	HealthStatusChecking        HealthStatus = "checking"         // File is currently being checked
	HealthStatusHealthy         HealthStatus = "healthy"          // File is fully available and healthy
	HealthStatusPartial         HealthStatus = "partial"          // File has some missing segments but is recoverable
	HealthStatusRepairTriggered HealthStatus = "repair_triggered" // File repair has been triggered in Arrs
	HealthStatusCorrupted       HealthStatus = "corrupted"        // File is corrupted or permanently unavailable
)

type ImportQueueItem

type ImportQueueItem struct {
	ID           int64         `db:"id"`
	NzbPath      string        `db:"nzb_path"`
	RelativePath *string       `db:"relative_path"`
	StoragePath  *string       `db:"storage_path"`
	Category     *string       `db:"category"` // SABnzbd-compatible category
	Priority     QueuePriority `db:"priority"`
	Status       QueueStatus   `db:"status"`
	CreatedAt    time.Time     `db:"created_at"`
	UpdatedAt    time.Time     `db:"updated_at"`
	StartedAt    *time.Time    `db:"started_at"`
	CompletedAt  *time.Time    `db:"completed_at"`
	RetryCount   int           `db:"retry_count"`
	MaxRetries   int           `db:"max_retries"`
	ErrorMessage *string       `db:"error_message"`
	BatchID      *string       `db:"batch_id"`
	Metadata     *string       `db:"metadata"`  // JSON metadata
	FileSize     *int64        `db:"file_size"` // Total size in bytes calculated from segments
}

ImportQueueItem represents a queued NZB file waiting for import

type MediaFile

type MediaFile struct {
	ID           int64     `db:"id"`
	InstanceName string    `db:"instance_name"` // Name from configuration
	InstanceType string    `db:"instance_type"` // "radarr" or "sonarr"
	ExternalID   int64     `db:"external_id"`   // Movie ID or Episode ID from API
	FileID       int64     `db:"file_id"`       // Movie File ID or Episode File ID from API (nullable)
	FilePath     string    `db:"file_path"`     // Full file path
	FileSize     *int64    `db:"file_size"`     // File size in bytes (nullable)
	CreatedAt    time.Time `db:"created_at"`    // When record was created
	UpdatedAt    time.Time `db:"updated_at"`    // When record was last updated
}

MediaFile represents a media file tracked by scrapers

type MediaFileInput

type MediaFileInput struct {
	InstanceName string
	InstanceType string
	ExternalID   int64  // Movie ID or Episode ID
	FileID       *int64 // Movie File ID or Episode File ID (nullable)
	FilePath     string
	FileSize     *int64
}

MediaFileInput represents input data for media file operations

type MediaRepository

type MediaRepository struct {
	// contains filtered or unexported fields
}

MediaRepository handles operations for media files table

func NewMediaRepository

func NewMediaRepository(db *sql.DB, logger *slog.Logger) *MediaRepository

NewMediaRepository creates a new media repository

func (*MediaRepository) CleanupInstanceData

func (r *MediaRepository) CleanupInstanceData(instanceName, instanceType string) error

CleanupInstanceData removes all media files for a specific instance This can be called when an instance is removed from configuration

func (*MediaRepository) GetMediaFilesByInstance

func (r *MediaRepository) GetMediaFilesByInstance(instanceName, instanceType string) ([]MediaFile, error)

GetMediaFilesByInstance returns all media files for a specific instance

func (*MediaRepository) GetMediaFilesByPath

func (r *MediaRepository) GetMediaFilesByPath(filePath string) ([]MediaFile, error)

GetMediaFilesByPath returns media files matching a file path This can be used for health correlation

func (*MediaRepository) GetMediaFilesCount

func (r *MediaRepository) GetMediaFilesCount() (int64, error)

GetMediaFilesCount returns the total count of media files

func (*MediaRepository) GetMediaFilesCountByInstance

func (r *MediaRepository) GetMediaFilesCountByInstance(instanceName, instanceType string) (int64, error)

GetMediaFilesCountByInstance returns the count of media files for a specific instance

func (*MediaRepository) SyncMediaFiles

func (r *MediaRepository) SyncMediaFiles(instanceName, instanceType string, files []MediaFileInput) (*SyncResult, error)

SyncMediaFiles performs a complete sync operation for an instance This replaces all files for the instance with the provided list

type QueuePriority

type QueuePriority int

QueuePriority represents the priority level of a queued import

const (
	QueuePriorityHigh   QueuePriority = 1
	QueuePriorityNormal QueuePriority = 2
	QueuePriorityLow    QueuePriority = 3
)

type QueueRepository

type QueueRepository struct {
	// contains filtered or unexported fields
}

QueueRepository handles queue-specific database operations

func NewQueueRepository

func NewQueueRepository(db interface {
	Exec(query string, args ...interface{}) (sql.Result, error)
	Query(query string, args ...interface{}) (*sql.Rows, error)
	QueryRow(query string, args ...interface{}) *sql.Row
}) *QueueRepository

NewQueueRepository creates a new queue repository

func (*QueueRepository) AddBatchToQueue

func (r *QueueRepository) AddBatchToQueue(items []*ImportQueueItem) error

AddBatchToQueue adds multiple items to the queue in a single transaction

func (*QueueRepository) AddStoragePath

func (r *QueueRepository) AddStoragePath(itemID int64, storagePath string) error

func (*QueueRepository) AddToQueue

func (r *QueueRepository) AddToQueue(item *ImportQueueItem) error

AddToQueue adds a new NZB file to the import queue

func (*QueueRepository) ClaimNextQueueItem

func (r *QueueRepository) ClaimNextQueueItem() (*ImportQueueItem, error)

ClaimNextQueueItem atomically claims and returns the next available queue item

func (*QueueRepository) GetQueueItem

func (r *QueueRepository) GetQueueItem(id int64) (*ImportQueueItem, error)

GetQueueItem retrieves a specific queue item by ID

func (*QueueRepository) GetQueueStats

func (r *QueueRepository) GetQueueStats() (*QueueStats, error)

GetQueueStats returns current queue statistics

func (*QueueRepository) IsFileInQueue

func (r *QueueRepository) IsFileInQueue(filePath string) (bool, error)

IsFileInQueue checks if a file is already in the queue (pending, retrying, or processing)

func (*QueueRepository) ResetStaleItems

func (r *QueueRepository) ResetStaleItems() error

ResetStaleItems resets processing and retrying items back to pending on service startup

func (*QueueRepository) UpdateQueueItemStatus

func (r *QueueRepository) UpdateQueueItemStatus(id int64, status QueueStatus, errorMessage *string) error

UpdateQueueItemStatus updates the status of a queue item

type QueueStats

type QueueStats struct {
	ID                  int64     `db:"id"`
	TotalQueued         int       `db:"total_queued"`
	TotalProcessing     int       `db:"total_processing"`
	TotalCompleted      int       `db:"total_completed"`
	TotalFailed         int       `db:"total_failed"`
	AvgProcessingTimeMs *int      `db:"avg_processing_time_ms"`
	LastUpdated         time.Time `db:"last_updated"`
}

QueueStats represents statistics about the import queue

type QueueStatus

type QueueStatus string

QueueStatus represents the status of a queued import

const (
	QueueStatusPending    QueueStatus = "pending"
	QueueStatusProcessing QueueStatus = "processing"
	QueueStatusCompleted  QueueStatus = "completed"
	QueueStatusFailed     QueueStatus = "failed"
	QueueStatusRetrying   QueueStatus = "retrying"
)

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository provides database operations for NZB and file management

func NewRepository

func NewRepository(db *sql.DB) *Repository

NewRepository creates a new repository instance

func (*Repository) AddBatchToQueue

func (r *Repository) AddBatchToQueue(items []*ImportQueueItem) error

AddBatchToQueue adds multiple items to the queue in a single transaction for better performance

func (*Repository) AddToQueue

func (r *Repository) AddToQueue(item *ImportQueueItem) error

AddToQueue adds an NZB file to the import queue with optimized concurrency

func (*Repository) ClaimNextQueueItem

func (r *Repository) ClaimNextQueueItem() (*ImportQueueItem, error)

ClaimNextQueueItem atomically claims and returns the next available queue item This prevents multiple workers from processing the same item

func (*Repository) ClearCompletedQueueItems

func (r *Repository) ClearCompletedQueueItems(olderThan time.Time) (int, error)

ClearCompletedQueueItems removes completed and failed items from the queue

func (*Repository) ClearFailedQueueItems

func (r *Repository) ClearFailedQueueItems(olderThan time.Time) (int, error)

ClearFailedQueueItems removes failed items from the queue

func (*Repository) CountQueueItems

func (r *Repository) CountQueueItems(status *QueueStatus, search string) (int, error)

CountQueueItems counts the total number of queue items matching the given filters

func (*Repository) GetNextQueueItems

func (r *Repository) GetNextQueueItems(limit int) ([]*ImportQueueItem, error)

GetNextQueueItems retrieves the next batch of items to process from the queue Uses optimized query with row-level locking for better concurrency

func (*Repository) GetQueueItem

func (r *Repository) GetQueueItem(id int64) (*ImportQueueItem, error)

GetQueueItem retrieves a specific queue item by ID

func (*Repository) GetQueueItemByPath

func (r *Repository) GetQueueItemByPath(nzbPath string) (*ImportQueueItem, error)

GetQueueItemByPath retrieves a queue item by NZB path

func (*Repository) GetQueueStats

func (r *Repository) GetQueueStats() (*QueueStats, error)

GetQueueStats retrieves current queue statistics

func (*Repository) IsFileInQueue

func (r *Repository) IsFileInQueue(filePath string) (bool, error)

IsFileInQueue checks if a file is already in the queue (pending, retrying, or processing)

func (*Repository) ListQueueItems

func (r *Repository) ListQueueItems(status *QueueStatus, search string, limit, offset int) ([]*ImportQueueItem, error)

ListQueueItems retrieves queue items with optional filtering

func (*Repository) RemoveFromQueue

func (r *Repository) RemoveFromQueue(id int64) error

RemoveFromQueue removes an item from the queue

func (*Repository) RemoveFromQueueBulk

func (r *Repository) RemoveFromQueueBulk(ids []int64) (*BulkDeleteResult, error)

RemoveFromQueueBulk removes multiple items from the queue, excluding those currently being processed

func (*Repository) RestartQueueItemsBulk

func (r *Repository) RestartQueueItemsBulk(ids []int64) error

RestartQueueItemsBulk resets multiple queue items to pending status for reprocessing

func (*Repository) UpdateQueueItemStatus

func (r *Repository) UpdateQueueItemStatus(id int64, status QueueStatus, errorMessage *string) error

UpdateQueueItemStatus updates the status of a queue item

func (*Repository) UpdateQueueStats

func (r *Repository) UpdateQueueStats() error

UpdateQueueStats updates queue statistics based on current queue state

func (*Repository) WithImmediateTransaction

func (r *Repository) WithImmediateTransaction(fn func(*Repository) error) error

WithImmediateTransaction executes a function within an immediate database transaction This reduces lock contention for queue operations by acquiring locks immediately

func (*Repository) WithTransaction

func (r *Repository) WithTransaction(fn func(*Repository) error) error

WithTransaction executes a function within a database transaction

type SyncResult

type SyncResult struct {
	Added   int
	Updated int
	Removed int
}

SyncResult represents the result of a sync operation

type User

type User struct {
	ID           int64      `db:"id"`
	UserID       string     `db:"user_id"`       // Unique identifier from auth provider
	Email        *string    `db:"email"`         // User email address (nullable)
	Name         *string    `db:"name"`          // User display name (nullable)
	AvatarURL    *string    `db:"avatar_url"`    // User avatar image URL (nullable)
	Provider     string     `db:"provider"`      // Auth provider (direct, github, google, dev, etc.)
	ProviderID   *string    `db:"provider_id"`   // Provider-specific user ID (nullable)
	PasswordHash *string    `db:"password_hash"` // Bcrypt password hash for direct auth (nullable)
	APIKey       *string    `db:"api_key"`       // API key for user authentication (nullable)
	IsAdmin      bool       `db:"is_admin"`      // Admin privileges flag
	CreatedAt    time.Time  `db:"created_at"`    // Account creation timestamp
	UpdatedAt    time.Time  `db:"updated_at"`    // Last profile update timestamp
	LastLogin    *time.Time `db:"last_login"`    // Last login timestamp (nullable)
}

User represents a user account in the system

type UserRepository

type UserRepository struct {
	// contains filtered or unexported fields
}

UserRepository handles user database operations

func NewUserRepository

func NewUserRepository(db interface {
	Exec(query string, args ...interface{}) (sql.Result, error)
	Query(query string, args ...interface{}) (*sql.Rows, error)
	QueryRow(query string, args ...interface{}) *sql.Row
}) *UserRepository

NewUserRepository creates a new user repository

func (*UserRepository) CreateUser

func (r *UserRepository) CreateUser(user *User) error

CreateUser creates a new user account

func (*UserRepository) DeleteUser

func (r *UserRepository) DeleteUser(userID string) error

DeleteUser deletes a user by their user ID

func (*UserRepository) GetUserByAPIKey

func (r *UserRepository) GetUserByAPIKey(apiKey string) (*User, error)

GetUserByAPIKey retrieves a user by their API key

func (*UserRepository) GetUserByEmail

func (r *UserRepository) GetUserByEmail(email string) (*User, error)

GetUserByEmail retrieves a user by their email address for direct authentication

func (*UserRepository) GetUserByID

func (r *UserRepository) GetUserByID(userID string) (*User, error)

GetUserByID retrieves a user by their unique user ID

func (*UserRepository) GetUserByProvider

func (r *UserRepository) GetUserByProvider(provider, providerID string) (*User, error)

GetUserByProvider retrieves a user by provider and provider ID

func (*UserRepository) GetUserByUsername

func (r *UserRepository) GetUserByUsername(username string) (*User, error)

GetUserByUsername retrieves a user by their username (user_id) for direct authentication

func (*UserRepository) GetUserCount

func (r *UserRepository) GetUserCount() (int, error)

GetUserCount returns the total number of users

func (*UserRepository) ListUsers

func (r *UserRepository) ListUsers(limit, offset int) ([]*User, error)

ListUsers returns a list of all users with pagination

func (*UserRepository) RegenerateAPIKey

func (r *UserRepository) RegenerateAPIKey(userID string) (string, error)

RegenerateAPIKey generates and updates a new API key for the user

func (*UserRepository) SetAdminStatus

func (r *UserRepository) SetAdminStatus(userID string, isAdmin bool) error

SetAdminStatus updates a user's admin status

func (*UserRepository) UpdateLastLogin

func (r *UserRepository) UpdateLastLogin(userID string) error

UpdateLastLogin updates the user's last login timestamp

func (*UserRepository) UpdatePassword

func (r *UserRepository) UpdatePassword(userID string, passwordHash string) error

UpdatePassword updates a user's password hash

func (*UserRepository) UpdateUser

func (r *UserRepository) UpdateUser(user *User) error

UpdateUser updates an existing user's information

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL