flowengine

package
v1.0.0-alpha.39 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2026 License: MIT Imports: 15 Imported by: 0

README

FlowEngine - Flow Deployment & Lifecycle Management

The engine package provides flow deployment and lifecycle management for SemStreams applications. It orchestrates component lifecycle, validates flows, and manages the complete deploy → start → stop → undeploy workflow.

Overview

FlowEngine is the runtime execution environment for SemStreams flows. It coordinates with the component registry, configuration manager, and flow store to deploy and manage data processing pipelines.

Quick Start

package main

import (
    "context"
    "log"

    "github.com/c360/semstreams/component"
    "github.com/c360/semstreams/config"
    "github.com/c360/semstreams/engine"
    "github.com/c360/semstreams/flowstore"
    "github.com/c360/semstreams/natsclient"
)

func main() {
    ctx := context.Background()

    // Initialize dependencies
    natsClient, _ := natsclient.NewClient(ctx, cfg.NATS, logger)
    configMgr, _ := config.NewConfigManager(cfg, natsClient, logger)
    flowStore := flowstore.NewStore(natsClient, logger)
    registry := component.NewRegistry()

    // Create engine with metrics
    metricsRegistry := metric.NewMetricsRegistry()
    flowEngine := engine.NewEngine(configMgr, flowStore, registry, natsClient, logger, metricsRegistry)

    // Deploy a flow
    if err := flowEngine.Deploy(ctx, "my-flow-id"); err != nil {
        log.Fatal(err)
    }

    // Start the flow
    if err := flowEngine.Start(ctx, "my-flow-id"); err != nil {
        log.Fatal(err)
    }

    // ... flow is now running ...

    // Stop the flow
    flowEngine.Stop(ctx, "my-flow-id")

    // Undeploy (cleanup)
    flowEngine.Undeploy(ctx, "my-flow-id")
}

Lifecycle Operations

FlowEngine enforces a strict state machine with four lifecycle operations:

1. Deploy

Transition: not_deployed → deployed_stopped

Deploys a flow by:

  • Loading flow definition from flow store
  • Validating flow structure and connections
  • Translating flow nodes to component configurations
  • Adding components to configuration manager
  • Keeping components disabled (not started)
err := flowEngine.Deploy(ctx, "flow-123")
2. Start

Transition: deployed_stopped → running

Starts all components in the flow:

  • Enables components in topological order (inputs first, outputs last)
  • Components begin processing data
  • Metrics recorded for runtime monitoring
err := flowEngine.Start(ctx, "flow-123")
3. Stop

Transition: running → deployed_stopped

Stops all running components:

  • Disables components in reverse topological order (outputs first, inputs last)
  • Graceful shutdown with context timeout
  • Flow definition remains in configuration
err := flowEngine.Stop(ctx, "flow-123")
4. Undeploy

Transition: deployed_stopped → not_deployed

Removes flow from configuration:

  • Removes all component configurations
  • Cleans up resources
  • Flow must be stopped before undeploying
err := flowEngine.Undeploy(ctx, "flow-123")

State Machine

┌─────────────┐
│             │
│ not_deployed│
│             │
└──────┬──────┘
       │ Deploy
       ▼
┌─────────────────┐
│                 │
│ deployed_stopped│◄─┐
│                 │  │
└────┬────────────┘  │
     │ Start      Stop
     ▼               │
┌──────────┐         │
│          │         │
│  running │─────────┘
│          │
└──────────┘

Flow Validation

The engine includes comprehensive flow validation before deployment:

Structural Validation
  • All nodes reference registered component types
  • All connections specify valid ports
  • No dangling connections (source/target must exist)
  • No self-loops (component connected to itself)
Type Validation
  • Port types are compatible across connections
  • Source output ports match target input ports
  • Data type conversions are possible
Semantic Validation
  • Flows have at least one input component
  • Flows have at least one output component
  • No isolated components (must be connected)
  • Topological sort possible (no cycles)
Example Validation
validator := engine.NewValidator(registry, natsClient, logger)
result, err := validator.ValidateFlow(ctx, flow)
if err != nil {
    log.Fatal("Validation failed:", err)
}

if len(result.Errors) > 0 {
    for _, e := range result.Errors {
        log.Printf("ERROR: %s - %s", e.Type, e.Message)
    }
}

if len(result.Warnings) > 0 {
    for _, w := range result.Warnings {
        log.Printf("WARN: %s - %s", w.Type, w.Message)
    }
}

Architecture

The engine package consists of three main components:

Engine

Core orchestration logic for lifecycle operations. Coordinates between:

  • Manager: Runtime component configuration
  • FlowStore: Persistent flow definitions
  • ComponentRegistry: Available component types
  • NATS Client: Message bus connectivity
Validator

Flow validation logic using component/flowgraph for:

  • Graph analysis and cycle detection
  • Port compatibility checking
  • Type validation
  • Semantic correctness
Translator

Converts flow definitions to component configurations:

  • Maps flow nodes to component configs
  • Resolves port connections to NATS subjects
  • Generates unique component names
  • Preserves flow metadata

Error Handling

The engine uses SemStreams's error classification:

import "github.com/c360/semstreams/errors"

err := flowEngine.Deploy(ctx, "invalid-flow")
if err != nil {
    var validationErr *engine.ValidationError
    if errors.As(err, &validationErr) {
        // Flow structure is invalid
        log.Println("Validation errors:", validationErr.Errors)
    }

    if errors.IsInvalid(err) {
        // Invalid input (e.g., flow not found)
        log.Println("Invalid request:", err)
    }

    if errors.IsTransient(err) {
        // Temporary failure (e.g., NATS timeout)
        log.Println("Retry possible:", err)
    }
}

Configuration Integration

The engine integrates with the config package for dynamic component management:

// Component configurations written to NATS KV
// Key pattern: components.{component-name}

{
  "type": "input/udp",
  "name": "flow-123-udp-input-1",
  "enabled": true,  // Controlled by Start/Stop
  "config": {
    "bind_address": "0.0.0.0:14550",
    "output_subject": "flow-123.udp.output"
  }
}

Testing

The engine package includes comprehensive integration tests using testcontainers:

# Run unit tests
go test ./engine/...

# Run with race detection
go test -race ./engine/...

# Run integration tests (requires Docker for testcontainers)
go test -tags=integration ./engine/...

# Run integration tests with verbose output
go test -tags=integration -v ./engine/...

Performance Considerations

  • Deploy/Undeploy: O(n) where n = number of components
  • Start/Stop: O(n) with sequential component state transitions
  • Validation: O(n + e) where n = nodes, e = edges (topological sort)
  • Memory: Flow definition size + component configs cached in Manager

For large flows (100+ components):

  • Consider batching component configuration updates
  • Monitor NATS KV operation latency
  • Use context timeouts for bounded operations

Examples

See engine_integration_test.go for complete examples including:

  • Full lifecycle testing (Deploy → Start → Stop → Undeploy)
  • Error handling for invalid states
  • Validation with real NATS connections
  • Multi-component flow orchestration

Documentation

Overview

Package flowengine translates Flow entities to ComponentConfigs and manages deployment.

Overview

The flowengine package bridges the gap between design-time flows (flowstore) and runtime components (config/component packages). It translates visual flow definitions into deployable component configurations and manages the full deployment lifecycle.

Architecture

The Engine integrates with existing SemStreams infrastructure:

┌─────────────┐
│   FlowUI    │ (Future)
└──────┬──────┘
       │ HTTP POST /flows/deployment/{id}/deploy
       ▼
┌─────────────┐
│ FlowService │ (pkg/service/flow_service.go)
└──────┬──────┘
       │
       ▼
┌─────────────┐     Reads      ┌──────────────┐
│ FlowEngine  │ ──────────────> │  FlowStore   │
│             │                 │ (flowstore)  │
│ - Deploy()  │                 └──────────────┘
│ - Start()   │
│ - Stop()    │     Translates
│ - Undeploy()│ ───────────────────────┐
└──────┬──────┘                        │
       │                               ▼
       │ Writes                 Flow → ComponentConfigs
       │                        (uses ComponentRegistry)
       ▼
┌──────────────────┐
│ Manager KV │ (semstreams_config)
│ (already exists) │
└────────┬─────────┘
         │ Watches
         ▼
┌──────────────────┐
│ ComponentManager │
│   (existing)     │
└──────────────────┘

Key Innovation: Dynamic Component Registry Lookup

Unlike hardcoded factory-to-type mappings, the Engine uses the component registry to dynamically determine component types. This means:

  • Adding new components requires NO changes to flowengine
  • Component type mapping is automatic and correct
  • Follows existing component registration patterns

Implementation:

func (e *Engine) mapFactoryToComponentType(factoryName string) (types.ComponentType, error) {
	// Look up factory in registry (NOT hardcoded switch)
	factories := e.componentRegistry.ListFactories()
	registration := factories[factoryName]
	// Return type from registration metadata
	return registration.Type, nil
}

Deployment Lifecycle

The Engine manages four operations that map to flow runtime states:

1. Deploy (not_deployed → deployed_stopped):

  • Retrieve Flow from flowstore
  • Validate flow structure
  • Translate nodes to ComponentConfigs (using registry)
  • Write to semstreams_config KV
  • Update Flow.RuntimeState

2. Start (deployed_stopped → running):

  • Set all component configs Enabled = true
  • Write to semstreams_config KV
  • Update Flow.RuntimeState

3. Stop (running → deployed_stopped):

  • Set all component configs Enabled = false
  • Write to semstreams_config KV
  • Update Flow.RuntimeState

4. Undeploy (deployed_stopped → not_deployed):

  • Delete all component configs from KV
  • Update Flow.RuntimeState
  • Cannot undeploy running flows (validation error)

Translation Logic

Flow nodes are translated to ComponentConfigs:

FlowNode {                      ComponentConfig {
  ID:       "node-1"               (not used)
  Type:     "udp"          →      Type:    "input"  (via registry lookup)
  Name:     "udp-input-1"  →      Name:    "udp"    (factory name)
  Position: {X:100, Y:100}        (not used)
  Config:   {...}          →      Config:  json.RawMessage
}                           →      Enabled: true
                                 }

Key fields:

  • Node.Name becomes the config key: "components.udp-input-1"
  • Node.Type (factory name) looked up in registry for ComponentType
  • Node.Config marshaled to ComponentConfig.Config
  • ComponentConfig.Enabled set based on operation (deploy=true)

State Transitions

Valid state transitions enforced by the Engine:

not_deployed ──Deploy()──> deployed_stopped ──Start()──> running
      ▲                           │                        │
      │                           │                        │
      └──────Undeploy()───────────┘                        │
                                                            │
                       deployed_stopped <──Stop()──────────┘

Invalid transitions return errs.WrapInvalid.

Integration with Existing Systems

The Engine reuses 100% of existing deployment infrastructure:

Manager:

  • Already watches semstreams_config KV
  • Fires OnChange("components.*") when Engine writes
  • No changes needed

ComponentManager:

  • Already creates/starts/stops components
  • Already respects Enabled field
  • No changes needed

ComponentRegistry:

  • Already contains factory metadata
  • Engine queries for type information
  • No changes needed

Error Handling

Following pkg/errors patterns:

  • WrapInvalid: Flow not found, validation errors, wrong state transitions
  • WrapTransient: NATS KV errors, Manager errors
  • WrapFatal: Marshaling errors (should never happen with valid Go types)

Testing Strategy

Integration tests use real NATS and component registry:

  • TestDeployFlow: Full deploy → verify configs written
  • TestFullLifecycle: Deploy → Start → Stop → Undeploy
  • TestStartNotDeployedFlow: Invalid state transition
  • TestUndeployRunningFlow: Must stop before undeploy

All tests use testcontainers (Constitutional Principle II).

Example Usage

// In FlowService (pkg/service/flow_service.go):

engine := flowengine.NewEngine(configMgr, flowStore, componentRegistry, natsClient, logger, metricsRegistry)

// User clicks "Deploy" in UI
err := engine.Deploy(ctx, "my-flow")
// Flow state: not_deployed → deployed_stopped
// Components created but not started

// User clicks "Start" in UI
err = engine.Start(ctx, "my-flow")
// Flow state: deployed_stopped → running
// Components now processing data

// User clicks "Stop" in UI
err = engine.Stop(ctx, "my-flow")
// Flow state: running → deployed_stopped

// User clicks "Undeploy" in UI
err = engine.Undeploy(ctx, "my-flow")
// Flow state: deployed_stopped → not_deployed
// Components deleted from runtime

Future Enhancements

Validation integration:

  • Currently validateFlow() is a stub
  • Future: Integrate with existing flowgraph package
  • Check connectivity, cycles, port compatibility
  • Use component registry for port metadata

Package Structure

flowengine/
├── doc.go                       # This file
├── engine.go                    # Engine implementation
└── engine_integration_test.go  # Integration tests (9 passing)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DiscoveredConnection

type DiscoveredConnection struct {
	SourceNodeID string `json:"source_node_id"`
	SourcePort   string `json:"source_port"`
	TargetNodeID string `json:"target_node_id"`
	TargetPort   string `json:"target_port"`
	ConnectionID string `json:"connection_id"`
	Pattern      string `json:"pattern"`
}

DiscoveredConnection represents an auto-discovered connection between ports

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine translates Flow entities into ComponentConfigs and manages deployment lifecycle

func NewEngine

func NewEngine(
	configMgr *config.Manager,
	flowStore *flowstore.Store,
	componentRegistry *component.Registry,
	natsClient *natsclient.Client,
	logger *slog.Logger,
	metricsRegistry *metric.MetricsRegistry,
) *Engine

NewEngine creates a new flow engine

func (*Engine) Deploy

func (e *Engine) Deploy(ctx context.Context, flowID string) error

Deploy translates a flow to component configs and writes to semstreams_config KV The existing Manager will detect the changes and ComponentManager will create the components

func (*Engine) Start

func (e *Engine) Start(ctx context.Context, flowID string) error

Start starts all components in a deployed flow This is achieved by updating the "enabled" field in component configs

func (*Engine) Stop

func (e *Engine) Stop(ctx context.Context, flowID string) error

Stop stops all components in a running flow

func (*Engine) Undeploy

func (e *Engine) Undeploy(ctx context.Context, flowID string) error

Undeploy removes all component configs for a flow

func (*Engine) ValidateFlowDefinition

func (e *Engine) ValidateFlowDefinition(flow *flowstore.Flow) (*ValidationResult, error)

ValidateFlowDefinition validates a flow without deploying it Returns full validation results including port information and discovered connections

type ValidatedNode

type ValidatedNode struct {
	ID          string              `json:"id"`
	Component   string              `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
	Type        types.ComponentType `json:"type"`      // Component category (input/processor/output/storage/gateway)
	Name        string              `json:"name"`
	InputPorts  []ValidatedPort     `json:"input_ports"`
	OutputPorts []ValidatedPort     `json:"output_ports"`
}

ValidatedNode represents a flow node with its port information

type ValidatedPort

type ValidatedPort struct {
	Name         string `json:"name"`
	Direction    string `json:"direction"`
	Type         string `json:"type"` // Interface contract type (e.g., "message.Storable")
	Required     bool   `json:"required"`
	ConnectionID string `json:"connection_id"` // NATS subject, network address, etc.
	Pattern      string `json:"pattern"`       // stream, request, watch, api
	Description  string `json:"description"`   // Port description
}

ValidatedPort represents a port with validation information

type ValidationError

type ValidationError struct {
	Result *ValidationResult
}

ValidationError wraps validation results for API responses

func (*ValidationError) Error

func (e *ValidationError) Error() string

type ValidationIssue

type ValidationIssue struct {
	Type          string   `json:"type"`     // "orphaned_port", "disconnected_node", "unknown_component", etc.
	Severity      string   `json:"severity"` // "error", "warning"
	ComponentName string   `json:"component_name"`
	PortName      string   `json:"port_name,omitempty"`
	Message       string   `json:"message"`
	Suggestions   []string `json:"suggestions,omitempty"`
}

ValidationIssue represents a single validation problem

type ValidationResult

type ValidationResult struct {
	Status                string                 `json:"validation_status"` // "valid", "warnings", "errors"
	Errors                []ValidationIssue      `json:"errors"`
	Warnings              []ValidationIssue      `json:"warnings"`
	Nodes                 []ValidatedNode        `json:"nodes"`                  // Nodes with port information
	DiscoveredConnections []DiscoveredConnection `json:"discovered_connections"` // Auto-discovered edges
}

ValidationResult contains the results of flow validation

type Validator

type Validator struct {
	// contains filtered or unexported fields
}

Validator provides flow validation using FlowGraph analysis

func NewValidator

func NewValidator(registry *component.Registry, natsClient *natsclient.Client, logger *slog.Logger) *Validator

NewValidator creates a new flow validator with component registry and NATS client. The validator performs structural, type, and semantic validation of flow definitions before deployment to ensure they can be safely executed.

func (*Validator) ValidateFlow

func (v *Validator) ValidateFlow(flow *flowstore.Flow) (*ValidationResult, error)

ValidateFlow performs comprehensive flow validation using FlowGraph

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL