workflow

package
v1.1.1124 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2024 License: MIT Imports: 46 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AbortTypeActivity    = iota // AbortTypeActivity signifies an activity is being aborted
	AbortTypeServiceTask = iota // AbortTypeServiceTask signifies a service task is being aborted
)

Variables

View Source
var NatsConfig string

NatsConfig holds the current nats configuration for SHAR.

Functions

func NoOpServiceTaskConsumerFn

func NoOpServiceTaskConsumerFn(_ context.Context, _ string) error

NoOpServiceTaskConsumerFn no op service task consumer fn

func NoOpWorkFlowProcessMappingFn

func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)

NoOpWorkFlowProcessMappingFn no op workflow to process mapping fn

func WithEmbargo

func WithEmbargo(embargo int) *publishEmbargoOption

WithEmbargo allows the specification of an embargo time on a workflow state message

func WithHeaders

func WithHeaders(headers map[string]string) *publishHeadersOption

WithHeaders allows the addition of extra headers to a workflow state message

Types

type AbortType

type AbortType int

AbortType represents the type of termination being handled by the abort function

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine contains the workflow processing functions

func New

func New(nc *NatsConnConfiguration, concurrency int, allowOrphanServiceTasks bool, telCfg telemetry.Config) (*Engine, error)

New returns an instance of the core workflow engine.

func (*Engine) CancelProcessInstance

func (c *Engine) CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error

CancelProcessInstance cancels a workflow instance with a reason.

func (*Engine) CheckProcessTaskDeprecation

func (s *Engine) CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processName string) error

CheckProcessTaskDeprecation checks if all the tasks in a process have not been deprecated.

func (*Engine) CloseUserTask

func (s *Engine) CloseUserTask(ctx context.Context, trackingID string) error

CloseUserTask removes a completed user task.

func (*Engine) Compensate

func (s *Engine) Compensate(ctx context.Context, state *model.WorkflowState) error

Compensate is a method of the Nats struct that performs the compensation process for a given workflow state. It retrieves the necessary information and history from the workflow and processes it to determine the compensation steps. It then creates a compensation plan and publishes each step to a designated subject for further processing. Finally, it updates the state of the workflow to indicate that the compensation is in progress. It returns an error if any step of the compensation process encounters an issue.

func (*Engine) CompleteManualTask

func (c *Engine) CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteManualTask completes a manual workflow task

func (*Engine) CompleteSendMessageTask

func (c *Engine) CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteSendMessageTask completes a send message task

func (*Engine) CompleteServiceTask

func (c *Engine) CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteServiceTask completes a workflow service task

func (*Engine) CompleteUserTask

func (c *Engine) CompleteUserTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteUserTask completes and closes a user task with variables

func (*Engine) Conn

func (s *Engine) Conn() common.NatsConn

Conn returns the active nats connection

func (*Engine) CreateExecution

func (s *Engine) CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)

CreateExecution given a workflow, starts a new execution and returns its ID

func (*Engine) CreateJob

func (s *Engine) CreateJob(ctx context.Context, job *model.WorkflowState) (string, error)

CreateJob stores a workflow task state for user tasks.

func (*Engine) CreateProcessInstance

func (s *Engine) CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, parentElementID string, processName string, workflowName string, workflowId string) (*model.ProcessInstance, error)

CreateProcessInstance creates a new instance of a process and attaches it to the workflow instance.

func (*Engine) DeleteJob

func (s *Engine) DeleteJob(ctx context.Context, trackingID string) error

DeleteJob removes a workflow task state.

func (*Engine) DeleteNamespace

func (s *Engine) DeleteNamespace(ctx context.Context, ns string) error

DeleteNamespace deletes the key-value store for the specified namespace in SHAR. It iterates over all the key-value stores and deletes them one by one. The function returns nil if all key-value stores are successfully deleted.

func (*Engine) DeprecateTaskSpec

func (s *Engine) DeprecateTaskSpec(ctx context.Context, uid []string) error

DeprecateTaskSpec deprecates one or more task specs by ID.

func (*Engine) DestroyProcessInstance

func (s *Engine) DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, processInstanceId string, executionId string) error

DestroyProcessInstance deletes a process instance and removes the workflow instance dependent on all process instances being satisfied.

func (*Engine) EnsureServiceTaskConsumer

func (s *Engine) EnsureServiceTaskConsumer(ctx context.Context, uid string) error

EnsureServiceTaskConsumer creates or updates a service task consumer.

func (*Engine) GetCompensationInputVariables

func (s *Engine) GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)

GetCompensationInputVariables is a method of the Nats struct that retrieves the original input variables for a specific process instance and tracking ID. It returns the variables in byte array format.

func (*Engine) GetCompensationOutputVariables

func (s *Engine) GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)

GetCompensationOutputVariables is a method of the Nats struct that retrieves the original output variables of a compensation history entry for a specific process instance and tracking ID. It returns the output variables as a byte array.

func (*Engine) GetElement

func (s *Engine) GetElement(ctx context.Context, state *model.WorkflowState) (*model.Element, error)

GetElement gets the definition for the current element given a workflow state.

func (*Engine) GetExecutableWorkflowIds

func (s *Engine) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)

GetExecutableWorkflowIds returns a list of all workflow Ids that contain executable processes

func (*Engine) GetExecution

func (s *Engine) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)

GetExecution retrieves an execution given its ID.

func (*Engine) GetGatewayInstance

func (s *Engine) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)

GetGatewayInstance - returns a gateway instance from the KV store.

func (*Engine) GetGatewayInstanceID

func (s *Engine) GetGatewayInstanceID(state *model.WorkflowState) (string, string, error)

GetGatewayInstanceID - returns a gateawy instance ID and a satisfying route to that gateway.

func (*Engine) GetJob

func (s *Engine) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)

GetJob gets a workflow task state.

func (*Engine) GetLatestVersion

func (s *Engine) GetLatestVersion(ctx context.Context, workflowName string) (string, error)

GetLatestVersion queries the workflow versions table for the latest entry

func (*Engine) GetOldState

func (s *Engine) GetOldState(ctx context.Context, id string) (*model.WorkflowState, error)

GetOldState gets a task state given its tracking ID.

func (*Engine) GetProcessHistory

func (s *Engine) GetProcessHistory(ctx context.Context, processInstanceId string, wch chan<- *model.ProcessHistoryEntry, errs chan<- error)

GetProcessHistory fetches the history object for a process.

func (*Engine) GetProcessHistoryItem

func (s *Engine) GetProcessHistoryItem(ctx context.Context, processInstanceID string, trackingID string, historyType model.ProcessHistoryType) (*model.ProcessHistoryEntry, error)

GetProcessHistoryItem retrieves a process history entry based on the given process instance ID, tracking ID, and history type. If the entry is successfully retrieved, it is unmarshaled into a model.ProcessHistoryEntry object and returned. If an error occurs during the retrieval or unmarshaling process, an error is returned.

func (*Engine) GetProcessIdFor

func (s *Engine) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)

GetProcessIdFor retrieves the processId that a begun by a message start event

func (*Engine) GetProcessInstance

func (s *Engine) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)

GetProcessInstance returns a process instance for a given process ID

func (*Engine) GetProcessInstanceStatus

func (s *Engine) GetProcessInstanceStatus(ctx context.Context, id string, wch chan<- *model.WorkflowState, errs chan<- error)

GetProcessInstanceStatus returns a list of workflow statuses for the specified process instance ID.

func (*Engine) GetTaskSpecByUID

func (s *Engine) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)

GetTaskSpecByUID fetches a task spec from the database.

func (*Engine) GetTaskSpecUID

func (s *Engine) GetTaskSpecUID(ctx context.Context, name string) (string, error)

GetTaskSpecUID fetches

func (*Engine) GetTaskSpecUsage

func (s *Engine) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)

GetTaskSpecUsage returns the usage report for a list of task specs.

func (*Engine) GetTaskSpecUsageByName

func (s *Engine) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)

GetTaskSpecUsageByName produces a report of running and executable places where the task spec is in use.

func (*Engine) GetTaskSpecVersions

func (s *Engine) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)

GetTaskSpecVersions fetches the versions of a given task spec name

func (*Engine) GetUserTaskIDs

func (s *Engine) GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)

GetUserTaskIDs gets a list of tasks given an owner.

func (*Engine) GetWorkflow

func (s *Engine) GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)

GetWorkflow - retrieves a workflow model given its ID

func (*Engine) GetWorkflowNameFor

func (s *Engine) GetWorkflowNameFor(ctx context.Context, processName string) (string, error)

GetWorkflowNameFor - get the worflow name a process is associated with

func (*Engine) GetWorkflowVersions

func (s *Engine) GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, errs chan<- error)

GetWorkflowVersions - returns a list of versions for a given workflow.

func (*Engine) HandleWorkflowError added in v1.1.1124

func (c *Engine) HandleWorkflowError(ctx context.Context, errorCode string, message string, inVars []byte, job *model.WorkflowState) error

HandleWorkflowError handles a workflow error by looking up the error definitions in the workflow, determining the appropriate action to take, and publishing the necessary workflow state updates. It returns an error if there was an issue retrieving the workflow definition, if the workflow doesn't support the specified error code.

func (*Engine) HasValidProcess

func (s *Engine) HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)

HasValidProcess - checks for a valid process and instance for a workflow process and instance ids

func (*Engine) Heartbeat

func (s *Engine) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error

Heartbeat saves a client status to the client KV.

func (*Engine) KvsFor

func (s *Engine) KvsFor(ctx context.Context, ns string) (*NamespaceKvs, error)

KvsFor retrieves the shar KVs for a given namespace. If they do not exist for a namespace, it will initialise them and store them in a map for future lookup.

func (*Engine) Launch

func (c *Engine) Launch(ctx context.Context, processName string, vars []byte) (string, string, error)

Launch starts a new instance of a workflow and returns an execution ID.

func (*Engine) ListExecutableProcesses

func (s *Engine) ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, errs chan<- error)

ListExecutableProcesses returns a list of all the executable processes in SHAR. It retrieves the current SHAR namespace from the context and fetches the workflow versions for that namespace from the key-value store. It then iterates through each workflow version and loads the corresponding workflow. For each process in the workflow, it creates a ListExecutableProcessesItem object and populates it with the process name, workflow name, and the executable start parameters obtained from the workflow's start events. It sends each ListExecutableProcessesItem object to the wch channel.

Parameters: - ctx: The context containing the SHAR namespace. - wch: The channel for sending the list of executable processes. - errs: The channel for sending any errors that occur.

Returns: Nothing. Errors are sent to the errs channel if encountered.

func (*Engine) ListExecutionProcesses

func (s *Engine) ListExecutionProcesses(ctx context.Context, id string) ([]string, error)

ListExecutionProcesses gets the current processIDs for an execution.

func (*Engine) ListExecutions

func (s *Engine) ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, errs chan<- error)

ListExecutions returns a list of running workflows and versions given a workflow Name

func (*Engine) ListTaskSpecUIDs

func (s *Engine) ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)

ListTaskSpecUIDs lists UIDs of active (and optionally deprecated) tasks specs.

func (*Engine) ListWorkflows

func (s *Engine) ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)

ListWorkflows returns a list of all the workflows in SHAR.

func (*Engine) LoadWorkflow

func (c *Engine) LoadWorkflow(ctx context.Context, model *model.Workflow) (string, error)

LoadWorkflow loads a model.Process describing a workflow into the engine ready for execution.

func (*Engine) Log

func (s *Engine) Log(ctx context.Context, req *model.LogRequest) error

Log publishes LogRequest to WorkflowTelemetry Logs subject

func (*Engine) OwnerID

func (s *Engine) OwnerID(ctx context.Context, name string) (string, error)

OwnerID gets a unique identifier for a task owner.

func (*Engine) OwnerName

func (s *Engine) OwnerName(ctx context.Context, id string) (string, error)

OwnerName retrieves an owner name given an ID.

func (*Engine) ProcessServiceTasks

func (s *Engine) ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error

ProcessServiceTasks iterates over service tasks in the processes of a given workflow setting, validating them and setting their uid into their element definitions

func (*Engine) PublishMsg added in v1.1.1102

func (s *Engine) PublishMsg(ctx context.Context, subject string, sharMsg proto.Message) error

PublishMsg publishes a workflow message.

func (*Engine) PublishWorkflowState

func (s *Engine) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, opts ...PublishOpt) error

PublishWorkflowState publishes a SHAR state object to a given subject

func (*Engine) PutTaskSpec

func (s *Engine) PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)

PutTaskSpec writes a task spec to the database.

func (*Engine) RecordHistory

func (s *Engine) RecordHistory(ctx context.Context, state *model.WorkflowState, historyType model.ProcessHistoryType) error

RecordHistory records into the history KV.

func (*Engine) RecordHistoryActivityComplete

func (s *Engine) RecordHistoryActivityComplete(ctx context.Context, state *model.WorkflowState) error

RecordHistoryActivityComplete records the activity completion into the history object.

func (*Engine) RecordHistoryActivityExecute

func (s *Engine) RecordHistoryActivityExecute(ctx context.Context, state *model.WorkflowState) error

RecordHistoryActivityExecute records the activity execute into the history object.

func (*Engine) RecordHistoryCompensationCheckpoint

func (s *Engine) RecordHistoryCompensationCheckpoint(ctx context.Context, state *model.WorkflowState) error

RecordHistoryCompensationCheckpoint records the process aborting into the history object.

func (*Engine) RecordHistoryJobAbort

func (s *Engine) RecordHistoryJobAbort(ctx context.Context, state *model.WorkflowState) error

RecordHistoryJobAbort records the job abort into the history object.

func (*Engine) RecordHistoryJobComplete

func (s *Engine) RecordHistoryJobComplete(ctx context.Context, state *model.WorkflowState) error

RecordHistoryJobComplete records the job completion into the history object.

func (*Engine) RecordHistoryJobExecute

func (s *Engine) RecordHistoryJobExecute(ctx context.Context, state *model.WorkflowState) error

RecordHistoryJobExecute records the job execute into the history object.

func (*Engine) RecordHistoryProcessAbort

func (s *Engine) RecordHistoryProcessAbort(ctx context.Context, state *model.WorkflowState) error

RecordHistoryProcessAbort records the process aborting into the history object.

func (*Engine) RecordHistoryProcessComplete

func (s *Engine) RecordHistoryProcessComplete(ctx context.Context, state *model.WorkflowState) error

RecordHistoryProcessComplete records the process completion into the history object.

func (*Engine) RecordHistoryProcessSpawn

func (s *Engine) RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error

RecordHistoryProcessSpawn records the process spawning a new process into the history object.

func (*Engine) RecordHistoryProcessStart

func (s *Engine) RecordHistoryProcessStart(ctx context.Context, state *model.WorkflowState) error

RecordHistoryProcessStart records the process start into the history object.

func (*Engine) SaveState

func (s *Engine) SaveState(ctx context.Context, id string, state *model.WorkflowState) error

SaveState saves the task state.

func (*Engine) Shutdown

func (s *Engine) Shutdown()

Shutdown signals the engine to stop processing.

func (*Engine) SignalFatalError added in v1.1.1102

func (s *Engine) SignalFatalError(ctx context.Context, state *model.WorkflowState, log *slog.Logger)

SignalFatalError publishes a FatalError message on death of a process in a workflow

func (*Engine) Start

func (c *Engine) Start(ctx context.Context) error

Start sets up the activity and job processors and starts the engine processing workflows.

func (*Engine) StartJob

func (s *Engine) StartJob(ctx context.Context, subject string, job *model.WorkflowState, el *model.Element, v []byte, opts ...PublishOpt) error

StartJob launches a user/service task

func (*Engine) StartProcessing

func (s *Engine) StartProcessing(ctx context.Context) error

StartProcessing begins listening to all the message processing queues.

func (*Engine) StoreWorkflow

func (s *Engine) StoreWorkflow(ctx context.Context, wf *model.Workflow) (string, error)

StoreWorkflow stores a workflow definition and returns a unique ID

func (*Engine) XDestroyProcessInstance

func (s *Engine) XDestroyProcessInstance(ctx context.Context, state *model.WorkflowState) error

XDestroyProcessInstance terminates a running process instance with a cancellation reason and error

type NamespaceKvs

type NamespaceKvs struct {
	// contains filtered or unexported fields
}

NamespaceKvs defines all of the key value stores shar needs to operate

type Nats

type Nats struct {
}

Nats contains the engine functions that communicate with NATS.

type NatsConnConfiguration

type NatsConnConfiguration struct {
	Conn        *nats.Conn
	TxConn      *nats.Conn
	StorageType jetstream.StorageType
}

NatsConnConfiguration represents the configuration for a NATS connection.

- Conn: The NATS connection. - TxConn: The transactional NATS connection. - StorageType: The storage type for JetStream.

type PublishOpt

type PublishOpt interface {
	Apply(n *publishOptions)
}

PublishOpt represents an option that can be used when publishing a workflow state

type ServiceTaskConsumerFn

type ServiceTaskConsumerFn func(ctx context.Context, id string) error

ServiceTaskConsumerFn defines the type of a function that ensures existence of a service task consumer

type WorkflowProcessMappingFn

type WorkflowProcessMappingFn func(ctx context.Context, wf *model.Workflow, i *model.Process) (uint64, error)

WorkflowProcessMappingFn defines the type of a function that creates a workflow to process mapping

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL