Documentation
¶
Overview ¶
Package flowstore provides flow persistence and management.
Package flowstore provides persistence for visual flow definitions.
Overview ¶
The flowstore package manages the storage and retrieval of Flow entities, which represent visual flow configurations created by users in the flow builder UI. Flows contain canvas layout information (node positions, connections) and metadata, but do not contain runtime component instances.
Architecture ¶
Flow entities are stored in NATS KV bucket "semstreams_flows" with optimistic concurrency control via version numbers. This is separate from the "semstreams_config" bucket used for runtime component configurations.
Design-time (flowstore):
- User creates/edits flows in UI
- Canvas layout and connections stored as Flow entities
- Metadata: name, description, runtime state
Runtime (config package):
- FlowEngine translates Flow → ComponentConfigs
- ComponentConfigs stored in semstreams_config KV
- Manager watches and triggers ComponentManager
Key Concepts ¶
Flow Entity:
- ID: Unique flow identifier
- Nodes: Visual components on canvas (with positions)
- Connections: Edges between node ports
- RuntimeState: not_deployed, deployed_stopped, running, error
- Version: Optimistic concurrency control
Node vs Component:
- Node.Type: Factory name (e.g., "udp", "graph-processor")
- Node.Name: Instance name (e.g., "udp-input-1")
- Node.Config: Component-specific configuration
Validation ¶
Flow.Validate() checks:
- Required fields (ID, Name, RuntimeState)
- Valid RuntimeState values
- Node completeness (ID, Type, Name)
- No duplicate node IDs
- Connection validity (IDs, ports, node references)
All validation errors use errs.WrapInvalid for consistent error handling.
Optimistic Concurrency ¶
The Store uses version-based conflict detection:
- Create: Sets version to 1
- Update: Checks current version matches, increments on success
- Conflict: Returns errs.WrapInvalid with "conflict" message
Example workflow:
flow, _ := store.Get(ctx, "my-flow") // flow.Version = 5 // Another user updates // Version is now 6 in KV flow.Name = "Updated Name" err := store.Update(ctx, flow) // FAILS - version 5 != 6 // Error contains "conflict"
Integration with FlowEngine ¶
The flowstore package is used by flowengine for deployment:
- FlowEngine.Deploy(flowID) retrieves Flow from flowstore
- Validates flow structure
- Translates to ComponentConfigs (using component registry)
- Writes to semstreams_config KV
- Updates Flow.RuntimeState to deployed_stopped
Testing ¶
Integration tests use testcontainers with real NATS:
- TestCreateAndGet: Basic CRUD
- TestOptimisticConcurrency: Version conflicts
- TestComplexFlow: Nodes and connections
All tests follow Constitutional Principle II (Real Dependencies).
Error Classification ¶
Following pkg/errors patterns:
- WrapInvalid: Bad input, validation failures, version conflicts
- WrapTransient: NATS KV errors, network issues
- WrapFatal: Marshaling errors, nil flow pointer
Example Usage ¶
// Create store
store, err := flowstore.NewStore(natsClient)
// Create flow
flow := &flowstore.Flow{
ID: "my-flow",
Name: "My First Flow",
RuntimeState: flowstore.StateNotDeployed,
Nodes: []flowstore.FlowNode{
{
ID: "node-1",
Type: "udp",
Name: "udp-input-1",
Position: flowstore.Position{X: 100, Y: 100},
Config: map[string]any{"port": 5000},
},
},
Connections: []flowstore.FlowConnection{},
}
err = store.Create(ctx, flow)
// flow.Version now = 1, timestamps set
// Update flow
flow.Name = "Updated Name"
err = store.Update(ctx, flow)
// flow.Version now = 2
Package Structure ¶
flowstore/ ├── doc.go # This file ├── flow.go # Flow entity and validation ├── flow_test.go # Unit tests for validation ├── store.go # KV-based Store implementation └── store_integration_test.go # Integration tests with real NATS
Index ¶
- type Flow
- type FlowConnection
- type FlowNode
- type Position
- type RuntimeState
- type Store
- func (s *Store) Create(ctx context.Context, flow *Flow) error
- func (s *Store) Delete(ctx context.Context, id string) error
- func (s *Store) Get(ctx context.Context, id string) (*Flow, error)
- func (s *Store) List(ctx context.Context) ([]*Flow, error)
- func (s *Store) Update(ctx context.Context, flow *Flow) error
- func (s *Store) Watch(ctx context.Context, pattern string) (jetstream.KeyWatcher, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Flow ¶
type Flow struct {
// Identity
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
// Version for optimistic concurrency control
Version int64 `json:"version"`
// Canvas layout
Nodes []FlowNode `json:"nodes"`
Connections []FlowConnection `json:"connections"`
// Runtime state
RuntimeState RuntimeState `json:"runtime_state"`
DeployedAt *time.Time `json:"deployed_at,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
StoppedAt *time.Time `json:"stopped_at,omitempty"`
// Audit
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CreatedBy string `json:"created_by,omitempty"`
LastModified time.Time `json:"last_modified"`
}
Flow represents a visual flow definition with metadata and canvas layout
func FromComponentConfigs ¶
FromComponentConfigs creates a Flow from component configurations. This bridges static config files to the FlowStore, making headless configs visible in the UI.
The conversion:
- Each ComponentConfig becomes a FlowNode
- Node.ID = config key (e.g., "udp-input")
- Node.Component = cfg.Name (factory name, e.g., "udp")
- Node.Type = cfg.Type (category, e.g., "input", "processor")
- Node.Config = component config as map[string]any
- Positions are auto-calculated using grid layout
Connections are left empty as they require runtime component instances to derive from port subject matching. Users can connect nodes in the UI.
func FromComponentConfigsWithConnections ¶
func FromComponentConfigsWithConnections( name string, configs map[string]types.ComponentConfig, connections []FlowConnection, ) (*Flow, error)
FromComponentConfigsWithConnections creates a Flow with connection inference. This variant accepts pre-computed connections from FlowGraph analysis. Use this when you have access to instantiated components for port matching.
type FlowConnection ¶
type FlowConnection struct {
ID string `json:"id"`
SourceNodeID string `json:"source_node_id"`
SourcePort string `json:"source_port"`
TargetNodeID string `json:"target_node_id"`
TargetPort string `json:"target_port"`
}
FlowConnection represents a connection between two component ports
type FlowNode ¶
type FlowNode struct {
ID string `json:"id"` // Unique instance ID
Component string `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
Type types.ComponentType `json:"type"` // Component category (input/processor/output/storage/gateway)
Name string `json:"name"` // Instance name
Position Position `json:"position"` // Canvas coordinates
Config map[string]any `json:"config"` // Component configuration
}
FlowNode represents a component instance on the canvas
type RuntimeState ¶
type RuntimeState string
RuntimeState represents the deployment and execution state of a flow
const ( StateNotDeployed RuntimeState = "not_deployed" StateDeployedStopped RuntimeState = "deployed_stopped" StateRunning RuntimeState = "running" StateError RuntimeState = "error" )
RuntimeState constants define the lifecycle states of a flow:
- StateNotDeployed: Flow exists but has never been deployed
- StateDeployedStopped: Flow deployed to config but not running
- StateRunning: Flow is actively processing messages
- StateError: Flow encountered an error during deployment/execution
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store provides persistence for Flow entities using NATS KV
func NewStore ¶
func NewStore(natsClient *natsclient.Client) (*Store, error)
NewStore creates a new flow store