Overview ¶
Package flowgraph provides flow graph analysis and validation for component connections.
The flowgraph package builds a directed graph representation of component port connections, enabling static analysis to detect configuration issues before runtime. It supports four interaction patterns (stream, request, watch, network) and provides:
- Pattern-based connection matching (NATS wildcard subject matching)
- Connected component analysis (DFS-based clustering)
- Orphaned port detection with severity classification
- JetStream stream requirement validation
This enables early detection of misconfigurations such as missing publishers, dangling subscribers, or JetStream/NATS protocol mismatches.
Architecture ¶
┌─────────────────────────────────────────────────────────────────────────┐
│                                FlowGraph                                │
├─────────────────────────────────────────────────────────────────────────┤
│  ComponentNode[]         FlowEdge[]              InteractionPattern     │
│  - InputPorts            - From (portref)        - stream (NATS/JS)     │
│  - OutputPorts           - To (portref)          - request (req/reply)  │
│  - Component ref         - Pattern               - watch (KV)           │
│                          - ConnectionID          - network (external)   │
└─────────────────────────────────────────────────────────────────────────┘
                                     ↓
┌─────────────────────────────────────────────────────────────────────────┐
│                           FlowAnalysisResult                            │
│  - ConnectedComponents: clusters of connected components                │
│  - OrphanedPorts: ports with no connections                             │
│  - DisconnectedNodes: components with no edges                          │
│  - ValidationStatus: healthy/warnings                                   │
└─────────────────────────────────────────────────────────────────────────┘
Usage ¶
Build and analyze a flow graph:
graph := flowgraph.NewFlowGraph()

// Add components
for name, comp := range components {
	if err := graph.AddComponentNode(name, comp); err != nil {
		return err
	}
}

// Build edges by matching connection patterns
if err := graph.ConnectComponentsByPatterns(); err != nil {
	return err
}

// Analyze connectivity
result := graph.AnalyzeConnectivity()
if result.ValidationStatus == "warnings" {
	for _, orphan := range result.OrphanedPorts {
		log.Warn("orphaned port", "component", orphan.ComponentName, "port", orphan.PortName)
	}
}
Validate JetStream requirements:
warnings := graph.ValidateStreamRequirements()
for _, w := range warnings {
	if w.Severity == "critical" {
		return fmt.Errorf("JetStream mismatch: %s", w.Issue)
	}
}
Interaction Patterns ¶
PatternStream (NATS, JetStream):
- Unidirectional: publishers → subscribers
- Supports NATS wildcard matching (* and >)
- Multiple publishers/subscribers per subject allowed
PatternRequest (NATS request/reply):
- Bidirectional: any port can initiate requests
- All ports with the same subject are connected
- Used for synchronous RPC-style communication
PatternWatch (KV bucket):
- Unidirectional: writers → watchers
- Multiple writers to the same bucket generate a warning
- Watchers receive change notifications
PatternNetwork (external):
- External boundary ports (HTTP, UDP, etc.)
- Exclusive binding: multiple binds to the same port are an error
- Not connected in graph (external endpoints)
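The watch and network rules above both come down to spotting a resource claimed more than once. A minimal sketch of that check follows, assuming a simplified record per claim. The binding and key types and the checkExclusivity function are hypothetical names for illustration, not part of the flowgraph API.

```go
package main

import "fmt"

// binding is a hypothetical, simplified record of one component claiming a
// KV bucket (as a writer) or a network address (as a listener).
type binding struct {
	component string
	pattern   string // "watch" or "network"
	target    string // bucket name or network address
}

// key identifies a shared resource within one pattern.
type key struct{ pattern, target string }

// checkExclusivity groups bindings by resource and reports resources claimed
// more than once: duplicate KV writers as warnings, duplicate network binds
// as errors.
func checkExclusivity(bindings []binding) (warnings, errs []string) {
	owners := make(map[key][]string)
	var order []key // first-seen order, for deterministic output
	for _, b := range bindings {
		k := key{b.pattern, b.target}
		if _, seen := owners[k]; !seen {
			order = append(order, k)
		}
		owners[k] = append(owners[k], b.component)
	}
	for _, k := range order {
		if len(owners[k]) < 2 {
			continue
		}
		msg := fmt.Sprintf("%s %q claimed by %v", k.pattern, k.target, owners[k])
		switch k.pattern {
		case "network":
			errs = append(errs, msg)
		case "watch":
			warnings = append(warnings, msg)
		}
	}
	return warnings, errs
}

func main() {
	warnings, errs := checkExclusivity([]binding{
		{"http-a", "network", ":8080"},
		{"http-b", "network", ":8080"},
		{"indexer", "watch", "entities"},
		{"enricher", "watch", "entities"},
	})
	fmt.Println("warnings:", warnings)
	fmt.Println("errors:", errs)
}
```

How the real package surfaces these conditions is not shown in this documentation, so the two returned slices are only one possible shape.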
Connection Matching ¶
NATS subject matching follows standard semantics:
- Exact match: "graph.ingest.data" matches "graph.ingest.data"
- Single wildcard: "graph.*.data" matches "graph.ingest.data"
- Multi wildcard: "graph.>" matches "graph.ingest.data"
- Bidirectional: either side can be the pattern
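The matching rules above can be sketched as a token-by-token comparison. This is an illustrative sketch, not the package's implementation: tokenMatch and subjectsConnect are hypothetical names, and the sketch assumes that only one side of a pair carries wildcards (it does not compute overlap between two wildcard patterns).

```go
package main

import (
	"fmt"
	"strings"
)

// tokenMatch reports whether pattern (which may contain NATS wildcards)
// matches the literal subject. "*" matches exactly one token; ">" matches
// one or more trailing tokens and is only valid as the last token.
func tokenMatch(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be last and must consume at least one token.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// subjectsConnect applies the match in both directions, so either side
// may be the one carrying the wildcard.
func subjectsConnect(a, b string) bool {
	return tokenMatch(a, b) || tokenMatch(b, a)
}

func main() {
	fmt.Println(subjectsConnect("graph.ingest.data", "graph.ingest.data")) // true
	fmt.Println(subjectsConnect("graph.*.data", "graph.ingest.data"))      // true
	fmt.Println(subjectsConnect("graph.ingest.data", "graph.>"))           // true
	fmt.Println(subjectsConnect("graph.*", "graph.ingest.data"))           // false
}
```

The last call returns false because "*" stands for exactly one token, so "graph.*" cannot cover the two tokens "ingest.data".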
Analysis Results ¶
FlowAnalysisResult contains:
ConnectedComponents: Clusters of interconnected components found via DFS. Multiple clusters indicate isolated subgraphs.
DisconnectedNodes: Components with zero edges. These may be misconfigured or intentionally standalone.
OrphanedPorts: Ports with no matching connections. Classified by issue type:
- no_publishers: Input port has no matching output ports
- no_subscribers: Output port has no matching input ports
- optional_api_unused: Unused request port (request ports are optional by design)
- optional_interface_unused: Unused interface-specific alternative port
- optional_index_unwatched: Unwatched KV port (KV watch ports may be intentionally unused)
ValidationStatus: "healthy" if no issues, "warnings" if any problems detected.
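DFS-based clustering of the kind described for ConnectedComponents can be sketched as follows. The edge type and connectedComponents function are hypothetical stand-ins, and treating edges as undirected for clustering is an assumption of this sketch rather than something the documentation states.

```go
package main

import (
	"fmt"
	"sort"
)

// edge is a hypothetical stand-in for a FlowEdge reduced to component names.
type edge struct{ from, to string }

// connectedComponents groups node names into clusters, treating edges as
// undirected for the purpose of clustering.
func connectedComponents(nodes []string, edges []edge) [][]string {
	adj := make(map[string][]string, len(nodes))
	for _, e := range edges {
		adj[e.from] = append(adj[e.from], e.to)
		adj[e.to] = append(adj[e.to], e.from)
	}
	visited := make(map[string]bool, len(nodes))
	var clusters [][]string
	for _, start := range nodes {
		if visited[start] {
			continue
		}
		// Iterative DFS using an explicit stack.
		var cluster []string
		stack := []string{start}
		visited[start] = true
		for len(stack) > 0 {
			n := stack[len(stack)-1]
			stack = stack[:len(stack)-1]
			cluster = append(cluster, n)
			for _, next := range adj[n] {
				if !visited[next] {
					visited[next] = true
					stack = append(stack, next)
				}
			}
		}
		sort.Strings(cluster) // stable output regardless of visit order
		clusters = append(clusters, cluster)
	}
	return clusters
}

func main() {
	nodes := []string{"ingest", "parser", "store", "metrics"}
	edges := []edge{{"ingest", "parser"}, {"parser", "store"}}
	for _, c := range connectedComponents(nodes, edges) {
		fmt.Println(c)
	}
}
```

Here "metrics" ends up in a cluster of its own, which corresponds to what the analysis would also report as a disconnected node.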
JetStream Validation ¶
ValidateStreamRequirements detects protocol mismatches:
- JetStream subscribers expect a durable stream to exist
- Streams are created by JetStream output ports (via EnsureStreams)
- If a JetStream subscriber connects only to NATS publishers, no stream is created
- The subscriber then hangs indefinitely, waiting for a stream that never appears
The validation returns critical severity warnings for such mismatches.
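The mismatch rule can be sketched with simplified types. The port and warning structs and the findStreamMismatches function are hypothetical, and subjects are compared for exact equality here to keep the sketch short; the real validation presumably works on graph edges built with wildcard matching.

```go
package main

import "fmt"

// port is a hypothetical, simplified view of a stream port.
type port struct {
	component string
	subject   string
	jetstream bool // true for JetStream, false for core NATS
}

// warning is a reduced form of a mismatch report.
type warning struct {
	subscriber string
	publishers []string
}

// findStreamMismatches returns a warning for each JetStream subscriber that
// has at least one matching publisher, none of which publishes via JetStream.
func findStreamMismatches(pubs, subs []port) []warning {
	var out []warning
	for _, s := range subs {
		if !s.jetstream {
			continue // core NATS subscribers do not need a stream
		}
		var natsOnly []string
		hasJS := false
		for _, p := range pubs {
			if p.subject != s.subject {
				continue
			}
			if p.jetstream {
				hasJS = true
				break
			}
			natsOnly = append(natsOnly, p.component)
		}
		if !hasJS && len(natsOnly) > 0 {
			out = append(out, warning{subscriber: s.component, publishers: natsOnly})
		}
	}
	return out
}

func main() {
	pubs := []port{{"sensor", "graph.ingest.data", false}}
	subs := []port{{"indexer", "graph.ingest.data", true}}
	for _, w := range findStreamMismatches(pubs, subs) {
		fmt.Printf("critical: %s subscribes via JetStream, but only core NATS publishers exist: %v\n",
			w.subscriber, w.publishers)
	}
}
```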
Thread Safety ¶
FlowGraph is NOT safe for concurrent modification. It is designed for single-threaded construction and analysis during service startup.
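If a graph does need to be shared between goroutines, one common approach is to serialize all access behind a mutex. The sketch below uses a hypothetical stand-in type (graph) rather than FlowGraph itself, and the lockedGraph wrapper is illustrative only; nothing in this documentation suggests the package provides one.

```go
package main

import (
	"fmt"
	"sync"
)

// graph is a hypothetical stand-in for a type that is not safe for
// concurrent modification.
type graph struct{ nodes []string }

func (g *graph) add(name string) { g.nodes = append(g.nodes, name) }

// lockedGraph serializes every call on the wrapped graph through a mutex.
type lockedGraph struct {
	mu sync.Mutex
	g  graph
}

func (l *lockedGraph) add(name string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.g.add(name)
}

func (l *lockedGraph) len() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return len(l.g.nodes)
}

func main() {
	var lg lockedGraph
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			lg.add(fmt.Sprintf("component-%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println(lg.len()) // 50
}
```

Building the graph on a single goroutine during startup, as the package intends, avoids the need for any of this.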
See Also ¶
Related packages:
- github.com/c360studio/semstreams/component: Port types and component interfaces
- github.com/c360studio/semstreams/service: Uses flowgraph for flow validation
Index ¶
- Constants
- type ComponentNode
- type ComponentPortRef
- type DisconnectedNode
- type EdgeMetadata
- type FlowAnalysisResult
- type FlowEdge
- type FlowGraph
- func (g *FlowGraph) AddComponentNode(name string, comp component.Discoverable) error
- func (g *FlowGraph) AnalyzeConnectivity() *FlowAnalysisResult
- func (g *FlowGraph) ConnectComponentsByPatterns() error
- func (g *FlowGraph) GetEdges() []FlowEdge
- func (g *FlowGraph) GetNodes() map[string]*ComponentNode
- func (g *FlowGraph) ValidateStreamRequirements() []StreamRequirementWarning
- type InteractionPattern
- type OrphanedPort
- type PortInfo
- type StreamRequirementWarning
Constants ¶
const (
	IssueNoPublishers            = "no_publishers"
	IssueNoSubscribers           = "no_subscribers"
	IssueOptionalAPIUnused       = "optional_api_unused"
	IssueOptionalIndexUnwatched  = "optional_index_unwatched"
	IssueOptionalInterfaceUnused = "optional_interface_unused"
)
Issue type constants for orphaned port classification
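As a rough illustration of how these issue types might be assigned, the sketch below classifies an unconnected port by pattern and direction. The rules are an assumed reading of the issue descriptions, not the package's actual logic; classifyOrphan is a hypothetical function, the constants are local copies, and optional_interface_unused is left out because its trigger is not documented here.

```go
package main

import "fmt"

// Local copies of four of the documented issue strings.
const (
	issueNoPublishers           = "no_publishers"
	issueNoSubscribers          = "no_subscribers"
	issueOptionalAPIUnused      = "optional_api_unused"
	issueOptionalIndexUnwatched = "optional_index_unwatched"
)

// classifyOrphan picks an issue type for a port with no matching connection.
// pattern is "stream", "request", or "watch"; isInput is true for input
// ports. The mapping is an assumption made for this sketch.
func classifyOrphan(pattern string, isInput bool) string {
	switch {
	case pattern == "request":
		// Request ports are optional by design.
		return issueOptionalAPIUnused
	case pattern == "watch" && !isInput:
		// A KV writer that nobody watches.
		return issueOptionalIndexUnwatched
	case isInput:
		return issueNoPublishers
	default:
		return issueNoSubscribers
	}
}

func main() {
	fmt.Println(classifyOrphan("stream", true))
	fmt.Println(classifyOrphan("stream", false))
	fmt.Println(classifyOrphan("request", true))
	fmt.Println(classifyOrphan("watch", false))
}
```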
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ComponentNode ¶
type ComponentNode struct {
ComponentName string
Component component.Discoverable
InputPorts []PortInfo
OutputPorts []PortInfo
}
ComponentNode represents a component in the flow graph
type ComponentPortRef ¶
type ComponentPortRef struct {
ComponentName string `json:"component_name"`
PortName string `json:"port_name"`
}
ComponentPortRef references a specific port on a component
type DisconnectedNode ¶
type DisconnectedNode struct {
ComponentName string `json:"component_name"`
Issue string `json:"issue"`
Suggestions []string `json:"suggestions,omitempty"`
}
DisconnectedNode represents a component with no connections
type EdgeMetadata ¶
type EdgeMetadata struct {
InterfaceContract *component.InterfaceContract `json:"interface_contract,omitempty"`
Timeout string `json:"timeout,omitempty"` // Request pattern
Queue string `json:"queue,omitempty"` // Stream pattern
Keys []string `json:"keys,omitempty"` // Watch pattern
}
EdgeMetadata contains pattern-specific metadata
type FlowAnalysisResult ¶
type FlowAnalysisResult struct {
ConnectedComponents [][]string `json:"connected_components"`
ConnectedEdges []FlowEdge `json:"connected_edges"`
DisconnectedNodes []DisconnectedNode `json:"disconnected_nodes"`
OrphanedPorts []OrphanedPort `json:"orphaned_ports"`
ValidationStatus string `json:"validation_status"`
}
FlowAnalysisResult contains the results of connectivity analysis
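The JSON tags determine how a result serializes. The sketch below uses local, trimmed copies of the documented structs (analysisResult and disconnectedNode are hypothetical names) so that it runs without the package; field names and tags are copied from the definitions on this page, and the sample values are invented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local, trimmed copies of the documented structs, kept only to show how
// the JSON tags shape the output. They are not the package's own types.
type disconnectedNode struct {
	ComponentName string   `json:"component_name"`
	Issue         string   `json:"issue"`
	Suggestions   []string `json:"suggestions,omitempty"`
}

type analysisResult struct {
	ConnectedComponents [][]string         `json:"connected_components"`
	DisconnectedNodes   []disconnectedNode `json:"disconnected_nodes"`
	ValidationStatus    string             `json:"validation_status"`
}

// encode renders a result as compact JSON.
func encode(r analysisResult) (string, error) {
	b, err := json.Marshal(r)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encode(analysisResult{
		ConnectedComponents: [][]string{{"ingest", "parser"}},
		DisconnectedNodes: []disconnectedNode{
			{ComponentName: "metrics", Issue: "no connections"},
		},
		ValidationStatus: "warnings",
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

Because Suggestions is tagged omitempty, the key is dropped from the output when the slice is empty.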
type FlowEdge ¶
type FlowEdge struct {
From ComponentPortRef `json:"from"`
To ComponentPortRef `json:"to"`
Pattern InteractionPattern `json:"pattern"`
ConnectionID string `json:"connection_id"` // Subject, bucket, or network addr
Metadata EdgeMetadata `json:"metadata"` // Pattern-specific validation data
}
FlowEdge represents a connection between two component ports
type FlowGraph ¶
type FlowGraph struct {
// contains filtered or unexported fields
}
FlowGraph represents a directed graph of component connections
func (*FlowGraph) AddComponentNode ¶
func (g *FlowGraph) AddComponentNode(name string, comp component.Discoverable) error
AddComponentNode adds a component as a node in the graph
func (*FlowGraph) AnalyzeConnectivity ¶
func (g *FlowGraph) AnalyzeConnectivity() *FlowAnalysisResult
AnalyzeConnectivity performs graph connectivity analysis
func (*FlowGraph) ConnectComponentsByPatterns ¶
func (g *FlowGraph) ConnectComponentsByPatterns() error
ConnectComponentsByPatterns builds edges by matching connection patterns
func (*FlowGraph) GetEdges ¶
func (g *FlowGraph) GetEdges() []FlowEdge
func (*FlowGraph) GetNodes ¶
func (g *FlowGraph) GetNodes() map[string]*ComponentNode
GetNodes returns a deep copy of component nodes to prevent external modification
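What a deep copy buys the caller can be shown with a small sketch. The node type and copyNodes function are hypothetical; the real method's copying strategy is not shown in this documentation.

```go
package main

import "fmt"

// node is a hypothetical, reduced stand-in for ComponentNode.
type node struct {
	name   string
	inputs []string
}

// copyNodes returns a map whose node values and slices are independent of
// the source, so callers cannot modify the original through the copy.
func copyNodes(src map[string]*node) map[string]*node {
	dst := make(map[string]*node, len(src))
	for k, n := range src {
		c := *n                                       // copy the struct value
		c.inputs = append([]string(nil), n.inputs...) // copy the backing array
		dst[k] = &c
	}
	return dst
}

func main() {
	orig := map[string]*node{"parser": {name: "parser", inputs: []string{"raw"}}}
	cp := copyNodes(orig)
	cp["parser"].inputs[0] = "changed"
	cp["parser"].name = "renamed"
	fmt.Println(orig["parser"].name, orig["parser"].inputs) // parser [raw]
}
```

A shallow copy of the map alone would share the node pointers, so the two writes above would have changed the original.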
func (*FlowGraph) ValidateStreamRequirements ¶
func (g *FlowGraph) ValidateStreamRequirements() []StreamRequirementWarning
ValidateStreamRequirements checks that JetStream subscribers have corresponding JetStream publishers. When a component subscribes via JetStream, it expects a durable stream to exist. Streams are only created by components that publish with JetStream output ports (via EnsureStreams). If a JetStream subscriber is connected only to NATS publishers, no stream will be created and the subscriber will hang waiting for a stream that never appears.
type InteractionPattern ¶
type InteractionPattern string
InteractionPattern defines the type of interaction between components
const (
	// PatternStream represents NATSPort and JetStreamPort interactions
	PatternStream InteractionPattern = "stream"
	// PatternRequest represents NATSRequestPort (bidirectional) interactions
	PatternRequest InteractionPattern = "request"
	// PatternWatch represents KVWatchPort (observation) interactions
	PatternWatch InteractionPattern = "watch"
	// PatternNetwork represents NetworkPort (external) interactions
	PatternNetwork InteractionPattern = "network"
)
type OrphanedPort ¶
type OrphanedPort struct {
ComponentName string `json:"component_name"`
PortName string `json:"port_name"`
Direction component.Direction `json:"direction"`
ConnectionID string `json:"connection_id"`
Pattern InteractionPattern `json:"pattern"`
Issue string `json:"issue"`
Required bool `json:"required"`
}
OrphanedPort represents a port with no connections
type PortInfo ¶
type PortInfo struct {
Name string
Direction component.Direction
ConnectionID string // Subject, bucket, or network address
Pattern InteractionPattern
Interface *component.InterfaceContract
Required bool // Whether this port is required for the component to function
PortConfig component.Portable // Original port configuration for type checking
}
PortInfo contains port metadata for graph analysis
type StreamRequirementWarning ¶
type StreamRequirementWarning struct {
Severity string `json:"severity"`
SubscriberComp string `json:"subscriber_component"`
SubscriberPort string `json:"subscriber_port"`
Subjects []string `json:"subjects"`
PublisherComps []string `json:"publisher_components"`
Issue string `json:"issue"`
}
StreamRequirementWarning represents a mismatch between JetStream subscriber and NATS publisher