Documentation
¶
Index ¶
- Constants
- Variables
- func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T
- type GRPCServer
- func (s *GRPCServer) CancelTask(ctx context.Context, req *workerpb.CancelTaskRequest) (*workerpb.CancelTaskResponse, error)
- func (s *GRPCServer) GetTask(ctx context.Context, req *workerpb.GetTaskRequest) (*workerpb.GetTaskResponse, error)
- func (s *GRPCServer) RegisterTasks(ctx context.Context, req *workerpb.RegisterTasksRequest) (*workerpb.RegisterTasksResponse, error)
- func (s *GRPCServer) StreamResults(req *workerpb.StreamResultsRequest, ...) error
- type HandlerSpec
- type MetricsSnapshot
- type Middleware
- type Result
- type Service
- type Task
- type TaskFunc
- type TaskManager
- func (tm *TaskManager) CancelAll()
- func (tm *TaskManager) CancelTask(id uuid.UUID)
- func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)
- func (tm *TaskManager) GetActiveTasks() int
- func (tm *TaskManager) GetCancelledTasks() <-chan *Task
- func (tm *TaskManager) GetMetrics() MetricsSnapshot
- func (tm *TaskManager) GetResults() []Result
- func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)
- func (tm *TaskManager) GetTasks() []*Task
- func (tm *TaskManager) IsEmpty() bool
- func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error
- func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task)
- func (tm *TaskManager) SetMaxWorkers(maxWorkers int)
- func (tm *TaskManager) StartWorkers(ctx context.Context)
- func (tm *TaskManager) Stop()
- func (tm *TaskManager) StreamResults() <-chan Result
- func (tm *TaskManager) Wait(timeout time.Duration)
- type TaskStatus
Constants ¶
const ( // ContextDeadlineReached means the context is past its deadline. ContextDeadlineReached = TaskStatus(1) // RateLimited means the number of concurrent tasks per second exceeded the maximum allowed. RateLimited = TaskStatus(2) // Cancelled means `CancelTask` was invoked and the `Task` was cancelled. Cancelled = TaskStatus(3) // Failed means the `Task` failed. Failed = TaskStatus(4) // Queued means the `Task` is queued. Queued = TaskStatus(5) // Running means the `Task` is running. Running = TaskStatus(6) // Invalid means the `Task` is invalid. Invalid = TaskStatus(7) // Completed means the `Task` is completed. Completed = TaskStatus(8) )
TaskStatus values
- 1: `ContextDeadlineReached`
- 2: `RateLimited`
- 3: `Cancelled`
- 4: `Failed`
- 5: `Queued`
- 6: `Running`
- 7: `Invalid`
- 8: `Completed`
const ( // DefaultMaxTasks is the default maximum number of tasks that can be executed at once. DefaultMaxTasks = 10 // DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second. DefaultTasksPerSecond = 5 // DefaultTimeout is the default timeout for tasks. DefaultTimeout = 5 // DefaultRetryDelay is the default delay between retries. DefaultRetryDelay = 1 // DefaultMaxRetries is the default maximum number of retries. DefaultMaxRetries = 3 )
Variables ¶
var ( // ErrInvalidTaskID is returned when a task has an invalid ID. ErrInvalidTaskID = ewrap.New("invalid task id") // ErrInvalidTaskFunc is returned when a task has an invalid function. ErrInvalidTaskFunc = ewrap.New("invalid task function") // ErrInvalidTaskContext is returned when a task has an invalid context. ErrInvalidTaskContext = ewrap.New("invalid task context") // ErrTaskNotFound is returned when a task is not found. ErrTaskNotFound = ewrap.New("task not found") // ErrTaskTimeout is returned when a task times out. ErrTaskTimeout = ewrap.New("task timeout") // ErrTaskCancelled is returned when a task is cancelled. ErrTaskCancelled = ewrap.New("task cancelled") // ErrTaskAlreadyStarted is returned when a task is already started. ErrTaskAlreadyStarted = ewrap.New("task already started") // ErrTaskCompleted is returned when a task is already completed. ErrTaskCompleted = ewrap.New("task completed") )
Errors returned by the TaskManager.
Functions ¶
func RegisterMiddleware ¶
func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T
RegisterMiddleware registers middlewares to the provided service.
Types ¶
type GRPCServer ¶ added in v0.1.1
type GRPCServer struct {
// contains filtered or unexported fields
}
GRPCServer implements the generated WorkerServiceServer interface.
func NewGRPCServer ¶ added in v0.1.1
func NewGRPCServer(svc Service, handlers map[string]HandlerSpec) *GRPCServer
NewGRPCServer creates a new gRPC server backed by the provided Service.
func (*GRPCServer) CancelTask ¶ added in v0.1.1
func (s *GRPCServer) CancelTask(ctx context.Context, req *workerpb.CancelTaskRequest) (*workerpb.CancelTaskResponse, error)
CancelTask cancels an active task by its ID.
func (*GRPCServer) GetTask ¶ added in v0.1.1
func (s *GRPCServer) GetTask(ctx context.Context, req *workerpb.GetTaskRequest) (*workerpb.GetTaskResponse, error)
GetTask returns information about a task by its ID.
func (*GRPCServer) RegisterTasks ¶ added in v0.1.1
func (s *GRPCServer) RegisterTasks(ctx context.Context, req *workerpb.RegisterTasksRequest) (*workerpb.RegisterTasksResponse, error)
RegisterTasks registers one or more tasks with the underlying service.
func (*GRPCServer) StreamResults ¶ added in v0.1.1
func (s *GRPCServer) StreamResults(req *workerpb.StreamResultsRequest, stream workerpb.WorkerService_StreamResultsServer) error
StreamResults streams task results back to the client.
type HandlerSpec ¶ added in v0.1.1
type HandlerSpec struct {
// Make returns a zero value of the payload message to unmarshal into.
Make func() protoreflect.ProtoMessage
// Fn does the work. Your Task.Execute will call this.
Fn func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error)
}
HandlerSpec describes a single handler for a gRPC method.
type MetricsSnapshot ¶ added in v0.1.0
MetricsSnapshot represents a snapshot of task metrics.
type Result ¶ added in v0.0.4
type Result struct {
Task *Task // the task that produced the result
Result any // the result of the task
Error error // the error returned by the task
}
Result is a task result.
type Service ¶
type Service interface {
// contains filtered or unexported methods
}
Service is an interface for a task manager.
type Task ¶
type Task struct {
ID uuid.UUID `json:"id"` // ID is the id of the task
Name string `json:"name"` // Name is the name of the task
Description string `json:"description"` // Description is the description of the task
Priority int `json:"priority"` // Priority is the priority of the task
Execute TaskFunc `json:"-"` // Execute is the function that will be executed by the task
Ctx context.Context `json:"context"` // Ctx is the context of the task
CancelFunc context.CancelFunc `json:"-"` // CancelFunc is the cancel function of the task
Status TaskStatus `json:"task_status"` // Status stores the status of the task
Result atomic.Value `json:"result"` // Result is the result of the task
Error atomic.Value `json:"error"` // Error is the error of the task
Started atomic.Int64 `json:"started"` // Started is the time the task started
Completed atomic.Int64 `json:"completed"` // Completed is the time the task completed
Cancelled atomic.Int64 `json:"cancelled"` // Cancelled is the time the task was cancelled
Retries int `json:"retries"` // Retries is the maximum number of retries for failed tasks
RetryDelay time.Duration `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
// contains filtered or unexported fields
}
Task represents a function that can be executed by the task manager.
func (*Task) CancelledChan ¶ added in v0.0.4
func (task *Task) CancelledChan() <-chan struct{}
CancelledChan returns a channel which gets closed when the task is cancelled.
func (*Task) ShouldSchedule ¶ added in v0.0.5
ShouldSchedule returns an error if the task should not be scheduled.
func (*Task) WaitCancelled ¶ added in v0.0.4
func (task *Task) WaitCancelled()
WaitCancelled waits for the task to be cancelled.
type TaskManager ¶
type TaskManager struct {
Registry sync.Map // Registry is a map of registered tasks
Results chan Result // Results is the channel of results
Tasks chan *Task // Tasks is the channel of tasks
RegisterTaskErrors *ewrap.ErrorGroup // RegisterTaskErrors groups the errors that occurred while registering tasks
ExecuteTaskErrors *ewrap.ErrorGroup // ExecuteTaskErrors groups the errors that occurred while executing tasks
Cancelled chan *Task // Cancelled is the channel of cancelled tasks
Timeout time.Duration // Timeout is the default timeout for tasks
MaxWorkers int // MaxWorkers is the maximum number of workers that can be started
MaxTasks int // MaxTasks is the maximum number of tasks that can be executed at once
RetryDelay time.Duration // RetryDelay is the delay between retries
MaxRetries int // MaxRetries is the maximum number of retries
// contains filtered or unexported fields
}
TaskManager is a struct that manages a pool of goroutines that can execute tasks.
func NewTaskManager ¶
func NewTaskManager(ctx context.Context, maxWorkers int, maxTasks int, tasksPerSecond float64, timeout time.Duration, retryDelay time.Duration, maxRetries int) *TaskManager
NewTaskManager creates a new task manager
- `ctx` is the context for the task manager
- `maxWorkers` is the number of workers to start; if not specified, the number of CPUs will be used
- `maxTasks` is the maximum number of tasks that can be executed at once, defaults to 10
- `tasksPerSecond` is the rate limit of tasks that can be executed per second, defaults to 5
- `timeout` is the default timeout for tasks, defaults to 5 minutes
- `retryDelay` is the default delay between retries, defaults to 1 second
- `maxRetries` is the default maximum number of retries, defaults to 3
func NewTaskManagerWithDefaults ¶ added in v0.0.5
func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager
NewTaskManagerWithDefaults creates a new task manager with default values
- `maxWorkers`: `runtime.NumCPU()`
- `maxTasks`: 10
- `tasksPerSecond`: 5
- `timeout`: 5 minutes
- `retryDelay`: 1 second
- `maxRetries`: 3
func (*TaskManager) CancelTask ¶
func (tm *TaskManager) CancelTask(id uuid.UUID)
CancelTask cancels a task by its ID.
func (*TaskManager) ExecuteTask ¶ added in v0.0.2
func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)
ExecuteTask executes a task given its ID and returns the result
- It gets the task by ID and locks the mutex to access the task data.
- If the task has already been started, it cancels it and returns an error.
- If the task is invalid, it sends it to the cancelled channel and returns an error.
- If the task is already running, it returns an error.
- It creates a new context for this task, waits for the result to be available, and returns it.
- It reserves a token from the limiter and waits for the task execution.
- If the token reservation fails, it waits for a delay and tries again.
- It executes the task and sends the result to the results channel.
- If the task execution fails, it retries the task up to max retries with a delay between retries.
- If the task fails with all retries exhausted, it cancels the task and returns an error.
func (*TaskManager) GetActiveTasks ¶ added in v0.0.4
func (tm *TaskManager) GetActiveTasks() int
GetActiveTasks returns the number of active tasks.
func (*TaskManager) GetCancelledTasks ¶ added in v0.0.7
func (tm *TaskManager) GetCancelledTasks() <-chan *Task
GetCancelledTasks gets the cancelled tasks channel. Example usage:
    // get the cancelled tasks
    cancelledTasks := tm.GetCancelledTasks()
    select {
    case task := <-cancelledTasks:
        fmt.Printf("Task %s was cancelled\n", task.ID.String())
    default:
        fmt.Println("No tasks have been cancelled yet")
    }
func (*TaskManager) GetMetrics ¶ added in v0.1.0
func (tm *TaskManager) GetMetrics() MetricsSnapshot
GetMetrics returns a snapshot of current metrics.
func (*TaskManager) GetResults ¶
func (tm *TaskManager) GetResults() []Result
GetResults returns the task results as a slice.
func (*TaskManager) GetTask ¶
func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)
GetTask gets a task by its ID.
func (*TaskManager) IsEmpty ¶ added in v0.0.7
func (tm *TaskManager) IsEmpty() bool
IsEmpty checks if the task scheduler queue is empty.
func (*TaskManager) RegisterTask ¶
func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error
RegisterTask registers a new task to the task manager.
func (*TaskManager) RegisterTasks ¶ added in v0.0.4
func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task)
RegisterTasks registers multiple tasks to the task manager at once.
func (*TaskManager) SetMaxWorkers ¶ added in v0.1.0
func (tm *TaskManager) SetMaxWorkers(maxWorkers int)
SetMaxWorkers adjusts the number of worker goroutines. Increasing the number spawns additional workers; decreasing signals workers to stop.
func (*TaskManager) StartWorkers ¶ added in v0.0.4
func (tm *TaskManager) StartWorkers(ctx context.Context)
StartWorkers starts the task manager and its goroutines.
func (*TaskManager) Stop ¶
func (tm *TaskManager) Stop()
Stop stops the task manager and waits for all tasks to finish.
func (*TaskManager) StreamResults ¶ added in v0.0.5
func (tm *TaskManager) StreamResults() <-chan Result
StreamResults streams the results channel.
func (*TaskManager) Wait ¶ added in v0.0.4
func (tm *TaskManager) Wait(timeout time.Duration)
Wait waits for all tasks to complete or for the timeout to elapse.
type TaskStatus ¶ added in v0.0.4
type TaskStatus uint8
TaskStatus is a value used to represent the task status.
func (TaskStatus) String ¶ added in v0.0.4
func (ts TaskStatus) String() string
String returns the string representation of the task status.
Directories
¶
| Path | Synopsis |
|---|---|
| examples | |
| grpc (command) | |
| manual (command) | |
| middleware (command) | |
| multi (command) | |
| pkg | |