flow

package
v0.19.855 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 25, 2026 License: AGPL-3.0 Imports: 28 Imported by: 0

Documentation


Constants

const (
	// DenyViolationsKey is used in metadata to store deny-level violations
	DenyViolationsKey = "deny_violations"
	// WarnViolationsKey is used in metadata to store warning-level violations
	WarnViolationsKey = "warn_violations"
)

Policy violation metadata keys
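
The keys can be used to check whether a metadata value carries blocking violations. The following is a minimal sketch, not the package's own code: the metadata shape (`map[string][]string`), the helper `hasBlockingViolations`, and the sample violation messages are all assumptions made for illustration. Only the two key constants come from the package.

```go
package main

import "fmt"

// Local copies of the package constants so the sketch compiles on its own.
const (
	DenyViolationsKey = "deny_violations"
	WarnViolationsKey = "warn_violations"
)

// hasBlockingViolations is a hypothetical helper. It assumes metadata is a
// map from key to a list of violation messages, which the package does not
// document.
func hasBlockingViolations(md map[string][]string) bool {
	return len(md[DenyViolationsKey]) > 0
}

func main() {
	md := map[string][]string{
		WarnViolationsKey: {"image tag is not pinned"},
	}
	fmt.Println(hasBlockingViolations(md))

	md[DenyViolationsKey] = append(md[DenyViolationsKey], "public bucket not allowed")
	fmt.Println(hasBlockingViolations(md))
}
```

Warning-level entries alone do not trip the check in this sketch; only entries under `DenyViolationsKey` do.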

Variables

var ErrNotApproved error = fmt.Errorf("not approved")
var FlowCancellationErr = fmt.Errorf("workflow cancelled")

Functions

This section is empty.

Types

type ContinueAsNewErr

type ContinueAsNewErr struct {
	StartFromStepIdx int
}

func NewContinueAsNewErr

func NewContinueAsNewErr(startsFromStepIdx int) *ContinueAsNewErr

func (*ContinueAsNewErr) Error

func (e *ContinueAsNewErr) Error() string

type ExecuteStepRequest added in v0.19.850

type ExecuteStepRequest struct {
	StepID      string               `json:"step_id"`
	QueueSignal *signaldb.SignalData `json:"queue_signal"`
	Step        app.WorkflowStep     `json:"step"`
}

ExecuteStepRequest is the input for the ExecuteStep child workflow.

type ExecuteStepWorkflows added in v0.19.850

type ExecuteStepWorkflows struct {
	ExecFn func(workflow.Context, *signaldb.SignalData, app.WorkflowStep) error
}

ExecuteStepWorkflows provides a registrable workflow struct for the ExecuteStep child workflow. Callers must set ExecFn before registering the workflow on the worker.

func (*ExecuteStepWorkflows) ExecuteStep added in v0.19.850

type RerunInput

type RerunInput struct {
	FlowID          string         `json:"flow_id" validate:"required"`
	StepID          string         `json:"step_id" validate:"required"`
	Operation       RerunOperation `json:"operation" validate:"required"`
	StalePlan       bool           `json:"stale_plan"`
	RePlanStepID    string         `json:"replan_step_id"`
	ContinueFromIdx int            `json:"continue_from_idx"`
}

type RerunOperation

type RerunOperation string
const (
	RerunOperationSkipStep  RerunOperation = "skip-step"
	RerunOperationRetryStep RerunOperation = "retry-step"
)

type WorkflowConductor

type WorkflowConductor[DomainSignal eventloop.Signal] struct {
	Cfg        *internal.Config
	MW         tmetrics.Writer
	V          *validator.Validate
	EVClient   teventloop.Client
	Generators map[app.WorkflowType]WorkflowStepGenerator

	// ExecFnLegacy is called to actually execute the signal handler for a step.
	//
	// TODO(sdboyer) THIS IS A TEMPORARY HACK. Dispatching should be done within
	// the conductor itself.  However, we absolutely can't do it until we allow
	// certain concurrent behaviors in event loops, as it would deadlock when we
	// signal the same event loop that's running this workflow. It'll also be a
	// bit of awkward coupling to do it without totally predictable event loop
	// workflow IDs, but that's not a hard blocker.
	ExecFnLegacy func(workflow.Context, eventloop.EventLoopRequest, DomainSignal, app.WorkflowStep) error

	// ExecFn is called to execute a queue-signal-based step. Unlike ExecFnLegacy, it does not
	// require a generic DomainSignal or an EventLoopRequest — it operates directly on the
	// QueueSignal stored on the workflow step.
	ExecFn func(workflow.Context, *signaldb.SignalData, app.WorkflowStep) error

	// StepChildWorkflow controls whether QueueSignal-based steps are executed as child workflows.
	// When true, each step is run in its own child workflow (ExecuteStep). Only applies to
	// steps where QueueSignal != nil.
	StepChildWorkflow bool
}

func (*WorkflowConductor[SignalType]) Handle

func (c *WorkflowConductor[SignalType]) Handle(ctx workflow.Context, req eventloop.EventLoopRequest, flowId string, startFromStepIdx int) error

func (*WorkflowConductor[SignalType]) Rerun

func (c *WorkflowConductor[SignalType]) Rerun(ctx workflow.Context, req eventloop.EventLoopRequest, inp RerunInput) error

Rerun is a workflow that reruns a flow from a specific step. It marks the existing step as discarded and creates a new step with the same parameters. It then executes the flow steps from the newly created step.

type WorkflowStepGenerator

type WorkflowStepGenerator func(ctx workflow.Context, uf *app.Workflow) ([]*app.WorkflowStep, error)
