Documentation
¶
Index ¶
- Constants
- func WorkflowMetadataIsComplete(o *WorkflowMetadata) bool
- func WorkflowMetadataIsRunning(o *WorkflowMetadata) bool
- type Activity
- type ActivityContext
- type CallActivityOption
- type ChildWorkflowOption
- func WithChildWorkflowAppID(appID string) ChildWorkflowOption
- func WithChildWorkflowInput(input any) ChildWorkflowOption
- func WithChildWorkflowInstanceID(instanceID string) ChildWorkflowOption
- func WithChildWorkflowRetryPolicy(policy *RetryPolicy) ChildWorkflowOption
- func WithRawChildWorkflowInput(input *wrapperspb.StringValue) ChildWorkflowOption
- type Client
- func (c *Client) FetchWorkflowMetadata(ctx context.Context, id string, opts ...FetchWorkflowMetadataOptions) (*WorkflowMetadata, error)
- func (c *Client) PurgeWorkflowState(ctx context.Context, id string, opts ...PurgeOptions) error
- func (c *Client) RaiseEvent(ctx context.Context, id, eventName string, opts ...RaiseEventOptions) error
- func (c *Client) RerunWorkflowFromEvent(ctx context.Context, id string, eventID uint32, opts ...RerunOptions) (string, error)
- func (c *Client) ResumeWorkflow(ctx context.Context, id, reason string) error
- func (c *Client) ScheduleWorkflow(ctx context.Context, orchestrator string, opts ...NewWorkflowOptions) (string, error)
- func (c *Client) StartWorker(ctx context.Context, r *Registry) error
- func (c *Client) SuspendWorkflow(ctx context.Context, id, reason string) error
- func (c *Client) TerminateWorkflow(ctx context.Context, id string, opts ...TerminateOptions) error
- func (c *Client) WaitForWorkflowCompletion(ctx context.Context, id string, opts ...FetchWorkflowMetadataOptions) (*WorkflowMetadata, error)
- func (c *Client) WaitForWorkflowStart(ctx context.Context, id string, opts ...FetchWorkflowMetadataOptions) (*WorkflowMetadata, error)
- type ContinueAsNewOption
- type CreateTimerOption
- type FetchWorkflowMetadataOptions
- type NewWorkflowOptions
- type PurgeOptions
- type RaiseEventOptions
- type Registry
- type RerunOptions
- type RetryPolicy
- type Task
- type TerminateOptions
- type Workflow
- type WorkflowContext
- func (w *WorkflowContext) CallActivity(activity any, opts ...CallActivityOption) Task
- func (w *WorkflowContext) CallChildWorkflow(workflow any, opts ...ChildWorkflowOption) Task
- func (w *WorkflowContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)
- func (w *WorkflowContext) CreateTimer(delay time.Duration, opts ...CreateTimerOption) Task
- func (w *WorkflowContext) CurrentTimeUTC() time.Time
- func (w *WorkflowContext) GetInput(v any) error
- func (w *WorkflowContext) ID() string
- func (w *WorkflowContext) IsReplaying() bool
- func (w *WorkflowContext) Name() string
- func (w *WorkflowContext) SetCustomStatus(cs string)
- func (w *WorkflowContext) WaitForExternalEvent(eventName string, timeout time.Duration) Task
- type WorkflowMetadata
Constants ¶
const ( StatusRunning = api.RUNTIME_STATUS_RUNNING StatusCompleted = api.RUNTIME_STATUS_COMPLETED StatusContinuedAsNew = api.RUNTIME_STATUS_CONTINUED_AS_NEW StatusFailed = api.RUNTIME_STATUS_FAILED StatusCanceled = api.RUNTIME_STATUS_CANCELED StatusTerminated = api.RUNTIME_STATUS_TERMINATED StatusPending = api.RUNTIME_STATUS_PENDING StatusSuspended = api.RUNTIME_STATUS_SUSPENDED )
Variables ¶
This section is empty.
Functions ¶
func WorkflowMetadataIsComplete ¶
func WorkflowMetadataIsComplete(o *WorkflowMetadata) bool
func WorkflowMetadataIsRunning ¶
func WorkflowMetadataIsRunning(o *WorkflowMetadata) bool
Types ¶
type Activity ¶
type Activity func(ActivityContext) (any, error)
Activity is the functional interface for activity implementations.
type ActivityContext ¶
type ActivityContext task.ActivityContext
ActivityContext is the context parameter type for activity implementations.
type CallActivityOption ¶
type CallActivityOption task.CallActivityOption
func WithActivityAppID ¶
func WithActivityAppID(targetAppID string) CallActivityOption
func WithActivityInput ¶
func WithActivityInput(input any) CallActivityOption
WithActivityInput configures an input for an activity invocation. The specified input must be JSON serializable.
func WithActivityRetryPolicy ¶
func WithActivityRetryPolicy(policy *RetryPolicy) CallActivityOption
func WithRawActivityInput ¶
func WithRawActivityInput(input *wrapperspb.StringValue) CallActivityOption
WithRawActivityInput configures a raw input for an activity invocation.
type ChildWorkflowOption ¶
type ChildWorkflowOption task.SubOrchestratorOption
ChildWorkflowOption is a functional option type for the CallChildWorkflow workflow method.
func WithChildWorkflowAppID ¶
func WithChildWorkflowAppID(appID string) ChildWorkflowOption
WithChildWorkflowAppID is a functional option type for the CallChildWorkflow workflow method that specifies the app ID of the target child workflow.
func WithChildWorkflowInput ¶
func WithChildWorkflowInput(input any) ChildWorkflowOption
WithChildWorkflowInput is a functional option type for the CallChildWorkflow workflow method that takes an input value and marshals it to JSON.
func WithChildWorkflowInstanceID ¶
func WithChildWorkflowInstanceID(instanceID string) ChildWorkflowOption
WithChildWorkflowInstanceID is a functional option type for the CallChildWorkflow workflow method that specifies the instance ID of the child-workflow.
func WithChildWorkflowRetryPolicy ¶
func WithChildWorkflowRetryPolicy(policy *RetryPolicy) ChildWorkflowOption
func WithRawChildWorkflowInput ¶
func WithRawChildWorkflowInput(input *wrapperspb.StringValue) ChildWorkflowOption
WithRawChildWorkflowInput is a functional option type for the CallChildWorkflow workflow method that takes a raw input value.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(cc grpc.ClientConnInterface) *Client
NewClient creates a client that can be used to manage workflows.
func NewClientWithLogger ¶
func NewClientWithLogger(cc grpc.ClientConnInterface, logger backend.Logger) *Client
func (*Client) FetchWorkflowMetadata ¶
func (c *Client) FetchWorkflowMetadata(ctx context.Context, id string, opts ...FetchWorkflowMetadataOptions) (*WorkflowMetadata, error)
FetchWorkflowMetadata fetches metadata for the specified workflow from the configured task hub.
api.ErrInstanceNotFound is returned when the specified workflow doesn't exist.
func (*Client) PurgeWorkflowState ¶
PurgeWorkflowState deletes the state of the specified workflow instance.
api.ErrInstanceNotFound is returned if the specified workflow instance doesn't exist.
func (*Client) RaiseEvent ¶
func (c *Client) RaiseEvent(ctx context.Context, id, eventName string, opts ...RaiseEventOptions) error
RaiseEvent sends an asynchronous event notification to a waiting workflow.
func (*Client) RerunWorkflowFromEvent ¶
func (c *Client) RerunWorkflowFromEvent(ctx context.Context, id string, eventID uint32, opts ...RerunOptions) (string, error)
RerunWorkflowFromEvent reruns a workflow from a specific event ID of some source instance ID. If a new instance ID is not given (see WithRerunNewInstanceID), a random new instance ID will be generated and returned. A new input for the target event ID to rerun from can optionally be given (see WithRerunInput).
func (*Client) ResumeWorkflow ¶
ResumeWorkflow resumes an orchestration instance that was previously suspended.
func (*Client) ScheduleWorkflow ¶
func (c *Client) ScheduleWorkflow(ctx context.Context, orchestrator string, opts ...NewWorkflowOptions) (string, error)
ScheduleWorkflow schedules a new workflow instance with a specified set of options for execution.
func (*Client) StartWorker ¶
StartWorker starts the workflow runtime to process workflows.
func (*Client) SuspendWorkflow ¶
SuspendWorkflow suspends a workflow instance, halting processing of its events until a "resume" operation resumes it.
Note that suspended workflows are still considered to be "running" even though they will not process events.
func (*Client) TerminateWorkflow ¶
TerminateWorkflow terminates a running workflow by causing it to stop receiving new events and putting it directly into the TERMINATED state.
func (*Client) WaitForWorkflowCompletion ¶
func (c *Client) WaitForWorkflowCompletion(ctx context.Context, id string, opts ...FetchWorkflowMetadataOptions) (*WorkflowMetadata, error)
WaitForWorkflowCompletion waits for a workflow to complete and returns a WorkflowMetadata object that contains metadata about the completed instance.
api.ErrInstanceNotFound is returned when the specified workflow doesn't exist.
func (*Client) WaitForWorkflowStart ¶
func (c *Client) WaitForWorkflowStart(ctx context.Context, id string, opts ...FetchWorkflowMetadataOptions) (*WorkflowMetadata, error)
WaitForWorkflowStart waits for a workflow to start running and returns a WorkflowMetadata object that contains metadata about the started instance.
api.ErrInstanceNotFound is returned when the specified workflow doesn't exist.
type ContinueAsNewOption ¶
type ContinueAsNewOption task.ContinueAsNewOption
ContinueAsNewOption is a functional option type for the ContinueAsNew workflow method.
func WithKeepUnprocessedEvents ¶
func WithKeepUnprocessedEvents() ContinueAsNewOption
WithKeepUnprocessedEvents returns a ContinueAsNewOption that instructs the runtime to carry forward any unprocessed external events to the new instance.
type CreateTimerOption ¶
type CreateTimerOption task.CreateTimerOption
func WithTimerName ¶
func WithTimerName(name string) CreateTimerOption
type FetchWorkflowMetadataOptions ¶
type FetchWorkflowMetadataOptions api.FetchOrchestrationMetadataOptions
func WithFetchPayloads ¶
func WithFetchPayloads(fetchPayloads bool) FetchWorkflowMetadataOptions
WithFetchPayloads configures whether to load workflow inputs, outputs, and custom status values, which could be large.
type NewWorkflowOptions ¶
type NewWorkflowOptions api.NewOrchestrationOptions
func WithInput ¶
func WithInput(input any) NewWorkflowOptions
WithInput configures an input for the workflow. The specified input must be serializable.
func WithInstanceID ¶
func WithInstanceID(id string) NewWorkflowOptions
WithInstanceID configures an explicit workflow instance ID. If not specified, a random UUID value will be used for the workflow instance ID.
func WithRawInput ¶
func WithRawInput(rawInput *wrapperspb.StringValue) NewWorkflowOptions
WithRawInput configures an input for the workflow. The specified input must be a string.
func WithStartTime ¶
func WithStartTime(startTime time.Time) NewWorkflowOptions
WithStartTime configures a start time at which the workflow should start running. Note that the actual start time could be later than the specified start time if the task hub is under load or if the app is not running at the specified start time.
type PurgeOptions ¶
type PurgeOptions api.PurgeOptions
func WithRecursivePurge ¶
func WithRecursivePurge(recursive bool) PurgeOptions
WithRecursivePurge configures whether to purge all child-workflows created by the target workflow.
type RaiseEventOptions ¶
type RaiseEventOptions api.RaiseEventOptions
func WithEventPayload ¶
func WithEventPayload(data any) RaiseEventOptions
WithEventPayload configures an event payload. The specified payload must be serializable.
func WithRawEventData ¶
func WithRawEventData(data *wrapperspb.StringValue) RaiseEventOptions
WithRawEventData configures an event payload that is a raw, unprocessed string (e.g. JSON data).
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry contains maps of names to corresponding orchestrator and activity functions.
func (*Registry) AddActivity ¶
AddActivity adds an activity function to the registry. The name of the activity function is determined using reflection.
func (*Registry) AddActivityN ¶
AddActivityN adds an activity function to the registry with a specified name.
func (*Registry) AddWorkflow ¶
AddWorkflow adds an orchestrator function to the registry. The name of the orchestrator function is determined using reflection.
type RerunOptions ¶
type RerunOptions api.RerunOptions
func WithRerunInput ¶
func WithRerunInput(input any) RerunOptions
func WithRerunNewInstanceID ¶
func WithRerunNewInstanceID(id string) RerunOptions
type RetryPolicy ¶
type RetryPolicy struct {
// Max number of attempts to try the activity call, first execution inclusive
MaxAttempts int
// Timespan to wait for the first retry
InitialRetryInterval time.Duration
// Used to determine rate of increase of back-off
BackoffCoefficient float64
// Max timespan to wait for a retry
MaxRetryInterval time.Duration
// Total timeout across all the retries performed
RetryTimeout time.Duration
// Optional function to control if retries should proceed
Handle func(error) bool
}
type Task ¶
Task is an interface for asynchronous durable tasks. A task is conceptually similar to a future.
type TerminateOptions ¶
type TerminateOptions api.TerminateOptions
func WithOutput ¶
func WithOutput(data any) TerminateOptions
WithOutput configures an output for the terminated workflow. The specified output must be serializable.
func WithRawOutput ¶
func WithRawOutput(data *wrapperspb.StringValue) TerminateOptions
WithRawOutput configures a raw, unprocessed output (i.e. pre-serialized) for the terminated workflow.
func WithRecursiveTerminate ¶
func WithRecursiveTerminate(recursive bool) TerminateOptions
WithRecursiveTerminate configures whether to terminate all child-workflows created by the target workflow.
type Workflow ¶
type Workflow func(ctx *WorkflowContext) (any, error)
Workflow is the functional interface for workflow functions.
type WorkflowContext ¶
type WorkflowContext struct {
// contains filtered or unexported fields
}
WorkflowContext is the parameter type for workflow functions.
func (*WorkflowContext) CallActivity ¶
func (w *WorkflowContext) CallActivity(activity any, opts ...CallActivityOption) Task
CallActivity schedules an asynchronous invocation of an activity function. The [activity] parameter can be either the name of an activity as a string or can be a pointer to the function that implements the activity, in which case the name is obtained via reflection.
func (*WorkflowContext) CallChildWorkflow ¶
func (w *WorkflowContext) CallChildWorkflow(workflow any, opts ...ChildWorkflowOption) Task
func (*WorkflowContext) ContinueAsNew ¶
func (w *WorkflowContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)
func (*WorkflowContext) CreateTimer ¶
func (w *WorkflowContext) CreateTimer(delay time.Duration, opts ...CreateTimerOption) Task
CreateTimer schedules a durable timer that expires after the specified delay.
func (*WorkflowContext) CurrentTimeUTC ¶
func (w *WorkflowContext) CurrentTimeUTC() time.Time
func (*WorkflowContext) GetInput ¶
func (w *WorkflowContext) GetInput(v any) error
GetInput unmarshals the serialized workflow input and stores it in [v].
func (*WorkflowContext) ID ¶
func (w *WorkflowContext) ID() string
func (*WorkflowContext) IsReplaying ¶
func (w *WorkflowContext) IsReplaying() bool
func (*WorkflowContext) Name ¶
func (w *WorkflowContext) Name() string
func (*WorkflowContext) SetCustomStatus ¶
func (w *WorkflowContext) SetCustomStatus(cs string)
func (*WorkflowContext) WaitForExternalEvent ¶
func (w *WorkflowContext) WaitForExternalEvent(eventName string, timeout time.Duration) Task
WaitForExternalEvent creates a task that is completed only after an event named [eventName] is received by this workflow or when the specified timeout expires.
The [timeout] parameter can be used to define a timeout for receiving the event. If the timeout expires before the named event is received, the task will be completed and will return a timeout error value [ErrTaskCanceled] when awaited. Otherwise, the awaited task will return the deserialized payload of the received event. A Duration value of zero returns a canceled task if the event isn't already available in the history. Use a negative Duration to wait indefinitely for the event to be received.
Workflows can wait for the same event name multiple times, so waiting for multiple events with the same name is allowed. Each event received by a workflow will complete just one task returned by this method.
Note that event names are case-insensitive.
type WorkflowMetadata ¶
type WorkflowMetadata protos.OrchestrationMetadata
func (WorkflowMetadata) String ¶
func (w WorkflowMetadata) String() string