flowgraph

package
v1.0.0-alpha.6 (not the latest version of its module)
Published: Mar 6, 2026 | License: MIT | Imports: 4 | Imported by: 0

README

FlowGraph

Static analysis and validation for component port connections in SemStreams.

Overview

The flowgraph package builds a directed graph representation of component port connections, enabling static analysis to detect configuration issues before runtime. It validates that components are correctly connected based on their port types and interaction patterns, identifying problems such as missing publishers, dangling subscribers, or JetStream/NATS protocol mismatches.

This analysis runs during service startup to fail fast on misconfigured flows.

Key Features

Connectivity Analysis

Performs graph-based connectivity analysis to detect:

  • Orphaned Ports: Ports with no matching connections (classified by severity)
  • Disconnected Nodes: Components with zero edges (potentially misconfigured)
  • Connected Components: Clusters of interconnected components (isolated subgraphs indicate configuration issues)

NATS Pattern Matching

Validates connections using NATS wildcard semantics:

  • Exact Match: graph.ingest.data matches graph.ingest.data
  • Single Wildcard: graph.*.data matches graph.ingest.data
  • Multi Wildcard: graph.> matches graph.ingest.data
  • Bidirectional: Either side of a connection can be the pattern
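The matching rules above can be sketched in a few lines of Go. This is a minimal illustration and not the package's actual implementation: matchSubject and subjectsConnect are hypothetical names, and the sketch assumes that when both sides contain wildcards, the wildcard tokens on the "subject" side are simply compared as literal tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether a NATS-style pattern matches a subject.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
// Hypothetical helper, not part of the flowgraph API.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token left to consume.
			return i == len(p)-1 && i < len(s)
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// subjectsConnect tries the match in both directions, so either side may be the pattern.
func subjectsConnect(a, b string) bool {
	return matchSubject(a, b) || matchSubject(b, a)
}

func main() {
	fmt.Println(subjectsConnect("graph.ingest.data", "graph.ingest.data")) // true
	fmt.Println(subjectsConnect("graph.*.data", "graph.ingest.data"))      // true
	fmt.Println(subjectsConnect("graph.ingest.data", "graph.>"))           // true
	fmt.Println(subjectsConnect("graph.*.data", "graph.ingest.meta"))      // false
}
```

Trying both directions is what makes the check bidirectional: a wildcard subscriber matches a concrete publisher and vice versa.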

JetStream Stream Validation

Detects protocol mismatches between publishers and subscribers:

  • JetStream Requirement: JetStream subscribers expect a durable stream to exist
  • Stream Creation: Streams are only created by JetStream output ports via EnsureStreams
  • Critical Warning: If a JetStream subscriber connects only to NATS publishers, no stream will be created and the subscriber will hang indefinitely
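As a rough illustration of the rule above, the sketch below flags JetStream subscribers whose only publishers are core NATS. The port and mismatch types and the findMismatches function are hypothetical stand-ins, and subjects are compared by exact equality for brevity; the real ValidateStreamRequirements may work differently.

```go
package main

import "fmt"

// port is a hypothetical, simplified stand-in for a stream port.
type port struct {
	Component string
	Subject   string
	JetStream bool // true for a JetStream port, false for core NATS
}

// mismatch records a JetStream subscriber fed only by core NATS publishers.
type mismatch struct {
	Subscriber string
	Publishers []string
}

// findMismatches returns one entry per JetStream subscriber that has
// publishers on its subject, none of which is a JetStream publisher.
func findMismatches(pubs, subs []port) []mismatch {
	var out []mismatch
	for _, sub := range subs {
		if !sub.JetStream {
			continue // core NATS subscribers do not need a stream
		}
		var natsPubs []string
		hasJetStreamPub := false
		for _, pub := range pubs {
			if pub.Subject != sub.Subject {
				continue
			}
			if pub.JetStream {
				hasJetStreamPub = true
				break
			}
			natsPubs = append(natsPubs, pub.Component)
		}
		if !hasJetStreamPub && len(natsPubs) > 0 {
			out = append(out, mismatch{Subscriber: sub.Component, Publishers: natsPubs})
		}
	}
	return out
}

func main() {
	pubs := []port{{Component: "sensor", Subject: "graph.ingest.data", JetStream: false}}
	subs := []port{{Component: "store", Subject: "graph.ingest.data", JetStream: true}}
	for _, m := range findMismatches(pubs, subs) {
		fmt.Printf("critical: %s expects a stream, but %v publish via core NATS\n", m.Subscriber, m.Publishers)
	}
}
```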

Interaction Patterns

The package supports four interaction patterns:

Pattern   Description              Direction        Matching
stream    NATS/JetStream pub-sub   Unidirectional   NATS wildcard
request   NATS request-reply       Bidirectional    Exact subject
watch     KV bucket observation    Unidirectional   Exact bucket
network   External boundaries      External         Exact address

Stream Pattern (NATS, JetStream)

Unidirectional flow from publishers to subscribers with NATS wildcard matching. Multiple publishers and subscribers per subject are allowed.

Request Pattern (NATS Request-Reply)

Bidirectional connections where any port can initiate requests. All ports with the same subject are connected to each other for RPC-style communication.

Watch Pattern (KV Bucket)

Unidirectional flow from KV writers to watchers. Multiple writers to the same bucket generate a warning, as this may indicate a design issue.

Network Pattern (External)

External boundary ports such as HTTP, UDP, or file I/O. These are not connected within the graph because they represent system boundaries. Multiple components binding to the same network address generate an error.
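The exclusive-binding rule can be pictured as a simple grouping step. The sketch below is an assumption about how such a check could look, not the package's code; binding and duplicateBindings are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// binding is a hypothetical record of a component claiming a network address.
type binding struct {
	Component string
	Address   string
}

// duplicateBindings returns every address claimed by more than one component,
// mapped to the sorted list of components that claim it.
func duplicateBindings(bs []binding) map[string][]string {
	byAddr := make(map[string][]string)
	for _, b := range bs {
		byAddr[b.Address] = append(byAddr[b.Address], b.Component)
	}
	dups := make(map[string][]string)
	for addr, comps := range byAddr {
		if len(comps) > 1 {
			sort.Strings(comps)
			dups[addr] = comps
		}
	}
	return dups
}

func main() {
	dups := duplicateBindings([]binding{
		{Component: "http-in", Address: ":8080"},
		{Component: "metrics", Address: ":8080"},
		{Component: "udp-in", Address: ":9000"},
	})
	for addr, comps := range dups {
		fmt.Printf("error: address %s bound by %v\n", addr, comps)
	}
}
```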

Usage Example

Basic Flow Analysis

import (
    "fmt"

    "github.com/c360studio/semstreams/component/flowgraph"
)

// Create a new flow graph
graph := flowgraph.NewFlowGraph()

// Add components from your flow configuration
for name, comp := range components {
    if err := graph.AddComponentNode(name, comp); err != nil {
        return fmt.Errorf("failed to add component %s: %w", name, err)
    }
}

// Build edges by matching connection patterns
if err := graph.ConnectComponentsByPatterns(); err != nil {
    return fmt.Errorf("flow graph connection failed: %w", err)
}

// Analyze connectivity
result := graph.AnalyzeConnectivity()
if result.ValidationStatus == "warnings" {
    for _, orphan := range result.OrphanedPorts {
        log.Warn("orphaned port detected",
            "component", orphan.ComponentName,
            "port", orphan.PortName,
            "issue", orphan.Issue)
    }
}

JetStream Stream Validation

// Validate JetStream requirements
warnings := graph.ValidateStreamRequirements()
for _, w := range warnings {
    if w.Severity == "critical" {
        return fmt.Errorf("JetStream mismatch: %s", w.Issue)
    }
    log.Warn("stream requirement warning",
        "subscriber", w.SubscriberComp,
        "port", w.SubscriberPort,
        "publishers", w.PublisherComps)
}

Analysis Results

The AnalyzeConnectivity() method returns a FlowAnalysisResult containing:

ConnectedComponents

Clusters of interconnected components found via depth-first search. Multiple clusters indicate isolated subgraphs in the flow, which may be intentional (separate processing pipelines) or indicate configuration errors.

// Example: [["input", "processor", "output"], ["monitor"]]
result.ConnectedComponents
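For readers unfamiliar with the technique, the following sketch shows depth-first clustering over an undirected view of the edges. It is illustrative only: connectedComponents is a hypothetical function operating on plain strings, and the package's internal traversal and ordering may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// connectedComponents groups nodes into clusters with an iterative depth-first search.
// Edges are treated as undirected for clustering purposes.
func connectedComponents(nodes []string, edges [][2]string) [][]string {
	adj := make(map[string][]string, len(nodes))
	for _, e := range edges {
		adj[e[0]] = append(adj[e[0]], e[1])
		adj[e[1]] = append(adj[e[1]], e[0])
	}
	visited := make(map[string]bool, len(nodes))
	var clusters [][]string
	for _, start := range nodes {
		if visited[start] {
			continue
		}
		var cluster []string
		stack := []string{start}
		visited[start] = true
		for len(stack) > 0 {
			n := stack[len(stack)-1]
			stack = stack[:len(stack)-1]
			cluster = append(cluster, n)
			for _, next := range adj[n] {
				if !visited[next] {
					visited[next] = true
					stack = append(stack, next)
				}
			}
		}
		sort.Strings(cluster) // stable output within each cluster
		clusters = append(clusters, cluster)
	}
	return clusters
}

func main() {
	nodes := []string{"input", "processor", "output", "monitor"}
	edges := [][2]string{{"input", "processor"}, {"processor", "output"}}
	fmt.Println(connectedComponents(nodes, edges)) // [[input output processor] [monitor]]
}
```

In this sketch a node with no edges forms its own single-element cluster, which mirrors the ["monitor"] entry in the example above.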

DisconnectedNodes

Components with zero edges. These components have no connections to any other components and may be misconfigured or intentionally standalone (e.g., health monitors).

for _, node := range result.DisconnectedNodes {
    fmt.Printf("Component %s: %s\n", node.ComponentName, node.Issue)
}

OrphanedPorts

Ports with no matching connections, classified by issue type:

  • no_publishers: Input port has no matching output ports (critical for required stream ports)
  • no_subscribers: Output port has no matching input ports (warning for data flow)
  • optional_api_unused: Request ports are optional by design
  • optional_interface_unused: Interface-specific alternative ports (e.g., write-graphable)
  • optional_index_unwatched: KV watch ports may be intentionally unused

for _, orphan := range result.OrphanedPorts {
    fmt.Printf("Port %s.%s: %s (required: %v)\n",
        orphan.ComponentName,
        orphan.PortName,
        orphan.Issue,
        orphan.Required)
}

ValidationStatus

Overall status of the flow graph:

  • healthy: No issues detected, all required ports connected
  • warnings: Issues detected (disconnected nodes, orphaned required ports)
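One plausible reading of these two statuses is sketched below. It assumes the status becomes "warnings" when there is any disconnected node or any orphaned port marked as required; the orphan type and validationStatus function are hypothetical, and the package may apply additional rules.

```go
package main

import "fmt"

// orphan is a hypothetical, reduced view of an orphaned port.
type orphan struct {
	Port     string
	Required bool
}

// validationStatus returns "warnings" if any node is disconnected or any
// required port is orphaned, and "healthy" otherwise.
func validationStatus(disconnectedNodes int, orphans []orphan) string {
	if disconnectedNodes > 0 {
		return "warnings"
	}
	for _, o := range orphans {
		if o.Required {
			return "warnings"
		}
	}
	return "healthy"
}

func main() {
	fmt.Println(validationStatus(0, []orphan{{Port: "api", Required: false}}))
	fmt.Println(validationStatus(0, []orphan{{Port: "input", Required: true}}))
}
```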

Thread Safety

FlowGraph is NOT safe for concurrent modification. It is designed for single-threaded construction and analysis during service startup. Once analysis is complete, the graph should be treated as read-only.

Integration with Flow Service

The flow service uses flowgraph during initialization to validate component configurations:

// In service initialization
graph := flowgraph.NewFlowGraph()

// Add all components
for name, comp := range service.components {
    if err := graph.AddComponentNode(name, comp); err != nil {
        return fmt.Errorf("failed to add component %s: %w", name, err)
    }
}

// Build edges and analyze connectivity
if err := graph.ConnectComponentsByPatterns(); err != nil {
    return fmt.Errorf("flow graph connection failed: %w", err)
}
result := graph.AnalyzeConnectivity()

// Validate JetStream requirements
jsWarnings := graph.ValidateStreamRequirements()

// Fail startup if any connectivity issue or stream warning is reported
if result.ValidationStatus == "warnings" || len(jsWarnings) > 0 {
    return errors.New("flow validation failed")
}

Orphaned Port Classification

The package uses heuristics to classify orphaned ports by severity:

Critical Issues

  • Required stream input ports with no publishers
  • Required stream output ports with no subscribers

Warnings

  • Optional ports with no connections
  • Interface-specific alternative ports (detected by naming patterns like write-graphable)
  • Request/reply ports (optional APIs)
  • KV watch ports (may be intentionally unwatched indexes)

Interface Alternative Detection

Ports are classified as optional interface alternatives if they:

  1. Have an interface contract specified
  2. Are not marked as required
  3. Use naming patterns suggesting specialized variants (e.g., names containing a hyphen, as in write-graphable)
  4. Have connection IDs containing .graphable, .typed, or .validated
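The heuristic can be sketched as a single predicate. This sketch assumes that all four conditions must hold together, which the list above does not state explicitly; portMeta and isInterfaceAlternative are hypothetical names and not part of the package API.

```go
package main

import (
	"fmt"
	"strings"
)

// portMeta is a hypothetical, reduced view of the port fields the heuristic inspects.
type portMeta struct {
	Name         string
	ConnectionID string
	HasInterface bool // an interface contract is specified
	Required     bool
}

// isInterfaceAlternative applies the four checks, assuming they are combined with AND.
func isInterfaceAlternative(p portMeta) bool {
	if !p.HasInterface || p.Required {
		return false
	}
	if !strings.Contains(p.Name, "-") {
		return false
	}
	for _, marker := range []string{".graphable", ".typed", ".validated"} {
		if strings.Contains(p.ConnectionID, marker) {
			return true
		}
	}
	return false
}

func main() {
	p := portMeta{
		Name:         "write-graphable",
		ConnectionID: "storage.write.graphable",
		HasInterface: true,
	}
	fmt.Println(isInterfaceAlternative(p)) // true
}
```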

Documentation

Overview

Package flowgraph provides flow graph analysis and validation for component connections.

The flowgraph package builds a directed graph representation of component port connections, enabling static analysis to detect configuration issues before runtime. It supports four interaction patterns (stream, request, watch, network) and provides:

  • Pattern-based connection matching (NATS wildcard subject matching)
  • Connected component analysis (DFS-based clustering)
  • Orphaned port detection with severity classification
  • JetStream stream requirement validation

This enables early detection of misconfigurations such as missing publishers, dangling subscribers, or JetStream/NATS protocol mismatches.

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                           FlowGraph                                     │
├─────────────────────────────────────────────────────────────────────────┤
│  ComponentNode[]          FlowEdge[]           InteractionPattern       │
│  - InputPorts             - From (portref)     - stream (NATS/JS)       │
│  - OutputPorts            - To (portref)       - request (req/reply)    │
│  - Component ref          - Pattern            - watch (KV)             │
│                           - ConnectionID       - network (external)     │
└─────────────────────────────────────────────────────────────────────────┘
                                 ↓
┌─────────────────────────────────────────────────────────────────────────┐
│                       FlowAnalysisResult                                │
│  - ConnectedComponents: clusters of connected components               │
│  - OrphanedPorts: ports with no connections                             │
│  - DisconnectedNodes: components with no edges                          │
│  - ValidationStatus: healthy/warnings                                   │
└─────────────────────────────────────────────────────────────────────────┘

Usage

Build and analyze a flow graph:

graph := flowgraph.NewFlowGraph()

// Add components
for name, comp := range components {
    if err := graph.AddComponentNode(name, comp); err != nil {
        return err
    }
}

// Build edges by matching connection patterns
if err := graph.ConnectComponentsByPatterns(); err != nil {
    return err
}

// Analyze connectivity
result := graph.AnalyzeConnectivity()
if result.ValidationStatus == "warnings" {
    for _, orphan := range result.OrphanedPorts {
        log.Warn("orphaned port", "component", orphan.ComponentName, "port", orphan.PortName)
    }
}

Validate JetStream requirements:

warnings := graph.ValidateStreamRequirements()
for _, w := range warnings {
    if w.Severity == "critical" {
        return fmt.Errorf("JetStream mismatch: %s", w.Issue)
    }
}

Interaction Patterns

PatternStream (NATS, JetStream):

  • Unidirectional: publishers → subscribers
  • Supports NATS wildcard matching (* and >)
  • Multiple publishers/subscribers per subject allowed

PatternRequest (NATS request/reply):

  • Bidirectional: any port can initiate requests
  • All ports with the same subject are connected
  • Used for synchronous RPC-style communication

PatternWatch (KV bucket):

  • Unidirectional: writers → watchers
  • Multiple writers to the same bucket generate a warning
  • Watchers receive change notifications

PatternNetwork (external):

  • External boundary ports (HTTP, UDP, etc.)
  • Exclusive binding: multiple binds to the same port are an error
  • Not connected in graph (external endpoints)

Connection Matching

NATS subject matching follows standard semantics:

  • Exact match: "graph.ingest.data" matches "graph.ingest.data"
  • Single wildcard: "graph.*.data" matches "graph.ingest.data"
  • Multi wildcard: "graph.>" matches "graph.ingest.data"
  • Bidirectional: either side can be the pattern

Analysis Results

FlowAnalysisResult contains:

ConnectedComponents: Clusters of interconnected components found via DFS. Multiple clusters indicate isolated subgraphs.

DisconnectedNodes: Components with zero edges. These may be misconfigured or intentionally standalone.

OrphanedPorts: Ports with no matching connections. Classified by issue type:

  • no_publishers: Input port has no matching output ports
  • no_subscribers: Output port has no matching input ports
  • optional_api_unused: Request ports are optional by design
  • optional_interface_unused: Interface-specific alternative ports
  • optional_index_unwatched: KV watch ports may be intentionally unused

ValidationStatus: "healthy" if no issues, "warnings" if any problems detected.

JetStream Validation

ValidateStreamRequirements detects protocol mismatches:

  • JetStream subscribers expect a durable stream to exist
  • Streams are created by JetStream output ports (via EnsureStreams)
  • If a JetStream subscriber connects only to NATS publishers, no stream is created
  • This results in the subscriber hanging indefinitely

The validation returns critical severity warnings for such mismatches.

Thread Safety

FlowGraph is NOT safe for concurrent modification. It is designed for single-threaded construction and analysis during service startup.

Index

Constants

const (
	IssueNoPublishers            = "no_publishers"
	IssueNoSubscribers           = "no_subscribers"
	IssueOptionalAPIUnused       = "optional_api_unused"
	IssueOptionalIndexUnwatched  = "optional_index_unwatched"
	IssueOptionalInterfaceUnused = "optional_interface_unused"
)

Issue type constants for orphaned port classification
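A caller might map these issue types to a severity along the lines of the README's classification (required stream ports critical, everything else a warning). The sketch below redeclares the constants locally so it compiles on its own; the severity function and its return values are hypothetical and not part of the package.

```go
package main

import "fmt"

// Local copies of the issue constants so the sketch is self-contained.
const (
	IssueNoPublishers            = "no_publishers"
	IssueNoSubscribers           = "no_subscribers"
	IssueOptionalAPIUnused       = "optional_api_unused"
	IssueOptionalIndexUnwatched  = "optional_index_unwatched"
	IssueOptionalInterfaceUnused = "optional_interface_unused"
)

// severity is a hypothetical mapping from issue type to a severity label.
func severity(issue string, required bool) string {
	switch issue {
	case IssueNoPublishers, IssueNoSubscribers:
		if required {
			return "critical"
		}
		return "warning"
	case IssueOptionalAPIUnused, IssueOptionalIndexUnwatched, IssueOptionalInterfaceUnused:
		return "warning"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(severity(IssueNoPublishers, true))
	fmt.Println(severity(IssueOptionalAPIUnused, false))
}
```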

Types

type ComponentNode

type ComponentNode struct {
	ComponentName string
	Component     component.Discoverable
	InputPorts    []PortInfo
	OutputPorts   []PortInfo
}

ComponentNode represents a component in the flow graph

type ComponentPortRef

type ComponentPortRef struct {
	ComponentName string `json:"component_name"`
	PortName      string `json:"port_name"`
}

ComponentPortRef references a specific port on a component

type DisconnectedNode

type DisconnectedNode struct {
	ComponentName string   `json:"component_name"`
	Issue         string   `json:"issue"`
	Suggestions   []string `json:"suggestions,omitempty"`
}

DisconnectedNode represents a component with no connections
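Because the result types carry JSON tags, they can be serialized directly for reports or APIs. The sketch below mirrors DisconnectedNode locally so it runs without the package; the encodeNode helper and the issue text are illustrative assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DisconnectedNode mirrors the struct above so the example is self-contained.
type DisconnectedNode struct {
	ComponentName string   `json:"component_name"`
	Issue         string   `json:"issue"`
	Suggestions   []string `json:"suggestions,omitempty"`
}

// encodeNode is a hypothetical helper that renders a node as a JSON string.
func encodeNode(n DisconnectedNode) (string, error) {
	b, err := json.Marshal(n)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeNode(DisconnectedNode{ComponentName: "monitor", Issue: "no connections"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out) // {"component_name":"monitor","issue":"no connections"}
}
```

The omitempty tag drops the suggestions key when the slice is nil or empty.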

type EdgeMetadata

type EdgeMetadata struct {
	InterfaceContract *component.InterfaceContract `json:"interface_contract,omitempty"`
	Timeout           string                       `json:"timeout,omitempty"` // Request pattern
	Queue             string                       `json:"queue,omitempty"`   // Stream pattern
	Keys              []string                     `json:"keys,omitempty"`    // Watch pattern
}

EdgeMetadata contains pattern-specific metadata

type FlowAnalysisResult

type FlowAnalysisResult struct {
	ConnectedComponents [][]string         `json:"connected_components"`
	ConnectedEdges      []FlowEdge         `json:"connected_edges"`
	DisconnectedNodes   []DisconnectedNode `json:"disconnected_nodes"`
	OrphanedPorts       []OrphanedPort     `json:"orphaned_ports"`
	ValidationStatus    string             `json:"validation_status"`
}

FlowAnalysisResult contains the results of connectivity analysis

type FlowEdge

type FlowEdge struct {
	From         ComponentPortRef   `json:"from"`
	To           ComponentPortRef   `json:"to"`
	Pattern      InteractionPattern `json:"pattern"`
	ConnectionID string             `json:"connection_id"` // Subject, bucket, or network addr
	Metadata     EdgeMetadata       `json:"metadata"`      // Pattern-specific validation data
}

FlowEdge represents a connection between two component ports

type FlowGraph

type FlowGraph struct {
	// contains filtered or unexported fields
}

FlowGraph represents a directed graph of component connections

func NewFlowGraph

func NewFlowGraph() *FlowGraph

NewFlowGraph creates a new empty FlowGraph

func (*FlowGraph) AddComponentNode

func (g *FlowGraph) AddComponentNode(name string, comp component.Discoverable) error

AddComponentNode adds a component as a node in the graph

func (*FlowGraph) AnalyzeConnectivity

func (g *FlowGraph) AnalyzeConnectivity() *FlowAnalysisResult

AnalyzeConnectivity performs graph connectivity analysis

func (*FlowGraph) ConnectComponentsByPatterns

func (g *FlowGraph) ConnectComponentsByPatterns() error

ConnectComponentsByPatterns builds edges by matching connection patterns

func (*FlowGraph) GetEdges

func (g *FlowGraph) GetEdges() []FlowEdge

GetEdges returns the edges in the graph

func (*FlowGraph) GetNodes

func (g *FlowGraph) GetNodes() map[string]*ComponentNode

GetNodes returns a deep copy of component nodes to prevent external modification
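The idea behind returning a deep copy can be illustrated with a simplified node type. This is a sketch under the assumption that copying means duplicating both the struct and its slices; node and copyNodes are hypothetical, and the package's own copy may handle additional fields.

```go
package main

import "fmt"

// node is a hypothetical, simplified stand-in for ComponentNode.
type node struct {
	Name    string
	Inputs  []string
	Outputs []string
}

// copyNodes returns a map whose nodes and slices share no memory with src.
func copyNodes(src map[string]*node) map[string]*node {
	dst := make(map[string]*node, len(src))
	for name, n := range src {
		c := *n // copies the struct, but slices still alias the original backing arrays
		c.Inputs = append([]string(nil), n.Inputs...)
		c.Outputs = append([]string(nil), n.Outputs...)
		dst[name] = &c
	}
	return dst
}

func main() {
	orig := map[string]*node{"processor": {Name: "processor", Inputs: []string{"in"}}}
	cp := copyNodes(orig)
	cp["processor"].Inputs[0] = "changed"
	fmt.Println(orig["processor"].Inputs[0]) // in
}
```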

func (*FlowGraph) ValidateStreamRequirements

func (g *FlowGraph) ValidateStreamRequirements() []StreamRequirementWarning

ValidateStreamRequirements checks that JetStream subscribers have corresponding JetStream publishers. When a component subscribes via JetStream, it expects a durable stream to exist. Streams are only created by components that publish with JetStream output ports (via EnsureStreams). If a JetStream subscriber is connected only to NATS publishers, no stream will be created and the subscriber will hang waiting for a stream that never appears.

type InteractionPattern

type InteractionPattern string

InteractionPattern defines the type of interaction between components

const (
	// PatternStream represents NATSPort and JetStreamPort interactions
	PatternStream InteractionPattern = "stream"
	// PatternRequest represents NATSRequestPort (bidirectional) interactions
	PatternRequest InteractionPattern = "request"
	// PatternWatch represents KVWatchPort (observation) interactions
	PatternWatch InteractionPattern = "watch"
	// PatternNetwork represents NetworkPort (external) interactions
	PatternNetwork InteractionPattern = "network"
)

type OrphanedPort

type OrphanedPort struct {
	ComponentName string              `json:"component_name"`
	PortName      string              `json:"port_name"`
	Direction     component.Direction `json:"direction"`
	ConnectionID  string              `json:"connection_id"`
	Pattern       InteractionPattern  `json:"pattern"`
	Issue         string              `json:"issue"`
	Required      bool                `json:"required"`
}

OrphanedPort represents a port with no connections

type PortInfo

type PortInfo struct {
	Name         string
	Direction    component.Direction
	ConnectionID string // Subject, bucket, or network address
	Pattern      InteractionPattern
	Interface    *component.InterfaceContract
	Required     bool               // Whether this port is required for the component to function
	PortConfig   component.Portable // Original port configuration for type checking
}

PortInfo contains port metadata for graph analysis

type StreamRequirementWarning

type StreamRequirementWarning struct {
	Severity       string   `json:"severity"`
	SubscriberComp string   `json:"subscriber_component"`
	SubscriberPort string   `json:"subscriber_port"`
	Subjects       []string `json:"subjects"`
	PublisherComps []string `json:"publisher_components"`
	Issue          string   `json:"issue"`
}

StreamRequirementWarning represents a mismatch between JetStream subscriber and NATS publisher
