Documentation
¶
Index ¶
- Constants
- Variables
- func NoOpServiceTaskConsumerFn(_ context.Context, _ string) error
- func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)
- func WithEmbargo(embargo int) *publishEmbargoOption
- func WithHeaders(headers map[string]string) *publishHeadersOption
- type AbortType
- type Engine
- func (c *Engine) CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processName string) error
- func (s *Engine) CloseUserTask(ctx context.Context, trackingID string) error
- func (s *Engine) Compensate(ctx context.Context, state *model.WorkflowState) error
- func (c *Engine) CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (c *Engine) CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (c *Engine) CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (c *Engine) CompleteUserTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
- func (s *Engine) Conn() common.NatsConn
- func (s *Engine) CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)
- func (s *Engine) CreateJob(ctx context.Context, job *model.WorkflowState) (string, error)
- func (s *Engine) CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, ...) (*model.ProcessInstance, error)
- func (s *Engine) DeleteJob(ctx context.Context, trackingID string) error
- func (s *Engine) DeleteNamespace(ctx context.Context, ns string) error
- func (s *Engine) DeprecateTaskSpec(ctx context.Context, uid []string) error
- func (s *Engine) DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, pi *model.ProcessInstance, ...) error
- func (s *Engine) EnsureServiceTaskConsumer(ctx context.Context, uid string) error
- func (s *Engine) GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
- func (s *Engine) GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
- func (s *Engine) GetElement(ctx context.Context, state *model.WorkflowState) (*model.Element, error)
- func (s *Engine) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)
- func (s *Engine) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)
- func (s *Engine) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)
- func (s *Engine) GetGatewayInstanceID(state *model.WorkflowState) (string, string, error)
- func (s *Engine) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)
- func (s *Engine) GetLatestVersion(ctx context.Context, workflowName string) (string, error)
- func (s *Engine) GetOldState(ctx context.Context, id string) (*model.WorkflowState, error)
- func (s *Engine) GetProcessHistory(ctx context.Context, processInstanceId string, ...)
- func (s *Engine) GetProcessHistoryItem(ctx context.Context, processInstanceID string, trackingID string, ...) (*model.ProcessHistoryEntry, error)
- func (s *Engine) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)
- func (s *Engine) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
- func (s *Engine) GetProcessInstanceStatus(ctx context.Context, id string, wch chan<- *model.WorkflowState, ...)
- func (s *Engine) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)
- func (s *Engine) GetTaskSpecUID(ctx context.Context, name string) (string, error)
- func (s *Engine) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
- func (s *Engine) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
- func (s *Engine) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
- func (s *Engine) GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)
- func (s *Engine) GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)
- func (s *Engine) GetWorkflowNameFor(ctx context.Context, processName string) (string, error)
- func (s *Engine) GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, ...)
- func (s *Engine) HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)
- func (s *Engine) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error
- func (s *Engine) KvsFor(ctx context.Context, ns string) (*NamespaceKvs, error)
- func (c *Engine) Launch(ctx context.Context, processName string, vars []byte) (string, string, error)
- func (s *Engine) ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, ...)
- func (s *Engine) ListExecutionProcesses(ctx context.Context, id string) ([]string, error)
- func (s *Engine) ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, ...)
- func (s *Engine) ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)
- func (s *Engine) ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)
- func (c *Engine) LoadWorkflow(ctx context.Context, model *model.Workflow) (string, error)
- func (s *Engine) Log(ctx context.Context, req *model.LogRequest) error
- func (s *Engine) OwnerID(ctx context.Context, name string) (string, error)
- func (s *Engine) OwnerName(ctx context.Context, id string) (string, error)
- func (s *Engine) ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, ...) error
- func (s *Engine) PublishMessage(ctx context.Context, name string, key string, vars []byte) error
- func (s *Engine) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, ...) error
- func (s *Engine) PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)
- func (s *Engine) RecordHistory(ctx context.Context, state *model.WorkflowState, ...) error
- func (s *Engine) RecordHistoryActivityComplete(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) RecordHistoryActivityExecute(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) RecordHistoryCompensationCheckpoint(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) RecordHistoryJobAbort(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) RecordHistoryJobComplete(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) RecordHistoryJobExecute(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) RecordHistoryProcessAbort(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) RecordHistoryProcessComplete(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error
- func (s *Engine) RecordHistoryProcessStart(ctx context.Context, state *model.WorkflowState) error
- func (s *Engine) SaveState(ctx context.Context, id string, state *model.WorkflowState) error
- func (s *Engine) Shutdown()
- func (c *Engine) Start(ctx context.Context) error
- func (s *Engine) StartJob(ctx context.Context, subject string, job *model.WorkflowState, ...) error
- func (s *Engine) StartProcessing(ctx context.Context) error
- func (s *Engine) StoreWorkflow(ctx context.Context, wf *model.Workflow) (string, error)
- func (s *Engine) XDestroyProcessInstance(ctx context.Context, state *model.WorkflowState) error
- type NamespaceKvs
- type Nats
- type NatsConnConfiguration
- type PublishOpt
- type ServiceTaskConsumerFn
- type WorkflowProcessMappingFn
Constants ¶
const ( AbortTypeActivity = iota // AbortTypeActivity signifies an activity is being aborted AbortTypeServiceTask = iota // AbortTypeServiceTask signifies a service task is being aborted )
Variables ¶
var NatsConfig string
NatsConfig holds the current nats configuration for SHAR.
Functions ¶
func NoOpServiceTaskConsumerFn ¶
NoOpServiceTaskConsumerFn no op service task consumer fn
func NoOpWorkFlowProcessMappingFn ¶
func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)
NoOpWorkFlowProcessMappingFn no op workflow to process mapping fn
func WithEmbargo ¶
func WithEmbargo(embargo int) *publishEmbargoOption
WithEmbargo allows the specification of an embargo time on a workflow state message
func WithHeaders ¶
WithHeaders allows the addition of extra headers to a workflow state message
Types ¶
type AbortType ¶
type AbortType int
AbortType represents the type of termination being handled by the abort function
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine contains the workflow processing functions
func New ¶
func New(nc *NatsConnConfiguration, concurrency int, allowOrphanServiceTasks bool, telCfg telemetry.Config) (*Engine, error)
New returns an instance of the core workflow engine.
func (*Engine) CancelProcessInstance ¶
CancelProcessInstance cancels a workflow instance with a reason.
func (*Engine) CheckProcessTaskDeprecation ¶
func (s *Engine) CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processName string) error
CheckProcessTaskDeprecation checks if all the tasks in a process have not been deprecated.
func (*Engine) CloseUserTask ¶
CloseUserTask removes a completed user task.
func (*Engine) Compensate ¶
Compensate is a method of the Nats struct that performs the compensation process for a given workflow state. It retrieves the necessary information and history from the workflow and processes it to determine the compensation steps. It then creates a compensation plan and publishes each step to a designated subject for further processing. Finally, it updates the state of the workflow to indicate that the compensation is in progress. It returns an error if any step of the compensation process encounters an issue.
func (*Engine) CompleteManualTask ¶
func (c *Engine) CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteManualTask completes a manual workflow task
func (*Engine) CompleteSendMessageTask ¶
func (c *Engine) CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteSendMessageTask completes a send message task
func (*Engine) CompleteServiceTask ¶
func (c *Engine) CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteServiceTask completes a workflow service task
func (*Engine) CompleteUserTask ¶
func (c *Engine) CompleteUserTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error
CompleteUserTask completes and closes a user task with variables
func (*Engine) CreateExecution ¶
func (s *Engine) CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)
CreateExecution given a workflow, starts a new execution and returns its ID
func (*Engine) CreateProcessInstance ¶
func (s *Engine) CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, parentElementID string, processName string, workflowName string, workflowId string) (*model.ProcessInstance, error)
CreateProcessInstance creates a new instance of a process and attaches it to the workflow instance.
func (*Engine) DeleteNamespace ¶
DeleteNamespace deletes the key-value store for the specified namespace in SHAR. It iterates over all the key-value stores and deletes them one by one. The function returns nil if all key-value stores are successfully deleted.
func (*Engine) DeprecateTaskSpec ¶
DeprecateTaskSpec deprecates one or more task specs by ID.
func (*Engine) DestroyProcessInstance ¶
func (s *Engine) DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, pi *model.ProcessInstance, execution *model.Execution) error
DestroyProcessInstance deletes a process instance and removes the workflow instance dependent on all process instances being satisfied.
func (*Engine) EnsureServiceTaskConsumer ¶
EnsureServiceTaskConsumer creates or updates a service task consumer.
func (*Engine) GetCompensationInputVariables ¶
func (s *Engine) GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
GetCompensationInputVariables is a method of the Nats struct that retrieves the original input variables for a specific process instance and tracking ID. It returns the variables in byte array format.
func (*Engine) GetCompensationOutputVariables ¶
func (s *Engine) GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)
GetCompensationOutputVariables is a method of the Nats struct that retrieves the original output variables of a compensation history entry for a specific process instance and tracking ID. It returns the output variables as a byte array.
func (*Engine) GetElement ¶
func (s *Engine) GetElement(ctx context.Context, state *model.WorkflowState) (*model.Element, error)
GetElement gets the definition for the current element given a workflow state.
func (*Engine) GetExecutableWorkflowIds ¶
GetExecutableWorkflowIds returns a list of all workflow Ids that contain executable processes
func (*Engine) GetExecution ¶
GetExecution retrieves an execution given its ID.
func (*Engine) GetGatewayInstance ¶
func (s *Engine) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)
GetGatewayInstance - returns a gateway instance from the KV store.
func (*Engine) GetGatewayInstanceID ¶
GetGatewayInstanceID - returns a gateawy instance ID and a satisfying route to that gateway.
func (*Engine) GetLatestVersion ¶
GetLatestVersion queries the workflow versions table for the latest entry
func (*Engine) GetOldState ¶
GetOldState gets a task state given its tracking ID.
func (*Engine) GetProcessHistory ¶
func (s *Engine) GetProcessHistory(ctx context.Context, processInstanceId string, wch chan<- *model.ProcessHistoryEntry, errs chan<- error)
GetProcessHistory fetches the history object for a process.
func (*Engine) GetProcessHistoryItem ¶
func (s *Engine) GetProcessHistoryItem(ctx context.Context, processInstanceID string, trackingID string, historyType model.ProcessHistoryType) (*model.ProcessHistoryEntry, error)
GetProcessHistoryItem retrieves a process history entry based on the given process instance ID, tracking ID, and history type. If the entry is successfully retrieved, it is unmarshaled into a model.ProcessHistoryEntry object and returned. If an error occurs during the retrieval or unmarshaling process, an error is returned.
func (*Engine) GetProcessIdFor ¶
GetProcessIdFor retrieves the processId that a begun by a message start event
func (*Engine) GetProcessInstance ¶
func (s *Engine) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
GetProcessInstance returns a process instance for a given process ID
func (*Engine) GetProcessInstanceStatus ¶
func (s *Engine) GetProcessInstanceStatus(ctx context.Context, id string, wch chan<- *model.WorkflowState, errs chan<- error)
GetProcessInstanceStatus returns a list of workflow statuses for the specified process instance ID.
func (*Engine) GetTaskSpecByUID ¶
GetTaskSpecByUID fetches a task spec from the database.
func (*Engine) GetTaskSpecUID ¶
GetTaskSpecUID fetches
func (*Engine) GetTaskSpecUsage ¶
func (s *Engine) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
GetTaskSpecUsage returns the usage report for a list of task specs.
func (*Engine) GetTaskSpecUsageByName ¶
func (s *Engine) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
GetTaskSpecUsageByName produces a report of running and executable places where the task spec is in use.
func (*Engine) GetTaskSpecVersions ¶
func (s *Engine) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
GetTaskSpecVersions fetches the versions of a given task spec name
func (*Engine) GetUserTaskIDs ¶
GetUserTaskIDs gets a list of tasks given an owner.
func (*Engine) GetWorkflow ¶
GetWorkflow - retrieves a workflow model given its ID
func (*Engine) GetWorkflowNameFor ¶
GetWorkflowNameFor - get the worflow name a process is associated with
func (*Engine) GetWorkflowVersions ¶
func (s *Engine) GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, errs chan<- error)
GetWorkflowVersions - returns a list of versions for a given workflow.
func (*Engine) HasValidProcess ¶
func (s *Engine) HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)
HasValidProcess - checks for a valid process and instance for a workflow process and instance ids
func (*Engine) KvsFor ¶
KvsFor retrieves the shar KVs for a given namespace. If they do not exist for a namespace, it will initialise them and store them in a map for future lookup.
func (*Engine) Launch ¶
func (c *Engine) Launch(ctx context.Context, processName string, vars []byte) (string, string, error)
Launch starts a new instance of a workflow and returns an execution ID.
func (*Engine) ListExecutableProcesses ¶
func (s *Engine) ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, errs chan<- error)
ListExecutableProcesses returns a list of all the executable processes in SHAR. It retrieves the current SHAR namespace from the context and fetches the workflow versions for that namespace from the key-value store. It then iterates through each workflow version and loads the corresponding workflow. For each process in the workflow, it creates a ListExecutableProcessesItem object and populates it with the process name, workflow name, and the executable start parameters obtained from the workflow's start events. It sends each ListExecutableProcessesItem object to the wch channel.
Parameters: - ctx: The context containing the SHAR namespace. - wch: The channel for sending the list of executable processes. - errs: The channel for sending any errors that occur.
Returns: Nothing. Errors are sent to the errs channel if encountered.
func (*Engine) ListExecutionProcesses ¶
ListExecutionProcesses gets the current processIDs for an execution.
func (*Engine) ListExecutions ¶
func (s *Engine) ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, errs chan<- error)
ListExecutions returns a list of running workflows and versions given a workflow Name
func (*Engine) ListTaskSpecUIDs ¶
ListTaskSpecUIDs lists UIDs of active (and optionally deprecated) tasks specs.
func (*Engine) ListWorkflows ¶
func (s *Engine) ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)
ListWorkflows returns a list of all the workflows in SHAR.
func (*Engine) LoadWorkflow ¶
LoadWorkflow loads a model.Process describing a workflow into the engine ready for execution.
func (*Engine) ProcessServiceTasks ¶
func (s *Engine) ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error
ProcessServiceTasks iterates over service tasks in the processes of a given workflow setting, validating them and setting their uid into their element definitions
func (*Engine) PublishMessage ¶
PublishMessage publishes a workflow message.
func (*Engine) PublishWorkflowState ¶
func (s *Engine) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, opts ...PublishOpt) error
PublishWorkflowState publishes a SHAR state object to a given subject
func (*Engine) PutTaskSpec ¶
PutTaskSpec writes a task spec to the database.
func (*Engine) RecordHistory ¶
func (s *Engine) RecordHistory(ctx context.Context, state *model.WorkflowState, historyType model.ProcessHistoryType) error
RecordHistory records into the history KV.
func (*Engine) RecordHistoryActivityComplete ¶
func (s *Engine) RecordHistoryActivityComplete(ctx context.Context, state *model.WorkflowState) error
RecordHistoryActivityComplete records the activity completion into the history object.
func (*Engine) RecordHistoryActivityExecute ¶
func (s *Engine) RecordHistoryActivityExecute(ctx context.Context, state *model.WorkflowState) error
RecordHistoryActivityExecute records the activity execute into the history object.
func (*Engine) RecordHistoryCompensationCheckpoint ¶
func (s *Engine) RecordHistoryCompensationCheckpoint(ctx context.Context, state *model.WorkflowState) error
RecordHistoryCompensationCheckpoint records the process aborting into the history object.
func (*Engine) RecordHistoryJobAbort ¶
RecordHistoryJobAbort records the job abort into the history object.
func (*Engine) RecordHistoryJobComplete ¶
RecordHistoryJobComplete records the job completion into the history object.
func (*Engine) RecordHistoryJobExecute ¶
RecordHistoryJobExecute records the job execute into the history object.
func (*Engine) RecordHistoryProcessAbort ¶
RecordHistoryProcessAbort records the process aborting into the history object.
func (*Engine) RecordHistoryProcessComplete ¶
func (s *Engine) RecordHistoryProcessComplete(ctx context.Context, state *model.WorkflowState) error
RecordHistoryProcessComplete records the process completion into the history object.
func (*Engine) RecordHistoryProcessSpawn ¶
func (s *Engine) RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error
RecordHistoryProcessSpawn records the process spawning a new process into the history object.
func (*Engine) RecordHistoryProcessStart ¶
RecordHistoryProcessStart records the process start into the history object.
func (*Engine) Shutdown ¶
func (s *Engine) Shutdown()
Shutdown signals the engine to stop processing.
func (*Engine) Start ¶
Start sets up the activity and job processors and starts the engine processing workflows.
func (*Engine) StartJob ¶
func (s *Engine) StartJob(ctx context.Context, subject string, job *model.WorkflowState, el *model.Element, v []byte, opts ...PublishOpt) error
StartJob launches a user/service task
func (*Engine) StartProcessing ¶
StartProcessing begins listening to all the message processing queues.
func (*Engine) StoreWorkflow ¶
StoreWorkflow stores a workflow definition and returns a unique ID
func (*Engine) XDestroyProcessInstance ¶
XDestroyProcessInstance terminates a running process instance with a cancellation reason and error
type NamespaceKvs ¶
type NamespaceKvs struct {
// contains filtered or unexported fields
}
NamespaceKvs defines all of the key value stores shar needs to operate
type NatsConnConfiguration ¶
type NatsConnConfiguration struct {
Conn *nats.Conn
TxConn *nats.Conn
StorageType jetstream.StorageType
}
NatsConnConfiguration represents the configuration for a NATS connection.
- Conn: The NATS connection. - TxConn: The transactional NATS connection. - StorageType: The storage type for JetStream.
type PublishOpt ¶
type PublishOpt interface {
Apply(n *publishOptions)
}
PublishOpt represents an option that can be used when publishing a workflow state
type ServiceTaskConsumerFn ¶
ServiceTaskConsumerFn defines the type of a function that ensures existence of a service task consumer