Documentation
¶
Index ¶
- Variables
- func BuildPredecessorMap(edges mflow.EdgesMap) map[idwrap.IDWrap][]idwrap.IDWrap
- func FlowStatusString(f FlowStatus) string
- func FlowStatusStringWithIcons(f FlowStatus) string
- func IsCancellationError(err error) bool
- func IsFlowStatusDone(f FlowStatus) bool
- func NewChannelEmitFunc(channels FlowEventChannels) func(FlowNodeStatus)
- type ConvergenceTracker
- type FlowEventChannels
- type FlowGraph
- type FlowNodeEvent
- type FlowNodeEventTarget
- type FlowNodeLogPayload
- type FlowNodeStatus
- type FlowRunner
- type FlowStatus
- type IterationContext
- type IterationLabel
- type NodeExecution
- type StatusEmitter
- func (se *StatusEmitter) CancelAllRunning(cancelErr error)
- func (se *StatusEmitter) Deregister(executionID idwrap.IDWrap)
- func (se *StatusEmitter) Emit(status FlowNodeStatus)
- func (se *StatusEmitter) EmitRunning(exec NodeExecution)
- func (se *StatusEmitter) EmitTerminal(executionID idwrap.IDWrap, status FlowNodeStatus, skip bool)
Constants ¶
This section is empty.
Variables ¶
var ( ErrFlowRunnerNotImplemented = errors.New("flowrunner not implemented") ErrNodeNotFound = errors.New("next node not found") )
var ErrFlowCanceledByThrow = errors.New("flow canceled by throw")
ErrFlowCanceledByThrow marks an intentional cancellation triggered by a node (e.g., via a user throw). When a loop node propagates this error, the runner should mark the loop as CANCELED, not FAILURE.
Functions ¶
func BuildPredecessorMap ¶
BuildPredecessorMap computes which nodes precede each node in the graph.
func FlowStatusString ¶
func FlowStatusString(f FlowStatus) string
func FlowStatusStringWithIcons ¶
func FlowStatusStringWithIcons(f FlowStatus) string
func IsCancellationError ¶
IsCancellationError returns true if the error represents a cancellation (explicit throw or context cancellation).
func IsFlowStatusDone ¶
func IsFlowStatusDone(f FlowStatus) bool
func NewChannelEmitFunc ¶
func NewChannelEmitFunc(channels FlowEventChannels) func(FlowNodeStatus)
NewChannelEmitFunc creates an emit function that routes status events to FlowEventChannels. RUNNING events go to NodeStates only; terminal events go to both NodeStates and NodeLogs. Safe to call after channels are closed (recovers from send-on-closed-channel).
Types ¶
type ConvergenceTracker ¶
type ConvergenceTracker struct {
// contains filtered or unexported fields
}
ConvergenceTracker tracks how many predecessors have completed for convergence (join) nodes. It is the mutable counterpart to FlowGraph's immutable ConvergeCounts.
func NewConvergenceTrackerFromPending ¶
func NewConvergenceTrackerFromPending(pending map[idwrap.IDWrap]uint32) *ConvergenceTracker
NewConvergenceTrackerFromPending creates a tracker from a pre-built pending map (e.g. from FlowNodeRequest.PendingAtmoicMap). It copies the map to avoid aliasing the original.
func (*ConvergenceTracker) Arrive ¶
func (ct *ConvergenceTracker) Arrive(nodeID idwrap.IDWrap) bool
Arrive records that one predecessor of nodeID has completed. Returns true when all predecessors have arrived (the node is ready).
func (*ConvergenceTracker) Clone ¶
func (ct *ConvergenceTracker) Clone() *ConvergenceTracker
Clone creates an independent copy for loop iteration isolation.
type FlowEventChannels ¶
type FlowEventChannels struct {
NodeStates chan FlowNodeStatus
NodeLogs chan FlowNodeLogPayload
FlowStatus chan FlowStatus
}
func LegacyFlowEventChannels ¶
func LegacyFlowEventChannels(nodeStates chan FlowNodeStatus, flowStatus chan FlowStatus) FlowEventChannels
func (FlowEventChannels) HasLogChannel ¶
func (c FlowEventChannels) HasLogChannel() bool
type FlowGraph ¶
type FlowGraph struct {
Edges mflow.EdgesMap
StartNodeIDs []idwrap.IDWrap
Predecessors map[idwrap.IDWrap][]idwrap.IDWrap
ConvergeCounts map[idwrap.IDWrap]uint32
}
FlowGraph is an immutable representation of a flow's DAG topology. Constructed once before execution begins, then shared across all components. It holds no node implementations (no node.FlowNode) to avoid import cycles.
func NewFlowGraph ¶
NewFlowGraph constructs an immutable graph from edges and start nodes. It precomputes predecessor maps and convergence counts.
func NewFlowGraphFromPredecessors ¶
func NewFlowGraphFromPredecessors( edges mflow.EdgesMap, startNodeID idwrap.IDWrap, predecessors map[idwrap.IDWrap][]idwrap.IDWrap, ) *FlowGraph
NewFlowGraphFromPredecessors constructs a FlowGraph when the predecessor map is already computed (e.g., inside RunNodeSync where loop nodes pre-build it).
func (*FlowGraph) NewConvergenceTracker ¶
func (g *FlowGraph) NewConvergenceTracker() *ConvergenceTracker
NewConvergenceTracker creates a fresh mutable tracker for one execution.
type FlowNodeEvent ¶
type FlowNodeEvent struct {
Status FlowNodeStatus
Targets FlowNodeEventTarget
LogPayload *FlowNodeLogPayload
}
func (FlowNodeEvent) ShouldSend ¶
func (e FlowNodeEvent) ShouldSend(target FlowNodeEventTarget) bool
type FlowNodeEventTarget ¶
type FlowNodeEventTarget uint8
const ( FlowNodeEventTargetState FlowNodeEventTarget = 1 << iota FlowNodeEventTargetLog )
type FlowNodeLogPayload ¶
type FlowNodeStatus ¶
type FlowNodeStatus struct {
ExecutionID idwrap.IDWrap
NodeID idwrap.IDWrap
Name string
State mflow.NodeState
OutputData any
InputData any // Data that was read by this node during execution
RunDuration time.Duration
Error error
IterationContext *IterationContext `json:"iteration_context,omitempty"`
IterationEvent bool `json:"iteration_event,omitempty"`
IterationIndex int `json:"iteration_index,omitempty"`
LoopNodeID idwrap.IDWrap `json:"loop_node_id,omitempty"`
AuxiliaryID *idwrap.IDWrap
}
type FlowRunner ¶
type FlowStatus ¶
type FlowStatus int8
const ( FlowStatusStarting FlowStatus = iota FlowStatusRunning FlowStatusSuccess FlowStatusFailed FlowStatusTimeout )
type IterationContext ¶
type IterationContext struct {
IterationPath []int `json:"iteration_path"` // [1, 2, 3] for nested loops
ExecutionIndex int `json:"execution_index"` // Current execution within current loop
ParentNodes []idwrap.IDWrap `json:"parent_nodes,omitempty"` // Parent loop node IDs for hierarchical naming
Labels []IterationLabel `json:"labels,omitempty"`
}
type IterationLabel ¶
type IterationLabel struct {
NodeID idwrap.IDWrap `json:"node_id"`
Name string `json:"name"`
Iteration int `json:"iteration"`
}
IterationLabel captures a single segment of a loop execution chain.
type NodeExecution ¶
type NodeExecution struct {
ExecutionID idwrap.IDWrap
NodeID idwrap.IDWrap
Name string
StartTime time.Time
IterCtx *IterationContext
}
NodeExecution identifies a single node execution for status tracking.
type StatusEmitter ¶
type StatusEmitter struct {
// contains filtered or unexported fields
}
StatusEmitter tracks running nodes and emits status events. It delegates the actual event delivery to an emit function, making it usable with both channel-based delivery (RunWithEvents) and callback-based delivery (RunNodeSync from loop nodes).
For a remote runner, the same StatusEmitter works — only the emitFn changes.
func NewStatusEmitter ¶
func NewStatusEmitter(emitFn func(FlowNodeStatus)) *StatusEmitter
NewStatusEmitter creates an emitter that delegates to the given function.
func (*StatusEmitter) CancelAllRunning ¶
func (se *StatusEmitter) CancelAllRunning(cancelErr error)
CancelAllRunning emits CANCELED for every node currently tracked as RUNNING. Called during context cancellation cleanup.
func (*StatusEmitter) Deregister ¶
func (se *StatusEmitter) Deregister(executionID idwrap.IDWrap)
Deregister removes a node from running tracking without emitting a status. Used when the caller handles status emission itself (e.g., with custom state logic for errors, timeouts, or loop coordinators).
func (*StatusEmitter) Emit ¶
func (se *StatusEmitter) Emit(status FlowNodeStatus)
Emit sends a status event. This is used as the LogPushFunc callback for nodes.
func (*StatusEmitter) EmitRunning ¶
func (se *StatusEmitter) EmitRunning(exec NodeExecution)
EmitRunning atomically registers a node as running and emits RUNNING status. This eliminates the race where cancellation could miss a node between status emission and registration.
func (*StatusEmitter) EmitTerminal ¶
func (se *StatusEmitter) EmitTerminal(executionID idwrap.IDWrap, status FlowNodeStatus, skip bool)
EmitTerminal deregisters the node and emits a pre-built terminal status. The wasRunning guard prevents double emission: if CancelAllRunning already processed this node (cleared from running map), emission is skipped.