Documentation
¶
Index ¶
- func IsRecoverable(err error) bool
- func NewErr(msg string) error
- func Unrecoverable(err error) error
- type Base
- func (b *Base) Cancel()
- func (b *Base) Ctx() context.Context
- func (b *Base) CtxDone() <-chan struct{}
- func (b *Base) GetErr() error
- func (b *Base) GetID() string
- func (b *Base) GetProgress() float64
- func (b *Base) GetRetry() (int, int)
- func (b *Base) GetState() State
- func (b *Base) Persist()
- func (b *Base) SetCancelFunc(cancelFunc context.CancelFunc)
- func (b *Base) SetCtx(ctx context.Context)
- func (b *Base) SetErr(err error)
- func (b *Base) SetID(id string)
- func (b *Base) SetPersist(persist func())
- func (b *Base) SetProgress(progress float64)
- func (b *Base) SetRetry(retry int, maxRetry int)
- func (b *Base) SetState(state State)
- type Info
- type Manager
- func (m *Manager[T]) Add(task T)
- func (m *Manager[T]) Cancel(id string)
- func (m *Manager[T]) CancelAll()
- func (m *Manager[T]) CancelByCondition(condition func(task T) bool)
- func (m *Manager[T]) GetAll() []T
- func (m *Manager[T]) GetByCondition(condition func(task T) bool) []T
- func (m *Manager[T]) GetByID(id string) (T, bool)
- func (m *Manager[T]) GetByState(state ...State) []T
- func (m *Manager[T]) Pause()
- func (m *Manager[T]) Remove(id string)
- func (m *Manager[T]) RemoveAll()
- func (m *Manager[T]) RemoveByCondition(condition func(task T) bool)
- func (m *Manager[T]) RemoveByState(state ...State)
- func (m *Manager[T]) Retry(id string)
- func (m *Manager[T]) RetryAllFailed()
- func (m *Manager[T]) SetWorkersNumActive(active int64)
- func (m *Manager[T]) Start()
- func (m *Manager[T]) Wait()
- type OnBeforeRetry
- type OnFailed
- type OnSucceeded
- type Option
- func WithLogger(logger *slog.Logger) Option
- func WithMaxRetry(maxRetry int) Option
- func WithOptions(opts Options) Option
- func WithPersistDebounce(debounce time.Duration) Option
- func WithPersistFunction(r func() ([]byte, error), w func([]byte) error) Option
- func WithPersistPath(path string) Option
- func WithRunning(running bool) Option
- func WithTimeout(timeout time.Duration) Option
- func WithWorks(works int) Option
- type Options
- type Persistable
- type Recoverable
- type Retryable
- type State
- type TacheError
- type Task
- type TaskBase
- type TaskWithInfo
- type Worker
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsRecoverable ¶
IsRecoverable reports whether err is recoverable, i.e. whether it is not an instance of `unrecoverableError` (see Unrecoverable)
func Unrecoverable ¶
Unrecoverable wraps an error in `unrecoverableError` struct
Types ¶
type Base ¶
type Base struct {
ID string `json:"id"`
State State `json:"state"`
Retry int `json:"retry"`
MaxRetry int `json:"max_retry"`
// contains filtered or unexported fields
}
Base is the base struct for all tasks; it implements the TaskBase interface
func (*Base) GetProgress ¶
func (*Base) SetCancelFunc ¶
func (b *Base) SetCancelFunc(cancelFunc context.CancelFunc)
func (*Base) SetPersist ¶
func (b *Base) SetPersist(persist func())
func (*Base) SetProgress ¶
type Manager ¶
type Manager[T Task] struct {
// contains filtered or unexported fields
}
Manager is the manager of all tasks
func NewManager ¶
NewManager creates a new manager
func (*Manager[T]) CancelByCondition ¶
CancelByCondition cancels the tasks that satisfy the condition given by a function
func (*Manager[T]) GetByCondition ¶
GetByCondition gets the tasks that satisfy the condition given by a function
func (*Manager[T]) GetByState ¶
GetByState gets tasks by the given state(s)
func (*Manager[T]) RemoveByCondition ¶
RemoveByCondition removes the tasks that satisfy the condition given by a function
func (*Manager[T]) RemoveByState ¶
RemoveByState removes tasks by the given state(s)
func (*Manager[T]) RetryAllFailed ¶
func (m *Manager[T]) RetryAllFailed()
RetryAllFailed retries all failed tasks
func (*Manager[T]) SetWorkersNumActive ¶
type OnBeforeRetry ¶
type OnBeforeRetry interface {
OnBeforeRetry()
}
OnBeforeRetry is the interface for tasks that need to run a hook before retrying
type OnFailed ¶
type OnFailed interface {
OnFailed()
}
OnFailed is the interface for tasks that need to run a hook when they fail
type OnSucceeded ¶
type OnSucceeded interface {
OnSucceeded()
}
OnSucceeded is the interface for tasks that need to run a hook when they succeed
type Option ¶
type Option func(*Options)
Option is the option for manager
func WithPersistDebounce ¶
WithPersistDebounce sets the persist debounce duration
func WithPersistFunction ¶
type Options ¶
type Options struct {
Works int
MaxRetry int
Timeout *time.Duration
PersistPath string
PersistDebounce *time.Duration
Running bool
Logger *slog.Logger
PersistReadFunction func() ([]byte, error)
PersistWriteFunction func([]byte) error
}
Options is the options for manager
type Persistable ¶
type Persistable interface {
Persistable() bool
}
Persistable reports whether the task is persistable
type Recoverable ¶
type Recoverable interface {
Recoverable() bool
}
Recoverable reports whether the task is recoverable
type Retryable ¶
type Retryable interface {
Retryable() bool
}
Retryable reports whether the task is retryable
type State ¶
type State int
State is the state of a task
const (
// StatePending is the state of a task when it is pending
StatePending State = iota
// StateRunning is the state of a task when it is running
StateRunning
// StateSucceeded is the state of a task when it succeeded
StateSucceeded
// StateCanceling is the state of a task when it is canceling
StateCanceling
// StateCanceled is the state of a task when it is canceled
StateCanceled
// StateErrored is the state of a task when it is errored (it will be retried)
StateErrored
// StateFailing is the state of a task when it is failing (executing the OnFailed hook)
StateFailing
// StateFailed is the state of a task when it failed (no retry times left)
StateFailed
// StateWaitingRetry is the state of a task when it is waiting for retry
StateWaitingRetry
// StateBeforeRetry is the state of a task when it is executing the OnBeforeRetry hook
StateBeforeRetry
)
type TacheError ¶
type TacheError struct {
Msg string
}
TacheError is a custom error type
func (*TacheError) Error ¶
func (e *TacheError) Error() string
type TaskBase ¶
type TaskBase interface {
// SetProgress sets the progress of the task
SetProgress(progress float64)
// GetProgress gets the progress of the task
GetProgress() float64
// SetState sets the state of the task
SetState(state State)
// GetState gets the state of the task
GetState() State
// GetID gets the ID of the task
GetID() string
// SetID sets the ID of the task
SetID(id string)
// SetErr sets the error of the task
SetErr(err error)
// GetErr gets the error of the task
GetErr() error
// SetCtx sets the context of the task
SetCtx(ctx context.Context)
// CtxDone gets the context done channel of the task
CtxDone() <-chan struct{}
// Cancel cancels the task
Cancel()
// Ctx gets the context of the task
Ctx() context.Context
// SetCancelFunc sets the cancel function of the task
SetCancelFunc(cancelFunc context.CancelFunc)
// GetRetry gets the retry of the task
GetRetry() (int, int)
// SetRetry sets the retry of the task
SetRetry(retry int, maxRetry int)
// Persist persists the task
Persist()
// SetPersist sets the persist function of the task
SetPersist(persist func())
}
TaskBase is the base interface for all tasks
type TaskWithInfo ¶
type WorkerPool ¶
type WorkerPool[T Task] struct {
// contains filtered or unexported fields
}
WorkerPool is the pool of workers
func NewWorkerPool ¶
func NewWorkerPool[T Task](size int) *WorkerPool[T]
NewWorkerPool creates a new worker pool
func (*WorkerPool[T]) Put ¶
func (wp *WorkerPool[T]) Put(worker *Worker[T])
Put puts a worker back to pool
func (*WorkerPool[T]) SetNumActive ¶
func (wp *WorkerPool[T]) SetNumActive(active int64)