engine

package
v1.9.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2026 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation

Overview

Package engine is the workflow engine for orchestrating workflow execution

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrWorkflowNotFound is returned when a workflow definition cannot be found
	ErrWorkflowNotFound = errors.New("workflow definition not found")
	// ErrInstanceNotFound is returned when a workflow instance cannot be found
	ErrInstanceNotFound = errors.New("workflow instance not found")
	// ErrAssignmentNotFound is returned when a workflow assignment cannot be found
	ErrAssignmentNotFound = errors.New("workflow assignment not found")
	// ErrInvalidState is returned when a workflow instance is in an invalid state for the operation
	ErrInvalidState = errors.New("invalid workflow state")
	// ErrConditionFailed is returned when a CEL condition evaluation fails
	ErrConditionFailed = errors.New("condition evaluation failed")
	// ErrNoMatchingDefinitions is returned when no workflow definitions match the trigger criteria
	ErrNoMatchingDefinitions = errors.New("no matching workflow definitions")
	// ErrTargetResolutionFailed is returned when dynamic target resolution fails
	ErrTargetResolutionFailed = errors.New("target resolution failed")
	// ErrActionExecutionFailed is returned when an action execution fails
	ErrActionExecutionFailed = errors.New("action execution failed")
	// ErrInvalidTargetType is returned when an unknown target type is encountered
	ErrInvalidTargetType = errors.New("invalid target type")
	// ErrInvalidActionType is returned when an unknown action type is encountered
	ErrInvalidActionType = errors.New("invalid action type")
	// ErrMissingRequiredField is returned when a required field is missing
	ErrMissingRequiredField = errors.New("missing required field")
	// ErrInvalidObjectType is returned when an unknown object type is encountered
	ErrInvalidObjectType = errors.New("invalid object type")
	// ErrObjectRefMissingID is returned when a workflow object ref has no object ID set
	ErrObjectRefMissingID = errors.New("workflow object ref has no object ID set")
	// ErrFieldNotWorkflowEligible is returned when a field cannot be updated by workflow
	ErrFieldNotWorkflowEligible = errors.New("field is not eligible for workflow modification")
	// ErrCELTypeMismatch is returned when a CEL expression returns a non-boolean type
	ErrCELTypeMismatch = errors.New("CEL expression must return boolean")
	// ErrCELValueExtraction is returned when extracting a boolean value from CEL fails
	ErrCELValueExtraction = errors.New("failed to extract boolean value from CEL result")
	// ErrEvaluationTimeout is returned when CEL evaluation exceeds the timeout
	ErrEvaluationTimeout = errors.New("CEL evaluation timeout")
	// ErrCELPanic is returned when CEL evaluation panics
	ErrCELPanic = errors.New("CEL evaluation panic")
	// ErrCELNilOutput is returned when CEL evaluation returns nil output
	ErrCELNilOutput = errors.New("CEL evaluation returned nil output")
	// ErrWebhookFailed is returned when a webhook request fails
	ErrWebhookFailed = errors.New("webhook request failed")
	// ErrWebhookPayloadUnsupported is returned when legacy webhook payloads are provided
	ErrWebhookPayloadUnsupported = errors.New("webhook payload is not supported; use payload_expr")
	// ErrWebhookPayloadExpressionFailed is returned when webhook payload CEL evaluation fails
	ErrWebhookPayloadExpressionFailed = errors.New("webhook payload expression failed")
	// ErrWebhookPayloadExpressionInvalid is returned when webhook payload CEL does not produce a JSON object
	ErrWebhookPayloadExpressionInvalid = errors.New("webhook payload expression invalid")
	// ErrIntegrationFailed is returned when an integration operation fails
	ErrIntegrationFailed = errors.New("integration operation failed")
	// ErrExecutorNotAvailable is returned when the executor is not available
	ErrExecutorNotAvailable = errors.New("executor is nil")
	// ErrIntegrationManagerNotAvailable is returned when the integration manager is not available
	ErrIntegrationManagerNotAvailable = errors.New("integration operations manager not available")
	// ErrIntegrationStoreRequired is returned when an integration store dependency is missing
	ErrIntegrationStoreRequired = errors.New("integration store required")
	// ErrIntegrationOperationsRequired is returned when integration operations are not configured
	ErrIntegrationOperationsRequired = errors.New("integration operations required")
	// ErrIntegrationEmitterRequired is returned when integration event emitter is missing
	ErrIntegrationEmitterRequired = errors.New("integration emitter required")
	// ErrIntegrationRunIDRequired indicates the integration run identifier is missing
	ErrIntegrationRunIDRequired = errors.New("integration run id required")
	// ErrIntegrationRecordMissing indicates the integration record is missing for a run
	ErrIntegrationRecordMissing = errors.New("integration record missing for run")
	// ErrIntegrationProviderUnknown indicates the integration provider could not be resolved
	ErrIntegrationProviderUnknown = errors.New("integration provider unknown")
	// ErrIntegrationOperationNameRequired indicates the run operation name is missing
	ErrIntegrationOperationNameRequired = errors.New("integration operation name required")
	// ErrIntegrationOperationFailed indicates the operation failed to execute successfully
	ErrIntegrationOperationFailed = errors.New("integration operation failed")
	// ErrIntegrationAlertPayloadsMissing indicates alert payloads are missing from operation output
	ErrIntegrationAlertPayloadsMissing = errors.New("integration alert payloads missing")
	// ErrIntegrationActionQueued indicates the integration action was queued for async processing
	ErrIntegrationActionQueued = errors.New("integration action queued")
	// ErrUnsupportedTimeFormat is returned when a time format is not supported
	ErrUnsupportedTimeFormat = errors.New("unsupported time format")
	// ErrObjectNil is returned when the workflow object is nil
	ErrObjectNil = errors.New("object is nil")
	// ErrProposalChangesModified is returned when proposal changes are modified after approval
	ErrProposalChangesModified = errors.New("proposal changes modified after approval")
	// ErrUnmarshalParams is returned when action params cannot be unmarshaled
	ErrUnmarshalParams = errors.New("failed to unmarshal action params")
	// ErrMarshalPayload is returned when a payload cannot be marshaled
	ErrMarshalPayload = errors.New("failed to marshal payload")
	// ErrIntegrationProviderRequired is returned when integration action is missing provider
	ErrIntegrationProviderRequired = errors.New("integration action requires provider")
	// ErrIntegrationOwnerRequired is returned when integration action is missing owner
	ErrIntegrationOwnerRequired = errors.New("integration action requires instance owner_id")
	// ErrAssignmentCreationFailed is returned when workflow assignment creation fails
	ErrAssignmentCreationFailed = errors.New("failed to create workflow assignment")
	// ErrNotificationCreationFailed is returned when notification creation fails
	ErrNotificationCreationFailed = errors.New("failed to create notification")
	// ErrNotificationTemplateNotFound is returned when a notification template cannot be found
	ErrNotificationTemplateNotFound = errors.New("notification template not found")
	// ErrNotificationTemplateReferenceConflict is returned when both template_id and template_key are provided
	ErrNotificationTemplateReferenceConflict = errors.New("notification template reference conflict")
	// ErrNotificationTemplateDataInvalid is returned when template data fails schema validation
	ErrNotificationTemplateDataInvalid = errors.New("notification template data invalid")
	// ErrNotificationTemplateChannelMismatch is returned when template channel does not match requested channel
	ErrNotificationTemplateChannelMismatch = errors.New("notification template channel mismatch")
	// ErrNotificationChannelUnsupported is returned when a notification channel lacks integration support
	ErrNotificationChannelUnsupported = errors.New("notification channel unsupported")
	// ErrWebhookURLRequired is returned when webhook action is missing URL
	ErrWebhookURLRequired = errors.New("webhook action requires url")
	// ErrAssignmentUpdateFailed is returned when assignment update fails
	ErrAssignmentUpdateFailed = errors.New("failed to update workflow assignment")
	// ErrActionIndexOutOfBounds is returned when an action index is outside the workflow definition range
	ErrActionIndexOutOfBounds = errors.New("workflow action index out of bounds")
	// ErrAssignmentActionNotFound is returned when a workflow assignment cannot be mapped to an action
	ErrAssignmentActionNotFound = errors.New("workflow assignment action not found")
	// ErrNilClient is returned when a nil database client is provided
	ErrNilClient = errors.New("ent client is required to initialize workflow engine")
	// ErrCELProgramCreationFailed is returned when CEL program creation fails
	ErrCELProgramCreationFailed = errors.New("failed to create CEL program")
	// ErrCELCompilationFailed is returned when CEL fails to compile
	ErrCELCompilationFailed = errors.New("failed to compile CEL")
	// ErrFailedToComputeProposalHash is returned when a proposal hash cannot be computed
	ErrFailedToComputeProposalHash = errors.New("failed to compute proposal hash")
	// ErrFailedToCreateProposal is returned when a workflow proposal cannot be created
	ErrFailedToCreateProposal = errors.New("failed to create workflow proposal inside of proposal manager")
	// ErrFailedToQueryObjectRefs is returned when workflow object refs cannot be queried
	ErrFailedToQueryObjectRefs = errors.New("failed to query object refs")
	// ErrFailedToQueryProposals is returned when workflow proposals cannot be queried
	ErrFailedToQueryProposals = errors.New("failed to query proposals")
	// ErrFailedToLoadProposal is returned when a workflow proposal cannot be loaded
	ErrFailedToLoadProposal = errors.New("failed to load proposal")
	// ErrFailedToApplyFieldUpdates is returned when proposal field updates cannot be applied
	ErrFailedToApplyFieldUpdates = errors.New("failed to apply field updates")
	// ErrFailedToCreateAssignmentTarget is returned when an assignment target cannot be created
	ErrFailedToCreateAssignmentTarget = errors.New("failed to create assignment target")
	// ErrFailedToEnrichWebhookPayload is returned when webhook payload enrichment fails
	ErrFailedToEnrichWebhookPayload = errors.New("failed to enrich webhook payload")
	// ErrFailedToQueryDefinitions is returned when workflow definitions cannot be queried
	ErrFailedToQueryDefinitions = errors.New("failed to query workflow definitions")
	// ErrFailedToResolveTarget is returned when a target cannot be resolved
	ErrFailedToResolveTarget = errors.New("failed to resolve target")
	// ErrFailedToResolveNotificationTarget is returned when a notification target cannot be resolved
	ErrFailedToResolveNotificationTarget = errors.New("failed to resolve notification target")
	// ErrApprovalNoTargets indicates an approval action resolved no targets and should be skipped
	ErrApprovalNoTargets = errors.New("approval action has no resolved targets")
	// ErrMissingObjectRef is returned when a workflow object ref is nil
	ErrMissingObjectRef = errors.New("workflow object ref is required")
	// ErrReviewNoTargets indicates a review action resolved no targets and should be skipped
	ErrReviewNoTargets = errors.New("review action has no resolved targets")
	// ErrTemplateRenderDepthExceeded is returned when template rendering exceeds the maximum depth
	ErrTemplateRenderDepthExceeded = errors.New("template render depth exceeded")
)

Functions

This section is empty.

Types

type AssignmentStatusCounts added in v1.5.10

type AssignmentStatusCounts struct {
	// Approved is the count of approved assignments
	Approved int
	// Pending is the count of pending assignments
	Pending int
	// Rejected is the count of rejected assignments
	Rejected int
	// ChangesRequested is the count of changes requested assignments
	ChangesRequested int
	// RejectedRequired indicates if any required assignment was rejected
	RejectedRequired bool
	// ChangesRequestedRequired indicates if any required assignment requested changes
	ChangesRequestedRequired bool
}

AssignmentStatusCounts holds counts of assignment statuses for quorum evaluation

func CountAssignmentStatus added in v1.5.10

func CountAssignmentStatus(assignments []*generated.WorkflowAssignment) AssignmentStatusCounts

CountAssignmentStatus counts the status of assignments for quorum evaluation

type CELEvaluator added in v1.5.10

type CELEvaluator struct {
	// contains filtered or unexported fields
}

CELEvaluator handles CEL expression compilation and evaluation with caching

func NewCELEvaluator added in v1.5.10

func NewCELEvaluator(env *cel.Env, config *workflows.Config) *CELEvaluator

NewCELEvaluator creates a new CEL evaluator with the provided environment and configuration

func (*CELEvaluator) Evaluate added in v1.5.10

func (e *CELEvaluator) Evaluate(ctx context.Context, expression string, vars map[string]any) (bool, error)

Evaluate evaluates a CEL expression with the given variables, using caching and timeout

func (*CELEvaluator) EvaluateJSONMap added in v1.7.0

func (e *CELEvaluator) EvaluateJSONMap(ctx context.Context, expression string, vars map[string]any) (map[string]any, error)

EvaluateJSONMap evaluates a CEL expression and converts the result to a JSON object map.

func (*CELEvaluator) EvaluateValue added in v1.9.3

func (e *CELEvaluator) EvaluateValue(ctx context.Context, expression string, vars map[string]any) (any, error)

EvaluateValue evaluates a CEL expression and returns the JSON-compatible value.

func (*CELEvaluator) Validate added in v1.5.10

func (e *CELEvaluator) Validate(expression string) error

Validate validates that a CEL expression compiles successfully

type IntegrationDeps added in v1.9.3

type IntegrationDeps struct {
	// Registry provides access to integration operation descriptors
	Registry IntegrationRegistry
	// Store provides integration persistence for providers
	Store IntegrationStore
	// Operations executes provider operations through keystore
	Operations IntegrationOperations
	// Emitter publishes integration events for async processing
	Emitter *soiree.EventBus
}

IntegrationDeps wires integration-specific dependencies into the workflow engine

type IntegrationOperations added in v1.9.3

type IntegrationOperations interface {
	Run(ctx context.Context, req types.OperationRequest) (types.OperationResult, error)
}

IntegrationOperations executes provider operations

type IntegrationQueueRequest added in v1.9.3

type IntegrationQueueRequest struct {
	// OrgID identifies the organization requesting the operation
	OrgID string
	// Provider identifies the integration provider
	Provider types.ProviderType
	// IntegrationID identifies the integration record
	IntegrationID string
	// Operation identifies the provider operation
	Operation types.OperationName
	// Config carries the operation configuration payload
	Config map[string]any
	// Force requests credential refresh
	Force bool
	// ClientForce requests client refresh
	ClientForce bool
	// RunType identifies the integration run type
	RunType enums.IntegrationRunType
	// WorkflowMeta links the operation to a workflow action
	WorkflowMeta *IntegrationWorkflowMeta
}

IntegrationQueueRequest describes a queued integration operation

type IntegrationQueueResult added in v1.9.3

type IntegrationQueueResult struct {
	// RunID identifies the integration run record
	RunID string
	// EventID identifies the emitted event
	EventID string
	// Status captures the run status at queue time
	Status enums.IntegrationRunStatus
}

IntegrationQueueResult captures queue results

type IntegrationRegistry added in v1.9.3

type IntegrationRegistry interface {
	OperationDescriptors(provider types.ProviderType) []types.OperationDescriptor
}

IntegrationRegistry exposes provider operation descriptors to the engine

type IntegrationStore added in v1.9.3

type IntegrationStore interface {
	EnsureIntegration(ctx context.Context, orgID string, provider types.ProviderType) (*ent.Integration, error)
}

IntegrationStore ensures integration records exist for providers

type IntegrationWorkflowMeta added in v1.9.3

type IntegrationWorkflowMeta struct {
	// InstanceID identifies the workflow instance that triggered the run
	InstanceID string
	// ActionKey identifies the workflow action key
	ActionKey string
	// ActionIndex captures the workflow action index
	ActionIndex int
	// ObjectID identifies the workflow object
	ObjectID string
	// ObjectType identifies the workflow object type
	ObjectType enums.WorkflowObjectType
}

IntegrationWorkflowMeta ties an integration run back to a workflow action

type ProposalManager added in v1.5.10

type ProposalManager struct {
	// contains filtered or unexported fields
}

ProposalManager handles WorkflowProposal operations

func NewProposalManager added in v1.5.10

func NewProposalManager(client *generated.Client) *ProposalManager

NewProposalManager creates a new proposal manager

func (*ProposalManager) Apply added in v1.5.10

func (m *ProposalManager) Apply(scope *observability.Scope, proposalID string, obj *workflows.Object) error

Apply applies the approved changes from a WorkflowProposal to the target object

func (*ProposalManager) ComputeHash added in v1.5.10

func (m *ProposalManager) ComputeHash(ctx context.Context, instance *generated.WorkflowInstance, obj *workflows.Object, domainKey string) (string, error)

ComputeHash computes a hash of the proposed changes for the given object/domain. It prefers the proposal attached to the instance when available.

func (*ProposalManager) Create added in v1.5.10

Create creates a WorkflowProposal for the approval domain within a transaction

func (*ProposalManager) LoadForObject added in v1.5.10

LoadForObject loads a WorkflowProposal for the given object using the ObjectFromRef registry

type TriggerInput added in v1.5.10

type TriggerInput struct {
	// EventType is the trigger event name
	EventType string
	// ChangedFields lists updated fields on the target object
	ChangedFields []string
	// ChangedEdges lists updated edges on the target object
	ChangedEdges []string
	// AddedIDs captures added edge IDs keyed by edge name
	AddedIDs map[string][]string
	// RemovedIDs captures removed edge IDs keyed by edge name
	RemovedIDs map[string][]string
	// ProposedChanges contains proposed field updates for approval workflows
	ProposedChanges map[string]any
}

TriggerInput captures the trigger metadata passed to workflow execution

type WorkflowEngine

type WorkflowEngine struct {
	// contains filtered or unexported fields
}

WorkflowEngine orchestrates workflow execution via event emission

func NewWorkflowEngine

func NewWorkflowEngine(client *generated.Client, emitter soiree.Emitter, opts ...workflows.ConfigOpts) (*WorkflowEngine, error)

NewWorkflowEngine creates a new workflow engine using the provided configuration options

func NewWorkflowEngineWithConfig

func NewWorkflowEngineWithConfig(client *generated.Client, emitter soiree.Emitter, config *workflows.Config) (*WorkflowEngine, error)

NewWorkflowEngineWithConfig creates a new workflow engine using the provided configuration

func (*WorkflowEngine) CompleteAssignment added in v1.5.10

func (e *WorkflowEngine) CompleteAssignment(ctx context.Context, assignmentID string, status enums.WorkflowAssignmentStatus, approvalMetadata *models.WorkflowAssignmentApproval, rejectionMetadata *models.WorkflowAssignmentRejection) (err error)

CompleteAssignment marks an assignment as approved/rejected

func (*WorkflowEngine) EvaluateActionWhen added in v1.5.10

func (e *WorkflowEngine) EvaluateActionWhen(ctx context.Context, expression string, instance *generated.WorkflowInstance, obj *workflows.Object) (bool, error)

EvaluateActionWhen evaluates an action's When expression with assignment context. This is used for re-evaluating NOTIFY actions when assignment status changes.

func (*WorkflowEngine) EvaluateConditions

func (e *WorkflowEngine) EvaluateConditions(ctx context.Context, def *generated.WorkflowDefinition, obj *workflows.Object, eventType string, changedFields []string, changedEdges []string, addedIDs, removedIDs map[string][]string, proposedChanges map[string]any) (bool, error)

EvaluateConditions checks if all conditions pass for a workflow.

func (*WorkflowEngine) Execute added in v1.5.10

Execute performs the action and returns any error.

func (*WorkflowEngine) FindMatchingDefinitions added in v1.5.10

func (e *WorkflowEngine) FindMatchingDefinitions(ctx context.Context, schemaType string, eventType string, changedFields []string, changedEdges []string, addedIDs map[string][]string, removedIDs map[string][]string, proposedChanges map[string]any, obj *workflows.Object) (defs []*generated.WorkflowDefinition, err error)

FindMatchingDefinitions returns all active workflow definitions that match the criteria

func (*WorkflowEngine) ProcessAction added in v1.5.10

func (e *WorkflowEngine) ProcessAction(ctx context.Context, instance *generated.WorkflowInstance, action models.WorkflowAction) error

ProcessAction executes a workflow action

func (*WorkflowEngine) QueueIntegrationOperation added in v1.9.3

func (e *WorkflowEngine) QueueIntegrationOperation(ctx context.Context, req IntegrationQueueRequest) (IntegrationQueueResult, error)

QueueIntegrationOperation queues an integration operation for async execution

func (*WorkflowEngine) ResolveTargets added in v1.5.10

func (e *WorkflowEngine) ResolveTargets(ctx context.Context, target workflows.TargetConfig, obj *workflows.Object) ([]string, error)

ResolveTargets converts a TargetConfig into concrete user IDs.

func (*WorkflowEngine) SetIntegrationDeps added in v1.9.3

func (e *WorkflowEngine) SetIntegrationDeps(deps IntegrationDeps) error

SetIntegrationDeps attaches integration dependencies and registers listeners when possible

func (*WorkflowEngine) TriggerExistingInstance added in v1.5.10

func (e *WorkflowEngine) TriggerExistingInstance(ctx context.Context, instance *generated.WorkflowInstance, def *generated.WorkflowDefinition, obj *workflows.Object, input TriggerInput) (err error)

TriggerExistingInstance resumes a pre-created workflow instance and emits a trigger event

func (*WorkflowEngine) TriggerWorkflow added in v1.5.10

func (e *WorkflowEngine) TriggerWorkflow(ctx context.Context, def *generated.WorkflowDefinition, obj *workflows.Object, input TriggerInput) (instance *generated.WorkflowInstance, err error)

TriggerWorkflow starts a new workflow instance

type WorkflowListeners added in v1.5.10

type WorkflowListeners struct {
	// contains filtered or unexported fields
}

WorkflowListeners contains all workflow event listeners

func NewWorkflowListeners added in v1.5.10

func NewWorkflowListeners(client *generated.Client, engine *WorkflowEngine, emitter soiree.Emitter) *WorkflowListeners

NewWorkflowListeners creates workflow event listeners.

func (*WorkflowListeners) HandleActionCompleted added in v1.5.10

func (l *WorkflowListeners) HandleActionCompleted(ctx *soiree.EventContext, payload soiree.WorkflowActionCompletedPayload) (err error)

HandleActionCompleted determines next steps after action completion.

func (*WorkflowListeners) HandleActionStarted added in v1.5.10

func (l *WorkflowListeners) HandleActionStarted(ctx *soiree.EventContext, payload soiree.WorkflowActionStartedPayload) (err error)

HandleActionStarted executes a workflow action.

func (*WorkflowListeners) HandleAssignmentCompleted added in v1.5.10

func (l *WorkflowListeners) HandleAssignmentCompleted(ctx *soiree.EventContext, payload soiree.WorkflowAssignmentCompletedPayload) (err error)

HandleAssignmentCompleted handles approval decisions and continues/cancels workflows.

func (*WorkflowListeners) HandleAssignmentCreated added in v1.5.10

func (l *WorkflowListeners) HandleAssignmentCreated(ctx *soiree.EventContext, payload soiree.WorkflowAssignmentCreatedPayload) (err error)

HandleAssignmentCreated records audit events for assignment creation.

func (*WorkflowListeners) HandleInstanceCompleted added in v1.5.10

func (l *WorkflowListeners) HandleInstanceCompleted(ctx *soiree.EventContext, payload soiree.WorkflowInstanceCompletedPayload) (err error)

HandleInstanceCompleted marks a workflow instance as completed or failed.

func (*WorkflowListeners) HandleWorkflowAssignmentMutation added in v1.5.10

func (l *WorkflowListeners) HandleWorkflowAssignmentMutation(ctx *soiree.EventContext, payload *events.MutationPayload) error

HandleWorkflowAssignmentMutation reacts to assignment status changes and emits completion events.

func (*WorkflowListeners) HandleWorkflowMutation added in v1.5.10

func (l *WorkflowListeners) HandleWorkflowMutation(ctx *soiree.EventContext, payload *events.MutationPayload) error

HandleWorkflowMutation triggers matching workflows for workflow-eligible mutations.

func (*WorkflowListeners) HandleWorkflowTriggered added in v1.5.10

func (l *WorkflowListeners) HandleWorkflowTriggered(ctx *soiree.EventContext, payload soiree.WorkflowTriggeredPayload) (err error)

HandleWorkflowTriggered processes a newly triggered workflow instance. For approval actions with When conditions, all matching actions are started concurrently. This enables workflows where multiple fields change in a single mutation to trigger their respective approval requirements simultaneously.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL