Documentation
¶
Index ¶
- Constants
- Variables
- func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T
- type GRPCServer
- func (s *GRPCServer) CancelTask(_ context.Context, req *workerpb.CancelTaskRequest) (*workerpb.CancelTaskResponse, error)
- func (s *GRPCServer) GetTask(_ context.Context, req *workerpb.GetTaskRequest) (*workerpb.GetTaskResponse, error)
- func (s *GRPCServer) RegisterTasks(ctx context.Context, req *workerpb.RegisterTasksRequest) (*workerpb.RegisterTasksResponse, error)
- func (s *GRPCServer) StreamResults(req *workerpb.StreamResultsRequest, ...) error
- type HandlerSpec
- type MetricsSnapshot
- type Middleware
- type Result
- type RetentionPolicy
- type Service
- type Task
- func (task *Task) CancelledAt() time.Time
- func (task *Task) CancelledChan() <-chan struct{}
- func (task *Task) CompletedAt() time.Time
- func (task *Task) Error() error
- func (task *Task) IsValid() (err error)
- func (task *Task) Result() any
- func (task *Task) ShouldSchedule() error
- func (task *Task) StartedAt() time.Time
- func (task *Task) Status() TaskStatus
- type TaskFunc
- type TaskHooks
- type TaskManager
- func (tm *TaskManager) CancelAll()
- func (tm *TaskManager) CancelTask(id uuid.UUID) error
- func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)
- func (tm *TaskManager) GetActiveTasks() int
- func (tm *TaskManager) GetMetrics() MetricsSnapshot
- func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)
- func (tm *TaskManager) GetTasks() []*Task
- func (tm *TaskManager) IsEmpty() bool
- func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error
- func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task) error
- func (tm *TaskManager) SetHooks(hooks TaskHooks)
- func (tm *TaskManager) SetMaxWorkers(maxWorkers int)
- func (tm *TaskManager) SetRetentionPolicy(policy RetentionPolicy)
- func (tm *TaskManager) StartWorkers(ctx context.Context)
- func (tm *TaskManager) StopGraceful(ctx context.Context) error
- func (tm *TaskManager) StopNow()
- func (tm *TaskManager) SubscribeResults(buffer int) (<-chan Result, func())
- func (tm *TaskManager) Wait(ctx context.Context) error
- type TaskStatus
Constants ¶
const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = TaskStatus(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = TaskStatus(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = TaskStatus(3)
	// Failed means the `Task` failed.
	Failed = TaskStatus(4)
	// Queued means the `Task` is queued.
	Queued = TaskStatus(5)
	// Running means the `Task` is running.
	Running = TaskStatus(6)
	// Invalid means the `Task` is invalid.
	Invalid = TaskStatus(7)
	// Completed means the `Task` is completed.
	Completed = TaskStatus(8)
)
TaskStatus values.
const (
	// DefaultMaxTasks is the default maximum number of tasks that can be executed at once.
	DefaultMaxTasks = 10
	// DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second.
	DefaultTasksPerSecond = 5
	// DefaultTimeout is the default timeout for tasks.
	DefaultTimeout = 5 * time.Minute
	// DefaultRetryDelay is the default delay between retries.
	DefaultRetryDelay = 1 * time.Second
	// DefaultMaxRetries is the default maximum number of retries.
	DefaultMaxRetries = 3
	// ErrMsgContextDone is the error message used when the context is done.
	ErrMsgContextDone = "context done"
)
Variables ¶
var (
	// ErrInvalidTaskID is returned when a task has an invalid ID.
	ErrInvalidTaskID = ewrap.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function.
	ErrInvalidTaskFunc = ewrap.New("invalid task function")
	// ErrInvalidTaskContext is returned when a task has an invalid context.
	ErrInvalidTaskContext = ewrap.New("invalid task context")
	// ErrTaskNotFound is returned when a task is not found.
	ErrTaskNotFound = ewrap.New("task not found")
	// ErrTaskTimeout is returned when a task times out.
	ErrTaskTimeout = ewrap.New("task timeout")
	// ErrTaskCancelled is returned when a task is cancelled.
	ErrTaskCancelled = ewrap.New("task cancelled")
	// ErrTaskAlreadyStarted is returned when a task is already started.
	ErrTaskAlreadyStarted = ewrap.New("task already started")
	// ErrTaskCompleted is returned when a task is already completed.
	ErrTaskCompleted = ewrap.New("task completed")
)
Errors returned by the TaskManager.
Functions ¶
func RegisterMiddleware ¶
func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T
RegisterMiddleware registers middlewares to the provided service.
Types ¶
type GRPCServer ¶ added in v0.1.1
type GRPCServer struct {
// contains filtered or unexported fields
}
GRPCServer implements the generated WorkerServiceServer interface.
func NewGRPCServer ¶ added in v0.1.1
func NewGRPCServer(svc Service, handlers map[string]HandlerSpec) *GRPCServer
NewGRPCServer creates a new gRPC server backed by the provided Service.
func (*GRPCServer) CancelTask ¶ added in v0.1.1
func (s *GRPCServer) CancelTask(_ context.Context, req *workerpb.CancelTaskRequest) (*workerpb.CancelTaskResponse, error)
CancelTask cancels an active task by its ID.
func (*GRPCServer) GetTask ¶ added in v0.1.1
func (s *GRPCServer) GetTask(_ context.Context, req *workerpb.GetTaskRequest) (*workerpb.GetTaskResponse, error)
GetTask returns information about a task by its ID.
func (*GRPCServer) RegisterTasks ¶ added in v0.1.1
func (s *GRPCServer) RegisterTasks(ctx context.Context, req *workerpb.RegisterTasksRequest) (*workerpb.RegisterTasksResponse, error)
RegisterTasks registers one or more tasks with the underlying service.
func (*GRPCServer) StreamResults ¶ added in v0.1.1
func (s *GRPCServer) StreamResults(req *workerpb.StreamResultsRequest, stream workerpb.WorkerService_StreamResultsServer) error
StreamResults streams task results back to the client.
type HandlerSpec ¶ added in v0.1.1
type HandlerSpec struct {
// Make returns a zero value of the payload message to unmarshal into.
Make func() protoreflect.ProtoMessage
// Fn does the work. Your Task.Execute will call this.
Fn func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error)
}
HandlerSpec describes a single handler for a gRPC method.
type MetricsSnapshot ¶ added in v0.1.0
type MetricsSnapshot struct {
Scheduled int64
Running int64
Completed int64
Failed int64
Cancelled int64
Retried int64
ResultsDropped int64
QueueDepth int
TaskLatencyCount int64
TaskLatencyTotal time.Duration
TaskLatencyMax time.Duration
}
MetricsSnapshot represents a snapshot of task metrics.
type Result ¶ added in v0.0.4
type Result struct {
Task *Task // the task that produced the result
Result any // the result of the task
Error error // the error returned by the task
}
Result is a task result.
type RetentionPolicy ¶ added in v0.1.4
RetentionPolicy controls how completed tasks are kept in the registry.
type Service ¶
type Service interface {
// contains filtered or unexported methods
}
Service is an interface for a task manager.
type Task ¶
type Task struct {
ID uuid.UUID `json:"id"` // ID is the id of the task
Name string `json:"name"` // Name is the name of the task
Description string `json:"description"` // Description is the description of the task
Priority int `json:"priority"` // Priority is the priority of the task
Execute TaskFunc `json:"-"` // Execute is the function that will be executed by the task
Ctx context.Context `json:"-"` // Ctx is the context of the task
CancelFunc context.CancelFunc `json:"-"` // CancelFunc is the cancel function of the task
Retries int `json:"retries"` // Retries is the maximum number of retries for failed tasks
RetryDelay time.Duration `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
// contains filtered or unexported fields
}
Task represents a function that can be executed by the task manager.
Note: access Task state via methods to avoid data races.
func (*Task) CancelledAt ¶ added in v0.1.4
CancelledAt returns the cancellation time if set.
func (*Task) CancelledChan ¶ added in v0.0.4
func (task *Task) CancelledChan() <-chan struct{}
CancelledChan returns a channel which gets closed when the task is cancelled.
func (*Task) CompletedAt ¶ added in v0.1.4
CompletedAt returns the completion time if set.
func (*Task) ShouldSchedule ¶ added in v0.0.5
ShouldSchedule returns an error if the task should not be scheduled.
func (*Task) Status ¶ added in v0.0.4
func (task *Task) Status() TaskStatus
Status returns the current task status.
type TaskHooks ¶ added in v0.1.5
type TaskHooks struct {
OnQueued func(task *Task)
OnStart func(task *Task)
OnFinish func(task *Task, status TaskStatus, result any, err error)
OnRetry func(task *Task, delay time.Duration, attempt int)
}
TaskHooks defines optional callbacks for task lifecycle events.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager is a struct that manages a pool of goroutines that can execute tasks.
func NewTaskManager ¶
func NewTaskManager(ctx context.Context, maxWorkers, maxTasks int, tasksPerSecond float64, timeout, retryDelay time.Duration, maxRetries int) *TaskManager
NewTaskManager creates a new task manager.
- ctx is the context for the task manager
- maxWorkers is the number of workers to start; if <=0, the number of CPUs is used
- maxTasks is the maximum number of tasks that can be queued at once; defaults to 10
- tasksPerSecond is the rate limit of tasks that can be executed per second; <=0 disables rate limiting
- timeout is the default timeout for tasks; defaults to 5 minutes
- retryDelay is the default delay between retries; defaults to 1 second
- maxRetries is the default maximum number of retries; defaults to 3 (0 disables retries)
func NewTaskManagerWithDefaults ¶ added in v0.0.5
func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager
NewTaskManagerWithDefaults creates a new task manager with default values.
- maxWorkers: runtime.NumCPU()
- maxTasks: 10
- tasksPerSecond: 5
- timeout: 5 minutes
- retryDelay: 1 second
- maxRetries: 3
func (*TaskManager) CancelTask ¶
func (tm *TaskManager) CancelTask(id uuid.UUID) error
CancelTask cancels a task by its ID.
func (*TaskManager) ExecuteTask ¶ added in v0.0.2
func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)
ExecuteTask executes a task given its ID and returns the result.
func (*TaskManager) GetActiveTasks ¶ added in v0.0.4
func (tm *TaskManager) GetActiveTasks() int
GetActiveTasks returns the number of running tasks.
func (*TaskManager) GetMetrics ¶ added in v0.1.0
func (tm *TaskManager) GetMetrics() MetricsSnapshot
GetMetrics returns a snapshot of current metrics.
func (*TaskManager) GetTask ¶
func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)
GetTask gets a task by its ID.
func (*TaskManager) IsEmpty ¶ added in v0.0.7
func (tm *TaskManager) IsEmpty() bool
IsEmpty checks if the task scheduler queue is empty.
func (*TaskManager) RegisterTask ¶
func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error
RegisterTask registers a new task to the task manager.
func (*TaskManager) RegisterTasks ¶ added in v0.0.4
func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task) error
RegisterTasks registers multiple tasks to the task manager at once.
func (*TaskManager) SetHooks ¶ added in v0.1.5
func (tm *TaskManager) SetHooks(hooks TaskHooks)
SetHooks configures callbacks for task lifecycle events.
func (*TaskManager) SetMaxWorkers ¶ added in v0.1.0
func (tm *TaskManager) SetMaxWorkers(maxWorkers int)
SetMaxWorkers adjusts the number of worker goroutines. Increasing the number spawns additional workers; decreasing signals workers to stop.
func (*TaskManager) SetRetentionPolicy ¶ added in v0.1.4
func (tm *TaskManager) SetRetentionPolicy(policy RetentionPolicy)
SetRetentionPolicy configures task registry retention.
func (*TaskManager) StartWorkers ¶ added in v0.0.4
func (tm *TaskManager) StartWorkers(ctx context.Context)
StartWorkers starts the task manager's workers and scheduler (idempotent).
func (*TaskManager) StopGraceful ¶ added in v0.1.4
func (tm *TaskManager) StopGraceful(ctx context.Context) error
StopGraceful stops accepting new tasks and waits for completion before stopping workers.
func (*TaskManager) StopNow ¶ added in v0.1.4
func (tm *TaskManager) StopNow()
StopNow cancels running tasks and stops workers immediately.
func (*TaskManager) SubscribeResults ¶ added in v0.1.4
func (tm *TaskManager) SubscribeResults(buffer int) (<-chan Result, func())
SubscribeResults returns a results channel and an unsubscribe function.
type TaskStatus ¶ added in v0.0.4
type TaskStatus uint8
TaskStatus is a value used to represent the task status.
func (TaskStatus) String ¶ added in v0.0.4
func (ts TaskStatus) String() string
String returns the string representation of the task status.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| __examples | |
| grpc (command) | |
| manual (command) | |
| middleware (command) | |
| multi (command) | |
| pkg | |