workflow

package
v0.58.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package workflow provides functionality for defining, managing, and executing workflows in Hatchet. A workflow is a collection of tasks with defined dependencies and execution logic.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunChildWorkflow

func RunChildWorkflow[I any, O any](
	ctx worker.HatchetContext,
	workflow WorkflowDeclaration[I, O],
	input I,
	opts ...RunOpts,
) (*O, error)

RunChildWorkflow is a helper function to run a child workflow with full type safety It takes the parent context, the child workflow declaration, and input Returns the typed output of the child workflow

Types

type DurableWrappedTaskFn

type DurableWrappedTaskFn func(ctx worker.DurableHatchetContext) (interface{}, error)

DurableWrappedTaskFn represents a durable task function that can be executed by the Hatchet worker. It takes a DurableHatchetContext and returns an interface{} result and an error.

type NamedFunction

type NamedFunction struct {
	ActionID string
	Fn       WrappedTaskFn
}

NamedFunction represents a function with its associated action ID

type RunAsChildOpts

type RunAsChildOpts struct {
	RunOpts
	Sticky *bool
	Key    *string
}

type RunOpts

type RunOpts struct {
	AdditionalMetadata *map[string]interface{}
	// contains filtered or unexported fields
}

type TaskWithSpecificOutput

type TaskWithSpecificOutput[I any, T any] struct {
	Name string
	Fn   func(ctx worker.HatchetContext, input I) (*T, error)
}

Define a TaskDeclaration with specific output type

type WorkflowBase

type WorkflowBase interface {
	// Dump converts the workflow declaration into a protobuf request and function mappings.
	// Returns the workflow definition, regular task functions, durable task functions, and the on failure task function.
	Dump() (*contracts.CreateWorkflowVersionRequest, []NamedFunction, []NamedFunction, WrappedTaskFn)
}

WorkflowBase defines the common interface for all workflow types.

type WorkflowDeclaration

type WorkflowDeclaration[I, O any] interface {
	WorkflowBase

	// Task registers a task that will be executed as part of the workflow
	Task(opts create.WorkflowTask[I, O], fn func(ctx worker.HatchetContext, input I) (interface{}, error)) *task.TaskDeclaration[I]

	// DurableTask registers a durable task that will be executed as part of the workflow.
	// Durable tasks can be paused and resumed across workflow runs, making them suitable
	// for long-running operations or tasks that require human intervention.
	DurableTask(opts create.WorkflowTask[I, O], fn func(ctx worker.DurableHatchetContext, input I) (interface{}, error)) *task.DurableTaskDeclaration[I]

	// OnFailureTask registers a task that will be executed if the workflow fails.
	OnFailure(opts create.WorkflowOnFailureTask[I, O], fn func(ctx worker.HatchetContext, input I) (interface{}, error)) *task.OnFailureTaskDeclaration[I]

	// Run executes the workflow with the provided input.
	Run(ctx context.Context, input I, opts ...RunOpts) (*O, error)

	// RunChild executes a child workflow with the provided input.
	RunAsChild(ctx worker.HatchetContext, input I, opts ...RunAsChildOpts) (*O, error)

	// RunNoWait executes the workflow with the provided input without waiting for it to complete.
	// Instead it returns a run ID that can be used to check the status of the workflow.
	RunNoWait(ctx context.Context, input I, opts ...RunOpts) (*v0Client.Workflow, error)

	// RunBulkNoWait executes the workflow with the provided inputs without waiting for them to complete.
	RunBulkNoWait(ctx context.Context, input []I, opts ...RunOpts) ([]string, error)

	// Cron schedules the workflow to run on a regular basis using a cron expression.
	Cron(ctx context.Context, name string, cronExpr string, input I, opts ...RunOpts) (*rest.CronWorkflows, error)

	// Schedule schedules the workflow to run at a specific time.
	Schedule(ctx context.Context, triggerAt time.Time, input I, opts ...RunOpts) (*rest.ScheduledWorkflows, error)

	// Get retrieves the current state of the workflow.
	Get(ctx context.Context) (*rest.Workflow, error)

	// Metrics retrieves metrics for this workflow.
	Metrics(ctx context.Context, opts ...rest.WorkflowGetMetricsParams) (*rest.WorkflowMetrics, error)

	// QueueMetrics retrieves queue metrics for this workflow.
	QueueMetrics(ctx context.Context, opts ...rest.TenantGetQueueMetricsParams) (*rest.TenantGetQueueMetricsResponse, error)
}

WorkflowDeclaration represents a workflow with input type I and output type O. It provides methods to define tasks, specify dependencies, and execute the workflow.

func NewWorkflowDeclaration

func NewWorkflowDeclaration[I any, O any](opts create.WorkflowCreateOpts[I], v0 v0Client.Client) WorkflowDeclaration[I, O]

NewWorkflowDeclaration creates a new workflow declaration with the specified options and client. The workflow will have input type I and output type O.

type WrappedTaskFn

type WrappedTaskFn func(ctx worker.HatchetContext) (interface{}, error)

WrappedTaskFn represents a task function that can be executed by the Hatchet worker. It takes a HatchetContext and returns an interface{} result and an error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL