workflow

package
v1.1.1966 Latest
Published: Oct 16, 2025 License: MIT Imports: 53 Imported by: 0

Documentation

Constants

const (
	// AbortTypeActivity signifies an activity is being aborted
	AbortTypeActivity = iota
	// AbortTypeServiceTask signifies a service task is being aborted
	AbortTypeServiceTask
)

Variables

This section is empty.

Functions

func NoOpServiceTaskConsumerFn

func NoOpServiceTaskConsumerFn(_ context.Context, _ *model.TaskSpec) error

NoOpServiceTaskConsumerFn is a no-op implementation of ServiceTaskConsumerFn.

func NoOpWorkFlowProcessMappingFn

func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)

NoOpWorkFlowProcessMappingFn is a no-op implementation of WorkflowProcessMappingFn.
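A no-op function of this kind is typically useful as a default when the caller has nothing to register. The following is a minimal sketch of that pattern, not the package's implementation. TaskSpec is a local stand-in for model.TaskSpec, and noOpConsumer, consumerOrNoOp and runDefault are hypothetical names. It also assumes the no-op simply returns nil, which the documentation above does not state.

```go
package main

import (
	"context"
	"fmt"
)

// TaskSpec is a local stand-in for model.TaskSpec so the sketch compiles on its own.
type TaskSpec struct{ Name string }

// ServiceTaskConsumerFn mirrors the documented function type, using the stand-in TaskSpec.
type ServiceTaskConsumerFn func(ctx context.Context, taskSpec *TaskSpec) error

// noOpConsumer is a hypothetical no-op: it ignores its arguments and reports success.
func noOpConsumer(_ context.Context, _ *TaskSpec) error { return nil }

// consumerOrNoOp returns fn unless it is nil, in which case it falls back to the no-op.
func consumerOrNoOp(fn ServiceTaskConsumerFn) ServiceTaskConsumerFn {
	if fn == nil {
		return noOpConsumer
	}
	return fn
}

// runDefault invokes the fallback consumer for a task spec with the given name.
func runDefault(name string) error {
	return consumerOrNoOp(nil)(context.Background(), &TaskSpec{Name: name})
}

func main() {
	fmt.Println(runDefault("example") == nil)
}
```

The same shape would apply to a workflow to process mapping function, with the extra uint64 return value.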

Types

type AbortType

type AbortType int

AbortType represents the type of termination being handled by the abort function
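As an illustration, the sketch below reproduces the type and constants locally and switches on them. The describe function is hypothetical and not part of the package. Note that, as listed above, the const block does not name AbortType, so the constants are untyped integers that convert implicitly when compared with an AbortType value.

```go
package main

import "fmt"

// AbortType is reproduced locally from the listing above.
type AbortType int

const (
	// AbortTypeActivity signifies an activity is being aborted.
	AbortTypeActivity = iota
	// AbortTypeServiceTask signifies a service task is being aborted.
	AbortTypeServiceTask
)

// describe is a hypothetical helper that maps an abort type to a label.
func describe(t AbortType) string {
	switch t {
	case AbortTypeActivity:
		return "activity"
	case AbortTypeServiceTask:
		return "service task"
	default:
		return fmt.Sprintf("unknown abort type %d", int(t))
	}
}

func main() {
	fmt.Println(describe(AbortTypeActivity))
	fmt.Println(describe(AbortTypeServiceTask))
	fmt.Println(describe(AbortType(7)))
}
```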

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine contains the workflow processing functions

func NewEngine added in v1.1.1687

func NewEngine(natsService *natz.NatsService, operations *Operations, historyOperations store.WorkflowHistoryOperations, wfOperations *WorkflowOperations, options *option.ServerOptions) (*Engine, error)

NewEngine returns an instance of the core workflow engine.

func (*Engine) Compensate

func (s *Engine) Compensate(ctx context.Context, state *model.WorkflowState) error

Compensate is a method of the Engine struct that performs the compensation process for a given workflow state. It retrieves the necessary information and history from the workflow and processes it to determine the compensation steps. It then creates a compensation plan and publishes each step to a designated subject for further processing. Finally, it updates the state of the workflow to indicate that the compensation is in progress. It returns an error if any step of the compensation process encounters an issue.

func (*Engine) GetGatewayInstance

func (s *Engine) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)

GetGatewayInstance - returns a gateway instance from the KV store.

func (*Engine) GetGatewayInstanceID

func (s *Engine) GetGatewayInstanceID(state *model.WorkflowState) (string, string, error)

GetGatewayInstanceID - returns a gateway instance ID and a satisfying route to that gateway.

func (*Engine) Shutdown

func (s *Engine) Shutdown()

Shutdown signals the engine to stop processing.

func (*Engine) Start

func (c *Engine) Start(ctx context.Context) error

Start sets up the activity and job processors and starts the engine processing workflows.

func (*Engine) StartProcessing

func (s *Engine) StartProcessing(ctx context.Context) error

StartProcessing begins listening to all the message processing queues.
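The Start and Shutdown methods suggest a conventional start, wait, stop lifecycle. The sketch below shows one way a caller might wire that up. It is written against a small caller-defined interface and a fake engine rather than the real Engine, so lifecycle, fakeEngine, run, demo and errNoConn are all hypothetical. It also assumes Start returns once processing has been set up rather than blocking, which the text above does not state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// lifecycle captures the two documented Engine methods used in this sketch.
type lifecycle interface {
	Start(ctx context.Context) error
	Shutdown()
}

// fakeEngine is a hypothetical stand-in that records the calls it receives.
type fakeEngine struct {
	startErr error
	events   []string
}

func (f *fakeEngine) Start(_ context.Context) error {
	f.events = append(f.events, "start")
	return f.startErr
}

func (f *fakeEngine) Shutdown() { f.events = append(f.events, "shutdown") }

// errNoConn is a sample start-up failure used by the demonstration.
var errNoConn = errors.New("no connection")

// run starts the engine, blocks until ctx is cancelled, then shuts the engine down.
func run(ctx context.Context, e lifecycle) error {
	if err := e.Start(ctx); err != nil {
		return fmt.Errorf("start engine: %w", err)
	}
	defer e.Shutdown()
	<-ctx.Done()
	return nil
}

// demo runs a fake engine with an already-cancelled context and returns the recorded calls.
func demo(startErr error) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	e := &fakeEngine{startErr: startErr}
	err := run(ctx, e)
	return e.events, err
}

func main() {
	events, err := demo(nil)
	fmt.Println(events, err)
	events, err = demo(errNoConn)
	fmt.Println(events, err)
}
```

Shutdown is deferred only after a successful Start, so a failed start-up does not trigger a shutdown of an engine that never ran.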

type Features added in v1.1.1913

type Features struct {
	// ExplicitProcessReturns indicates whether the process should return only explicitly declared variables.
	ExplicitProcessReturns bool
}

Features to be enabled or disabled.
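To make the effect of ExplicitProcessReturns concrete, here is a small sketch of how such a flag could filter a process's variables. It is an illustration only: processReturns, the map representation of variables and the declared list are assumptions, and the engine's actual behaviour may differ.

```go
package main

import "fmt"

// Features is reproduced locally from the listing above.
type Features struct {
	ExplicitProcessReturns bool
}

// processReturns is a hypothetical helper. With the flag off it returns a copy of
// every variable; with the flag on it returns only the variables named in declared.
func processReturns(f Features, vars map[string]any, declared []string) map[string]any {
	out := make(map[string]any)
	if !f.ExplicitProcessReturns {
		for k, v := range vars {
			out[k] = v
		}
		return out
	}
	for _, name := range declared {
		if v, ok := vars[name]; ok {
			out[name] = v
		}
	}
	return out
}

func main() {
	vars := map[string]any{"orderId": 42, "scratch": "tmp"}
	declared := []string{"orderId"}
	fmt.Println(len(processReturns(Features{}, vars, declared)))
	fmt.Println(len(processReturns(Features{ExplicitProcessReturns: true}, vars, declared)))
}
```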

type MockServiceTaskConsumerFn added in v1.1.1404

type MockServiceTaskConsumerFn struct {
	mock.Mock
}

MockServiceTaskConsumerFn is an autogenerated mock type for the ServiceTaskConsumerFn type

func NewMockServiceTaskConsumerFn added in v1.1.1404

func NewMockServiceTaskConsumerFn(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockServiceTaskConsumerFn

NewMockServiceTaskConsumerFn creates a new instance of MockServiceTaskConsumerFn. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockServiceTaskConsumerFn) EXPECT added in v1.1.1404

func (*MockServiceTaskConsumerFn) Execute added in v1.1.1404

func (_m *MockServiceTaskConsumerFn) Execute(ctx context.Context, taskSpec *model.TaskSpec) error

Execute provides a mock function with given fields: ctx, taskSpec

type MockServiceTaskConsumerFn_Execute_Call added in v1.1.1404

type MockServiceTaskConsumerFn_Execute_Call struct {
	*mock.Call
}

MockServiceTaskConsumerFn_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'

func (*MockServiceTaskConsumerFn_Execute_Call) Return added in v1.1.1404

func (*MockServiceTaskConsumerFn_Execute_Call) Run added in v1.1.1404

func (*MockServiceTaskConsumerFn_Execute_Call) RunAndReturn added in v1.1.1404

type MockServiceTaskConsumerFn_Expecter added in v1.1.1404

type MockServiceTaskConsumerFn_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockServiceTaskConsumerFn_Expecter) Execute added in v1.1.1404

func (_e *MockServiceTaskConsumerFn_Expecter) Execute(ctx interface{}, taskSpec interface{}) *MockServiceTaskConsumerFn_Execute_Call

Execute is a helper method to define mock.On call

  • ctx context.Context
  • taskSpec *model.TaskSpec

type MockWorkflowProcessMappingFn added in v1.1.1404

type MockWorkflowProcessMappingFn struct {
	mock.Mock
}

MockWorkflowProcessMappingFn is an autogenerated mock type for the WorkflowProcessMappingFn type

func NewMockWorkflowProcessMappingFn added in v1.1.1404

func NewMockWorkflowProcessMappingFn(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockWorkflowProcessMappingFn

NewMockWorkflowProcessMappingFn creates a new instance of MockWorkflowProcessMappingFn. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockWorkflowProcessMappingFn) EXPECT added in v1.1.1404

func (*MockWorkflowProcessMappingFn) Execute added in v1.1.1404

Execute provides a mock function with given fields: ctx, wf, i

type MockWorkflowProcessMappingFn_Execute_Call added in v1.1.1404

type MockWorkflowProcessMappingFn_Execute_Call struct {
	*mock.Call
}

MockWorkflowProcessMappingFn_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'

func (*MockWorkflowProcessMappingFn_Execute_Call) Return added in v1.1.1404

func (*MockWorkflowProcessMappingFn_Execute_Call) Run added in v1.1.1404

func (*MockWorkflowProcessMappingFn_Execute_Call) RunAndReturn added in v1.1.1404

type MockWorkflowProcessMappingFn_Expecter added in v1.1.1404

type MockWorkflowProcessMappingFn_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockWorkflowProcessMappingFn_Expecter) Execute added in v1.1.1404

func (_e *MockWorkflowProcessMappingFn_Expecter) Execute(ctx interface{}, wf interface{}, i interface{}) *MockWorkflowProcessMappingFn_Execute_Call

Execute is a helper method to define mock.On call

  • ctx context.Context
  • wf *model.Workflow
  • i *model.Process

type MocksetPartyFn added in v1.1.1404

type MocksetPartyFn struct {
	mock.Mock
}

MocksetPartyFn is an autogenerated mock type for the setPartyFn type

func NewMocksetPartyFn added in v1.1.1404

func NewMocksetPartyFn(t interface {
	mock.TestingT
	Cleanup(func())
}) *MocksetPartyFn

NewMocksetPartyFn creates a new instance of MocksetPartyFn. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MocksetPartyFn) EXPECT added in v1.1.1404

func (*MocksetPartyFn) Execute added in v1.1.1404

func (_m *MocksetPartyFn) Execute(exch *model.Exchange) (*model.Exchange, error)

Execute provides a mock function with given fields: exch

type MocksetPartyFn_Execute_Call added in v1.1.1404

type MocksetPartyFn_Execute_Call struct {
	*mock.Call
}

MocksetPartyFn_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'

func (*MocksetPartyFn_Execute_Call) Return added in v1.1.1404

func (*MocksetPartyFn_Execute_Call) Run added in v1.1.1404

func (*MocksetPartyFn_Execute_Call) RunAndReturn added in v1.1.1404

type MocksetPartyFn_Expecter added in v1.1.1404

type MocksetPartyFn_Expecter struct {
	// contains filtered or unexported fields
}

func (*MocksetPartyFn_Expecter) Execute added in v1.1.1404

func (_e *MocksetPartyFn_Expecter) Execute(exch interface{}) *MocksetPartyFn_Execute_Call

Execute is a helper method to define mock.On call

  • exch *model.Exchange

type Operations added in v1.1.1131

type Operations struct {
	Feature Features
	// contains filtered or unexported fields
}

Operations provides methods for executing and managing workflow processes. It provides methods for various tasks such as canceling process instances, completing tasks, retrieving workflow-related information, and managing workflow execution.

func NewOperations added in v1.1.1131

func NewOperations(natsService *natz.NatsService, natsConn *natz.NatsConnConfiguration, exprEngine *expression.ExprEngine, historyOperations store.WorkflowHistoryOperations, workflowOperations *WorkflowOperations, taskOperations *TaskOperations, tracerProvider trace.TracerProvider) (*Operations, error)

NewOperations constructs a new Operations instance

func (*Operations) AbortProcess added in v1.1.1913

func (c *Operations) AbortProcess(ctx context.Context, state *model.WorkflowState, cascadeUntilProcessID string) error

AbortProcess aborts the process instance and all parent process instances.

func (*Operations) CancelProcessInstance added in v1.1.1131

func (c *Operations) CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error

CancelProcessInstance cancels a workflow instance with a reason.

func (*Operations) CompleteManualTask added in v1.1.1131

func (c *Operations) CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteManualTask completes a manual workflow task

func (*Operations) CompleteSendMessageTask added in v1.1.1131

func (c *Operations) CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteSendMessageTask completes a send message task

func (*Operations) CompleteServiceTask added in v1.1.1131

func (c *Operations) CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteServiceTask completes a workflow service task

func (*Operations) CompleteUserTask added in v1.1.1131

func (c *Operations) CompleteUserTask(ctx context.Context, jobId string) error

CompleteUserTask completes and closes a user task with variables

func (*Operations) DeleteFatalError added in v1.1.1377

func (s *Operations) DeleteFatalError(ctx context.Context, state *model.WorkflowState) error

DeleteFatalError removes the fatal error for a given workflow state

func (*Operations) DeleteJob added in v1.1.1131

func (s *Operations) DeleteJob(ctx context.Context, trackingID string) error

DeleteJob removes a workflow task state.

func (*Operations) DeleteNamespace added in v1.1.1131

func (s *Operations) DeleteNamespace(ctx context.Context, ns string) error

DeleteNamespace deletes the key-value store for the specified namespace in SHAR. It iterates over all the key-value stores and deletes them one by one. The function returns nil if all key-value stores are successfully deleted.

func (*Operations) DeprecateTaskSpec added in v1.1.1131

func (s *Operations) DeprecateTaskSpec(ctx context.Context, uid []string) error

DeprecateTaskSpec deprecates one or more task specs by ID.

func (*Operations) GetCompensationInputVariables added in v1.1.1131

func (s *Operations) GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)

GetCompensationInputVariables is a method of the Operations struct that retrieves the original input variables for a specific process instance and tracking ID. It returns the variables in byte array format.

func (*Operations) GetCompensationOutputVariables added in v1.1.1131

func (s *Operations) GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)

GetCompensationOutputVariables is a method of the Operations struct that retrieves the original output variables of a compensation history entry for a specific process instance and tracking ID. It returns the output variables as a byte array.

func (*Operations) GetExecutableWorkflowIds added in v1.1.1131

func (s *Operations) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)

GetExecutableWorkflowIds returns a list of all workflow Ids that contain executable processes

func (*Operations) GetExecution added in v1.1.1131

func (s *Operations) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)

GetExecution retrieves an execution given its ID.

func (*Operations) GetFatalErrors added in v1.1.1377

func (s *Operations) GetFatalErrors(ctx context.Context, keyPrefix string) <-chan data.Result[*model.FatalError]

GetFatalErrors queries the fatal error KV with a key prefix of the format <workflowName>.<executionId>.<processInstanceId>. and sends the results down the returned channel.

func (*Operations) GetJob added in v1.1.1131

func (s *Operations) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)

GetJob gets a workflow task state.

func (*Operations) GetProcessIdFor added in v1.1.1131

func (s *Operations) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)

GetProcessIdFor retrieves the ID of the process that is begun by a message start event.

func (*Operations) GetProcessInstance added in v1.1.1131

func (s *Operations) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)

GetProcessInstance returns a process instance for a given process instance ID.

func (*Operations) GetTaskSpecByUID added in v1.1.1131

func (s *Operations) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)

GetTaskSpecByUID fetches a task spec from the database.

func (*Operations) GetTaskSpecUID added in v1.1.1131

func (s *Operations) GetTaskSpecUID(ctx context.Context, name string) (string, error)

GetTaskSpecUID fetches the UID of a task spec given its name.

func (*Operations) GetTaskSpecUsage added in v1.1.1131

func (s *Operations) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)

GetTaskSpecUsage returns the usage report for a list of task specs.

func (*Operations) GetTaskSpecUsageByName added in v1.1.1131

func (s *Operations) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)

GetTaskSpecUsageByName produces a report of running and executable places where the task spec is in use.

func (*Operations) GetTaskSpecVersions added in v1.1.1131

func (s *Operations) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)

GetTaskSpecVersions fetches the versions of a given task spec name

func (*Operations) GetUserTaskIDs added in v1.1.1131

func (s *Operations) GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)

GetUserTaskIDs gets a list of tasks given an owner.

func (*Operations) HandleWorkflowError added in v1.1.1131

func (c *Operations) HandleWorkflowError(ctx context.Context, errorCode string, inVars []byte, state *model.WorkflowState, errorIfUnhandled bool, source model.WorkflowErrorSource) error

HandleWorkflowError handles a workflow error by looking up the error definitions in the workflow, determining the appropriate action, and publishing the necessary workflow state updates. It returns an error if there was an issue retrieving the workflow definition, or if the workflow doesn't support the specified error code.

func (*Operations) Heartbeat added in v1.1.1131

func (s *Operations) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error

Heartbeat saves a client status to the client KV.

func (*Operations) Launch added in v1.1.1131

func (c *Operations) Launch(ctx context.Context, processId string, vars []byte, headers []byte) (string, string, error)

Launch starts a new instance of a workflow and returns an execution ID.

func (*Operations) LaunchWithParent added in v1.1.1131

func (c *Operations) LaunchWithParent(ctx context.Context, processId string, ID common.TrackingID, vrs []byte, headers []byte, parentpiID string, parentElID string, subProcessWorkflowID string) (string, string, error)

LaunchWithParent contains the underlying logic to start a workflow. It is also called to spawn new instances of child workflows.

func (*Operations) ListExecutableProcesses added in v1.1.1131

func (s *Operations) ListExecutableProcesses(ctx context.Context) <-chan data.Result[*model.ListExecutableProcessesItem]

ListExecutableProcesses returns a channel that streams all the executable processes in SHAR. It retrieves the current SHAR namespace from the context and fetches the workflow versions for that namespace from the key-value store. It then iterates through each workflow version and loads the corresponding workflow. For each process in the workflow, it creates a ListExecutableProcessesItem object and populates it with the process name, workflow name, and the executable start parameters obtained from the workflow's start events. It sends each ListExecutableProcessesItem object on the returned channel.

Parameters: - ctx: The context containing the SHAR namespace.

Returns: A receive-only channel of data.Result[*model.ListExecutableProcessesItem]. The signature has no separate error channel, so any errors are presumably delivered through the same channel.
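Several methods in this package, including ListExecutableProcesses, return a receive-only channel of data.Result values. The sketch below shows one way a caller might drain such a channel. Result here is a local stand-in: its Value and Err fields are assumptions, and the real data.Result type may be shaped differently. The sketch also assumes the producer closes the channel when it has finished. The names produce, collect and errBoom are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Result is a hypothetical stand-in for data.Result; the real type's fields may differ.
type Result[T any] struct {
	Value T
	Err   error
}

// errBoom is a sample failure used by the demonstration.
var errBoom = errors.New("boom")

// produce is a hypothetical producer: it emits one result per name, then an
// optional error result, and finally closes the channel.
func produce(names []string, fail error) <-chan Result[string] {
	ch := make(chan Result[string])
	go func() {
		defer close(ch)
		for _, n := range names {
			ch <- Result[string]{Value: n}
		}
		if fail != nil {
			ch <- Result[string]{Err: fail}
		}
	}()
	return ch
}

// collect drains ch until it is closed. It keeps every successful value and
// remembers the first error, so the producer is never left blocked on a send.
func collect[T any](ch <-chan Result[T]) ([]T, error) {
	var (
		out      []T
		firstErr error
	)
	for r := range ch {
		if r.Err != nil {
			if firstErr == nil {
				firstErr = r.Err
			}
			continue
		}
		out = append(out, r.Value)
	}
	return out, firstErr
}

func main() {
	items, err := collect(produce([]string{"processA", "processB"}, nil))
	fmt.Println(items, err)
	items, err = collect(produce([]string{"processA"}, errBoom))
	fmt.Println(items, err)
}
```

Draining to the end rather than returning at the first error is a deliberate choice in this sketch: returning early from an unbuffered channel would leave the sending goroutine blocked unless the context is also cancelled.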

func (*Operations) ListExecutionProcesses added in v1.1.1131

func (s *Operations) ListExecutionProcesses(ctx context.Context, id string) ([]string, error)

ListExecutionProcesses gets the current processIDs for an execution.

func (*Operations) ListExecutions added in v1.1.1131

func (s *Operations) ListExecutions(ctx context.Context, workflowName string) <-chan data.Result[*model.ListExecutionItem]

ListExecutions returns a list of running workflows and versions given a workflow name.

func (*Operations) ListTaskSpecUIDs added in v1.1.1131

func (s *Operations) ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)

ListTaskSpecUIDs lists UIDs of active (and optionally deprecated) task specs.

func (*Operations) ListUserTasks added in v1.1.1492

func (s *Operations) ListUserTasks(ctx context.Context, owner string, group string) <-chan data.Result[*model.ListUserTasksResponse]

ListUserTasks lists user tasks based on the specified owner or group and sends the results on the returned channel.

func (*Operations) Log added in v1.1.1131

func (s *Operations) Log(ctx context.Context, req *model.LogRequest) error

Log publishes LogRequest to WorkflowTelemetry Logs subject

func (*Operations) OpenUserTask added in v1.1.1492

func (s *Operations) OpenUserTask(ctx context.Context, id string, owner string) ([]byte, error)

OpenUserTask opens a user task by its ID and owner, ensuring the task is not locked by another owner.

func (*Operations) OwnerID added in v1.1.1131

func (s *Operations) OwnerID(ctx context.Context, name string) (string, error)

OwnerID gets a unique identifier for a task owner.

func (*Operations) OwnerName added in v1.1.1131

func (s *Operations) OwnerName(ctx context.Context, id string) (string, error)

OwnerName retrieves an owner name given an ID.

func (*Operations) PauseServiceTask added in v1.1.1796

func (s *Operations) PauseServiceTask(ctx context.Context, uid string) error

PauseServiceTask pauses the service task identified by uid.

func (*Operations) PersistFatalError added in v1.1.1377

func (s *Operations) PersistFatalError(ctx context.Context, fatalError *model.FatalError) (bool, error)

PersistFatalError saves a fatal error to the KV store.

func (*Operations) ProcessTasks added in v1.1.1492

func (s *Operations) ProcessTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error

ProcessTasks iterates over the service tasks in the processes of a given workflow, validating them and setting their UIDs in their element definitions.

func (*Operations) PublishMsg added in v1.1.1131

func (s *Operations) PublishMsg(ctx context.Context, subject string, msg proto.Message) error

PublishMsg publishes a proto message.

func (*Operations) PublishWorkflowState added in v1.1.1131

func (s *Operations) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, headers ...natz.MsgHeader) error

PublishWorkflowState publishes a SHAR state object to a given subject

func (*Operations) PutTaskSpec added in v1.1.1131

func (s *Operations) PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)

PutTaskSpec writes a task spec to the database.

func (*Operations) ResumeServiceTask added in v1.1.1796

func (s *Operations) ResumeServiceTask(ctx context.Context, uid string) error

ResumeServiceTask resumes the service task identified by uid.

func (*Operations) RetryActivity added in v1.1.1377

func (s *Operations) RetryActivity(ctx context.Context, processInstanceId string, failedElementId string, vars []byte, priorElementId string) error

RetryActivity publishes the state from a prior FatalError to attempt a retry. Vars are optional and if present, are merged for retry.

func (*Operations) SaveUserTask added in v1.1.1492

func (s *Operations) SaveUserTask(ctx context.Context, id string, b []byte, overwrite bool) error

SaveUserTask saves the user task's state, either by overwriting or merging with existing state based on the `overwrite` flag.
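The difference between overwriting and merging can be pictured with a small in-memory model. This is a sketch only: SaveUserTask takes its state as a []byte whose encoding is not described here, so the map representation, the saveState helper and the rule that incoming values win on key conflicts during a merge are all assumptions.

```go
package main

import "fmt"

// saveState is a hypothetical model of the overwrite flag. With overwrite set,
// the result contains only the incoming values. Otherwise the incoming values
// are laid over a copy of the existing ones, with incoming winning on conflicts.
func saveState(existing, incoming map[string]any, overwrite bool) map[string]any {
	out := make(map[string]any, len(existing)+len(incoming))
	if !overwrite {
		for k, v := range existing {
			out[k] = v
		}
	}
	for k, v := range incoming {
		out[k] = v
	}
	return out
}

func main() {
	existing := map[string]any{"approved": false, "comment": "draft"}
	incoming := map[string]any{"approved": true}
	fmt.Println(len(saveState(existing, incoming, false)))
	fmt.Println(len(saveState(existing, incoming, true)))
}
```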

func (*Operations) SignalFatalErrorPause added in v1.1.1377

func (s *Operations) SignalFatalErrorPause(ctx context.Context, state *model.WorkflowState, log *slog.Logger)

SignalFatalErrorPause publishes a FatalError message on FatalErr of a process in a workflow with a Pause strategy

func (*Operations) SignalFatalErrorTeardown added in v1.1.1377

func (s *Operations) SignalFatalErrorTeardown(ctx context.Context, state *model.WorkflowState, log *slog.Logger)

SignalFatalErrorTeardown publishes a FatalError message on FatalErr of a process in a workflow with a Teardown strategy

type Ops added in v1.1.1377

type Ops interface {
	GetTaskSpecUID(ctx context.Context, name string) (string, error)
	GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
	PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)
	GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)
	GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
	GetExecutableWorkflowIds(ctx context.Context) ([]string, error)
	GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
	Launch(ctx context.Context, processId string, vars []byte, headers []byte) (string, string, error)
	LaunchWithParent(ctx context.Context, processId string, ID common.TrackingID, vrs []byte, headers []byte, parentpiID string, parentElID string, subProcessWorkflowID string) (string, string, error)
	CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error
	CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
	CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
	CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
	CompleteUserTask(ctx context.Context, jobId string) error
	ProcessTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error
	GetExecution(ctx context.Context, executionID string) (*model.Execution, error)
	GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)
	DeleteJob(ctx context.Context, trackingID string) error
	ListExecutions(ctx context.Context, workflowName string) <-chan data.Result[*model.ListExecutionItem]
	ListExecutionProcesses(ctx context.Context, id string) ([]string, error)
	PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, opts ...natz.MsgHeader) error
	SignalFatalErrorTeardown(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
	SignalFatalErrorPause(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
	PublishMsg(ctx context.Context, subject string, sharMsg proto.Message) error
	GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)
	OwnerID(ctx context.Context, name string) (string, error)
	OwnerName(ctx context.Context, id string) (string, error)
	GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
	DeprecateTaskSpec(ctx context.Context, uid []string) error
	ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)
	GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)
	Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error
	Log(ctx context.Context, req *model.LogRequest) error
	DeleteNamespace(ctx context.Context, ns string) error
	ListExecutableProcesses(ctx context.Context) <-chan data.Result[*model.ListExecutableProcessesItem]
	GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
	GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
	HandleWorkflowError(ctx context.Context, errorCode string, inVars []byte, state *model.WorkflowState, errorIfUnhandled bool, source model.WorkflowErrorSource) error
	GetFatalErrors(ctx context.Context, keyPrefix string) <-chan data.Result[*model.FatalError]
	RetryActivity(ctx context.Context, processInstanceId string, elementId string, vars []byte, priorElementId string) error
	PersistFatalError(ctx context.Context, fatalError *model.FatalError) (bool, error)
	DeleteFatalError(ctx context.Context, state *model.WorkflowState) error
	ListUserTasks(ctx context.Context, owner string, group string) <-chan data.Result[*model.ListUserTasksResponse]
	SaveUserTask(ctx context.Context, id string, v []byte, overwrite bool) error
	OpenUserTask(ctx context.Context, id string, owner string) ([]byte, error)
	PauseServiceTask(ctx context.Context, uid string) error
	ResumeServiceTask(ctx context.Context, uid string) error
}

Ops is the interface for the Operations struct
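Because Ops is an interface, calling code can depend on a narrower interface that lists only the methods it needs, and tests can substitute a fake. The sketch below shows this with the Launch signature from the listing above. The launcher interface, fakeLauncher, launchAll and demo are hypothetical, and the two strings returned by the fake are placeholders, since the meaning of the second return value is not described here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// launcher is a narrow, caller-defined interface. Any value with the Launch
// method listed in Ops satisfies it.
type launcher interface {
	Launch(ctx context.Context, processId string, vars []byte, headers []byte) (string, string, error)
}

// fakeLauncher is a hypothetical test double that records the process IDs it is given.
type fakeLauncher struct {
	launched []string
}

func (f *fakeLauncher) Launch(_ context.Context, processId string, _ []byte, _ []byte) (string, string, error) {
	if processId == "" {
		return "", "", errors.New("empty process id")
	}
	f.launched = append(f.launched, processId)
	// Placeholder identifiers; a real implementation would return meaningful values.
	return "id-a", "id-b", nil
}

// launchAll launches each process ID in order and stops at the first error.
// It returns the number of successful launches.
func launchAll(ctx context.Context, l launcher, processIDs ...string) (int, error) {
	for i, id := range processIDs {
		if _, _, err := l.Launch(ctx, id, nil, nil); err != nil {
			return i, fmt.Errorf("launch %q: %w", id, err)
		}
	}
	return len(processIDs), nil
}

// demo runs launchAll against a fresh fake with a background context.
func demo(ids ...string) (int, error) {
	return launchAll(context.Background(), &fakeLauncher{}, ids...)
}

func main() {
	n, err := demo("orderProcess", "invoiceProcess")
	fmt.Println(n, err)
	n, err = demo("orderProcess", "")
	fmt.Println(n, err)
}
```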

type ServiceTaskConsumerFn

type ServiceTaskConsumerFn func(ctx context.Context, taskSpec *model.TaskSpec) error

ServiceTaskConsumerFn defines the type of a function that ensures existence of a service task consumer

type TaskOperations added in v1.1.1687

type TaskOperations struct {
	// contains filtered or unexported fields
}

TaskOperations is a struct on which to hang various service task related operations.

func NewTaskOperations added in v1.1.1687

func NewTaskOperations(natsService *natz.NatsService) *TaskOperations

NewTaskOperations constructs a new TaskOperations instance.

func (*TaskOperations) GetLatestTaskSpecUID added in v1.1.1727

func (to *TaskOperations) GetLatestTaskSpecUID(ctx context.Context, name string) (string, error)

GetLatestTaskSpecUID fetches the UID of the latest version of a task spec given its name.

func (*TaskOperations) GetTaskSpecByUID added in v1.1.1687

func (to *TaskOperations) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)

GetTaskSpecByUID fetches a task spec from the database.

func (*TaskOperations) GetTaskSpecVersions added in v1.1.1687

func (to *TaskOperations) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)

GetTaskSpecVersions fetches the versions of a given task spec name

func (*TaskOperations) PauseServiceTask added in v1.1.1796

func (s *TaskOperations) PauseServiceTask(ctx context.Context, uid string) error

PauseServiceTask pauses the service task identified by uid.

func (*TaskOperations) ResumeServiceTask added in v1.1.1796

func (s *TaskOperations) ResumeServiceTask(ctx context.Context, uid string) error

ResumeServiceTask resumes the service task identified by uid.

type WorkflowOperations added in v1.1.1687

type WorkflowOperations struct {
	// contains filtered or unexported fields
}

WorkflowOperations is a place to hang operations related to the Workflow model.

func NewWorkflowOperations added in v1.1.1687

func NewWorkflowOperations(natsService *natz.NatsService, taskOperations *TaskOperations, exprEngine expression.Engine) (*WorkflowOperations, error)

NewWorkflowOperations constructs a new WorkflowOperations struct

func (*WorkflowOperations) DisableWorkflow added in v1.1.1687

func (wfo *WorkflowOperations) DisableWorkflow(ctx context.Context, workflowName string) error

DisableWorkflow stops a workflow from being launched

func (*WorkflowOperations) EnableWorkflow added in v1.1.1687

func (wfo *WorkflowOperations) EnableWorkflow(ctx context.Context, workflowName string) error

EnableWorkflow allows a workflow to be launched

func (*WorkflowOperations) GetWorkflow added in v1.1.1687

func (wfo *WorkflowOperations) GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)

GetWorkflow - retrieves a workflow model given its ID

func (*WorkflowOperations) ListWorkflows added in v1.1.1687

func (wfo *WorkflowOperations) ListWorkflows(ctx context.Context) <-chan data.Result[*model.ListWorkflowResponse]

ListWorkflows returns a list of all the workflows in SHAR.

func (*WorkflowOperations) LoadWorkflow added in v1.1.1687

func (wfo *WorkflowOperations) LoadWorkflow(ctx context.Context, model *model.Workflow) (string, error)

LoadWorkflow loads a model.Workflow describing a workflow into the engine ready for execution.

func (*WorkflowOperations) ProcessTasks added in v1.1.1687

func (wfo *WorkflowOperations) ProcessTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error

ProcessTasks iterates over the service tasks in the processes of a given workflow, validating them and setting their UIDs in their element definitions.

func (*WorkflowOperations) StreamWorkflowVersions added in v1.1.1687

func (wfo *WorkflowOperations) StreamWorkflowVersions(ctx context.Context, workflowName string) <-chan data.Result[*model.WorkflowVersion]

StreamWorkflowVersions - sends the versions of the workflow down the returned channel.

type WorkflowProcessMappingFn

type WorkflowProcessMappingFn func(ctx context.Context, wf *model.Workflow, i *model.Process) (uint64, error)

WorkflowProcessMappingFn defines the type of a function that creates a workflow to process mapping
