Documentation
¶
Overview ¶
Package task provides task and worker pool management for the Weblens application.
Index ¶
- Constants
- Variables
- func NewTestContext() context_mod.Z
- func NewTestContextWithCancel() (context_mod.Z, context.CancelFunc)
- type Dispatcher
- type ExitStatus
- type HandlerFunc
- type Metadata
- type Options
- type Pool
- func (tp *Pool) AddCleanup(fn PoolCleanupFunc)
- func (tp *Pool) AddError(t *Task)
- func (tp *Pool) Cancel()
- func (tp *Pool) CreatedInTask() *Task
- func (tp *Pool) Errors() []*Task
- func (tp *Pool) GetCompletedTaskCount() int
- func (tp *Pool) GetRootPool() *Pool
- func (tp *Pool) GetTasks() []*Task
- func (tp *Pool) GetTotalTaskCount() int
- func (tp *Pool) GetWorkerPool() *WorkerPool
- func (tp *Pool) HandleTaskExit(isReplacementThread bool) (canContinue bool)
- func (tp *Pool) ID() string
- func (tp *Pool) IncCompletedTasks(count int)
- func (tp *Pool) IncTaskCount(count int)
- func (tp *Pool) IsGlobal() bool
- func (tp *Pool) IsRoot() bool
- func (tp *Pool) LockExit()
- func (tp *Pool) MarkGlobal()
- func (tp *Pool) QueueTask(tsk *Task) (err error)
- func (tp *Pool) RemoveTask(taskID string)
- func (tp *Pool) SignalAllQueued()
- func (tp *Pool) Status() PoolStatus
- func (tp *Pool) UnlockExit()
- func (tp *Pool) Wait(supplementWorker bool, task ...*Task)
- type PoolCleanupFunc
- type PoolStatus
- type QueueState
- type Result
- type Task
- func (t *Task) AtomicSetResult(fn func(Result) Result)
- func (t *Task) Cancel()
- func (t *Task) ClearAndRecompute()
- func (t *Task) ClearOnResult()
- func (t *Task) ClearTimeout()
- func (t *Task) ExeTime() time.Duration
- func (t *Task) Fail(err error)
- func (t *Task) FirstResultChan() <-chan struct{}
- func (t *Task) GetChildTaskPool() *Pool
- func (t *Task) GetFinishTime() time.Time
- func (t *Task) GetMeta() Metadata
- func (t *Task) GetResult() Result
- func (t *Task) GetResults() Result
- func (t *Task) GetStartTime() time.Time
- func (t *Task) GetTaskPool() *Pool
- func (t *Task) GetTimeout() time.Time
- func (t *Task) GetWorkerID() int
- func (t *Task) ID() string
- func (t *Task) JobName() string
- func (t *Task) Log() *zerolog.Logger
- func (t *Task) Manipulate(fn func(meta Metadata) error) error
- func (t *Task) OnResult(callback func(Result))
- func (t *Task) QueueState() QueueState
- func (t *Task) QueueTimeDuration() time.Duration
- func (t *Task) ReadError() error
- func (t *Task) ReqNoErr(err error)
- func (t *Task) SetChildTaskPool(pool *Pool)
- func (t *Task) SetCleanup(cleanup HandlerFunc)
- func (t *Task) SetErrorCleanup(cleanup HandlerFunc)
- func (t *Task) SetPostAction(action func(Result))
- func (t *Task) SetQueueState(s QueueState)
- func (t *Task) SetResult(results Result)
- func (t *Task) SetTimeout(timeout time.Time)
- func (t *Task) Status() (bool, ExitStatus)
- func (t *Task) Success(msg ...any)
- func (t *Task) Wait()
- type WorkerPool
- func (wp *WorkerPool) AddHit(time time.Time, target *Task)
- func (wp *WorkerPool) DispatchJob(ctx context.Context, jobName string, meta Metadata, pool *Pool) (*Task, error)
- func (wp *WorkerPool) GetTask(taskID string) *Task
- func (wp *WorkerPool) GetTaskPool(tpID string) *Pool
- func (wp *WorkerPool) GetTaskPoolByJobName(jobName string) *Pool
- func (wp *WorkerPool) GetTasks() []*Task
- func (wp *WorkerPool) GetTasksByJobName(jobName string) []*Task
- func (wp *WorkerPool) NewTaskPool(replace bool, createdBy *Task) (*Pool, error)
- func (wp *WorkerPool) RegisterJob(jobName string, fn HandlerFunc, opts ...Options)
- func (wp *WorkerPool) Run(ctx context.Context)
- func (wp *WorkerPool) Status() (int, int, int, int, int)
Constants ¶
const GlobalTaskPoolID = "GLOBAL"
GlobalTaskPoolID is the identifier for the global task pool that is always available.
Variables ¶
var ErrChildTaskFailed = wlerrors.New("child task failed")
ErrChildTaskFailed indicates that one or more child tasks in a pool failed.
var ErrTaskAlreadyComplete = wlerrors.New("task already complete")
ErrTaskAlreadyComplete indicates an attempt to execute an already completed task.
var ErrTaskCanceled = wlerrors.New("task canceled")
ErrTaskCanceled indicates a task was canceled before completion.
var ErrTaskError = wlerrors.New("task error")
ErrTaskError indicates a task encountered an error during execution.
var ErrTaskExit = wlerrors.New("task exit")
ErrTaskExit indicates a task has exited.
var ErrTaskTimeout = wlerrors.New("task timeout")
ErrTaskTimeout indicates a task exceeded its timeout duration.
Functions ¶
func NewTestContext ¶
func NewTestContext() context_mod.Z
NewTestContext creates a context implementing context_mod.Z for testing.
func NewTestContextWithCancel ¶
func NewTestContextWithCancel() (context_mod.Z, context.CancelFunc)
NewTestContextWithCancel creates a cancelable context implementing context_mod.Z for testing.
Types ¶
type Dispatcher ¶
type Dispatcher interface {
// DispatchJob creates and dispatches a new job with the specified name and metadata.
DispatchJob(ctx context.Context, jobName string, meta Metadata) (*Task, error)
}
Dispatcher provides an interface for dispatching jobs with associated metadata.
type ExitStatus ¶
type ExitStatus string
ExitStatus represents the final state of a completed task.
const (
// TaskNoStatus indicates the task has not yet completed.
TaskNoStatus ExitStatus = ""
// TaskSuccess indicates the task completed successfully.
TaskSuccess ExitStatus = "success"
// TaskCanceled indicates the task was canceled before completion.
TaskCanceled ExitStatus = "canceled"
// TaskError indicates the task failed with an error.
TaskError ExitStatus = "error"
)
type HandlerFunc ¶
type HandlerFunc func(task *Task)
HandlerFunc defines the function signature for task execution functions.
type Metadata ¶
type Metadata interface {
JobName() string
MetaString() string
FormatToResult() Result
Verify() error
}
Metadata provides information about a task including its name, configuration, and validation.
func NewTestMetadata ¶
NewTestMetadata creates a Metadata instance for testing.
type Options ¶
type Options struct {
// Persistent indicates whether the task should persist after completion.
// If true, the task's state and results will be saved in the pool to avoid recomputation.
// If false, the task will be removed from memory after it has completed, and if the same task is
// queued again (assuming the Unique flag is false), it will be recomputed from scratch.
Persistent bool
// Unique indicates whether the task should be unique in the queue.
// If true, duplicate tasks (based on metadata string) will not be added to the queue, and instead the existing
// task with matching metadata will be returned. Otherwise, multiple tasks with the same metadata can coexist in the queue.
Unique bool
}
Options specifies configuration options for task behavior.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool manages a collection of tasks and tracks their execution progress.
func (*Pool) AddCleanup ¶
func (tp *Pool) AddCleanup(fn PoolCleanupFunc)
AddCleanup registers a cleanup function to run when the pool completes.
func (*Pool) CreatedInTask ¶
CreatedInTask returns the task that created this pool, or nil if created outside a task.
func (*Pool) GetCompletedTaskCount ¶
GetCompletedTaskCount returns the number of tasks that have completed in this pool.
func (*Pool) GetRootPool ¶
GetRootPool returns the root task pool in the hierarchy.
func (*Pool) GetTotalTaskCount ¶
GetTotalTaskCount returns the total number of tasks that have been added to this pool.
func (*Pool) GetWorkerPool ¶
func (tp *Pool) GetWorkerPool() *WorkerPool
GetWorkerPool returns the worker pool that manages this task pool.
func (*Pool) HandleTaskExit ¶
HandleTaskExit processes task completion and determines if the worker should continue.
func (*Pool) IncCompletedTasks ¶
IncCompletedTasks increments the completed task count for this pool by the specified amount.
func (*Pool) IncTaskCount ¶
IncTaskCount increments the total task count for this pool by the specified amount.
func (*Pool) LockExit ¶
func (tp *Pool) LockExit()
LockExit acquires the exit lock for this task pool.
func (*Pool) MarkGlobal ¶
func (tp *Pool) MarkGlobal()
MarkGlobal marks this work queue as being a "global" one.
func (*Pool) RemoveTask ¶
RemoveTask removes the task with the specified ID from this pool.
func (*Pool) SignalAllQueued ¶
func (tp *Pool) SignalAllQueued()
SignalAllQueued marks that all tasks have been queued to this pool.
func (*Pool) Status ¶
func (tp *Pool) Status() PoolStatus
Status returns the current status of the task pool including completion progress.
func (*Pool) UnlockExit ¶
func (tp *Pool) UnlockExit()
UnlockExit releases the exit lock for this task pool.
func (*Pool) Wait ¶
Wait parks the thread on the work queue until all the tasks have been queued and have finished. **If you never call tp.SignalAllQueued(), the waiters will never wake up.** Make sure that you call SignalAllQueued before parking here if it is the only thread loading tasks. If you are parking a thread that is currently executing a task, you can pass that task in as well, and that task will also listen for exit events.
type PoolCleanupFunc ¶
type PoolCleanupFunc func(tp *Pool)
PoolCleanupFunc defines a function type for cleaning up resources associated with a task pool.
type PoolStatus ¶
type PoolStatus struct {
// The count of tasks that have completed on this task pool.
// Complete *DOES* include failed tasks
Complete int64
// The count of failed tasks on this task pool
Failed int64
// The count of all tasks that have been queued on this task pool
Total int64
// Percent to completion of all tasks
Progress float64
// How long the pool has been alive
Runtime time.Duration
}
PoolStatus represents the current state and progress of a task pool.
type QueueState ¶
type QueueState int
QueueState represents the current state of a task in the queue.
const (
Created QueueState = iota
InQueue
Executing
Sleeping
Exited
)
Task queue state constants.
type Task ¶
type Task struct {
QueueTime *wlatomic.AtomicValue[time.Time] // required
StartTime *wlatomic.AtomicValue[time.Time]
FinishTime *wlatomic.AtomicValue[time.Time]
Ctx context.Context
WorkerID int64
// contains filtered or unexported fields
}
Task represents a unit of work that can be queued and executed by a worker pool.
func NewTestQueueableTask ¶
NewTestQueueableTask creates a Task that can be queued for testing. Unlike NewTestTask, this creates a task in the Created state that can be queued.
func NewTestTask ¶
func NewTestTask(id, jobName string, workerID int64, exitStatus ExitStatus, result Result, startTime time.Time) *Task
NewTestTask creates a Task for testing purposes. This allows external packages to create Task instances with specific state. Only available when building with -tags=test.
func (*Task) AtomicSetResult ¶
AtomicSetResult atomically updates the task result using the provided function.
func (*Task) Cancel ¶
func (t *Task) Cancel()
Cancel cancels the task. It is unknowable whether this is the last operation of a task, so t.success() will not have an effect after a task is canceled. t.error() may override the exit status in special cases, such as a timeout, which is both an error and a reason for cancellation.
Cancellations are always external to the task. From within the body of the task, either error or success should be called. If a task finds itself not required to continue, and should exit early, success should be returned instead of cancel.
func (*Task) ClearAndRecompute ¶
func (t *Task) ClearAndRecompute()
ClearAndRecompute cancels the task, clears its state, and re-queues it.
func (*Task) ClearOnResult ¶
func (t *Task) ClearOnResult()
ClearOnResult removes the result callback.
func (*Task) Fail ¶
Fail will set the error on the task, and then panic with ErrTaskError, which informs the worker recovery function to exit the task with the error that is set, and not treat it as a real panic.
func (*Task) FirstResultChan ¶
func (t *Task) FirstResultChan() <-chan struct{}
FirstResultChan returns a channel that is closed the first time the task's result is set via SetResult. Callers can use it as a readiness signal to avoid polling for early task progress. The channel is created with the task and never re-opened — once closed it stays closed for the task's lifetime.
func (*Task) GetChildTaskPool ¶
GetChildTaskPool returns the child task pool, if any.
func (*Task) GetFinishTime ¶
GetFinishTime returns the time when the task finished executing.
func (*Task) GetResults ¶
GetResults returns a copy of the task result.
func (*Task) GetStartTime ¶
GetStartTime returns the time when the task started executing.
func (*Task) GetTaskPool ¶
GetTaskPool returns the task pool this task belongs to.
func (*Task) GetTimeout ¶
GetTimeout returns the task timeout.
func (*Task) GetWorkerID ¶
GetWorkerID returns the ID of the worker executing this task.
func (*Task) Manipulate ¶
Manipulate is used to change the metadata of a task while it is running. This can be useful to have a task be waiting for input from a client, and this function can be used to send that data to the task via a chan, for example.
func (*Task) OnResult ¶
OnResult installs a callback function that is called every time the task result is set.
func (*Task) QueueState ¶
func (t *Task) QueueState() QueueState
QueueState returns the current queue state of the task.
func (*Task) QueueTimeDuration ¶
QueueTimeDuration returns how long the task waited in the queue.
func (*Task) SetChildTaskPool ¶
SetChildTaskPool sets the child task pool for this task.
func (*Task) SetCleanup ¶
func (t *Task) SetCleanup(cleanup HandlerFunc)
SetCleanup takes a function to be run after the task has completed, no matter the exit status. Many cleanup functions can be registered to run in sequence after the task completes. The cleanup functions are run in the order they are registered. Cleanup functions should treat the task as read-only: the task has already completed, and modifying its state may result in a deadlock. If the task has already completed when SetCleanup is called, the cleanup function will NOT be run. Therefore, it is only safe to call SetCleanup() from inside of a task handler. If you want to register a function from outside the task handler, or to run after the task has completed successfully, use t.SetPostAction() instead.
func (*Task) SetErrorCleanup ¶
func (t *Task) SetErrorCleanup(cleanup HandlerFunc)
SetErrorCleanup works the same as t.SetCleanup(), but only runs if the task errors.
func (*Task) SetPostAction ¶
SetPostAction takes a function to be run after the task has successfully completed, with the task results as the input of the function.
func (*Task) SetQueueState ¶
func (t *Task) SetQueueState(s QueueState)
SetQueueState sets the queue state of the task.
func (*Task) SetTimeout ¶
SetTimeout sets a timeout for the task.
func (*Task) Status ¶
func (t *Task) Status() (bool, ExitStatus)
Status returns a boolean representing if a task has completed, and a string describing its exit type, if completed.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages a pool of workers that execute tasks from task pools.
func NewTestWorkerPool ¶
func NewTestWorkerPool(numWorkers int) *WorkerPool
NewTestWorkerPool creates a WorkerPool for testing purposes.
func NewWorkerPool ¶
func NewWorkerPool(initWorkers int) *WorkerPool
NewWorkerPool creates and initializes a new worker pool with the specified number of workers.
func (*WorkerPool) AddHit ¶
func (wp *WorkerPool) AddHit(time time.Time, target *Task)
AddHit schedules a timeout check for the specified task at the given time.
func (*WorkerPool) DispatchJob ¶
func (wp *WorkerPool) DispatchJob(ctx context.Context, jobName string, meta Metadata, pool *Pool) (*Task, error)
DispatchJob creates and queues a new task for the specified registered job.
func (*WorkerPool) GetTask ¶
func (wp *WorkerPool) GetTask(taskID string) *Task
GetTask returns the task with the specified ID.
func (*WorkerPool) GetTaskPool ¶
func (wp *WorkerPool) GetTaskPool(tpID string) *Pool
GetTaskPool returns the task pool with the specified ID.
func (*WorkerPool) GetTaskPoolByJobName ¶
func (wp *WorkerPool) GetTaskPoolByJobName(jobName string) *Pool
GetTaskPoolByJobName returns the task pool created by a task with the specified job name.
func (*WorkerPool) GetTasks ¶
func (wp *WorkerPool) GetTasks() []*Task
GetTasks returns all tasks currently managed by this worker pool.
func (*WorkerPool) GetTasksByJobName ¶
func (wp *WorkerPool) GetTasksByJobName(jobName string) []*Task
GetTasksByJobName returns all tasks with the specified job name.
func (*WorkerPool) NewTaskPool ¶
func (wp *WorkerPool) NewTaskPool(replace bool, createdBy *Task) (*Pool, error)
NewTaskPool creates a new task pool. `replace` spawns a temporary replacement thread on the parent worker pool. This prevents a deadlock when the queue fills up while adding many tasks and none are being executed.
`createdBy` is the parent task creating the new pool (nil if the pool is created outside a task). It allows chaining of task pools for floating updates to the top. This makes it possible for clients to subscribe to a single task and get notified about all of the sub-updates of that task. See taskPool.go.
func (*WorkerPool) RegisterJob ¶
func (wp *WorkerPool) RegisterJob(jobName string, fn HandlerFunc, opts ...Options)
RegisterJob adds a template for a repeatable job that can be called upon later in the program.
func (*WorkerPool) Run ¶
func (wp *WorkerPool) Run(ctx context.Context)
Run launches the standard threads for this worker pool.