Documentation
¶
Index ¶
- Constants
- func NoOpServiceTaskConsumerFn(_ context.Context, _ string) error
- func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)
- func WithEmbargo(embargo int) *publishEmbargoOption
- func WithHeaders(headers map[string]string) *publishHeadersOption
- type AbortType
- type Engine
- func (s *Engine) Compensate(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)
- func (s *Engine) GetGatewayInstanceID(state *model.WorkflowState) (string, string, error)
- func (s *Engine) Shutdown()
- func (c *Engine) Start(ctx context.Context) error
- func (s *Engine) StartProcessing(ctx context.Context) error
- type Operations
- func (c *Operations) CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processId string) error
- func (c *Operations) CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (c *Operations) CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (c *Operations) CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (c *Operations) CompleteUserTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (s *Operations) CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)
- func (s *Operations) CreateJob(ctx context.Context, job *model.WorkflowState) (string, error)
- func (s *Operations) CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, ...) (*model.ProcessInstance, error)
- func (s *Operations) DeleteFatalError(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) DeleteJob(ctx context.Context, trackingID string) error
- func (s *Operations) DeleteNamespace(ctx context.Context, ns string) error
- func (s *Operations) DeprecateTaskSpec(ctx context.Context, uid []string) error
- func (s *Operations) DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, processInstanceId string, ...) error
- func (s *Operations) EnsureServiceTaskConsumer(ctx context.Context, uid string) error
- func (s *Operations) GetActiveEntries(ctx context.Context, processInstanceID string, ...)
- func (s *Operations) GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
- func (s *Operations) GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
- func (s *Operations) GetElement(ctx context.Context, state *model.WorkflowState) (*model.Element, error)
- func (s *Operations) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)
- func (s *Operations) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)
- func (s *Operations) GetFatalErrors(ctx context.Context, keyPrefix string, fatalErrs chan<- *model.FatalError, ...)
- func (s *Operations) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)
- func (s *Operations) GetLatestVersion(ctx context.Context, workflowName string) (string, error)
- func (s *Operations) GetProcessHistory(ctx context.Context, processInstanceId string, ...)
- func (s *Operations) GetProcessHistoryItem(ctx context.Context, processInstanceID string, trackingID string, ...) (*model.ProcessHistoryEntry, error)
- func (s *Operations) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)
- func (s *Operations) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
- func (s *Operations) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)
- func (s *Operations) GetTaskSpecUID(ctx context.Context, name string) (string, error)
- func (s *Operations) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
- func (s *Operations) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
- func (s *Operations) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
- func (s *Operations) GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)
- func (s *Operations) GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)
- func (s *Operations) GetWorkflowNameFor(ctx context.Context, processId string) (string, error)
- func (s *Operations) GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, ...)
- func (c *Operations) HandleWorkflowError(ctx context.Context, errorCode string, inVars []byte, ...) error
- func (s *Operations) HasValidExecution(ctx context.Context, executionId string) (*model.Execution, error)
- func (s *Operations) HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)
- func (s *Operations) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error
- func (c *Operations) Launch(ctx context.Context, processId string, vars []byte, headers []byte) (string, string, error)
- func (c *Operations) LaunchWithParent(ctx context.Context, processId string, ID common.TrackingID, vrs []byte, ...) (string, string, error)
- func (s *Operations) ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, ...)
- func (s *Operations) ListExecutionProcesses(ctx context.Context, id string) ([]string, error)
- func (s *Operations) ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, ...)
- func (s *Operations) ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)
- func (s *Operations) ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)
- func (c *Operations) LoadWorkflow(ctx context.Context, model *model.Workflow) (string, error)
- func (s *Operations) Log(ctx context.Context, req *model.LogRequest) error
- func (s *Operations) OwnerID(ctx context.Context, name string) (string, error)
- func (s *Operations) OwnerName(ctx context.Context, id string) (string, error)
- func (s *Operations) PersistFatalError(ctx context.Context, fatalError *model.FatalError) (bool, error)
- func (s *Operations) ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, ...) error
- func (s *Operations) PublishMsg(ctx context.Context, subject string, sharMsg proto.Message) error
- func (s *Operations) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, ...) error
- func (s *Operations) PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)
- func (s *Operations) RecordHistory(ctx context.Context, state *model.WorkflowState, ...) error
- func (s *Operations) RecordHistoryActivityComplete(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) RecordHistoryActivityExecute(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) RecordHistoryCompensationCheckpoint(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) RecordHistoryJobAbort(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) RecordHistoryJobComplete(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) RecordHistoryJobExecute(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) RecordHistoryProcessAbort(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) RecordHistoryProcessComplete(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error
- func (s *Operations) RecordHistoryProcessStart(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) RetryActivity(ctx context.Context, state *model.WorkflowState) error
- func (s *Operations) SignalFatalErrorPause(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
- func (s *Operations) SignalFatalErrorTeardown(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
- func (s *Operations) StartJob(ctx context.Context, subject string, job *model.WorkflowState, ...) error
- func (s *Operations) StoreWorkflow(ctx context.Context, wf *model.Workflow) (string, error)
- func (s *Operations) TearDownWorkflow(ctx context.Context, state *model.WorkflowState) (bool, error)
- func (s *Operations) XDestroyProcessInstance(ctx context.Context, state *model.WorkflowState) error
- type Ops
- type PublishOpt
- type ServiceTaskConsumerFn
- type WorkflowProcessMappingFn
Constants ¶
const ( AbortTypeActivity = iota // AbortTypeActivity signifies an activity is being aborted AbortTypeServiceTask = iota // AbortTypeServiceTask signifies a service task is being aborted )
Variables ¶
This section is empty.
Functions ¶
func NoOpServiceTaskConsumerFn ¶
NoOpServiceTaskConsumerFn no op service task consumer fn
func NoOpWorkFlowProcessMappingFn ¶
func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)
NoOpWorkFlowProcessMappingFn no op workflow to process mapping fn
func WithEmbargo ¶
func WithEmbargo(embargo int) *publishEmbargoOption
WithEmbargo allows the specification of an embargo time on a workflow state message
func WithHeaders ¶
WithHeaders allows the addition of extra headers to a workflow state message
Types ¶
type AbortType ¶
type AbortType int
AbortType represents the type of termination being handled by the abort function
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine contains the workflow processing functions
func New ¶
func New(natsService *natz.NatsService, operations *Operations, options *option.ServerOptions) (*Engine, error)
New returns an instance of the core workflow engine.
func (*Engine) Compensate ¶
Compensate is a method of the Engine struct that performs the compensation process for a given workflow state. It retrieves the necessary information and history from the workflow and processes it to determine the compensation steps. It then creates a compensation plan and publishes each step to a designated subject for further processing. Finally, it updates the state of the workflow to indicate that the compensation is in progress. It returns an error if any step of the compensation process encounters an issue.
func (*Engine) GetGatewayInstance ¶
func (s *Engine) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)
GetGatewayInstance - returns a gateway instance from the KV store.
func (*Engine) GetGatewayInstanceID ¶
GetGatewayInstanceID - returns a gateawy instance ID and a satisfying route to that gateway.
func (*Engine) Shutdown ¶
func (s *Engine) Shutdown()
Shutdown signals the engine to stop processing.
type Operations ¶ added in v1.1.1131
type Operations struct {
// contains filtered or unexported fields
}
Operations provides methods for executing and managing workflow processes. It provides methods for various tasks such as canceling process instances, completing tasks, retrieving workflow-related information, and managing workflow execution.
func NewOperations ¶ added in v1.1.1131
func NewOperations(natsService *natz.NatsService) (*Operations, error)
NewOperations constructs a new Operations instance
func (*Operations) CancelProcessInstance ¶ added in v1.1.1131
func (c *Operations) CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error
CancelProcessInstance cancels a workflow instance with a reason.
func (*Operations) CheckProcessTaskDeprecation ¶ added in v1.1.1131
func (s *Operations) CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processId string) error
CheckProcessTaskDeprecation checks if all the tasks in a process have not been deprecated.
func (*Operations) CompleteManualTask ¶ added in v1.1.1131
func (c *Operations) CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteManualTask completes a manual workflow task
func (*Operations) CompleteSendMessageTask ¶ added in v1.1.1131
func (c *Operations) CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteSendMessageTask completes a send message task
func (*Operations) CompleteServiceTask ¶ added in v1.1.1131
func (c *Operations) CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteServiceTask completes a workflow service task
func (*Operations) CompleteUserTask ¶ added in v1.1.1131
func (c *Operations) CompleteUserTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteUserTask completes and closes a user task with variables
func (*Operations) CreateExecution ¶ added in v1.1.1131
func (s *Operations) CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)
CreateExecution given a workflow, starts a new execution and returns its ID
func (*Operations) CreateJob ¶ added in v1.1.1131
func (s *Operations) CreateJob(ctx context.Context, job *model.WorkflowState) (string, error)
CreateJob stores a workflow task state for user tasks.
func (*Operations) CreateProcessInstance ¶ added in v1.1.1131
func (s *Operations) CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, parentElementID string, processId string, workflowName string, workflowId string, headers []byte) (*model.ProcessInstance, error)
CreateProcessInstance creates a new instance of a process and attaches it to the workflow instance.
func (*Operations) DeleteFatalError ¶ added in v1.1.1377
func (s *Operations) DeleteFatalError(ctx context.Context, state *model.WorkflowState) error
DeleteFatalError removes the fatal error for a given workflow state
func (*Operations) DeleteJob ¶ added in v1.1.1131
func (s *Operations) DeleteJob(ctx context.Context, trackingID string) error
DeleteJob removes a workflow task state.
func (*Operations) DeleteNamespace ¶ added in v1.1.1131
func (s *Operations) DeleteNamespace(ctx context.Context, ns string) error
DeleteNamespace deletes the key-value store for the specified namespace in SHAR. It iterates over all the key-value stores and deletes them one by one. The function returns nil if all key-value stores are successfully deleted.
func (*Operations) DeprecateTaskSpec ¶ added in v1.1.1131
func (s *Operations) DeprecateTaskSpec(ctx context.Context, uid []string) error
DeprecateTaskSpec deprecates one or more task specs by ID.
func (*Operations) DestroyProcessInstance ¶ added in v1.1.1131
func (s *Operations) DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, processInstanceId string, executionId string) error
DestroyProcessInstance deletes a process instance and removes the workflow instance dependent on all process instances being satisfied.
func (*Operations) EnsureServiceTaskConsumer ¶ added in v1.1.1131
func (s *Operations) EnsureServiceTaskConsumer(ctx context.Context, uid string) error
EnsureServiceTaskConsumer creates or updates a service task consumer.
func (*Operations) GetActiveEntries ¶ added in v1.1.1349
func (s *Operations) GetActiveEntries(ctx context.Context, processInstanceID string, result chan<- *model.ProcessHistoryEntry, errs chan<- error)
GetActiveEntries returns a list of workflow statuses for the specified process instance ID.
func (*Operations) GetCompensationInputVariables ¶ added in v1.1.1131
func (s *Operations) GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
GetCompensationInputVariables is a method of the Operations struct that retrieves the original input variables for a specific process instance and tracking ID. It returns the variables in byte array format.
func (*Operations) GetCompensationOutputVariables ¶ added in v1.1.1131
func (s *Operations) GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
GetCompensationOutputVariables is a method of the Operations struct that retrieves the original output variables of a compensation history entry for a specific process instance and tracking ID. It returns the output variables as a byte array.
func (*Operations) GetElement ¶ added in v1.1.1131
func (s *Operations) GetElement(ctx context.Context, state *model.WorkflowState) (*model.Element, error)
GetElement gets the definition for the current element given a workflow state.
func (*Operations) GetExecutableWorkflowIds ¶ added in v1.1.1131
func (s *Operations) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)
GetExecutableWorkflowIds returns a list of all workflow Ids that contain executable processes
func (*Operations) GetExecution ¶ added in v1.1.1131
func (s *Operations) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)
GetExecution retrieves an execution given its ID.
func (*Operations) GetFatalErrors ¶ added in v1.1.1377
func (s *Operations) GetFatalErrors(ctx context.Context, keyPrefix string, fatalErrs chan<- *model.FatalError, errs chan<- error)
GetFatalErrors queries the fatal error KV with a key prefix of the format <workflowName>.<executionId>.<processInstanceId>. and sends the results down the provided fatalErrs channel.
func (*Operations) GetJob ¶ added in v1.1.1131
func (s *Operations) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)
GetJob gets a workflow task state.
func (*Operations) GetLatestVersion ¶ added in v1.1.1131
GetLatestVersion queries the workflow versions table for the latest entry
func (*Operations) GetProcessHistory ¶ added in v1.1.1131
func (s *Operations) GetProcessHistory(ctx context.Context, processInstanceId string, wch chan<- *model.ProcessHistoryEntry, errs chan<- error)
GetProcessHistory fetches the history object for a process.
func (*Operations) GetProcessHistoryItem ¶ added in v1.1.1131
func (s *Operations) GetProcessHistoryItem(ctx context.Context, processInstanceID string, trackingID string, historyType model.ProcessHistoryType) (*model.ProcessHistoryEntry, error)
GetProcessHistoryItem retrieves a process history entry based on the given process instance ID, tracking ID, and history type. If the entry is successfully retrieved, it is unmarshaled into a model.ProcessHistoryEntry object and returned. If an error occurs during the retrieval or unmarshaling process, an error is returned.
func (*Operations) GetProcessIdFor ¶ added in v1.1.1131
func (s *Operations) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)
GetProcessIdFor retrieves the processId that a begun by a message start event
func (*Operations) GetProcessInstance ¶ added in v1.1.1131
func (s *Operations) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
GetProcessInstance returns a process instance for a given process ID
func (*Operations) GetTaskSpecByUID ¶ added in v1.1.1131
GetTaskSpecByUID fetches a task spec from the database.
func (*Operations) GetTaskSpecUID ¶ added in v1.1.1131
GetTaskSpecUID fetches
func (*Operations) GetTaskSpecUsage ¶ added in v1.1.1131
func (s *Operations) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
GetTaskSpecUsage returns the usage report for a list of task specs.
func (*Operations) GetTaskSpecUsageByName ¶ added in v1.1.1131
func (s *Operations) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
GetTaskSpecUsageByName produces a report of running and executable places where the task spec is in use.
func (*Operations) GetTaskSpecVersions ¶ added in v1.1.1131
func (s *Operations) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
GetTaskSpecVersions fetches the versions of a given task spec name
func (*Operations) GetUserTaskIDs ¶ added in v1.1.1131
GetUserTaskIDs gets a list of tasks given an owner.
func (*Operations) GetWorkflow ¶ added in v1.1.1131
GetWorkflow - retrieves a workflow model given its ID
func (*Operations) GetWorkflowNameFor ¶ added in v1.1.1131
GetWorkflowNameFor - get the worflow name a process is associated with
func (*Operations) GetWorkflowVersions ¶ added in v1.1.1131
func (s *Operations) GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, errs chan<- error)
GetWorkflowVersions - returns a list of versions for a given workflow.
func (*Operations) HandleWorkflowError ¶ added in v1.1.1131
func (c *Operations) HandleWorkflowError(ctx context.Context, errorCode string, inVars []byte, state *model.WorkflowState) error
HandleWorkflowError handles a workflow error by looking up the error definitions in the workflow, determining the appropriate action to take, and publishing the necessary workflow state updates. It returns an error if there was an issue retrieving the workflow definition, if the workflow doesn't support the specified error code.
func (*Operations) HasValidExecution ¶ added in v1.1.1131
func (s *Operations) HasValidExecution(ctx context.Context, executionId string) (*model.Execution, error)
HasValidExecution checks to see whether an execution exists for the executionId
func (*Operations) HasValidProcess ¶ added in v1.1.1131
func (s *Operations) HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)
HasValidProcess - checks for a valid process and instance for a workflow process and instance ids
func (*Operations) Heartbeat ¶ added in v1.1.1131
func (s *Operations) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error
Heartbeat saves a client status to the client KV.
func (*Operations) Launch ¶ added in v1.1.1131
func (c *Operations) Launch(ctx context.Context, processId string, vars []byte, headers []byte) (string, string, error)
Launch starts a new instance of a workflow and returns an execution ID.
func (*Operations) LaunchWithParent ¶ added in v1.1.1131
func (c *Operations) LaunchWithParent(ctx context.Context, processId string, ID common.TrackingID, vrs []byte, headers []byte, parentpiID string, parentElID string) (string, string, error)
LaunchWithParent contains the underlying logic to start a workflow. It is also called to spawn new instances of child workflows.
func (*Operations) ListExecutableProcesses ¶ added in v1.1.1131
func (s *Operations) ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, errs chan<- error)
ListExecutableProcesses returns a list of all the executable processes in SHAR. It retrieves the current SHAR namespace from the context and fetches the workflow versions for that namespace from the key-value store. It then iterates through each workflow version and loads the corresponding workflow. For each process in the workflow, it creates a ListExecutableProcessesItem object and populates it with the process name, workflow name, and the executable start parameters obtained from the workflow's start events. It sends each ListExecutableProcessesItem object to the wch channel.
Parameters: - ctx: The context containing the SHAR namespace. - wch: The channel for sending the list of executable processes. - errs: The channel for sending any errors that occur.
Returns: Nothing. Errors are sent to the errs channel if encountered.
func (*Operations) ListExecutionProcesses ¶ added in v1.1.1131
ListExecutionProcesses gets the current processIDs for an execution.
func (*Operations) ListExecutions ¶ added in v1.1.1131
func (s *Operations) ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, errs chan<- error)
ListExecutions returns a list of running workflows and versions given a workflow Name
func (*Operations) ListTaskSpecUIDs ¶ added in v1.1.1131
ListTaskSpecUIDs lists UIDs of active (and optionally deprecated) tasks specs.
func (*Operations) ListWorkflows ¶ added in v1.1.1131
func (s *Operations) ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)
ListWorkflows returns a list of all the workflows in SHAR.
func (*Operations) LoadWorkflow ¶ added in v1.1.1131
LoadWorkflow loads a model.Process describing a workflow into the engine ready for execution.
func (*Operations) Log ¶ added in v1.1.1131
func (s *Operations) Log(ctx context.Context, req *model.LogRequest) error
Log publishes LogRequest to WorkflowTelemetry Logs subject
func (*Operations) PersistFatalError ¶ added in v1.1.1377
func (s *Operations) PersistFatalError(ctx context.Context, fatalError *model.FatalError) (bool, error)
PersistFatalError saves a fatal error to the kv
func (*Operations) ProcessServiceTasks ¶ added in v1.1.1131
func (s *Operations) ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error
ProcessServiceTasks iterates over service tasks in the processes of a given workflow setting, validating them and setting their uid into their element definitions
func (*Operations) PublishMsg ¶ added in v1.1.1131
PublishMsg publishes a workflow message.
func (*Operations) PublishWorkflowState ¶ added in v1.1.1131
func (s *Operations) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, opts ...PublishOpt) error
PublishWorkflowState publishes a SHAR state object to a given subject
func (*Operations) PutTaskSpec ¶ added in v1.1.1131
PutTaskSpec writes a task spec to the database.
func (*Operations) RecordHistory ¶ added in v1.1.1131
func (s *Operations) RecordHistory(ctx context.Context, state *model.WorkflowState, historyType model.ProcessHistoryType) error
RecordHistory records into the history KV.
func (*Operations) RecordHistoryActivityComplete ¶ added in v1.1.1131
func (s *Operations) RecordHistoryActivityComplete(ctx context.Context, state *model.WorkflowState) error
RecordHistoryActivityComplete records the activity completion into the history object.
func (*Operations) RecordHistoryActivityExecute ¶ added in v1.1.1131
func (s *Operations) RecordHistoryActivityExecute(ctx context.Context, state *model.WorkflowState) error
RecordHistoryActivityExecute records the activity execute into the history object.
func (*Operations) RecordHistoryCompensationCheckpoint ¶ added in v1.1.1131
func (s *Operations) RecordHistoryCompensationCheckpoint(ctx context.Context, state *model.WorkflowState) error
RecordHistoryCompensationCheckpoint records the process aborting into the history object.
func (*Operations) RecordHistoryJobAbort ¶ added in v1.1.1131
func (s *Operations) RecordHistoryJobAbort(ctx context.Context, state *model.WorkflowState) error
RecordHistoryJobAbort records the job abort into the history object.
func (*Operations) RecordHistoryJobComplete ¶ added in v1.1.1131
func (s *Operations) RecordHistoryJobComplete(ctx context.Context, state *model.WorkflowState) error
RecordHistoryJobComplete records the job completion into the history object.
func (*Operations) RecordHistoryJobExecute ¶ added in v1.1.1131
func (s *Operations) RecordHistoryJobExecute(ctx context.Context, state *model.WorkflowState) error
RecordHistoryJobExecute records the job execute into the history object.
func (*Operations) RecordHistoryProcessAbort ¶ added in v1.1.1131
func (s *Operations) RecordHistoryProcessAbort(ctx context.Context, state *model.WorkflowState) error
RecordHistoryProcessAbort records the process aborting into the history object.
func (*Operations) RecordHistoryProcessComplete ¶ added in v1.1.1131
func (s *Operations) RecordHistoryProcessComplete(ctx context.Context, state *model.WorkflowState) error
RecordHistoryProcessComplete records the process completion into the history object.
func (*Operations) RecordHistoryProcessSpawn ¶ added in v1.1.1131
func (s *Operations) RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error
RecordHistoryProcessSpawn records the process spawning a new process into the history object.
func (*Operations) RecordHistoryProcessStart ¶ added in v1.1.1131
func (s *Operations) RecordHistoryProcessStart(ctx context.Context, state *model.WorkflowState) error
RecordHistoryProcessStart records the process start into the history object.
func (*Operations) RetryActivity ¶ added in v1.1.1377
func (s *Operations) RetryActivity(ctx context.Context, state *model.WorkflowState) error
RetryActivity publishes the state from a prior FatalError to attempt a retry
func (*Operations) SignalFatalErrorPause ¶ added in v1.1.1377
func (s *Operations) SignalFatalErrorPause(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
SignalFatalErrorPause publishes a FatalError message on FatalErr of a process in a workflow with a Pause strategy
func (*Operations) SignalFatalErrorTeardown ¶ added in v1.1.1377
func (s *Operations) SignalFatalErrorTeardown(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
SignalFatalErrorTeardown publishes a FatalError message on FatalErr of a process in a workflow with a Teardown strategy
func (*Operations) StartJob ¶ added in v1.1.1131
func (s *Operations) StartJob(ctx context.Context, subject string, job *model.WorkflowState, el *model.Element, v []byte, opts ...PublishOpt) error
StartJob launches a user/service task
func (*Operations) StoreWorkflow ¶ added in v1.1.1131
StoreWorkflow stores a workflow definition and returns a unique ID
func (*Operations) TearDownWorkflow ¶ added in v1.1.1377
func (s *Operations) TearDownWorkflow(ctx context.Context, state *model.WorkflowState) (bool, error)
TearDownWorkflow removes any state associated with a fatal errored workflow
func (*Operations) XDestroyProcessInstance ¶ added in v1.1.1131
func (s *Operations) XDestroyProcessInstance(ctx context.Context, state *model.WorkflowState) error
XDestroyProcessInstance terminates a running process instance with a cancellation reason and error
type Ops ¶ added in v1.1.1377
type Ops interface {
GetTaskSpecUID(ctx context.Context, name string) (string, error)
GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)
GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)
GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
GetExecutableWorkflowIds(ctx context.Context) ([]string, error)
GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
RecordHistory(ctx context.Context, state *model.WorkflowState, historyType model.ProcessHistoryType) error
RecordHistoryProcessStart(ctx context.Context, state *model.WorkflowState) error
RecordHistoryActivityExecute(ctx context.Context, state *model.WorkflowState) error
RecordHistoryActivityComplete(ctx context.Context, state *model.WorkflowState) error
RecordHistoryJobExecute(ctx context.Context, state *model.WorkflowState) error
RecordHistoryJobComplete(ctx context.Context, state *model.WorkflowState) error
RecordHistoryJobAbort(ctx context.Context, state *model.WorkflowState) error
RecordHistoryProcessComplete(ctx context.Context, state *model.WorkflowState) error
RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error
RecordHistoryProcessAbort(ctx context.Context, state *model.WorkflowState) error
RecordHistoryCompensationCheckpoint(ctx context.Context, state *model.WorkflowState) error
GetProcessHistory(ctx context.Context, processInstanceId string, wch chan<- *model.ProcessHistoryEntry, errs chan<- error)
GetProcessHistoryItem(ctx context.Context, processInstanceID string, trackingID string, historyType model.ProcessHistoryType) (*model.ProcessHistoryEntry, error)
LoadWorkflow(ctx context.Context, model *model.Workflow) (string, error)
Launch(ctx context.Context, processId string, vars []byte, headers []byte) (string, string, error)
LaunchWithParent(ctx context.Context, processId string, ID common.TrackingID, vrs []byte, headers []byte, parentpiID string, parentElID string) (string, string, error)
CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error
CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteUserTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)
StoreWorkflow(ctx context.Context, wf *model.Workflow) (string, error)
ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error
EnsureServiceTaskConsumer(ctx context.Context, uid string) error
GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)
GetWorkflowNameFor(ctx context.Context, processId string) (string, error)
GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, errs chan<- error)
CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)
GetExecution(ctx context.Context, executionID string) (*model.Execution, error)
XDestroyProcessInstance(ctx context.Context, state *model.WorkflowState) error
GetLatestVersion(ctx context.Context, workflowName string) (string, error)
CreateJob(ctx context.Context, job *model.WorkflowState) (string, error)
GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)
DeleteJob(ctx context.Context, trackingID string) error
ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, errs chan<- error)
ListExecutionProcesses(ctx context.Context, id string) ([]string, error)
PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, opts ...PublishOpt) error
SignalFatalErrorTeardown(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
SignalFatalErrorPause(ctx context.Context, state *model.WorkflowState, log *slog.Logger)
PublishMsg(ctx context.Context, subject string, sharMsg proto.Message) error
GetElement(ctx context.Context, state *model.WorkflowState) (*model.Element, error)
HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)
HasValidExecution(ctx context.Context, executionId string) (*model.Execution, error)
GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)
OwnerID(ctx context.Context, name string) (string, error)
OwnerName(ctx context.Context, id string) (string, error)
CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, parentElementID string, processId string, workflowName string, workflowId string, headers []byte) (*model.ProcessInstance, error)
GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, processInstanceId string, executionId string) error
DeprecateTaskSpec(ctx context.Context, uid []string) error
CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processId string) error
ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)
GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)
Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error
Log(ctx context.Context, req *model.LogRequest) error
DeleteNamespace(ctx context.Context, ns string) error
ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, errs chan<- error)
StartJob(ctx context.Context, subject string, job *model.WorkflowState, el *model.Element, v []byte, opts ...PublishOpt) error
GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
HandleWorkflowError(ctx context.Context, errorCode string, inVars []byte, state *model.WorkflowState) error
GetFatalErrors(ctx context.Context, keyPrefix string, fatalErrs chan<- *model.FatalError, errs chan<- error)
RetryActivity(ctx context.Context, state *model.WorkflowState) error
PersistFatalError(ctx context.Context, fatalError *model.FatalError) (bool, error)
TearDownWorkflow(ctx context.Context, state *model.WorkflowState) (bool, error)
DeleteFatalError(ctx context.Context, state *model.WorkflowState) error
GetActiveEntries(ctx context.Context, processInstanceID string, result chan<- *model.ProcessHistoryEntry, errs chan<- error)
}
Ops is the interface for the Operations struct
type PublishOpt ¶
type PublishOpt interface {
Apply(n *publishOptions)
}
PublishOpt represents an option that can be used when publishing a workflow state
type ServiceTaskConsumerFn ¶
ServiceTaskConsumerFn defines the type of a function that ensures existence of a service task consumer