TinySystems Module SDK
A Kubebuilder-based Kubernetes operator SDK for building flow-based workflow engines. This SDK provides the complete infrastructure for developing modular operators that can be composed into visual workflows.
Overview
TinySystems Module SDK enables developers to create module operators (like common-module, http-module, grpc-module) that bring specific functionality into a Kubernetes-native flow engine. Each module provides reusable components that can be connected through a port-based architecture to create complex workflows.
Key Features
- Port-Based Component Architecture: Visual programming model with input/output ports
- JSON Schema-Driven Configuration: Automatic schema generation with UI hints
- Message Routing & Retry Logic: Intelligent routing with exponential backoff
- Multi-Module Communication: gRPC-based inter-module communication
- Expression-Based Data Transformation: Mustache-style `{{expression}}` syntax with JSONPath for flexible data mapping
- Kubernetes-Native: Everything is a CRD with standard controller patterns
- OpenTelemetry Integration: Built-in observability with tracing and metrics
- CLI Tools: Complete tooling for running and building modules
Table of Contents
- Overview
- Architecture
- Getting Started
- Helm Charts
- Development
- Module Development
- Contributing
- License
Architecture
Core Concepts
The SDK is built around several key abstractions:
Custom Resource Definitions (CRDs)
- TinyNode: Core execution unit representing a component instance in a flow
  - Contains module name, version, component type, port configurations, and edges
  - Status includes module/component info, port schemas, and error state
- TinyModule: Registry of available modules for service discovery
  - References container image
  - Status contains module address (gRPC), version, and list of components
- TinyFlow: Logical grouping of nodes representing a workflow
  - Uses labels to associate nodes together
- TinyProject: Top-level organizational unit grouping multiple flows
- TinySignal: External trigger mechanism for node execution
  - Specifies target node, port, and data payload
  - Works like webhooks or manual triggers
- TinyTracker: Execution monitoring for detailed tracing
- TinyWidgetPage: Custom UI dashboard pages for visualization
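Because these are ordinary CRDs, they can be read and written with standard Kubernetes tooling. As a minimal sketch (assuming the usual kubebuilder-generated `TinyNodeList` type), a controller-runtime client can list nodes like this:

```go
import (
	"context"

	"github.com/tiny-systems/module/api/v1alpha1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listNodes lists TinyNode resources in a namespace using a standard
// controller-runtime client. TinyNodeList is assumed to be the usual
// kubebuilder-generated list type for TinyNode.
func listNodes(ctx context.Context, c client.Client, namespace string) (*v1alpha1.TinyNodeList, error) {
	var nodes v1alpha1.TinyNodeList
	if err := c.List(ctx, &nodes, client.InNamespace(namespace)); err != nil {
		return nil, err
	}
	return &nodes, nil
}
```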
Component System
Components are the building blocks of workflows. Each component:
- Implements the `Component` interface (`module/component.go`)
- Defines ports for input/output with positions (Top/Right/Bottom/Left)
- Handles messages through the `Handle()` method
- Provides JSON Schema configuration for each port
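A rough sketch of what the `Component` interface looks like, reconstructed from the methods used in the examples later in this README (the actual definition in `module/component.go` may differ in detail):

```go
import "context"

// Component, as used throughout this README: a reconstruction for
// illustration, not the verbatim SDK definition.
type Component interface {
	// Instance returns a fresh instance of the component.
	Instance() Component
	// GetInfo describes the component for discovery and UI display.
	GetInfo() ComponentInfo
	// Ports declares the component's input and output ports.
	Ports() []Port
	// Handle processes a message arriving on the named port and may emit
	// messages to other ports via the output callback.
	Handle(ctx context.Context, output Handler, port string, message any) any
}
```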
Message Flow
External Trigger (TinySignal)
→ TinySignal Controller
→ Scheduler.Handle()
→ Runner.MsgHandler()
→ Component.Handle()
→ output() callback
→ Next node via edges
Port System
Ports enable component communication:
- Source Ports: Output ports that send data
- Target Ports: Input ports that receive data
- System Ports:
  - `_reconcile`: Triggers node reconciliation
  - `_client`: Receives Kubernetes client for resource operations
  - `_settings`: Configuration port
  - `_control`: Dashboard control port
Each port can have:
- Configuration: JSON schema defining expected input structure
- Response Configuration: Schema for output data structure
Edge Configuration
Edges connect ports between nodes and support data transformation using mustache-style expressions:
Expression Syntax: Use {{expression}} to evaluate JSONPath expressions against incoming data.
{
"body": "{{$.request.body}}",
"statusCode": 200,
"greeting": "Hello {{$.user.name}}!",
"isAdmin": "{{$.user.role == 'admin'}}"
}
Expression Types:
- Pure expression (`"{{$.field}}"`) - Returns the actual type (string, number, boolean, object)
- Literal value (`"hello"`, `200`, `true`) - Static values passed through as-is
- String interpolation (`"Hello {{$.name}}!"`) - Embeds expression results in strings
- JSONPath with operators (`"{{$.count + 1}}"`, `"{{$.method == 'GET'}}"`) - Supports arithmetic and comparison
Key Features:
- Type preservation: `"{{$.count}}"` returns a number, not a string
- Graceful error handling: If source data is unavailable, expressions return `nil`
- Full JSONPath support via ajson library
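For illustration only, here is roughly how the edge configuration example above would map an incoming message; the values below are made up and simply demonstrate the type-preservation rules:

```go
// Incoming data on the source port (illustrative values).
incoming := map[string]any{
	"request": map[string]any{"body": "payload"},
	"user":    map[string]any{"name": "Ada", "role": "admin"},
}

// Roughly what the target port receives after evaluating the edge
// configuration shown above: expressions keep their native types.
mapped := map[string]any{
	"body":       "payload",    // {{$.request.body}} - pure expression, original value
	"statusCode": 200,          // literal number passed through as-is
	"greeting":   "Hello Ada!", // string interpolation
	"isAdmin":    true,         // {{$.user.role == 'admin'}} yields a boolean, not "true"
}
_, _ = incoming, mapped
```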
SDK Components
The SDK provides several packages for module developers:
/module/ - Core Interfaces
- `Component`: Main interface for component implementation
- `Port`: Port definitions and configuration
- `Handler`: Callback function type for message routing
/pkg/resource/ - Resource Manager
Unified Kubernetes client providing:
- Node/Module/Flow/Project CRUD operations
- Signal creation for triggering nodes
- Ingress/Service management for exposing ports
- Helm release management
/pkg/schema/ - JSON Schema Generator
- Automatic schema generation from Go structs
- Custom struct tags for UI hints: `configurable`, `shared`, `propertyOrder`, `tab`, `align`
- Definition sharing and override mechanism
/pkg/evaluator/ - Expression Evaluator
- Mustache-style `{{expression}}` syntax processing
- JSONPath evaluation via ajson library
- Type-preserving evaluation (numbers stay numbers, booleans stay booleans)
- Graceful error handling when source data is unavailable
/pkg/metrics/ - Observability
- OpenTelemetry integration with spans
- Metrics (gauges, counters)
- Tracker system for flow execution monitoring
/internal/scheduler/ - Message Routing
- Manages component instances
- Routes messages between nodes
- Handles cross-module communication via gRPC
/internal/client/ - Client Pool
- Manages gRPC connections to other modules
- Connection pooling and lifecycle management
/cli/ - Command-Line Tools
- `run`: Complete operator runtime
- `build`: Module building and publishing
Communication Patterns
Same-Module Communication
When nodes belong to the same module, messages are routed directly through the scheduler for optimal performance.
Cross-Module Communication
When nodes belong to different modules:
- Scheduler identifies the target module via TinyModule CRD
- Client pool establishes/reuses gRPC connection
- Message is sent to target module's gRPC server
- Target module's scheduler routes to the appropriate component
Retry Mechanism
- Transient Errors: Exponential backoff (1s → 30s max)
- Permanent Errors: Marked via the `PermanentError` wrapper to stop retries
- Context cancellation stops all retries
Design Patterns
- Eventual Consistency with Reconciliation: Periodic reconciliation (every 5 minutes) plus signal-based immediate updates
- Leader Election with Metadata Sync: Leader handles metadata updates for control operations; all pods execute node logic and sync state from TinyNode metadata
- Schema-Driven Configuration: Go structs automatically generate JSON schemas for UI integration
- Expression-Based Transformation: Mustache-style `{{expression}}` syntax with JSONPath enables flexible data mapping without code changes
- Definition Sharing: Components mark fields as `shared:true` or `configurable:true` for cross-node type safety
Scalability
The SDK implements a metadata-synchronization pattern that enables horizontal scaling of module operators while maintaining consistency across replicas.
Core Scalability Mechanism
Leader Election (for TinySignal processing):
- Uses Kubernetes-native leader election via `k8s.io/client-go/tools/leaderelection` with Lease resources
- Configuration: 15s lease duration, 10s renew deadline, 2s retry period
- Each pod identifies itself using the `HOSTNAME` environment variable
- Leadership state tracked via `isLeader` and `leadershipKnown` atomic booleans
- Purpose: TinySignal broadcasts reconcile events to ALL pods; only the leader processes them to avoid N-fold multiplication
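Leader election is wired up by the SDK itself; the following standalone sketch only illustrates the configuration described above using the upstream client-go API (the lease name and namespace are placeholders):

```go
import (
	"context"
	"os"
	"sync/atomic"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runLeaderElection illustrates the settings described above: Lease-based
// election, 15s lease, 10s renew deadline, 2s retry, pod identity taken
// from HOSTNAME. Lease name/namespace are illustrative only.
func runLeaderElection(ctx context.Context, clientset kubernetes.Interface, isLeader *atomic.Bool) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{Name: "my-module-leader", Namespace: "tinysystems"},
		Client:    clientset.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: os.Getenv("HOSTNAME"),
		},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { isLeader.Store(true) },
			OnStoppedLeading: func() { isLeader.Store(false) },
		},
	})
}
```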
Metadata as Source of Truth:
- TinyNode `Status.Metadata` acts as a distributed state store
- Any pod can update metadata (not just the leader)
- All pods sync state from metadata via `ReconcilePort`
- Non-leaders requeue reconciliation every 5 seconds for faster state sync
Two Message Delivery Patterns:
- TinySignal → Reconcile: Broadcast to all pods, only leader processes
- gRPC (inter-component): Round-robin to any pod, receiving pod processes and updates CR
Signal Component Scalability
The Signal component (common-module) demonstrates leader filtering for TinySignal-triggered control operations.
Problem Solved: TinySignal broadcasts to all pods; control commands (Send/Reset) should execute once, not N times.
Why Leader Filtering Works Here:
- Signal's control port accepts configurable input data
- Platform can create TinySignals targeting control ports with input types
- TinySignal → reconcile → all pods receive → only leader processes
Implementation:
// Only leader processes TinySignal-triggered control commands
if !utils.IsLeader(ctx) {
return nil
}
Metadata Keys:
- `signal-running`: Boolean string indicating if the signal is active
Flow:
- TinySignal created → reconcile broadcast to all Signal pods
- Only leader pod processes the control command
- Leader updates `Status.Metadata["signal-running"] = "true"` via the ReconcilePort handler
- All pods receive the ReconcilePort event with the updated TinyNode
- All pods sync their local `isRunning` state from metadata
- UI shows consistent state across all replicas
State Synchronization:
case v1alpha1.ReconcilePort:
if node, ok := msg.(v1alpha1.TinyNode); ok {
t.isRunning = node.Status.Metadata["signal-running"] == "true"
}
Benefits:
- Single execution of TinySignal-triggered commands (no N-fold multiplication)
- Consistent UI state across all pods
- Automatic state recovery on pod restart
- Context data preserved in component state
HTTP Server Component Scalability
The HTTP Server component (http-module) demonstrates gRPC round-robin with metadata sync.
Key Difference from Signal: HTTP Server's control port has no input type (buttons only), so platform cannot create TinySignals for it. Start requests arrive via gRPC from upstream components (e.g., Signal → gRPC → HTTP Server).
Problem Solved: Multiple replicas need to serve HTTP traffic while coordinating startup/shutdown when any pod can receive the start request.
Message Flow:
TinySignal → Signal (leader only) → gRPC → HTTP Server (any pod, round-robin)
Implementation:
First Pod to Receive Start (becomes "initiator"):
- Receives start request via gRPC (round-robin, could be any pod)
- Starts server and updates metadata with actual listening port
- Serializes full configuration to metadata for other pods
Other Pods (followers):
- See metadata update via ReconcilePort
- Auto-discover port from metadata
- Restore configuration from serialized metadata
- Start their own server instance
Metadata Keys:
- `http-server-port`: The actual listening port (0 = stopped, >0 = running)
- `http-server-config`: Serialized startup configuration (JSON)
Flow:
┌─────────────────────────────────────────────────────┐
│ TinyNode Status.Metadata │
│ - http-server-port: [actual-port or 0] │
│ - http-server-config: {serialized config} │
└─────────────────────────────────────────────────────┘
▲ ▲
updates first syncs & follows
│ │
┌────┴─────────────┐ ┌──────────┴─────────────┐
│ INITIATOR POD │ │ FOLLOWER POD(s) │
│ (receives gRPC) │ │ (sync from metadata) │
│ │ │ │
│ - Starts server │ │ - Auto-discovers port │
│ - Updates meta │ │ - Restores config │
│ - Sets config │ │ - Starts own server │
└──────────────────┘ └────────────────────────┘
Stop Behavior:
- Pod that initiated (listenPort was 0 at start) updates metadata to port=0 on stop
- Follower pods (listenPort > 0) only stop locally, don't modify metadata
- Restart flag prevents metadata corruption during restart sequences
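A hedged sketch of this stop coordination; `Server`, `wasInitiator`, and `stopLocal` are illustrative names, not the actual http-module implementation:

```go
import (
	"context"

	"github.com/tiny-systems/module/api/v1alpha1"
	"github.com/tiny-systems/module/module"
)

// stop illustrates the coordination described above (names are made up).
func (s *Server) stop(ctx context.Context, handler module.Handler) {
	s.stopLocal(ctx) // shut down this pod's own listener first

	// Followers only stop locally and leave metadata untouched.
	if !s.wasInitiator {
		return
	}
	// The initiator publishes port=0 so all replicas converge on "stopped".
	handler(ctx, v1alpha1.ReconcilePort, v1alpha1.MetadataConfigPatch{
		Metadata: map[string]string{"http-server-port": "0"},
	})
}
```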
Benefits:
- Zero-downtime scaling (pods auto-discover and start from metadata)
- Configuration consistency (initiator's config distributed via metadata)
- Clean shutdown coordination (metadata ensures pod agreement)
- Automatic recovery (pods auto-restart from ReconcilePort)
Implementing Scalable Components
Choose the right pattern based on how your component receives messages:
Pattern 1: TinySignal-triggered control ports (like Signal)
Use when: Control port accepts configurable input that users may trigger via TinySignal.
// Filter TinySignal broadcasts - only leader processes
if !utils.IsLeader(ctx) {
return nil
}
// Process command and update metadata
handler(ctx, v1alpha1.ReconcilePort, v1alpha1.MetadataConfigPatch{
Metadata: map[string]string{"my-state": "value"},
})
Pattern 2: gRPC-triggered ports (like HTTP Server)
Use when: Port receives messages via gRPC from other components (round-robin delivery).
// Any pod can receive - process and update metadata
// Other pods will sync via ReconcilePort
handler(ctx, v1alpha1.ReconcilePort, v1alpha1.MetadataConfigPatch{
Metadata: map[string]string{"my-state": "value"},
})
Common: Sync state from ReconcilePort (all patterns)
case v1alpha1.ReconcilePort:
if node, ok := msg.(v1alpha1.TinyNode); ok {
c.myState = node.Status.Metadata["my-state"]
// Start/configure based on metadata if needed
}
Common: Use metadata for consistent status display
// Use metadata as source of truth for UI, not local state
isRunning := node.Status.Metadata["running"] == "true"
Project Structure
.
├── api/v1alpha1/ # CRD definitions (TinyNode, TinyModule, TinyFlow, etc.)
├── module/ # Core SDK interfaces for component developers
│ ├── component.go # Component interface
│ ├── node.go # Port definitions
│ └── handler.go # Handler function type
├── pkg/ # Reusable SDK packages
│ ├── resource/ # Kubernetes resource manager
│ ├── schema/ # JSON schema generator
│ ├── evaluator/ # JSONPath expression evaluator
│ ├── errors/ # Error handling utilities
│ └── metrics/ # OpenTelemetry integration
├── internal/ # Internal operator implementation
│ ├── controller/ # Kubernetes controllers
│ ├── scheduler/ # Message routing and execution
│ ├── server/ # gRPC server
│ └── client/ # gRPC client pool
├── cli/ # Command-line tools (run, build)
├── registry/ # Component registration system
├── config/ # Kubernetes manifests and CRD definitions
│ ├── crd/ # CRD YAML files
│ ├── samples/ # Example resources
│ └── rbac/ # RBAC configurations
└── charts/ # Helm charts for deployment
Getting Started
You’ll need a Kubernetes cluster to run against. You can use KIND to get a local cluster for testing, or run against a remote cluster.
Note: Your controller will automatically use the current context in your kubeconfig file (i.e. whatever cluster kubectl cluster-info shows).
Helm charts
helm repo add tinysystems https://tiny-systems.github.io/module/
helm repo update # if you already added repo before
helm install my-corp-data-processing-tools --set controllerManager.manager.image.repository=registry.mycorp/tools/data-processing tinysystems/tinysystems-operator
Running on the cluster
- Install Instances of Custom Resources:
kubectl apply -f config/samples/
- Build and push your image to the location specified by `IMG`:
make docker-build docker-push IMG=<some-registry>/operator:tag
- Deploy the controller to the cluster with the image specified by `IMG`:
make deploy IMG=<some-registry>/operator:tag
Undeploy controller
Undeploy the controller from the cluster:
make undeploy
Contributing
// TODO(user): Add detailed information on how you would like others to contribute to this project
How it works
This project aims to follow the Kubernetes Operator pattern.
It uses Controllers, which provide a reconcile function responsible for synchronizing resources until the desired state is reached on the cluster.
Test It Out
- Install the CRDs into the cluster:
make install
- Run your controller (this will run in the foreground, so switch to a new terminal if you want to leave it running):
make run
NOTE: You can also run this in one step by running: make install run
Modifying the API definitions
If you are editing the API definitions, generate the manifests such as CRs or CRDs using:
make manifests
Create a new API
kubebuilder create api --group operator --version v1alpha1 --kind TinySignal
NOTE: Run make --help for more information on all potential make targets
More information can be found via the Kubebuilder Documentation
Module Development
This SDK provides everything you need to build custom module operators. Here's a complete guide to developing your own module.
Quick Start
- Create a New Go Project
mkdir my-module
cd my-module
go mod init github.com/myorg/my-module
- Add SDK Dependency
go get github.com/tiny-systems/module
- Implement a Component
Create components/hello.go:
package components
import (
"context"
"github.com/tiny-systems/module/module"
)
type Hello struct{}
// Configuration for the input port
type HelloInput struct {
Name string `json:"name" configurable:"true"`
}
// Configuration for the output
type HelloOutput struct {
Greeting string `json:"greeting" shared:"true"`
}
func (h *Hello) Instance() module.Component {
return &Hello{}
}
func (h *Hello) GetInfo() module.ComponentInfo {
return module.ComponentInfo{
Name: "hello",
Description: "Greets a person by name",
Info: "Simple greeting component example",
Tags: []string{"example", "greeting"},
}
}
func (h *Hello) Ports() []module.Port {
return []module.Port{
{
Name: "input",
Label: "Input",
Source: false, // This is an input port
Position: module.Left,
Configuration: &HelloInput{},
},
{
Name: "output",
Label: "Output",
Source: true, // This is an output port
Position: module.Right,
Configuration: &HelloOutput{},
},
{
Name: "error",
Label: "Error",
Source: true,
Position: module.Bottom,
},
}
}
func (h *Hello) Handle(ctx context.Context, output module.Handler, port string, message any) any {
if port == "input" {
// Parse input configuration
input := message.(*HelloInput)
// Create greeting
greeting := "Hello, " + input.Name + "!"
// Send to output port
output(ctx, "output", &HelloOutput{
Greeting: greeting,
})
}
return nil
}
- Register Component and Create Main
Create main.go:
package main
import (
"github.com/myorg/my-module/components"
"github.com/tiny-systems/module/cli"
"github.com/tiny-systems/module/registry"
)
func main() {
// Register all components
registry.Register(&components.Hello{})
// Run the operator
cli.Run()
}
- Run Your Module
# Build
go build -o my-module
# Run locally (connects to your current kubectl context)
./my-module run --name=my-module --version=1.0.0 --namespace=tinysystems
- Deploy to Kubernetes
# Build and push Docker image
docker build -t myregistry/my-module:1.0.0 .
docker push myregistry/my-module:1.0.0
# Install using Helm
helm repo add tinysystems https://tiny-systems.github.io/module/
helm install my-module \
--set controllerManager.manager.image.repository=myregistry/my-module \
--set controllerManager.manager.image.tag=1.0.0 \
tinysystems/tinysystems-operator
Component Interface Deep Dive
Every component describes itself for discovery and the UI through GetInfo():
func (c *MyComponent) GetInfo() module.ComponentInfo {
return module.ComponentInfo{
Name: "my-component", // Unique identifier
Description: "Does something", // Short description
Info: "Detailed info...", // Long description
Tags: []string{"tag1"}, // Searchable tags
}
}
Ports define how components connect to each other:
func (c *MyComponent) Ports() []module.Port {
return []module.Port{
{
Name: "input", // Unique port name
Label: "Input Data", // Display label
Source: false, // Input port
Position: module.Left, // Visual position
Configuration: &InputConfig{}, // Expected data structure
},
{
Name: "output",
Label: "Output Data",
Source: true, // Output port
Position: module.Right,
Configuration: &OutputConfig{}, // Output data structure
},
}
}
Port Positions: module.Top, module.Right, module.Bottom, module.Left
The Handle method is called when a message arrives on a port:
func (c *MyComponent) Handle(
ctx context.Context,
output module.Handler,
port string,
message any,
) any {
switch port {
case "input":
// Type assert the message
input := message.(*InputConfig)
// Do work
result := processData(input)
// Send to output port
output(ctx, "output", &OutputConfig{
Result: result,
})
case "_reconcile":
// Handle reconciliation (called periodically)
// Use this for cleanup, state sync, etc.
}
return nil
}
Key Points:
- `ctx`: Context with tracing span and cancellation
- `output`: Callback function to send data to other ports
- `port`: Name of the port that received the message
- `message`: The actual data (type assert to your config struct)
- Return value is currently unused
Configuration Schemas
The SDK automatically generates JSON Schemas from your Go structs. Use struct tags to control the UI:
type Config struct {
// Basic field
Name string `json:"name"`
// Configurable in UI (can reference other node outputs)
UserID string `json:"userId" configurable:"true"`
// Shared definition (other nodes can reference this)
Result string `json:"result" shared:"true"`
// Control UI layout
APIKey string `json:"apiKey" propertyOrder:"1" tab:"auth"`
// Nested object
Settings struct {
Timeout int `json:"timeout" configurable:"true"`
} `json:"settings"`
// Array
Items []string `json:"items" configurable:"true"`
}
Struct Tags:
- `configurable:"true"`: Field can accept values from other nodes via `{{expression}}` syntax
- `shared:"true"`: Field definition is available to other nodes for type-safe mapping
- `propertyOrder:"N"`: Controls field order in UI
- `tab:"name"`: Groups field under a tab in UI
- `align:"horizontal"`: Layout hint for UI
System Ports
Special ports available to all components:
`_reconcile` Port
Called periodically (every 5 minutes) and on node changes:
case "_reconcile":
// Clean up resources
// Sync state
// Check for drift
`_client` Port
Provides Kubernetes client for resource operations:
case "_client":
client := message.(resource.Manager)
// Create a signal
client.CreateSignal(ctx, resource.CreateSignalRequest{
Node: "target-node",
Port: "input",
Data: map[string]any{"key": "value"},
})
// Get node information
node, err := client.GetNode(ctx, "node-name")
`_settings` Port
Receives initial configuration (no "from" connection required):
case "_settings":
settings := message.(*MyConfig)
// Store settings for later use
Error Handling
Return regular errors for automatic retry with exponential backoff:
func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
data, err := fetchFromAPI()
if err != nil {
// Will retry automatically
return err
}
// ...
}
Use PermanentError to stop retries:
import "github.com/tiny-systems/module/pkg/errors"
func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
if !isValid(msg) {
// Won't retry - send to error port instead
output(ctx, "error", errors.PermanentError{
Err: fmt.Errorf("invalid input"),
})
return nil
}
// ...
}
Using the Resource Manager
Access Kubernetes resources from your component:
import "github.com/tiny-systems/module/pkg/resource"
func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
if port == "_client" {
c.client = msg.(resource.Manager)
return nil
}
if port == "create-flow" {
// Create a new flow
flow, err := c.client.CreateFlow(ctx, resource.CreateFlowRequest{
Name: "dynamic-flow",
Project: "my-project",
})
if err != nil {
	return err
}
// Create nodes in the flow
node, err := c.client.CreateNode(ctx, resource.CreateNodeRequest{
Name: "node-1",
Flow: flow.Name,
Module: "http-module",
Component: "request",
Settings: map[string]any{
"url": "https://api.example.com",
},
})
if err != nil {
	return err
}
// Trigger the node
c.client.CreateSignal(ctx, resource.CreateSignalRequest{
Node: node.Name,
Port: "trigger",
Data: map[string]any{},
})
}
return nil
}
Observability
OpenTelemetry is built-in. The context includes a span:
import "go.opentelemetry.io/otel"
func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
// Get tracer
tracer := otel.Tracer("my-module")
// Create child span
ctx, span := tracer.Start(ctx, "processing")
defer span.End()
// Add attributes
span.SetAttributes(
attribute.String("input.size", "large"),
)
// Do work...
result := doWork(ctx)
output(ctx, "output", result)
return nil
}
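The /pkg/metrics package builds on the same OpenTelemetry stack. As a minimal sketch, counters can also be recorded directly with the upstream metric API (instrument and attribute names here are illustrative, not SDK helpers):

```go
import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// recordMessage increments a counter for each handled message using the
// standard OpenTelemetry metric API (not an SDK-specific helper).
func recordMessage(ctx context.Context, port string) error {
	meter := otel.Meter("my-module")
	processed, err := meter.Int64Counter("messages_processed")
	if err != nil {
		return err
	}
	processed.Add(ctx, 1, metric.WithAttributes(attribute.String("port", port)))
	return nil
}
```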
Testing Components
package components_test
import (
"context"
"testing"
"github.com/myorg/my-module/components"
)
func TestHello(t *testing.T) {
component := &components.Hello{}
var outputData *components.HelloOutput
outputHandler := func(ctx context.Context, port string, data any) any {
if port == "output" {
outputData = data.(*components.HelloOutput)
}
return nil
}
// Send message to input port
component.Handle(context.Background(), outputHandler, "input", &components.HelloInput{
Name: "World",
})
// Verify output
if outputData.Greeting != "Hello, World!" {
t.Errorf("Expected 'Hello, World!', got '%s'", outputData.Greeting)
}
}
Best Practices
- Keep Components Focused: Each component should do one thing well
- Use System Ports: Implement `_reconcile` for cleanup and state sync
- Handle Context Cancellation: Respect `ctx.Done()` for graceful shutdown
- Leverage Schemas: Use struct tags to create great UI experiences
- Share Definitions: Mark output fields as `shared:"true"` for type-safe flows
- Use Permanent Errors: Don't retry validation errors or user mistakes
- Add Observability: Create spans for long operations
- Document with Tags: Use meaningful tags in `GetInfo()` for discoverability
Example Modules
Check out these example modules for reference:
- common-module: Basic utilities (delay, switch, merge)
- http-module: HTTP client/server components
- grpc-module: gRPC service components
CLI Reference
The SDK includes a CLI for running and building modules:
# Run module locally
./my-module run --name=my-module --version=1.0.0 --namespace=tinysystems
# Build (if custom build logic is needed)
./my-module build
# Get help
./my-module --help
License
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.