Documentation
Index ¶
- Constants
- Variables
- func CancelFutureSteps(ctx workflow.Context, flw *app.Workflow, idx int, reason string) error
- func CheckStepCancellation(ctx workflow.Context, stepID string) error
- func DispatchGroupSignal(ctx workflow.Context, cfg StepConfig, group *app.WorkflowStepGroup, ...) (string, error)
- func DispatchGroupSignalByIdx(ctx workflow.Context, cfg StepConfig, groupIdx int, flw *app.Workflow, ...) (string, error)
- func DispatchStepSignal(ctx workflow.Context, cfg StepConfig, step *app.WorkflowStep, ...) error
- func ExecuteStepsViaChildWorkflow(ctx workflow.Context, cfg StepConfig, flw *app.Workflow, startFromIdx int) error
- func GenerateSteps(ctx workflow.Context, cfg StepConfig, flw *app.Workflow, ...) (*app.Workflow, error)
- func HandleCancellation(ctx workflow.Context, stepErr error, stepID string, idx int, flw *app.Workflow) error
- func IsCancellationErr(ctx workflow.Context, err error) bool
- func StepHumanDescription(err error) string
- type ApprovalPauseErr
- type ContinueAsNewErr
- type FlowStoppedErr
- type RerunInput
- type RerunOperation
- type StepConfig
- type WorkflowConductor
- type WorkflowStepGenerator
Constants ¶
const (
	DirectiveContinue = "continue"
	DirectiveStop = "stop"
	DirectiveAwaitApproval = "await-approval"
	DirectiveSkipGroup = "skip-group"
	DirectiveRetry = "retry"
	DirectiveRetryGroup = "retry-group"
)
Step directives tell the parent conductor how to proceed after a step completes.
const (
	// DenyViolationsKey is used in metadata to store deny-level violations
	DenyViolationsKey = "deny_violations"
	// WarnViolationsKey is used in metadata to store warning-level violations
	WarnViolationsKey = "warn_violations"
	// PassedPolicyIDsKey is used in metadata to store IDs of policies that passed evaluation
	PassedPolicyIDsKey = "passed_policy_ids"
)
Policy evaluation metadata keys
const DirectiveKey = "directive"
DirectiveKey is the metadata key used to communicate step execution directives from the step workflow back to the parent conductor.
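As a rough illustration of how a parent loop might branch on a directive, the sketch below reads DirectiveKey from a step's metadata and maps each directive to a description of the next action. It is not the package's implementation: the `nextAction` helper, the `map[string]string` metadata shape, the action strings, and the choice to treat a missing directive as "continue" are all assumptions made for the example. The constants are local copies so the program compiles on its own.

```go
package main

import "fmt"

// Local copies of the documented constants so this sketch is self-contained.
const (
	DirectiveKey           = "directive"
	DirectiveContinue      = "continue"
	DirectiveStop          = "stop"
	DirectiveAwaitApproval = "await-approval"
	DirectiveSkipGroup     = "skip-group"
	DirectiveRetry         = "retry"
	DirectiveRetryGroup    = "retry-group"
)

// nextAction is a hypothetical helper. It assumes step metadata is a
// map[string]string and that a missing directive means "continue".
func nextAction(metadata map[string]string) string {
	directive, ok := metadata[DirectiveKey]
	if !ok {
		directive = DirectiveContinue
	}
	switch directive {
	case DirectiveContinue:
		return "advance to the next step"
	case DirectiveStop:
		return "halt the flow"
	case DirectiveAwaitApproval:
		return "pause until approved"
	case DirectiveSkipGroup:
		return "skip the rest of the group"
	case DirectiveRetry:
		return "re-run the step"
	case DirectiveRetryGroup:
		return "re-run the group"
	default:
		return "unknown directive: " + directive
	}
}

func main() {
	fmt.Println(nextAction(map[string]string{DirectiveKey: DirectiveSkipGroup}))
	fmt.Println(nextAction(map[string]string{}))
}
```

Keeping an explicit default branch means an unrecognized directive surfaces as a visible value rather than being silently treated as "continue".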
Variables ¶
var ErrNotApproved error = fmt.Errorf("not approved")
var FlowCancellationErr = fmt.Errorf("workflow cancelled")
Functions ¶
func CancelFutureSteps ¶ added in v0.19.894
CancelFutureSteps marks all steps after idx as not-attempted.
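A minimal sketch of the "mark everything after idx" behavior, under stated assumptions: the `step` type, the `"not-attempted"` status string, and the `markFutureNotAttempted` helper are hypothetical stand-ins, and the real function works on an *app.Workflow inside a workflow context and records a reason, none of which is modeled here.

```go
package main

import "fmt"

// step is a hypothetical, trimmed-down stand-in for app.WorkflowStep.
type step struct {
	ID     string
	Status string
}

// statusNotAttempted is an assumed status value; the real constant is not
// shown in this documentation.
const statusNotAttempted = "not-attempted"

// markFutureNotAttempted sets every step after idx to not-attempted and
// returns how many steps it changed. Steps at or before idx are left alone.
func markFutureNotAttempted(steps []step, idx int) int {
	start := idx + 1
	if start < 0 {
		start = 0 // guard against a negative idx
	}
	changed := 0
	for i := start; i < len(steps); i++ {
		steps[i].Status = statusNotAttempted
		changed++
	}
	return changed
}

func main() {
	steps := []step{
		{ID: "plan", Status: "success"},
		{ID: "approve", Status: "error"},
		{ID: "apply", Status: "pending"},
		{ID: "cleanup", Status: "pending"},
	}
	changed := markFutureNotAttempted(steps, 1)
	fmt.Println("steps changed:", changed)
	for _, s := range steps {
		fmt.Println(s.ID, s.Status)
	}
}
```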
func CheckStepCancellation ¶ added in v0.19.894
CheckStepCancellation marks a step as cancelled if the workflow context is cancelled.
func DispatchGroupSignal ¶ added in v0.19.894
func DispatchGroupSignal(ctx workflow.Context, cfg StepConfig, group *app.WorkflowStepGroup, flw *app.Workflow) (string, error)
DispatchGroupSignal enqueues an execute-workflow-step-group signal to the step queue and awaits its completion via the group-finished update handler. It returns the group's directive directly, so the caller does not need to re-fetch it from the database.
func DispatchGroupSignalByIdx ¶ added in v0.19.894
func DispatchGroupSignalByIdx(ctx workflow.Context, cfg StepConfig, groupIdx int, flw *app.Workflow, parallel bool) (string, error)
DispatchGroupSignalByIdx is a backward-compatible helper that dispatches a group signal using a GroupIdx and parallel flag directly, without a WorkflowStepGroup object.
func DispatchStepSignal ¶ added in v0.19.894
func DispatchStepSignal(ctx workflow.Context, cfg StepConfig, step *app.WorkflowStep, flw *app.Workflow) error
DispatchStepSignal enqueues the execute-workflow-step signal to the step queue. The signal runs the full step lifecycle in its own handler workflow.
func ExecuteStepsViaChildWorkflow ¶ added in v0.19.894
func ExecuteStepsViaChildWorkflow(ctx workflow.Context, cfg StepConfig, flw *app.Workflow, startFromIdx int) error
ExecuteStepsViaChildWorkflow runs the step iteration loop where each step is dispatched as an execute-workflow-step signal. This is the StepChildWorkflow mode used by the execute-flow signal.
func GenerateSteps ¶ added in v0.19.894
func GenerateSteps(ctx workflow.Context, cfg StepConfig, flw *app.Workflow, generators map[app.WorkflowType]WorkflowStepGenerator) (*app.Workflow, error)
GenerateSteps generates workflow steps and persists them.
If the workflow has a GenerateStepsSignal, steps are generated via the signal path (see step_generate_signal.go). Otherwise, it falls back to the generators map, for backward compatibility with conductor callers that have not migrated yet.
func HandleCancellation ¶ added in v0.19.894
func HandleCancellation(ctx workflow.Context, stepErr error, stepID string, idx int, flw *app.Workflow) error
HandleCancellation marks the current step and all future steps as cancelled.
func IsCancellationErr ¶ added in v0.19.894
IsCancellationErr returns true if the error indicates workflow cancellation.
func StepHumanDescription ¶ added in v0.19.894
StepHumanDescription returns a user-facing error message for a failed step. For non-retryable application errors (validation failures, user errors), it extracts the root cause so the user sees actionable details. For transient/internal errors, it returns a generic message to avoid leaking internals.
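The split described above (root cause for non-retryable application errors, a generic message for everything else) can be sketched with the standard errors package. This is an illustration under assumptions, not the package's code: `appError`, `rootCause`, `describe`, and the generic message text are all hypothetical, and the real function presumably inspects the workflow SDK's own application error type rather than a local one.

```go
package main

import (
	"errors"
	"fmt"
)

// appError is a hypothetical stand-in for a non-retryable application error.
type appError struct {
	msg          string
	nonRetryable bool
	cause        error
}

func (e *appError) Error() string { return e.msg }
func (e *appError) Unwrap() error { return e.cause }

// genericMessage is an assumed placeholder text for internal failures.
const genericMessage = "step failed due to an internal error"

// rootCause follows Unwrap until it reaches the innermost error.
func rootCause(err error) error {
	for {
		next := errors.Unwrap(err)
		if next == nil {
			return err
		}
		err = next
	}
}

// describe returns the root cause for non-retryable application errors and a
// generic message for anything else, so internal details are not exposed.
func describe(err error) string {
	var ae *appError
	if errors.As(err, &ae) && ae.nonRetryable {
		return rootCause(ae).Error()
	}
	return genericMessage
}

func main() {
	userErr := &appError{
		msg:          "validation failed",
		nonRetryable: true,
		cause:        errors.New(`field "region" is required`),
	}
	fmt.Println(describe(fmt.Errorf("step 3: %w", userErr)))
	fmt.Println(describe(errors.New("dial tcp: connection refused")))
}
```

Using errors.As rather than a type assertion lets the check see through wrapping layers added as the error travels up the call stack.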
Types ¶
type ApprovalPauseErr ¶ added in v0.19.894
type ApprovalPauseErr struct {
	StepID string
}
ApprovalPauseErr indicates that execution stopped because a step is awaiting approval.
func NewApprovalPauseErr ¶ added in v0.19.894
func NewApprovalPauseErr(stepID string) *ApprovalPauseErr
func (*ApprovalPauseErr) Error ¶ added in v0.19.894
func (e *ApprovalPauseErr) Error() string
type ContinueAsNewErr ¶
type ContinueAsNewErr struct {
	// StartFromStepIdx is the index to resume from. In the legacy conductor path
	// this is a step index; in the execute-flow signal path this is a group position.
	StartFromStepIdx int
}
func NewContinueAsNewErr ¶
func NewContinueAsNewErr(startsFromStepIdx int) *ContinueAsNewErr
func (*ContinueAsNewErr) Error ¶
func (e *ContinueAsNewErr) Error() string
type FlowStoppedErr ¶ added in v0.19.894
type FlowStoppedErr struct {
	StepID string
	Reason string // "denied", "skipped-dependents"
	RetriesExhausted bool
}
FlowStoppedErr is returned when a workflow is stopped due to a denial or skip-dependents response. Unlike ErrNotApproved, this carries context about why the flow stopped and signals to the execute-flow signal that it should not enter the retry-wait loop.
func NewFlowStoppedErr ¶ added in v0.19.894
func NewFlowStoppedErr(stepID, reason string) *FlowStoppedErr
func (*FlowStoppedErr) Error ¶ added in v0.19.894
func (e *FlowStoppedErr) Error() string
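To illustrate how a caller might tell FlowStoppedErr apart from other failures, the sketch below uses errors.As on a local copy of the struct. The `shouldEnterRetryWait` helper and the Error() text are hypothetical, and treating every other non-nil error as eligible for the retry-wait loop is an assumption made only to keep the example small.

```go
package main

import (
	"errors"
	"fmt"
)

// FlowStoppedErr mirrors the documented struct fields. The Error() text is an
// assumption; the real format is not shown in this documentation.
type FlowStoppedErr struct {
	StepID           string
	Reason           string // "denied", "skipped-dependents"
	RetriesExhausted bool
}

func (e *FlowStoppedErr) Error() string {
	return fmt.Sprintf("flow stopped at step %s: %s", e.StepID, e.Reason)
}

// ErrNotApproved is a local copy of the documented sentinel error.
var ErrNotApproved = errors.New("not approved")

// shouldEnterRetryWait is a hypothetical helper: a FlowStoppedErr anywhere in
// the error chain means the caller should not wait for a retry.
func shouldEnterRetryWait(err error) bool {
	if err == nil {
		return false
	}
	var stopped *FlowStoppedErr
	if errors.As(err, &stopped) {
		return false
	}
	return true
}

func main() {
	denied := fmt.Errorf("group 2: %w", &FlowStoppedErr{StepID: "step-7", Reason: "denied"})
	fmt.Println(shouldEnterRetryWait(denied))
	fmt.Println(shouldEnterRetryWait(ErrNotApproved))
}
```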
type RerunInput ¶
type RerunInput struct {
	FlowID string `json:"flow_id" validate:"required"`
	StepID string `json:"step_id" validate:"required"`
	Operation RerunOperation `json:"operation" validate:"required"`
	StalePlan bool `json:"stale_plan"`
	RePlanStepID string `json:"replan_step_id"`
	ContinueFromIdx int `json:"continue_from_idx"`
	// AdditionalSkipStepIDs are extra steps to mark as skipped alongside the primary StepID.
	// Used when skipping a plan step to also skip the corresponding apply step.
	AdditionalSkipStepIDs []string `json:"additional_skip_step_ids,omitempty"`
}
type RerunOperation ¶
type RerunOperation string
const (
	RerunOperationSkipStep RerunOperation = "skip-step"
	RerunOperationRetryStep RerunOperation = "retry-step"
)
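As an illustration of the wire format implied by the struct tags, the sketch below marshals a RerunInput that skips a plan step together with its apply step. The types are local copies of the documented declarations; the `encode` helper and the ID values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types so the sketch compiles on its own.
type RerunOperation string

const (
	RerunOperationSkipStep  RerunOperation = "skip-step"
	RerunOperationRetryStep RerunOperation = "retry-step"
)

type RerunInput struct {
	FlowID                string         `json:"flow_id" validate:"required"`
	StepID                string         `json:"step_id" validate:"required"`
	Operation             RerunOperation `json:"operation" validate:"required"`
	StalePlan             bool           `json:"stale_plan"`
	RePlanStepID          string         `json:"replan_step_id"`
	ContinueFromIdx       int            `json:"continue_from_idx"`
	AdditionalSkipStepIDs []string       `json:"additional_skip_step_ids,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of an input.
func encode(inp RerunInput) (string, error) {
	b, err := json.Marshal(inp)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// The IDs below are made up for the example.
	out, err := encode(RerunInput{
		FlowID:                "flow-123",
		StepID:                "step-plan",
		Operation:             RerunOperationSkipStep,
		AdditionalSkipStepIDs: []string{"step-apply"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Only AdditionalSkipStepIDs carries `omitempty`, so the other fields appear in the output even when they hold zero values.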
type StepConfig ¶ added in v0.19.894
type StepConfig struct {
	GroupQueueName string
	QueueName string
	TargetQueueName string
	OwnerID string
	OwnerType string
}
StepConfig holds the queue/owner configuration needed to dispatch and manage workflow steps. Used by both the execute-flow signal (directly) and the WorkflowConductor (for backward compatibility).
type WorkflowConductor ¶
type WorkflowConductor[DomainSignal eventloop.Signal] struct {
	Cfg *internal.Config
	MW tmetrics.Writer
	V *validator.Validate
	EVClient teventloop.Client
	Generators map[app.WorkflowType]WorkflowStepGenerator
	// ExecFnLegacy is called to actually execute the signal handler for a step.
	//
	// TODO(sdboyer) THIS IS A TEMPORARY HACK. Dispatching should be done within
	// the conductor itself. However, we absolutely can't do it until we allow
	// certain concurrent behaviors in event loops, as it would deadlock when we
	// signal the same event loop that's running this workflow. It'll also be a
	// bit of awkward coupling to do it without totally predictable event loop
	// workflow IDs, but that's not a hard blocker.
	ExecFnLegacy func(workflow.Context, eventloop.EventLoopRequest, DomainSignal, app.WorkflowStep) error
	// ExecFn is called to execute a queue-signal-based step. Unlike ExecFnLegacy, it does not
	// require a generic DomainSignal or an EventLoopRequest; it operates directly on the
	// QueueSignal stored on the workflow step.
	ExecFn func(workflow.Context, *signaldb.SignalData, app.WorkflowStep) error
	// StepChildWorkflow controls whether QueueSignal-based steps are executed via the
	// execute-workflow-step signal. When true, each step is dispatched through its own
	// signal execution. Only applies to steps where QueueSignal != nil.
	StepChildWorkflow bool
	// StepQueueName is the queue where the execute-workflow-step signal itself runs
	// (e.g. "install-workflow-steps"). When StepChildWorkflow is true, each step's
	// full lifecycle is dispatched to this queue.
	StepQueueName string
	// StepTargetQueueName is the queue where the inner signal (the actual step signal)
	// gets enqueued for execution (e.g. "install-signals").
	StepTargetQueueName string
	StepOwnerID string
	StepOwnerType string
}
func (*WorkflowConductor[SignalType]) Handle ¶
func (c *WorkflowConductor[SignalType]) Handle(ctx workflow.Context, req eventloop.EventLoopRequest, flowId string, startFromStepIdx int) error
func (*WorkflowConductor[SignalType]) Rerun ¶
func (c *WorkflowConductor[SignalType]) Rerun(ctx workflow.Context, req eventloop.EventLoopRequest, inp RerunInput) error
Rerun is a workflow that reruns a flow from a specific step. It marks the existing step as discarded and creates a new step with the same parameters. It then executes the flow steps from the newly created step.