Documentation
¶
Index ¶
- Constants
- Variables
- func RegisterResumableTaskFactory(taskType string, factory ResumableTaskFactory)
- type DBTask
- func (t *DBTask) Cleanup(ctx context.Context) error
- func (t *DBTask) CorrelationID() uuid.UUID
- func (t *DBTask) Error() error
- func (t *DBTask) ErrorHistory() []error
- func (t *DBTask) Executed() time.Duration
- func (t *DBTask) ID() int
- func (t *DBTask) Lock()
- func (t *DBTask) Model() *ent.Task
- func (t *DBTask) OnError(err error, d time.Duration)
- func (t *DBTask) OnIterationComplete(d time.Duration)
- func (t *DBTask) OnPersisted(task *ent.Task)
- func (t *DBTask) OnRetry(err error)
- func (t *DBTask) OnStatusTransition(newStatus task.Status)
- func (t *DBTask) OnSuspend(time int64)
- func (t *DBTask) Owner() *ent.User
- func (t *DBTask) Persisted() bool
- func (t *DBTask) Progress(ctx context.Context) Progresses
- func (t *DBTask) ResumeAfter(next time.Duration)
- func (t *DBTask) ResumeTime() int64
- func (t *DBTask) Retried() int
- func (t *DBTask) ShouldPersist() bool
- func (t *DBTask) State() string
- func (t *DBTask) Status() task.Status
- func (t *DBTask) Summarize(hasher hashid.Encoder) *Summary
- func (t *DBTask) Type() string
- func (t *DBTask) Unlock()
- type Dep
- type InMemoryTask
- type Metric
- type Option
- func WithBackoffFactor(f float64) Option
- func WithBackoffMaxDuration(d time.Duration) Option
- func WithMaxRetry(n int) Option
- func WithMaxTaskExecution(d time.Duration) Option
- func WithName(name string) Option
- func WithResumeTaskType(types ...string) Option
- func WithRetryDelay(d time.Duration) Option
- func WithTaskPullInterval(d time.Duration) Option
- func WithWorkerCount(num int) Option
- type OptionFunc
- type Progress
- type Progresses
- type Queue
- type ResumableTaskFactory
- type Scheduler
- type Summary
- type Task
- type TaskRegistry
Constants ¶
View Source
const ( MediaMetaTaskType = "media_meta" EntityRecycleRoutineTaskType = "entity_recycle_routine" ExplicitEntityRecycleTaskType = "explicit_entity_recycle" UploadSentinelCheckTaskType = "upload_sentinel_check" CreateArchiveTaskType = "create_archive" ExtractArchiveTaskType = "extract_archive" RelocateTaskType = "relocate" RemoteDownloadTaskType = "remote_download" ImportTaskType = "import" SlaveCreateArchiveTaskType = "slave_create_archive" SlaveUploadTaskType = "slave_upload" SlaveExtractArchiveType = "slave_extract_archive" )
Variables ¶
View Source
var ( // ErrQueueShutdown the queue is released and closed. ErrQueueShutdown = errors.New("queue has been closed and released") // ErrMaxCapacity Maximum size limit reached ErrMaxCapacity = errors.New("golang-queue: maximum size limit reached") // ErrNoTaskInQueue there is nothing in the queue ErrNoTaskInQueue = errors.New("golang-queue: no Task in queue") )
View Source
var (
CriticalErr = errors.New("non-retryable error")
)
Functions ¶
func RegisterResumableTaskFactory ¶
func RegisterResumableTaskFactory(taskType string, factory ResumableTaskFactory)
RegisterResumableTaskFactory registers a resumable Task factory
Types ¶
type DBTask ¶
type DBTask struct {
DirectOwner *ent.User
Task *ent.Task
// contains filtered or unexported fields
}
DBTask implements Task interface related to DB schema
func (*DBTask) CorrelationID ¶
func (*DBTask) ErrorHistory ¶
func (*DBTask) OnIterationComplete ¶
func (*DBTask) OnPersisted ¶
func (*DBTask) OnStatusTransition ¶
func (*DBTask) ResumeAfter ¶
func (*DBTask) ResumeTime ¶
func (*DBTask) ShouldPersist ¶
type InMemoryTask ¶
type InMemoryTask struct {
*DBTask
}
InMemoryTask implements part Task interface using in-memory data.
func (*InMemoryTask) OnStatusTransition ¶
func (t *InMemoryTask) OnStatusTransition(newStatus task.Status)
func (*InMemoryTask) ShouldPersist ¶
func (i *InMemoryTask) ShouldPersist() bool
type Metric ¶
type Metric interface {
IncBusyWorker()
DecBusyWorker()
BusyWorkers() uint64
SuccessTasks() uint64
FailureTasks() uint64
SubmittedTasks() uint64
IncSuccessTask()
IncFailureTask()
IncSubmittedTask()
}
Metric interface
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
An Option configures a mutex.
func WithBackoffFactor ¶
WithBackoffFactor set backoff factor
func WithBackoffMaxDuration ¶
WithBackoffMaxDuration set backoff max duration
func WithMaxTaskExecution ¶
WithMaxTaskExecution set maximum execution time for a Task.
func WithResumeTaskType ¶
WithResumeTaskType set resume Task type
func WithTaskPullInterval ¶
WithTaskPullInterval set task pull interval
type Progresses ¶
type Queue ¶
type Queue interface {
// Start resume tasks and starts all workers.
Start()
// Shutdown stops all workers.
Shutdown()
// SubmitTask submits a Task to the queue.
QueueTask(ctx context.Context, t Task) error
// BusyWorkers returns the numbers of workers in the running process.
BusyWorkers() int
// BusyWorkers returns the numbers of success tasks.
SuccessTasks() int
// FailureTasks returns the numbers of failure tasks.
FailureTasks() int
// SubmittedTasks returns the numbers of submitted tasks.
SubmittedTasks() int
// SuspendingTasks returns the numbers of suspending tasks.
SuspendingTasks() int
}
func New ¶
func New(l logging.Logger, taskClient inventory.TaskClient, registry TaskRegistry, dep Dep, opts ...Option) Queue
type ResumableTaskFactory ¶
type Scheduler ¶
type Task ¶
type Task interface {
Do(ctx context.Context) (task.Status, error)
// ID returns the Task ID
ID() int
// Type returns the Task type
Type() string
// Status returns the Task status
Status() task.Status
// Owner returns the Task owner
Owner() *ent.User
// State returns the internal Task state
State() string
// ShouldPersist returns true if the Task should be persisted into DB
ShouldPersist() bool
// Persisted returns true if the Task is persisted in DB
Persisted() bool
// Executed returns the duration of the Task execution
Executed() time.Duration
// Retried returns the number of times the Task has been retried
Retried() int
// Error returns the error of the Task
Error() error
// ErrorHistory returns the error history of the Task
ErrorHistory() []error
// Model returns the ent model of the Task
Model() *ent.Task
// CorrelationID returns the correlation ID of the Task
CorrelationID() uuid.UUID
// ResumeTime returns the time when the Task is resumed
ResumeTime() int64
// ResumeAfter sets the time when the Task should be resumed
ResumeAfter(next time.Duration)
Progress(ctx context.Context) Progresses
// Summarize returns the Task summary for UI display
Summarize(hasher hashid.Encoder) *Summary
// OnSuspend is called when queue decides to suspend the Task
OnSuspend(time int64)
// OnPersisted is called when the Task is persisted or updated in DB
OnPersisted(task *ent.Task)
// OnError is called when the Task encounters an error
OnError(err error, d time.Duration)
// OnRetry is called when the iteration returns error and before retry
OnRetry(err error)
// OnIterationComplete is called when the one iteration is completed
OnIterationComplete(executed time.Duration)
// OnStatusTransition is called when the Task status is changed
OnStatusTransition(newStatus task.Status)
// Cleanup is called when the Task is done or error.
Cleanup(ctx context.Context) error
Lock()
Unlock()
}
type TaskRegistry ¶
type TaskRegistry interface {
// NextID returns the next available Task ID.
NextID() int
// Get returns the Task by ID.
Get(id int) (Task, bool)
// Set sets the Task by ID.
Set(id int, t Task)
// Delete deletes the Task by ID.
Delete(id int)
}
TaskRegistry is used in slave node to track in-memory stateful tasks.
func NewTaskRegistry ¶
func NewTaskRegistry() TaskRegistry
NewTaskRegistry creates a new TaskRegistry.
Click to show internal directories.
Click to hide internal directories.