Documentation
¶
Index ¶
- func BuildPredecessorMap(edgesMap mflow.EdgesMap) map[idwrap.IDWrap][]idwrap.IDWrap
- func MaxParallelism() int
- func RunNodeASync(ctx context.Context, startNodeID idwrap.IDWrap, req *node.FlowNodeRequest, ...) error
- func RunNodeSync(ctx context.Context, startNodeID idwrap.IDWrap, req *node.FlowNodeRequest, ...) error
- func SetGoroutineCountForTesting(n int) func()
- type ExecutionMode
- type ExecutionOutcome
- type FlowLocalRunner
- func (r *FlowLocalRunner) Run(ctx context.Context, flowNodeStatusChan chan runner.FlowNodeStatus, ...) error
- func (r *FlowLocalRunner) RunWithEvents(ctx context.Context, channels runner.FlowEventChannels, ...) error
- func (r *FlowLocalRunner) SelectedMode() ExecutionMode
- func (r *FlowLocalRunner) SetDataTrackingEnabled(enabled bool)
- func (r *FlowLocalRunner) SetExecutionMode(mode ExecutionMode)
- type LocalExecutor
- type RunConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildPredecessorMap ¶
BuildPredecessorMap forwards to runner.BuildPredecessorMap. Kept for backward compatibility with node packages (nfor, nforeach, nwsconnection, nai).
func MaxParallelism ¶
func MaxParallelism() int
func RunNodeASync ¶
func RunNodeASync(ctx context.Context, startNodeID idwrap.IDWrap, req *node.FlowNodeRequest, statusLogFunc node.LogPushFunc, predecessorMap map[idwrap.IDWrap][]idwrap.IDWrap, ) error
RunNodeASync retains the legacy behaviour for packages that directly invoke the runner with timeouts.
func RunNodeSync ¶
func RunNodeSync(ctx context.Context, startNodeID idwrap.IDWrap, req *node.FlowNodeRequest, statusLogFunc node.LogPushFunc, predecessorMap map[idwrap.IDWrap][]idwrap.IDWrap, ) error
RunNodeSync retains the legacy behaviour for packages that directly invoke the runner.
func SetGoroutineCountForTesting ¶
func SetGoroutineCountForTesting(n int) func()
SetGoroutineCountForTesting overrides the goroutine count for testing. Returns a cleanup function that restores the original value.
Types ¶
type ExecutionMode ¶
type ExecutionMode int
ExecutionMode controls how FlowLocalRunner schedules nodes.
const ( ExecutionModeAuto ExecutionMode = iota ExecutionModeSingle ExecutionModeMulti )
type ExecutionOutcome ¶
type ExecutionOutcome struct {
Result node.FlowNodeResult
TrackedInput map[string]any
TrackedOutput map[string]any
}
ExecutionOutcome is the raw result from executing a single node, including optional tracked variable data.
type FlowLocalRunner ¶
type FlowLocalRunner struct {
ID idwrap.IDWrap
FlowID idwrap.IDWrap
FlowNodeMap map[idwrap.IDWrap]node.FlowNode
Timeout time.Duration
// contains filtered or unexported fields
}
func CreateFlowRunner ¶
func (*FlowLocalRunner) Run ¶
func (r *FlowLocalRunner) Run(ctx context.Context, flowNodeStatusChan chan runner.FlowNodeStatus, flowStatusChan chan runner.FlowStatus, baseVars map[string]any) error
func (*FlowLocalRunner) RunWithEvents ¶
func (r *FlowLocalRunner) RunWithEvents(ctx context.Context, channels runner.FlowEventChannels, baseVars map[string]any) error
func (*FlowLocalRunner) SelectedMode ¶
func (r *FlowLocalRunner) SelectedMode() ExecutionMode
SelectedMode reports the effective mode used during the last Run invocation.
func (*FlowLocalRunner) SetDataTrackingEnabled ¶
func (r *FlowLocalRunner) SetDataTrackingEnabled(enabled bool)
SetDataTrackingEnabled toggles variable tracking during execution.
func (*FlowLocalRunner) SetExecutionMode ¶
func (r *FlowLocalRunner) SetExecutionMode(mode ExecutionMode)
SetExecutionMode overrides the default Auto mode for the next run.
type LocalExecutor ¶
type LocalExecutor struct {
// contains filtered or unexported fields
}
LocalExecutor runs nodes in the current process with optional variable tracking. It owns the tracker pool, replacing the previous global variable.
For a remote runner, a RemoteExecutor would serialize the request and dispatch to a worker instead of calling RunSync directly.
func NewLocalExecutor ¶
func NewLocalExecutor(trackData bool) *LocalExecutor
NewLocalExecutor creates an executor with the given data tracking setting.
func (*LocalExecutor) Execute ¶
func (e *LocalExecutor) Execute(ctx context.Context, n node.FlowNode, req *node.FlowNodeRequest) ExecutionOutcome
Execute runs a node with optional variable tracking, returning the result and any tracked input/output data.
type RunConfig ¶
type RunConfig struct {
Timeout time.Duration
TrackData bool
MaxConcurrency int
Emitter *runner.StatusEmitter
StatusLogFunc node.LogPushFunc
PredecessorMap map[idwrap.IDWrap][]idwrap.IDWrap
}
RunConfig bundles the parameters that both strategies need, reducing the parameter count of runNodes and the strategy functions.