workflows

package
v0.0.1
Warning

This package is not in the latest version of its module.

Published: May 23, 2025 License: MIT Imports: 35 Imported by: 0

Documentation

Overview

Package workflows provides a client for interacting with Tilebox Workflows.

Documentation: https://docs.tilebox.com/workflows

Constants

This section is empty.

Variables

This section is empty.

Functions

func Collect

func Collect[K any](seq iter.Seq2[K, error]) ([]K, error)

Collect converts any sequence into a slice.

It returns an error if any of the elements in the sequence has a non-nil error.

func GetCurrentCluster

func GetCurrentCluster(ctx context.Context) (string, error)

GetCurrentCluster returns the current cluster slug.

This function is intended to be used in tasks to get the current cluster slug.

func SetTaskDisplay

func SetTaskDisplay(ctx context.Context, display string) error

SetTaskDisplay sets the display name of the current task.

func SubmitSubtask

func SubmitSubtask(ctx context.Context, task Task, options ...subtask.SubmitOption) (subtask.FutureTask, error)

SubmitSubtask submits a task to the task runner as a subtask of the current task.

Options:

  • subtask.WithDependencies: sets the dependencies of the task.
  • subtask.WithClusterSlug: sets the cluster slug of the cluster where the task will be executed. Defaults to the cluster of the task runner.
  • subtask.WithMaxRetries: sets the maximum number of times a task can be automatically retried. Defaults to 0.

func SubmitSubtasks

func SubmitSubtasks(ctx context.Context, tasks []Task, options ...subtask.SubmitOption) ([]subtask.FutureTask, error)

SubmitSubtasks submits multiple tasks to the task runner as subtasks of the current task. It is similar to SubmitSubtask, but takes a slice of tasks instead of a single task.

func ValidateIdentifier

func ValidateIdentifier(identifier TaskIdentifier) error

ValidateIdentifier performs client-side validation on a task identifier.

func WithTaskSpan

func WithTaskSpan(ctx context.Context, name string, f func(ctx context.Context) error) error

WithTaskSpan is a helper function that wraps a function with a tracing span.

func WithTaskSpanResult

func WithTaskSpanResult[Result any](ctx context.Context, name string, f func(ctx context.Context) (Result, error)) (Result, error)

WithTaskSpanResult is a helper function that wraps a function with a tracing span. It returns the result of the function and an error if any.

Types

type Client

type Client struct {
	Jobs     JobClient
	Clusters ClusterClient
	// contains filtered or unexported fields
}

Client is a Tilebox Workflows client.

func NewClient

func NewClient(options ...ClientOption) *Client

NewClient creates a new Tilebox Workflows client.

By default, the returned Client is configured with:

  • "https://api.tilebox.com" as the URL
  • environment variable TILEBOX_API_KEY as the API key
  • a grpc.RetryHTTPClient HTTP client
  • the global tracer provider

The passed options are used to override these default values and configure the returned Client appropriately.

func (*Client) NewTaskRunner

func (c *Client) NewTaskRunner(cluster *Cluster, options ...runner.Option) (*TaskRunner, error)

NewTaskRunner creates a new TaskRunner for the specified cluster.

Options:

  • runner.WithRunnerLogger: sets the logger to use for the task runner. Defaults to slog.Default().
  • runner.WithDisableMetrics: disables OpenTelemetry metrics for the task runner.

type ClientOption

type ClientOption func(*clientConfig)

ClientOption is a function type for configuring a client. Option helpers like this are a common pattern in Go, as they allow for optional parameters in constructors. This implementation is inspired by how libraries such as axiom-go and connect handle their configuration.
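
The option pattern described above can be sketched as follows. This is an illustration only: the fields of clientConfig are unexported in the real package and are assumed here, and newConfig is a hypothetical helper standing in for the part of NewClient that applies defaults and then overrides them.

```go
package main

import (
	"fmt"
	"os"
)

// clientConfig holds the configurable values. The field names are assumptions
// made for this sketch; the real struct is unexported.
type clientConfig struct {
	url    string
	apiKey string
}

// ClientOption mutates a clientConfig, matching the documented type.
type ClientOption func(*clientConfig)

// WithURL overrides the service URL.
func WithURL(url string) ClientOption {
	return func(cfg *clientConfig) { cfg.url = url }
}

// WithAPIKey overrides the API key.
func WithAPIKey(apiKey string) ClientOption {
	return func(cfg *clientConfig) { cfg.apiKey = apiKey }
}

// newConfig is a hypothetical helper: it starts from the documented defaults
// and applies each option in order, so later options win.
func newConfig(options ...ClientOption) *clientConfig {
	cfg := &clientConfig{
		url:    "https://api.tilebox.com",
		apiKey: os.Getenv("TILEBOX_API_KEY"),
	}
	for _, option := range options {
		option(cfg)
	}
	return cfg
}

func main() {
	defaults := newConfig()
	fmt.Println(defaults.url)

	custom := newConfig(WithURL("http://localhost:8080"), WithAPIKey("example-key"))
	fmt.Println(custom.url, custom.apiKey)
}
```

Because each option is just a function, new settings can be added later without changing the constructor signature.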

func WithAPIKey

func WithAPIKey(apiKey string) ClientOption

WithAPIKey sets the API key to use for the client.

Defaults to no API key.

func WithConnectClientOptions

func WithConnectClientOptions(options ...connect.ClientOption) ClientOption

WithConnectClientOptions sets additional options for the connect.HTTPClient.

func WithDisableTracing

func WithDisableTracing() ClientOption

WithDisableTracing disables OpenTelemetry tracing for the client.

func WithHTTPClient

func WithHTTPClient(httpClient connect.HTTPClient) ClientOption

WithHTTPClient sets the connect.HTTPClient to use for the client.

Defaults to grpc.RetryHTTPClient.

func WithURL

func WithURL(url string) ClientOption

WithURL sets the URL of the Tilebox Workflows service.

Defaults to "https://api.tilebox.com".

type Cluster

type Cluster struct {
	// Slug is the unique identifier of the cluster within the namespace.
	Slug string
	// Name is the display name of the cluster.
	Name string
}

Cluster represents a Tilebox Workflows cluster.

Documentation: https://docs.tilebox.com/workflows/concepts/clusters

type ClusterClient

type ClusterClient interface {
	Create(ctx context.Context, name string) (*Cluster, error)
	Get(ctx context.Context, slug string) (*Cluster, error)
	Delete(ctx context.Context, slug string) error
	List(ctx context.Context) ([]*Cluster, error)
}

type ExecutableTask

type ExecutableTask interface {
	Execute(ctx context.Context) error
}

ExecutableTask is the interface for a task that can be executed, and therefore be registered with a task runner.

type ExplicitlyIdentifiableTask

type ExplicitlyIdentifiableTask interface {
	Identifier() TaskIdentifier
}

ExplicitlyIdentifiableTask is the interface for a task that provides a user-defined task identifier. The identifier is used to uniquely identify the task and specify its version. If a task is not an ExplicitlyIdentifiableTask, the task runner will generate an identifier for it using reflection.

type Job

type Job struct {
	// ID is the unique identifier of the job.
	ID uuid.UUID
	// Name is the name of the job.
	Name string
	// Canceled indicates whether the job has been canceled.
	Canceled bool
	// State is the current state of the job.
	State JobState
	// SubmittedAt is the time the job was submitted.
	SubmittedAt time.Time
	// StartedAt is the time the job started running.
	StartedAt time.Time
	// TaskSummaries is the task summaries of the job.
	TaskSummaries []*TaskSummary
	// AutomationID is the ID of the automation that submitted the job.
	AutomationID uuid.UUID
}

Job represents a Tilebox Workflows job.

Documentation: https://docs.tilebox.com/workflows/concepts/jobs

type JobClient

type JobClient interface {
	Submit(ctx context.Context, jobName string, cluster *Cluster, tasks []Task, options ...job.SubmitOption) (*Job, error)
	Get(ctx context.Context, jobID uuid.UUID) (*Job, error)
	Retry(ctx context.Context, jobID uuid.UUID) (int64, error)
	Cancel(ctx context.Context, jobID uuid.UUID) error
	List(ctx context.Context, interval query.TemporalExtent) iter.Seq2[*Job, error]
}

type JobService

type JobService interface {
	SubmitJob(ctx context.Context, req *workflowsv1.SubmitJobRequest) (*workflowsv1.Job, error)
	GetJob(ctx context.Context, jobID uuid.UUID) (*workflowsv1.Job, error)
	RetryJob(ctx context.Context, jobID uuid.UUID) (*workflowsv1.RetryJobResponse, error)
	CancelJob(ctx context.Context, jobID uuid.UUID) error
	ListJobs(ctx context.Context, idInterval *workflowsv1.IDInterval, page *workflowsv1.Pagination) (*workflowsv1.ListJobsResponse, error)
}

type JobState

type JobState int32

JobState is the state of a Job.

const (
	JobQueued    JobState // The job is queued and waiting to be run.
	JobStarted            // At least one task of the job has been started.
	JobCompleted          // All tasks of the job have been completed.
)

JobState values.

type Task

type Task interface{}

Task is the interface for a task that can be submitted to the workflow service. It doesn't need to be identifiable or executable, but it can be both.

type TaskIdentifier

type TaskIdentifier interface {
	Name() string
	Version() string
	Display() string
}

TaskIdentifier is the interface that defines the unique identifier of a task. It is used to uniquely identify a task and specify its version.

func NewTaskIdentifier

func NewTaskIdentifier(name, version string) TaskIdentifier

type TaskRunner

type TaskRunner struct {
	// contains filtered or unexported fields
}

TaskRunner executes tasks.

Documentation: https://docs.tilebox.com/workflows/concepts/task-runners

func (*TaskRunner) GetRegisteredTask

func (t *TaskRunner) GetRegisteredTask(identifier TaskIdentifier) (ExecutableTask, bool)

GetRegisteredTask returns the task with the given identifier.

func (*TaskRunner) RegisterTasks

func (t *TaskRunner) RegisterTasks(tasks ...ExecutableTask) error

RegisterTasks makes the task runner aware of multiple tasks.

func (*TaskRunner) RunAll

func (t *TaskRunner) RunAll(ctx context.Context)

RunAll runs the task runner and executes tasks until no more tasks are available.

func (*TaskRunner) RunForever

func (t *TaskRunner) RunForever(ctx context.Context)

RunForever runs the task runner forever, looking for new tasks to run and polling for new tasks when idle.

type TaskService

type TaskService interface {
	NextTask(ctx context.Context, computedTask *workflowsv1.ComputedTask, nextTaskToRun *workflowsv1.NextTaskToRun) (*workflowsv1.NextTaskResponse, error)
	TaskFailed(ctx context.Context, taskID uuid.UUID, display string, cancelJob bool) (*workflowsv1.TaskStateResponse, error)
	ExtendTaskLease(ctx context.Context, taskID uuid.UUID, requestedLease time.Duration) (*workflowsv1.TaskLease, error)
}

type TaskState

type TaskState int32

TaskState is the state of a Task.

const (
	TaskQueued    TaskState // The task is queued and waiting to be run.
	TaskRunning             // The task is currently running on some task runner.
	TaskComputed            // The task has been computed and the output is available.
	TaskFailed              // The task has failed.
	TaskCancelled           // The task has been cancelled due to user request.
)

TaskState values.

type TaskSummary

type TaskSummary struct {
	// ID is the unique identifier of the task.
	ID uuid.UUID
	// Display is the display message of the task.
	Display string
	// State is the state of the task.
	State TaskState
	// ParentID is the ID of the parent task.
	ParentID uuid.UUID
	// DependsOn is the list of task IDs that this task depends on.
	DependsOn []uuid.UUID
	// StartedAt is the time the task started.
	StartedAt time.Time
	// StoppedAt is the time the task stopped.
	StoppedAt time.Time
}

TaskSummary is a summary of a task.

type WorkflowService

type WorkflowService interface {
	CreateCluster(ctx context.Context, name string) (*workflowsv1.Cluster, error)
	GetCluster(ctx context.Context, slug string) (*workflowsv1.Cluster, error)
	DeleteCluster(ctx context.Context, slug string) error
	ListClusters(ctx context.Context) (*workflowsv1.ListClustersResponse, error)
}
