queue

package
v0.2.0
Published: Apr 3, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package queue provides queue management for the NZB import service.

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsDatabaseContentionError

func IsDatabaseContentionError(err error) bool

IsDatabaseContentionError reports whether err is a retryable database contention error.

Types

type Claimer

type Claimer struct {
	// contains filtered or unexported fields
}

Claimer claims queue items and retries failed claims.

func NewClaimer

func NewClaimer(repo QueueRepository) *Claimer

NewClaimer creates a new Claimer

func (*Claimer) ClaimWithRetry

func (c *Claimer) ClaimWithRetry(ctx context.Context, workerID int) (*database.ImportQueueItem, error)

ClaimWithRetry attempts to claim a queue item, retrying with exponential backoff.

type ItemProcessor

type ItemProcessor interface {
	// ProcessItem processes a single queue item and returns the resulting path or an error
	ProcessItem(ctx context.Context, item *database.ImportQueueItem) (string, error)
	// HandleSuccess handles successful processing
	HandleSuccess(ctx context.Context, item *database.ImportQueueItem, resultingPath string) error
	// HandleFailure handles failed processing
	HandleFailure(ctx context.Context, item *database.ImportQueueItem, err error)
}

ItemProcessor defines the interface for processing queue items and handling their outcomes.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages queue workers and processing

func NewManager

func NewManager(cfg ManagerConfig, repository *database.QueueRepository, processor ItemProcessor, listener QueueEventListener) *Manager

NewManager creates a new queue manager

func (*Manager) CancelProcessing

func (m *Manager) CancelProcessing(itemID int64) error

CancelProcessing cancels processing for a specific item

func (*Manager) IsPaused

func (m *Manager) IsPaused() bool

IsPaused returns whether the manager is paused

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning returns whether the manager is running

func (*Manager) Pause

func (m *Manager) Pause()

Pause pauses queue processing

func (*Manager) Resume

func (m *Manager) Resume()

Resume resumes queue processing

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the queue workers

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) error

Stop stops the queue workers

type ManagerConfig

type ManagerConfig struct {
	Workers      int
	ConfigGetter config.ConfigGetter
}

ManagerConfig holds configuration for the queue manager

type QueueEventListener

type QueueEventListener interface {
	OnItemClaimed(ctx context.Context, item *database.ImportQueueItem)
}

QueueEventListener receives notifications about queue item lifecycle events.

type QueueRepository

type QueueRepository interface {
	ClaimNextQueueItem(ctx context.Context) (*database.ImportQueueItem, error)
}

QueueRepository defines the interface for the queue database operations that Claimer needs.
