runner

package
v0.0.0-...-6ce7508 Latest
Warning

This package is not in the latest version of its module.

Published: May 5, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrFlowRunnerNotImplemented = errors.New("flowrunner not implemented")
	ErrNodeNotFound             = errors.New("next node not found")
)
var ErrFlowCanceledByThrow = errors.New("flow canceled by throw")

ErrFlowCanceledByThrow marks an intentional cancellation triggered by a node (e.g., via a user throw). When a loop node propagates this error, the runner should mark the loop as CANCELED, not FAILURE.

Functions

func BuildPredecessorMap

func BuildPredecessorMap(edges mflow.EdgesMap) map[idwrap.IDWrap][]idwrap.IDWrap

BuildPredecessorMap computes which nodes precede each node in the graph.
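A minimal sketch of the inversion this describes. NodeID and EdgesMap here are hypothetical stand-ins for idwrap.IDWrap and mflow.EdgesMap (assumed to map a source node to its targets, grouped by handle); the real types may be shaped differently.

```go
package main

import "fmt"

// NodeID and EdgesMap are hypothetical stand-ins for idwrap.IDWrap and
// mflow.EdgesMap; they are assumptions made for this sketch only.
type NodeID string

// EdgesMap maps a source node to its outgoing targets, grouped by handle name.
type EdgesMap map[NodeID]map[string][]NodeID

// buildPredecessorMap inverts the edge map: for every edge source -> target,
// source is recorded as a predecessor of target.
func buildPredecessorMap(edges EdgesMap) map[NodeID][]NodeID {
	preds := make(map[NodeID][]NodeID)
	for source, handles := range edges {
		for _, targets := range handles {
			for _, target := range targets {
				preds[target] = append(preds[target], source)
			}
		}
	}
	return preds
}

func main() {
	edges := EdgesMap{
		"start": {"then": {"a", "b"}},
		"a":     {"then": {"join"}},
		"b":     {"then": {"join"}},
	}
	preds := buildPredecessorMap(edges)
	fmt.Println(len(preds["join"])) // two predecessors: a and b
}
```

Because Go map iteration order is random, the order of predecessors inside each slice is not stable in this sketch.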

func FlowStatusString

func FlowStatusString(f FlowStatus) string

func FlowStatusStringWithIcons

func FlowStatusStringWithIcons(f FlowStatus) string

func IsCancellationError

func IsCancellationError(err error) bool

IsCancellationError returns true if the error represents a cancellation (explicit throw or context cancellation).

func IsFlowStatusDone

func IsFlowStatusDone(f FlowStatus) bool

func NewChannelEmitFunc

func NewChannelEmitFunc(channels FlowEventChannels) func(FlowNodeStatus)

NewChannelEmitFunc creates an emit function that routes status events to FlowEventChannels. RUNNING events go to NodeStates only; terminal events go to both NodeStates and NodeLogs. Safe to call after channels are closed (recovers from send-on-closed-channel).
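The routing and the closed-channel recovery described above could look roughly like the following. The status and channels types are trimmed, hypothetical stand-ins (the real NodeLogs channel carries FlowNodeLogPayload), and safeSend is an illustrative helper, not part of the package.

```go
package main

import "fmt"

// status and channels are hypothetical, trimmed stand-ins for the package types.
type status struct {
	Name     string
	Terminal bool // false means RUNNING in this sketch
}

type channels struct {
	NodeStates chan status
	NodeLogs   chan status // the real channel carries FlowNodeLogPayload
}

// safeSend sends v on ch and reports false instead of panicking if ch is closed.
func safeSend(ch chan status, v status) (ok bool) {
	defer func() {
		if recover() != nil {
			ok = false
		}
	}()
	ch <- v
	return true
}

// newChannelEmitFunc routes every event to NodeStates and terminal events
// additionally to NodeLogs (when a log channel is present).
func newChannelEmitFunc(c channels) func(status) {
	return func(s status) {
		safeSend(c.NodeStates, s)
		if s.Terminal && c.NodeLogs != nil {
			safeSend(c.NodeLogs, s)
		}
	}
}

func main() {
	c := channels{NodeStates: make(chan status, 4), NodeLogs: make(chan status, 4)}
	emit := newChannelEmitFunc(c)
	emit(status{Name: "request", Terminal: false})
	emit(status{Name: "request", Terminal: true})
	fmt.Println(len(c.NodeStates), len(c.NodeLogs)) // 2 1

	close(c.NodeStates)
	close(c.NodeLogs)
	emit(status{Name: "late", Terminal: true}) // no panic: the send is recovered
}
```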

Types

type ConvergenceTracker

type ConvergenceTracker struct {
	// contains filtered or unexported fields
}

ConvergenceTracker tracks how many predecessors have completed for convergence (join) nodes. It is the mutable counterpart to FlowGraph's immutable ConvergeCounts.

func NewConvergenceTrackerFromPending

func NewConvergenceTrackerFromPending(pending map[idwrap.IDWrap]uint32) *ConvergenceTracker

NewConvergenceTrackerFromPending creates a tracker from a pre-built pending map (e.g. from FlowNodeRequest.PendingAtmoicMap). It copies the map to avoid aliasing the original.

func (*ConvergenceTracker) Arrive

func (ct *ConvergenceTracker) Arrive(nodeID idwrap.IDWrap) bool

Arrive records that one predecessor of nodeID has completed. Returns true when all predecessors have arrived (the node is ready).

func (*ConvergenceTracker) Clone

Clone creates an independent copy for loop iteration isolation.
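The copy-on-construct, countdown, and clone behavior described for ConvergenceTracker can be sketched as follows. This is a simplified, single-goroutine stand-in with hypothetical names; the real type presumably protects its counters for concurrent use, and treating unknown nodes as immediately ready is an assumption of this sketch.

```go
package main

import "fmt"

// NodeID is a hypothetical stand-in for idwrap.IDWrap.
type NodeID string

// tracker is a simplified stand-in for ConvergenceTracker (not goroutine-safe).
type tracker struct {
	pending map[NodeID]uint32
}

// newTrackerFromPending copies the map so the caller's map is never mutated.
func newTrackerFromPending(pending map[NodeID]uint32) *tracker {
	cp := make(map[NodeID]uint32, len(pending))
	for id, n := range pending {
		cp[id] = n
	}
	return &tracker{pending: cp}
}

// arrive records one completed predecessor and reports whether nodeID is ready.
// Nodes absent from the map are treated as ready at once (an assumption).
func (t *tracker) arrive(nodeID NodeID) bool {
	n, ok := t.pending[nodeID]
	if !ok || n <= 1 {
		delete(t.pending, nodeID)
		return true
	}
	t.pending[nodeID] = n - 1
	return false
}

// clone returns an independent copy, e.g. one per loop iteration.
func (t *tracker) clone() *tracker { return newTrackerFromPending(t.pending) }

func main() {
	base := newTrackerFromPending(map[NodeID]uint32{"join": 2})
	iter := base.clone()
	fmt.Println(iter.arrive("join")) // false: one of two predecessors done
	fmt.Println(iter.arrive("join")) // true: both predecessors done
	fmt.Println(base.arrive("join")) // false: the clone did not touch base
}
```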

type FlowEventChannels

type FlowEventChannels struct {
	NodeStates chan FlowNodeStatus
	NodeLogs   chan FlowNodeLogPayload
	FlowStatus chan FlowStatus
}

func LegacyFlowEventChannels

func LegacyFlowEventChannels(nodeStates chan FlowNodeStatus, flowStatus chan FlowStatus) FlowEventChannels

func (FlowEventChannels) HasLogChannel

func (c FlowEventChannels) HasLogChannel() bool

type FlowGraph

type FlowGraph struct {
	Edges          mflow.EdgesMap
	StartNodeIDs   []idwrap.IDWrap
	Predecessors   map[idwrap.IDWrap][]idwrap.IDWrap
	ConvergeCounts map[idwrap.IDWrap]uint32
}

FlowGraph is an immutable representation of a flow's DAG topology. Constructed once before execution begins, then shared across all components. It holds no node implementations (no node.FlowNode) to avoid import cycles.

func NewFlowGraph

func NewFlowGraph(edges mflow.EdgesMap, startNodeIDs []idwrap.IDWrap) *FlowGraph

NewFlowGraph constructs an immutable graph from edges and start nodes. It precomputes predecessor maps and convergence counts.
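As an illustration of the precomputation step, convergence counts could be derived from the predecessor map as shown below. This assumes a join node is any node with more than one predecessor, which the description does not confirm; NodeID and convergeCounts are hypothetical names.

```go
package main

import "fmt"

// NodeID is a hypothetical stand-in for idwrap.IDWrap.
type NodeID string

// convergeCounts keeps only join nodes, assumed here to be nodes with more
// than one predecessor, and records how many arrivals each one waits for.
func convergeCounts(preds map[NodeID][]NodeID) map[NodeID]uint32 {
	counts := make(map[NodeID]uint32)
	for node, ps := range preds {
		if len(ps) > 1 {
			counts[node] = uint32(len(ps))
		}
	}
	return counts
}

func main() {
	preds := map[NodeID][]NodeID{
		"a":    {"start"},
		"b":    {"start"},
		"join": {"a", "b"},
	}
	counts := convergeCounts(preds)
	fmt.Println(counts["join"], len(counts)) // 2 1
}
```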

func NewFlowGraphFromPredecessors

func NewFlowGraphFromPredecessors(
	edges mflow.EdgesMap,
	startNodeID idwrap.IDWrap,
	predecessors map[idwrap.IDWrap][]idwrap.IDWrap,
) *FlowGraph

NewFlowGraphFromPredecessors constructs a FlowGraph when the predecessor map is already computed (e.g., inside RunNodeSync where loop nodes pre-build it).

func (*FlowGraph) NewConvergenceTracker

func (g *FlowGraph) NewConvergenceTracker() *ConvergenceTracker

NewConvergenceTracker creates a fresh mutable tracker for one execution.

type FlowNodeEvent

type FlowNodeEvent struct {
	Status     FlowNodeStatus
	Targets    FlowNodeEventTarget
	LogPayload *FlowNodeLogPayload
}

func (FlowNodeEvent) ShouldSend

func (e FlowNodeEvent) ShouldSend(target FlowNodeEventTarget) bool

type FlowNodeEventTarget

type FlowNodeEventTarget uint8
const (
	FlowNodeEventTargetState FlowNodeEventTarget = 1 << iota
	FlowNodeEventTargetLog
)
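Because the targets are declared with 1 << iota, they are bit flags (1 and 2) that can be combined in a single Targets value. The sketch below shows how a ShouldSend-style check might work as a bitmask test; the lowercase names and the method body are assumptions, not the package's code.

```go
package main

import "fmt"

type target uint8

const (
	targetState target = 1 << iota // 1
	targetLog                      // 2
)

type event struct{ Targets target }

// shouldSend is an assumed implementation: a bitmask membership test.
func (e event) shouldSend(t target) bool { return e.Targets&t != 0 }

func main() {
	running := event{Targets: targetState}
	terminal := event{Targets: targetState | targetLog}
	fmt.Println(running.shouldSend(targetLog), terminal.shouldSend(targetLog)) // false true
}
```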

type FlowNodeLogPayload

type FlowNodeLogPayload struct {
	ExecutionID      idwrap.IDWrap
	NodeID           idwrap.IDWrap
	Name             string
	State            mflow.NodeState
	Error            error
	OutputData       any
	RunDuration      time.Duration
	IterationContext *IterationContext
	IterationEvent   bool
	IterationIndex   int
	LoopNodeID       idwrap.IDWrap
}

type FlowNodeStatus

type FlowNodeStatus struct {
	ExecutionID      idwrap.IDWrap
	NodeID           idwrap.IDWrap
	Name             string
	State            mflow.NodeState
	OutputData       any
	InputData        any // Data that was read by this node during execution
	RunDuration      time.Duration
	Error            error
	IterationContext *IterationContext `json:"iteration_context,omitempty"`
	IterationEvent   bool              `json:"iteration_event,omitempty"`
	IterationIndex   int               `json:"iteration_index,omitempty"`
	LoopNodeID       idwrap.IDWrap     `json:"loop_node_id,omitempty"`
	AuxiliaryID      *idwrap.IDWrap
}

type FlowRunner

type FlowRunner interface {
	RunWithEvents(context.Context, FlowEventChannels, map[string]any) error
}

type FlowStatus

type FlowStatus int8
const (
	FlowStatusStarting FlowStatus = iota
	FlowStatusRunning
	FlowStatusSuccess
	FlowStatusFailed
	FlowStatusTimeout
)
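The helpers FlowStatusString and IsFlowStatusDone are undocumented above, so the following is only a guess at their behavior: it assumes that Success, Failed, and Timeout are the terminal states and uses invented labels. The numeric values follow directly from the iota declaration.

```go
package main

import "fmt"

type flowStatus int8

const (
	statusStarting flowStatus = iota // 0
	statusRunning                    // 1
	statusSuccess                    // 2
	statusFailed                     // 3
	statusTimeout                    // 4
)

// isDone is an assumed implementation: every status after Running is terminal.
func isDone(s flowStatus) bool {
	switch s {
	case statusSuccess, statusFailed, statusTimeout:
		return true
	default:
		return false
	}
}

// statusString is an assumed rendering; the real labels may differ.
func statusString(s flowStatus) string {
	names := [...]string{"Starting", "Running", "Success", "Failed", "Timeout"}
	if s < 0 || int(s) >= len(names) {
		return "Unknown"
	}
	return names[s]
}

func main() {
	for s := statusStarting; s <= statusTimeout; s++ {
		fmt.Println(int(s), statusString(s), isDone(s))
	}
}
```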

type IterationContext

type IterationContext struct {
	IterationPath  []int            `json:"iteration_path"`         // [1, 2, 3] for nested loops
	ExecutionIndex int              `json:"execution_index"`        // Current execution within current loop
	ParentNodes    []idwrap.IDWrap  `json:"parent_nodes,omitempty"` // Parent loop node IDs for hierarchical naming
	Labels         []IterationLabel `json:"labels,omitempty"`
}

type IterationLabel

type IterationLabel struct {
	NodeID    idwrap.IDWrap `json:"node_id"`
	Name      string        `json:"name"`
	Iteration int           `json:"iteration"`
}

IterationLabel captures a single segment of a loop execution chain.
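To show what the struct tags above produce on the wire, here is a small sketch that marshals an iteration context with one label. The types copy the JSON tags from the declarations above but substitute string for idwrap.IDWrap, whose real JSON encoding may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in types: same JSON tags as above, with string replacing idwrap.IDWrap.
type iterationLabel struct {
	NodeID    string `json:"node_id"`
	Name      string `json:"name"`
	Iteration int    `json:"iteration"`
}

type iterationContext struct {
	IterationPath  []int            `json:"iteration_path"`
	ExecutionIndex int              `json:"execution_index"`
	ParentNodes    []string         `json:"parent_nodes,omitempty"`
	Labels         []iterationLabel `json:"labels,omitempty"`
}

// encode returns the JSON form of ctx, or an empty string on error.
func encode(ctx iterationContext) string {
	b, err := json.Marshal(ctx)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	ctx := iterationContext{
		IterationPath:  []int{2},
		ExecutionIndex: 2,
		Labels:         []iterationLabel{{NodeID: "loop", Name: "outer", Iteration: 2}},
	}
	// parent_nodes is omitted because the slice is nil and tagged omitempty.
	fmt.Println(encode(ctx))
}
```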

type NodeExecution

type NodeExecution struct {
	ExecutionID idwrap.IDWrap
	NodeID      idwrap.IDWrap
	Name        string
	StartTime   time.Time
	IterCtx     *IterationContext
}

NodeExecution identifies a single node execution for status tracking.

type StatusEmitter

type StatusEmitter struct {
	// contains filtered or unexported fields
}

StatusEmitter tracks running nodes and emits status events. It delegates the actual event delivery to an emit function, making it usable with both channel-based delivery (RunWithEvents) and callback-based delivery (RunNodeSync from loop nodes).

For a remote runner, the same StatusEmitter works; only the emitFn changes.

func NewStatusEmitter

func NewStatusEmitter(emitFn func(FlowNodeStatus)) *StatusEmitter

NewStatusEmitter creates an emitter that delegates to the given function.

func (*StatusEmitter) CancelAllRunning

func (se *StatusEmitter) CancelAllRunning(cancelErr error)

CancelAllRunning emits CANCELED for every node currently tracked as RUNNING. Called during context cancellation cleanup.

func (*StatusEmitter) Deregister

func (se *StatusEmitter) Deregister(executionID idwrap.IDWrap)

Deregister removes a node from running tracking without emitting a status. Used when the caller handles status emission itself (e.g., with custom state logic for errors, timeouts, or loop coordinators).

func (*StatusEmitter) Emit

func (se *StatusEmitter) Emit(status FlowNodeStatus)

Emit sends a status event. This is used as the LogPushFunc callback for nodes.

func (*StatusEmitter) EmitRunning

func (se *StatusEmitter) EmitRunning(exec NodeExecution)

EmitRunning atomically registers a node as running and emits RUNNING status. This eliminates the race where cancellation could miss a node between status emission and registration.

func (*StatusEmitter) EmitTerminal

func (se *StatusEmitter) EmitTerminal(executionID idwrap.IDWrap, status FlowNodeStatus, skip bool)

EmitTerminal deregisters the node and emits a pre-built terminal status. The wasRunning guard prevents double emission: if CancelAllRunning has already processed this node and cleared it from the running map, the emission is skipped.
