Documentation
¶
Index ¶
- Constants
- func NoOpServiceTaskConsumerFn(_ context.Context, _ *model.TaskSpec) error
- func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)
- type AbortType
- type Engine
- func (s *Engine) Compensate(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)
- func (s *Engine) GetGatewayInstanceID(state *model.WorkflowState) (string, string, error)
- func (s *Engine) Shutdown()
- func (c *Engine) Start(ctx context.Context) error
- func (s *Engine) StartProcessing(ctx context.Context) error
- type Features
- type MockServiceTaskConsumerFn
- type MockServiceTaskConsumerFn_Execute_Call
- func (_c *MockServiceTaskConsumerFn_Execute_Call) Return(_a0 error) *MockServiceTaskConsumerFn_Execute_Call
- func (_c *MockServiceTaskConsumerFn_Execute_Call) Run(run func(ctx context.Context, taskSpec *model.TaskSpec)) *MockServiceTaskConsumerFn_Execute_Call
- func (_c *MockServiceTaskConsumerFn_Execute_Call) RunAndReturn(run func(context.Context, *model.TaskSpec) error) *MockServiceTaskConsumerFn_Execute_Call
- type MockServiceTaskConsumerFn_Expecter
- type MockWorkflowProcessMappingFn
- type MockWorkflowProcessMappingFn_Execute_Call
- func (_c *MockWorkflowProcessMappingFn_Execute_Call) Return(_a0 uint64, _a1 error) *MockWorkflowProcessMappingFn_Execute_Call
- func (_c *MockWorkflowProcessMappingFn_Execute_Call) Run(run func(ctx context.Context, wf *model.Workflow, i *model.Process)) *MockWorkflowProcessMappingFn_Execute_Call
- func (_c *MockWorkflowProcessMappingFn_Execute_Call) RunAndReturn(run func(context.Context, *model.Workflow, *model.Process) (uint64, error)) *MockWorkflowProcessMappingFn_Execute_Call
- type MockWorkflowProcessMappingFn_Expecter
- type MocksetPartyFn
- type MocksetPartyFn_Execute_Call
- func (_c *MocksetPartyFn_Execute_Call) Return(_a0 *model.Exchange, _a1 error) *MocksetPartyFn_Execute_Call
- func (_c *MocksetPartyFn_Execute_Call) Run(run func(exch *model.Exchange)) *MocksetPartyFn_Execute_Call
- func (_c *MocksetPartyFn_Execute_Call) RunAndReturn(run func(*model.Exchange) (*model.Exchange, error)) *MocksetPartyFn_Execute_Call
- type MocksetPartyFn_Expecter
- type Operations
- func (c *Operations) AbortProcess(ctx context.Context, state *model.WorkflowState, cascadeUntilProcessID string) error
- func (c *Operations) CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error
- func (c *Operations) CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (c *Operations) CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (c *Operations) CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (c *Operations) CompleteUserTask(ctx context.Context, jobId string) error
- func (s *Operations) DeleteFatalError(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) DeleteJob(ctx context.Context, trackingID string) error
- func (s *Operations) DeleteNamespace(ctx context.Context, ns string) error
- func (s *Operations) DeprecateTaskSpec(ctx context.Context, uid []string) error
- func (s *Operations) GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
- func (s *Operations) GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
- func (s *Operations) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)
- func (s *Operations) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)
- func (s *Operations) GetFatalErrors(ctx context.Context, keyPrefix string) <-chan data.Result[*model.FatalError]
- func (s *Operations) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)
- func (s *Operations) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)
- func (s *Operations) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
- func (s *Operations) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)
- func (s *Operations) GetTaskSpecUID(ctx context.Context, name string) (string, error)
- func (s *Operations) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
- func (s *Operations) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
- func (s *Operations) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
- func (s *Operations) GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)
- func (c *Operations) HandleWorkflowError(ctx context.Context, errorCode string, inVars []byte, ...) error
- func (s *Operations) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error
- func (c *Operations) Launch(ctx context.Context, processId string, vars []byte, headers []byte) (string, string, error)
- func (c *Operations) LaunchWithParent(ctx context.Context, processId string, ID common.TrackingID, vrs []byte, ...) (string, string, error)
- func (s *Operations) ListExecutableProcesses(ctx context.Context) <-chan data.Result[*model.ListExecutableProcessesItem]
- func (s *Operations) ListExecutionProcesses(ctx context.Context, id string) ([]string, error)
- func (s *Operations) ListExecutions(ctx context.Context, workflowName string) <-chan data.Result[*model.ListExecutionItem]
- func (s *Operations) ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)
- func (s *Operations) ListUserTasks(ctx context.Context, owner string, group string) <-chan data.Result[*model.ListUserTasksResponse]
- func (s *Operations) Log(ctx context.Context, req *model.LogRequest) error
- func (s *Operations) OpenUserTask(ctx context.Context, id string, owner string) ([]byte, error)
- func (s *Operations) OwnerID(ctx context.Context, name string) (string, error)
- func (s *Operations) OwnerName(ctx context.Context, id string) (string, error)
- func (s *Operations) PauseServiceTask(ctx context.Context, uid string) error
- func (s *Operations) PersistFatalError(ctx context.Context, fatalError *model.FatalError) (bool, error)
- func (s *Operations) ProcessTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, ...) error
- func (s *Operations) PublishMsg(ctx context.Context, subject string, msg proto.Message) error
- func (s *Operations) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, ...) error
- func (s *Operations) PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)
- func (s *Operations) ResumeServiceTask(ctx context.Context, uid string) error
- func (s *Operations) RetryActivity(ctx context.Context, processInstanceId string, failedElementId string, ...) error
- func (s *Operations) SaveUserTask(ctx context.Context, id string, b []byte, overwrite bool) error
- func (s *Operations) SignalFatalErrorPause(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
- func (s *Operations) SignalFatalErrorTeardown(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
- type Ops
- type ServiceTaskConsumerFn
- type TaskOperations
- func (to *TaskOperations) GetLatestTaskSpecUID(ctx context.Context, name string) (string, error)
- func (to *TaskOperations) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)
- func (to *TaskOperations) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
- func (s *TaskOperations) PauseServiceTask(ctx context.Context, uid string) error
- func (s *TaskOperations) ResumeServiceTask(ctx context.Context, uid string) error
- type WorkflowOperations
- func (wfo *WorkflowOperations) DisableWorkflow(ctx context.Context, workflowName string) error
- func (wfo *WorkflowOperations) EnableWorkflow(ctx context.Context, workflowName string) error
- func (wfo *WorkflowOperations) GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)
- func (wfo *WorkflowOperations) ListWorkflows(ctx context.Context) <-chan data.Result[*model.ListWorkflowResponse]
- func (wfo *WorkflowOperations) LoadWorkflow(ctx context.Context, model *model.Workflow) (string, error)
- func (wfo *WorkflowOperations) ProcessTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, ...) error
- func (wfo *WorkflowOperations) StreamWorkflowVersions(ctx context.Context, workflowName string) <-chan data.Result[*model.WorkflowVersion]
- type WorkflowProcessMappingFn
Constants ¶
const ( // AbortTypeActivity signifies an activity is being aborted AbortTypeActivity = iota // AbortTypeServiceTask signifies a service task is being aborted AbortTypeServiceTask )
Variables ¶
This section is empty.
Functions ¶
func NoOpServiceTaskConsumerFn ¶
NoOpServiceTaskConsumerFn no op service task consumer fn
Types ¶
type AbortType ¶
type AbortType int
AbortType represents the type of termination being handled by the abort function
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine contains the workflow processing functions
func NewEngine ¶ added in v1.1.1687
func NewEngine(natsService *natz.NatsService, operations *Operations, historyOperations store.WorkflowHistoryOperations, wfOperations *WorkflowOperations, options *option.ServerOptions) (*Engine, error)
NewEngine returns an instance of the core workflow engine.
func (*Engine) Compensate ¶
Compensate is a method of the Engine struct that performs the compensation process for a given workflow state. It retrieves the necessary information and history from the workflow and processes it to determine the compensation steps. It then creates a compensation plan and publishes each step to a designated subject for further processing. Finally, it updates the state of the workflow to indicate that the compensation is in progress. It returns an error if any step of the compensation process encounters an issue.
func (*Engine) GetGatewayInstance ¶
func (s *Engine) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)
GetGatewayInstance - returns a gateway instance from the KV store.
func (*Engine) GetGatewayInstanceID ¶
GetGatewayInstanceID - returns a gateawy instance ID and a satisfying route to that gateway.
func (*Engine) Shutdown ¶
func (s *Engine) Shutdown()
Shutdown signals the engine to stop processing.
type Features ¶ added in v1.1.1913
type Features struct {
// ExplicitProcessReturns indicates whether the process should return only explicitly declared variables.
ExplicitProcessReturns bool
}
Features to be enabled or disabled.
type MockServiceTaskConsumerFn ¶ added in v1.1.1404
MockServiceTaskConsumerFn is an autogenerated mock type for the ServiceTaskConsumerFn type
func NewMockServiceTaskConsumerFn ¶ added in v1.1.1404
func NewMockServiceTaskConsumerFn(t interface {
mock.TestingT
Cleanup(func())
}) *MockServiceTaskConsumerFn
NewMockServiceTaskConsumerFn creates a new instance of MockServiceTaskConsumerFn. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.
func (*MockServiceTaskConsumerFn) EXPECT ¶ added in v1.1.1404
func (_m *MockServiceTaskConsumerFn) EXPECT() *MockServiceTaskConsumerFn_Expecter
type MockServiceTaskConsumerFn_Execute_Call ¶ added in v1.1.1404
MockServiceTaskConsumerFn_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'
func (*MockServiceTaskConsumerFn_Execute_Call) Return ¶ added in v1.1.1404
func (_c *MockServiceTaskConsumerFn_Execute_Call) Return(_a0 error) *MockServiceTaskConsumerFn_Execute_Call
func (*MockServiceTaskConsumerFn_Execute_Call) Run ¶ added in v1.1.1404
func (_c *MockServiceTaskConsumerFn_Execute_Call) Run(run func(ctx context.Context, taskSpec *model.TaskSpec)) *MockServiceTaskConsumerFn_Execute_Call
func (*MockServiceTaskConsumerFn_Execute_Call) RunAndReturn ¶ added in v1.1.1404
func (_c *MockServiceTaskConsumerFn_Execute_Call) RunAndReturn(run func(context.Context, *model.TaskSpec) error) *MockServiceTaskConsumerFn_Execute_Call
type MockServiceTaskConsumerFn_Expecter ¶ added in v1.1.1404
type MockServiceTaskConsumerFn_Expecter struct {
// contains filtered or unexported fields
}
func (*MockServiceTaskConsumerFn_Expecter) Execute ¶ added in v1.1.1404
func (_e *MockServiceTaskConsumerFn_Expecter) Execute(ctx interface{}, taskSpec interface{}) *MockServiceTaskConsumerFn_Execute_Call
Execute is a helper method to define mock.On call
- ctx context.Context
- taskSpec *model.TaskSpec
type MockWorkflowProcessMappingFn ¶ added in v1.1.1404
MockWorkflowProcessMappingFn is an autogenerated mock type for the WorkflowProcessMappingFn type
func NewMockWorkflowProcessMappingFn ¶ added in v1.1.1404
func NewMockWorkflowProcessMappingFn(t interface {
mock.TestingT
Cleanup(func())
}) *MockWorkflowProcessMappingFn
NewMockWorkflowProcessMappingFn creates a new instance of MockWorkflowProcessMappingFn. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.
func (*MockWorkflowProcessMappingFn) EXPECT ¶ added in v1.1.1404
func (_m *MockWorkflowProcessMappingFn) EXPECT() *MockWorkflowProcessMappingFn_Expecter
type MockWorkflowProcessMappingFn_Execute_Call ¶ added in v1.1.1404
MockWorkflowProcessMappingFn_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'
func (*MockWorkflowProcessMappingFn_Execute_Call) Return ¶ added in v1.1.1404
func (_c *MockWorkflowProcessMappingFn_Execute_Call) Return(_a0 uint64, _a1 error) *MockWorkflowProcessMappingFn_Execute_Call
func (*MockWorkflowProcessMappingFn_Execute_Call) Run ¶ added in v1.1.1404
func (_c *MockWorkflowProcessMappingFn_Execute_Call) Run(run func(ctx context.Context, wf *model.Workflow, i *model.Process)) *MockWorkflowProcessMappingFn_Execute_Call
func (*MockWorkflowProcessMappingFn_Execute_Call) RunAndReturn ¶ added in v1.1.1404
func (_c *MockWorkflowProcessMappingFn_Execute_Call) RunAndReturn(run func(context.Context, *model.Workflow, *model.Process) (uint64, error)) *MockWorkflowProcessMappingFn_Execute_Call
type MockWorkflowProcessMappingFn_Expecter ¶ added in v1.1.1404
type MockWorkflowProcessMappingFn_Expecter struct {
// contains filtered or unexported fields
}
func (*MockWorkflowProcessMappingFn_Expecter) Execute ¶ added in v1.1.1404
func (_e *MockWorkflowProcessMappingFn_Expecter) Execute(ctx interface{}, wf interface{}, i interface{}) *MockWorkflowProcessMappingFn_Execute_Call
Execute is a helper method to define mock.On call
- ctx context.Context
- wf *model.Workflow
- i *model.Process
type MocksetPartyFn ¶ added in v1.1.1404
MocksetPartyFn is an autogenerated mock type for the setPartyFn type
func NewMocksetPartyFn ¶ added in v1.1.1404
func NewMocksetPartyFn(t interface {
mock.TestingT
Cleanup(func())
}) *MocksetPartyFn
NewMocksetPartyFn creates a new instance of MocksetPartyFn. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.
func (*MocksetPartyFn) EXPECT ¶ added in v1.1.1404
func (_m *MocksetPartyFn) EXPECT() *MocksetPartyFn_Expecter
type MocksetPartyFn_Execute_Call ¶ added in v1.1.1404
MocksetPartyFn_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'
func (*MocksetPartyFn_Execute_Call) Return ¶ added in v1.1.1404
func (_c *MocksetPartyFn_Execute_Call) Return(_a0 *model.Exchange, _a1 error) *MocksetPartyFn_Execute_Call
func (*MocksetPartyFn_Execute_Call) Run ¶ added in v1.1.1404
func (_c *MocksetPartyFn_Execute_Call) Run(run func(exch *model.Exchange)) *MocksetPartyFn_Execute_Call
func (*MocksetPartyFn_Execute_Call) RunAndReturn ¶ added in v1.1.1404
func (_c *MocksetPartyFn_Execute_Call) RunAndReturn(run func(*model.Exchange) (*model.Exchange, error)) *MocksetPartyFn_Execute_Call
type MocksetPartyFn_Expecter ¶ added in v1.1.1404
type MocksetPartyFn_Expecter struct {
// contains filtered or unexported fields
}
func (*MocksetPartyFn_Expecter) Execute ¶ added in v1.1.1404
func (_e *MocksetPartyFn_Expecter) Execute(exch interface{}) *MocksetPartyFn_Execute_Call
Execute is a helper method to define mock.On call
- exch *model.Exchange
type Operations ¶ added in v1.1.1131
type Operations struct {
Feature Features
// contains filtered or unexported fields
}
Operations provides methods for executing and managing workflow processes. It provides methods for various tasks such as canceling process instances, completing tasks, retrieving workflow-related information, and managing workflow execution.
func NewOperations ¶ added in v1.1.1131
func NewOperations(natsService *natz.NatsService, natsConn *natz.NatsConnConfiguration, exprEngine *expression.ExprEngine, historyOperations store.WorkflowHistoryOperations, workflowOperations *WorkflowOperations, taskOperations *TaskOperations, tracerProvider trace.TracerProvider) (*Operations, error)
NewOperations constructs a new Operations instance
func (*Operations) AbortProcess ¶ added in v1.1.1913
func (c *Operations) AbortProcess(ctx context.Context, state *model.WorkflowState, cascadeUntilProcessID string) error
AbortProcess aborts the process instance and all parent process instances.
func (*Operations) CancelProcessInstance ¶ added in v1.1.1131
func (c *Operations) CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error
CancelProcessInstance cancels a workflow instance with a reason.
func (*Operations) CompleteManualTask ¶ added in v1.1.1131
func (c *Operations) CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteManualTask completes a manual workflow task
func (*Operations) CompleteSendMessageTask ¶ added in v1.1.1131
func (c *Operations) CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteSendMessageTask completes a send message task
func (*Operations) CompleteServiceTask ¶ added in v1.1.1131
func (c *Operations) CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteServiceTask completes a workflow service task
func (*Operations) CompleteUserTask ¶ added in v1.1.1131
func (c *Operations) CompleteUserTask(ctx context.Context, jobId string) error
CompleteUserTask completes and closes a user task with variables
func (*Operations) DeleteFatalError ¶ added in v1.1.1377
func (s *Operations) DeleteFatalError(ctx context.Context, state *model.WorkflowState) error
DeleteFatalError removes the fatal error for a given workflow state
func (*Operations) DeleteJob ¶ added in v1.1.1131
func (s *Operations) DeleteJob(ctx context.Context, trackingID string) error
DeleteJob removes a workflow task state.
func (*Operations) DeleteNamespace ¶ added in v1.1.1131
func (s *Operations) DeleteNamespace(ctx context.Context, ns string) error
DeleteNamespace deletes the key-value store for the specified namespace in SHAR. It iterates over all the key-value stores and deletes them one by one. The function returns nil if all key-value stores are successfully deleted.
func (*Operations) DeprecateTaskSpec ¶ added in v1.1.1131
func (s *Operations) DeprecateTaskSpec(ctx context.Context, uid []string) error
DeprecateTaskSpec deprecates one or more task specs by ID.
func (*Operations) GetCompensationInputVariables ¶ added in v1.1.1131
func (s *Operations) GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
GetCompensationInputVariables is a method of the Operations struct that retrieves the original input variables for a specific process instance and tracking ID. It returns the variables in byte array format.
func (*Operations) GetCompensationOutputVariables ¶ added in v1.1.1131
func (s *Operations) GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
GetCompensationOutputVariables is a method of the Operations struct that retrieves the original output variables of a compensation history entry for a specific process instance and tracking ID. It returns the output variables as a byte array.
func (*Operations) GetExecutableWorkflowIds ¶ added in v1.1.1131
func (s *Operations) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)
GetExecutableWorkflowIds returns a list of all workflow Ids that contain executable processes
func (*Operations) GetExecution ¶ added in v1.1.1131
func (s *Operations) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)
GetExecution retrieves an execution given its ID.
func (*Operations) GetFatalErrors ¶ added in v1.1.1377
func (s *Operations) GetFatalErrors(ctx context.Context, keyPrefix string) <-chan data.Result[*model.FatalError]
GetFatalErrors queries the fatal error KV with a key prefix of the format <workflowName>.<executionId>.<processInstanceId>. and sends the results down the provided fatalErrs channel.
func (*Operations) GetJob ¶ added in v1.1.1131
func (s *Operations) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)
GetJob gets a workflow task state.
func (*Operations) GetProcessIdFor ¶ added in v1.1.1131
func (s *Operations) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)
GetProcessIdFor retrieves the processId that a begun by a message start event
func (*Operations) GetProcessInstance ¶ added in v1.1.1131
func (s *Operations) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
GetProcessInstance returns a process instance for a given process ID
func (*Operations) GetTaskSpecByUID ¶ added in v1.1.1131
GetTaskSpecByUID fetches a task spec from the database.
func (*Operations) GetTaskSpecUID ¶ added in v1.1.1131
GetTaskSpecUID fetches
func (*Operations) GetTaskSpecUsage ¶ added in v1.1.1131
func (s *Operations) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
GetTaskSpecUsage returns the usage report for a list of task specs.
func (*Operations) GetTaskSpecUsageByName ¶ added in v1.1.1131
func (s *Operations) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
GetTaskSpecUsageByName produces a report of running and executable places where the task spec is in use.
func (*Operations) GetTaskSpecVersions ¶ added in v1.1.1131
func (s *Operations) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
GetTaskSpecVersions fetches the versions of a given task spec name
func (*Operations) GetUserTaskIDs ¶ added in v1.1.1131
GetUserTaskIDs gets a list of tasks given an owner.
func (*Operations) HandleWorkflowError ¶ added in v1.1.1131
func (c *Operations) HandleWorkflowError(ctx context.Context, errorCode string, inVars []byte, state *model.WorkflowState, errorIfUnhandled bool, source model.WorkflowErrorSource) error
HandleWorkflowError handles a workflow error by looking up the error definitions in the workflow, determining the appropriate action, and publishing the necessary workflow state updates. It returns an error if there was an issue retrieving the workflow definition, if the workflow doesn't support the specified error code.
func (*Operations) Heartbeat ¶ added in v1.1.1131
func (s *Operations) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error
Heartbeat saves a client status to the client KV.
func (*Operations) Launch ¶ added in v1.1.1131
func (c *Operations) Launch(ctx context.Context, processId string, vars []byte, headers []byte) (string, string, error)
Launch starts a new instance of a workflow and returns an execution ID.
func (*Operations) LaunchWithParent ¶ added in v1.1.1131
func (c *Operations) LaunchWithParent(ctx context.Context, processId string, ID common.TrackingID, vrs []byte, headers []byte, parentpiID string, parentElID string, subProcessWorkflowID string) (string, string, error)
LaunchWithParent contains the underlying logic to start a workflow. It is also called to spawn new instances of child workflows.
func (*Operations) ListExecutableProcesses ¶ added in v1.1.1131
func (s *Operations) ListExecutableProcesses(ctx context.Context) <-chan data.Result[*model.ListExecutableProcessesItem]
ListExecutableProcesses returns a list of all the executable processes in SHAR. It retrieves the current SHAR namespace from the context and fetches the workflow versions for that namespace from the key-value store. It then iterates through each workflow version and loads the corresponding workflow. For each process in the workflow, it creates a ListExecutableProcessesItem object and populates it with the process name, workflow name, and the executable start parameters obtained from the workflow's start events. It sends each ListExecutableProcessesItem object to the wch channel.
Parameters: - ctx: The context containing the SHAR namespace. - wch: The channel for sending the list of executable processes. - errs: The channel for sending any errors that occur.
Returns: Nothing. Errors are sent to the errs channel if encountered.
func (*Operations) ListExecutionProcesses ¶ added in v1.1.1131
ListExecutionProcesses gets the current processIDs for an execution.
func (*Operations) ListExecutions ¶ added in v1.1.1131
func (s *Operations) ListExecutions(ctx context.Context, workflowName string) <-chan data.Result[*model.ListExecutionItem]
ListExecutions returns a list of running workflows and versions given a workflow Name
func (*Operations) ListTaskSpecUIDs ¶ added in v1.1.1131
ListTaskSpecUIDs lists UIDs of active (and optionally deprecated) tasks specs.
func (*Operations) ListUserTasks ¶ added in v1.1.1492
func (s *Operations) ListUserTasks(ctx context.Context, owner string, group string) <-chan data.Result[*model.ListUserTasksResponse]
ListUserTasks lists user tasks based on the specified owner or group and returns results to the provided channel.
func (*Operations) Log ¶ added in v1.1.1131
func (s *Operations) Log(ctx context.Context, req *model.LogRequest) error
Log publishes LogRequest to WorkflowTelemetry Logs subject
func (*Operations) OpenUserTask ¶ added in v1.1.1492
OpenUserTask opens a user task by its ID and owner, ensuring the task is not locked by another owner.
func (*Operations) PauseServiceTask ¶ added in v1.1.1796
func (s *Operations) PauseServiceTask(ctx context.Context, uid string) error
PauseServiceTask pause the svc task identified by uid
func (*Operations) PersistFatalError ¶ added in v1.1.1377
func (s *Operations) PersistFatalError(ctx context.Context, fatalError *model.FatalError) (bool, error)
PersistFatalError saves a fatal error to the kv
func (*Operations) ProcessTasks ¶ added in v1.1.1492
func (s *Operations) ProcessTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error
ProcessTasks iterates over service tasks in the processes of a given workflow setting, validating them and setting their uid into their element definitions
func (*Operations) PublishMsg ¶ added in v1.1.1131
PublishMsg publishes a proto message.
func (*Operations) PublishWorkflowState ¶ added in v1.1.1131
func (s *Operations) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, headers ...natz.MsgHeader) error
PublishWorkflowState publishes a SHAR state object to a given subject
func (*Operations) PutTaskSpec ¶ added in v1.1.1131
PutTaskSpec writes a task spec to the database.
func (*Operations) ResumeServiceTask ¶ added in v1.1.1796
func (s *Operations) ResumeServiceTask(ctx context.Context, uid string) error
ResumeServiceTask pause the svc task identified by uid
func (*Operations) RetryActivity ¶ added in v1.1.1377
func (s *Operations) RetryActivity(ctx context.Context, processInstanceId string, failedElementId string, vars []byte, priorElementId string) error
RetryActivity publishes the state from a prior FatalError to attempt a retry. Vars are optional and if present, are merged for retry.
func (*Operations) SaveUserTask ¶ added in v1.1.1492
SaveUserTask saves the user task's state, either by overwriting or merging with existing state based on the `overwrite` flag.
func (*Operations) SignalFatalErrorPause ¶ added in v1.1.1377
func (s *Operations) SignalFatalErrorPause(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
SignalFatalErrorPause publishes a FatalError message on FatalErr of a process in a workflow with a Pause strategy
func (*Operations) SignalFatalErrorTeardown ¶ added in v1.1.1377
func (s *Operations) SignalFatalErrorTeardown(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
SignalFatalErrorTeardown publishes a FatalError message on FatalErr of a process in a workflow with a Teardown strategy
type Ops ¶ added in v1.1.1377
type Ops interface {
GetTaskSpecUID(ctx context.Context, name string) (string, error)
GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)
GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)
GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
GetExecutableWorkflowIds(ctx context.Context) ([]string, error)
GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
Launch(ctx context.Context, processId string, vars []byte, headers []byte) (string, string, error)
LaunchWithParent(ctx context.Context, processId string, ID common.TrackingID, vrs []byte, headers []byte, parentpiID string, parentElID string, subProcessWorkflowID string) (string, string, error)
CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error
CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteUserTask(ctx context.Context, jobId string) error
ProcessTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error
GetExecution(ctx context.Context, executionID string) (*model.Execution, error)
GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)
DeleteJob(ctx context.Context, trackingID string) error
ListExecutions(ctx context.Context, workflowName string) <-chan data.Result[*model.ListExecutionItem]
ListExecutionProcesses(ctx context.Context, id string) ([]string, error)
PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, opts ...natz.MsgHeader) error
SignalFatalErrorTeardown(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
SignalFatalErrorPause(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
PublishMsg(ctx context.Context, subject string, sharMsg proto.Message) error
GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)
OwnerID(ctx context.Context, name string) (string, error)
OwnerName(ctx context.Context, id string) (string, error)
GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
DeprecateTaskSpec(ctx context.Context, uid []string) error
ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)
GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)
Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error
Log(ctx context.Context, req *model.LogRequest) error
DeleteNamespace(ctx context.Context, ns string) error
ListExecutableProcesses(ctx context.Context) <-chan data.Result[*model.ListExecutableProcessesItem]
GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
HandleWorkflowError(ctx context.Context, errorCode string, inVars []byte, state *model.WorkflowState, errorIfUnhandled bool, source model.WorkflowErrorSource) error
GetFatalErrors(ctx context.Context, keyPrefix string) <-chan data.Result[*model.FatalError]
RetryActivity(ctx context.Context, processInstanceId string, elementId string, vars []byte, priorElementId string) error
PersistFatalError(ctx context.Context, fatalError *model.FatalError) (bool, error)
DeleteFatalError(ctx context.Context, state *model.WorkflowState) error
ListUserTasks(ctx context.Context, owner string, group string) <-chan data.Result[*model.ListUserTasksResponse]
SaveUserTask(ctx context.Context, id string, v []byte, overwrite bool) error
OpenUserTask(ctx context.Context, id string, owner string) ([]byte, error)
PauseServiceTask(ctx context.Context, uid string) error
ResumeServiceTask(ctx context.Context, uid string) error
}
Ops is the interface for the Operations struct
type ServiceTaskConsumerFn ¶
ServiceTaskConsumerFn defines the type of a function that ensures existence of a service task consumer
type TaskOperations ¶ added in v1.1.1687
type TaskOperations struct {
// contains filtered or unexported fields
}
TaskOperations is a struct off of which to hang varios svc task related opereations off of
func NewTaskOperations ¶ added in v1.1.1687
func NewTaskOperations(natsService *natz.NatsService) *TaskOperations
NewTaskOperations construct new Task Operations
func (*TaskOperations) GetLatestTaskSpecUID ¶ added in v1.1.1727
GetLatestTaskSpecUID fetches
func (*TaskOperations) GetTaskSpecByUID ¶ added in v1.1.1687
func (to *TaskOperations) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)
GetTaskSpecByUID fetches a task spec from the database.
func (*TaskOperations) GetTaskSpecVersions ¶ added in v1.1.1687
func (to *TaskOperations) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
GetTaskSpecVersions fetches the versions of a given task spec name
func (*TaskOperations) PauseServiceTask ¶ added in v1.1.1796
func (s *TaskOperations) PauseServiceTask(ctx context.Context, uid string) error
PauseServiceTask pause the svc task identified by uid
func (*TaskOperations) ResumeServiceTask ¶ added in v1.1.1796
func (s *TaskOperations) ResumeServiceTask(ctx context.Context, uid string) error
ResumeServiceTask pause the svc task identified by uid
type WorkflowOperations ¶ added in v1.1.1687
type WorkflowOperations struct {
// contains filtered or unexported fields
}
WorkflowOperations is a place to hang operations related to the Workflow model off of
func NewWorkflowOperations ¶ added in v1.1.1687
func NewWorkflowOperations(natsService *natz.NatsService, taskOperations *TaskOperations, exprEngine expression.Engine) (*WorkflowOperations, error)
NewWorkflowOperations constructs a new WorkflowOperations struct
func (*WorkflowOperations) DisableWorkflow ¶ added in v1.1.1687
func (wfo *WorkflowOperations) DisableWorkflow(ctx context.Context, workflowName string) error
DisableWorkflow stops a workflow from being launched
func (*WorkflowOperations) EnableWorkflow ¶ added in v1.1.1687
func (wfo *WorkflowOperations) EnableWorkflow(ctx context.Context, workflowName string) error
EnableWorkflow allows a workflow to be launched
func (*WorkflowOperations) GetWorkflow ¶ added in v1.1.1687
func (wfo *WorkflowOperations) GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)
GetWorkflow - retrieves a workflow model given its ID
func (*WorkflowOperations) ListWorkflows ¶ added in v1.1.1687
func (wfo *WorkflowOperations) ListWorkflows(ctx context.Context) <-chan data.Result[*model.ListWorkflowResponse]
ListWorkflows returns a list of all the workflows in SHAR.
func (*WorkflowOperations) LoadWorkflow ¶ added in v1.1.1687
func (wfo *WorkflowOperations) LoadWorkflow(ctx context.Context, model *model.Workflow) (string, error)
LoadWorkflow loads a model.Process describing a workflow into the engine ready for execution.
func (*WorkflowOperations) ProcessTasks ¶ added in v1.1.1687
func (wfo *WorkflowOperations) ProcessTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error
ProcessTasks iterates over service tasks in the processes of a given workflow setting, validating them and setting their uid into their element definitions
func (*WorkflowOperations) StreamWorkflowVersions ¶ added in v1.1.1687
func (wfo *WorkflowOperations) StreamWorkflowVersions(ctx context.Context, workflowName string) <-chan data.Result[*model.WorkflowVersion]
StreamWorkflowVersions - send the versions of the workflow down the supplied channel