queue

package
v0.0.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusPending  = "pending"
	StatusRunning  = "running"
	StatusComplete = "complete"
	StatusError    = "error"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CompletedItem

type CompletedItem struct {
	ID          string    `json:"id"`
	Path        string    `json:"path"`
	Size        int64     `json:"size"`
	Priority    int       `json:"priority"`
	NzbPath     string    `json:"nzbPath"`
	CreatedAt   time.Time `json:"createdAt"`
	CompletedAt time.Time `json:"completedAt"`
	JobData     []byte    `json:"jobData"`
}

type FileJob

type FileJob struct {
	Path       string    `json:"path"`
	Size       int64     `json:"size"`
	Priority   int       `json:"priority"`
	CreatedAt  time.Time `json:"createdAt"`
	RetryCount int       `json:"retryCount"`
}

type GooseMigrationRunner added in v0.0.13

type GooseMigrationRunner struct {
	// contains filtered or unexported fields
}

GooseMigrationRunner handles database migrations using goose

func NewGooseMigrationRunner added in v0.0.13

func NewGooseMigrationRunner(db *sql.DB) *GooseMigrationRunner

NewGooseMigrationRunner creates a new goose migration runner

func (*GooseMigrationRunner) EnsureMigrationCompatibility added in v0.0.13

func (gmr *GooseMigrationRunner) EnsureMigrationCompatibility() error

EnsureMigrationCompatibility checks for legacy database and recreates if needed

func (*GooseMigrationRunner) GetStatus added in v0.0.13

func (gmr *GooseMigrationRunner) GetStatus() (*GooseMigrationStatus, error)

GetStatus returns the current migration status

func (*GooseMigrationRunner) IsLegacyDatabase added in v0.0.13

func (gmr *GooseMigrationRunner) IsLegacyDatabase() (bool, error)

IsLegacyDatabase checks if the database exists but doesn't have goose migrations table

func (*GooseMigrationRunner) MigrateDown added in v0.0.13

func (gmr *GooseMigrationRunner) MigrateDown() error

MigrateDown rolls back the last migration

func (*GooseMigrationRunner) MigrateTo added in v0.0.13

func (gmr *GooseMigrationRunner) MigrateTo(version int64) error

MigrateTo migrates to a specific version

func (*GooseMigrationRunner) MigrateUp added in v0.0.13

func (gmr *GooseMigrationRunner) MigrateUp() error

MigrateUp runs all pending migrations

func (*GooseMigrationRunner) RecreateDatabase added in v0.0.13

func (gmr *GooseMigrationRunner) RecreateDatabase() error

RecreateDatabase drops all existing tables and recreates them using goose migrations

func (*GooseMigrationRunner) Reset added in v0.0.13

func (gmr *GooseMigrationRunner) Reset() error

Reset drops all tables and re-runs all migrations

func (*GooseMigrationRunner) SetupGoose added in v0.0.13

func (gmr *GooseMigrationRunner) SetupGoose() error

SetupGoose initializes goose with embedded migrations

type GooseMigrationStatus added in v0.0.13

type GooseMigrationStatus struct {
	CurrentVersion int64 `json:"currentVersion"`
}

GooseMigrationStatus represents the current migration state using goose

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, cfg config.QueueConfig) (*Queue, error)

func (*Queue) AddFile

func (q *Queue) AddFile(ctx context.Context, path string, size int64) error

AddFile adds a file to the queue for processing

func (*Queue) AddFileWithPriority

func (q *Queue) AddFileWithPriority(ctx context.Context, path string, size int64, priority int) error

AddFileWithPriority adds a file to the queue with a specific priority

func (*Queue) AddFileWithPriorityWithoutDuplicateCheck

func (q *Queue) AddFileWithPriorityWithoutDuplicateCheck(ctx context.Context, path string, size int64, priority int) error

AddFileWithPriorityWithoutDuplicateCheck adds a file to the queue with a specific priority without checking for duplicates This is useful when original files are deleted after processing, making duplicate checks unnecessary

func (*Queue) AddFileWithoutDuplicateCheck

func (q *Queue) AddFileWithoutDuplicateCheck(ctx context.Context, path string, size int64) error

AddFileWithoutDuplicateCheck adds a file to the queue without checking for duplicates This is useful when original files are deleted after processing, making duplicate checks unnecessary

func (*Queue) ClearCompletedItems

func (q *Queue) ClearCompletedItems() error

ClearCompletedItems removes only completed items and their NZB files

func (*Queue) ClearQueue

func (q *Queue) ClearQueue() error

ClearQueue removes all completed, errored, and active items from the queue

func (*Queue) Close

func (q *Queue) Close() error

Close closes the database connection

func (*Queue) CompleteFile

func (q *Queue) CompleteFile(ctx context.Context, msgID goqite.ID, nzbPath string, job *FileJob) error

CompleteFile marks a file job as completed and adds it to completed_items table

func (*Queue) DebugQueueItem

func (q *Queue) DebugQueueItem(id string) (map[string]interface{}, error)

DebugQueueItem returns debug information about a specific queue item

func (*Queue) EnsureMigrationCompatibility added in v0.0.13

func (q *Queue) EnsureMigrationCompatibility() error

EnsureMigrationCompatibility ensures the database is compatible with goose migrations

func (*Queue) ExtendTimeout

func (q *Queue) ExtendTimeout(ctx context.Context, msgID goqite.ID, duration time.Duration) error

ExtendTimeout extends the processing timeout for a file job

func (*Queue) GetCompletedItemNzbPath

func (q *Queue) GetCompletedItemNzbPath(id string) (string, error)

GetCompletedItemNzbPath returns the NZB path for a completed item

func (*Queue) GetMigrationStatus added in v0.0.13

func (q *Queue) GetMigrationStatus() (*GooseMigrationStatus, error)

GetMigrationStatus returns the current migration status using goose

func (*Queue) GetQueueItems

func (q *Queue) GetQueueItems() ([]QueueItem, error)

GetQueueItems returns queue items for display including completed items

func (*Queue) GetQueueStats

func (q *Queue) GetQueueStats() (map[string]interface{}, error)

GetQueueStats returns statistics about the queue including completed and errored items

func (*Queue) IsLegacyDatabase added in v0.0.13

func (q *Queue) IsLegacyDatabase() (bool, error)

IsLegacyDatabase checks if the database is a legacy (non-goose) database

func (*Queue) IsPathInQueue added in v0.0.17

func (q *Queue) IsPathInQueue(path string) (bool, error)

IsPathInQueue checks if a file path already exists in pending queue, completed items, or errored items

func (*Queue) MarkAsError

func (q *Queue) MarkAsError(ctx context.Context, msgID goqite.ID, job *FileJob, errMsg string) error

MarkAsError marks a file job as errored and adds it to the errored_items table

func (*Queue) MigrateTo added in v0.0.13

func (q *Queue) MigrateTo(version int64) error

MigrateTo migrates to a specific version using goose

func (*Queue) ReaddJob

func (q *Queue) ReaddJob(ctx context.Context, job *FileJob) error

ReaddJob re-adds a job to the queue (used when processing fails)

func (*Queue) ReceiveFile

func (q *Queue) ReceiveFile(ctx context.Context) (*goqite.Message, *FileJob, error)

ReceiveFile gets the next file job from the queue and removes it immediately

func (*Queue) RecreateDatabase added in v0.0.13

func (q *Queue) RecreateDatabase() error

RecreateDatabase drops all tables and recreates them using goose migrations

func (*Queue) RemoveCompletedItem

func (q *Queue) RemoveCompletedItem(id string) error

RemoveCompletedItem removes a completed item and its associated NZB file

func (*Queue) RemoveErroredItem

func (q *Queue) RemoveErroredItem(id string) error

RemoveErroredItem removes an errored item from the database

func (*Queue) RemoveFromQueue

func (q *Queue) RemoveFromQueue(id string) error

RemoveFromQueue removes an item from the queue by ID (handles active, completed, and errored items)

func (*Queue) ResetDatabase added in v0.0.13

func (q *Queue) ResetDatabase() error

ResetDatabase drops all tables and re-runs all migrations using goose

func (*Queue) RetryErroredJob

func (q *Queue) RetryErroredJob(ctx context.Context, id string) error

RetryErroredJob retries an errored job by moving it back to the main queue

func (*Queue) RollbackMigration added in v0.0.13

func (q *Queue) RollbackMigration() error

RollbackMigration rolls back the last applied migration using goose

func (*Queue) RunMigrations added in v0.0.13

func (q *Queue) RunMigrations() error

RunMigrations runs all pending migrations using goose

func (*Queue) SetQueueItemPriority

func (q *Queue) SetQueueItemPriority(id string, priority int) error

SetQueueItemPriority updates the priority of a pending queue item by id

func (*Queue) SetQueueItemPriorityWithReorder added in v0.0.13

func (q *Queue) SetQueueItemPriorityWithReorder(ctx context.Context, id string, newPriority int) error

SetQueueItemPriorityWithReorder updates the priority of a pending queue item using goqite's native priority Higher priority numbers (0, 1, 2, ...) are processed first

type QueueInterface added in v0.0.13

type QueueInterface interface {
	AddFile(ctx context.Context, path string, size int64) error
	GetQueueItems() ([]QueueItem, error)
	RemoveFromQueue(id string) error
	ClearQueue() error
	GetQueueStats() (map[string]interface{}, error)
	SetQueueItemPriorityWithReorder(ctx context.Context, id string, newPriority int) error
	GetMigrationStatus() (*GooseMigrationStatus, error)
	RunMigrations() error
	RollbackMigration() error
	MigrateTo(version int64) error
	ResetDatabase() error
	IsLegacyDatabase() (bool, error)
	RecreateDatabase() error
	EnsureMigrationCompatibility() error
	IsPathInQueue(path string) (bool, error)
}

QueueInterface defines the interface for queue operations

type QueueItem

type QueueItem struct {
	ID           string     `json:"id"`
	Path         string     `json:"path"`
	FileName     string     `json:"fileName"`
	Size         int64      `json:"size"`
	Status       string     `json:"status"`
	RetryCount   int        `json:"retryCount"`
	Priority     int        `json:"priority"`
	ErrorMessage *string    `json:"errorMessage"`
	CreatedAt    time.Time  `json:"createdAt"`
	UpdatedAt    time.Time  `json:"updatedAt"`
	CompletedAt  *time.Time `json:"completedAt"`
	NzbPath      *string    `json:"nzbPath"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL