flow

package
v0.19.904
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2026 License: AGPL-3.0 Imports: 34 Imported by: 0

Documentation

Constants

const (
	DirectiveContinue = "continue"
	DirectiveStop     = "stop"

	DirectiveAwaitApproval = "await-approval"

	DirectiveSkipGroup  = "skip-group"
	DirectiveRetry      = "retry"
	DirectiveRetryGroup = "retry-group"
)

Step directives tell the parent conductor how to proceed after a step completes.
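As a minimal sketch of how a caller might branch on these values: the constants below are local copies of the ones listed above, and `nextAction` is a hypothetical helper whose descriptions are assumptions, not the conductor's actual behavior.

```go
package main

import "fmt"

// Local copies of the directive values listed above, so the sketch compiles on its own.
const (
	DirectiveContinue      = "continue"
	DirectiveStop          = "stop"
	DirectiveAwaitApproval = "await-approval"
	DirectiveSkipGroup     = "skip-group"
	DirectiveRetry         = "retry"
	DirectiveRetryGroup    = "retry-group"
)

// nextAction is hypothetical: it maps a directive to a short description of
// what a conductor loop might do next. The descriptions are assumptions.
func nextAction(directive string) string {
	switch directive {
	case DirectiveContinue:
		return "run next step"
	case DirectiveStop:
		return "stop flow"
	case DirectiveAwaitApproval:
		return "pause for approval"
	case DirectiveSkipGroup:
		return "skip rest of group"
	case DirectiveRetry:
		return "retry step"
	case DirectiveRetryGroup:
		return "retry group"
	default:
		return "unknown directive"
	}
}

func main() {
	fmt.Println(nextAction(DirectiveRetry))
}
```

An explicit default branch keeps an unrecognized directive from being silently treated as "continue".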

const (
	// DenyViolationsKey is used in metadata to store deny-level violations
	DenyViolationsKey = "deny_violations"
	// WarnViolationsKey is used in metadata to store warning-level violations
	WarnViolationsKey = "warn_violations"
	// PassedPolicyIDsKey is used in metadata to store IDs of policies that passed evaluation
	PassedPolicyIDsKey = "passed_policy_ids"
)

Metadata keys used to record policy evaluation results.

const DirectiveKey = "directive"

DirectiveKey is the metadata key used to communicate step execution directives from the step workflow back to the parent conductor.
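A small sketch of reading the directive back out of step metadata. The metadata type is not shown on this page, so `map[string]string` is an assumption, and `directiveFrom` is a hypothetical helper.

```go
package main

import "fmt"

// Local copy of the key documented above.
const DirectiveKey = "directive"

// directiveFrom is hypothetical. It assumes step metadata is a
// map[string]string, which this page does not confirm.
// The boolean reports whether a directive was present at all.
func directiveFrom(meta map[string]string) (string, bool) {
	d, ok := meta[DirectiveKey]
	return d, ok
}

func main() {
	meta := map[string]string{DirectiveKey: "await-approval"}
	d, ok := directiveFrom(meta)
	fmt.Println(d, ok)
}
```

Returning the presence flag leaves the choice of a default directive to the caller instead of guessing one here.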

Variables

var ErrNotApproved error = fmt.Errorf("not approved")
var FlowCancellationErr = fmt.Errorf("workflow cancelled")

Functions

func CancelFutureSteps added in v0.19.894

func CancelFutureSteps(ctx workflow.Context, flw *app.Workflow, idx int, reason string) error

CancelFutureSteps marks all steps after idx as not-attempted.
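The "after idx" semantics can be illustrated on a plain slice. This is only a sketch: `stepSketch` is a hypothetical stand-in for the real step type, and the real function presumably persists the status change rather than mutating a slice in memory.

```go
package main

import "fmt"

// stepSketch is a hypothetical stand-in for a workflow step.
type stepSketch struct {
	Name   string
	Status string
}

// markFutureNotAttempted sets every step strictly after idx to "not-attempted".
// The step at idx itself is left untouched.
func markFutureNotAttempted(steps []stepSketch, idx int) {
	start := idx + 1
	if start < 0 {
		start = 0 // guard against negative indexes
	}
	for i := start; i < len(steps); i++ {
		steps[i].Status = "not-attempted"
	}
}

func main() {
	steps := []stepSketch{
		{"plan", "success"},
		{"apply", "error"},
		{"verify", "pending"},
		{"notify", "pending"},
	}
	markFutureNotAttempted(steps, 1)
	for _, s := range steps {
		fmt.Println(s.Name, s.Status)
	}
}
```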

func CheckStepCancellation added in v0.19.894

func CheckStepCancellation(ctx workflow.Context, stepID string) error

CheckStepCancellation marks a step as cancelled if the workflow context is cancelled.

func CompleteStepGeneration added in v0.19.903

func CompleteStepGeneration(ctx workflow.Context, cfg StepConfig, flw *app.Workflow, queueSignalID string) (*app.Workflow, error)

CompleteStepGeneration fetches and persists all remaining step groups after eager generation. It is safe to call even if eager generation was not used, in which case it is a no-op.

func DispatchGroupSignal added in v0.19.894

func DispatchGroupSignal(ctx workflow.Context, cfg StepConfig, group *app.WorkflowStepGroup, flw *app.Workflow) (string, error)

DispatchGroupSignal enqueues an execute-workflow-step-group signal to the step queue and awaits its completion via the group-finished update handler. It returns the group's directive directly so the caller doesn't need to re-fetch it from the database.

func DispatchGroupSignalByIdx added in v0.19.894

func DispatchGroupSignalByIdx(ctx workflow.Context, cfg StepConfig, groupIdx int, flw *app.Workflow, parallel bool) (string, error)

DispatchGroupSignalByIdx is a backward-compatible helper that dispatches a group signal using a GroupIdx and parallel flag directly, without a WorkflowStepGroup object.

func DispatchStepSignal added in v0.19.894

func DispatchStepSignal(ctx workflow.Context, cfg StepConfig, step *app.WorkflowStep, flw *app.Workflow) error

DispatchStepSignal enqueues the execute-workflow-step signal to the step queue. The signal runs the full step lifecycle in its own handler workflow.

func ExecuteStepsViaChildWorkflow added in v0.19.894

func ExecuteStepsViaChildWorkflow(ctx workflow.Context, cfg StepConfig, flw *app.Workflow, startFromIdx int) error

ExecuteStepsViaChildWorkflow runs the step iteration loop where each step is dispatched as an execute-workflow-step signal. This is the StepChildWorkflow mode used by the execute-flow signal.

func GenerateSteps added in v0.19.894

func GenerateSteps(ctx workflow.Context, cfg StepConfig, flw *app.Workflow, generators map[app.WorkflowType]WorkflowStepGenerator) (*app.Workflow, error)

GenerateSteps generates workflow steps and persists them.

If the workflow has a GenerateStepsSignal, steps are generated via the signal path (see step_generate_signal.go). Otherwise, it falls back to the generators map for backward compatibility with conductor callers that haven't migrated yet.
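The choice between the two paths can be sketched as follows. Everything here is a hypothetical stand-in: `flowSketch` and `generatorSketch` only mimic the shape of app.Workflow and WorkflowStepGenerator, and the error returned when no generator is registered is an assumption about behavior this page does not describe.

```go
package main

import (
	"errors"
	"fmt"
)

// flowSketch is a hypothetical stand-in for app.Workflow.
type flowSketch struct {
	Type                string
	GenerateStepsSignal *string // non-nil means "use the signal path" in this sketch
}

// generatorSketch is a hypothetical stand-in for WorkflowStepGenerator.
type generatorSketch func(f *flowSketch) ([]string, error)

// pickPath reports which generation path would be taken:
// the signal path when a signal is set, otherwise the legacy generators map.
func pickPath(f *flowSketch, gens map[string]generatorSketch) (string, error) {
	if f.GenerateStepsSignal != nil {
		return "signal", nil
	}
	if _, ok := gens[f.Type]; ok {
		return "generator", nil
	}
	// Assumption: an unregistered type is treated as an error.
	return "", errors.New("no step generator registered for type " + f.Type)
}

func main() {
	gens := map[string]generatorSketch{
		"provision": func(*flowSketch) ([]string, error) { return []string{"plan", "apply"}, nil },
	}
	path, err := pickPath(&flowSketch{Type: "provision"}, gens)
	fmt.Println(path, err)
}
```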

func HandleCancellation added in v0.19.894

func HandleCancellation(ctx workflow.Context, stepErr error, stepID string, idx int, flw *app.Workflow) error

HandleCancellation marks the current step and all future steps as cancelled.

func IsCancellationErr added in v0.19.894

func IsCancellationErr(ctx workflow.Context, err error) bool

IsCancellationErr returns true if the error indicates workflow cancellation.

func StepHumanDescription added in v0.19.894

func StepHumanDescription(err error) string

StepHumanDescription returns a user-facing error message for a failed step. For non-retryable application errors (validation failures, user errors), it extracts the root cause so the user sees actionable details. For transient/internal errors, it returns a generic message to avoid leaking internals.

Types

type ApprovalPauseErr added in v0.19.894

type ApprovalPauseErr struct {
	StepID string
}

ApprovalPauseErr indicates that execution stopped because a step is awaiting approval.

func NewApprovalPauseErr added in v0.19.894

func NewApprovalPauseErr(stepID string) *ApprovalPauseErr

func (*ApprovalPauseErr) Error added in v0.19.894

func (e *ApprovalPauseErr) Error() string

type ContinueAsNewErr

type ContinueAsNewErr struct {
	// StartFromStepIdx is the index to resume from. In the legacy conductor path
	// this is a step index; in the execute-flow signal path this is a group position.
	StartFromStepIdx int
}

func NewContinueAsNewErr

func NewContinueAsNewErr(startsFromStepIdx int) *ContinueAsNewErr

func (*ContinueAsNewErr) Error

func (e *ContinueAsNewErr) Error() string

type EagerStepGroupsResult added in v0.19.903

type EagerStepGroupsResult struct {
	Workflow      *app.Workflow
	QueueSignalID string
}

EagerStepGroupsResult holds the result of eager step generation: the eager groups are persisted and ready for execution, while the remaining groups can be fetched later via CompleteStepGeneration.

func GenerateEagerStepGroups added in v0.19.903

func GenerateEagerStepGroups(ctx workflow.Context, cfg StepConfig, flw *app.Workflow) (*EagerStepGroupsResult, error)

GenerateEagerStepGroups generates and persists the eager step groups, allowing execution to begin on them while the remaining groups are still being generated. It works only on the signal-based path.

type FlowStoppedErr added in v0.19.894

type FlowStoppedErr struct {
	StepID           string
	Reason           string // "denied", "skipped-dependents"
	RetriesExhausted bool
}

FlowStoppedErr is returned when a workflow is stopped due to a denial or skip-dependents response. Unlike ErrNotApproved, this carries context about why the flow stopped and signals to the execute-flow signal that it should not enter the retry-wait loop.

func NewFlowStoppedErr added in v0.19.894

func NewFlowStoppedErr(stepID, reason string) *FlowStoppedErr

func (*FlowStoppedErr) Error added in v0.19.894

func (e *FlowStoppedErr) Error() string

type RerunInput

type RerunInput struct {
	FlowID          string         `json:"flow_id" validate:"required"`
	StepID          string         `json:"step_id" validate:"required"`
	Operation       RerunOperation `json:"operation" validate:"required"`
	StalePlan       bool           `json:"stale_plan"`
	RePlanStepID    string         `json:"replan_step_id"`
	ContinueFromIdx int            `json:"continue_from_idx"`

	// AdditionalSkipStepIDs are extra steps to mark as skipped alongside the primary StepID.
	// Used when skipping a plan step to also skip the corresponding apply step.
	AdditionalSkipStepIDs []string `json:"additional_skip_step_ids,omitempty"`
}

type RerunOperation

type RerunOperation string
const (
	RerunOperationSkipStep  RerunOperation = "skip-step"
	RerunOperationRetryStep RerunOperation = "retry-step"
)

type StepConfig added in v0.19.894

type StepConfig struct {
	GroupQueueName  string
	QueueName       string
	TargetQueueName string
	OwnerID         string
	OwnerType       string
}

StepConfig holds the queue/owner configuration needed to dispatch and manage workflow steps. Used by both the execute-flow signal (directly) and the WorkflowConductor (for backward compatibility).

type WorkflowConductor

type WorkflowConductor[DomainSignal eventloop.Signal] struct {
	Cfg        *internal.Config
	MW         tmetrics.Writer
	V          *validator.Validate
	EVClient   teventloop.Client
	Generators map[app.WorkflowType]WorkflowStepGenerator

	// ExecFnLegacy is called to actually execute the signal handler for a step.
	//
	// TODO(sdboyer) THIS IS A TEMPORARY HACK. Dispatching should be done within
	// the conductor itself.  However, we absolutely can't do it until we allow
	// certain concurrent behaviors in event loops, as it would deadlock when we
	// signal the same event loop that's running this workflow. It'll also be a
	// bit of awkward coupling to do it without totally predictable event loop
	// workflow IDs, but that's not a hard blocker.
	ExecFnLegacy func(workflow.Context, eventloop.EventLoopRequest, DomainSignal, app.WorkflowStep) error

	// ExecFn is called to execute a queue-signal-based step. Unlike ExecFnLegacy, it does not
	// require a generic DomainSignal or an EventLoopRequest — it operates directly on the
	// QueueSignal stored on the workflow step.
	ExecFn func(workflow.Context, *signaldb.SignalData, app.WorkflowStep) error

	// StepChildWorkflow controls whether QueueSignal-based steps are executed via the
	// execute-workflow-step signal. When true, each step is dispatched through its own
	// signal execution. Only applies to steps where QueueSignal != nil.
	StepChildWorkflow bool

	// StepQueueName is the queue where the execute-workflow-step signal itself runs
	// (e.g. "install-workflow-steps"). When StepChildWorkflow is true, each step's
	// full lifecycle is dispatched to this queue.
	StepQueueName string
	// StepTargetQueueName is the queue where the inner signal (the actual step signal)
	// gets enqueued for execution (e.g. "install-signals").
	StepTargetQueueName string
	StepOwnerID         string
	StepOwnerType       string
}

func (*WorkflowConductor[SignalType]) Handle

func (c *WorkflowConductor[SignalType]) Handle(ctx workflow.Context, req eventloop.EventLoopRequest, flowId string, startFromStepIdx int) error

func (*WorkflowConductor[SignalType]) Rerun

func (c *WorkflowConductor[SignalType]) Rerun(ctx workflow.Context, req eventloop.EventLoopRequest, inp RerunInput) error

Rerun is a workflow that reruns a flow from a specific step. It marks the existing step as discarded and creates a new step with the same parameters. It then executes the flow steps from the newly created step.

type WorkflowStepGenerator

type WorkflowStepGenerator func(ctx workflow.Context, uf *app.Workflow) (*app.GenerateStepsResult, error)

Directories

Path Synopsis
signals
