task

package
v0.0.0-...-dd4be11
This package is not in the latest version of its module.
Published: May 4, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package task provides task and worker pool management for the Weblens application.

Constants

const GlobalTaskPoolID = "GLOBAL"

GlobalTaskPoolID is the identifier for the global task pool that is always available.

Variables

var ErrChildTaskFailed = wlerrors.New("child task failed")

ErrChildTaskFailed indicates that one or more child tasks in a pool failed.

var ErrTaskAlreadyComplete = wlerrors.New("task already complete")

ErrTaskAlreadyComplete indicates an attempt to execute an already completed task.

var ErrTaskCanceled = wlerrors.New("task canceled")

ErrTaskCanceled indicates a task was canceled before completion.

var ErrTaskError = wlerrors.New("task error")

ErrTaskError indicates a task encountered an error during execution.

var ErrTaskExit = wlerrors.New("task exit")

ErrTaskExit indicates a task has exited.

var ErrTaskTimeout = wlerrors.New("task timeout")

ErrTaskTimeout indicates a task exceeded its timeout duration.

Functions

func NewTestContext

func NewTestContext() context_mod.Z

NewTestContext creates a context implementing context_mod.Z for testing.

func NewTestContextWithCancel

func NewTestContextWithCancel() (context_mod.Z, context.CancelFunc)

NewTestContextWithCancel creates a cancelable context implementing context_mod.Z for testing.

Types

type Dispatcher

type Dispatcher interface {
	// DispatchJob creates and dispatches a new job with the specified name and metadata.
	DispatchJob(ctx context.Context, jobName string, meta Metadata) (*Task, error)
}

Dispatcher provides an interface for dispatching jobs with associated metadata.

type ExitStatus

type ExitStatus string

ExitStatus represents the final state of a completed task.

const (
	// TaskNoStatus indicates the task has not yet completed.
	TaskNoStatus ExitStatus = ""
	// TaskSuccess indicates the task completed successfully.
	TaskSuccess ExitStatus = "success"
	// TaskCanceled indicates the task was canceled before completion.
	TaskCanceled ExitStatus = "canceled"
	// TaskError indicates the task failed with an error.
	TaskError ExitStatus = "error"
)

type HandlerFunc

type HandlerFunc func(task *Task)

HandlerFunc defines the function signature for task execution functions.

type Metadata

type Metadata interface {
	JobName() string
	MetaString() string
	FormatToResult() Result
	Verify() error
}

Metadata provides information about a task including its name, configuration, and validation.
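
As an illustration of what a job's metadata type might look like, here is a minimal sketch that satisfies the interface. Result and Metadata are redeclared locally so the example is self-contained; ScanMeta, its Path field, and the job name "scan_directory" are hypothetical and do not come from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types so the sketch compiles on its own.
type Result map[string]any

type Metadata interface {
	JobName() string
	MetaString() string
	FormatToResult() Result
	Verify() error
}

// ScanMeta is a hypothetical metadata type for a directory-scan job.
type ScanMeta struct {
	Path string
}

func (m ScanMeta) JobName() string { return "scan_directory" }

// MetaString returns a stable string that identifies this metadata.
func (m ScanMeta) MetaString() string { return m.JobName() + ":" + m.Path }

// FormatToResult exposes the metadata as an initial task result.
func (m ScanMeta) FormatToResult() Result { return Result{"path": m.Path} }

// Verify rejects metadata that cannot describe a valid job.
func (m ScanMeta) Verify() error {
	if m.Path == "" {
		return errors.New("scan metadata: path is empty")
	}
	return nil
}

// Compile-time check that ScanMeta satisfies Metadata.
var _ Metadata = ScanMeta{}

func main() {
	m := ScanMeta{Path: "/media/photos"}
	fmt.Println(m.MetaString(), m.Verify())
}
```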

func NewTestMetadata

func NewTestMetadata(jobName string) Metadata

NewTestMetadata creates a Metadata instance for testing.

type Options

type Options struct {
	// Persistent indicates whether the task should persist after completion.
	// If true, the task's state and results will be saved in the pool to avoid recomputation.
	// If false, the task will be removed from memory after it has completed, and if the same task is
	// queued again (assuming the Unique flag is false), it will be recomputed from scratch.
	Persistent bool

	// Unique indicates whether the task should be unique in the queue.
	// If true, duplicate tasks (based on metadata string) will not be added to the queue, and instead the existing
	// task with matching metadata will be returned. Otherwise, multiple tasks with the same metadata can coexist in the queue.
	Unique bool
}

Options specifies configuration options for task behavior.
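
The effect of the Unique flag can be modeled with a small queue keyed by the metadata string. This is only a sketch of the documented behavior, not the package's implementation; miniTask, miniQueue, and add are hypothetical names.

```go
package main

import "fmt"

// Options mirrors the documented struct.
type Options struct {
	Persistent bool
	Unique     bool
}

// miniTask and miniQueue are hypothetical stand-ins, not the package's types.
type miniTask struct {
	id   int
	meta string
}

type miniQueue struct {
	nextID int
	byMeta map[string]*miniTask
	tasks  []*miniTask
}

func newMiniQueue() *miniQueue {
	return &miniQueue{byMeta: make(map[string]*miniTask)}
}

// add queues a task for the given metadata string. With Unique set, an
// existing task with the same metadata string is returned instead of a new one.
func (q *miniQueue) add(meta string, opts Options) *miniTask {
	if opts.Unique {
		if existing, ok := q.byMeta[meta]; ok {
			return existing
		}
	}
	q.nextID++
	t := &miniTask{id: q.nextID, meta: meta}
	q.tasks = append(q.tasks, t)
	q.byMeta[meta] = t
	return t
}

func main() {
	q := newMiniQueue()
	a := q.add("scan:/photos", Options{Unique: true})
	b := q.add("scan:/photos", Options{Unique: true}) // reuses a
	c := q.add("scan:/photos", Options{})             // creates a second task
	fmt.Println(a == b, a == c, len(q.tasks))
}
```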

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a collection of tasks and tracks their execution progress.

func (*Pool) AddCleanup

func (tp *Pool) AddCleanup(fn PoolCleanupFunc)

AddCleanup registers a cleanup function to run when the pool completes.

func (*Pool) AddError

func (tp *Pool) AddError(t *Task)

AddError adds a task that encountered an error to the pool's error list.

func (*Pool) Cancel

func (tp *Pool) Cancel()

Cancel cancels all tasks in this pool.

func (*Pool) CreatedInTask

func (tp *Pool) CreatedInTask() *Task

CreatedInTask returns the task that created this pool, or nil if created outside a task.

func (*Pool) Errors

func (tp *Pool) Errors() []*Task

Errors returns the list of tasks that encountered errors in this pool.

func (*Pool) GetCompletedTaskCount

func (tp *Pool) GetCompletedTaskCount() int

GetCompletedTaskCount returns the number of tasks that have completed in this pool.

func (*Pool) GetRootPool

func (tp *Pool) GetRootPool() *Pool

GetRootPool returns the root task pool in the hierarchy.

func (*Pool) GetTasks

func (tp *Pool) GetTasks() []*Task

GetTasks returns a slice of all tasks currently attached to this pool.

func (*Pool) GetTotalTaskCount

func (tp *Pool) GetTotalTaskCount() int

GetTotalTaskCount returns the total number of tasks that have been added to this pool.

func (*Pool) GetWorkerPool

func (tp *Pool) GetWorkerPool() *WorkerPool

GetWorkerPool returns the worker pool that manages this task pool.

func (*Pool) HandleTaskExit

func (tp *Pool) HandleTaskExit(isReplacementThread bool) (canContinue bool)

HandleTaskExit processes task completion and determines if the worker should continue.

func (*Pool) ID

func (tp *Pool) ID() string

ID returns the unique identifier for this task pool.

func (*Pool) IncCompletedTasks

func (tp *Pool) IncCompletedTasks(count int)

IncCompletedTasks increments the completed task count for this pool by the specified amount.

func (*Pool) IncTaskCount

func (tp *Pool) IncTaskCount(count int)

IncTaskCount increments the total task count for this pool by the specified amount.

func (*Pool) IsGlobal

func (tp *Pool) IsGlobal() bool

IsGlobal returns true if this task pool is treated as a global pool.

func (*Pool) IsRoot

func (tp *Pool) IsRoot() bool

IsRoot returns true if this task pool has no parent or its parent is global.
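
The relationship between IsRoot and GetRootPool can be pictured as a walk up a chain of parent pools. The sketch below models that walk under the documented rule for IsRoot; miniPool and its fields are hypothetical, since the real Pool's fields are unexported.

```go
package main

import "fmt"

// miniPool is a hypothetical stand-in that keeps only what is needed to
// model the pool hierarchy.
type miniPool struct {
	id     string
	parent *miniPool
	global bool
}

// isRoot follows the documented rule: no parent, or a parent that is global.
func (p *miniPool) isRoot() bool {
	return p.parent == nil || p.parent.global
}

// rootPool walks up the parent chain until it reaches a root pool.
func (p *miniPool) rootPool() *miniPool {
	cur := p
	for !cur.isRoot() {
		cur = cur.parent
	}
	return cur
}

func main() {
	global := &miniPool{id: "GLOBAL", global: true}
	top := &miniPool{id: "backup", parent: global}
	child := &miniPool{id: "backup/scan", parent: top}
	leaf := &miniPool{id: "backup/scan/hash", parent: child}
	fmt.Println(leaf.rootPool().id, top.isRoot(), child.isRoot())
}
```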

func (*Pool) LockExit

func (tp *Pool) LockExit()

LockExit acquires the exit lock for this task pool.

func (*Pool) MarkGlobal

func (tp *Pool) MarkGlobal()

MarkGlobal marks this task pool as a "global" pool.

func (*Pool) QueueTask

func (tp *Pool) QueueTask(tsk *Task) (err error)

QueueTask adds a task to this pool for execution.

func (*Pool) RemoveTask

func (tp *Pool) RemoveTask(taskID string)

RemoveTask removes the task with the specified ID from this pool.

func (*Pool) SignalAllQueued

func (tp *Pool) SignalAllQueued()

SignalAllQueued marks that all tasks have been queued to this pool.

func (*Pool) Status

func (tp *Pool) Status() PoolStatus

Status returns the current status of the task pool including completion progress.

func (*Pool) UnlockExit

func (tp *Pool) UnlockExit()

UnlockExit releases the exit lock for this task pool.

func (*Pool) Wait

func (tp *Pool) Wait(supplementWorker bool, task ...*Task)

Wait parks the calling thread on the task pool until all tasks have been queued and have finished. If tp.SignalAllQueued() is never called, the waiters will never wake up, so make sure to call SignalAllQueued before parking here if this is the only thread loading tasks. If you are parking a thread that is currently executing a task, you can pass that task in as well, and that task will also listen for exit events.

type PoolCleanupFunc

type PoolCleanupFunc func(tp *Pool)

PoolCleanupFunc defines a function type for cleaning up resources associated with a task pool.

type PoolStatus

type PoolStatus struct {
	// The count of tasks that have completed on this task pool.
	// Complete *DOES* include failed tasks
	Complete int64

	// The count of failed tasks on this task pool
	Failed int64

	// The count of all tasks that have been queued on this task pool
	Total int64

	// Percent to completion of all tasks
	Progress float64

	// How long the pool has been alive
	Runtime time.Duration
}

PoolStatus represents the current state and progress of a task pool.

type QueueState

type QueueState int

QueueState represents the current state of a task in the queue.

const (
	Created QueueState = iota
	InQueue
	Executing
	Sleeping
	Exited
)

Task queue state constants.
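
Because these constants are declared with iota, they print as bare integers by default. The sketch below adds a String method for readable logs. The method is hypothetical; the documentation above does not list a Stringer for QueueState.

```go
package main

import "fmt"

// QueueState and its constants mirror the documented declarations.
type QueueState int

const (
	Created QueueState = iota
	InQueue
	Executing
	Sleeping
	Exited
)

// String is a hypothetical helper that names each state for logging.
func (s QueueState) String() string {
	switch s {
	case Created:
		return "Created"
	case InQueue:
		return "InQueue"
	case Executing:
		return "Executing"
	case Sleeping:
		return "Sleeping"
	case Exited:
		return "Exited"
	default:
		return fmt.Sprintf("QueueState(%d)", int(s))
	}
}

func main() {
	for s := Created; s <= Exited; s++ {
		fmt.Printf("%d = %s\n", int(s), s)
	}
}
```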

type Result

type Result map[string]any

Result represents the results of a task execution as key-value pairs.

func (Result) ToMap

func (tr Result) ToMap() map[string]any

ToMap returns a cloned map representation of the Result.
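
To show why a cloned map matters, here is a sketch in which writes to the returned map do not reach the original. The ToMap body below is an assumed shallow copy written for illustration; whether the package copies nested values is not stated in the source.

```go
package main

import "fmt"

// Result mirrors the documented type.
type Result map[string]any

// ToMap here is a sketch of a shallow clone: top-level keys are copied,
// while nested maps or slices would still be shared.
func (tr Result) ToMap() map[string]any {
	out := make(map[string]any, len(tr))
	for k, v := range tr {
		out[k] = v
	}
	return out
}

func main() {
	r := Result{"files": 3}
	m := r.ToMap()
	m["files"] = 99 // changes only the clone
	fmt.Println(r["files"], m["files"])
}
```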

type Task

type Task struct {
	QueueTime  *wlatomic.AtomicValue[time.Time] // required
	StartTime  *wlatomic.AtomicValue[time.Time]
	FinishTime *wlatomic.AtomicValue[time.Time]

	Ctx context.Context

	WorkerID int64
	// contains filtered or unexported fields
}

Task represents a unit of work that can be queued and executed by a worker pool.

func NewTestQueueableTask

func NewTestQueueableTask(id, jobName string) *Task

NewTestQueueableTask creates a Task that can be queued for testing. Unlike NewTestTask, this creates a task in the Created state that can be queued.

func NewTestTask

func NewTestTask(id, jobName string, workerID int64, exitStatus ExitStatus, result Result, startTime time.Time) *Task

NewTestTask creates a Task for testing purposes. This allows external packages to create Task instances with specific state. Only available when building with -tags=test.

func (*Task) AtomicSetResult

func (t *Task) AtomicSetResult(fn func(Result) Result)

AtomicSetResult atomically updates the task result using the provided function.
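
The value of an atomic update function is that the read and the write happen as one step. A minimal sketch of the idea using a mutex follows; resultBox, atomicSet, and get are hypothetical names, and the package may implement this differently.

```go
package main

import (
	"fmt"
	"sync"
)

// Result mirrors the documented type.
type Result map[string]any

// resultBox is a hypothetical holder for a task's result.
type resultBox struct {
	mu     sync.Mutex
	result Result
}

// atomicSet applies fn to the current result while holding the lock, so
// concurrent updates cannot interleave between the read and the write.
func (b *resultBox) atomicSet(fn func(Result) Result) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.result = fn(b.result)
}

// get returns a copy of the current result.
func (b *resultBox) get() Result {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := make(Result, len(b.result))
	for k, v := range b.result {
		out[k] = v
	}
	return out
}

func main() {
	box := &resultBox{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			box.atomicSet(func(r Result) Result {
				if r == nil {
					r = Result{}
				}
				n, _ := r["processed"].(int)
				r["processed"] = n + 1
				return r
			})
		}()
	}
	wg.Wait()
	fmt.Println(box.get()["processed"])
}
```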

func (*Task) Cancel

func (t *Task) Cancel()

Cancel cancels the task. It cannot be known whether Cancel is the last operation performed on a task, so t.success() has no effect after a task is canceled. t.error() may override the exit status in special cases, such as a timeout, which is both an error and a reason for cancellation.

Cancellations are always external to the task. From within the body of the task, either error or success should be called. If a task finds that it is not required to continue and should exit early, it should report success rather than cancel.

func (*Task) ClearAndRecompute

func (t *Task) ClearAndRecompute()

ClearAndRecompute cancels the task, clears its state, and re-queues it.

func (*Task) ClearOnResult

func (t *Task) ClearOnResult()

ClearOnResult removes the result callback.

func (*Task) ClearTimeout

func (t *Task) ClearTimeout()

ClearTimeout clears the task timeout.

func (*Task) ExeTime

func (t *Task) ExeTime() time.Duration

ExeTime returns the execution duration of the task.

func (*Task) Fail

func (t *Task) Fail(err error)

Fail will set the error on the task, and then panic with ErrTaskError, which informs the worker recovery function to exit the task with the error that is set, and not treat it as a real panic.

func (*Task) FirstResultChan

func (t *Task) FirstResultChan() <-chan struct{}

FirstResultChan returns a channel that is closed the first time the task's result is set via SetResult. Callers can use it as a readiness signal to avoid polling for early task progress. The channel is created with the task and never re-opened; once closed, it stays closed for the task's lifetime.
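
A close-once readiness channel like this is commonly built with sync.Once. The sketch below shows the pattern and how a caller would block on it; miniTask and its methods are hypothetical and are not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Result mirrors the documented type.
type Result map[string]any

// miniTask is a hypothetical stand-in that models only the result signal.
type miniTask struct {
	mu          sync.Mutex
	result      Result
	firstResult chan struct{}
	once        sync.Once
}

func newMiniTask() *miniTask {
	return &miniTask{firstResult: make(chan struct{})}
}

// setResult stores the result and closes the channel on the first call only.
func (t *miniTask) setResult(r Result) {
	t.mu.Lock()
	t.result = r
	t.mu.Unlock()
	t.once.Do(func() { close(t.firstResult) })
}

func (t *miniTask) firstResultChan() <-chan struct{} { return t.firstResult }

func (t *miniTask) getResult() Result {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.result
}

func main() {
	t := newMiniTask()
	go t.setResult(Result{"progress": 0.1})

	<-t.firstResultChan() // blocks until the first result is set

	t.setResult(Result{"progress": 0.5}) // later calls do not close again
	<-t.firstResultChan()                // a closed channel never blocks
	fmt.Println(t.getResult()["progress"])
}
```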

func (*Task) GetChildTaskPool

func (t *Task) GetChildTaskPool() *Pool

GetChildTaskPool returns the child task pool, if any.

func (*Task) GetFinishTime

func (t *Task) GetFinishTime() time.Time

GetFinishTime returns the time when the task finished executing.

func (*Task) GetMeta

func (t *Task) GetMeta() Metadata

GetMeta returns the task metadata.

func (*Task) GetResult

func (t *Task) GetResult() Result

GetResult returns the task result.

func (*Task) GetResults

func (t *Task) GetResults() Result

GetResults returns a copy of the task result.

func (*Task) GetStartTime

func (t *Task) GetStartTime() time.Time

GetStartTime returns the time when the task started executing.

func (*Task) GetTaskPool

func (t *Task) GetTaskPool() *Pool

GetTaskPool returns the task pool this task belongs to.

func (*Task) GetTimeout

func (t *Task) GetTimeout() time.Time

GetTimeout returns the task timeout.

func (*Task) GetWorkerID

func (t *Task) GetWorkerID() int

GetWorkerID returns the ID of the worker executing this task.

func (*Task) ID

func (t *Task) ID() string

ID returns the unique identifier of the task.

func (*Task) JobName

func (t *Task) JobName() string

JobName returns the name of the job this task is executing.

func (*Task) Log

func (t *Task) Log() *zerolog.Logger

Log returns the logger associated with the task.

func (*Task) Manipulate

func (t *Task) Manipulate(fn func(meta Metadata) error) error

Manipulate is used to change the metadata of a task while it is running. This can be useful to have a task be waiting for input from a client, and this function can be used to send that data to the task via a chan, for example.

func (*Task) OnResult

func (t *Task) OnResult(callback func(Result))

OnResult installs a callback function that is called every time the task result is set.

func (*Task) QueueState

func (t *Task) QueueState() QueueState

QueueState returns the current queue state of the task.

func (*Task) QueueTimeDuration

func (t *Task) QueueTimeDuration() time.Duration

QueueTimeDuration returns how long the task waited in the queue.

func (*Task) ReadError

func (t *Task) ReadError() error

ReadError returns the error that caused the task to fail, if any.

func (*Task) ReqNoErr

func (t *Task) ReqNoErr(err error)

ReqNoErr is a wrapper around t.Fail that fails the task only if the error is not nil.

func (*Task) SetChildTaskPool

func (t *Task) SetChildTaskPool(pool *Pool)

SetChildTaskPool sets the child task pool for this task.

func (*Task) SetCleanup

func (t *Task) SetCleanup(cleanup HandlerFunc)

SetCleanup registers a function to be run after the task has completed, no matter the exit status. Multiple cleanup functions can be registered; they run in sequence, in the order they were registered. Cleanup functions should treat the task as read-only: the task has already completed, and modifying its state may result in a deadlock. If the task has already completed when SetCleanup is called, the function will NOT be run, so it is only safe to call SetCleanup() from inside a task handler. To register a function from outside the task handler, or one that runs only after the task has completed successfully, use t.SetPostAction() instead.
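
The ordering guarantee can be illustrated with a small model in which cleanups are appended to a slice and run front to back. This is a sketch only; miniTask, setCleanup, finish, and runWithCleanups are hypothetical names.

```go
package main

import "fmt"

// miniTask is a hypothetical stand-in that only tracks cleanup functions.
type miniTask struct {
	id       string
	cleanups []func(t *miniTask)
}

// setCleanup appends fn, so cleanups keep their registration order.
func (t *miniTask) setCleanup(fn func(t *miniTask)) {
	t.cleanups = append(t.cleanups, fn)
}

// finish runs every registered cleanup in registration order.
func (t *miniTask) finish() {
	for _, fn := range t.cleanups {
		fn(t)
	}
}

// runWithCleanups registers three cleanups and returns the order they ran in.
// The cleanups only read from the task, in line with the read-only advice.
func runWithCleanups() []string {
	var order []string
	t := &miniTask{id: "t1"}
	for _, name := range []string{"close file", "remove temp dir", "notify client"} {
		name := name // capture a per-iteration copy for the closure
		t.setCleanup(func(t *miniTask) {
			order = append(order, t.id+": "+name)
		})
	}
	t.finish()
	return order
}

func main() {
	for _, line := range runWithCleanups() {
		fmt.Println(line)
	}
}
```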

func (*Task) SetErrorCleanup

func (t *Task) SetErrorCleanup(cleanup HandlerFunc)

SetErrorCleanup works the same as t.SetCleanup(), but only runs if the task errors.

func (*Task) SetPostAction

func (t *Task) SetPostAction(action func(Result))

SetPostAction takes a function to be run after the task has successfully completed, with the task results as the input of the function.

func (*Task) SetQueueState

func (t *Task) SetQueueState(s QueueState)

SetQueueState sets the queue state of the task.

func (*Task) SetResult

func (t *Task) SetResult(results Result)

SetResult sets the task result.

func (*Task) SetTimeout

func (t *Task) SetTimeout(timeout time.Time)

SetTimeout sets a timeout for the task.

func (*Task) Status

func (t *Task) Status() (bool, ExitStatus)

Status reports whether the task has completed and, if it has, the ExitStatus describing how it exited.
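
A caller would typically branch on both return values. The sketch below shows one way to interpret them, using local copies of the ExitStatus constants; describe is a hypothetical helper.

```go
package main

import "fmt"

// ExitStatus and its constants mirror the documented declarations.
type ExitStatus string

const (
	TaskNoStatus ExitStatus = ""
	TaskSuccess  ExitStatus = "success"
	TaskCanceled ExitStatus = "canceled"
	TaskError    ExitStatus = "error"
)

// describe is a hypothetical helper that turns the two values returned by
// Status into a short message.
func describe(done bool, status ExitStatus) string {
	if !done {
		return "still running"
	}
	switch status {
	case TaskSuccess:
		return "finished successfully"
	case TaskCanceled:
		return "canceled before completion"
	case TaskError:
		return "failed with an error"
	default:
		return "finished with unrecognized status"
	}
}

func main() {
	fmt.Println(describe(false, TaskNoStatus))
	fmt.Println(describe(true, TaskSuccess))
	fmt.Println(describe(true, TaskError))
}
```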

func (*Task) Success

func (t *Task) Success(msg ...any)

Success marks the task as successfully completed.

func (*Task) Wait

func (t *Task) Wait()

Wait blocks until the task is finished. "Finished" can mean success, failure, or cancellation.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of workers that execute tasks from task pools.

func NewTestWorkerPool

func NewTestWorkerPool(numWorkers int) *WorkerPool

NewTestWorkerPool creates a WorkerPool for testing purposes.

func NewWorkerPool

func NewWorkerPool(initWorkers int) *WorkerPool

NewWorkerPool creates and initializes a new worker pool with the specified number of workers.

func (*WorkerPool) AddHit

func (wp *WorkerPool) AddHit(time time.Time, target *Task)

AddHit schedules a timeout check for the specified task at the given time.

func (*WorkerPool) DispatchJob

func (wp *WorkerPool) DispatchJob(ctx context.Context, jobName string, meta Metadata, pool *Pool) (*Task, error)

DispatchJob creates and queues a new task for the specified registered job.

func (*WorkerPool) GetTask

func (wp *WorkerPool) GetTask(taskID string) *Task

GetTask returns the task with the specified ID.

func (*WorkerPool) GetTaskPool

func (wp *WorkerPool) GetTaskPool(tpID string) *Pool

GetTaskPool returns the task pool with the specified ID.

func (*WorkerPool) GetTaskPoolByJobName

func (wp *WorkerPool) GetTaskPoolByJobName(jobName string) *Pool

GetTaskPoolByJobName returns the task pool created by a task with the specified job name.

func (*WorkerPool) GetTasks

func (wp *WorkerPool) GetTasks() []*Task

GetTasks returns all tasks currently managed by this worker pool.

func (*WorkerPool) GetTasksByJobName

func (wp *WorkerPool) GetTasksByJobName(jobName string) []*Task

GetTasksByJobName returns all tasks with the specified job name.

func (*WorkerPool) NewTaskPool

func (wp *WorkerPool) NewTaskPool(replace bool, createdBy *Task) (*Pool, error)

NewTaskPool creates a new task pool. `replace` spawns a temporary replacement thread on the parent worker pool. This prevents a deadlock when the queue fills up while many tasks are being added and none are being executed.

`parent` allows task pools to be chained so that updates float to the top. This makes it possible for clients to subscribe to a single task and be notified about all of that task's sub-updates. See taskPool.go.

func (*WorkerPool) RegisterJob

func (wp *WorkerPool) RegisterJob(jobName string, fn HandlerFunc, opts ...Options)

RegisterJob adds a template for a repeatable job that can be called upon later in the program.
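
The register-then-dispatch flow can be modeled as a map from job name to handler. The sketch below is a simplified model, not the package's implementation: registry, registerJob, dispatch, and miniTask are hypothetical, the handler runs synchronously rather than on a worker, and returning an error for an unregistered name is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// miniTask is a hypothetical stand-in for a dispatched task.
type miniTask struct {
	jobName string
	result  string
}

// handlerFunc has the same shape as the documented HandlerFunc.
type handlerFunc func(t *miniTask)

// registry is a hypothetical job table keyed by job name.
type registry struct {
	mu   sync.RWMutex
	jobs map[string]handlerFunc
}

func newRegistry() *registry {
	return &registry{jobs: make(map[string]handlerFunc)}
}

// registerJob stores fn as the template for jobName.
func (r *registry) registerJob(jobName string, fn handlerFunc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.jobs[jobName] = fn
}

// dispatch looks up the handler and runs it synchronously. Returning an
// error for an unregistered name is an assumption made for this sketch.
func (r *registry) dispatch(jobName string) (*miniTask, error) {
	r.mu.RLock()
	fn, ok := r.jobs[jobName]
	r.mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("job %q is not registered", jobName)
	}
	t := &miniTask{jobName: jobName}
	fn(t)
	return t, nil
}

func main() {
	reg := newRegistry()
	reg.registerJob("scan_directory", func(t *miniTask) {
		t.result = "scanned by " + t.jobName
	})

	t, err := reg.dispatch("scan_directory")
	fmt.Println(t.result, err)

	_, err = reg.dispatch("missing_job")
	fmt.Println(err)
}
```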

func (*WorkerPool) Run

func (wp *WorkerPool) Run(ctx context.Context)

Run launches the standard threads for this worker pool.

func (*WorkerPool) Status

func (wp *WorkerPool) Status() (int, int, int, int, int)

Status returns the count of tasks in the queue, the total number of tasks accepted, the number of busy workers, and the total number of live workers in the worker pool.
