Documentation
¶
Index ¶
- Variables
- func NewTaskExecutor(registry *TaskRegistry) backend.Executor
- type Activity
- type ActivityContext
- type CallActivityOption
- type CallActivityOptionFunc
- func WithActivityAppID(targetAppID string) CallActivityOptionFunc
- func WithActivityAppNamespace(namespace string) CallActivityOptionFunc
- func WithActivityInput(input any) CallActivityOptionFunc
- func WithActivityRetryPolicy(policy *RetryPolicy) CallActivityOptionFunc
- func WithRawActivityInput(input *wrapperspb.StringValue) CallActivityOptionFunc
- type ChildWorkflowOption
- func WithRawSubOrchestratorInput(input *wrapperspb.StringValue) ChildWorkflowOptiondeprecated
- func WithSubOrchestrationInstanceID(instanceID string) ChildWorkflowOptiondeprecated
- func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) ChildWorkflowOptiondeprecated
- func WithSubOrchestratorAppID(appID string) ChildWorkflowOptiondeprecated
- func WithSubOrchestratorInput(input any) ChildWorkflowOptiondeprecated
- type ChildWorkflowOptionFunc
- func WithChildWorkflowAppID(appID string) ChildWorkflowOptionFunc
- func WithChildWorkflowAppNamespace(namespace string) ChildWorkflowOptionFunc
- func WithChildWorkflowInput(input any) ChildWorkflowOptionFunc
- func WithChildWorkflowInstanceID(instanceID string) ChildWorkflowOptionFunc
- func WithChildWorkflowRetryPolicy(policy *RetryPolicy) ChildWorkflowOptionFunc
- func WithRawChildWorkflowInput(input *wrapperspb.StringValue) ChildWorkflowOptionFunc
- type ContinueAsNewOption
- type CreateTimerOption
- type HistoryPropagation
- type OrchestrationContextdeprecated
- type Orchestratordeprecated
- type RetryPolicy
- type SubOrchestratorOptiondeprecated
- type Task
- type TaskRegistry
- func (r *TaskRegistry) AddActivity(a Activity) error
- func (r *TaskRegistry) AddActivityN(name string, a Activity) error
- func (r *TaskRegistry) AddOrchestrator(o Workflow) errordeprecated
- func (r *TaskRegistry) AddOrchestratorN(name string, o Workflow) errordeprecated
- func (r *TaskRegistry) AddVersionedOrchestrator(canonicalName string, isLatest bool, o Workflow) errordeprecated
- func (r *TaskRegistry) AddVersionedOrchestratorN(canonicalName string, name string, isLatest bool, o Workflow) errordeprecated
- func (r *TaskRegistry) AddVersionedWorkflow(canonicalName string, isLatest bool, o Workflow) error
- func (r *TaskRegistry) AddVersionedWorkflowN(canonicalName string, name string, isLatest bool, o Workflow) error
- func (r *TaskRegistry) AddWorkflow(o Workflow) error
- func (r *TaskRegistry) AddWorkflowN(name string, o Workflow) error
- func (r *TaskRegistry) RemoveActivity(name string)
- func (r *TaskRegistry) RemoveVersionedWorkflow(canonicalName string)
- func (r *TaskRegistry) ResolveWorkflow(name string, pinnedVersion *string) (Workflow, *string, error)
- func (r *TaskRegistry) UpsertActivityN(name string, a Activity)
- func (r *TaskRegistry) UpsertVersionedWorkflowN(canonicalName string, name string, isLatest bool, o Workflow)
- type Workflow
- type WorkflowContext
- func (ctx *WorkflowContext) CallActivity(activity interface{}, opts ...CallActivityOption) Task
- func (ctx *WorkflowContext) CallChildWorkflow(workflow interface{}, opts ...ChildWorkflowOption) Task
- func (ctx *WorkflowContext) CallSubOrchestrator(workflow interface{}, opts ...ChildWorkflowOption) Taskdeprecated
- func (ctx *WorkflowContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)
- func (ctx *WorkflowContext) CreateTimer(delay time.Duration, opts ...CreateTimerOption) Task
- func (octx *WorkflowContext) GetInput(v any) error
- func (octx *WorkflowContext) GetPropagatedHistory() *api.PropagatedHistory
- func (ctx *WorkflowContext) IsPatched(patchName string) bool
- func (octx *WorkflowContext) SetCustomStatus(cs string)
- func (octx *WorkflowContext) SetPropagatedHistory(ph *api.PropagatedHistory)
- func (ctx *WorkflowContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task
Constants ¶
This section is empty.
Variables ¶
var ErrTaskBlocked = errors.New("the current task is blocked")
ErrTaskBlocked is not an error, but rather a control flow signal indicating that a workflow function has executed as far as it can and that it now needs to unload, dispatch any scheduled tasks, and commit its current execution progress to durable storage.
var ErrTaskCanceled = errors.New("the task was canceled") // CONSIDER: More specific info about the task
ErrTaskCanceled is used to indicate that a task was canceled. Tasks can be canceled, for example, when configured timeouts expire.
Functions ¶
func NewTaskExecutor ¶
func NewTaskExecutor(registry *TaskRegistry) backend.Executor
NewTaskExecutor returns a backend.Executor implementation that executes workflow and activity functions in-memory.
Types ¶
type Activity ¶
type Activity func(ctx ActivityContext) (any, error)
Activity is the functional interface for activity implementations.
type ActivityContext ¶
type ActivityContext interface {
GetInput(resultPtr any) error
GetTaskID() int32
GetTaskExecutionID() string
Context() context.Context
GetTraceContext() *protos.TraceContext
GetPropagatedHistory() *api.PropagatedHistory
}
ActivityContext is the context parameter type for activity implementations.
type CallActivityOption ¶ added in v0.10.0
type CallActivityOption interface {
// contains filtered or unexported methods
}
CallActivityOption is the interface for options passed to CallActivity.
type CallActivityOptionFunc ¶ added in v0.12.0
type CallActivityOptionFunc func(*callActivityOptions) error
func WithActivityAppID ¶ added in v0.8.0
func WithActivityAppID(targetAppID string) CallActivityOptionFunc
func WithActivityAppNamespace ¶ added in v0.12.0
func WithActivityAppNamespace(namespace string) CallActivityOptionFunc
WithActivityAppNamespace specifies the Dapr namespace that hosts the target activity. When set, the routing envelope carries a targetAppNamespace so the caller sidecar performs a durable cross-namespace dispatch (service invocation with per-hop reminders) rather than a direct actor call via placement. Must be combined with WithActivityAppID; setting a namespace without an app ID is rejected when the activity is scheduled. Cross-namespace calls are gated by the WorkflowAccessPolicy feature: a policy on the target side must explicitly permit the caller's (namespace, appID).
func WithActivityInput ¶
func WithActivityInput(input any) CallActivityOptionFunc
WithActivityInput configures an input for an activity invocation. The specified input must be JSON serializable.
func WithActivityRetryPolicy ¶
func WithActivityRetryPolicy(policy *RetryPolicy) CallActivityOptionFunc
func WithRawActivityInput ¶
func WithRawActivityInput(input *wrapperspb.StringValue) CallActivityOptionFunc
WithRawActivityInput configures a raw input for an activity invocation.
type ChildWorkflowOption ¶ added in v0.12.0
type ChildWorkflowOption interface {
// contains filtered or unexported methods
}
ChildWorkflowOption is the interface for options passed to CallChildWorkflow.
func WithRawSubOrchestratorInput
deprecated
func WithRawSubOrchestratorInput(input *wrapperspb.StringValue) ChildWorkflowOption
Deprecated: Use WithRawChildWorkflowInput instead.
func WithSubOrchestrationInstanceID
deprecated
func WithSubOrchestrationInstanceID(instanceID string) ChildWorkflowOption
Deprecated: Use WithChildWorkflowInstanceID instead.
func WithSubOrchestrationRetryPolicy
deprecated
func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) ChildWorkflowOption
Deprecated: Use WithChildWorkflowRetryPolicy instead.
func WithSubOrchestratorAppID
deprecated
added in
v0.8.0
func WithSubOrchestratorAppID(appID string) ChildWorkflowOption
Deprecated: Use WithChildWorkflowAppID instead.
func WithSubOrchestratorInput
deprecated
func WithSubOrchestratorInput(input any) ChildWorkflowOption
Deprecated: Use WithChildWorkflowInput instead.
type ChildWorkflowOptionFunc ¶ added in v0.12.0
type ChildWorkflowOptionFunc func(*callChildWorkflowOptions) error
ChildWorkflowOptionFunc adapts a function to the ChildWorkflowOption interface.
func WithChildWorkflowAppID ¶ added in v0.12.0
func WithChildWorkflowAppID(appID string) ChildWorkflowOptionFunc
WithChildWorkflowAppID is a functional option type for the CallChildWorkflow workflow method that specifies the app ID of the target activity.
func WithChildWorkflowAppNamespace ¶ added in v0.12.0
func WithChildWorkflowAppNamespace(namespace string) ChildWorkflowOptionFunc
WithChildWorkflowAppNamespace specifies the Dapr namespace that hosts the target child workflow. When set, the routing envelope carries a targetAppNamespace so that the caller sidecar performs a durable cross-namespace dispatch (service invocation with per-hop reminders) instead of a direct actor call via placement. Must be combined with WithChildWorkflowAppID; setting a namespace without an app ID is rejected when the child workflow is scheduled. Cross-namespace calls are gated by the WorkflowAccessPolicy feature: a policy on the target side must explicitly permit the caller's (namespace, appID).
func WithChildWorkflowInput ¶ added in v0.12.0
func WithChildWorkflowInput(input any) ChildWorkflowOptionFunc
WithChildWorkflowInput is a functional option type for the CallChildWorkflow workflow method that takes an input value and marshals it to JSON.
func WithChildWorkflowInstanceID ¶ added in v0.12.0
func WithChildWorkflowInstanceID(instanceID string) ChildWorkflowOptionFunc
WithChildWorkflowInstanceID is a functional option type for the CallChildWorkflow workflow method that specifies the instance ID of the child workflow.
func WithChildWorkflowRetryPolicy ¶ added in v0.12.0
func WithChildWorkflowRetryPolicy(policy *RetryPolicy) ChildWorkflowOptionFunc
func WithRawChildWorkflowInput ¶ added in v0.12.0
func WithRawChildWorkflowInput(input *wrapperspb.StringValue) ChildWorkflowOptionFunc
WithRawChildWorkflowInput is a functional option type for the CallChildWorkflow workflow method that takes a raw input value.
type ContinueAsNewOption ¶
type ContinueAsNewOption func(*WorkflowContext)
ContinueAsNewOption is a functional option type for the ContinueAsNew workflow method.
func WithKeepUnprocessedEvents ¶
func WithKeepUnprocessedEvents() ContinueAsNewOption
WithKeepUnprocessedEvents returns a ContinueAsNewOptions struct that instructs the runtime to carry forward any unprocessed external events to the new instance.
type CreateTimerOption ¶ added in v0.10.0
type CreateTimerOption func(*createTimerOptions) error
func WithTimerName ¶ added in v0.7.0
func WithTimerName(name string) CreateTimerOption
type HistoryPropagation ¶ added in v0.12.0
type HistoryPropagation struct {
// contains filtered or unexported fields
}
HistoryPropagation implements both CallActivityOption and ChildWorkflowOption, allowing WithHistoryPropagation to be used with both CallActivity and CallChildWorkflow.
func WithHistoryPropagation ¶ added in v0.12.0
func WithHistoryPropagation(opt api.PropagationOption) HistoryPropagation
WithHistoryPropagation creates an option that configures history propagation for a child workflow or activity call. It can be passed to both CallActivity and CallChildWorkflow. Pass one of PropagateOwnHistory() or PropagateLineage().
Usage:
ctx.CallChildWorkflow("child",
task.WithChildWorkflowInput(input),
task.WithHistoryPropagation(task.PropagateLineage()),
)
ctx.CallActivity("activity",
task.WithActivityInput(input),
task.WithHistoryPropagation(task.PropagateOwnHistory()),
)
type OrchestrationContext
deprecated
type OrchestrationContext = WorkflowContext
Deprecated: Use WorkflowContext instead.
type Orchestrator
deprecated
type Orchestrator = Workflow
Deprecated: Use Workflow instead.
type RetryPolicy ¶
type RetryPolicy struct {
// Max number of attempts to try the activity call, first execution inclusive
MaxAttempts int
// Timespan to wait for the first retry
InitialRetryInterval time.Duration
// Used to determine rate of increase of back-off
BackoffCoefficient float64
// Max timespan to wait for a retry
MaxRetryInterval time.Duration
// Total timeout across all the retries performed
RetryTimeout time.Duration
// Optional function to control if retries should proceed
Handle func(error) bool
}
func (*RetryPolicy) Validate ¶
func (policy *RetryPolicy) Validate() error
type SubOrchestratorOption
deprecated
added in
v0.10.0
type SubOrchestratorOption = ChildWorkflowOption
Deprecated: Use ChildWorkflowOption instead.
type Task ¶
Task is an interface for asynchronous durable tasks. A task is conceptually similar to a future.
type TaskRegistry ¶
type TaskRegistry struct {
// contains filtered or unexported fields
}
TaskRegistry contains maps of names to corresponding workflow and activity functions.
func NewTaskRegistry ¶
func NewTaskRegistry() *TaskRegistry
NewTaskRegistry returns a new TaskRegistry struct.
func (*TaskRegistry) AddActivity ¶
func (r *TaskRegistry) AddActivity(a Activity) error
AddActivity adds an activity function to the registry. The name of the activity function is determined using reflection.
func (*TaskRegistry) AddActivityN ¶
func (r *TaskRegistry) AddActivityN(name string, a Activity) error
AddActivityN adds an activity function to the registry with a specified name. Returns an error if an activity with the same name is already registered.
func (*TaskRegistry) AddOrchestrator
deprecated
func (r *TaskRegistry) AddOrchestrator(o Workflow) error
Deprecated: Use AddWorkflow instead.
func (*TaskRegistry) AddOrchestratorN
deprecated
func (r *TaskRegistry) AddOrchestratorN(name string, o Workflow) error
Deprecated: Use AddWorkflowN instead.
func (*TaskRegistry) AddVersionedOrchestrator
deprecated
added in
v0.11.0
func (r *TaskRegistry) AddVersionedOrchestrator(canonicalName string, isLatest bool, o Workflow) error
Deprecated: Use AddVersionedWorkflow instead.
func (*TaskRegistry) AddVersionedOrchestratorN
deprecated
added in
v0.11.0
func (*TaskRegistry) AddVersionedWorkflow ¶ added in v0.12.0
func (r *TaskRegistry) AddVersionedWorkflow(canonicalName string, isLatest bool, o Workflow) error
AddVersionedWorkflow adds a versioned workflow function to the registry with a specified name.
func (*TaskRegistry) AddVersionedWorkflowN ¶ added in v0.12.0
func (r *TaskRegistry) AddVersionedWorkflowN(canonicalName string, name string, isLatest bool, o Workflow) error
AddVersionedWorkflowN adds a versioned workflow function to the registry. Returns an error if a workflow with the same version name is already registered.
func (*TaskRegistry) AddWorkflow ¶ added in v0.12.0
func (r *TaskRegistry) AddWorkflow(o Workflow) error
AddWorkflow adds a workflow function to the registry. The name of the workflow function is determined using reflection.
func (*TaskRegistry) AddWorkflowN ¶ added in v0.12.0
func (r *TaskRegistry) AddWorkflowN(name string, o Workflow) error
AddWorkflowN adds a workflow function to the registry with a specified name.
func (*TaskRegistry) RemoveActivity ¶ added in v0.12.0
func (r *TaskRegistry) RemoveActivity(name string)
RemoveActivity removes an activity from the registry.
func (*TaskRegistry) RemoveVersionedWorkflow ¶ added in v0.12.0
func (r *TaskRegistry) RemoveVersionedWorkflow(canonicalName string)
RemoveVersionedWorkflow removes all versions of a workflow from the registry.
func (*TaskRegistry) ResolveWorkflow ¶ added in v0.12.0
func (r *TaskRegistry) ResolveWorkflow(name string, pinnedVersion *string) (Workflow, *string, error)
ResolveWorkflow looks up a workflow by name under a single read lock. It checks: exact match → versioned match → wildcard, returning the first hit. Returns the resolved workflow, an optional version string, or an error.
func (*TaskRegistry) UpsertActivityN ¶ added in v0.12.0
func (r *TaskRegistry) UpsertActivityN(name string, a Activity)
UpsertActivityN adds or replaces an activity function in the registry. Used by callers that intentionally re-register (e.g. hot-reload).
func (*TaskRegistry) UpsertVersionedWorkflowN ¶ added in v0.12.0
func (r *TaskRegistry) UpsertVersionedWorkflowN(canonicalName string, name string, isLatest bool, o Workflow)
UpsertVersionedWorkflowN adds or replaces a versioned workflow function in the registry. Used by callers that intentionally re-register (e.g. hot-reload).
type Workflow ¶ added in v0.12.0
type Workflow func(ctx *WorkflowContext) (any, error)
Workflow is the functional interface for workflow functions.
type WorkflowContext ¶ added in v0.12.0
type WorkflowContext struct {
ID api.InstanceID
Name string
VersionName *string
IsReplaying bool
CurrentTimeUtc time.Time
// contains filtered or unexported fields
}
WorkflowContext is the parameter type for workflow functions.
func NewWorkflowContext ¶ added in v0.12.0
func NewWorkflowContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *WorkflowContext
NewWorkflowContext returns a new WorkflowContext struct with the specified parameters.
func (*WorkflowContext) CallActivity ¶ added in v0.12.0
func (ctx *WorkflowContext) CallActivity(activity interface{}, opts ...CallActivityOption) Task
CallActivity schedules an asynchronous invocation of an activity function. The [activity] parameter can be either the name of an activity as a string or can be a pointer to the function that implements the activity, in which case the name is obtained via reflection.
func (*WorkflowContext) CallChildWorkflow ¶ added in v0.12.0
func (ctx *WorkflowContext) CallChildWorkflow(workflow interface{}, opts ...ChildWorkflowOption) Task
func (*WorkflowContext) CallSubOrchestrator
deprecated
added in
v0.12.0
func (ctx *WorkflowContext) CallSubOrchestrator(workflow interface{}, opts ...ChildWorkflowOption) Task
Deprecated: Use CallChildWorkflow instead.
func (*WorkflowContext) ContinueAsNew ¶ added in v0.12.0
func (ctx *WorkflowContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)
func (*WorkflowContext) CreateTimer ¶ added in v0.12.0
func (ctx *WorkflowContext) CreateTimer(delay time.Duration, opts ...CreateTimerOption) Task
CreateTimer schedules a durable timer that expires after the specified delay.
func (*WorkflowContext) GetInput ¶ added in v0.12.0
func (octx *WorkflowContext) GetInput(v any) error
GetInput unmarshals the serialized workflow input and stores it in [v].
func (*WorkflowContext) GetPropagatedHistory ¶ added in v0.12.0
func (octx *WorkflowContext) GetPropagatedHistory() *api.PropagatedHistory
GetPropagatedHistory returns the propagated history from a parent workflow, or nil if no history was propagated. The propagated history contains events from the parent (and optionally ancestor) workflows that opted to propagate their execution context.
func (*WorkflowContext) IsPatched ¶ added in v0.12.0
func (ctx *WorkflowContext) IsPatched(patchName string) bool
func (*WorkflowContext) SetCustomStatus ¶ added in v0.12.0
func (octx *WorkflowContext) SetCustomStatus(cs string)
func (*WorkflowContext) SetPropagatedHistory ¶ added in v0.12.0
func (octx *WorkflowContext) SetPropagatedHistory(ph *api.PropagatedHistory)
SetPropagatedHistory sets the propagated history on the context.
func (*WorkflowContext) WaitForSingleEvent ¶ added in v0.12.0
func (ctx *WorkflowContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task
WaitForSingleEvent creates a task that is completed only after an event named [eventName] is received by this workflow or when the specified timeout expires.
The [timeout] parameter can be used to define a timeout for receiving the event. If the timeout expires before the named event is received, the task will be completed and will return a timeout error value ErrTaskCanceled when awaited. Otherwise, the awaited task will return the deserialized payload of the received event. A Duration value of zero returns a canceled task if the event isn't already available in the history. Use a negative Duration to wait indefinitely for the event to be received.
Workflows can wait for the same event name multiple times, so waiting for multiple events with the same name is allowed. Each event received by an workflow will complete just one task returned by this method.
Note that event names are case-insensitive.