Documentation
¶
Overview ¶
Package workflows provides a client for interacting with Tilebox Workflows.
Documentation: https://docs.tilebox.com/workflows
Index ¶
- func Collect[K any](seq iter.Seq2[K, error]) ([]K, error)
- func GetCurrentCluster(ctx context.Context) (string, error)
- func SetTaskDisplay(ctx context.Context, display string) error
- func SubmitSubtask(ctx context.Context, task Task, options ...subtask.SubmitOption) (subtask.FutureTask, error)
- func SubmitSubtasks(ctx context.Context, tasks []Task, options ...subtask.SubmitOption) ([]subtask.FutureTask, error)
- func ValidateIdentifier(identifier TaskIdentifier) error
- func WithTaskSpan(ctx context.Context, name string, f func(ctx context.Context) error) error
- func WithTaskSpanResult[Result any](ctx context.Context, name string, f func(ctx context.Context) (Result, error)) (Result, error)
- type Client
- type ClientOption
- type Cluster
- type ClusterClient
- type ExecutableTask
- type ExplicitlyIdentifiableTask
- type Job
- type JobClient
- type JobService
- type JobState
- type Task
- type TaskIdentifier
- type TaskRunner
- type TaskService
- type TaskState
- type TaskSummary
- type WorkflowService
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Collect ¶
Collect converts any sequence into a slice.
It returns an error if any of the elements in the sequence has a non-nil error.
func GetCurrentCluster ¶
GetCurrentCluster returns the current cluster slug.
This function is intended to be used in tasks to get the current cluster slug.
func SetTaskDisplay ¶
SetTaskDisplay sets the display name of the current task.
func SubmitSubtask ¶
func SubmitSubtask(ctx context.Context, task Task, options ...subtask.SubmitOption) (subtask.FutureTask, error)
SubmitSubtask submits a task to the task runner as a subtask of the current task.
Options:
- subtask.WithDependencies: sets the dependencies of the task.
- subtask.WithClusterSlug: sets the cluster slug of the cluster where the task will be executed. Defaults to the cluster of the task runner.
- subtask.WithMaxRetries: sets the maximum number of times a task can be automatically retried. Defaults to 0.
func SubmitSubtasks ¶
func SubmitSubtasks(ctx context.Context, tasks []Task, options ...subtask.SubmitOption) ([]subtask.FutureTask, error)
SubmitSubtasks submits multiple tasks to the task runner as subtask of the current task. It is similar to SubmitSubtask, but it takes a slice of tasks instead of a single task.
func ValidateIdentifier ¶
func ValidateIdentifier(identifier TaskIdentifier) error
ValidateIdentifier performs client-side validation on a task identifier.
func WithTaskSpan ¶
WithTaskSpan is a helper function that wraps a function with a tracing span.
Types ¶
type Client ¶
type Client struct { Jobs JobClient Clusters ClusterClient // contains filtered or unexported fields }
Client is a Tilebox Workflows client.
func NewClient ¶
func NewClient(options ...ClientOption) *Client
NewClient creates a new Tilebox Datasets client.
By default, the returned Client is configured with:
- "https://api.tilebox.com" as the URL
- environment variable TILEBOX_API_KEY as the API key
- a grpc.RetryHTTPClient HTTP client
- the global tracer provider
The passed options are used to override these default values and configure the returned Client appropriately.
func (*Client) NewTaskRunner ¶
NewTaskRunner creates a new TaskRunner for the specified cluster.
Options:
- runner.WithRunnerLogger: sets the logger to use for the task runner. Defaults to slog.Default().
- runner.WithDisableMetrics: disables OpenTelemetry metrics for the task runner.
type ClientOption ¶
type ClientOption func(*clientConfig)
ClientOption is an interface for configuring a client. Using such options helpers is a quite common pattern in Go, as it allows for optional parameters in constructors. This concrete implementation here is inspired by how libraries such as axiom-go and connect do their configuration.
func WithAPIKey ¶
func WithAPIKey(apiKey string) ClientOption
WithAPIKey sets the API key to use for the client.
Defaults to no API key.
func WithConnectClientOptions ¶
func WithConnectClientOptions(options ...connect.ClientOption) ClientOption
WithConnectClientOptions sets additional options for the connect.HTTPClient.
func WithDisableTracing ¶
func WithDisableTracing() ClientOption
WithDisableTracing disables OpenTelemetry tracing for the client.
func WithHTTPClient ¶
func WithHTTPClient(httpClient connect.HTTPClient) ClientOption
WithHTTPClient sets the connect.HTTPClient to use for the client.
Defaults to grpc.RetryHTTPClient.
func WithURL ¶
func WithURL(url string) ClientOption
WithURL sets the URL of the Tilebox Datasets service.
Defaults to "https://api.tilebox.com".
type Cluster ¶
type Cluster struct { // Slug is the unique identifier of the cluster within the namespace. Slug string // Name is the display name of the cluster. Name string }
Cluster represents a Tilebox Workflows cluster.
Documentation: https://docs.tilebox.com/workflows/concepts/clusters
type ClusterClient ¶
type ExecutableTask ¶
ExecutableTask is the interface for a task that can be executed, and therefore be registered with a task runner.
type ExplicitlyIdentifiableTask ¶
type ExplicitlyIdentifiableTask interface {
Identifier() TaskIdentifier
}
ExplicitlyIdentifiableTask is the interface for a task that provides a user-defined task identifier. The identifier is used to uniquely identify the task and specify its version. If a task is not an ExplicitlyIdentifiableTask, the task runner will generate an identifier for it using reflection.
type Job ¶
type Job struct { // ID is the unique identifier of the job. ID uuid.UUID // Name is the name of the job. Name string // Canceled indicates whether the job has been canceled. Canceled bool // State is the current state of the job. State JobState // SubmittedAt is the time the job was submitted. SubmittedAt time.Time // StartedAt is the time the job started running. StartedAt time.Time // TaskSummaries is the task summaries of the job. TaskSummaries []*TaskSummary // AutomationID is the ID of the automation that submitted the job. AutomationID uuid.UUID }
Job represents a Tilebox Workflows job.
Documentation: https://docs.tilebox.com/workflows/concepts/jobs
type JobClient ¶
type JobClient interface { Submit(ctx context.Context, jobName string, cluster *Cluster, tasks []Task, options ...job.SubmitOption) (*Job, error) Get(ctx context.Context, jobID uuid.UUID) (*Job, error) Retry(ctx context.Context, jobID uuid.UUID) (int64, error) Cancel(ctx context.Context, jobID uuid.UUID) error List(ctx context.Context, interval query.TemporalExtent) iter.Seq2[*Job, error] }
type JobService ¶
type JobService interface { SubmitJob(ctx context.Context, req *workflowsv1.SubmitJobRequest) (*workflowsv1.Job, error) GetJob(ctx context.Context, jobID uuid.UUID) (*workflowsv1.Job, error) RetryJob(ctx context.Context, jobID uuid.UUID) (*workflowsv1.RetryJobResponse, error) CancelJob(ctx context.Context, jobID uuid.UUID) error ListJobs(ctx context.Context, idInterval *workflowsv1.IDInterval, page *workflowsv1.Pagination) (*workflowsv1.ListJobsResponse, error) }
type JobState ¶
type JobState int32
JobState is the state of a Job.
const ( JobQueued JobState // The job is queued and waiting to be run. JobStarted // At least one task of the job has been started. JobCompleted // All tasks of the job have been completed. )
JobState values.
type Task ¶
type Task interface{}
Task is the interface for a task that can be submitted to the workflow service. It doesn't need to be identifiable or executable, but it can be both.
type TaskIdentifier ¶
TaskIdentifier is the struct that defines the unique identifier of a task. It is used to uniquely identify a task and specify its version.
func NewTaskIdentifier ¶
func NewTaskIdentifier(name, version string) TaskIdentifier
type TaskRunner ¶
type TaskRunner struct {
// contains filtered or unexported fields
}
TaskRunner executes tasks.
Documentation: https://docs.tilebox.com/workflows/concepts/task-runners
func (*TaskRunner) GetRegisteredTask ¶
func (t *TaskRunner) GetRegisteredTask(identifier TaskIdentifier) (ExecutableTask, bool)
GetRegisteredTask returns the task with the given identifier.
func (*TaskRunner) RegisterTasks ¶
func (t *TaskRunner) RegisterTasks(tasks ...ExecutableTask) error
RegisterTasks makes the task runner aware of multiple tasks.
func (*TaskRunner) RunAll ¶
func (t *TaskRunner) RunAll(ctx context.Context)
RunAll run the task runner and execute all tasks until there are no more tasks available.
func (*TaskRunner) RunForever ¶
func (t *TaskRunner) RunForever(ctx context.Context)
RunForever runs the task runner forever, looking for new tasks to run and polling for new tasks when idle.
type TaskService ¶
type TaskService interface { NextTask(ctx context.Context, computedTask *workflowsv1.ComputedTask, nextTaskToRun *workflowsv1.NextTaskToRun) (*workflowsv1.NextTaskResponse, error) TaskFailed(ctx context.Context, taskID uuid.UUID, display string, cancelJob bool) (*workflowsv1.TaskStateResponse, error) ExtendTaskLease(ctx context.Context, taskID uuid.UUID, requestedLease time.Duration) (*workflowsv1.TaskLease, error) }
type TaskState ¶
type TaskState int32
TaskState is the state of a Task.
const ( TaskQueued TaskState // The task is queued and waiting to be run. TaskRunning // The task is currently running on some task runner. TaskComputed // The task has been computed and the output is available. TaskFailed // The task has failed. TaskCancelled // The task has been cancelled due to user request. )
TaskState values.
type TaskSummary ¶
type TaskSummary struct { // ID is the unique identifier of the task. ID uuid.UUID // Display is the display message of the task. Display string // State is the state of the task. State TaskState // ParentID is the ID of the parent task. ParentID uuid.UUID // DependsOn is the list of task IDs that this task depends on. DependsOn []uuid.UUID // StartedAt is the time the task started. StartedAt time.Time // StoppedAt is the time the task stopped. StoppedAt time.Time }
TaskSummary is a summary of a task.
type WorkflowService ¶
type WorkflowService interface { CreateCluster(ctx context.Context, name string) (*workflowsv1.Cluster, error) GetCluster(ctx context.Context, slug string) (*workflowsv1.Cluster, error) DeleteCluster(ctx context.Context, slug string) error ListClusters(ctx context.Context) (*workflowsv1.ListClustersResponse, error) }