Documentation
¶
Index ¶
- Variables
- func IsDurableTaskGrpcRequest(fullMethodName string) bool
- func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error)
- func WithOnGetWorkItemsConnectionCallback(callback func(context.Context)) grpcExecutorOptions
- type ActivityExecutor
- type ActivityWorkItem
- type Backend
- type ExecutionResults
- type Executor
- type HistoryEvent
- type Logger
- type NewTaskWorkerOptions
- type OrchestrationRuntimeState
- func (s *OrchestrationRuntimeState) AddEvent(e *HistoryEvent) error
- func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction, currentTraceContext *protos.TraceContext) (bool, error)
- func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) ContinuedAsNew() bool
- func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) FailureDetails() (*TaskFailureDetails, error)
- func (s *OrchestrationRuntimeState) Input() (string, error)
- func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID
- func (s *OrchestrationRuntimeState) IsCompleted() bool
- func (s *OrchestrationRuntimeState) IsValid() bool
- func (s *OrchestrationRuntimeState) LastUpdatedTime() (time.Time, error)
- func (s *OrchestrationRuntimeState) Name() (string, error)
- func (s *OrchestrationRuntimeState) NewEvents() []*HistoryEvent
- func (s *OrchestrationRuntimeState) OldEvents() []*HistoryEvent
- func (s *OrchestrationRuntimeState) Output() (string, error)
- func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage
- func (s *OrchestrationRuntimeState) PendingTasks() []*HistoryEvent
- func (s *OrchestrationRuntimeState) PendingTimers() []*HistoryEvent
- func (s *OrchestrationRuntimeState) RuntimeStatus() protos.OrchestrationStatus
- func (s *OrchestrationRuntimeState) String() string
- type OrchestrationWorkItem
- type OrchestratorExecutor
- type OrchestratorMessage
- type TaskFailureDetails
- type TaskHubClient
- type TaskHubWorker
- type TaskProcessor
- type TaskWorker
- func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, ...) TaskWorker
- func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Logger, ...) TaskWorker
- func NewTaskWorker(be Backend, p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
- type WorkItem
- type WorkerOptions
Constants ¶
This section is empty.
Variables ¶
var (
ErrTaskHubExists = errors.New("task hub already exists")
ErrTaskHubNotFound = errors.New("task hub not found")
ErrNotInitialized = errors.New("backend not initialized")
ErrWorkItemLockLost = errors.New("lock on work-item was lost")
ErrBackendAlreadyStarted = errors.New("backend is already started")
)
var ErrDuplicateEvent = errors.New("duplicate event")
var ErrNoWorkItems = errors.New("no work items were found")
Functions ¶
func IsDurableTaskGrpcRequest ¶ added in v0.1.1
IsDurableTaskGrpcRequest returns true if the specified gRPC method name represents an operation that is compatible with the gRPC executor.
func MarshalHistoryEvent ¶ added in v0.1.1
func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error)
MarshalHistoryEvent serializes the HistoryEvent into a protobuf byte array.
func WithOnGetWorkItemsConnectionCallback ¶ added in v0.1.1
WithOnGetWorkItemsConnectionCallback allows the caller to get a notification when an external process connects over gRPC and invokes the GetWorkItems operation. This can be useful for doing things like lazily auto-starting the task hub worker only when necessary.
Types ¶
type ActivityExecutor ¶
type ActivityExecutor interface {
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}
type ActivityWorkItem ¶
type ActivityWorkItem struct {
SequenceNumber int64
InstanceID api.InstanceID
NewEvent *HistoryEvent
Result *HistoryEvent
LockedBy string
Properties map[string]interface{}
}
func (*ActivityWorkItem) Description ¶
func (wi *ActivityWorkItem) Description() string
Description implements WorkItem.
type Backend ¶
type Backend interface {
// CreateTaskHub creates a new task hub for the current backend. Task hub creation must be idempotent.
//
// If the task hub for this backend already exists, an error of type [ErrTaskHubExists] is returned.
CreateTaskHub(context.Context) error
// DeleteTaskHub deletes an existing task hub configured for the current backend. It's up to the backend
// implementation to determine how the task hub data is deleted.
//
// If the task hub for this backend doesn't exist, an error of type [ErrTaskHubNotFound] is returned.
DeleteTaskHub(context.Context) error
// Start starts any background processing done by this backend.
Start(context.Context) error
// Stop stops any background processing done by this backend.
Stop(context.Context) error
// CreateOrchestrationInstance creates a new orchestration instance with a history event that
// wraps an ExecutionStarted event.
CreateOrchestrationInstance(context.Context, *HistoryEvent) error
// AddNewOrchestrationEvent adds a new orchestration event to the specified orchestration instance.
AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error
// GetOrchestrationWorkItem gets a pending work item from the task hub or returns [ErrNoWorkItems]
// if there are no pending work items.
GetOrchestrationWorkItem(context.Context) (*OrchestrationWorkItem, error)
// GetOrchestrationRuntimeState gets the runtime state of an orchestration instance.
GetOrchestrationRuntimeState(context.Context, *OrchestrationWorkItem) (*OrchestrationRuntimeState, error)
// GetOrchestrationMetadata gets the metadata associated with the given orchestration instance ID.
//
// Returns [api.ErrInstanceNotFound] if the orchestration instance doesn't exist.
GetOrchestrationMetadata(context.Context, api.InstanceID) (*api.OrchestrationMetadata, error)
// CompleteOrchestrationWorkItem completes a work item by saving the updated runtime state to durable storage.
//
// Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain).
CompleteOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error
// AbandonOrchestrationWorkItem undoes any state changes and returns the work item to the work item queue.
//
// This is called if an internal failure happens in the processing of an orchestration work item. It is
// not called if the orchestration work item is processed successfully (note that an orchestration that
// completes with a failure is still considered a successfully processed work item).
AbandonOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error
// GetActivityWorkItem gets a pending activity work item from the task hub or returns [ErrNoWorkItems]
// if there are no pending activity work items.
GetActivityWorkItem(context.Context) (*ActivityWorkItem, error)
// CompleteActivityWorkItem sends a message to the parent orchestration indicating activity completion.
//
// Returns [ErrWorkItemLockLost] if the work-item couldn't be completed due to a lock-lost conflict (e.g., split-brain).
CompleteActivityWorkItem(context.Context, *ActivityWorkItem) error
// AbandonActivityWorkItem returns the work-item back to the queue without committing any other changes.
//
// This is called when an internal failure occurs during activity work-item processing.
AbandonActivityWorkItem(context.Context, *ActivityWorkItem) error
}
type ExecutionResults ¶
type ExecutionResults struct {
Response *protos.OrchestratorResponse
// contains filtered or unexported fields
}
type Executor ¶
type Executor interface {
ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*ExecutionResults, error)
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}
type HistoryEvent ¶ added in v0.1.1
type HistoryEvent = protos.HistoryEvent
func UnmarshalHistoryEvent ¶ added in v0.1.1
func UnmarshalHistoryEvent(bytes []byte) (*HistoryEvent, error)
UnmarshalHistoryEvent deserializes a HistoryEvent from a protobuf byte array.
type Logger ¶ added in v0.1.1
type Logger interface {
// Debug logs a message at level Debug.
Debug(v ...any)
// Debugf logs a message at level Debug.
Debugf(format string, v ...any)
// Info logs a message at level Info.
Info(v ...any)
// Infof logs a message at level Info.
Infof(format string, v ...any)
// Warn logs a message at level Warn.
Warn(v ...any)
// Warnf logs a message at level Warn.
Warnf(format string, v ...any)
// Error logs a message at level Error.
Error(v ...any)
// Errorf logs a message at level Error.
Errorf(format string, v ...any)
}
func DefaultLogger ¶ added in v0.1.1
func DefaultLogger() Logger
type NewTaskWorkerOptions ¶ added in v0.1.1
type NewTaskWorkerOptions func(*WorkerOptions)
func WithMaxParallelism ¶ added in v0.1.1
func WithMaxParallelism(n int32) NewTaskWorkerOptions
type OrchestrationRuntimeState ¶
type OrchestrationRuntimeState struct {
CustomStatus *wrapperspb.StringValue
// contains filtered or unexported fields
}
func NewOrchestrationRuntimeState ¶
func NewOrchestrationRuntimeState(instanceID api.InstanceID, existingHistory []*HistoryEvent) *OrchestrationRuntimeState
func (*OrchestrationRuntimeState) AddEvent ¶
func (s *OrchestrationRuntimeState) AddEvent(e *HistoryEvent) error
AddEvent appends a new history event to the orchestration history.
func (*OrchestrationRuntimeState) ApplyActions ¶
func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorAction, currentTraceContext *protos.TraceContext) (bool, error)
ApplyActions takes a set of actions and updates its internal state, including populating the outbox.
func (*OrchestrationRuntimeState) CompletedTime ¶
func (s *OrchestrationRuntimeState) CompletedTime() (time.Time, error)
func (*OrchestrationRuntimeState) ContinuedAsNew ¶
func (s *OrchestrationRuntimeState) ContinuedAsNew() bool
func (*OrchestrationRuntimeState) CreatedTime ¶
func (s *OrchestrationRuntimeState) CreatedTime() (time.Time, error)
func (*OrchestrationRuntimeState) FailureDetails ¶
func (s *OrchestrationRuntimeState) FailureDetails() (*TaskFailureDetails, error)
func (*OrchestrationRuntimeState) Input ¶
func (s *OrchestrationRuntimeState) Input() (string, error)
func (*OrchestrationRuntimeState) InstanceID ¶
func (s *OrchestrationRuntimeState) InstanceID() api.InstanceID
func (*OrchestrationRuntimeState) IsCompleted ¶
func (s *OrchestrationRuntimeState) IsCompleted() bool
func (*OrchestrationRuntimeState) IsValid ¶
func (s *OrchestrationRuntimeState) IsValid() bool
func (*OrchestrationRuntimeState) LastUpdatedTime ¶ added in v0.1.1
func (s *OrchestrationRuntimeState) LastUpdatedTime() (time.Time, error)
func (*OrchestrationRuntimeState) Name ¶
func (s *OrchestrationRuntimeState) Name() (string, error)
func (*OrchestrationRuntimeState) NewEvents ¶
func (s *OrchestrationRuntimeState) NewEvents() []*HistoryEvent
func (*OrchestrationRuntimeState) OldEvents ¶
func (s *OrchestrationRuntimeState) OldEvents() []*HistoryEvent
func (*OrchestrationRuntimeState) Output ¶
func (s *OrchestrationRuntimeState) Output() (string, error)
func (*OrchestrationRuntimeState) PendingMessages ¶
func (s *OrchestrationRuntimeState) PendingMessages() []OrchestratorMessage
func (*OrchestrationRuntimeState) PendingTasks ¶
func (s *OrchestrationRuntimeState) PendingTasks() []*HistoryEvent
func (*OrchestrationRuntimeState) PendingTimers ¶
func (s *OrchestrationRuntimeState) PendingTimers() []*HistoryEvent
func (*OrchestrationRuntimeState) RuntimeStatus ¶
func (s *OrchestrationRuntimeState) RuntimeStatus() protos.OrchestrationStatus
func (*OrchestrationRuntimeState) String ¶
func (s *OrchestrationRuntimeState) String() string
type OrchestrationWorkItem ¶
type OrchestrationWorkItem struct {
InstanceID api.InstanceID
NewEvents []*HistoryEvent
LockedBy string
RetryCount int32
State *OrchestrationRuntimeState
Properties map[string]interface{}
}
func (*OrchestrationWorkItem) Description ¶
func (wi *OrchestrationWorkItem) Description() string
func (*OrchestrationWorkItem) GetAbandonDelay ¶
func (wi *OrchestrationWorkItem) GetAbandonDelay() time.Duration
type OrchestratorExecutor ¶
type OrchestratorExecutor interface {
ExecuteOrchestrator(
ctx context.Context,
iid api.InstanceID,
oldEvents []*protos.HistoryEvent,
newEvents []*protos.HistoryEvent) (*ExecutionResults, error)
}
type OrchestratorMessage ¶
type OrchestratorMessage struct {
HistoryEvent *HistoryEvent
TargetInstanceID string
}
type TaskFailureDetails ¶ added in v0.1.1
type TaskFailureDetails = protos.TaskFailureDetails
type TaskHubClient ¶ added in v0.1.1
type TaskHubClient interface {
ScheduleNewOrchestration(ctx context.Context, orchestrator interface{}, opts ...api.NewOrchestrationOptions) (api.InstanceID, error)
FetchOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
WaitForOrchestrationStart(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
WaitForOrchestrationCompletion(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
TerminateOrchestration(ctx context.Context, id api.InstanceID, reason string) error
RaiseEvent(ctx context.Context, id api.InstanceID, eventName string, data any) error
}
func NewTaskHubClient ¶ added in v0.1.1
func NewTaskHubClient(be Backend) TaskHubClient
type TaskHubWorker ¶
type TaskHubWorker interface {
// Start starts the backend and the configured internal workers.
Start(context.Context) error
// Shutdown stops the backend and all internal workers.
Shutdown(context.Context) error
}
func NewTaskHubWorker ¶
func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker, activityWorker TaskWorker, logger Logger) TaskHubWorker
type TaskProcessor ¶
type TaskWorker ¶
type TaskWorker interface {
// Start starts background polling for work items.
Start(context.Context)
// ProcessNext attempts to fetch and process a work item. This method returns
// true if a work item was found and processing started; false otherwise. An
// error is returned if the context is cancelled.
ProcessNext(context.Context) (bool, error)
// StopAndDrain stops the worker and waits for all outstanding work items to finish.
StopAndDrain()
}
func NewActivityTaskWorker ¶
func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
func NewOrchestrationWorker ¶
func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
func NewTaskWorker ¶
func NewTaskWorker(be Backend, p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker
type WorkerOptions ¶ added in v0.1.1
type WorkerOptions struct {
MaxParallelWorkItems int32
}
func NewWorkerOptions ¶ added in v0.1.1
func NewWorkerOptions() *WorkerOptions