Documentation ¶
Overview ¶
Package workflows provides a client for interacting with Tilebox Workflows.
Documentation: https://docs.tilebox.com/workflows
Index ¶
- func Collect[K any](seq iter.Seq2[K, error]) ([]K, error)
- func GetCurrentCluster(ctx context.Context) (string, error)
- func SetTaskDisplay(ctx context.Context, display string) error
- func SubmitSubtask(ctx context.Context, task Task, options ...subtask.SubmitOption) (subtask.FutureTask, error)
- func SubmitSubtasks(ctx context.Context, tasks []Task, options ...subtask.SubmitOption) ([]subtask.FutureTask, error)
- func ValidateIdentifier(identifier TaskIdentifier) error
- func WithTaskSpan(ctx context.Context, name string, f func(ctx context.Context) error) error
- func WithTaskSpanResult[Result any](ctx context.Context, name string, f func(ctx context.Context) (Result, error)) (Result, error)
- type Client
- type ClientOption
- type Cluster
- type ClusterClient
- type ExecutableTask
- type ExplicitlyIdentifiableTask
- type Job
- type JobClient
- type JobService
- type JobState
- type ProgressIndicator
- type ProgressTracker
- type Task
- type TaskIdentifier
- type TaskRunner
- type TaskService
- type TaskState
- type TaskSummary
- type WorkflowService
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Collect ¶
Collect converts any sequence into a slice.
It returns an error if any of the elements in the sequence has a non-nil error.
func GetCurrentCluster ¶
GetCurrentCluster returns the current cluster slug.
This function is intended to be used in tasks to get the current cluster slug.
func SetTaskDisplay ¶
SetTaskDisplay sets the display name of the current task.
func SubmitSubtask ¶
func SubmitSubtask(ctx context.Context, task Task, options ...subtask.SubmitOption) (subtask.FutureTask, error)
SubmitSubtask submits a task to the task runner as a subtask of the current task.
Options:
- subtask.WithDependencies: sets the dependencies of the task.
- subtask.WithClusterSlug: sets the cluster slug of the cluster where the task will be executed. Defaults to the cluster of the task runner.
- subtask.WithMaxRetries: sets the maximum number of times a task can be automatically retried. Defaults to 0.
func SubmitSubtasks ¶
func SubmitSubtasks(ctx context.Context, tasks []Task, options ...subtask.SubmitOption) ([]subtask.FutureTask, error)
SubmitSubtasks submits multiple tasks to the task runner as subtasks of the current task. It is similar to SubmitSubtask, but takes a slice of tasks instead of a single task.
func ValidateIdentifier ¶
func ValidateIdentifier(identifier TaskIdentifier) error
ValidateIdentifier performs client-side validation on a task identifier.
func WithTaskSpan ¶
WithTaskSpan is a helper function that wraps a function with a tracing span.
Types ¶
type Client ¶
type Client struct {
Jobs JobClient
Clusters ClusterClient
// contains filtered or unexported fields
}
Client is a Tilebox Workflows client.
func NewClient ¶
func NewClient(options ...ClientOption) *Client
NewClient creates a new Tilebox Workflows client.
By default, the returned Client is configured with:
- "https://api.tilebox.com" as the URL
- environment variable TILEBOX_API_KEY as the API key
- a grpc.RetryHTTPClient HTTP client
- the global tracer provider
The passed options are used to override these default values and configure the returned Client appropriately.
func (*Client) NewTaskRunner ¶
NewTaskRunner creates a new TaskRunner.
Options:
- runner.WithClusterSlug: sets the cluster on which to listen for tasks. Defaults to the default cluster.
- runner.WithRunnerLogger: sets the logger to use for the task runner. Defaults to slog.Default().
- runner.WithDisableMetrics: disables OpenTelemetry metrics for the task runner.
type ClientOption ¶
type ClientOption func(*clientConfig)
ClientOption is a function type for configuring a client. Such option helpers are a common pattern in Go, as they allow for optional parameters in constructors. This implementation is inspired by how libraries such as axiom-go and connect handle configuration.
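For readers unfamiliar with the pattern, the following is a minimal sketch of functional options. The config struct, the option type, and the lowercase helpers are hypothetical names chosen for this example; only the default URL is taken from the documentation above.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the package's unexported clientConfig.
type config struct {
	url    string
	apiKey string
}

// option mirrors the shape of ClientOption: a function that mutates the config.
type option func(*config)

// withURL overrides the default service URL.
func withURL(url string) option {
	return func(c *config) { c.url = url }
}

// withAPIKey sets the API key.
func withAPIKey(key string) option {
	return func(c *config) { c.apiKey = key }
}

// newConfig starts from defaults and applies the options in order,
// so a later option overrides an earlier one.
func newConfig(options ...option) *config {
	cfg := &config{url: "https://api.tilebox.com"}
	for _, opt := range options {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(withAPIKey("secret"))
	fmt.Println(cfg.url, cfg.apiKey != "") // https://api.tilebox.com true
}
```

The constructor keeps a stable signature while new settings can be added as further option helpers.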
func WithAPIKey ¶
func WithAPIKey(apiKey string) ClientOption
WithAPIKey sets the API key to use for the client.
When this option is not set, NewClient falls back to the TILEBOX_API_KEY environment variable.
func WithConnectClientOptions ¶
func WithConnectClientOptions(options ...connect.ClientOption) ClientOption
WithConnectClientOptions passes additional connect.ClientOption values to the underlying connect client.
func WithDisableTracing ¶
func WithDisableTracing() ClientOption
WithDisableTracing disables OpenTelemetry tracing for the client.
func WithHTTPClient ¶
func WithHTTPClient(httpClient connect.HTTPClient) ClientOption
WithHTTPClient sets the connect.HTTPClient to use for the client.
Defaults to grpc.RetryHTTPClient.
func WithURL ¶
func WithURL(url string) ClientOption
WithURL sets the URL of the Tilebox Workflows service.
Defaults to "https://api.tilebox.com".
type Cluster ¶
type Cluster struct {
// Slug is the unique identifier of the cluster within the namespace.
Slug string
// Name is the display name of the cluster.
Name string
// Deletable is true when the cluster can be deleted.
Deletable bool
}
Cluster represents a Tilebox Workflows cluster.
Documentation: https://docs.tilebox.com/workflows/concepts/clusters
type ClusterClient ¶
type ClusterClient interface {
// Create creates a new cluster with the given name.
Create(ctx context.Context, name string) (*Cluster, error)
// Get returns a cluster by its slug.
Get(ctx context.Context, slug string) (*Cluster, error)
// Delete deletes a cluster by its slug.
Delete(ctx context.Context, slug string) error
// List returns a list of all available clusters.
List(ctx context.Context) ([]*Cluster, error)
}
type ExecutableTask ¶
ExecutableTask is the interface for a task that can be executed, and therefore be registered with a task runner.
type ExplicitlyIdentifiableTask ¶
type ExplicitlyIdentifiableTask interface {
Identifier() TaskIdentifier
}
ExplicitlyIdentifiableTask is the interface for a task that provides a user-defined task identifier. The identifier is used to uniquely identify the task and specify its version. If a task is not an ExplicitlyIdentifiableTask, the task runner will generate an identifier for it using reflection.
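The lookup order described above (explicit identifier first, reflection as a fallback) might look roughly like the sketch below. All types here are local stand-ins, and the fallback version "v0.0" is an assumption made for the example, not a documented default.

```go
package main

import (
	"fmt"
	"reflect"
)

// taskIdentifier and explicitlyIdentifiable are local stand-ins for
// TaskIdentifier and ExplicitlyIdentifiableTask.
type taskIdentifier struct {
	Name    string
	Version string
}

type explicitlyIdentifiable interface {
	Identifier() taskIdentifier
}

// identifierOf prefers a user-defined identifier and otherwise derives one
// from the type name. The fallback version "v0.0" is an assumption.
func identifierOf(task any) taskIdentifier {
	if t, ok := task.(explicitlyIdentifiable); ok {
		return t.Identifier()
	}
	typ := reflect.TypeOf(task)
	for typ.Kind() == reflect.Pointer {
		typ = typ.Elem() // unwrap pointers to reach the struct type
	}
	return taskIdentifier{Name: typ.Name(), Version: "v0.0"}
}

// ResizeImage has no Identifier method, so its name comes from reflection.
type ResizeImage struct{}

// Ingest provides its own identifier.
type Ingest struct{}

func (Ingest) Identifier() taskIdentifier {
	return taskIdentifier{Name: "example.com/ingest", Version: "v1.2"}
}

func main() {
	fmt.Println(identifierOf(&ResizeImage{})) // {ResizeImage v0.0}
	fmt.Println(identifierOf(&Ingest{}))      // {example.com/ingest v1.2}
}
```

An explicit identifier stays stable when the Go type is renamed, which a reflection-derived name does not.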
type Job ¶
type Job struct {
// ID is the unique identifier of the job.
ID uuid.UUID
// Name is the name of the job.
Name string
// Canceled indicates whether the job has been canceled.
Canceled bool
// State is the current state of the job.
State JobState
// SubmittedAt is the time the job was submitted.
SubmittedAt time.Time
// StartedAt is the time the job started running.
StartedAt time.Time
// TaskSummaries are the summaries of the job's tasks.
TaskSummaries []*TaskSummary
// AutomationID is the ID of the automation that submitted the job.
AutomationID uuid.UUID
// Progress is a list of progress indicators for the job.
Progress []*ProgressIndicator
}
Job represents a Tilebox Workflows job.
Documentation: https://docs.tilebox.com/workflows/concepts/jobs
type JobClient ¶
type JobClient interface {
// Submit submits a job to a cluster.
//
// Options:
// - job.WithMaxRetries: sets the maximum number of times a job can be automatically retried. Defaults to 0.
// - job.WithClusterSlug: sets the cluster slug of the cluster where the job will be executed. Defaults to the default cluster.
//
// Documentation: https://docs.tilebox.com/workflows/concepts/jobs#submission
Submit(ctx context.Context, jobName string, tasks []Task, options ...job.SubmitOption) (*Job, error)
// Get returns a job by its ID.
Get(ctx context.Context, jobID uuid.UUID) (*Job, error)
// Retry retries a job.
//
// Returns the number of rescheduled tasks.
//
// Documentation: https://docs.tilebox.com/workflows/concepts/jobs#retry-handling
Retry(ctx context.Context, jobID uuid.UUID) (int64, error)
// Cancel cancels a job.
//
// Documentation: https://docs.tilebox.com/workflows/concepts/jobs#cancellation
Cancel(ctx context.Context, jobID uuid.UUID) error
// Query returns a list of all jobs within the given interval.
//
// Options:
// - job.WithTemporalExtent: specifies the time or ID interval for which jobs should be queried (Required)
// - job.WithAutomationID: specifies the automation ID to filter jobs by. Only jobs submitted by the specified
// automation will be returned. (Optional)
//
// The jobs are lazily loaded and returned as a sequence.
// The output sequence can be transformed into a slice using Collect.
Query(ctx context.Context, options ...job.QueryOption) iter.Seq2[*Job, error]
}
type JobService ¶
type JobService interface {
SubmitJob(ctx context.Context, req *workflowsv1.SubmitJobRequest) (*workflowsv1.Job, error)
GetJob(ctx context.Context, jobID uuid.UUID) (*workflowsv1.Job, error)
RetryJob(ctx context.Context, jobID uuid.UUID) (*workflowsv1.RetryJobResponse, error)
CancelJob(ctx context.Context, jobID uuid.UUID) error
QueryJobs(ctx context.Context, filters *workflowsv1.QueryFilters, page *tileboxv1.Pagination) (*workflowsv1.QueryJobsResponse, error)
}
type JobState ¶
type JobState int32
JobState is the state of a Job.
const (
	JobQueued    JobState // The job is queued and waiting to be run.
	JobStarted            // At least one task of the job has been started.
	JobCompleted          // All tasks of the job have been completed.
)
JobState values.
type ProgressIndicator ¶ added in v0.1.0
type ProgressTracker ¶ added in v0.1.0
type ProgressTracker interface {
// Add adds a given amount of total work to the progress indicator.
Add(context.Context, uint64) error
// Done marks a given amount of work as done.
Done(context.Context, uint64) error
}
ProgressTracker is an interface for updating the total work and completed work units for a named progress indicator for a job.
func DefaultProgress ¶ added in v0.1.0
func DefaultProgress() ProgressTracker
DefaultProgress returns the default, unnamed progress indicator instance for tracking job progress.
func Progress ¶ added in v0.1.0
func Progress(label string) ProgressTracker
Progress returns a named progress indicator instance for tracking job progress.
type Task ¶
type Task interface{}
Task is the interface for a task that can be submitted to the workflow service. It doesn't need to be identifiable or executable, but it can be both.
type TaskIdentifier ¶
TaskIdentifier is the struct that defines the unique identifier of a task. It is used to uniquely identify a task and specify its version.
func NewTaskIdentifier ¶
func NewTaskIdentifier(name, version string) TaskIdentifier
type TaskRunner ¶
type TaskRunner struct {
// contains filtered or unexported fields
}
TaskRunner executes tasks.
Documentation: https://docs.tilebox.com/workflows/concepts/task-runners
func (*TaskRunner) GetRegisteredTask ¶
func (t *TaskRunner) GetRegisteredTask(identifier TaskIdentifier) (ExecutableTask, bool)
GetRegisteredTask returns the task with the given identifier.
func (*TaskRunner) RegisterTasks ¶
func (t *TaskRunner) RegisterTasks(tasks ...ExecutableTask) error
RegisterTasks makes the task runner aware of multiple tasks.
func (*TaskRunner) RunAll ¶
func (t *TaskRunner) RunAll(ctx context.Context)
RunAll runs the task runner and executes tasks until no more tasks are available.
func (*TaskRunner) RunForever ¶
func (t *TaskRunner) RunForever(ctx context.Context)
RunForever runs the task runner forever, looking for new tasks to run and polling for new tasks when idle.
type TaskService ¶
type TaskService interface {
NextTask(ctx context.Context, computedTask *workflowsv1.ComputedTask, nextTaskToRun *workflowsv1.NextTaskToRun) (*workflowsv1.NextTaskResponse, error)
TaskFailed(ctx context.Context, taskID uuid.UUID, display string, cancelJob bool, progressUpdates []*workflowsv1.Progress) (*workflowsv1.TaskStateResponse, error)
ExtendTaskLease(ctx context.Context, taskID uuid.UUID, requestedLease time.Duration) (*workflowsv1.TaskLease, error)
}
type TaskState ¶
type TaskState int32
TaskState is the state of a Task.
const (
	TaskQueued    TaskState // The task is queued and waiting to be run.
	TaskRunning             // The task is currently running on some task runner.
	TaskComputed            // The task has been computed and the output is available.
	TaskFailed              // The task has failed.
	TaskCancelled           // The task has been cancelled due to user request.
)
TaskState values.
type TaskSummary ¶
type TaskSummary struct {
// ID is the unique identifier of the task.
ID uuid.UUID
// Display is the display message of the task.
Display string
// State is the state of the task.
State TaskState
// ParentID is the ID of the parent task.
ParentID uuid.UUID
// StartedAt is the time the task started.
StartedAt time.Time
// StoppedAt is the time the task stopped.
StoppedAt time.Time
}
TaskSummary is a summary of a task.
type WorkflowService ¶
type WorkflowService interface {
CreateCluster(ctx context.Context, name string) (*workflowsv1.Cluster, error)
GetCluster(ctx context.Context, slug string) (*workflowsv1.Cluster, error)
DeleteCluster(ctx context.Context, slug string) error
ListClusters(ctx context.Context) (*workflowsv1.ListClustersResponse, error)
}