workflows

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2025 License: MIT Imports: 38 Imported by: 0

Documentation

Overview

Package workflows provides a client for interacting with Tilebox Workflows.

Documentation: https://docs.tilebox.com/workflows

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Collect

func Collect[K any](seq iter.Seq2[K, error]) ([]K, error)

Collect converts any sequence into a slice.

It returns an error if any of the elements in the sequence has a non-nil error.

func GetCurrentCluster

func GetCurrentCluster(ctx context.Context) (string, error)

GetCurrentCluster returns the current cluster slug.

This function is intended to be used in tasks to get the current cluster slug.

func SetTaskDisplay

func SetTaskDisplay(ctx context.Context, display string) error

SetTaskDisplay sets the label name of the current task.

func SubmitSubtask

func SubmitSubtask(ctx context.Context, task Task, options ...subtask.SubmitOption) (subtask.FutureTask, error)

SubmitSubtask submits a task to the task runner as a subtask of the current task.

Options:

  • subtask.WithDependencies: sets the dependencies of the task.
  • subtask.WithClusterSlug: sets the cluster slug of the cluster where the task will be executed. Defaults to the cluster of the task runner.
  • subtask.WithMaxRetries: sets the maximum number of times a task can be automatically retried. Defaults to 0.

func SubmitSubtasks

func SubmitSubtasks(ctx context.Context, tasks []Task, options ...subtask.SubmitOption) ([]subtask.FutureTask, error)

SubmitSubtasks submits multiple tasks to the task runner as subtask of the current task. It is similar to SubmitSubtask, but it takes a slice of tasks instead of a single task.

func ValidateIdentifier

func ValidateIdentifier(identifier TaskIdentifier) error

ValidateIdentifier performs client-side validation on a task identifier.

func WithTaskSpan

func WithTaskSpan(ctx context.Context, name string, f func(ctx context.Context) error) error

WithTaskSpan is a helper function that wraps a function with a tracing span.

func WithTaskSpanResult

func WithTaskSpanResult[Result any](ctx context.Context, name string, f func(ctx context.Context) (Result, error)) (Result, error)

WithTaskSpanResult is a helper function that wraps a function with a tracing span. It returns the result of the function and an error if any.

Types

type Client

type Client struct {
	Jobs     JobClient
	Clusters ClusterClient
	// contains filtered or unexported fields
}

Client is a Tilebox Workflows client.

func NewClient

func NewClient(options ...ClientOption) *Client

NewClient creates a new Tilebox Datasets client.

By default, the returned Client is configured with:

  • "https://api.tilebox.com" as the URL
  • environment variable TILEBOX_API_KEY as the API key
  • a grpc.RetryHTTPClient HTTP client
  • the global tracer provider

The passed options are used to override these default values and configure the returned Client appropriately.

func (*Client) NewTaskRunner

func (c *Client) NewTaskRunner(ctx context.Context, options ...runner.Option) (*TaskRunner, error)

NewTaskRunner creates a new TaskRunner.

Options:

  • runner.WithClusterSlug: sets the cluster for which to listen tasks to. Default to the default cluster.
  • runner.WithRunnerLogger: sets the logger to use for the task runner. Defaults to slog.Default().
  • runner.WithDisableMetrics: disables OpenTelemetry metrics for the task runner.

type ClientOption

type ClientOption func(*clientConfig)

ClientOption is an interface for configuring a client. Using such options helpers is a quite common pattern in Go, as it allows for optional parameters in constructors. This concrete implementation here is inspired by how libraries such as axiom-go and connect do their configuration.

func WithAPIKey

func WithAPIKey(apiKey string) ClientOption

WithAPIKey sets the API key to use for the client.

Defaults to no API key.

func WithConnectClientOptions

func WithConnectClientOptions(options ...connect.ClientOption) ClientOption

WithConnectClientOptions sets additional options for the connect.HTTPClient.

func WithDisableTracing

func WithDisableTracing() ClientOption

WithDisableTracing disables OpenTelemetry tracing for the client.

func WithHTTPClient

func WithHTTPClient(httpClient connect.HTTPClient) ClientOption

WithHTTPClient sets the connect.HTTPClient to use for the client.

Defaults to grpc.RetryHTTPClient.

func WithURL

func WithURL(url string) ClientOption

WithURL sets the URL of the Tilebox Datasets service.

Defaults to "https://api.tilebox.com".

type Cluster

type Cluster struct {
	// Slug is the unique identifier of the cluster within the namespace.
	Slug string
	// Name is the label name of the cluster.
	Name string
	// Deletable is true when the cluster can be deleted.
	Deletable bool
}

Cluster represents a Tilebox Workflows cluster.

Documentation: https://docs.tilebox.com/workflows/concepts/clusters

type ClusterClient

type ClusterClient interface {
	// Create creates a new cluster with the given name.
	Create(ctx context.Context, name string) (*Cluster, error)

	// Get returns a cluster by its slug.
	Get(ctx context.Context, slug string) (*Cluster, error)

	// Delete deletes a cluster by its slug.
	Delete(ctx context.Context, slug string) error

	// List returns a list of all available clusters.
	List(ctx context.Context) ([]*Cluster, error)
}

type ExecutableTask

type ExecutableTask interface {
	Execute(ctx context.Context) error
}

ExecutableTask is the interface for a task that can be executed, and therefore be registered with a task runner.

type ExplicitlyIdentifiableTask

type ExplicitlyIdentifiableTask interface {
	Identifier() TaskIdentifier
}

ExplicitlyIdentifiableTask is the interface for a task that provides a user-defined task identifier. The identifier is used to uniquely identify the task and specify its version. If a task is not an ExplicitlyIdentifiableTask, the task runner will generate an identifier for it using reflection.

type Job

type Job struct {
	// ID is the unique identifier of the job.
	ID uuid.UUID
	// Name is the name of the job.
	Name string
	// Canceled indicates whether the job has been canceled.
	Canceled bool
	// State is the current state of the job.
	State JobState
	// SubmittedAt is the time the job was submitted.
	SubmittedAt time.Time
	// StartedAt is the time the job started running.
	StartedAt time.Time
	// TaskSummaries is the task summaries of the job.
	TaskSummaries []*TaskSummary
	// AutomationID is the ID of the automation that submitted the job.
	AutomationID uuid.UUID
	// Progress is a list of progress indicators for the job.
	Progress []*ProgressIndicator
}

Job represents a Tilebox Workflows job.

Documentation: https://docs.tilebox.com/workflows/concepts/jobs

type JobClient

type JobClient interface {
	// Submit submits a job to a cluster.
	//
	// Options:
	//   - job.WithMaxRetries: sets the maximum number of times a job can be automatically retried. Defaults to 0.
	//   - job.WithClusterSlug: sets the cluster slug of the cluster where the job will be executed. Defaults to the default cluster.
	//
	// Documentation: https://docs.tilebox.com/workflows/concepts/jobs#submission
	Submit(ctx context.Context, jobName string, tasks []Task, options ...job.SubmitOption) (*Job, error)

	// Get returns a job by its ID.
	Get(ctx context.Context, jobID uuid.UUID) (*Job, error)

	// Retry retries a job.
	//
	// Returns the number of rescheduled tasks.
	//
	// Documentation: https://docs.tilebox.com/workflows/concepts/jobs#retry-handling
	Retry(ctx context.Context, jobID uuid.UUID) (int64, error)

	// Cancel cancels a job.
	//
	// Documentation: https://docs.tilebox.com/workflows/concepts/jobs#cancellation
	Cancel(ctx context.Context, jobID uuid.UUID) error

	// Query returns a list of all jobs within the given interval.
	//
	// Options:
	//   - job.WithTemporalExtent: specifies the time or ID interval for which jobs should be queried (Required)
	//   - job.WithAutomationID: specifies the automation ID to filter jobs by. Only jobs submitted by the specified
	//  automation will be returned. (Optional)
	//
	// The jobs are lazily loaded and returned as a sequence.
	// The output sequence can be transformed into a slice using Collect.
	Query(ctx context.Context, options ...job.QueryOption) iter.Seq2[*Job, error]
}

type JobService

type JobService interface {
	SubmitJob(ctx context.Context, req *workflowsv1.SubmitJobRequest) (*workflowsv1.Job, error)
	GetJob(ctx context.Context, jobID uuid.UUID) (*workflowsv1.Job, error)
	RetryJob(ctx context.Context, jobID uuid.UUID) (*workflowsv1.RetryJobResponse, error)
	CancelJob(ctx context.Context, jobID uuid.UUID) error
	QueryJobs(ctx context.Context, filters *workflowsv1.QueryFilters, page *tileboxv1.Pagination) (*workflowsv1.QueryJobsResponse, error)
}

type JobState

type JobState int32

JobState is the state of a Job.

const (
	JobQueued    JobState // The job is queued and waiting to be run.
	JobStarted            // At least one task of the job has been started.
	JobCompleted          // All tasks of the job have been completed.
)

JobState values.

type ProgressIndicator added in v0.1.0

type ProgressIndicator struct {
	Label string
	Total uint64
	Done  uint64
}

type ProgressTracker added in v0.1.0

type ProgressTracker interface {
	// Add a given amount of total work to be done to the progress indicator.
	Add(context.Context, uint64) error

	// Done marks a given amount of work as done.
	Done(context.Context, uint64) error
}

ProgressTracker is an interface for updating the total work and completed work units for a named progress indicator for a job.

func DefaultProgress added in v0.1.0

func DefaultProgress() ProgressTracker

DefaultProgress returns the default, unnamed progress indicator instance for tracking job progress.

func Progress added in v0.1.0

func Progress(label string) ProgressTracker

Progress returns a named progress indicator instance for tracking job progress.

type Task

type Task interface{}

Task is the interface for a task that can be submitted to the workflow service. It doesn't need to be identifiable or executable, but it can be both.

type TaskIdentifier

type TaskIdentifier interface {
	Name() string
	Version() string
	Display() string
}

TaskIdentifier is the struct that defines the unique identifier of a task. It is used to uniquely identify a task and specify its version.

func NewTaskIdentifier

func NewTaskIdentifier(name, version string) TaskIdentifier

type TaskRunner

type TaskRunner struct {
	// contains filtered or unexported fields
}

TaskRunner executes tasks.

Documentation: https://docs.tilebox.com/workflows/concepts/task-runners

func (*TaskRunner) GetRegisteredTask

func (t *TaskRunner) GetRegisteredTask(identifier TaskIdentifier) (ExecutableTask, bool)

GetRegisteredTask returns the task with the given identifier.

func (*TaskRunner) RegisterTasks

func (t *TaskRunner) RegisterTasks(tasks ...ExecutableTask) error

RegisterTasks makes the task runner aware of multiple tasks.

func (*TaskRunner) RunAll

func (t *TaskRunner) RunAll(ctx context.Context)

RunAll run the task runner and execute all tasks until there are no more tasks available.

func (*TaskRunner) RunForever

func (t *TaskRunner) RunForever(ctx context.Context)

RunForever runs the task runner forever, looking for new tasks to run and polling for new tasks when idle.

type TaskService

type TaskService interface {
	NextTask(ctx context.Context, computedTask *workflowsv1.ComputedTask, nextTaskToRun *workflowsv1.NextTaskToRun) (*workflowsv1.NextTaskResponse, error)
	TaskFailed(ctx context.Context, taskID uuid.UUID, display string, cancelJob bool, progressUpdates []*workflowsv1.Progress) (*workflowsv1.TaskStateResponse, error)
	ExtendTaskLease(ctx context.Context, taskID uuid.UUID, requestedLease time.Duration) (*workflowsv1.TaskLease, error)
}

type TaskState

type TaskState int32

TaskState is the state of a Task.

const (
	TaskQueued    TaskState // The task is queued and waiting to be run.
	TaskRunning             // The task is currently running on some task runner.
	TaskComputed            // The task has been computed and the output is available.
	TaskFailed              // The task has failed.
	TaskCancelled           // The task has been cancelled due to user request.
)

TaskState values.

type TaskSummary

type TaskSummary struct {
	// ID is the unique identifier of the task.
	ID uuid.UUID
	// Display is the label message of the task.
	Display string
	// State is the state of the task.
	State TaskState
	// ParentID is the ID of the parent task.
	ParentID uuid.UUID
	// StartedAt is the time the task started.
	StartedAt time.Time
	// StoppedAt is the time the task stopped.
	StoppedAt time.Time
}

TaskSummary is a summary of a task.

type WorkflowService

type WorkflowService interface {
	CreateCluster(ctx context.Context, name string) (*workflowsv1.Cluster, error)
	GetCluster(ctx context.Context, slug string) (*workflowsv1.Cluster, error)
	DeleteCluster(ctx context.Context, slug string) error
	ListClusters(ctx context.Context) (*workflowsv1.ListClustersResponse, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL