Documentation
¶
Overview ¶
Package engine implements the workflow engine that orchestrates workflow execution.
Index ¶
- Variables
- type AssignmentStatusCounts
- type CELEvaluator
- type ProposalManager
- func (m *ProposalManager) Apply(scope *observability.Scope, proposalID string, obj *workflows.Object) error
- func (m *ProposalManager) ComputeHash(ctx context.Context, instance *generated.WorkflowInstance, ...) (string, error)
- func (m *ProposalManager) Create(ctx context.Context, tx *generated.Tx, objRef *generated.WorkflowObjectRef, ...) (*generated.WorkflowProposal, error)
- func (m *ProposalManager) LoadForObject(ctx context.Context, obj *workflows.Object) (*generated.WorkflowProposal, error)
- type TriggerInput
- type WorkflowEngine
- func (e *WorkflowEngine) CompleteAssignment(ctx context.Context, assignmentID string, ...) (err error)
- func (e *WorkflowEngine) EvaluateActionWhen(ctx context.Context, expression string, instance *generated.WorkflowInstance, ...) (bool, error)
- func (e *WorkflowEngine) EvaluateConditions(ctx context.Context, def *generated.WorkflowDefinition, obj *workflows.Object, ...) (bool, error)
- func (e *WorkflowEngine) Execute(ctx context.Context, action models.WorkflowAction, ...) error
- func (e *WorkflowEngine) FindMatchingDefinitions(ctx context.Context, schemaType string, eventType string, ...) (defs []*generated.WorkflowDefinition, err error)
- func (e *WorkflowEngine) ProcessAction(ctx context.Context, instance *generated.WorkflowInstance, ...) error
- func (e *WorkflowEngine) ResolveTargets(ctx context.Context, target workflows.TargetConfig, obj *workflows.Object) ([]string, error)
- func (e *WorkflowEngine) TriggerExistingInstance(ctx context.Context, instance *generated.WorkflowInstance, ...) (err error)
- func (e *WorkflowEngine) TriggerWorkflow(ctx context.Context, def *generated.WorkflowDefinition, obj *workflows.Object, ...) (instance *generated.WorkflowInstance, err error)
- type WorkflowListeners
- func (l *WorkflowListeners) HandleActionCompleted(ctx *soiree.EventContext, payload soiree.WorkflowActionCompletedPayload) (err error)
- func (l *WorkflowListeners) HandleActionStarted(ctx *soiree.EventContext, payload soiree.WorkflowActionStartedPayload) (err error)
- func (l *WorkflowListeners) HandleAssignmentCompleted(ctx *soiree.EventContext, payload soiree.WorkflowAssignmentCompletedPayload) (err error)
- func (l *WorkflowListeners) HandleAssignmentCreated(ctx *soiree.EventContext, payload soiree.WorkflowAssignmentCreatedPayload) (err error)
- func (l *WorkflowListeners) HandleInstanceCompleted(ctx *soiree.EventContext, payload soiree.WorkflowInstanceCompletedPayload) (err error)
- func (l *WorkflowListeners) HandleWorkflowAssignmentMutation(ctx *soiree.EventContext, payload *events.MutationPayload) error
- func (l *WorkflowListeners) HandleWorkflowMutation(ctx *soiree.EventContext, payload *events.MutationPayload) error
- func (l *WorkflowListeners) HandleWorkflowTriggered(ctx *soiree.EventContext, payload soiree.WorkflowTriggeredPayload) (err error)
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrWorkflowNotFound is returned when a workflow definition cannot be found
	ErrWorkflowNotFound = errors.New("workflow definition not found")
	// ErrInstanceNotFound is returned when a workflow instance cannot be found
	ErrInstanceNotFound = errors.New("workflow instance not found")
	// ErrAssignmentNotFound is returned when a workflow assignment cannot be found
	ErrAssignmentNotFound = errors.New("workflow assignment not found")
	// ErrInvalidState is returned when a workflow instance is in an invalid state for the operation
	ErrInvalidState = errors.New("invalid workflow state")
	// ErrConditionFailed is returned when a CEL condition evaluation fails
	ErrConditionFailed = errors.New("condition evaluation failed")
	// ErrNoMatchingDefinitions is returned when no workflow definitions match the trigger criteria
	ErrNoMatchingDefinitions = errors.New("no matching workflow definitions")
	// ErrTargetResolutionFailed is returned when dynamic target resolution fails
	ErrTargetResolutionFailed = errors.New("target resolution failed")
	// ErrActionExecutionFailed is returned when an action execution fails
	ErrActionExecutionFailed = errors.New("action execution failed")
	// ErrInvalidTargetType is returned when an unknown target type is encountered
	ErrInvalidTargetType = errors.New("invalid target type")
	// ErrInvalidActionType is returned when an unknown action type is encountered
	ErrInvalidActionType = errors.New("invalid action type")
	// ErrMissingRequiredField is returned when a required field is missing
	ErrMissingRequiredField = errors.New("missing required field")
	// ErrInvalidObjectType is returned when an unknown object type is encountered
	ErrInvalidObjectType = errors.New("invalid object type")
	// ErrObjectRefMissingID is returned when a workflow object ref has no object ID set
	ErrObjectRefMissingID = errors.New("workflow object ref has no object ID set")
	// ErrFieldNotWorkflowEligible is returned when a field cannot be updated by workflow
	ErrFieldNotWorkflowEligible = errors.New("field is not eligible for workflow modification")
	// ErrCELTypeMismatch is returned when a CEL expression returns a non-boolean type
	ErrCELTypeMismatch = errors.New("CEL expression must return boolean")
	// ErrCELValueExtraction is returned when extracting a boolean value from CEL fails
	ErrCELValueExtraction = errors.New("failed to extract boolean value from CEL result")
	// ErrEvaluationTimeout is returned when CEL evaluation exceeds the timeout
	ErrEvaluationTimeout = errors.New("CEL evaluation timeout")
	// ErrCELPanic is returned when CEL evaluation panics
	ErrCELPanic = errors.New("CEL evaluation panic")
	// ErrCELNilOutput is returned when CEL evaluation returns nil output
	ErrCELNilOutput = errors.New("CEL evaluation returned nil output")
	// ErrWebhookFailed is returned when a webhook request fails
	ErrWebhookFailed = errors.New("webhook request failed")
	// ErrIntegrationFailed is returned when an integration operation fails
	ErrIntegrationFailed = errors.New("integration operation failed")
	// ErrExecutorNotAvailable is returned when the executor is not available
	ErrExecutorNotAvailable = errors.New("executor is nil")
	// ErrIntegrationManagerNotAvailable is returned when the integration manager is not available
	ErrIntegrationManagerNotAvailable = errors.New("integration operations manager not available")
	// ErrUnsupportedTimeFormat is returned when a time format is not supported
	ErrUnsupportedTimeFormat = errors.New("unsupported time format")
	// ErrObjectNil is returned when the workflow object is nil
	ErrObjectNil = errors.New("object is nil")
	// ErrProposalChangesModified is returned when proposal changes are modified after approval
	ErrProposalChangesModified = errors.New("proposal changes modified after approval")
	// ErrUnmarshalParams is returned when action params cannot be unmarshaled
	ErrUnmarshalParams = errors.New("failed to unmarshal action params")
	// ErrMarshalPayload is returned when a payload cannot be marshaled
	ErrMarshalPayload = errors.New("failed to marshal payload")
	// ErrIntegrationProviderRequired is returned when integration action is missing provider
	ErrIntegrationProviderRequired = errors.New("integration action requires provider")
	// ErrIntegrationOwnerRequired is returned when integration action is missing owner
	ErrIntegrationOwnerRequired = errors.New("integration action requires instance owner_id")
	// ErrAssignmentCreationFailed is returned when workflow assignment creation fails
	ErrAssignmentCreationFailed = errors.New("failed to create workflow assignment")
	// ErrNotificationCreationFailed is returned when notification creation fails
	ErrNotificationCreationFailed = errors.New("failed to create notification")
	// ErrWebhookURLRequired is returned when webhook action is missing URL
	ErrWebhookURLRequired = errors.New("webhook action requires url")
	// ErrAssignmentUpdateFailed is returned when assignment update fails
	ErrAssignmentUpdateFailed = errors.New("failed to update workflow assignment")
	// ErrActionIndexOutOfBounds is returned when an action index is outside the workflow definition range
	ErrActionIndexOutOfBounds = errors.New("workflow action index out of bounds")
	// ErrAssignmentActionNotFound is returned when a workflow assignment cannot be mapped to an action
	ErrAssignmentActionNotFound = errors.New("workflow assignment action not found")
	// ErrNilClient is returned when a nil database client is provided
	ErrNilClient = errors.New("ent client is required to initialize workflow engine")
	// ErrCELProgramCreationFailed is returned when CEL program creation fails
	ErrCELProgramCreationFailed = errors.New("failed to create CEL program")
	// ErrCELCompilationFailed is returned when CEL fails to compile
	ErrCELCompilationFailed = errors.New("failed to compile CEL")
	// ErrFailedToComputeProposalHash is returned when a proposal hash cannot be computed
	ErrFailedToComputeProposalHash = errors.New("failed to compute proposal hash")
	// ErrFailedToCreateProposal is returned when a workflow proposal cannot be created
	ErrFailedToCreateProposal = errors.New("failed to create workflow proposal inside of proposal manager")
	// ErrFailedToQueryObjectRefs is returned when workflow object refs cannot be queried
	ErrFailedToQueryObjectRefs = errors.New("failed to query object refs")
	// ErrFailedToQueryProposals is returned when workflow proposals cannot be queried
	ErrFailedToQueryProposals = errors.New("failed to query proposals")
	// ErrFailedToLoadProposal is returned when a workflow proposal cannot be loaded
	ErrFailedToLoadProposal = errors.New("failed to load proposal")
	// ErrFailedToApplyFieldUpdates is returned when proposal field updates cannot be applied
	ErrFailedToApplyFieldUpdates = errors.New("failed to apply field updates")
	// ErrFailedToCreateAssignmentTarget is returned when an assignment target cannot be created
	ErrFailedToCreateAssignmentTarget = errors.New("failed to create assignment target")
	// ErrFailedToEnrichWebhookPayload is returned when webhook payload enrichment fails
	ErrFailedToEnrichWebhookPayload = errors.New("failed to enrich webhook payload")
	// ErrFailedToQueryDefinitions is returned when workflow definitions cannot be queried
	ErrFailedToQueryDefinitions = errors.New("failed to query workflow definitions")
	// ErrFailedToResolveTarget is returned when a target cannot be resolved
	ErrFailedToResolveTarget = errors.New("failed to resolve target")
	// ErrFailedToResolveNotificationTarget is returned when a notification target cannot be resolved
	ErrFailedToResolveNotificationTarget = errors.New("failed to resolve notification target")
	// ErrApprovalNoTargets indicates an approval action resolved no targets and should be skipped
	ErrApprovalNoTargets = errors.New("approval action has no resolved targets")
	// ErrMissingObjectRef is returned when a workflow object ref is nil
	ErrMissingObjectRef = errors.New("workflow object ref is required")
)
Functions ¶
This section is empty.
Types ¶
type AssignmentStatusCounts ¶ added in v1.5.10
type AssignmentStatusCounts struct {
// Approved is the count of approved assignments
Approved int
// Pending is the count of pending assignments
Pending int
// Rejected is the count of rejected assignments
Rejected int
// RejectedRequired indicates if any required assignment was rejected
RejectedRequired bool
}
AssignmentStatusCounts holds counts of assignment statuses for quorum evaluation
func CountAssignmentStatus ¶ added in v1.5.10
func CountAssignmentStatus(assignments []*generated.WorkflowAssignment) AssignmentStatusCounts
CountAssignmentStatus counts the status of assignments for quorum evaluation
type CELEvaluator ¶ added in v1.5.10
type CELEvaluator struct {
// contains filtered or unexported fields
}
CELEvaluator handles CEL expression compilation and evaluation with caching
func NewCELEvaluator ¶ added in v1.5.10
func NewCELEvaluator(env *cel.Env, config *workflows.Config) *CELEvaluator
NewCELEvaluator creates a new CEL evaluator with the provided environment and configuration
func (*CELEvaluator) Evaluate ¶ added in v1.5.10
func (e *CELEvaluator) Evaluate(ctx context.Context, expression string, vars map[string]any) (bool, error)
Evaluate evaluates a CEL expression with the given variables, using caching and timeout
func (*CELEvaluator) Validate ¶ added in v1.5.10
func (e *CELEvaluator) Validate(expression string) error
Validate validates that a CEL expression compiles successfully
type ProposalManager ¶ added in v1.5.10
type ProposalManager struct {
// contains filtered or unexported fields
}
ProposalManager handles WorkflowProposal operations
func NewProposalManager ¶ added in v1.5.10
func NewProposalManager(client *generated.Client) *ProposalManager
NewProposalManager creates a new proposal manager
func (*ProposalManager) Apply ¶ added in v1.5.10
func (m *ProposalManager) Apply(scope *observability.Scope, proposalID string, obj *workflows.Object) error
Apply applies the approved changes from a WorkflowProposal to the target object
func (*ProposalManager) ComputeHash ¶ added in v1.5.10
func (m *ProposalManager) ComputeHash(ctx context.Context, instance *generated.WorkflowInstance, obj *workflows.Object, domainKey string) (string, error)
ComputeHash computes a hash of the proposed changes for the given object/domain. It prefers the proposal attached to the instance when available.
func (*ProposalManager) Create ¶ added in v1.5.10
func (m *ProposalManager) Create(ctx context.Context, tx *generated.Tx, objRef *generated.WorkflowObjectRef, domain *workflows.DomainChanges) (*generated.WorkflowProposal, error)
Create creates a WorkflowProposal for the approval domain within a transaction
func (*ProposalManager) LoadForObject ¶ added in v1.5.10
func (m *ProposalManager) LoadForObject(ctx context.Context, obj *workflows.Object) (*generated.WorkflowProposal, error)
LoadForObject loads a WorkflowProposal for the given object using the ObjectFromRef registry
type TriggerInput ¶ added in v1.5.10
type TriggerInput struct {
// EventType is the trigger event name
EventType string
// ChangedFields lists updated fields on the target object
ChangedFields []string
// ChangedEdges lists updated edges on the target object
ChangedEdges []string
// AddedIDs captures added edge IDs keyed by edge name
AddedIDs map[string][]string
// RemovedIDs captures removed edge IDs keyed by edge name
RemovedIDs map[string][]string
// ProposedChanges contains proposed field updates for approval workflows
ProposedChanges map[string]any
}
TriggerInput captures the trigger metadata passed to workflow execution
type WorkflowEngine ¶
type WorkflowEngine struct {
// contains filtered or unexported fields
}
WorkflowEngine orchestrates workflow execution via event emission
func NewWorkflowEngine ¶
func NewWorkflowEngine(client *generated.Client, emitter soiree.Emitter, opts ...workflows.ConfigOpts) (*WorkflowEngine, error)
NewWorkflowEngine creates a new workflow engine using the provided configuration options
func NewWorkflowEngineWithConfig ¶
func NewWorkflowEngineWithConfig(client *generated.Client, emitter soiree.Emitter, config *workflows.Config) (*WorkflowEngine, error)
NewWorkflowEngineWithConfig creates a new workflow engine using the provided configuration
func (*WorkflowEngine) CompleteAssignment ¶ added in v1.5.10
func (e *WorkflowEngine) CompleteAssignment(ctx context.Context, assignmentID string, status enums.WorkflowAssignmentStatus, approvalMetadata *models.WorkflowAssignmentApproval, rejectionMetadata *models.WorkflowAssignmentRejection) (err error)
CompleteAssignment marks an assignment as approved or rejected.
func (*WorkflowEngine) EvaluateActionWhen ¶ added in v1.5.10
func (e *WorkflowEngine) EvaluateActionWhen(ctx context.Context, expression string, instance *generated.WorkflowInstance, obj *workflows.Object) (bool, error)
EvaluateActionWhen evaluates an action's When expression with assignment context. This is used for re-evaluating NOTIFY actions when assignment status changes.
func (*WorkflowEngine) EvaluateConditions ¶
func (e *WorkflowEngine) EvaluateConditions(ctx context.Context, def *generated.WorkflowDefinition, obj *workflows.Object, eventType string, changedFields []string, changedEdges []string, addedIDs, removedIDs map[string][]string, proposedChanges map[string]any) (bool, error)
EvaluateConditions checks if all conditions pass for a workflow.
func (*WorkflowEngine) Execute ¶ added in v1.5.10
func (e *WorkflowEngine) Execute(ctx context.Context, action models.WorkflowAction, instance *generated.WorkflowInstance, obj *wfworkflows.Object) error
Execute performs the action and returns any error.
func (*WorkflowEngine) FindMatchingDefinitions ¶ added in v1.5.10
func (e *WorkflowEngine) FindMatchingDefinitions(ctx context.Context, schemaType string, eventType string, changedFields []string, changedEdges []string, addedIDs map[string][]string, removedIDs map[string][]string, proposedChanges map[string]any, obj *workflows.Object) (defs []*generated.WorkflowDefinition, err error)
FindMatchingDefinitions returns all active workflow definitions that match the criteria
func (*WorkflowEngine) ProcessAction ¶ added in v1.5.10
func (e *WorkflowEngine) ProcessAction(ctx context.Context, instance *generated.WorkflowInstance, action models.WorkflowAction) error
ProcessAction executes a workflow action
func (*WorkflowEngine) ResolveTargets ¶ added in v1.5.10
func (e *WorkflowEngine) ResolveTargets(ctx context.Context, target workflows.TargetConfig, obj *workflows.Object) ([]string, error)
ResolveTargets converts a TargetConfig into concrete user IDs.
func (*WorkflowEngine) TriggerExistingInstance ¶ added in v1.5.10
func (e *WorkflowEngine) TriggerExistingInstance(ctx context.Context, instance *generated.WorkflowInstance, def *generated.WorkflowDefinition, obj *workflows.Object, input TriggerInput) (err error)
TriggerExistingInstance resumes a pre-created workflow instance and emits a trigger event
func (*WorkflowEngine) TriggerWorkflow ¶ added in v1.5.10
func (e *WorkflowEngine) TriggerWorkflow(ctx context.Context, def *generated.WorkflowDefinition, obj *workflows.Object, input TriggerInput) (instance *generated.WorkflowInstance, err error)
TriggerWorkflow starts a new workflow instance
type WorkflowListeners ¶ added in v1.5.10
type WorkflowListeners struct {
// contains filtered or unexported fields
}
WorkflowListeners contains all workflow event listeners
func NewWorkflowListeners ¶ added in v1.5.10
func NewWorkflowListeners(client *generated.Client, engine *WorkflowEngine, emitter soiree.Emitter) *WorkflowListeners
NewWorkflowListeners creates workflow event listeners.
func (*WorkflowListeners) HandleActionCompleted ¶ added in v1.5.10
func (l *WorkflowListeners) HandleActionCompleted(ctx *soiree.EventContext, payload soiree.WorkflowActionCompletedPayload) (err error)
HandleActionCompleted determines next steps after action completion.
func (*WorkflowListeners) HandleActionStarted ¶ added in v1.5.10
func (l *WorkflowListeners) HandleActionStarted(ctx *soiree.EventContext, payload soiree.WorkflowActionStartedPayload) (err error)
HandleActionStarted executes a workflow action.
func (*WorkflowListeners) HandleAssignmentCompleted ¶ added in v1.5.10
func (l *WorkflowListeners) HandleAssignmentCompleted(ctx *soiree.EventContext, payload soiree.WorkflowAssignmentCompletedPayload) (err error)
HandleAssignmentCompleted handles approval decisions and continues/cancels workflows.
func (*WorkflowListeners) HandleAssignmentCreated ¶ added in v1.5.10
func (l *WorkflowListeners) HandleAssignmentCreated(ctx *soiree.EventContext, payload soiree.WorkflowAssignmentCreatedPayload) (err error)
HandleAssignmentCreated records audit events for assignment creation.
func (*WorkflowListeners) HandleInstanceCompleted ¶ added in v1.5.10
func (l *WorkflowListeners) HandleInstanceCompleted(ctx *soiree.EventContext, payload soiree.WorkflowInstanceCompletedPayload) (err error)
HandleInstanceCompleted marks a workflow instance as completed or failed.
func (*WorkflowListeners) HandleWorkflowAssignmentMutation ¶ added in v1.5.10
func (l *WorkflowListeners) HandleWorkflowAssignmentMutation(ctx *soiree.EventContext, payload *events.MutationPayload) error
HandleWorkflowAssignmentMutation reacts to assignment status changes and emits completion events.
func (*WorkflowListeners) HandleWorkflowMutation ¶ added in v1.5.10
func (l *WorkflowListeners) HandleWorkflowMutation(ctx *soiree.EventContext, payload *events.MutationPayload) error
HandleWorkflowMutation triggers matching workflows for workflow-eligible mutations.
func (*WorkflowListeners) HandleWorkflowTriggered ¶ added in v1.5.10
func (l *WorkflowListeners) HandleWorkflowTriggered(ctx *soiree.EventContext, payload soiree.WorkflowTriggeredPayload) (err error)
HandleWorkflowTriggered processes a newly triggered workflow instance. For approval actions with When conditions, all matching actions are started concurrently. This enables workflows where multiple fields change in a single mutation to trigger their respective approval requirements simultaneously.