workflow

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2025 License: Apache-2.0 Imports: 11 Imported by: 1

Documentation

Index

Constants

View Source
const (
	StatusRunning        = api.RUNTIME_STATUS_RUNNING
	StatusCompleted      = api.RUNTIME_STATUS_COMPLETED
	StatusContinuedAsNew = api.RUNTIME_STATUS_CONTINUED_AS_NEW
	StatusFailed         = api.RUNTIME_STATUS_FAILED
	StatusCanceled       = api.RUNTIME_STATUS_CANCELED
	StatusTerminated     = api.RUNTIME_STATUS_TERMINATED
	StatusPending        = api.RUNTIME_STATUS_PENDING
	StatusSuspended      = api.RUNTIME_STATUS_SUSPENDED
)

Variables

This section is empty.

Functions

func WorkflowMetadataIsComplete

func WorkflowMetadataIsComplete(o *WorkflowMetadata) bool

func WorkflowMetadataIsRunning

func WorkflowMetadataIsRunning(o *WorkflowMetadata) bool

Types

type Activity

type Activity func(ActivityContext) (any, error)

Activity is the functional interface for activity implementations.

type ActivityContext

type ActivityContext task.ActivityContext

ActivityContext is the context parameter type for activity implementations.

type CallActivityOption

type CallActivityOption task.CallActivityOption

func WithActivityAppID

func WithActivityAppID(targetAppID string) CallActivityOption

func WithActivityInput

func WithActivityInput(input any) CallActivityOption

WithActivityInput configures an input for an activity invocation. The specified input must be JSON serializable.

func WithActivityRetryPolicy

func WithActivityRetryPolicy(policy *RetryPolicy) CallActivityOption

func WithRawActivityInput

func WithRawActivityInput(input *wrapperspb.StringValue) CallActivityOption

WithRawActivityInput configures a raw input for an activity invocation.

type ChildWorkflowOption

type ChildWorkflowOption task.SubOrchestratorOption

ChildWorkflowOption is a functional option type for the CallChildWorkflow workflow method.

func WithChildWorkflowAppID

func WithChildWorkflowAppID(appID string) ChildWorkflowOption

WithChildWorkflowAppID is a functional option type for the CallChildWorkflow workflow method that specifies the app ID of the target activity.

func WithChildWorkflowInput

func WithChildWorkflowInput(input any) ChildWorkflowOption

WithChildWorkflowInput is a functional option type for the CallChildWorkflow workflow method that takes an input value and marshals it to JSON.

func WithChildWorkflowInstanceID

func WithChildWorkflowInstanceID(instanceID string) ChildWorkflowOption

WithChildWorkflowInstanceID is a functional option type for the CallChildWorkflow workflow method that specifies the instance ID of the child-workflow.

func WithChildWorkflowRetryPolicy

func WithChildWorkflowRetryPolicy(policy *RetryPolicy) ChildWorkflowOption

func WithRawChildWorkflowInput

func WithRawChildWorkflowInput(input *wrapperspb.StringValue) ChildWorkflowOption

WithRawChildWorkflowInput is a functional option type for the CallChildWorkflow workflow method that takes a raw input value.

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(cc grpc.ClientConnInterface) *Client

NewClient creates a client that can be used to manage worfklows.

func NewClientWithLogger

func NewClientWithLogger(cc grpc.ClientConnInterface, logger backend.Logger) *Client

func (*Client) FetchWorkflowMetadata

func (c *Client) FetchWorkflowMetadata(ctx context.Context, id string, opts ...FetchWorkflowMetadataOptions) (*WorkflowMetadata, error)

FetchWorkflowMetadata fetches metadata for the specified workflow from the configured task hub.

api.ErrInstanceNotFound is returned when the specified workflow doesn't exist.

func (*Client) PurgeWorkflowState

func (c *Client) PurgeWorkflowState(ctx context.Context, id string, opts ...PurgeOptions) error

PurgeWorkflowState deletes the state of the specified workflow instance.

[api.api.ErrInstanceNotFound] is returned if the specified workflow instance doesn't exist.

func (*Client) RaiseEvent

func (c *Client) RaiseEvent(ctx context.Context, id, eventName string, opts ...RaiseEventOptions) error

RaiseEvent sends an asynchronous event notification to a waiting workflow.

func (*Client) RerunWorkflowFromEvent

func (c *Client) RerunWorkflowFromEvent(ctx context.Context, id string, eventID uint32, opts ...RerunOptions) (string, error)

RerunWorkflowFromEvent reruns a workflow from a specific event ID of some source instance ID. If not given, a random new instance ID will be generated and returned. Can optionally give a new input to the target event ID to rerun from.

func (*Client) ResumeWorkflow

func (c *Client) ResumeWorkflow(ctx context.Context, id, reason string) error

ResumeWorkflow resumes an orchestration instance that was previously suspended.

func (*Client) ScheduleWorkflow

func (c *Client) ScheduleWorkflow(ctx context.Context, orchestrator string, opts ...NewWorkflowOptions) (string, error)

ScheduleWorkflow schedules a new workflow instance with a specified set of options for execution.

func (*Client) StartWorker

func (c *Client) StartWorker(ctx context.Context, r *Registry) error

StartWorker starts the workflow runtime to process workflows.

func (*Client) SuspendWorkflow

func (c *Client) SuspendWorkflow(ctx context.Context, id, reason string) error

SuspendWorkflow suspends an workflow instance, halting processing of its events until a "resume" operation resumes it.

Note that suspended workflows are still considered to be "running" even though they will not process events.

func (*Client) TerminateWorkflow

func (c *Client) TerminateWorkflow(ctx context.Context, id string, opts ...TerminateOptions) error

TerminateWorkflow terminates a running workflow by causing it to stop receiving new events and putting it directly into the TERMINATED state.

func (*Client) WaitForWorkflowCompletion

func (c *Client) WaitForWorkflowCompletion(ctx context.Context, id string, opts ...FetchWorkflowMetadataOptions) (*WorkflowMetadata, error)

WaitForWorkflowCompletion waits for an workflow to complete and returns an backend.WorkflowMetadata object that contains metadata about the completed instance.

api.ErrInstanceNotFound is returned when the specified workflow doesn't exist.

func (*Client) WaitForWorkflowStart

func (c *Client) WaitForWorkflowStart(ctx context.Context, id string, opts ...FetchWorkflowMetadataOptions) (*WorkflowMetadata, error)

WaitForWorkflowStart waits for an workflow to start running and returns an backend.WorkflowMetadata object that contains metadata about the started instance.

api.ErrInstanceNotFound is returned when the specified workflow doesn't exist.

type ContinueAsNewOption

type ContinueAsNewOption task.ContinueAsNewOption

ContinueAsNewOption is a functional option type for the ContinueAsNew workflow method.

func WithKeepUnprocessedEvents

func WithKeepUnprocessedEvents() ContinueAsNewOption

WithKeepUnprocessedEvents returns a ContinueAsNewOptions struct that instructs the runtime to carry forward any unprocessed external events to the new instance.

type CreateTimerOption

type CreateTimerOption task.CreateTimerOption

func WithTimerName

func WithTimerName(name string) CreateTimerOption

type FetchWorkflowMetadataOptions

type FetchWorkflowMetadataOptions api.FetchOrchestrationMetadataOptions

func WithFetchPayloads

func WithFetchPayloads(fetchPayloads bool) FetchWorkflowMetadataOptions

WithFetchPayloads configures whether to load workflow inputs, outputs, and custom status values, which could be large.

type NewWorkflowOptions

type NewWorkflowOptions api.NewOrchestrationOptions

func WithInput

func WithInput(input any) NewWorkflowOptions

WithInput configures an input for the workflow. The specified input must be serializable.

func WithInstanceID

func WithInstanceID(id string) NewWorkflowOptions

WithInstanceID configures an explicit workflow instance ID. If not specified, a random UUID value will be used for the workflow instance ID.

func WithRawInput

func WithRawInput(rawInput *wrapperspb.StringValue) NewWorkflowOptions

WithRawInput configures an input for the workflow. The specified input must be a string.

func WithStartTime

func WithStartTime(startTime time.Time) NewWorkflowOptions

WithStartTime configures a start time at which the workflow should start running. Note that the actual start time could be later than the specified start time if the task hub is under load or if the app is not running at the specified start time.

type PurgeOptions

type PurgeOptions api.PurgeOptions

func WithRecursivePurge

func WithRecursivePurge(recursive bool) PurgeOptions

WithRecursivePurge configures whether to purge all child-workflows created by the target workflow.

type RaiseEventOptions

type RaiseEventOptions api.RaiseEventOptions

func WithEventPayload

func WithEventPayload(data any) RaiseEventOptions

WithEventPayload configures an event payload. The specified payload must be serializable.

func WithRawEventData

func WithRawEventData(data *wrapperspb.StringValue) RaiseEventOptions

WithRawEventData configures an event payload that is a raw, unprocessed string (e.g. JSON data).

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry contains maps of names to corresponding orchestrator and activity functions.

func NewRegistry

func NewRegistry() *Registry

NewRegistry returns a new Registry struct.

func (*Registry) AddActivity

func (r *Registry) AddActivity(a Activity) error

AddActivity adds an activity function to the registry. The name of the activity function is determined using reflection.

func (*Registry) AddActivityN

func (r *Registry) AddActivityN(name string, a Activity) error

AddActivityN adds an activity function to the registry with a specified name.

func (*Registry) AddWorkflow

func (r *Registry) AddWorkflow(w Workflow) error

AddWorkflow adds an orchestrator function to the registry. The name of the orchestrator function is determined using reflection.

func (*Registry) AddWorkflowN

func (r *Registry) AddWorkflowN(name string, w Workflow) error

AddWorkflowN adds an orchestrator function to the registry with a specified name.

type RerunOptions

type RerunOptions api.RerunOptions

func WithRerunInput

func WithRerunInput(input any) RerunOptions

func WithRerunNewInstanceID

func WithRerunNewInstanceID(id string) RerunOptions

type RetryPolicy

type RetryPolicy struct {
	// Max number of attempts to try the activity call, first execution inclusive
	MaxAttempts int
	// Timespan to wait for the first retry
	InitialRetryInterval time.Duration
	// Used to determine rate of increase of back-off
	BackoffCoefficient float64
	// Max timespan to wait for a retry
	MaxRetryInterval time.Duration
	// Total timeout across all the retries performed
	RetryTimeout time.Duration
	// Optional function to control if retries should proceed
	Handle func(error) bool
}

type Task

type Task task.Task

Task is an interface for asynchronous durable tasks. A task is conceptually similar to a future.

type TerminateOptions

type TerminateOptions api.TerminateOptions

func WithOutput

func WithOutput(data any) TerminateOptions

WithOutput configures an output for the terminated workflow. The specified output must be serializable.

func WithRawOutput

func WithRawOutput(data *wrapperspb.StringValue) TerminateOptions

WithRawOutput configures a raw, unprocessed output (i.e. pre-serialized) for the terminated workflow.

func WithRecursiveTerminate

func WithRecursiveTerminate(recursive bool) TerminateOptions

WithRecursiveTerminate configures whether to terminate all child-workflows created by the target workflow.

type Workflow

type Workflow func(ctx *WorkflowContext) (any, error)

Workflow is the functional interface for workflow functions.

type WorkflowContext

type WorkflowContext struct {
	// contains filtered or unexported fields
}

WorkflowContext is the parameter type for workflow functions.

func (*WorkflowContext) CallActivity

func (w *WorkflowContext) CallActivity(activity any, opts ...CallActivityOption) Task

CallActivity schedules an asynchronous invocation of an activity function. The [activity] parameter can be either the name of an activity as a string or can be a pointer to the function that implements the activity, in which case the name is obtained via reflection.

func (*WorkflowContext) CallChildWorkflow

func (w *WorkflowContext) CallChildWorkflow(workflow any, opts ...ChildWorkflowOption) Task

func (*WorkflowContext) ContinueAsNew

func (w *WorkflowContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)

func (*WorkflowContext) CreateTimer

func (w *WorkflowContext) CreateTimer(delay time.Duration, opts ...CreateTimerOption) Task

CreateTimer schedules a durable timer that expires after the specified delay.

func (*WorkflowContext) CurrentTimeUTC

func (w *WorkflowContext) CurrentTimeUTC() time.Time

func (*WorkflowContext) GetInput

func (w *WorkflowContext) GetInput(v any) error

GetInput unmarshals the serialized workflow input and stores it in [v].

func (*WorkflowContext) ID

func (w *WorkflowContext) ID() string

func (*WorkflowContext) IsReplaying

func (w *WorkflowContext) IsReplaying() bool

func (*WorkflowContext) Name

func (w *WorkflowContext) Name() string

func (*WorkflowContext) SetCustomStatus

func (w *WorkflowContext) SetCustomStatus(cs string)

func (*WorkflowContext) WaitForExternalEvent

func (w *WorkflowContext) WaitForExternalEvent(eventName string, timeout time.Duration) Task

WaitForExternalEvent creates a task that is completed only after an event named [eventName] is received by this workflow or when the specified timeout expires.

The [timeout] parameter can be used to define a timeout for receiving the event. If the timeout expires before the named event is received, the task will be completed and will return a timeout error value [ErrTaskCanceled] when awaited. Otherwise, the awaited task will return the deserialized payload of the received event. A Duration value of zero returns a canceled task if the event isn't already available in the history. Use a negative Duration to wait indefinitely for the event to be received.

Workflows can wait for the same event name multiple times, so waiting for multiple events with the same name is allowed. Each event received by an workflow will complete just one task returned by this method.

Note that event names are case-insensitive.

type WorkflowMetadata

type WorkflowMetadata protos.OrchestrationMetadata

func (WorkflowMetadata) String

func (w WorkflowMetadata) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL