task

package
v0.12.1

This package is not in the latest version of its module.

Published: May 26, 2026 License: Apache-2.0 Imports: 21 Imported by: 37

Documentation

Constants

This section is empty.

Variables

var ErrTaskBlocked = errors.New("the current task is blocked")

ErrTaskBlocked is not an error, but rather a control flow signal indicating that a workflow function has executed as far as it can and that it now needs to unload, dispatch any scheduled tasks, and commit its current execution progress to durable storage.

var ErrTaskCanceled = errors.New("the task was canceled") // CONSIDER: More specific info about the task

ErrTaskCanceled is used to indicate that a task was canceled. Tasks can be canceled, for example, when configured timeouts expire.
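Because ErrTaskCanceled is a sentinel value, callers can compare against it with errors.Is, which also matches wrapped errors. The following is a minimal sketch: errTaskCanceled is a local stand-in for task.ErrTaskCanceled and describe is a hypothetical helper, both introduced only so that the example compiles without the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errTaskCanceled is a local stand-in for task.ErrTaskCanceled.
var errTaskCanceled = errors.New("the task was canceled")

// describe classifies the error returned by a (hypothetical) Await call.
func describe(err error) string {
	switch {
	case err == nil:
		return "completed"
	case errors.Is(err, errTaskCanceled):
		return "canceled"
	default:
		return "failed"
	}
}

func main() {
	// errors.Is still matches after the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("waiting for approval: %w", errTaskCanceled)
	fmt.Println(describe(nil))
	fmt.Println(describe(wrapped))
	fmt.Println(describe(errors.New("boom")))
}
```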

Functions

func NewTaskExecutor

func NewTaskExecutor(registry *TaskRegistry) backend.Executor

NewTaskExecutor returns a backend.Executor implementation that executes workflow and activity functions in-memory.

Types

type Activity

type Activity func(ctx ActivityContext) (any, error)

Activity is the functional interface for activity implementations.

type ActivityContext

type ActivityContext interface {
	GetInput(resultPtr any) error
	GetTaskID() int32
	GetTaskExecutionID() string
	Context() context.Context
	GetTraceContext() *protos.TraceContext
	GetPropagatedHistory() *api.PropagatedHistory
}

ActivityContext is the context parameter type for activity implementations.

type CallActivityOption added in v0.10.0

type CallActivityOption interface {
	// contains filtered or unexported methods
}

CallActivityOption is the interface for options passed to CallActivity.

type CallActivityOptionFunc added in v0.12.0

type CallActivityOptionFunc func(*callActivityOptions) error

func WithActivityAppID added in v0.8.0

func WithActivityAppID(targetAppID string) CallActivityOptionFunc

func WithActivityAppNamespace added in v0.12.0

func WithActivityAppNamespace(namespace string) CallActivityOptionFunc

WithActivityAppNamespace specifies the Dapr namespace that hosts the target activity. When set, the routing envelope carries a targetAppNamespace so the caller sidecar performs a durable cross-namespace dispatch (service invocation with per-hop reminders) rather than a direct actor call via placement. Must be combined with WithActivityAppID; setting a namespace without an app ID is rejected when the activity is scheduled. Cross-namespace calls are gated by the WorkflowAccessPolicy feature: a policy on the target side must explicitly permit the caller's (namespace, appID).
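The rule that a namespace must be combined with an app ID can be illustrated with a small functional-options sketch. Everything here is hypothetical: callOptions, option, withAppID, withAppNamespace, and build are stand-ins, since the package's real options struct is unexported and its validation code is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// callOptions is a hypothetical stand-in for the unexported options struct.
type callOptions struct {
	appID     string
	namespace string
}

// option mirrors the shape of a functional option: it mutates the options
// struct and may report an error.
type option func(*callOptions) error

func withAppID(id string) option {
	return func(o *callOptions) error {
		o.appID = id
		return nil
	}
}

func withAppNamespace(ns string) option {
	return func(o *callOptions) error {
		o.namespace = ns
		return nil
	}
}

// build applies the options in order and then enforces the documented rule:
// a target namespace is only valid together with a target app ID.
func build(opts ...option) (*callOptions, error) {
	o := &callOptions{}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}
	if o.namespace != "" && o.appID == "" {
		return nil, errors.New("target namespace requires a target app ID")
	}
	return o, nil
}

func main() {
	if _, err := build(withAppNamespace("prod")); err != nil {
		fmt.Println("rejected:", err)
	}
	o, err := build(withAppID("billing"), withAppNamespace("prod"))
	fmt.Println(o.appID, o.namespace, err)
}
```

Validating after all options have been applied, rather than inside each option, keeps the check independent of the order in which the caller passes them.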

func WithActivityInput

func WithActivityInput(input any) CallActivityOptionFunc

WithActivityInput configures an input for an activity invocation. The specified input must be JSON serializable.

func WithActivityRetryPolicy

func WithActivityRetryPolicy(policy *RetryPolicy) CallActivityOptionFunc

func WithRawActivityInput

func WithRawActivityInput(input *wrapperspb.StringValue) CallActivityOptionFunc

WithRawActivityInput configures a raw input for an activity invocation.

type ChildWorkflowOption added in v0.12.0

type ChildWorkflowOption interface {
	// contains filtered or unexported methods
}

ChildWorkflowOption is the interface for options passed to CallChildWorkflow.

func WithRawSubOrchestratorInput deprecated

func WithRawSubOrchestratorInput(input *wrapperspb.StringValue) ChildWorkflowOption

Deprecated: Use WithRawChildWorkflowInput instead.

func WithSubOrchestrationInstanceID deprecated

func WithSubOrchestrationInstanceID(instanceID string) ChildWorkflowOption

Deprecated: Use WithChildWorkflowInstanceID instead.

func WithSubOrchestrationRetryPolicy deprecated

func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) ChildWorkflowOption

Deprecated: Use WithChildWorkflowRetryPolicy instead.

func WithSubOrchestratorAppID deprecated added in v0.8.0

func WithSubOrchestratorAppID(appID string) ChildWorkflowOption

Deprecated: Use WithChildWorkflowAppID instead.

func WithSubOrchestratorInput deprecated

func WithSubOrchestratorInput(input any) ChildWorkflowOption

Deprecated: Use WithChildWorkflowInput instead.

type ChildWorkflowOptionFunc added in v0.12.0

type ChildWorkflowOptionFunc func(*callChildWorkflowOptions) error

ChildWorkflowOptionFunc adapts a function to the ChildWorkflowOption interface.

func WithChildWorkflowAppID added in v0.12.0

func WithChildWorkflowAppID(appID string) ChildWorkflowOptionFunc

WithChildWorkflowAppID is a functional option type for the CallChildWorkflow workflow method that specifies the app ID of the target child workflow.

func WithChildWorkflowAppNamespace added in v0.12.0

func WithChildWorkflowAppNamespace(namespace string) ChildWorkflowOptionFunc

WithChildWorkflowAppNamespace specifies the Dapr namespace that hosts the target child workflow. When set, the routing envelope carries a targetAppNamespace so that the caller sidecar performs a durable cross-namespace dispatch (service invocation with per-hop reminders) instead of a direct actor call via placement. Must be combined with WithChildWorkflowAppID; setting a namespace without an app ID is rejected when the child workflow is scheduled. Cross-namespace calls are gated by the WorkflowAccessPolicy feature: a policy on the target side must explicitly permit the caller's (namespace, appID).

func WithChildWorkflowInput added in v0.12.0

func WithChildWorkflowInput(input any) ChildWorkflowOptionFunc

WithChildWorkflowInput is a functional option type for the CallChildWorkflow workflow method that takes an input value and marshals it to JSON.

func WithChildWorkflowInstanceID added in v0.12.0

func WithChildWorkflowInstanceID(instanceID string) ChildWorkflowOptionFunc

WithChildWorkflowInstanceID is a functional option type for the CallChildWorkflow workflow method that specifies the instance ID of the child workflow.

func WithChildWorkflowRetryPolicy added in v0.12.0

func WithChildWorkflowRetryPolicy(policy *RetryPolicy) ChildWorkflowOptionFunc

func WithRawChildWorkflowInput added in v0.12.0

func WithRawChildWorkflowInput(input *wrapperspb.StringValue) ChildWorkflowOptionFunc

WithRawChildWorkflowInput is a functional option type for the CallChildWorkflow workflow method that takes a raw input value.

type ContinueAsNewOption

type ContinueAsNewOption func(*WorkflowContext)

ContinueAsNewOption is a functional option type for the ContinueAsNew workflow method.

func WithKeepUnprocessedEvents

func WithKeepUnprocessedEvents() ContinueAsNewOption

WithKeepUnprocessedEvents returns a ContinueAsNewOption that instructs the runtime to carry forward any unprocessed external events to the new instance.

type CreateTimerOption added in v0.10.0

type CreateTimerOption func(*createTimerOptions) error

func WithTimerName added in v0.7.0

func WithTimerName(name string) CreateTimerOption

type HistoryPropagation added in v0.12.0

type HistoryPropagation struct {
	// contains filtered or unexported fields
}

HistoryPropagation implements both CallActivityOption and ChildWorkflowOption, allowing WithHistoryPropagation to be used with both CallActivity and CallChildWorkflow.
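One way a single value can be accepted by two different variadic option lists is to give it both unexported apply methods. The sketch below shows that pattern under stated assumptions: activityOption, childOption, propagation, and the two call helpers are hypothetical names, not the package's actual unexported methods.

```go
package main

import "fmt"

// Hypothetical option targets for the two kinds of call.
type activityOptions struct{ propagate string }
type childOptions struct{ propagate string }

// Each option interface is sealed by an unexported method.
type activityOption interface {
	applyActivity(*activityOptions) error
}
type childOption interface {
	applyChild(*childOptions) error
}

// propagation satisfies both interfaces, so one constructor can serve both calls.
type propagation struct{ scope string }

func (p propagation) applyActivity(o *activityOptions) error {
	o.propagate = p.scope
	return nil
}

func (p propagation) applyChild(o *childOptions) error {
	o.propagate = p.scope
	return nil
}

// Compile-time checks that propagation implements both interfaces.
var (
	_ activityOption = propagation{}
	_ childOption    = propagation{}
)

func callActivity(opts ...activityOption) (activityOptions, error) {
	var o activityOptions
	for _, opt := range opts {
		if err := opt.applyActivity(&o); err != nil {
			return o, err
		}
	}
	return o, nil
}

func callChild(opts ...childOption) (childOptions, error) {
	var o childOptions
	for _, opt := range opts {
		if err := opt.applyChild(&o); err != nil {
			return o, err
		}
	}
	return o, nil
}

func main() {
	a, _ := callActivity(propagation{scope: "own"})
	c, _ := callChild(propagation{scope: "lineage"})
	fmt.Println(a.propagate, c.propagate)
}
```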

func WithHistoryPropagation added in v0.12.0

func WithHistoryPropagation(opt api.PropagationOption) HistoryPropagation

WithHistoryPropagation creates an option that configures history propagation for a child workflow or activity call. It can be passed to both CallActivity and CallChildWorkflow. Pass one of PropagateOwnHistory() or PropagateLineage().

Usage:

ctx.CallChildWorkflow("child",
    task.WithChildWorkflowInput(input),
    task.WithHistoryPropagation(task.PropagateLineage()),
)

ctx.CallActivity("activity",
    task.WithActivityInput(input),
    task.WithHistoryPropagation(task.PropagateOwnHistory()),
)

type OrchestrationContext deprecated

type OrchestrationContext = WorkflowContext

Deprecated: Use WorkflowContext instead.

type Orchestrator deprecated

type Orchestrator = Workflow

Deprecated: Use Workflow instead.

type RetryPolicy

type RetryPolicy struct {
	// Max number of attempts to try the activity call, first execution inclusive
	MaxAttempts int
	// Timespan to wait for the first retry
	InitialRetryInterval time.Duration
	// Used to determine rate of increase of back-off
	BackoffCoefficient float64
	// Max timespan to wait for a retry
	MaxRetryInterval time.Duration
	// Total timeout across all the retries performed
	RetryTimeout time.Duration
	// Optional function to control if retries should proceed
	Handle func(error) bool
}
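
To make the interplay of InitialRetryInterval, BackoffCoefficient, and MaxRetryInterval concrete, the sketch below computes retry delays with a conventional capped exponential backoff. This is an assumption about how the fields combine, not the package's confirmed formula; retryPolicy, delayBefore, and examplePolicy are hypothetical names, and RetryTimeout and Handle are left out.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// retryPolicy is a local stand-in carrying a subset of the RetryPolicy fields.
type retryPolicy struct {
	MaxAttempts          int
	InitialRetryInterval time.Duration
	BackoffCoefficient   float64
	MaxRetryInterval     time.Duration
}

// examplePolicy allows five attempts in total, i.e. up to four retries.
var examplePolicy = retryPolicy{
	MaxAttempts:          5,
	InitialRetryInterval: time.Second,
	BackoffCoefficient:   2.0,
	MaxRetryInterval:     5 * time.Second,
}

// delayBefore returns the wait before retry number n, where n = 1 is the
// first retry. The delay grows geometrically and is capped at
// MaxRetryInterval when that field is set.
func delayBefore(p retryPolicy, n int) time.Duration {
	d := float64(p.InitialRetryInterval) * math.Pow(p.BackoffCoefficient, float64(n-1))
	if p.MaxRetryInterval > 0 && d > float64(p.MaxRetryInterval) {
		return p.MaxRetryInterval
	}
	return time.Duration(d)
}

func main() {
	// MaxAttempts counts the first execution, so there are MaxAttempts-1 retries.
	for n := 1; n < examplePolicy.MaxAttempts; n++ {
		fmt.Printf("retry %d after %v\n", n, delayBefore(examplePolicy, n))
	}
}
```

Under these assumptions the delays are 1s, 2s, and 4s, and the fourth retry is capped at 5s instead of 8s.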

func (*RetryPolicy) Validate

func (policy *RetryPolicy) Validate() error

type SubOrchestratorOption deprecated added in v0.10.0

type SubOrchestratorOption = ChildWorkflowOption

Deprecated: Use ChildWorkflowOption instead.

type Task

type Task interface {
	Await(v any) error
	TaskExecutionId() string
}

Task is an interface for asynchronous durable tasks. A task is conceptually similar to a future.
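
The future-like shape of Await can be sketched with an already-completed task that decodes a JSON result into the caller's variable. completedTask is a hypothetical type that mimics only the Await half of the interface; the assumption that results are JSON-encoded is taken from the input options documented above, not from the Task implementation itself.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// completedTask is a hypothetical, already-finished task. It is not the
// package's implementation.
type completedTask struct {
	payload []byte // JSON-encoded result, empty if there is none
	err     error  // failure to report instead of a result
}

// Await reports the task's failure, if any, and otherwise decodes the
// result into v. A nil v means the caller does not need the result.
func (t completedTask) Await(v any) error {
	if t.err != nil {
		return t.err
	}
	if v == nil || len(t.payload) == 0 {
		return nil
	}
	return json.Unmarshal(t.payload, v)
}

func main() {
	var total int
	if err := (completedTask{payload: []byte("42")}).Await(&total); err != nil {
		fmt.Println("await failed:", err)
		return
	}
	fmt.Println(total)
}
```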

type TaskRegistry

type TaskRegistry struct {
	// contains filtered or unexported fields
}

TaskRegistry contains maps of names to corresponding workflow and activity functions.

func NewTaskRegistry

func NewTaskRegistry() *TaskRegistry

NewTaskRegistry returns a new TaskRegistry struct.

func (*TaskRegistry) AddActivity

func (r *TaskRegistry) AddActivity(a Activity) error

AddActivity adds an activity function to the registry. The name of the activity function is determined using reflection.
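
Deriving a function's name by reflection can be done with the standard library alone. The sketch below shows one plausible approach; funcName and SayHello are hypothetical, and the documentation does not state whether the registry trims the package path in the same way.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// SayHello is a placeholder function whose name we want to recover.
func SayHello() {}

// funcName returns the bare name of a function value, without its package
// path. It panics if f is not a function. Closures and method values yield
// compiler-generated names, so this is only meaningful for named functions.
func funcName(f any) string {
	full := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() // e.g. "main.SayHello"
	if i := strings.LastIndex(full, "."); i >= 0 {
		return full[i+1:]
	}
	return full
}

func main() {
	fmt.Println(funcName(SayHello))
}
```

Because the registered name is tied to the Go identifier, renaming the function changes the name under which it is registered; the N variants avoid this by taking an explicit name.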

func (*TaskRegistry) AddActivityN

func (r *TaskRegistry) AddActivityN(name string, a Activity) error

AddActivityN adds an activity function to the registry with a specified name. Returns an error if an activity with the same name is already registered.

func (*TaskRegistry) AddOrchestrator deprecated

func (r *TaskRegistry) AddOrchestrator(o Workflow) error

Deprecated: Use AddWorkflow instead.

func (*TaskRegistry) AddOrchestratorN deprecated

func (r *TaskRegistry) AddOrchestratorN(name string, o Workflow) error

Deprecated: Use AddWorkflowN instead.

func (*TaskRegistry) AddVersionedOrchestrator deprecated added in v0.11.0

func (r *TaskRegistry) AddVersionedOrchestrator(canonicalName string, isLatest bool, o Workflow) error

Deprecated: Use AddVersionedWorkflow instead.

func (*TaskRegistry) AddVersionedOrchestratorN deprecated added in v0.11.0

func (r *TaskRegistry) AddVersionedOrchestratorN(canonicalName string, name string, isLatest bool, o Workflow) error

Deprecated: Use AddVersionedWorkflowN instead.

func (*TaskRegistry) AddVersionedWorkflow added in v0.12.0

func (r *TaskRegistry) AddVersionedWorkflow(canonicalName string, isLatest bool, o Workflow) error

AddVersionedWorkflow adds a versioned workflow function to the registry with a specified name.

func (*TaskRegistry) AddVersionedWorkflowN added in v0.12.0

func (r *TaskRegistry) AddVersionedWorkflowN(canonicalName string, name string, isLatest bool, o Workflow) error

AddVersionedWorkflowN adds a versioned workflow function to the registry. Returns an error if a workflow with the same version name is already registered.

func (*TaskRegistry) AddWorkflow added in v0.12.0

func (r *TaskRegistry) AddWorkflow(o Workflow) error

AddWorkflow adds a workflow function to the registry. The name of the workflow function is determined using reflection.

func (*TaskRegistry) AddWorkflowN added in v0.12.0

func (r *TaskRegistry) AddWorkflowN(name string, o Workflow) error

AddWorkflowN adds a workflow function to the registry with a specified name.

func (*TaskRegistry) RemoveActivity added in v0.12.0

func (r *TaskRegistry) RemoveActivity(name string)

RemoveActivity removes an activity from the registry.

func (*TaskRegistry) RemoveVersionedWorkflow added in v0.12.0

func (r *TaskRegistry) RemoveVersionedWorkflow(canonicalName string)

RemoveVersionedWorkflow removes all versions of a workflow from the registry.

func (*TaskRegistry) ResolveWorkflow added in v0.12.0

func (r *TaskRegistry) ResolveWorkflow(name string, pinnedVersion *string) (Workflow, *string, error)

ResolveWorkflow looks up a workflow by name under a single read lock. It checks: exact match → versioned match → wildcard, returning the first hit. On success it returns the resolved workflow and an optional version string; otherwise it returns an error.
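
The lookup order can be sketched as follows. This is an illustration under assumptions, not the package's code: registry, versions, and resolve are hypothetical, the wildcard is assumed to be registered under the name "*", and a pinned version is assumed to take precedence over the latest one.

```go
package main

import (
	"fmt"
	"sync"
)

type workflow func() string

// versions holds every registered version of one canonical workflow name.
type versions struct {
	byName map[string]workflow
	latest string
}

type registry struct {
	mu        sync.RWMutex
	exact     map[string]workflow // includes the assumed "*" wildcard entry
	versioned map[string]versions
}

// resolve checks exact match, then versioned match, then wildcard, and
// returns the first hit. The whole lookup runs under one read lock.
func (r *registry) resolve(name string, pinned *string) (workflow, *string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if wf, ok := r.exact[name]; ok {
		return wf, nil, nil
	}
	if vs, ok := r.versioned[name]; ok {
		v := vs.latest
		if pinned != nil {
			v = *pinned
		}
		if wf, ok := vs.byName[v]; ok {
			return wf, &v, nil
		}
	}
	if wf, ok := r.exact["*"]; ok {
		return wf, nil, nil
	}
	return nil, nil, fmt.Errorf("workflow %q is not registered", name)
}

func newExampleRegistry() *registry {
	return &registry{
		exact: map[string]workflow{
			"ping": func() string { return "ping" },
			"*":    func() string { return "fallback" },
		},
		versioned: map[string]versions{
			"order": {
				byName: map[string]workflow{
					"v1": func() string { return "order v1" },
					"v2": func() string { return "order v2" },
				},
				latest: "v2",
			},
		},
	}
}

func main() {
	r := newExampleRegistry()
	wf, version, _ := r.resolve("order", nil)
	fmt.Println(wf(), *version)
}
```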

func (*TaskRegistry) UpsertActivityN added in v0.12.0

func (r *TaskRegistry) UpsertActivityN(name string, a Activity)

UpsertActivityN adds or replaces an activity function in the registry. Used by callers that intentionally re-register (e.g. hot-reload).
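
The difference between the Add and Upsert variants can be shown with a small map-backed registry. The types and method names below (registry, addN, upsertN, call) are hypothetical stand-ins; only the documented behavior, rejecting duplicates versus replacing them, is taken from the descriptions above.

```go
package main

import (
	"fmt"
	"sync"
)

type activity func() string

type registry struct {
	mu         sync.RWMutex
	activities map[string]activity
}

func newRegistry() *registry {
	return &registry{activities: make(map[string]activity)}
}

// addN registers a under name and fails if the name is already taken.
func (r *registry) addN(name string, a activity) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.activities[name]; exists {
		return fmt.Errorf("activity %q is already registered", name)
	}
	r.activities[name] = a
	return nil
}

// upsertN registers a under name, replacing any existing entry.
func (r *registry) upsertN(name string, a activity) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.activities[name] = a
}

// call runs the named activity, or returns "" if it is not registered.
func (r *registry) call(name string) string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if a, ok := r.activities[name]; ok {
		return a()
	}
	return ""
}

func main() {
	r := newRegistry()
	_ = r.addN("greet", func() string { return "v1" })
	fmt.Println(r.addN("greet", func() string { return "v2" })) // duplicate is rejected
	r.upsertN("greet", func() string { return "v2" })           // replacement succeeds
	fmt.Println(r.call("greet"))
}
```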

func (*TaskRegistry) UpsertVersionedWorkflowN added in v0.12.0

func (r *TaskRegistry) UpsertVersionedWorkflowN(canonicalName string, name string, isLatest bool, o Workflow)

UpsertVersionedWorkflowN adds or replaces a versioned workflow function in the registry. Used by callers that intentionally re-register (e.g. hot-reload).

type Workflow added in v0.12.0

type Workflow func(ctx *WorkflowContext) (any, error)

Workflow is the functional interface for workflow functions.

type WorkflowContext added in v0.12.0

type WorkflowContext struct {
	ID             api.InstanceID
	Name           string
	VersionName    *string
	IsReplaying    bool
	CurrentTimeUtc time.Time
	// contains filtered or unexported fields
}

WorkflowContext is the parameter type for workflow functions.

func NewWorkflowContext added in v0.12.0

func NewWorkflowContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *WorkflowContext

NewWorkflowContext returns a new WorkflowContext struct with the specified parameters.

func (*WorkflowContext) CallActivity added in v0.12.0

func (ctx *WorkflowContext) CallActivity(activity interface{}, opts ...CallActivityOption) Task

CallActivity schedules an asynchronous invocation of an activity function. The [activity] parameter can be either the name of the activity as a string or the function that implements the activity, in which case the name is obtained via reflection.

func (*WorkflowContext) CallChildWorkflow added in v0.12.0

func (ctx *WorkflowContext) CallChildWorkflow(workflow interface{}, opts ...ChildWorkflowOption) Task

func (*WorkflowContext) CallSubOrchestrator deprecated added in v0.12.0

func (ctx *WorkflowContext) CallSubOrchestrator(workflow interface{}, opts ...ChildWorkflowOption) Task

Deprecated: Use CallChildWorkflow instead.

func (*WorkflowContext) ContinueAsNew added in v0.12.0

func (ctx *WorkflowContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)

func (*WorkflowContext) CreateTimer added in v0.12.0

func (ctx *WorkflowContext) CreateTimer(delay time.Duration, opts ...CreateTimerOption) Task

CreateTimer schedules a durable timer that expires after the specified delay.

func (*WorkflowContext) GetInput added in v0.12.0

func (octx *WorkflowContext) GetInput(v any) error

GetInput unmarshals the serialized workflow input and stores it in [v].

func (*WorkflowContext) GetPropagatedHistory added in v0.12.0

func (octx *WorkflowContext) GetPropagatedHistory() *api.PropagatedHistory

GetPropagatedHistory returns the propagated history from a parent workflow, or nil if no history was propagated. The propagated history contains events from the parent (and optionally ancestor) workflows that opted to propagate their execution context.

func (*WorkflowContext) IsPatched added in v0.12.0

func (ctx *WorkflowContext) IsPatched(patchName string) bool

func (*WorkflowContext) SetCustomStatus added in v0.12.0

func (octx *WorkflowContext) SetCustomStatus(cs string)

func (*WorkflowContext) SetPropagatedHistory added in v0.12.0

func (octx *WorkflowContext) SetPropagatedHistory(ph *api.PropagatedHistory)

SetPropagatedHistory sets the propagated history on the context.

func (*WorkflowContext) WaitForSingleEvent added in v0.12.0

func (ctx *WorkflowContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task

WaitForSingleEvent creates a task that is completed only after an event named [eventName] is received by this workflow or when the specified timeout expires.

The [timeout] parameter defines how long to wait for the event. If the timeout expires before the named event is received, the task completes and returns ErrTaskCanceled when awaited. Otherwise, awaiting the task yields the deserialized payload of the received event. A timeout of zero returns a canceled task if the event isn't already available in the history. Use a negative timeout to wait indefinitely for the event to be received.

Workflows can wait for the same event name multiple times. Each event received by a workflow completes just one task returned by this method.

Note that event names are case-insensitive.
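
The two matching rules above, case-insensitive names and one completed task per received event, can be modeled with a queue of waiters per lower-cased name. This is a sketch under assumptions: eventBus, wait, and raise are hypothetical, timeouts are omitted, and serving waiters in first-in, first-out order is a guess rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// eventBus is a hypothetical model of pending event waiters.
type eventBus struct {
	pending map[string][]chan string // waiters keyed by lower-cased event name
}

func newEventBus() *eventBus {
	return &eventBus{pending: make(map[string][]chan string)}
}

// wait registers one waiter for name and returns the channel on which the
// event payload will be delivered.
func (b *eventBus) wait(name string) chan string {
	ch := make(chan string, 1) // buffered so raise never blocks
	key := strings.ToLower(name)
	b.pending[key] = append(b.pending[key], ch)
	return ch
}

// raise delivers payload to the oldest waiter for name, if there is one,
// and reports whether a waiter was completed.
func (b *eventBus) raise(name, payload string) bool {
	key := strings.ToLower(name)
	queue := b.pending[key]
	if len(queue) == 0 {
		return false
	}
	queue[0] <- payload
	b.pending[key] = queue[1:]
	return true
}

func main() {
	b := newEventBus()
	first := b.wait("Approval")
	second := b.wait("APPROVAL")

	b.raise("approval", "yes") // completes only the first waiter
	fmt.Println(<-first, len(second))

	b.raise("Approval", "no") // completes the second waiter
	fmt.Println(<-second)
}
```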
