Documentation ¶
Overview ¶
Package flowengine translates Flow entities to ComponentConfigs and manages deployment.
The flowengine package bridges the gap between design-time flows (flowstore) and runtime components (config/component packages). It translates visual flow definitions into deployable component configurations and manages the full deployment lifecycle.
Architecture ¶
The Engine integrates with existing SemStreams infrastructure:
┌─────────────┐
│   FlowUI    │ (Future)
└──────┬──────┘
       │ HTTP POST /flows/deployment/{id}/deploy
       ▼
┌─────────────┐
│ FlowService │ (pkg/service/flow_service.go)
└──────┬──────┘
       │
       ▼
┌─────────────┐      Reads      ┌──────────────┐
│ FlowEngine  │ ──────────────> │  FlowStore   │
│             │                 │ (flowstore)  │
│ - Deploy()  │                 └──────────────┘
│ - Start()   │
│ - Stop()    │      Translates
│ - Undeploy()│ ───────────────────────┐
└──────┬──────┘                        │
       │                               ▼
       │ Writes              Flow → ComponentConfigs
       │                     (uses ComponentRegistry)
       ▼
┌──────────────────┐
│    Manager KV    │ (semstreams_config)
│ (already exists) │
└────────┬─────────┘
         │ Watches
         ▼
┌──────────────────┐
│ ComponentManager │
│    (existing)    │
└──────────────────┘
Key Innovation: Dynamic Component Registry Lookup ¶
Unlike hardcoded factory-to-type mappings, the Engine uses the component registry to dynamically determine component types. This means:
- Adding new components requires NO changes to flowengine
- Component type mapping is automatic and correct
- Follows existing component registration patterns
Implementation:
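func (e *Engine) mapFactoryToComponentType(factoryName string) (types.ComponentType, error) {
	// Look up the factory in the registry (NOT a hardcoded switch).
	factories := e.componentRegistry.ListFactories()
	registration, ok := factories[factoryName]
	if !ok {
		// Unknown factory: fail instead of returning a zero-value type.
		var zero types.ComponentType
		return zero, fmt.Errorf("unknown component factory %q", factoryName)
	}
	// Return the type from the registration metadata.
	return registration.Type, nil
}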
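The lookup pattern can be tried in isolation with a small, self-contained sketch. The Registry, Registration, and ComponentType types below are hypothetical stand-ins written for this example, not the real component package API; only the shape of the lookup is meant to match.

```go
package main

import (
	"fmt"
)

// ComponentType is a hypothetical stand-in for types.ComponentType.
type ComponentType string

// Registration is a hypothetical stand-in for the registry's factory metadata.
type Registration struct {
	Name string
	Type ComponentType
}

// Registry is a hypothetical in-memory registry keyed by factory name.
type Registry struct {
	factories map[string]Registration
}

// Register adds a factory. A new component only needs this call;
// the lookup code below never changes.
func (r *Registry) Register(reg Registration) {
	if r.factories == nil {
		r.factories = make(map[string]Registration)
	}
	r.factories[reg.Name] = reg
}

// LookupType resolves a factory name to its component type and
// reports an error for names that were never registered.
func (r *Registry) LookupType(factoryName string) (ComponentType, error) {
	reg, ok := r.factories[factoryName]
	if !ok {
		return "", fmt.Errorf("unknown component factory %q", factoryName)
	}
	return reg.Type, nil
}

func main() {
	reg := &Registry{}
	reg.Register(Registration{Name: "udp", Type: "input"})
	reg.Register(Registration{Name: "graph-processor", Type: "processor"})

	for _, name := range []string{"udp", "graph-processor", "missing"} {
		t, err := reg.LookupType(name)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> %s\n", name, t)
	}
}
```

Because the mapping lives in the registration data, supporting a new factory is a Register call rather than a new switch case.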
Deployment Lifecycle ¶
The Engine manages four operations that map to flow runtime states:
1. Deploy (not_deployed → deployed_stopped):
- Retrieve Flow from flowstore
- Validate flow structure
- Translate nodes to ComponentConfigs (using registry)
- Write to semstreams_config KV
- Update Flow.RuntimeState
2. Start (deployed_stopped → running):
- Set all component configs Enabled = true
- Write to semstreams_config KV
- Update Flow.RuntimeState
3. Stop (running → deployed_stopped):
- Set all component configs Enabled = false
- Write to semstreams_config KV
- Update Flow.RuntimeState
4. Undeploy (deployed_stopped → not_deployed):
- Delete all component configs from KV
- Update Flow.RuntimeState
- Cannot undeploy running flows (validation error)
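Steps 2 to 4 come down to flipping a flag on, or deleting, a set of KV entries. The sketch below models that with a plain map; the KV type and the setEnabled and undeploy helpers are hypothetical names for this example, and the real Engine writes to the semstreams_config KV bucket instead.

```go
package main

import "fmt"

// ComponentConfig is a trimmed, hypothetical copy of the real config type.
type ComponentConfig struct {
	Name    string
	Enabled bool
}

// KV is a hypothetical in-memory stand-in for the semstreams_config bucket.
type KV map[string]ComponentConfig

// setEnabled models Start (enabled=true) and Stop (enabled=false):
// every config belonging to the flow is rewritten with the new flag.
func setEnabled(kv KV, keys []string, enabled bool) error {
	for _, k := range keys {
		cfg, ok := kv[k]
		if !ok {
			return fmt.Errorf("config %q not found", k)
		}
		cfg.Enabled = enabled
		kv[k] = cfg
	}
	return nil
}

// undeploy models Undeploy: it refuses to run on a running flow and
// otherwise deletes every config belonging to the flow.
func undeploy(kv KV, keys []string, running bool) error {
	if running {
		return fmt.Errorf("cannot undeploy a running flow; stop it first")
	}
	for _, k := range keys {
		delete(kv, k)
	}
	return nil
}

func main() {
	kv := KV{"components.udp-input-1": {Name: "udp"}}
	keys := []string{"components.udp-input-1"}

	if err := setEnabled(kv, keys, true); err != nil { // Start
		fmt.Println("start failed:", err)
	}
	fmt.Println("undeploy while running:", undeploy(kv, keys, true))

	if err := setEnabled(kv, keys, false); err != nil { // Stop
		fmt.Println("stop failed:", err)
	}
	fmt.Println("undeploy after stop:", undeploy(kv, keys, false))
	fmt.Println("remaining configs:", len(kv))
}
```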
Translation Logic ¶
Flow nodes are translated to ComponentConfigs:
FlowNode {                          ComponentConfig {
  ID: "node-1"                        (not used)
  Type: "udp"                 →       Type: "input" (via registry lookup)
  Name: "udp-input-1"         →       Name: "udp" (factory name)
  Position: {X:100, Y:100}            (not used)
  Config: {...}               →       Config: json.RawMessage
}                             →       Enabled: true
                                    }
Key fields:
- Node.Name becomes the config key: "components.udp-input-1"
- Node.Type (the factory name) is looked up in the registry to obtain the ComponentType
- Node.Config is marshaled into ComponentConfig.Config
- ComponentConfig.Enabled is set based on the operation (deploy=true)
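The field mapping above can be sketched as a single function. This is a minimal illustration, not the Engine's code: FlowNode and ComponentConfig are trimmed local copies, the JSON tags are assumptions, and the typeOf map stands in for the registry lookup.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FlowNode is a trimmed, hypothetical copy of the flowstore node type.
type FlowNode struct {
	ID     string
	Type   string // factory name, e.g. "udp"
	Name   string // instance name, e.g. "udp-input-1"
	Config map[string]any
}

// ComponentConfig is a trimmed, hypothetical copy; the JSON tags are assumed.
type ComponentConfig struct {
	Type    string          `json:"type"`
	Name    string          `json:"name"`
	Enabled bool            `json:"enabled"`
	Config  json.RawMessage `json:"config"`
}

// translateNode returns the KV key and the config for one node.
// typeOf maps factory names to component types and stands in for the registry.
func translateNode(node FlowNode, typeOf map[string]string, enabled bool) (string, ComponentConfig, error) {
	componentType, ok := typeOf[node.Type]
	if !ok {
		return "", ComponentConfig{}, fmt.Errorf("unknown component factory %q", node.Type)
	}
	raw, err := json.Marshal(node.Config)
	if err != nil {
		return "", ComponentConfig{}, fmt.Errorf("marshal config for %q: %w", node.Name, err)
	}
	cfg := ComponentConfig{
		Type:    componentType, // resolved via lookup, not hardcoded
		Name:    node.Type,     // the factory name
		Enabled: enabled,
		Config:  raw,
	}
	return "components." + node.Name, cfg, nil
}

func main() {
	node := FlowNode{
		ID:     "node-1",
		Type:   "udp",
		Name:   "udp-input-1",
		Config: map[string]any{"port": 14550},
	}
	key, cfg, err := translateNode(node, map[string]string{"udp": "input"}, true)
	if err != nil {
		fmt.Println("translate failed:", err)
		return
	}
	fmt.Println(key, cfg.Type, cfg.Name, string(cfg.Config))
}
```

Node ID and Position are deliberately dropped, matching the "(not used)" rows in the table.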
State Transitions ¶
Valid state transitions enforced by the Engine:
not_deployed ──Deploy()──> deployed_stopped ──Start()──> running
     ▲                           │                          │
     │                           │                          │
     └──────Undeploy()───────────┘                          │
                                                            │
                            deployed_stopped <──Stop()──────┘
Invalid transitions return errs.WrapInvalid.
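The transition rules fit in a small lookup table. The sketch below is one way to enforce them, using hypothetical names (RuntimeState, transitions, next) and a plain fmt.Errorf where the Engine would return an errs.WrapInvalid error.

```go
package main

import "fmt"

// RuntimeState mirrors the three states named in the diagram.
type RuntimeState string

const (
	NotDeployed     RuntimeState = "not_deployed"
	DeployedStopped RuntimeState = "deployed_stopped"
	Running         RuntimeState = "running"
)

// edge is the single allowed source and target state of an operation.
type edge struct{ from, to RuntimeState }

// transitions lists every valid operation; anything else is rejected.
var transitions = map[string]edge{
	"deploy":   {NotDeployed, DeployedStopped},
	"start":    {DeployedStopped, Running},
	"stop":     {Running, DeployedStopped},
	"undeploy": {DeployedStopped, NotDeployed},
}

// next returns the state after op, or an error (leaving the state
// unchanged) when op is not allowed from the current state.
func next(current RuntimeState, op string) (RuntimeState, error) {
	e, ok := transitions[op]
	if !ok {
		return current, fmt.Errorf("unknown operation %q", op)
	}
	if current != e.from {
		return current, fmt.Errorf("cannot %s flow in state %q (requires %q)", op, current, e.from)
	}
	return e.to, nil
}

func main() {
	state := NotDeployed
	for _, op := range []string{"deploy", "start", "undeploy", "stop", "undeploy"} {
		newState, err := next(state, op)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Printf("%s: %s -> %s\n", op, state, newState)
		state = newState
	}
}
```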
Integration with Existing Systems ¶
The Engine reuses the existing deployment infrastructure without modifying it:
Manager:
- Already watches semstreams_config KV
- Fires OnChange("components.*") when Engine writes
- No changes needed
ComponentManager:
- Already creates/starts/stops components
- Already respects Enabled field
- No changes needed
ComponentRegistry:
- Already contains factory metadata
- Engine queries for type information
- No changes needed
Error Handling ¶
Following pkg/errors patterns:
- WrapInvalid: Flow not found, validation errors, wrong state transitions
- WrapTransient: NATS KV errors, Manager errors
- WrapFatal: Marshaling errors (should never happen with valid Go types)
Testing Strategy ¶
Integration tests use real NATS and component registry:
- TestDeployFlow: Full deploy → verify configs written
- TestFullLifecycle: Deploy → Start → Stop → Undeploy
- TestStartNotDeployedFlow: Invalid state transition
- TestUndeployRunningFlow: Must stop before undeploy
All tests use testcontainers (Constitutional Principle II).
Example Usage ¶
// In FlowService (pkg/service/flow_service.go):
engine := flowengine.NewEngine(configMgr, flowStore, componentRegistry, natsClient, logger, metricsRegistry)

// User clicks "Deploy" in UI
err := engine.Deploy(ctx, "my-flow")
// Flow state: not_deployed → deployed_stopped
// Components created but not started

// User clicks "Start" in UI
err = engine.Start(ctx, "my-flow")
// Flow state: deployed_stopped → running
// Components now processing data

// User clicks "Stop" in UI
err = engine.Stop(ctx, "my-flow")
// Flow state: running → deployed_stopped

// User clicks "Undeploy" in UI
err = engine.Undeploy(ctx, "my-flow")
// Flow state: deployed_stopped → not_deployed
// Components deleted from runtime
Future Enhancements ¶
Validation integration:
- Currently validateFlow() is a stub
- Future: Integrate with existing flowgraph package
- Check connectivity, cycles, port compatibility
- Use component registry for port metadata
Package Structure ¶
flowengine/
├── doc.go                       # This file
├── engine.go                    # Engine implementation
└── engine_integration_test.go   # Integration tests (9 passing)
Index ¶
- type DiscoveredConnection
- type Engine
- func (e *Engine) Deploy(ctx context.Context, flowID string) error
- func (e *Engine) Start(ctx context.Context, flowID string) error
- func (e *Engine) Stop(ctx context.Context, flowID string) error
- func (e *Engine) Undeploy(ctx context.Context, flowID string) error
- func (e *Engine) ValidateFlowDefinition(flow *flowstore.Flow) (*ValidationResult, error)
- type ValidatedNode
- type ValidatedPort
- type ValidationError
- type ValidationIssue
- type ValidationResult
- type Validator
Types ¶
type DiscoveredConnection ¶
type DiscoveredConnection struct {
SourceNodeID string `json:"source_node_id"`
SourcePort string `json:"source_port"`
TargetNodeID string `json:"target_node_id"`
TargetPort string `json:"target_port"`
ConnectionID string `json:"connection_id"`
Pattern string `json:"pattern"`
}
DiscoveredConnection represents an auto-discovered connection between ports
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine translates Flow entities into ComponentConfigs and manages deployment lifecycle
func NewEngine ¶
func NewEngine(
	configMgr *config.Manager,
	flowStore *flowstore.Store,
	componentRegistry *component.Registry,
	natsClient *natsclient.Client,
	logger *slog.Logger,
	metricsRegistry *metric.MetricsRegistry,
) *Engine
NewEngine creates a new flow engine
func (*Engine) Deploy ¶
Deploy translates a flow to component configs and writes them to the semstreams_config KV. The existing Manager detects the changes, and ComponentManager creates the components.
func (*Engine) Start ¶
Start starts all components in a deployed flow. This is achieved by updating the "enabled" field in the component configs.
func (*Engine) ValidateFlowDefinition ¶
func (e *Engine) ValidateFlowDefinition(flow *flowstore.Flow) (*ValidationResult, error)
ValidateFlowDefinition validates a flow without deploying it. It returns full validation results, including port information and discovered connections.
type ValidatedNode ¶
type ValidatedNode struct {
ID string `json:"id"`
Component string `json:"component"` // Component factory name (e.g., "udp", "graph-processor")
Type types.ComponentType `json:"type"` // Component category (input/processor/output/storage/gateway)
Name string `json:"name"`
InputPorts []ValidatedPort `json:"input_ports"`
OutputPorts []ValidatedPort `json:"output_ports"`
}
ValidatedNode represents a flow node with its port information
type ValidatedPort ¶
type ValidatedPort struct {
Name string `json:"name"`
Direction string `json:"direction"`
Type string `json:"type"` // Interface contract type (e.g., "message.Storable")
Required bool `json:"required"`
ConnectionID string `json:"connection_id"` // NATS subject, network address, etc.
Pattern string `json:"pattern"` // stream, request, watch, api
Description string `json:"description"` // Port description
}
ValidatedPort represents a port with validation information
type ValidationError ¶
type ValidationError struct {
Result *ValidationResult
}
ValidationError wraps validation results for API responses
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
type ValidationIssue ¶
type ValidationIssue struct {
Type string `json:"type"` // "orphaned_port", "disconnected_node", "unknown_component", etc.
Severity string `json:"severity"` // "error", "warning"
ComponentName string `json:"component_name"`
PortName string `json:"port_name,omitempty"`
Message string `json:"message"`
Suggestions []string `json:"suggestions,omitempty"`
}
ValidationIssue represents a single validation problem
type ValidationResult ¶
type ValidationResult struct {
Status string `json:"validation_status"` // "valid", "warnings", "errors"
Errors []ValidationIssue `json:"errors"`
Warnings []ValidationIssue `json:"warnings"`
Nodes []ValidatedNode `json:"nodes"` // Nodes with port information
DiscoveredConnections []DiscoveredConnection `json:"discovered_connections"` // Auto-discovered edges
}
ValidationResult contains the results of flow validation
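The struct tags determine the JSON that API clients receive. The sketch below encodes a result using local copies of the documented types (Nodes and DiscoveredConnections are left out for brevity); the encode helper and the sample values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ValidationIssue copies the documented fields and tags.
type ValidationIssue struct {
	Type          string   `json:"type"`
	Severity      string   `json:"severity"`
	ComponentName string   `json:"component_name"`
	PortName      string   `json:"port_name,omitempty"`
	Message       string   `json:"message"`
	Suggestions   []string `json:"suggestions,omitempty"`
}

// ValidationResult is trimmed to the status and issue lists.
type ValidationResult struct {
	Status   string            `json:"validation_status"`
	Errors   []ValidationIssue `json:"errors"`
	Warnings []ValidationIssue `json:"warnings"`
}

// encode renders a result as compact JSON.
func encode(r ValidationResult) (string, error) {
	b, err := json.Marshal(r)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encode(ValidationResult{
		Status: "warnings",
		Errors: []ValidationIssue{}, // empty slice encodes as [], nil as null
		Warnings: []ValidationIssue{{
			Type:          "orphaned_port",
			Severity:      "warning",
			ComponentName: "udp-input-1",
			PortName:      "out",
			Message:       "output port has no subscriber",
		}},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

Note that the status field is serialized as validation_status, not status, and that port_name and suggestions disappear from the output when empty.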
type Validator ¶
type Validator struct {
// contains filtered or unexported fields
}
Validator provides flow validation using FlowGraph analysis
func NewValidator ¶
func NewValidator(registry *component.Registry, natsClient *natsclient.Client, logger *slog.Logger) *Validator
NewValidator creates a new flow validator with component registry and NATS client. The validator performs structural, type, and semantic validation of flow definitions before deployment to ensure they can be safely executed.
func (*Validator) ValidateFlow ¶
func (v *Validator) ValidateFlow(flow *flowstore.Flow) (*ValidationResult, error)
ValidateFlow performs comprehensive flow validation using FlowGraph