queue

package
v0.0.28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Status values for queue items, declared as untyped string constants.
// NOTE(review): presumably these are the values carried in QueueItem.Status —
// confirm against the implementation.
const (
	StatusPending  = "pending"
	StatusRunning  = "running"
	StatusComplete = "complete"
	StatusError    = "error"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CompletedItem

// CompletedItem describes a job that has finished processing, including the
// path of the NZB file associated with it and both its creation and
// completion timestamps.
//
// NOTE(review): presumably this mirrors a row of the completed_items table
// that Queue.CompleteFile writes to, and JobData presumably holds the
// serialized FileJob — confirm both against the implementation.
type CompletedItem struct {
	ID          string    `json:"id"`
	Path        string    `json:"path"`
	Size        int64     `json:"size"`
	Priority    int       `json:"priority"`
	NzbPath     string    `json:"nzbPath"`
	CreatedAt   time.Time `json:"createdAt"`
	CompletedAt time.Time `json:"completedAt"`
	JobData     []byte    `json:"jobData"`
}

type FileJob

// FileJob is the job payload exchanged with the queue: Queue.ReceiveFile
// returns one, and Queue.CompleteFile, Queue.MarkAsError and Queue.ReaddJob
// accept one.
//
// NOTE(review): Size is presumably the file size in bytes and RetryCount the
// number of times the job has been re-added — confirm.
type FileJob struct {
	Path       string    `json:"path"`
	Size       int64     `json:"size"`
	Priority   int       `json:"priority"`
	CreatedAt  time.Time `json:"createdAt"`
	RetryCount int       `json:"retryCount"`
}

type PaginatedResult added in v0.0.26

// PaginatedResult is a single page of queue items together with paging
// metadata. It is the result type of Queue.GetQueueItems.
//
// NOTE(review): CurrentPage is presumably 1-based, matching
// PaginationParams.Page — confirm.
type PaginatedResult struct {
	Items        []QueueItem `json:"items"`
	TotalItems   int         `json:"totalItems"`
	TotalPages   int         `json:"totalPages"`
	CurrentPage  int         `json:"currentPage"`
	ItemsPerPage int         `json:"itemsPerPage"`
	HasNext      bool        `json:"hasNext"`
	HasPrev      bool        `json:"hasPrev"`
}

PaginatedResult contains paginated queue items and metadata

type PaginationParams added in v0.0.26

// PaginationParams selects which page of queue items to return and how the
// items are ordered. It is the argument type of Queue.GetQueueItems.
type PaginationParams struct {
	Page   int    `json:"page"`   // 1-based page number
	Limit  int    `json:"limit"`  // Items per page
	SortBy string `json:"sortBy"` // Sort field: "created", "priority", "status", "filename"
	Order  string `json:"order"`  // Sort order: "asc", "desc"
}

PaginationParams defines parameters for paginated queries

type Queue

// Queue is the file-job queue handle returned by New. All of its fields are
// unexported; callers interact with it only through its methods.
type Queue struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, database *database.Database) (*Queue, error)

func (*Queue) AddFile

func (q *Queue) AddFile(ctx context.Context, path string, size int64) error

AddFile adds a file to the queue for processing

func (*Queue) AddFileWithPriority

func (q *Queue) AddFileWithPriority(ctx context.Context, path string, size int64, priority int) error

AddFileWithPriority adds a file to the queue with a specific priority

func (*Queue) AddFileWithPriorityWithoutDuplicateCheck

func (q *Queue) AddFileWithPriorityWithoutDuplicateCheck(ctx context.Context, path string, size int64, priority int) error

AddFileWithPriorityWithoutDuplicateCheck adds a file to the queue with a specific priority without checking for duplicates. This is useful when original files are deleted after processing, making duplicate checks unnecessary.

func (*Queue) AddFileWithoutDuplicateCheck

func (q *Queue) AddFileWithoutDuplicateCheck(ctx context.Context, path string, size int64) error

AddFileWithoutDuplicateCheck adds a file to the queue without checking for duplicates. This is useful when original files are deleted after processing, making duplicate checks unnecessary.

func (*Queue) ClearCompletedItems

func (q *Queue) ClearCompletedItems() error

ClearCompletedItems removes only completed items and their NZB files

func (*Queue) ClearQueue

func (q *Queue) ClearQueue() error

ClearQueue removes all completed, errored, and active items from the queue

func (*Queue) Close

func (q *Queue) Close() error

Close closes the queue context (database is managed externally)

func (*Queue) CompleteFile

func (q *Queue) CompleteFile(ctx context.Context, msgID goqite.ID, nzbPath string, job *FileJob) error

CompleteFile marks a file job as completed and adds it to completed_items table

func (*Queue) DebugQueueItem

func (q *Queue) DebugQueueItem(id string) (map[string]interface{}, error)

DebugQueueItem returns debug information about a specific queue item

func (*Queue) ExtendTimeout

func (q *Queue) ExtendTimeout(ctx context.Context, msgID goqite.ID, duration time.Duration) error

ExtendTimeout extends the processing timeout for a file job

func (*Queue) GetCompletedItemNzbPath

func (q *Queue) GetCompletedItemNzbPath(id string) (string, error)

GetCompletedItemNzbPath returns the NZB path for a completed item

func (*Queue) GetQueueItems

func (q *Queue) GetQueueItems(params PaginationParams) (*PaginatedResult, error)

GetQueueItems returns paginated queue items with metadata

func (*Queue) GetQueueStats

func (q *Queue) GetQueueStats() (map[string]interface{}, error)

GetQueueStats returns statistics about the queue including completed and errored items

func (*Queue) IsPathInQueue added in v0.0.17

func (q *Queue) IsPathInQueue(path string) (bool, error)

IsPathInQueue checks if a file path already exists in pending queue, completed items, or errored items

func (*Queue) MarkAsError

func (q *Queue) MarkAsError(ctx context.Context, msgID goqite.ID, job *FileJob, errMsg string) error

MarkAsError marks a file job as errored and adds it to the errored_items table

func (*Queue) ReaddJob

func (q *Queue) ReaddJob(ctx context.Context, job *FileJob) error

ReaddJob re-adds a job to the queue (used when processing fails)

func (*Queue) ReceiveFile

func (q *Queue) ReceiveFile(ctx context.Context) (*goqite.Message, *FileJob, error)

ReceiveFile gets the next file job from the queue and removes it immediately

func (*Queue) RemoveCompletedItem

func (q *Queue) RemoveCompletedItem(id string) error

RemoveCompletedItem removes a completed item and its associated NZB file

func (*Queue) RemoveErroredItem

func (q *Queue) RemoveErroredItem(id string) error

RemoveErroredItem removes an errored item from the database

func (*Queue) RemoveFromQueue

func (q *Queue) RemoveFromQueue(id string) error

RemoveFromQueue removes an item from the queue by ID (handles active, completed, and errored items)

func (*Queue) RetryErroredJob

func (q *Queue) RetryErroredJob(ctx context.Context, id string) error

RetryErroredJob retries an errored job by moving it back to the main queue

func (*Queue) SetQueueItemPriority

func (q *Queue) SetQueueItemPriority(id string, priority int) error

SetQueueItemPriority updates the priority of a pending queue item by id

func (*Queue) SetQueueItemPriorityWithReorder added in v0.0.13

func (q *Queue) SetQueueItemPriorityWithReorder(ctx context.Context, id string, newPriority int) error

SetQueueItemPriorityWithReorder updates the priority of a pending queue item using goqite's native priority. Higher priority numbers (0, 1, 2, ...) are processed first.

type QueueInterface added in v0.0.13

// QueueInterface is the subset of queue operations exposed as an interface.
// Every method listed here is also declared on *Queue with an identical
// signature.
type QueueInterface interface {
	AddFile(ctx context.Context, path string, size int64) error
	GetQueueItems(params PaginationParams) (*PaginatedResult, error)
	RemoveFromQueue(id string) error
	ClearQueue() error
	GetQueueStats() (map[string]interface{}, error)
	SetQueueItemPriorityWithReorder(ctx context.Context, id string, newPriority int) error
	IsPathInQueue(path string) (bool, error)
}

QueueInterface defines the interface for queue operations

type QueueItem

// QueueItem is the element type of PaginatedResult.Items and describes one
// item in the queue.
//
// ErrorMessage, CompletedAt and NzbPath are pointers and their JSON tags carry
// no omitempty, so a nil value is encoded as JSON null rather than omitted.
//
// NOTE(review): Status is presumably one of the Status* constants — confirm.
type QueueItem struct {
	ID           string     `json:"id"`
	Path         string     `json:"path"`
	FileName     string     `json:"fileName"`
	Size         int64      `json:"size"`
	Status       string     `json:"status"`
	RetryCount   int        `json:"retryCount"`
	Priority     int        `json:"priority"`
	ErrorMessage *string    `json:"errorMessage"`
	CreatedAt    time.Time  `json:"createdAt"`
	UpdatedAt    time.Time  `json:"updatedAt"`
	CompletedAt  *time.Time `json:"completedAt"`
	NzbPath      *string    `json:"nzbPath"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL