queue

package
v4.0.0-...-a908ec4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2026 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Task type identifiers, expressed as untyped string constants.
// NOTE(review): presumably these are the values reported by Task.Type() and
// matched by RegisterResumableTaskFactory / WithResumeTaskType — confirm
// before changing any value.
const (
	MediaMetaTaskType             = "media_meta"
	EntityRecycleRoutineTaskType  = "entity_recycle_routine"
	ExplicitEntityRecycleTaskType = "explicit_entity_recycle"
	UploadSentinelCheckTaskType   = "upload_sentinel_check"
	CreateArchiveTaskType         = "create_archive"
	ExtractArchiveTaskType        = "extract_archive"
	RelocateTaskType              = "relocate"
	RemoteDownloadTaskType        = "remote_download"
	ImportTaskType                = "import"

	// Task types whose identifiers carry the "slave_" prefix.
	// NOTE(review): SlaveExtractArchiveType omits the "Task" infix used by
	// every other constant in this block (…TaskType); the name is exported,
	// so it is left unchanged here.
	SlaveCreateArchiveTaskType = "slave_create_archive"
	SlaveUploadTaskType        = "slave_upload"
	SlaveExtractArchiveType    = "slave_extract_archive"
)

Variables

View Source
// Sentinel errors of the queue package.
// NOTE(review): two of the three messages carry a "golang-queue: " prefix and
// one does not; the strings are runtime text and are left untouched.
var (
	// ErrQueueShutdown reports that the queue has been closed and released.
	ErrQueueShutdown = errors.New("queue has been closed and released")
	// ErrMaxCapacity reports that the maximum size limit has been reached.
	ErrMaxCapacity = errors.New("golang-queue: maximum size limit reached")
	// ErrNoTaskInQueue reports that there is no Task in the queue.
	ErrNoTaskInQueue = errors.New("golang-queue: no Task in queue")
)
View Source
var (
	// CriticalErr is the sentinel value for a non-retryable error.
	// NOTE(review): presumably a Task returns or wraps it to tell the queue
	// not to retry — confirm against the retry logic. The name departs from
	// the ErrXxx convention used by the other sentinels in this package; it
	// is exported, so it is left unchanged.
	CriticalErr = errors.New("non-retryable error")
)

Functions

func RegisterResumableTaskFactory

func RegisterResumableTaskFactory(taskType string, factory ResumableTaskFactory)

RegisterResumableTaskFactory registers a resumable Task factory

Types

type DBTask

// DBTask exports two fields: DirectOwner (an *ent.User) and Task (an
// *ent.Task).
// NOTE(review): presumably DirectOwner backs Owner() and Task backs Model() —
// confirm against the method implementations.
type DBTask struct {
	DirectOwner *ent.User
	Task        *ent.Task
	// contains filtered or unexported fields
}

DBTask implements the Task interface on top of the DB schema.

func (*DBTask) Cleanup

func (t *DBTask) Cleanup(ctx context.Context) error

func (*DBTask) CorrelationID

func (t *DBTask) CorrelationID() uuid.UUID

func (*DBTask) Error

func (t *DBTask) Error() error

func (*DBTask) ErrorHistory

func (t *DBTask) ErrorHistory() []error

func (*DBTask) Executed

func (t *DBTask) Executed() time.Duration

func (*DBTask) ID

func (t *DBTask) ID() int

func (*DBTask) Lock

func (t *DBTask) Lock()

func (*DBTask) Model

func (t *DBTask) Model() *ent.Task

func (*DBTask) OnError

func (t *DBTask) OnError(err error, d time.Duration)

func (*DBTask) OnIterationComplete

func (t *DBTask) OnIterationComplete(d time.Duration)

func (*DBTask) OnPersisted

func (t *DBTask) OnPersisted(task *ent.Task)

func (*DBTask) OnRetry

func (t *DBTask) OnRetry(err error)

func (*DBTask) OnStatusTransition

func (t *DBTask) OnStatusTransition(newStatus task.Status)

func (*DBTask) OnSuspend

func (t *DBTask) OnSuspend(time int64)

func (*DBTask) Owner

func (t *DBTask) Owner() *ent.User

func (*DBTask) Persisted

func (t *DBTask) Persisted() bool

func (*DBTask) Progress

func (t *DBTask) Progress(ctx context.Context) Progresses

func (*DBTask) ResumeAfter

func (t *DBTask) ResumeAfter(next time.Duration)

func (*DBTask) ResumeTime

func (t *DBTask) ResumeTime() int64

func (*DBTask) Retried

func (t *DBTask) Retried() int

func (*DBTask) ShouldPersist

func (t *DBTask) ShouldPersist() bool

func (*DBTask) State

func (t *DBTask) State() string

func (*DBTask) Status

func (t *DBTask) Status() task.Status

func (*DBTask) Summarize

func (t *DBTask) Summarize(hasher hashid.Encoder) *Summary

func (*DBTask) Type

func (t *DBTask) Type() string

func (*DBTask) Unlock

func (t *DBTask) Unlock()

type Dep

// Dep is the dependency handed to New when constructing a Queue.
type Dep interface {
	// ForkWithLogger takes a context and a logger and returns a context.
	// NOTE(review): presumably the returned context is derived from ctx and
	// carries l — confirm against the implementation.
	ForkWithLogger(ctx context.Context, l logging.Logger) context.Context
}

type InMemoryTask

// InMemoryTask embeds *DBTask, so DBTask's methods are promoted to it.
// It declares its own OnStatusTransition and ShouldPersist, which take
// precedence over the promoted DBTask versions.
type InMemoryTask struct {
	*DBTask
}

InMemoryTask implements part of the Task interface using in-memory data.

func (*InMemoryTask) OnStatusTransition

func (t *InMemoryTask) OnStatusTransition(newStatus task.Status)

func (*InMemoryTask) ShouldPersist

func (i *InMemoryTask) ShouldPersist() bool

type Metric

// Metric groups counters for busy workers and for success, failure and
// submitted tasks. All readers return uint64.
// NOTE(review): the per-method semantics below are inferred from the method
// names — confirm against the implementation returned by NewMetric.
type Metric interface {
	// IncBusyWorker and DecBusyWorker presumably raise and lower the value
	// read by BusyWorkers.
	IncBusyWorker()
	DecBusyWorker()
	// Readers for the four counters.
	BusyWorkers() uint64
	SuccessTasks() uint64
	FailureTasks() uint64
	SubmittedTasks() uint64
	// IncSuccessTask, IncFailureTask and IncSubmittedTask presumably raise
	// the value read by the matching *Tasks reader.
	IncSuccessTask()
	IncFailureTask()
	IncSubmittedTask()
}

Metric is the interface for the busy-worker and task counters.

func NewMetric

func NewMetric() Metric

NewMetric returns the default Metric implementation.

type Option

type Option interface {
	// contains filtered or unexported methods
}

An Option configures a queue.

func WithBackoffFactor

func WithBackoffFactor(f float64) Option

WithBackoffFactor sets the backoff factor.

func WithBackoffMaxDuration

func WithBackoffMaxDuration(d time.Duration) Option

WithBackoffMaxDuration sets the maximum backoff duration.

func WithMaxRetry

func WithMaxRetry(n int) Option

WithMaxRetry sets the maximum number of retries.

func WithMaxTaskExecution

func WithMaxTaskExecution(d time.Duration) Option

WithMaxTaskExecution sets the maximum execution time for a Task.

func WithName

func WithName(name string) Option

WithName sets the queue name.

func WithResumeTaskType

func WithResumeTaskType(types ...string) Option

WithResumeTaskType sets the Task types to resume.

func WithRetryDelay

func WithRetryDelay(d time.Duration) Option

WithRetryDelay sets the retry delay.

func WithTaskPullInterval

func WithTaskPullInterval(d time.Duration) Option

WithTaskPullInterval sets the task pull interval.

func WithWorkerCount

func WithWorkerCount(num int) Option

WithWorkerCount sets the worker count.

type OptionFunc

type OptionFunc func(*options)

OptionFunc is a function that configures a queue.

type Progress

// Progress is a progress record that serializes to JSON with the keys
// "total", "current" and "identifier".
// NOTE(review): units of Total/Current (bytes, items, …) are not visible
// here — confirm with the producers.
type Progress struct {
	Total      int64  `json:"total"`
	Current    int64  `json:"current"`
	Identifier string `json:"identifier"`
}

type Progresses

// Progresses maps a string key to a *Progress; it is the return type of
// Task.Progress.
type Progresses map[string]*Progress

type Queue

// Queue is the interface returned by New.
type Queue interface {
	// Start resumes tasks and starts all workers.
	Start()
	// Shutdown stops all workers.
	Shutdown()
	// QueueTask submits a Task to the queue.
	QueueTask(ctx context.Context, t Task) error
	// BusyWorkers returns the number of workers in the running process.
	BusyWorkers() int
	// SuccessTasks returns the number of success tasks.
	SuccessTasks() int
	// FailureTasks returns the number of failure tasks.
	FailureTasks() int
	// SubmittedTasks returns the number of submitted tasks.
	SubmittedTasks() int
	// SuspendingTasks returns the number of suspending tasks.
	SuspendingTasks() int
}

func New

func New(l logging.Logger, taskClient inventory.TaskClient, registry TaskRegistry, dep Dep, opts ...Option) Queue

type ResumableTaskFactory

type ResumableTaskFactory func(model *ent.Task) Task

type Scheduler

// Scheduler is the interface returned by NewFifoScheduler.
type Scheduler interface {
	// Queue adds a new Task to the queue.
	Queue(task Task) error
	// Request gets a new Task from the queue.
	Request() (Task, error)
	// Shutdown stops all workers.
	Shutdown() error
}

func NewFifoScheduler

func NewFifoScheduler(queueSize int, logger logging.Logger) Scheduler

NewFifoScheduler creates a new Scheduler instance.

type Summary

// Summary is the value returned by Task.Summarize, i.e. the Task summary for
// UI display.
type Summary struct {
	// NodeID is excluded from JSON output (tag "-").
	NodeID int            `json:"-"`
	// Phase is emitted as "phase" and omitted when empty.
	Phase  string         `json:"phase,omitempty"`
	// Props is emitted as "props" and omitted when empty.
	Props  map[string]any `json:"props,omitempty"`
}

type Task

// Task is the unit of work accepted by Queue.QueueTask and Scheduler.Queue.
type Task interface {
	// Do executes the Task with the given context and returns the resulting
	// task.Status together with any error.
	// NOTE(review): presumably called once per iteration (see
	// OnIterationComplete) — confirm against the worker loop.
	Do(ctx context.Context) (task.Status, error)

	// ID returns the Task ID
	ID() int
	// Type returns the Task type
	Type() string
	// Status returns the Task status
	Status() task.Status
	// Owner returns the Task owner
	Owner() *ent.User
	// State returns the internal Task state
	State() string
	// ShouldPersist returns true if the Task should be persisted into DB
	ShouldPersist() bool
	// Persisted returns true if the Task is persisted in DB
	Persisted() bool
	// Executed returns the duration of the Task execution
	Executed() time.Duration
	// Retried returns the number of times the Task has been retried
	Retried() int
	// Error returns the error of the Task
	Error() error
	// ErrorHistory returns the error history of the Task
	ErrorHistory() []error
	// Model returns the ent model of the Task
	Model() *ent.Task
	// CorrelationID returns the correlation ID of the Task
	CorrelationID() uuid.UUID
	// ResumeTime returns the time when the Task is resumed
	ResumeTime() int64
	// ResumeAfter sets the time when the Task should be resumed
	ResumeAfter(next time.Duration)
	// Progress returns the Progresses of the Task for the given context
	Progress(ctx context.Context) Progresses
	// Summarize returns the Task summary for UI display
	Summarize(hasher hashid.Encoder) *Summary
	// OnSuspend is called when queue decides to suspend the Task
	OnSuspend(time int64)
	// OnPersisted is called when the Task is persisted or updated in DB
	OnPersisted(task *ent.Task)
	// OnError is called when the Task encounters an error
	OnError(err error, d time.Duration)
	// OnRetry is called when the iteration returns error and before retry
	OnRetry(err error)
	// OnIterationComplete is called when one iteration is completed
	OnIterationComplete(executed time.Duration)
	// OnStatusTransition is called when the Task status is changed
	OnStatusTransition(newStatus task.Status)

	// Cleanup is called when the Task is done or error.
	Cleanup(ctx context.Context) error

	// Lock and Unlock are required of every Task.
	// NOTE(review): presumably they guard the Task's mutable state with a
	// mutex — confirm against DBTask.
	Lock()
	Unlock()
}

func NewTaskFromModel

func NewTaskFromModel(model *ent.Task) (Task, error)

NewTaskFromModel creates a Task from ent.Task model

type TaskRegistry

type TaskRegistry interface {
	// NextID returns the next available Task ID.
	NextID() int
	// Get returns the Task by ID.
	// NOTE(review): the bool presumably reports whether a Task is registered
	// under id — confirm.
	Get(id int) (Task, bool)
	// Set sets the Task by ID.
	Set(id int, t Task)
	// Delete deletes the Task by ID.
	Delete(id int)
}

TaskRegistry is used in slave node to track in-memory stateful tasks.

func NewTaskRegistry

func NewTaskRegistry() TaskRegistry

NewTaskRegistry creates a new TaskRegistry.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL