engine

package
v1.15.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2026 License: Apache-2.0 Imports: 51 Imported by: 0

Documentation

Overview

Package engine is the workflow engine for orchestrating workflow execution

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInstanceNotFound is returned when a workflow instance cannot be found
	ErrInstanceNotFound = errors.New("workflow instance not found")
	// ErrScopeExpressionRequired is returned when a scope expression is required but empty
	ErrScopeExpressionRequired = errors.New("scope expression is required")
	// ErrAssignmentNotFound is returned when a workflow assignment cannot be found
	ErrAssignmentNotFound = errors.New("workflow assignment not found")
	// ErrInvalidState is returned when a workflow instance is in an invalid state for the operation
	ErrInvalidState = errors.New("invalid workflow state")
	// ErrConditionFailed is returned when a CEL condition evaluation fails
	ErrConditionFailed = errors.New("condition evaluation failed")
	// ErrActionExecutionFailed is returned when an action execution fails
	ErrActionExecutionFailed = errors.New("action execution failed")
	// ErrInvalidTargetType is returned when an unknown target type is encountered
	ErrInvalidTargetType = errors.New("invalid target type")
	// ErrInvalidActionType is returned when an unknown action type is encountered
	ErrInvalidActionType = errors.New("invalid action type")
	// ErrMissingRequiredField is returned when a required field is missing
	ErrMissingRequiredField = errors.New("missing required field")
	// ErrObjectRefMissingID is returned when a workflow object ref has no object ID set
	ErrObjectRefMissingID = errors.New("workflow object ref has no object ID set")
	// ErrCELTypeMismatch is returned when a CEL expression returns a non-boolean type
	ErrCELTypeMismatch = errors.New("CEL expression must return boolean")
	// ErrEvaluationTimeout is returned when CEL evaluation exceeds the timeout
	ErrEvaluationTimeout = errors.New("CEL evaluation timeout")
	// ErrCELNilOutput is returned when CEL evaluation returns nil output
	ErrCELNilOutput = errors.New("CEL evaluation returned nil output")
	// ErrWebhookFailed is returned when a webhook request fails
	ErrWebhookFailed = errors.New("webhook request failed")
	// ErrWebhookPayloadUnsupported is returned when legacy webhook payloads are provided
	ErrWebhookPayloadUnsupported = errors.New("webhook payload is not supported; use payload_expr")
	// ErrWebhookPayloadExpressionFailed is returned when webhook payload CEL evaluation fails
	ErrWebhookPayloadExpressionFailed = errors.New("webhook payload expression failed")
	// ErrWebhookPayloadExpressionInvalid is returned when webhook payload CEL does not produce a JSON object
	ErrWebhookPayloadExpressionInvalid = errors.New("webhook payload expression invalid")
	// ErrExecutorNotAvailable is returned when the executor is not available
	ErrExecutorNotAvailable = errors.New("executor is nil")
	// ErrIntegrationRegistryRequired is returned when an integration registry dependency is missing
	ErrIntegrationRegistryRequired = errors.New("integration registry required")
	// ErrIntegrationOperationsRequired is returned when integration dispatcher is not configured
	ErrIntegrationOperationsRequired = errors.New("integration dispatcher required")
	// ErrIntegrationOperationCriteriaRequired indicates operation name is required
	ErrIntegrationOperationCriteriaRequired = errors.New("integration operation name required")
	// ErrIntegrationScopeConditionFalse indicates integration scope evaluation rejected the operation
	ErrIntegrationScopeConditionFalse = errors.New("integration scope condition false")
	// ErrObjectNil is returned when the workflow object is nil
	ErrObjectNil = errors.New("object is nil")
	// ErrUnmarshalParams is returned when action params cannot be unmarshaled
	ErrUnmarshalParams = errors.New("failed to unmarshal action params")
	// ErrMarshalPayload is returned when a payload cannot be marshaled
	ErrMarshalPayload = errors.New("failed to marshal payload")
	// ErrIntegrationOwnerRequired is returned when integration action is missing owner
	ErrIntegrationOwnerRequired = errors.New("integration action requires instance owner_id")
	// ErrAssignmentCreationFailed is returned when workflow assignment creation fails
	ErrAssignmentCreationFailed = errors.New("failed to create workflow assignment")
	// ErrNotificationCreationFailed is returned when notification creation fails
	ErrNotificationCreationFailed = errors.New("failed to create notification")
	// ErrNotificationTemplateNotFound is returned when a notification template cannot be found
	ErrNotificationTemplateNotFound = errors.New("notification template not found")
	// ErrNotificationTemplateReferenceConflict is returned when both template_id and template_key are provided
	ErrNotificationTemplateReferenceConflict = errors.New("notification template reference conflict")
	// ErrNotificationTemplateDataInvalid is returned when template data fails schema validation
	ErrNotificationTemplateDataInvalid = errors.New("notification template data invalid")
	// ErrWebhookURLRequired is returned when webhook action is missing URL
	ErrWebhookURLRequired = errors.New("webhook action requires url")
	// ErrAssignmentUpdateFailed is returned when assignment update fails
	ErrAssignmentUpdateFailed = errors.New("failed to update workflow assignment")
	// ErrActionIndexOutOfBounds is returned when an action index is outside the workflow definition range
	ErrActionIndexOutOfBounds = errors.New("workflow action index out of bounds")
	// ErrAssignmentActionNotFound is returned when a workflow assignment cannot be mapped to an action
	ErrAssignmentActionNotFound = errors.New("workflow assignment action not found")
	// ErrCELProgramCreationFailed is returned when CEL program creation fails
	ErrCELProgramCreationFailed = errors.New("failed to create CEL program")
	// ErrCELCompilationFailed is returned when CEL fails to compile
	ErrCELCompilationFailed = errors.New("failed to compile CEL")
	// ErrFailedToComputeProposalHash is returned when a proposal hash cannot be computed
	ErrFailedToComputeProposalHash = errors.New("failed to compute proposal hash")
	// ErrFailedToCreateProposal is returned when a workflow proposal cannot be created
	ErrFailedToCreateProposal = errors.New("failed to create workflow proposal inside of proposal manager")
	// ErrFailedToQueryObjectRefs is returned when workflow object refs cannot be queried
	ErrFailedToQueryObjectRefs = errors.New("failed to query object refs")
	// ErrFailedToQueryProposals is returned when workflow proposals cannot be queried
	ErrFailedToQueryProposals = errors.New("failed to query proposals")
	// ErrFailedToLoadProposal is returned when a workflow proposal cannot be loaded
	ErrFailedToLoadProposal = errors.New("failed to load proposal")
	// ErrFailedToApplyFieldUpdates is returned when proposal field updates cannot be applied
	ErrFailedToApplyFieldUpdates = errors.New("failed to apply field updates")
	// ErrFailedToCreateAssignmentTarget is returned when an assignment target cannot be created
	ErrFailedToCreateAssignmentTarget = errors.New("failed to create assignment target")
	// ErrFailedToEnrichWebhookPayload is returned when webhook payload enrichment fails
	ErrFailedToEnrichWebhookPayload = errors.New("failed to enrich webhook payload")
	// ErrFailedToQueryDefinitions is returned when workflow definitions cannot be queried
	ErrFailedToQueryDefinitions = errors.New("failed to query workflow definitions")
	// ErrFailedToResolveTarget is returned when a target cannot be resolved
	ErrFailedToResolveTarget = errors.New("failed to resolve target")
	// ErrFailedToResolveNotificationTarget is returned when a notification target cannot be resolved
	ErrFailedToResolveNotificationTarget = errors.New("failed to resolve notification target")
	// ErrApprovalNoTargets indicates an approval action resolved no targets and should be skipped
	ErrApprovalNoTargets = errors.New("approval action has no resolved targets")
	// ErrMissingObjectRef is returned when a workflow object ref is nil
	ErrMissingObjectRef = errors.New("workflow object ref is required")
	// ErrReviewNoTargets indicates a review action resolved no targets and should be skipped
	ErrReviewNoTargets = errors.New("review action has no resolved targets")
	// ErrTemplateRenderDepthExceeded is returned when template rendering exceeds the maximum depth
	ErrTemplateRenderDepthExceeded = errors.New("template render depth exceeded")
	// ErrNotificationTemplateBlocksInvalid is returned when rendered template blocks are not a valid block list
	ErrNotificationTemplateBlocksInvalid = errors.New("notification template blocks invalid")
)

Functions

This section is empty.

Types

type AssignmentStatusCounts added in v1.5.10

type AssignmentStatusCounts struct {
	// Approved is the count of approved assignments
	Approved int
	// Pending is the count of pending assignments
	Pending int
	// Rejected is the count of rejected assignments
	Rejected int
	// ChangesRequested is the count of changes requested assignments
	ChangesRequested int
	// RejectedRequired indicates if any required assignment was rejected
	RejectedRequired bool
	// ChangesRequestedRequired indicates if any required assignment requested changes
	ChangesRequestedRequired bool
}

AssignmentStatusCounts holds counts of assignment statuses for quorum evaluation

func CountAssignmentStatus added in v1.5.10

func CountAssignmentStatus(assignments []*generated.WorkflowAssignment) AssignmentStatusCounts

CountAssignmentStatus counts the status of assignments for quorum evaluation

type CELEvaluator added in v1.5.10

type CELEvaluator struct {
	// contains filtered or unexported fields
}

CELEvaluator handles CEL expression compilation and evaluation with caching

func NewCELEvaluator added in v1.5.10

func NewCELEvaluator(env *cel.Env, config *workflows.Config) *CELEvaluator

NewCELEvaluator creates a new CEL evaluator with the provided environment and configuration

func (*CELEvaluator) Evaluate added in v1.5.10

func (e *CELEvaluator) Evaluate(ctx context.Context, expression string, vars map[string]any) (bool, error)

Evaluate evaluates a CEL expression with the given variables, using caching and timeout

func (*CELEvaluator) EvaluateJSONMap added in v1.7.0

func (e *CELEvaluator) EvaluateJSONMap(ctx context.Context, expression string, vars map[string]any) (map[string]any, error)

EvaluateJSONMap evaluates a CEL expression and converts the result to a JSON object map.

func (*CELEvaluator) EvaluateValue added in v1.9.3

func (e *CELEvaluator) EvaluateValue(ctx context.Context, expression string, vars map[string]any) (any, error)

EvaluateValue evaluates a CEL expression and returns the JSON-compatible value.

func (*CELEvaluator) Validate added in v1.5.10

func (e *CELEvaluator) Validate(expression string) error

Validate validates that a CEL expression compiles successfully

type IntegrationDeps added in v1.9.3

type IntegrationDeps struct {
	// Runtime provides access to integration definition descriptors and execution.
	Runtime *integrationsruntime.Runtime
}

IntegrationDeps wires integration-specific dependencies into the workflow engine

type IntegrationQueueRequest added in v1.9.3

type IntegrationQueueRequest struct {
	// OrgID identifies the organization requesting the operation
	OrgID string
	// InstallationID is the explicit installation identifier for the operation
	InstallationID string
	// DefinitionID identifies the integration definition when no installation ID is set
	DefinitionID string
	// Operation identifies the operation to execute
	Operation string
	// Config carries the operation configuration payload as a JSON object document
	Config json.RawMessage
	// ScopeExpression is an optional CEL expression gate for command execution
	ScopeExpression string
	// ScopePayload is optional data exposed to scope expression evaluation as a JSON object document
	ScopePayload json.RawMessage
	// ScopeResource is optional resource identity exposed to scope expression evaluation
	ScopeResource string
	// ForceClientRebuild requests client cache bypass
	ForceClientRebuild bool
	// RunType identifies the integration run type
	RunType enums.IntegrationRunType
	// Workflow links the operation to a workflow action
	Workflow *types.WorkflowMeta
}

IntegrationQueueRequest describes a queued integration operation

type IntegrationQueueResult added in v1.9.3

type IntegrationQueueResult struct {
	// RunID identifies the integration run record
	RunID string
	// EventID identifies the emitted event
	EventID string
	// Status captures the run status at queue time
	Status enums.IntegrationRunStatus
}

IntegrationQueueResult captures queue results

type IntegrationScopeEvaluator added in v1.15.0

type IntegrationScopeEvaluator struct {
	// contains filtered or unexported fields
}

IntegrationScopeEvaluator evaluates CEL scope conditions for integration actions

func NewIntegrationScopeEvaluator added in v1.15.0

func NewIntegrationScopeEvaluator() (*IntegrationScopeEvaluator, error)

NewIntegrationScopeEvaluator builds a CEL evaluator scoped to integration scope variables

func (*IntegrationScopeEvaluator) EvaluateConditionWithVars added in v1.15.0

func (e *IntegrationScopeEvaluator) EvaluateConditionWithVars(ctx context.Context, expr string, vars types.ScopeVars) (bool, error)

EvaluateConditionWithVars evaluates a CEL expression against the provided scope variables

func (*IntegrationScopeEvaluator) Validate added in v1.15.0

func (e *IntegrationScopeEvaluator) Validate(expr string) error

Validate compiles an expression to confirm it is syntactically and type-correct. Returns ErrScopeExpressionRequired for empty expressions, ErrCELCompilationFailed for syntax errors, and ErrCELProgramCreationFailed for program construction failures.

type ProposalManager added in v1.5.10

type ProposalManager struct {
	// contains filtered or unexported fields
}

ProposalManager handles WorkflowProposal operations

func NewProposalManager added in v1.5.10

func NewProposalManager(client *generated.Client) *ProposalManager

NewProposalManager creates a new proposal manager

func (*ProposalManager) Apply added in v1.5.10

func (m *ProposalManager) Apply(scope *observability.Scope, proposalID string, obj *workflows.Object) error

Apply applies the approved changes from a WorkflowProposal to the target object

func (*ProposalManager) ComputeHash added in v1.5.10

func (m *ProposalManager) ComputeHash(ctx context.Context, instance *generated.WorkflowInstance, obj *workflows.Object, domainKey string) (string, error)

ComputeHash computes a hash of the proposed changes for the given object/domain. It prefers the proposal attached to the instance when available.

func (*ProposalManager) Create added in v1.5.10

Create creates a WorkflowProposal for the approval domain within a transaction

func (*ProposalManager) LoadForObject added in v1.5.10

LoadForObject loads a WorkflowProposal for the given object using the ObjectFromRef registry

type TriggerInput added in v1.5.10

type TriggerInput struct {
	// EventType is the trigger event name
	EventType string
	// ChangedFields lists updated fields on the target object
	ChangedFields []string
	// ChangedEdges lists updated edges on the target object
	ChangedEdges []string
	// AddedIDs captures added edge IDs keyed by edge name
	AddedIDs map[string][]string
	// RemovedIDs captures removed edge IDs keyed by edge name
	RemovedIDs map[string][]string
	// ProposedChanges contains proposed field updates for approval workflows
	ProposedChanges map[string]any
}

TriggerInput captures the trigger metadata passed to workflow execution

func (TriggerInput) ChangeSet added in v1.11.0

func (input TriggerInput) ChangeSet() mutations.ChangeSet

ChangeSet returns the trigger mutation change-set from trigger input

func (*TriggerInput) SetChangeSet added in v1.11.0

func (input *TriggerInput) SetChangeSet(changeSet mutations.ChangeSet)

SetChangeSet applies a mutation change-set onto trigger input fields

type WorkflowEngine

type WorkflowEngine struct {
	// contains filtered or unexported fields
}

WorkflowEngine orchestrates workflow execution via event emission

func NewWorkflowEngine

func NewWorkflowEngine(client *generated.Client, runtime *gala.Gala, opts ...workflows.ConfigOpts) (*WorkflowEngine, error)

NewWorkflowEngine creates a new workflow engine using the provided configuration options

func NewWorkflowEngineWithConfig

func NewWorkflowEngineWithConfig(client *generated.Client, runtime *gala.Gala, config *workflows.Config) (*WorkflowEngine, error)

NewWorkflowEngineWithConfig creates a new workflow engine using the provided configuration

func (*WorkflowEngine) CompleteAssignment added in v1.5.10

func (e *WorkflowEngine) CompleteAssignment(ctx context.Context, assignmentID string, status enums.WorkflowAssignmentStatus, approvalMetadata *models.WorkflowAssignmentApproval, rejectionMetadata *models.WorkflowAssignmentRejection) (err error)

CompleteAssignment marks an assignment as approved/rejected

func (*WorkflowEngine) EvaluateActionWhen added in v1.5.10

func (e *WorkflowEngine) EvaluateActionWhen(ctx context.Context, expression string, instance *generated.WorkflowInstance, obj *workflows.Object) (bool, error)

EvaluateActionWhen evaluates an action's When expression with assignment context. This is used for re-evaluating NOTIFY actions when assignment status changes.

func (*WorkflowEngine) EvaluateConditions

func (e *WorkflowEngine) EvaluateConditions(ctx context.Context, def *generated.WorkflowDefinition, obj *workflows.Object, eventType string, changedFields []string, changedEdges []string, addedIDs, removedIDs map[string][]string, proposedChanges map[string]any) (bool, error)

EvaluateConditions checks if all conditions pass for a workflow.

func (*WorkflowEngine) Execute added in v1.5.10

Execute performs the action and returns any error.

func (*WorkflowEngine) FindMatchingDefinitions added in v1.5.10

func (e *WorkflowEngine) FindMatchingDefinitions(ctx context.Context, schemaType string, eventType string, changedFields []string, changedEdges []string, addedIDs map[string][]string, removedIDs map[string][]string, proposedChanges map[string]any, obj *workflows.Object) (defs []*generated.WorkflowDefinition, err error)

FindMatchingDefinitions returns all active workflow definitions that match the criteria

func (*WorkflowEngine) ProcessAction added in v1.5.10

func (e *WorkflowEngine) ProcessAction(ctx context.Context, instance *generated.WorkflowInstance, action models.WorkflowAction) error

ProcessAction executes a workflow action

func (*WorkflowEngine) QueueIntegrationOperation added in v1.9.3

func (e *WorkflowEngine) QueueIntegrationOperation(ctx context.Context, req IntegrationQueueRequest) (IntegrationQueueResult, error)

QueueIntegrationOperation resolves the installation and dispatches the operation

func (*WorkflowEngine) ResolveTargets added in v1.5.10

func (e *WorkflowEngine) ResolveTargets(ctx context.Context, target workflows.TargetConfig, obj *workflows.Object) ([]string, error)

ResolveTargets converts a TargetConfig into concrete user IDs.

func (*WorkflowEngine) SetIntegrationDeps added in v1.9.3

func (e *WorkflowEngine) SetIntegrationDeps(deps IntegrationDeps) error

SetIntegrationDeps attaches integration dependencies and registers a post-execution hook so the workflow engine receives completion signals from integration operations

func (*WorkflowEngine) TriggerExistingInstance added in v1.5.10

func (e *WorkflowEngine) TriggerExistingInstance(ctx context.Context, instance *generated.WorkflowInstance, def *generated.WorkflowDefinition, obj *workflows.Object, input TriggerInput) (err error)

TriggerExistingInstance resumes a pre-created workflow instance and emits a trigger event

func (*WorkflowEngine) TriggerWorkflow added in v1.5.10

func (e *WorkflowEngine) TriggerWorkflow(ctx context.Context, def *generated.WorkflowDefinition, obj *workflows.Object, input TriggerInput) (instance *generated.WorkflowInstance, err error)

TriggerWorkflow starts a new workflow instance

type WorkflowListeners added in v1.5.10

type WorkflowListeners struct {
	// contains filtered or unexported fields
}

WorkflowListeners contains all workflow event listeners

func NewWorkflowListeners added in v1.5.10

func NewWorkflowListeners(client *generated.Client, engine *WorkflowEngine, runtime *gala.Gala) *WorkflowListeners

NewWorkflowListeners creates workflow event listeners.

func (*WorkflowListeners) HandleActionCompleted added in v1.5.10

func (l *WorkflowListeners) HandleActionCompleted(ctx gala.HandlerContext, payload gala.WorkflowActionCompletedPayload) (err error)

HandleActionCompleted determines next steps after action completion.

func (*WorkflowListeners) HandleActionStarted added in v1.5.10

func (l *WorkflowListeners) HandleActionStarted(ctx gala.HandlerContext, payload gala.WorkflowActionStartedPayload) (err error)

HandleActionStarted executes a workflow action.

func (*WorkflowListeners) HandleAssignmentCompleted added in v1.5.10

func (l *WorkflowListeners) HandleAssignmentCompleted(ctx gala.HandlerContext, payload gala.WorkflowAssignmentCompletedPayload) (err error)

HandleAssignmentCompleted handles approval decisions and continues/cancels workflows.

func (*WorkflowListeners) HandleInstanceCompleted added in v1.5.10

func (l *WorkflowListeners) HandleInstanceCompleted(ctx gala.HandlerContext, payload gala.WorkflowInstanceCompletedPayload) (err error)

HandleInstanceCompleted marks a workflow instance as completed or failed.

func (*WorkflowListeners) HandleWorkflowAssignmentMutationGala added in v1.11.0

func (l *WorkflowListeners) HandleWorkflowAssignmentMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error

HandleWorkflowAssignmentMutationGala reacts to assignment status changes emitted via Gala. It fires CompleteAssignment whenever a non-Pending status is committed, allowing direct GraphQL mutations on assignment status to advance paused approval and review flows.

func (*WorkflowListeners) HandleWorkflowMutationGala added in v1.11.0

func (l *WorkflowListeners) HandleWorkflowMutationGala(ctx gala.HandlerContext, payload eventqueue.MutationGalaPayload) error

HandleWorkflowMutationGala triggers matching workflows for workflow-eligible Gala mutations.

func (*WorkflowListeners) HandleWorkflowTriggered added in v1.5.10

func (l *WorkflowListeners) HandleWorkflowTriggered(ctx gala.HandlerContext, payload gala.WorkflowTriggeredPayload) (err error)

HandleWorkflowTriggered processes a newly triggered workflow instance. For approval actions with When conditions, all matching actions are started concurrently. This enables workflows where multiple fields change in a single mutation to trigger their respective approval requirements simultaneously.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL