Documentation
¶
Index ¶
- Constants
- Variables
- func InjectWorkflowExecutionContext(ctx context.Context, workflowExecutionContext *WorkflowExecutionContext) context.Context
- func NewWorkflowTaskProcessor(be backend.Backend, executor WorkflowTaskExecutor, logger *zap.Logger) worker.TaskProcessor[*task.WorkflowTask, *task.WorkflowTaskResult]
- type ActivityPromise
- type TimerPromise
- type WorkflowExecutionContext
- type WorkflowRuntime
- func (w *WorkflowRuntime) CreateTimer(fireAt int64) *TimerPromise
- func (w *WorkflowRuntime) GetWorkflowTaskResult() *task.WorkflowTaskResult
- func (w *WorkflowRuntime) RunSimulation() (err error)
- func (w *WorkflowRuntime) ScheduleNewActivity(activity any, input any) *ActivityPromise
- func (w *WorkflowRuntime) Step() (bool, error)
- type WorkflowTaskExecutor
Constants ¶
View Source
// WorkflowExecutionContextKey is the string constant "workflowExecutionContext".
// NOTE(review): presumably the context.Context value key used by
// InjectWorkflowExecutionContext and MustExtractWorkflowExecutionContext —
// confirm against the implementation.
const WorkflowExecutionContextKey = "workflowExecutionContext"
Variables ¶
View Source
// ErrControlledPanic is a sentinel error with the message "controlled panic".
// Compare against it with errors.Is.
// NOTE(review): presumably signals a panic raised deliberately by the runtime
// to unwind workflow code — confirm against the implementation.
var ErrControlledPanic = errors.New("controlled panic")
View Source
// ErrNonDeterministicError is a sentinel error with the message
// "non-deterministic error". Compare against it with errors.Is.
// NOTE(review): presumably reported when workflow code diverges from recorded
// history during replay — confirm against the implementation.
var ErrNonDeterministicError = errors.New("non-deterministic error")
Functions ¶
func InjectWorkflowExecutionContext ¶
func InjectWorkflowExecutionContext(ctx context.Context, workflowExecutionContext *WorkflowExecutionContext) context.Context
func NewWorkflowTaskProcessor ¶
func NewWorkflowTaskProcessor( be backend.Backend, executor WorkflowTaskExecutor, logger *zap.Logger, ) worker.TaskProcessor[*task.WorkflowTask, *task.WorkflowTaskResult]
Types ¶
type ActivityPromise ¶
// ActivityPromise bundles a WorkflowRuntime reference, an activity value and a
// promise that carries a byte slice.
// NOTE(review): presumably the handle returned by
// WorkflowRuntime.ScheduleNewActivity and resolved through Await — confirm.
type ActivityPromise struct {
// WorkflowRuntime is the runtime associated with this promise.
WorkflowRuntime *WorkflowRuntime
// Activity is the activity value; it is untyped (any).
Activity any
// Promise carries a []byte payload.
// NOTE(review): presumably the encoded activity result — confirm.
Promise *promise.Promise[[]byte]
}
func NewActivityPromise ¶
func NewActivityPromise(runtime *WorkflowRuntime, activity any) *ActivityPromise
func (*ActivityPromise) Await ¶
func (a *ActivityPromise) Await() (any, error)
type TimerPromise ¶
// TimerPromise bundles a WorkflowRuntime reference and a promise with an empty
// struct payload, i.e. a promise that carries no value.
// NOTE(review): presumably the handle returned by WorkflowRuntime.CreateTimer
// and resolved through Await — confirm.
type TimerPromise struct {
// WorkflowRuntime is the runtime associated with this promise.
WorkflowRuntime *WorkflowRuntime
// Promise carries struct{}; it conveys completion only, not data.
Promise *promise.Promise[struct{}]
}
func NewTimerPromise ¶
func NewTimerPromise(runtime *WorkflowRuntime) *TimerPromise
func (*TimerPromise) Await ¶
func (a *TimerPromise) Await() error
type WorkflowExecutionContext ¶
// WorkflowExecutionContext groups a WorkflowRuntime reference with two
// string-keyed maps: user-defined variables and event callbacks.
type WorkflowExecutionContext struct {
// WorkflowRuntime is the runtime associated with this execution context.
WorkflowRuntime *WorkflowRuntime
// UserDefinedVars maps a string name to an arbitrary value.
UserDefinedVars map[string]any
// EventCallbacks maps a string key to a list of callbacks that each take a
// []byte argument.
// NOTE(review): presumably keyed by event name, with the event payload as
// the argument — confirm.
EventCallbacks map[string][]func([]byte)
}
func MustExtractWorkflowExecutionContext ¶
func MustExtractWorkflowExecutionContext(ctx context.Context) *WorkflowExecutionContext
func NewWorkflowExecutionContext ¶
func NewWorkflowExecutionContext(runtime *WorkflowRuntime) *WorkflowExecutionContext
type WorkflowRuntime ¶
// WorkflowRuntime holds the state used while processing one workflow task:
// the dependencies supplied at construction, the task itself, bookkeeping for
// activities and timers, and the workflow start and completion events.
type WorkflowRuntime struct {
// Initialization: dependencies (see the NewWorkflowRuntime parameters).
WorkflowRegistry *registry.WorkflowRegistry
DataConverter dataconverter.DataConverter
// Workflow task being processed.
Task *task.WorkflowTask
// Runtime state.
// NOTE(review): HistoryIndex is presumably the position in the task's
// history, and the int64 map keys are presumably sequence numbers (compare
// SequenceNo) — confirm. Units of CurrentTimestamp are not visible here.
HistoryIndex int
IsReplaying bool
SequenceNo int64
CurrentTimestamp int64
ActivityScheduledEvents map[int64]*history.ActivityScheduled
ActivityPromises map[int64]*ActivityPromise
TimerCreatedEvents map[int64]*history.TimerCreated
TimerPromises map[int64]*TimerPromise
// Workflow start: the start event, its timestamp, the execution context and
// a version string.
WorkflowExecutionStartedTimestamp int64
WorkflowExecutionStartedEvent *history.WorkflowExecutionStarted
WorkflowExecutionContext *WorkflowExecutionContext
Version string
// Final event: this can only be the execution-completed event, and it is
// emitted internally.
WorkflowExecutionCompleted *history.WorkflowExecutionCompleted
}
func NewWorkflowRuntime ¶
func NewWorkflowRuntime( workflowRegistry *registry.WorkflowRegistry, dataConverter dataconverter.DataConverter, task *task.WorkflowTask, ) *WorkflowRuntime
func (*WorkflowRuntime) CreateTimer ¶
func (w *WorkflowRuntime) CreateTimer(fireAt int64) *TimerPromise
func (*WorkflowRuntime) GetWorkflowTaskResult ¶
func (w *WorkflowRuntime) GetWorkflowTaskResult() *task.WorkflowTaskResult
func (*WorkflowRuntime) RunSimulation ¶
func (w *WorkflowRuntime) RunSimulation() (err error)
func (*WorkflowRuntime) ScheduleNewActivity ¶
func (w *WorkflowRuntime) ScheduleNewActivity(activity any, input any) *ActivityPromise
func (*WorkflowRuntime) Step ¶
func (w *WorkflowRuntime) Step() (bool, error)
type WorkflowTaskExecutor ¶
// WorkflowTaskExecutor executes a single workflow task. An implementation can
// be obtained from NewWorkflowTaskExecutor, and one is accepted by
// NewWorkflowTaskProcessor.
type WorkflowTaskExecutor interface {
// Execute processes task under ctx and returns the resulting
// *task.WorkflowTaskResult, or an error.
Execute(ctx context.Context, task *task.WorkflowTask) (*task.WorkflowTaskResult, error)
}
func NewWorkflowTaskExecutor ¶
func NewWorkflowTaskExecutor( workflowRegistry *registry.WorkflowRegistry, dataConverter dataconverter.DataConverter, logger *zap.Logger, ) WorkflowTaskExecutor
Click to show internal directories.
Click to hide internal directories.