flowstore

package
v1.0.0-alpha.1
Warning

This package is not in the latest version of its module.

Published: Mar 4, 2026 License: MIT Imports: 10 Imported by: 0

README

FlowStore - Visual Flow Persistence

Persistent storage for visual flow definitions in the SemStreams framework.

Overview

FlowStore manages the design-time representation of flows (canvas layouts, node positions, connections) separately from runtime component configurations. Flows are stored in NATS KV with optimistic concurrency control.

Installation

import "github.com/c360/semstreams/flowstore"

Quick Start

import "github.com/c360/semstreams/flowstore"

// Create store
store, err := flowstore.NewStore(natsClient)

// Create flow
flow := &flowstore.Flow{
    ID:   "my-flow",
    Name: "My First Flow",
    RuntimeState: flowstore.StateNotDeployed,
    Nodes: []flowstore.FlowNode{
        {
            ID:        "node-1",
            Component: "udp",   // factory name
            Type:      "input", // component category
            Name:      "UDP Input",
            Position:  flowstore.Position{X: 100, Y: 100},
            Config:    map[string]any{"port": 5000},
        },
    },
}

err = store.Create(ctx, flow)
// flow.Version is now 1

// Update with optimistic concurrency
flow.Name = "Updated Name"
err = store.Update(ctx, flow)
// flow.Version is now 2

Features

  • CRUD Operations: Create, Read, Update, Delete flows
  • Optimistic Concurrency: Version-based conflict detection
  • Validation: Structural validation of nodes and connections
  • Integration: Works with FlowEngine for deployment

Architecture

Design-time (flowstore):

  • User creates/edits flows in UI
  • Canvas layout stored as Flow entities in semstreams_flows KV bucket

Runtime (config package):

  • FlowEngine translates Flow → ComponentConfigs
  • Deployed configs stored in semstreams_config KV bucket

API Reference

Core Types

type Flow struct {
    ID           string           // Unique flow identifier
    Name         string           // Human-readable name
    Description  string           // Optional description
    Version      int64            // Optimistic concurrency version
    RuntimeState RuntimeState     // Current deployment state
    Nodes        []FlowNode       // Visual nodes on canvas
    Connections  []FlowConnection // Edges between node ports
    CreatedAt    time.Time        // Creation timestamp
    UpdatedAt    time.Time        // Last update timestamp
    // Deployment timestamps and audit fields omitted; see the type documentation.
}

type FlowNode struct {
    ID        string              // Unique node ID within flow
    Component string              // Component factory name (udp, graph-processor, etc.)
    Type      types.ComponentType // Component category (input, processor, output, storage, gateway)
    Name      string              // Display name
    Position  Position            // Canvas coordinates
    Config    map[string]any      // Component configuration
}

type FlowConnection struct {
    ID           string // Unique connection ID
    SourceNodeID string // Source node ID
    SourcePort   string // Port on the source node
    TargetNodeID string // Target node ID
    TargetPort   string // Port on the target node
}

Store Operations

// Create a new flow (version starts at 1)
func (s *Store) Create(ctx context.Context, flow *Flow) error

// Get flow by ID
func (s *Store) Get(ctx context.Context, id string) (*Flow, error)

// Update existing flow (optimistic concurrency via version)
func (s *Store) Update(ctx context.Context, flow *Flow) error

// Delete flow
func (s *Store) Delete(ctx context.Context, id string) error

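// List all flows
func (s *Store) List(ctx context.Context) ([]*Flow, error)

// Watch for changes to flows whose keys match pattern
func (s *Store) Watch(ctx context.Context, pattern string) (jetstream.KeyWatcher, error)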

Optimistic Concurrency

FlowStore uses version-based optimistic concurrency control:

// User A retrieves flow
flowA, _ := store.Get(ctx, "flow-1")  // Version: 5

// User B retrieves same flow
flowB, _ := store.Get(ctx, "flow-1")  // Version: 5

// User A updates successfully
flowA.Name = "Updated by A"
store.Update(ctx, flowA)  // Version becomes 6

// User B's update fails with version conflict
flowB.Name = "Updated by B"
err := store.Update(ctx, flowB)  // Fails: flowB still carries version 5, stored version is 6
// err is classified as Invalid and its message contains "conflict"

Validation

Flows are validated before Create/Update:

flow := &flowstore.Flow{
    ID:   "",  // ❌ Empty ID
    Name: "My Flow",
}

err := store.Create(ctx, flow)
// Error: "flow ID cannot be empty"

Validation Rules:

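  • Flow ID, Name, and RuntimeState cannot be empty
  • RuntimeState must be one of the defined states
  • Node ID, Type, and Name cannot be empty
  • Node IDs must be unique within a flow
  • Connections must have valid IDs and ports, and their source and target must reference existing nodes
  • Circular dependencies are not yet detected (planned as a future enhancement)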
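
To make the structural rules concrete, here is a minimal sketch of such a check. It is not Flow.Validate: the lowercase types are simplified local mirrors, the error strings are illustrative, and only the ID, uniqueness, and node-reference rules are covered.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified local mirrors of the documented types (assumptions for this sketch).
type node struct {
	ID, Type, Name string
}

type connection struct {
	ID, SourceNodeID, TargetNodeID string
}

type flow struct {
	ID, Name    string
	Nodes       []node
	Connections []connection
}

// validate applies a subset of the structural rules and returns the first violation.
func validate(f *flow) error {
	if f == nil {
		return errors.New("flow is nil")
	}
	if f.ID == "" {
		return errors.New("flow ID cannot be empty")
	}
	if f.Name == "" {
		return errors.New("flow name cannot be empty")
	}
	seen := make(map[string]bool, len(f.Nodes))
	for i, n := range f.Nodes {
		if n.ID == "" {
			return fmt.Errorf("node %d: ID cannot be empty", i)
		}
		if n.Type == "" {
			return fmt.Errorf("node %q: type cannot be empty", n.ID)
		}
		if seen[n.ID] {
			return fmt.Errorf("duplicate node ID %q", n.ID)
		}
		seen[n.ID] = true
	}
	for _, c := range f.Connections {
		if !seen[c.SourceNodeID] {
			return fmt.Errorf("connection %q: unknown source node %q", c.ID, c.SourceNodeID)
		}
		if !seen[c.TargetNodeID] {
			return fmt.Errorf("connection %q: unknown target node %q", c.ID, c.TargetNodeID)
		}
	}
	return nil
}

func main() {
	f := &flow{
		ID:   "sensor-flow",
		Name: "Sensor Data Pipeline",
		Nodes: []node{
			{ID: "udp-input", Type: "input", Name: "Sensor Input"},
		},
		Connections: []connection{
			{ID: "conn-1", SourceNodeID: "udp-input", TargetNodeID: "missing"},
		},
	}
	fmt.Println(validate(f))
}
```

Collecting node IDs in a set first lets the connection check run in a single pass over the connections.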

Testing

# Unit tests
go test ./flowstore -v -short

# Integration tests (requires Docker)
go test ./flowstore -v

# Coverage
go test ./flowstore -coverprofile=cover.out
go tool cover -html=cover.out

Current coverage: 81.6%

Error Handling

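FlowStore uses the SemStreams error classification system. In the snippet below, errors refers to the SemStreams error package (written as errs elsewhere in this documentation), not the standard library errors package: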

err := store.Update(ctx, flow)
if err != nil {
    if errors.IsInvalid(err) {
        // Validation error or version conflict - fix input
        log.Printf("Invalid update: %v", err)
    } else if errors.IsTransient(err) {
        // Network/NATS error - retry with backoff
        log.Printf("Transient error: %v", err)
    }
}

Error Classes:

  • Invalid: Empty IDs, version conflicts, validation failures
  • Transient: NATS connectivity issues, context timeouts
  • Fatal: JSON marshaling errors, nil flow pointer (should not occur in normal use)

Integration with FlowEngine

import (
    "github.com/c360/semstreams/flowstore"
    "github.com/c360/semstreams/engine"
)

// Store design-time flow
flow := &flowstore.Flow{...}
flowStore.Create(ctx, flow)

// Deploy to runtime (the variable is named eng to avoid shadowing the engine package)
eng := engine.NewEngine(flowStore, configMgr, compRegistry, natsClient)
err := eng.DeployFlow(ctx, flow.ID)
// FlowEngine translates flowstore.Flow → config.ComponentConfig

Example: Complete Workflow

package main

import (
    "context"
    "log"

    "github.com/c360/semstreams/flowstore"
    "github.com/c360/semstreams/natsclient"
)

func main() {
    ctx := context.Background()

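    // Setup
    natsClient, err := natsclient.New("nats://localhost:4222")
    if err != nil {
        log.Fatalf("Failed to connect to NATS: %v", err)
    }
    store, err := flowstore.NewStore(natsClient)
    if err != nil {
        log.Fatalf("Failed to create store: %v", err)
    }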

    // Create flow with UDP input and HTTP output
    flow := &flowstore.Flow{
        ID:   "sensor-flow",
        Name: "Sensor Data Pipeline",
        RuntimeState: flowstore.StateNotDeployed,
        Nodes: []flowstore.FlowNode{
            {
                ID:        "udp-input",
                Component: "udp",   // factory name
                Type:      "input", // component category
                Name:      "Sensor Input",
                Position:  flowstore.Position{X: 100, Y: 100},
                Config:    map[string]any{"port": 5000},
            },
            {
                ID:        "http-output",
                Component: "httppost", // factory name
                Type:      "output",   // component category
                Name:      "API Output",
                Position:  flowstore.Position{X: 400, Y: 100},
                Config:    map[string]any{"url": "https://api.example.com/data"},
            },
        },
        Connections: []flowstore.FlowConnection{
            {
                ID:           "conn-1",
                SourceNodeID: "udp-input",
                SourcePort:   "out", // port names here are illustrative
                TargetNodeID: "http-output",
                TargetPort:   "in",
            },
        },
    }

    // Persist flow
    if err := store.Create(ctx, flow); err != nil {
        log.Fatalf("Failed to create flow: %v", err)
    }
    log.Printf("Created flow %s with version %d", flow.ID, flow.Version)

    // Update flow
    flow.Name = "Updated Sensor Pipeline"
    if err := store.Update(ctx, flow); err != nil {
        log.Fatalf("Failed to update flow: %v", err)
    }
    log.Printf("Updated flow to version %d", flow.Version)

    // List all flows
    flows, err := store.List(ctx)
    if err != nil {
        log.Fatalf("Failed to list flows: %v", err)
    }
    log.Printf("Total flows: %d", len(flows))
}

License

Copyright 2025 C360

Documentation

Overview

Package flowstore provides persistence and management for visual flow definitions.

The flowstore package manages the storage and retrieval of Flow entities, which represent visual flow configurations created by users in the flow builder UI. Flows contain canvas layout information (node positions, connections) and metadata, but do not contain runtime component instances.

Architecture

Flow entities are stored in NATS KV bucket "semstreams_flows" with optimistic concurrency control via version numbers. This is separate from the "semstreams_config" bucket used for runtime component configurations.

Design-time (flowstore):

  • User creates/edits flows in UI
  • Canvas layout and connections stored as Flow entities
  • Metadata: name, description, runtime state

Runtime (config package):

  • FlowEngine translates Flow → ComponentConfigs
  • ComponentConfigs stored in semstreams_config KV
  • Manager watches and triggers ComponentManager

Key Concepts

Flow Entity:

  • ID: Unique flow identifier
  • Nodes: Visual components on canvas (with positions)
  • Connections: Edges between node ports
  • RuntimeState: not_deployed, deployed_stopped, running, error
  • Version: Optimistic concurrency control

Node vs Component:

  • Node.Component: Factory name (e.g., "udp", "graph-processor")
  • Node.Type: Component category (e.g., "input", "processor")
  • Node.Name: Instance name (e.g., "udp-input-1")
  • Node.Config: Component-specific configuration

Validation

Flow.Validate() checks:

  • Required fields (ID, Name, RuntimeState)
  • Valid RuntimeState values
  • Node completeness (ID, Type, Name)
  • No duplicate node IDs
  • Connection validity (IDs, ports, node references)

All validation errors use errs.WrapInvalid for consistent error handling.

Optimistic Concurrency

The Store uses version-based conflict detection:

  • Create: Sets version to 1
  • Update: Checks current version matches, increments on success
  • Conflict: Returns errs.WrapInvalid with "conflict" message

Example workflow:

flow, _ := store.Get(ctx, "my-flow")
// flow.Version = 5

// Another user updates
// Version is now 6 in KV

flow.Name = "Updated Name"
err := store.Update(ctx, flow) // FAILS - version 5 != 6
// Error contains "conflict"

Integration with FlowEngine

The flowstore package is used by flowengine for deployment:

  1. FlowEngine.Deploy(flowID) retrieves Flow from flowstore
  2. Validates flow structure
  3. Translates to ComponentConfigs (using component registry)
  4. Writes to semstreams_config KV
  5. Updates Flow.RuntimeState to deployed_stopped
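
The five steps can be sketched with in-memory maps standing in for the two KV buckets. Everything here is hypothetical: deploy, componentConfig, and the known-components set are stand-ins invented for the example, and the real FlowEngine performs fuller validation and translation.

```go
package main

import "fmt"

type runtimeState string

const (
	stateNotDeployed     runtimeState = "not_deployed"
	stateDeployedStopped runtimeState = "deployed_stopped"
)

type node struct {
	ID        string
	Component string
	Config    map[string]any
}

type flow struct {
	ID           string
	Nodes        []node
	RuntimeState runtimeState
}

// componentConfig is a hypothetical stand-in for the runtime config type.
type componentConfig struct {
	Name   string
	Config map[string]any
}

// deploy walks the five steps. flows and configs play the role of the
// semstreams_flows and semstreams_config buckets; known plays the registry.
func deploy(flows map[string]*flow, configs map[string]componentConfig,
	known map[string]bool, flowID string) error {
	// 1. Retrieve the flow.
	f, ok := flows[flowID]
	if !ok {
		return fmt.Errorf("flow %q not found", flowID)
	}
	// 2. Validate structure (reduced here to non-empty node IDs).
	for i, n := range f.Nodes {
		if n.ID == "" {
			return fmt.Errorf("node %d: ID cannot be empty", i)
		}
	}
	// 3. Translate nodes to component configs, consulting the registry.
	translated := make(map[string]componentConfig, len(f.Nodes))
	for _, n := range f.Nodes {
		if !known[n.Component] {
			return fmt.Errorf("node %q: unknown component %q", n.ID, n.Component)
		}
		translated[n.ID] = componentConfig{Name: n.Component, Config: n.Config}
	}
	// 4. Write configs only after every node translated successfully.
	for id, cfg := range translated {
		configs[id] = cfg
	}
	// 5. Record the new runtime state on the flow.
	f.RuntimeState = stateDeployedStopped
	return nil
}

func main() {
	flows := map[string]*flow{
		"my-flow": {
			ID:           "my-flow",
			RuntimeState: stateNotDeployed,
			Nodes: []node{
				{ID: "udp-input", Component: "udp", Config: map[string]any{"port": 5000}},
			},
		},
	}
	configs := map[string]componentConfig{}
	err := deploy(flows, configs, map[string]bool{"udp": true}, "my-flow")
	fmt.Println(err, flows["my-flow"].RuntimeState, len(configs))
}
```

Translating everything before writing anything keeps a failed deployment from leaving a partial set of configs behind, which is one reasonable design choice for step 4.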

Testing

Integration tests use testcontainers with real NATS:

  • TestCreateAndGet: Basic CRUD
  • TestOptimisticConcurrency: Version conflicts
  • TestComplexFlow: Nodes and connections

All tests follow Constitutional Principle II (Real Dependencies).

Error Classification

Following pkg/errors patterns:

  • WrapInvalid: Bad input, validation failures, version conflicts
  • WrapTransient: NATS KV errors, network issues
  • WrapFatal: Marshaling errors, nil flow pointer

Example Usage

// Create store
store, err := flowstore.NewStore(natsClient)

// Create flow
flow := &flowstore.Flow{
	ID:   "my-flow",
	Name: "My First Flow",
	RuntimeState: flowstore.StateNotDeployed,
	Nodes: []flowstore.FlowNode{
		{
			ID:        "node-1",
			Component: "udp",   // factory name
			Type:      "input", // component category
			Name:      "udp-input-1",
			Position:  flowstore.Position{X: 100, Y: 100},
			Config:    map[string]any{"port": 5000},
		},
	},
	Connections: []flowstore.FlowConnection{},
}

err = store.Create(ctx, flow)
// flow.Version now = 1, timestamps set

// Update flow
flow.Name = "Updated Name"
err = store.Update(ctx, flow)
// flow.Version now = 2

Package Structure

flowstore/
├── doc.go                       # This file
├── flow.go                      # Flow entity and validation
├── flow_test.go                 # Unit tests for validation
├── store.go                     # KV-based Store implementation
└── store_integration_test.go    # Integration tests with real NATS

Types

type Flow

type Flow struct {
	// Identity
	ID          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description,omitempty"`

	// Version for optimistic concurrency control
	Version int64 `json:"version"`

	// Canvas layout
	Nodes       []FlowNode       `json:"nodes"`
	Connections []FlowConnection `json:"connections"`

	// Runtime state
	RuntimeState RuntimeState `json:"runtime_state"`
	DeployedAt   *time.Time   `json:"deployed_at,omitempty"`
	StartedAt    *time.Time   `json:"started_at,omitempty"`
	StoppedAt    *time.Time   `json:"stopped_at,omitempty"`

	// Audit
	CreatedAt    time.Time `json:"created_at"`
	UpdatedAt    time.Time `json:"updated_at"`
	CreatedBy    string    `json:"created_by,omitempty"`
	LastModified time.Time `json:"last_modified"`
}

Flow represents a visual flow definition with metadata and canvas layout

func FromComponentConfigs

func FromComponentConfigs(name string, configs map[string]types.ComponentConfig) (*Flow, error)

FromComponentConfigs creates a Flow from component configurations. This bridges static config files to the FlowStore, making headless configs visible in the UI.

The conversion:

  • Each ComponentConfig becomes a FlowNode
  • Node.ID = config key (e.g., "udp-input")
  • Node.Component = cfg.Name (factory name, e.g., "udp")
  • Node.Type = cfg.Type (category, e.g., "input", "processor")
  • Node.Config = component config as map[string]any
  • Positions are auto-calculated using grid layout

Connections are left empty as they require runtime component instances to derive from port subject matching. Users can connect nodes in the UI.
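
The documentation does not specify the grid parameters, so the following is only a sketch of one way a grid layout could assign positions. gridLayout, the column count, origin, and spacing are all assumptions. IDs are sorted first because Go map iteration order is not deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

type position struct {
	X, Y float64
}

// gridLayout places IDs left to right, top to bottom, in sorted order.
// columns, origin, and step values are illustrative parameters.
func gridLayout(ids []string, columns int, originX, originY, stepX, stepY float64) map[string]position {
	if columns < 1 {
		columns = 1
	}
	sorted := append([]string(nil), ids...) // copy so the caller's slice is untouched
	sort.Strings(sorted)
	out := make(map[string]position, len(sorted))
	for i, id := range sorted {
		out[id] = position{
			X: originX + float64(i%columns)*stepX,
			Y: originY + float64(i/columns)*stepY,
		}
	}
	return out
}

func main() {
	ids := []string{"udp-input", "parser", "http-output", "store"}
	layout := gridLayout(ids, 3, 100, 100, 250, 150)
	for _, id := range []string{"http-output", "parser", "store", "udp-input"} {
		fmt.Printf("%-12s %v\n", id, layout[id])
	}
}
```

With three columns, the fourth ID in sorted order wraps to the start of the second row.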

func FromComponentConfigsWithConnections

func FromComponentConfigsWithConnections(
	name string,
	configs map[string]types.ComponentConfig,
	connections []FlowConnection,
) (*Flow, error)

FromComponentConfigsWithConnections creates a Flow with connection inference. This variant accepts pre-computed connections from FlowGraph analysis. Use this when you have access to instantiated components for port matching.

func (*Flow) Validate

func (f *Flow) Validate() error

Validate checks if the flow is valid for deployment

type FlowConnection

type FlowConnection struct {
	ID           string `json:"id"`
	SourceNodeID string `json:"source_node_id"`
	SourcePort   string `json:"source_port"`
	TargetNodeID string `json:"target_node_id"`
	TargetPort   string `json:"target_port"`
}

FlowConnection represents a connection between two component ports

type FlowNode

type FlowNode struct {
	ID        string              `json:"id"`        // Unique instance ID
	Component string              `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
	Type      types.ComponentType `json:"type"`      // Component category (input/processor/output/storage/gateway)
	Name      string              `json:"name"`      // Instance name
	Position  Position            `json:"position"`  // Canvas coordinates
	Config    map[string]any      `json:"config"`    // Component configuration
}

FlowNode represents a component instance on the canvas

type Position

type Position struct {
	X float64 `json:"x"`
	Y float64 `json:"y"`
}

Position represents canvas coordinates for a node

type RuntimeState

type RuntimeState string

RuntimeState represents the deployment and execution state of a flow

const (
	StateNotDeployed     RuntimeState = "not_deployed"
	StateDeployedStopped RuntimeState = "deployed_stopped"
	StateRunning         RuntimeState = "running"
	StateError           RuntimeState = "error"
)

RuntimeState constants define the lifecycle states of a flow:

  • StateNotDeployed: Flow exists but has never been deployed
  • StateDeployedStopped: Flow deployed to config but not running
  • StateRunning: Flow is actively processing messages
  • StateError: Flow encountered an error during deployment/execution
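
Because RuntimeState is a string type, any string can be converted to it, which is why validation checks for valid values. A minimal sketch of such a check, using a local mirror of the type and a hypothetical valid method:

```go
package main

import "fmt"

// runtimeState and its constants mirror the documented values.
type runtimeState string

const (
	stateNotDeployed     runtimeState = "not_deployed"
	stateDeployedStopped runtimeState = "deployed_stopped"
	stateRunning         runtimeState = "running"
	stateError           runtimeState = "error"
)

// valid reports whether s is one of the four defined lifecycle states.
func (s runtimeState) valid() bool {
	switch s {
	case stateNotDeployed, stateDeployedStopped, stateRunning, stateError:
		return true
	}
	return false
}

func main() {
	for _, s := range []runtimeState{"running", "paused", ""} {
		fmt.Printf("%q valid: %v\n", s, s.valid())
	}
}
```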

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store provides persistence for Flow entities using NATS KV

func NewStore

func NewStore(natsClient *natsclient.Client) (*Store, error)

NewStore creates a new flow store

func (*Store) Create

func (s *Store) Create(ctx context.Context, flow *Flow) error

Create creates a new flow

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete removes a flow by ID

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (*Flow, error)

Get retrieves a flow by ID

func (*Store) List

func (s *Store) List(ctx context.Context) ([]*Flow, error)

List retrieves all flows

func (*Store) Update

func (s *Store) Update(ctx context.Context, flow *Flow) error

Update updates an existing flow with optimistic concurrency control

func (*Store) Watch

func (s *Store) Watch(ctx context.Context, pattern string) (jetstream.KeyWatcher, error)

Watch watches for changes to flows matching the pattern. Pattern supports wildcards: "*" matches any single token, ">" matches remaining tokens. Returns a KeyWatcher that emits updates on its Updates() channel.
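
The wildcard rules follow NATS conventions, where tokens are separated by dots. The matcher below is a sketch of those semantics for illustration only: matchPattern is a hypothetical helper, the dotted keys are made up, and the actual matching is performed by NATS rather than by this code.

```go
package main

import (
	"fmt"
	"strings"
)

// matchPattern reports whether key matches pattern, where "*" matches exactly
// one dot-separated token and a trailing ">" matches one or more remaining tokens.
func matchPattern(pattern, key string) bool {
	p := strings.Split(pattern, ".")
	k := strings.Split(key, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one key token left.
			return i == len(p)-1 && len(k) > i
		}
		if i >= len(k) {
			return false
		}
		if tok != "*" && tok != k[i] {
			return false
		}
	}
	return len(p) == len(k)
}

func main() {
	fmt.Println(matchPattern("*", "my-flow"))
	fmt.Println(matchPattern("team.*", "team.alpha"))
	fmt.Println(matchPattern("team.*", "team.alpha.v2"))
	fmt.Println(matchPattern("team.>", "team.alpha.v2"))
}
```

Under these rules, a pattern of "*" matches every key that contains no dots, while ">" matches keys with any number of tokens.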
