flowlocalrunner

package
v0.0.0-...-6ce7508
Published: May 5, 2026 | License: Apache-2.0 | Imports: 14 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildPredecessorMap

func BuildPredecessorMap(edgesMap mflow.EdgesMap) map[idwrap.IDWrap][]idwrap.IDWrap

BuildPredecessorMap forwards to runner.BuildPredecessorMap. It is kept for backward compatibility with the node packages that still call it (nfor, nforeach, nwsconnection, nai).
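To illustrate what a predecessor map is, here is a minimal, self-contained sketch. It does not use the real mflow or idwrap types: the EdgesMap shape below (source node, then edge handle, then target nodes) and the string IDs are assumptions made for the example, and buildPredecessorMap is a hypothetical stand-in, not the runner's implementation.

```go
package main

import "fmt"

// EdgesMap is a simplified, assumed stand-in for mflow.EdgesMap:
// source node ID -> edge handle -> target node IDs.
type EdgesMap map[string]map[string][]string

// buildPredecessorMap inverts the edges so that every target node
// lists the source nodes that point at it.
func buildPredecessorMap(edges EdgesMap) map[string][]string {
	preds := make(map[string][]string)
	for source, handles := range edges {
		for _, targets := range handles {
			for _, target := range targets {
				preds[target] = append(preds[target], source)
			}
		}
	}
	return preds
}

func main() {
	edges := EdgesMap{
		"start": {"then": {"a"}},
		"a":     {"then": {"b"}},
	}
	preds := buildPredecessorMap(edges)
	fmt.Println(preds["a"], preds["b"]) // prints: [start] [a]
}
```

A node with several incoming edges would list its sources in map-iteration order, so callers that need a stable order would have to sort the slices themselves.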

func MaxParallelism

func MaxParallelism() int

func RunNodeASync

func RunNodeASync(ctx context.Context, startNodeID idwrap.IDWrap, req *node.FlowNodeRequest,
	statusLogFunc node.LogPushFunc, predecessorMap map[idwrap.IDWrap][]idwrap.IDWrap,
) error

RunNodeASync retains the legacy behaviour for packages that directly invoke the runner with timeouts.

func RunNodeSync

func RunNodeSync(ctx context.Context, startNodeID idwrap.IDWrap, req *node.FlowNodeRequest,
	statusLogFunc node.LogPushFunc, predecessorMap map[idwrap.IDWrap][]idwrap.IDWrap,
) error

RunNodeSync retains the legacy behaviour for packages that directly invoke the runner.

func SetGoroutineCountForTesting

func SetGoroutineCountForTesting(n int) func()

SetGoroutineCountForTesting overrides the goroutine count for testing. It returns a cleanup function that restores the original value.
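The override-and-restore pattern described here can be sketched as follows. The package-level variable goroutineCount and its default of 4 are hypothetical; the sketch only shows the shape of a setter that hands back its own cleanup function, not how this package stores the value.

```go
package main

import "fmt"

// goroutineCount is a hypothetical package-level setting (assumed default: 4).
var goroutineCount = 4

// setGoroutineCountForTesting overrides the setting and returns a function
// that restores the value that was in place before the override.
func setGoroutineCountForTesting(n int) func() {
	prev := goroutineCount
	goroutineCount = n
	return func() { goroutineCount = prev }
}

func main() {
	restore := setGoroutineCountForTesting(1)
	fmt.Println(goroutineCount) // prints: 1
	restore()
	fmt.Println(goroutineCount) // prints: 4
}
```

Because the real function has the signature func(n int) func(), its result can be passed straight to t.Cleanup in a test, for example t.Cleanup(flowlocalrunner.SetGoroutineCountForTesting(1)). The sketch is not safe for concurrent use.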

Types

type ExecutionMode

type ExecutionMode int

ExecutionMode controls how FlowLocalRunner schedules nodes.

const (
	ExecutionModeAuto ExecutionMode = iota
	ExecutionModeSingle
	ExecutionModeMulti
)
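Since the constants are declared with iota, ExecutionModeAuto is the zero value of ExecutionMode. The sketch below redeclares the type locally so it runs on its own; modeName is a hypothetical helper for printing and is not part of this package.

```go
package main

import "fmt"

// ExecutionMode mirrors the declaration above so the example is self-contained.
type ExecutionMode int

const (
	ExecutionModeAuto ExecutionMode = iota
	ExecutionModeSingle
	ExecutionModeMulti
)

// modeName is a hypothetical helper that gives each mode a readable label.
func modeName(m ExecutionMode) string {
	switch m {
	case ExecutionModeAuto:
		return "auto"
	case ExecutionModeSingle:
		return "single"
	case ExecutionModeMulti:
		return "multi"
	default:
		return "unknown"
	}
}

func main() {
	var zero ExecutionMode // an unset mode is ExecutionModeAuto
	fmt.Println(int(zero), modeName(zero))
	fmt.Println(int(ExecutionModeSingle), modeName(ExecutionModeSingle))
	fmt.Println(int(ExecutionModeMulti), modeName(ExecutionModeMulti))
}
```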

type ExecutionOutcome

type ExecutionOutcome struct {
	Result        node.FlowNodeResult
	TrackedInput  map[string]any
	TrackedOutput map[string]any
}

ExecutionOutcome is the raw result from executing a single node, including optional tracked variable data.

type FlowLocalRunner

type FlowLocalRunner struct {
	ID          idwrap.IDWrap
	FlowID      idwrap.IDWrap
	FlowNodeMap map[idwrap.IDWrap]node.FlowNode
	Timeout     time.Duration
	// contains filtered or unexported fields
}

func CreateFlowRunner

func CreateFlowRunner(id, flowID idwrap.IDWrap, startNodeIDs []idwrap.IDWrap, flowNodeMap map[idwrap.IDWrap]node.FlowNode, edgesMap mflow.EdgesMap, timeout time.Duration, logger *slog.Logger) *FlowLocalRunner

func (*FlowLocalRunner) Run

func (r *FlowLocalRunner) Run(ctx context.Context, flowNodeStatusChan chan runner.FlowNodeStatus, flowStatusChan chan runner.FlowStatus, baseVars map[string]any) error

func (*FlowLocalRunner) RunWithEvents

func (r *FlowLocalRunner) RunWithEvents(ctx context.Context, channels runner.FlowEventChannels, baseVars map[string]any) error

func (*FlowLocalRunner) SelectedMode

func (r *FlowLocalRunner) SelectedMode() ExecutionMode

SelectedMode reports the effective mode used during the last Run invocation.

func (*FlowLocalRunner) SetDataTrackingEnabled

func (r *FlowLocalRunner) SetDataTrackingEnabled(enabled bool)

SetDataTrackingEnabled toggles variable tracking during execution.

func (*FlowLocalRunner) SetExecutionMode

func (r *FlowLocalRunner) SetExecutionMode(mode ExecutionMode)

SetExecutionMode overrides the default Auto mode for the next run.

type LocalExecutor

type LocalExecutor struct {
	// contains filtered or unexported fields
}

LocalExecutor runs nodes in the current process with optional variable tracking. It owns the tracker pool, replacing the previous global variable.

For a remote runner, a RemoteExecutor would serialize the request and dispatch to a worker instead of calling RunSync directly.

func NewLocalExecutor

func NewLocalExecutor(trackData bool) *LocalExecutor

NewLocalExecutor creates an executor with the given data tracking setting.

func (*LocalExecutor) Execute

Execute runs a node with optional variable tracking, returning the result and any tracked input/output data.

type RunConfig

type RunConfig struct {
	Timeout        time.Duration
	TrackData      bool
	MaxConcurrency int
	Emitter        *runner.StatusEmitter
	StatusLogFunc  node.LogPushFunc
	PredecessorMap map[idwrap.IDWrap][]idwrap.IDWrap
}

RunConfig bundles the parameters that both strategies need, reducing the parameter count of runNodes and the strategy functions.
