Documentation
¶
Index ¶
- Variables
- func GetActivityExecutionKey(iid string, taskID int32) string
- func IsDurableTaskGrpcRequest(fullMethodName string) bool
- func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error)
- func WithOnGetWorkItemsConnectionCallback(callback func(context.Context) error) grpcExecutorOptions
- func WithOnGetWorkItemsDisconnectCallback(callback func(context.Context) error) grpcExecutorOptions
- func WithSkipWaitForInstanceStart() grpcExecutorOptions
- func WithStreamSendTimeout(d time.Duration) grpcExecutorOptions
- func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions
- type ActivityCompletionAttestation
- type ActivityExecutor
- type ActivityRequest
- type ActivityWorkItem
- type Backend
- type BackendWorkflowState (deprecated)
- type BackendWorkflowStateMetadata
- type ChildCompletionAttestation
- type CreateWorkflowInstanceRequest
- type DurableTimer
- type ExecuteOptions
- type Executor
- type ExternalSigningCertificate
- type GetInstanceHistoryRequest
- type GetInstanceHistoryResponse
- type HistoryEvent
- type HistorySignature
- type ListInstanceIDsRequest
- type ListInstanceIDsResponse
- type Logger
- type NewTaskWorkerOptions
- type OrchestrationMetadata (deprecated)
- type OrchestrationRuntimeState (deprecated)
- type OrchestrationRuntimeStateMessage (deprecated)
- type OrchestrationStatus
- type OrchestrationWorkItem (deprecated)
- type RerunWorkflowFromEventRequest
- type SigningCertificate
- type TaskFailureDetails
- type TaskHubClient
- type TaskHubWorker
- type TaskProcessor
- type TaskWorker
- func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, ...) TaskWorker[*ActivityWorkItem]
- func NewActivityTaskWorkerWithInProcess(be Backend, executor, inProcessExecutor ActivityExecutor, ...) TaskWorker[*ActivityWorkItem]
- func NewTaskWorker[T WorkItem](p TaskProcessor[T], logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[T]
- func NewWorkflowWorker(opts WorkflowWorkerOptions, taskopts ...NewTaskWorkerOptions) TaskWorker[*WorkflowWorkItem]
- type WorkItem
- type WorkerOptions
- type WorkflowExecutor
- type WorkflowMetadata
- type WorkflowRuntimeState
- type WorkflowRuntimeStateMessage
- type WorkflowState
- type WorkflowStateMetadata (deprecated)
- type WorkflowWorkItem
- type WorkflowWorkerOptions
Constants ¶
This section is empty.
Variables ¶
var (
ErrTaskHubExists = errors.New("task hub already exists")
ErrTaskHubNotFound = errors.New("task hub not found")
ErrNotInitialized = errors.New("backend not initialized")
ErrWorkItemLockLost = errors.New("lock on work-item was lost")
ErrBackendAlreadyStarted = errors.New("backend is already started")
)
Functions ¶
func GetActivityExecutionKey ¶ added in v0.8.0
func IsDurableTaskGrpcRequest ¶
IsDurableTaskGrpcRequest returns true if the specified gRPC method name represents an operation that is compatible with the gRPC executor.
func MarshalHistoryEvent ¶
func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error)
MarshalHistoryEvent serializes the HistoryEvent into a protobuf byte array.
func WithOnGetWorkItemsConnectionCallback ¶
WithOnGetWorkItemsConnectionCallback allows the caller to get a notification when an external process connects over gRPC, and invokes the GetWorkItems operation. This can be useful for doing things like lazily auto-starting the task hub worker only when necessary.
func WithOnGetWorkItemsDisconnectCallback ¶
WithOnGetWorkItemsDisconnectCallback allows the caller to get a notification when an external process disconnects from the GetWorkItems operation. This can be useful for doing things like shutting down the task hub worker when the client disconnects.
func WithSkipWaitForInstanceStart ¶ added in v0.8.0
func WithSkipWaitForInstanceStart() grpcExecutorOptions
func WithStreamSendTimeout ¶ added in v0.6.7
func WithStreamShutdownChannel ¶
func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions
Types ¶
type ActivityCompletionAttestation ¶ added in v0.12.0
type ActivityCompletionAttestation = protos.ActivityCompletionAttestation
type ActivityExecutor ¶
type ActivityExecutor interface {
ExecuteActivity(ctx context.Context, iid api.InstanceID, e *protos.HistoryEvent, opts ExecuteOptions) (*protos.HistoryEvent, error)
}
type ActivityRequest ¶
type ActivityRequest = protos.ActivityRequest
type ActivityWorkItem ¶
type ActivityWorkItem struct {
SequenceNumber int64
InstanceID api.InstanceID
NewEvent *HistoryEvent
Result *HistoryEvent
LockedBy string
Properties map[string]interface{}
// receive from caller
IncomingHistory *protos.PropagatedHistory
}
func (ActivityWorkItem) IsWorkItem ¶
func (wi ActivityWorkItem) IsWorkItem() bool
IsWorkItem implements core.WorkItem
func (ActivityWorkItem) String ¶
func (wi ActivityWorkItem) String() string
String implements core.WorkItem and fmt.Stringer
type Backend ¶
type Backend interface {
// CreateTaskHub creates a new task hub for the current backend. Task hub creation must be idempotent.
//
// If the task hub for this backend already exists, an error of type [ErrTaskHubExists] is returned.
CreateTaskHub(context.Context) error
// DeleteTaskHub deletes an existing task hub configured for the current backend. It's up to the backend
// implementation to determine how the task hub data is deleted.
//
// If the task hub for this backend doesn't exist, an error of type [ErrTaskHubNotFound] is returned.
DeleteTaskHub(context.Context) error
// Start starts any background processing done by this backend.
Start(context.Context) error
// Stop stops any background processing done by this backend.
Stop(context.Context) error
// CreateWorkflowInstance creates a new workflow instance with a history event that
// wraps an ExecutionStarted event.
CreateWorkflowInstance(context.Context, *HistoryEvent) error
// RerunWorkflowFromEvent reruns a workflow from a specific event ID of some
// source instance ID. If a new instance ID is not given, a random one will be
// generated and returned. Can optionally give a new input to the target
// event ID to rerun from.
RerunWorkflowFromEvent(ctx context.Context, req *protos.RerunWorkflowFromEventRequest) (api.InstanceID, error)
// AddNewWorkflowEvent adds a new workflow event to the specified workflow instance.
AddNewWorkflowEvent(context.Context, api.InstanceID, *HistoryEvent) error
// NextWorkflowWorkItem blocks and returns the next workflow work
// item from the task hub. Should only return an error when shutting down.
NextWorkflowWorkItem(context.Context) (*WorkflowWorkItem, error)
// GetWorkflowRuntimeState gets the runtime state of a workflow instance.
GetWorkflowRuntimeState(context.Context, *WorkflowWorkItem) (*WorkflowRuntimeState, error)
// WatchWorkflowRuntimeStatus is a streaming API to watch for changes to
// the WorkflowMetadata, receiving events as and when the state changes.
// When the given condition is true, returns.
// Used over polling the metadata.
WatchWorkflowRuntimeStatus(ctx context.Context, id api.InstanceID, condition func(*WorkflowMetadata) bool) error
// GetWorkflowMetadata gets the metadata associated with the given workflow instance ID.
//
// Returns [api.ErrInstanceNotFound] if the workflow instance doesn't exist.
GetWorkflowMetadata(context.Context, api.InstanceID) (*WorkflowMetadata, error)
// CompleteWorkflowWorkItem completes a work item by saving the updated runtime state to durable storage.
//
// Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain).
CompleteWorkflowWorkItem(context.Context, *WorkflowWorkItem) error
// AbandonWorkflowWorkItem undoes any state changes and returns the work item to the work item queue.
//
// This is called if an internal failure happens in the processing of a workflow work item. It is
// not called if the workflow work item is processed successfully (note that a workflow that
// completes with a failure is still considered a successfully processed work item).
AbandonWorkflowWorkItem(context.Context, *WorkflowWorkItem) error
// NextActivityWorkItem blocks and returns the next activity work item from
// the task hub. Should only return an error when shutting down.
NextActivityWorkItem(context.Context) (*ActivityWorkItem, error)
// CompleteActivityWorkItem sends a message to the parent workflow indicating activity completion.
//
// Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain).
CompleteActivityWorkItem(context.Context, *ActivityWorkItem) error
// AbandonActivityWorkItem returns the work-item back to the queue without committing any other changes.
//
// This is called when an internal failure occurs during activity work-item processing.
AbandonActivityWorkItem(context.Context, *ActivityWorkItem) error
// PurgeWorkflowState deletes saved workflow state. When router is nil or
// targets the local app, this is a single-instance purge of id and the
// returned count is 1 on success. When router carries a foreign TargetAppID
// (set by the recursive purge driver for a child started cross-app), the
// backend is expected to delegate the entire recursive purge of that subtree
// to the target app (the local recursion stops walking through it) and
// return the number of instances purged on the remote side.
// [api.ErrInstanceNotFound] is returned if the specified workflow instance doesn't exist.
// [api.ErrNotCompleted] is returned if the specified workflow instance is still running.
PurgeWorkflowState(ctx context.Context, id api.InstanceID, router *protos.TaskRouter, force bool) (int, error)
// CompleteWorkflowTask completes the workflow task by saving the updated runtime state to durable storage.
CompleteWorkflowTask(context.Context, *protos.WorkflowResponse) error
// CancelWorkflowTask cancels the workflow task so instances of WaitForWorkflowTaskCompletion will return an error.
CancelWorkflowTask(context.Context, api.InstanceID) error
// WaitForWorkflowTaskCompletion blocks until the workflow completes and returns the final response.
//
// [api.ErrTaskCancelled] is returned if the task was cancelled.
WaitForWorkflowTaskCompletion(*protos.WorkflowRequest) func(context.Context) (*protos.WorkflowResponse, error)
// CompleteActivityTask completes the activity task by saving the updated runtime state to durable storage.
CompleteActivityTask(context.Context, *protos.ActivityResponse) error
// CancelActivityTask cancels the activity task so instances of WaitForActivityCompletion will return an error.
CancelActivityTask(context.Context, api.InstanceID, int32) error
// WaitForActivityCompletion blocks until the activity completes and returns the final response.
//
// [api.ErrTaskCancelled] is returned if the task was cancelled.
WaitForActivityCompletion(*protos.ActivityRequest) func(context.Context) (*protos.ActivityResponse, error)
// ListInstanceIDs lists workflow instance IDs based on the provided
// query parameters.
ListInstanceIDs(ctx context.Context, req *protos.ListInstanceIDsRequest) (*protos.ListInstanceIDsResponse, error)
// GetInstanceHistory returns the full current history of a workflow instance.
GetInstanceHistory(ctx context.Context, req *protos.GetInstanceHistoryRequest) (*protos.GetInstanceHistoryResponse, error)
}
type BackendWorkflowState ¶ deprecated, added in v0.12.0
type BackendWorkflowState = protos.BackendWorkflowState
Deprecated: Use protos.BackendWorkflowState instead.
type BackendWorkflowStateMetadata ¶ added in v0.12.0
type BackendWorkflowStateMetadata = protos.BackendWorkflowStateMetadata
type ChildCompletionAttestation ¶ added in v0.12.0
type ChildCompletionAttestation = protos.ChildCompletionAttestation
type CreateWorkflowInstanceRequest ¶
type CreateWorkflowInstanceRequest = protos.CreateWorkflowInstanceRequest
type DurableTimer ¶
type DurableTimer = protos.DurableTimer
type ExecuteOptions ¶ added in v0.12.0
type ExecuteOptions struct {
PropagatedHistory *protos.PropagatedHistory
}
type Executor ¶
type Executor interface {
ExecuteWorkflow(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent, opts ExecuteOptions) (*protos.WorkflowResponse, error)
ExecuteActivity(ctx context.Context, iid api.InstanceID, e *protos.HistoryEvent, opts ExecuteOptions) (*protos.HistoryEvent, error)
Shutdown(ctx context.Context) error
}
func NewGrpcExecutor ¶
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar))
type ExternalSigningCertificate ¶ added in v0.12.0
type ExternalSigningCertificate = protos.ExternalSigningCertificate
type GetInstanceHistoryRequest ¶ added in v0.11.0
type GetInstanceHistoryRequest = protos.GetInstanceHistoryRequest
type GetInstanceHistoryResponse ¶ added in v0.11.0
type GetInstanceHistoryResponse = protos.GetInstanceHistoryResponse
type HistoryEvent ¶
type HistoryEvent = protos.HistoryEvent
func UnmarshalHistoryEvent ¶
func UnmarshalHistoryEvent(bytes []byte) (*HistoryEvent, error)
UnmarshalHistoryEvent deserializes a HistoryEvent from a protobuf byte array.
type HistorySignature ¶ added in v0.12.0
type HistorySignature = protos.HistorySignature
type ListInstanceIDsRequest ¶ added in v0.11.0
type ListInstanceIDsRequest = protos.ListInstanceIDsRequest
type ListInstanceIDsResponse ¶ added in v0.11.0
type ListInstanceIDsResponse = protos.ListInstanceIDsResponse
type Logger ¶
type Logger interface {
// Debug logs a message at level Debug.
Debug(v ...any)
// Debugf logs a message at level Debug.
Debugf(format string, v ...any)
// Info logs a message at level Info.
Info(v ...any)
// Infof logs a message at level Info.
Infof(format string, v ...any)
// Warn logs a message at level Warn.
Warn(v ...any)
// Warnf logs a message at level Warn.
Warnf(format string, v ...any)
// Error logs a message at level Error.
Error(v ...any)
// Errorf logs a message at level Error.
Errorf(format string, v ...any)
}
func DefaultLogger ¶
func DefaultLogger() Logger
type NewTaskWorkerOptions ¶
type NewTaskWorkerOptions func(*WorkerOptions)
func WithMaxParallelism ¶
func WithMaxParallelism(n int32) NewTaskWorkerOptions
type OrchestrationMetadata ¶ deprecated
type OrchestrationMetadata = WorkflowMetadata
Deprecated: Use WorkflowMetadata instead.
type OrchestrationRuntimeState ¶ deprecated
type OrchestrationRuntimeState = WorkflowRuntimeState
Deprecated: Use WorkflowRuntimeState instead.
type OrchestrationRuntimeStateMessage ¶ deprecated
type OrchestrationRuntimeStateMessage = WorkflowRuntimeStateMessage
Deprecated: Use WorkflowRuntimeStateMessage instead.
type OrchestrationStatus ¶
type OrchestrationStatus = protos.OrchestrationStatus
type OrchestrationWorkItem ¶ deprecated
type OrchestrationWorkItem = WorkflowWorkItem
Deprecated: Use WorkflowWorkItem instead.
type RerunWorkflowFromEventRequest ¶ added in v0.7.0
type RerunWorkflowFromEventRequest = protos.RerunWorkflowFromEventRequest
type SigningCertificate ¶ added in v0.12.0
type SigningCertificate = protos.SigningCertificate
type TaskFailureDetails ¶
type TaskFailureDetails = protos.TaskFailureDetails
type TaskHubClient ¶
type TaskHubClient interface {
ScheduleNewWorkflow(ctx context.Context, workflow interface{}, opts ...api.NewWorkflowOptions) (api.InstanceID, error)
FetchWorkflowMetadata(ctx context.Context, id api.InstanceID) (*WorkflowMetadata, error)
WaitForWorkflowStart(ctx context.Context, id api.InstanceID) (*WorkflowMetadata, error)
WaitForWorkflowCompletion(ctx context.Context, id api.InstanceID) (*WorkflowMetadata, error)
TerminateWorkflow(ctx context.Context, id api.InstanceID, opts ...api.TerminateOptions) error
RaiseEvent(ctx context.Context, id api.InstanceID, eventName string, opts ...api.RaiseEventOptions) error
SuspendWorkflow(ctx context.Context, id api.InstanceID, reason string) error
ResumeWorkflow(ctx context.Context, id api.InstanceID, reason string) error
PurgeWorkflowState(ctx context.Context, id api.InstanceID, opts ...api.PurgeOptions) error
RerunWorkflowFromEvent(ctx context.Context, source api.InstanceID, eventID uint32, opts ...api.RerunOptions) (api.InstanceID, error)
}
func NewTaskHubClient ¶
func NewTaskHubClient(be Backend) TaskHubClient
type TaskHubWorker ¶
type TaskHubWorker interface {
// Start starts the backend and the configured internal workers.
Start(context.Context) error
// Shutdown stops the backend and all internal workers.
Shutdown(context.Context) error
}
func NewTaskHubWorker ¶
func NewTaskHubWorker(be Backend, workflowWorker TaskWorker[*WorkflowWorkItem], activityWorker TaskWorker[*ActivityWorkItem], logger Logger) TaskHubWorker
type TaskProcessor ¶
type TaskWorker ¶
type TaskWorker[T WorkItem] interface {
// Start starts background polling for the activity work items.
Start(context.Context)
// StopAndDrain stops the worker and waits for all outstanding work items to finish.
StopAndDrain()
}
func NewActivityTaskWorker ¶
func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[*ActivityWorkItem]
NewActivityTaskWorker constructs an activity worker.
func NewActivityTaskWorkerWithInProcess ¶ added in v0.12.0
func NewActivityTaskWorkerWithInProcess(be Backend, executor, inProcessExecutor ActivityExecutor, inProcessNamePrefix string, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[*ActivityWorkItem]
NewActivityTaskWorkerWithInProcess constructs an activity worker that dispatches activities whose name starts with inProcessNamePrefix to inProcessExecutor. An empty prefix disables in-process dispatch.
func NewTaskWorker ¶
func NewTaskWorker[T WorkItem](p TaskProcessor[T], logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[T]
func NewWorkflowWorker ¶ added in v0.12.0
func NewWorkflowWorker(opts WorkflowWorkerOptions, taskopts ...NewTaskWorkerOptions) TaskWorker[*WorkflowWorkItem]
type WorkerOptions ¶
type WorkerOptions struct {
MaxParallelWorkItems *int32
}
func NewWorkerOptions ¶
func NewWorkerOptions() *WorkerOptions
type WorkflowExecutor ¶ added in v0.12.0
type WorkflowExecutor interface {
ExecuteWorkflow(
ctx context.Context,
iid api.InstanceID,
oldEvents []*protos.HistoryEvent,
newEvents []*protos.HistoryEvent,
opts ExecuteOptions) (*protos.WorkflowResponse, error)
}
type WorkflowMetadata ¶ added in v0.12.0
type WorkflowMetadata = protos.WorkflowMetadata
type WorkflowRuntimeState ¶ added in v0.12.0
type WorkflowRuntimeState = protos.WorkflowRuntimeState
type WorkflowRuntimeStateMessage ¶ added in v0.12.0
type WorkflowRuntimeStateMessage = protos.WorkflowRuntimeStateMessage
type WorkflowState ¶
type WorkflowState = protos.WorkflowState
type WorkflowStateMetadata ¶ deprecated
type WorkflowStateMetadata = BackendWorkflowStateMetadata
Deprecated: Use BackendWorkflowStateMetadata instead.
type WorkflowWorkItem ¶ added in v0.12.0
type WorkflowWorkItem struct {
InstanceID api.InstanceID
NewEvents []*HistoryEvent
LockedBy string
RetryCount int32
State *protos.WorkflowRuntimeState
Properties map[string]interface{}
// receive from caller
IncomingHistory *protos.PropagatedHistory
// sent to each activity (child workflows use state.PendingMessages)
OutgoingHistory map[int32]*protos.PropagatedHistory
}
func (*WorkflowWorkItem) GetAbandonDelay ¶ added in v0.12.0
func (wi *WorkflowWorkItem) GetAbandonDelay() time.Duration
func (WorkflowWorkItem) IsWorkItem ¶ added in v0.12.0
func (wi WorkflowWorkItem) IsWorkItem() bool
IsWorkItem implements core.WorkItem
func (WorkflowWorkItem) String ¶ added in v0.12.0
func (wi WorkflowWorkItem) String() string
String implements core.WorkItem and fmt.Stringer
type WorkflowWorkerOptions ¶ added in v0.12.0
type WorkflowWorkerOptions struct {
Backend Backend
Executor WorkflowExecutor
Logger Logger
AppID string
Namespace string
// InProcessExecutor is used to dispatch work items whose workflow name has
// InProcessNamePrefix as a prefix. This is how internal dapr-side workflows
// (e.g. dapr.internal.mcp.*) run inside the sidecar instead of being shipped
// to an external SDK via the gRPC work-item stream.
InProcessExecutor WorkflowExecutor
// InProcessNamePrefix is the workflow-name prefix that selects InProcessExecutor.
// Empty string disables prefix-based dispatch.
InProcessNamePrefix string
}
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| dedup | Package dedup detects duplicate task-resolution events (TaskCompleted/TaskFailed/TimerFired/ChildWorkflowInstance{Completed,Failed}) inside a workflow's runtime state. |