actors

package
v1.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2026 License: Apache-2.0 Imports: 41 Imported by: 0

Documentation

Overview

Copyright 2025 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
// Label key constants exported by this package. Each constant is the plain
// string key shown on its right-hand side.
// NOTE(review): presumably these are the keys under which the workflow,
// activity, executor and retentioner names are attached as labels — where
// they are consumed is not visible here; confirm against the callers.
const (
	WorkflowNameLabelKey    = "workflow"
	ActivityNameLabelKey    = "activity"
	ExecutorNameLabelKey    = "executor"
	RetentionerNameLabelKey = "retentioner"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Actors added in v1.15.0

// Actors is the type returned by New. Many of its methods are documented as
// implementing backend.Backend.
type Actors struct {
	// contains filtered or unexported fields
}

func New added in v1.15.0

func New(opts Options) *Actors

func (*Actors) AbandonActivityWorkItem added in v1.15.0

func (*Actors) AbandonActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error

AbandonActivityWorkItem implements backend.Backend. It gets called by durabletask-go when there is an unexpected failure in the workflow activity execution pipeline.

func (*Actors) AbandonWorkflowWorkItem added in v1.18.0

func (*Actors) AbandonWorkflowWorkItem(ctx context.Context, wi *backend.WorkflowWorkItem) error

AbandonWorkflowWorkItem implements backend.Backend. It gets called by durabletask-go when there is an unexpected failure in the workflow orchestration execution pipeline.

func (*Actors) ActivityActorType added in v1.15.0

func (abe *Actors) ActivityActorType() string

func (*Actors) AddNewWorkflowEvent added in v1.18.0

func (abe *Actors) AddNewWorkflowEvent(ctx context.Context, id api.InstanceID, e *backend.HistoryEvent) error

AddNewWorkflowEvent implements backend.Backend and sends the event e to the workflow actor identified by id.

func (*Actors) CancelActivityTask added in v1.16.0

func (abe *Actors) CancelActivityTask(ctx context.Context, instanceID api.InstanceID, taskID int32) error

CancelActivityTask implements backend.Backend.

func (*Actors) CancelWorkflowTask added in v1.18.0

func (abe *Actors) CancelWorkflowTask(ctx context.Context, instanceID api.InstanceID) error

CancelWorkflowTask implements backend.Backend.

func (*Actors) CompleteActivityTask added in v1.16.0

func (abe *Actors) CompleteActivityTask(ctx context.Context, response *protos.ActivityResponse) error

CompleteActivityTask implements backend.Backend.

func (*Actors) CompleteActivityWorkItem added in v1.15.0

func (*Actors) CompleteActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error

CompleteActivityWorkItem implements backend.Backend.

func (*Actors) CompleteWorkflowTask added in v1.18.0

func (abe *Actors) CompleteWorkflowTask(ctx context.Context, response *protos.WorkflowResponse) error

CompleteWorkflowTask implements backend.Backend.

func (*Actors) CompleteWorkflowWorkItem added in v1.18.0

func (*Actors) CompleteWorkflowWorkItem(ctx context.Context, wi *backend.WorkflowWorkItem) error

CompleteWorkflowWorkItem implements backend.Backend.

func (*Actors) CreateTaskHub added in v1.15.0

func (*Actors) CreateTaskHub(context.Context) error

CreateTaskHub implements backend.Backend.

func (*Actors) CreateWorkflowInstance added in v1.18.0

func (abe *Actors) CreateWorkflowInstance(ctx context.Context, e *backend.HistoryEvent) error

CreateWorkflowInstance implements backend.Backend and creates a new workflow instance.

Internally, creating a workflow instance also creates a new actor with the same ID. The create request is saved into the actor's "inbox" and then executed via a reminder thread. If the app is scaled out across multiple replicas, the actor might get assigned to a replica other than this one.

func (*Actors) DeleteTaskHub added in v1.15.0

func (*Actors) DeleteTaskHub(context.Context) error

DeleteTaskHub implements backend.Backend.

func (*Actors) GetInstanceHistory added in v1.17.0

func (*Actors) GetWorkflowMetadata added in v1.18.0

func (abe *Actors) GetWorkflowMetadata(ctx context.Context, id api.InstanceID) (*backend.WorkflowMetadata, error)

GetWorkflowMetadata implements backend.Backend.

func (*Actors) GetWorkflowRuntimeState added in v1.18.0

func (abe *Actors) GetWorkflowRuntimeState(ctx context.Context, owi *backend.WorkflowWorkItem) (*backend.WorkflowRuntimeState, error)

GetWorkflowRuntimeState implements backend.Backend.

func (*Actors) ListInstanceIDs added in v1.17.0

func (*Actors) NextActivityWorkItem added in v1.15.0

func (abe *Actors) NextActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error)

NextActivityWorkItem implements backend.Backend.

func (*Actors) NextWorkflowWorkItem added in v1.18.0

func (abe *Actors) NextWorkflowWorkItem(ctx context.Context) (*backend.WorkflowWorkItem, error)

NextWorkflowWorkItem implements backend.Backend.

func (*Actors) PurgeWorkflowState added in v1.18.0

func (abe *Actors) PurgeWorkflowState(ctx context.Context, id api.InstanceID, router *protos.TaskRouter, force bool) (int, error)

PurgeWorkflowState implements backend.Backend.

When router is nil or targets the local app, this is a single-instance purge of id and returns 1 on success.

When router carries a foreign TargetAppID — set by the recursive purge driver for a child started cross-app — the entire subtree lives on that app, so we delegate to it via an actor invocation: the remote daprd's workflow actor recursively handles its own subtree and returns the count. This mirrors the "each app handles its own subtree" model that recursive terminate already uses.

func (*Actors) RegisterActors added in v1.15.0

func (abe *Actors) RegisterActors(ctx context.Context) error

func (*Actors) RerunWorkflowFromEvent added in v1.16.0

func (abe *Actors) RerunWorkflowFromEvent(ctx context.Context, req *backend.RerunWorkflowFromEventRequest) (api.InstanceID, error)

RerunWorkflowFromEvent implements backend.Backend and reruns a workflow from a specific event ID.

func (*Actors) Start added in v1.15.0

func (abe *Actors) Start(ctx context.Context) error

Start implements backend.Backend.

func (*Actors) Stop added in v1.15.0

func (abe *Actors) Stop(context.Context) error

Stop implements backend.Backend.

func (*Actors) String added in v1.15.0

func (abe *Actors) String() string

String displays the type information.

func (*Actors) UnRegisterActors added in v1.15.0

func (abe *Actors) UnRegisterActors(ctx context.Context) error

func (*Actors) WaitForActivityCompletion added in v1.16.0

func (abe *Actors) WaitForActivityCompletion(request *protos.ActivityRequest) func(context.Context) (*protos.ActivityResponse, error)

WaitForActivityCompletion implements backend.Backend.

func (*Actors) WaitForWorkflowTaskCompletion added in v1.18.0

func (abe *Actors) WaitForWorkflowTaskCompletion(request *protos.WorkflowRequest) func(context.Context) (*protos.WorkflowResponse, error)

WaitForWorkflowTaskCompletion implements backend.Backend.

func (*Actors) WatchWorkflowRuntimeStatus added in v1.18.0

func (abe *Actors) WatchWorkflowRuntimeStatus(ctx context.Context, id api.InstanceID, condition func(*backend.WorkflowMetadata) bool) error

func (*Actors) WorkflowActorType added in v1.18.0

func (abe *Actors) WorkflowActorType() string

type ClusterTasksBackend added in v1.16.0

// ClusterTasksBackend is the type returned by NewClusterTasksBackend. Its
// listed methods have the same names and parameter/return types as the
// methods of PendingTasksBackend.
// NOTE(review): presumably this is the "cluster tasks backend" that
// Options.EnableClusteredDeployment selects — confirm against the
// implementation.
type ClusterTasksBackend struct {
	// contains filtered or unexported fields
}

func NewClusterTasksBackend added in v1.16.0

func NewClusterTasksBackend(opts ClusterTasksBackendOptions) *ClusterTasksBackend

func (*ClusterTasksBackend) CancelActivityTask added in v1.16.0

func (be *ClusterTasksBackend) CancelActivityTask(ctx context.Context, id api.InstanceID, taskID int32) error

func (*ClusterTasksBackend) CancelWorkflowTask added in v1.18.0

func (be *ClusterTasksBackend) CancelWorkflowTask(ctx context.Context, id api.InstanceID) error

func (*ClusterTasksBackend) CompleteActivityTask added in v1.16.0

func (be *ClusterTasksBackend) CompleteActivityTask(ctx context.Context, resp *protos.ActivityResponse) error

func (*ClusterTasksBackend) CompleteWorkflowTask added in v1.18.0

func (be *ClusterTasksBackend) CompleteWorkflowTask(ctx context.Context, resp *protos.WorkflowResponse) error

func (*ClusterTasksBackend) WaitForActivityCompletion added in v1.16.0

func (be *ClusterTasksBackend) WaitForActivityCompletion(req *protos.ActivityRequest) func(context.Context) (*protos.ActivityResponse, error)

func (*ClusterTasksBackend) WaitForWorkflowTaskCompletion added in v1.18.0

func (be *ClusterTasksBackend) WaitForWorkflowTaskCompletion(req *protos.WorkflowRequest) func(context.Context) (*protos.WorkflowResponse, error)

type ClusterTasksBackendOptions added in v1.16.0

// ClusterTasksBackendOptions is the argument accepted by
// NewClusterTasksBackend.
type ClusterTasksBackendOptions struct {
	// Actors is the actors.Interface handed to the backend.
	Actors            actors.Interface
	// ExecutorActorType is a string.
	// NOTE(review): presumably the actor type name used for executor
	// actors — confirm against NewClusterTasksBackend.
	ExecutorActorType string
}

type Options added in v1.15.0

// Options holds the dependencies and feature settings passed to New.
type Options struct {
	// NOTE(review): AppID and Namespace are presumably the Dapr app ID and
	// namespace of this daprd instance — confirm against the caller of New.
	AppID          string
	Namespace      string
	Actors         actors.Interface
	Resiliency     resiliency.Provider
	EventSink      orchestrator.EventSink
	ComponentStore *compstore.ComponentStore

	// EnableClusteredDeployment is an experimental feature.
	// When enabled, the cluster tasks backend is used for pending tasks
	// instead of the default local implementation. The cluster tasks backend
	// uses actors to share the state of pending tasks, which allows multiple
	// daprd replicas to be deployed and exposed through a load balancer.
	EnableClusteredDeployment bool

	// WorkflowsRemoteActivityReminder enables a feature that makes activities
	// send their results to the workflow when the workflow is running on a
	// different application. Useful when using cross-app workflows. It ensures
	// that activities are not retried forever if the workflow app is not
	// available, and instead queues the result for when the workflow app is
	// back online. Strongly recommended to always be enabled if the same Dapr
	// version is used on all daprds.
	WorkflowsRemoteActivityReminder bool

	RetentionPolicy *config.WorkflowStateRetentionPolicy
	Signer          *signer.Signer

	// WorkflowAccessPolicies may be nil when the WorkflowAccessPolicy feature
	// is disabled.
	WorkflowAccessPolicies *workflowacl.Holder

	// MaxRequestBodySize is the gRPC server max message size in bytes. The
	// orchestrator uses it to detect and gracefully stall workflows whose
	// history payload would exceed the GetWorkItems stream limit.
	MaxRequestBodySize int
}

type PendingTasksBackend added in v1.16.0

// PendingTasksBackend groups the operations for cancelling, completing and
// waiting on activity and workflow tasks. Both *Actors and
// *ClusterTasksBackend are listed in this package with methods of these names
// and parameter/return types.
type PendingTasksBackend interface {
	// CancelActivityTask implements backend.Backend.
	// It takes the instance ID and the int32 task ID of the activity task.
	CancelActivityTask(ctx context.Context, instanceID api.InstanceID, taskID int32) error
	// CancelWorkflowTask implements backend.Backend.
	// It takes the instance ID of the workflow task.
	CancelWorkflowTask(ctx context.Context, instanceID api.InstanceID) error
	// CompleteActivityTask implements backend.Backend.
	// It takes the activity response.
	CompleteActivityTask(ctx context.Context, response *protos.ActivityResponse) error
	// CompleteWorkflowTask implements backend.Backend.
	// It takes the workflow response.
	CompleteWorkflowTask(ctx context.Context, response *protos.WorkflowResponse) error
	// WaitForActivityCompletion implements backend.Backend.
	// It returns a function that, given a context, yields the activity
	// response or an error.
	WaitForActivityCompletion(request *protos.ActivityRequest) func(context.Context) (*protos.ActivityResponse, error)
	// WaitForWorkflowTaskCompletion implements backend.Backend.
	// It returns a function that, given a context, yields the workflow
	// response or an error.
	WaitForWorkflowTaskCompletion(request *protos.WorkflowRequest) func(context.Context) (*protos.WorkflowResponse, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL