TinySystems Module SDK
A Kubebuilder-based Kubernetes operator SDK for building flow-based workflow engines. This SDK provides the complete infrastructure for developing modular operators that can be composed into visual workflows.
Overview
TinySystems Module SDK enables developers to create module operators (like common-module, http-module, grpc-module) that bring specific functionality into a Kubernetes-native flow engine. Each module provides reusable components that can be connected through a port-based architecture to create complex workflows.
Key Features
- Port-Based Component Architecture: Visual programming model with input/output ports
- JSON Schema-Driven Configuration: Automatic schema generation with UI hints
- Message Routing & Retry Logic: Intelligent routing with exponential backoff
- Multi-Module Communication: gRPC-based inter-module communication
- Expression-Based Data Transformation: JSONPath expressions for flexible data mapping
- Kubernetes-Native: Everything is a CRD with standard controller patterns
- OpenTelemetry Integration: Built-in observability with tracing and metrics
- CLI Tools: Complete tooling for running and building modules
Table of Contents
- Overview
- Architecture
- Getting Started
- Helm Charts
- Development
- Module Development
- Contributing
- License
Architecture
Core Concepts
The SDK is built around several key abstractions:
Custom Resource Definitions (CRDs)
- TinyNode: Core execution unit representing a component instance in a flow
  - Contains module name, version, component type, port configurations, and edges
  - Status includes module/component info, port schemas, and error state
- TinyModule: Registry of available modules for service discovery
  - References container image
  - Status contains module address (gRPC), version, and list of components
- TinyFlow: Logical grouping of nodes representing a workflow
  - Uses labels to associate nodes together
- TinyProject: Top-level organizational unit grouping multiple flows
- TinySignal: External trigger mechanism for node execution
  - Specifies target node, port, and data payload
  - Works like webhooks or manual triggers
- TinyTracker: Execution monitoring for detailed tracing
- TinyWidgetPage: Custom UI dashboard pages for visualization
Component System
Components are the building blocks of workflows. Each component:
- Implements the Component interface (module/component.go)
- Defines ports for input/output with positions (Top/Right/Bottom/Left)
- Handles messages through the Handle() method
- Provides JSON Schema configuration for each port
Message Flow
External Trigger (TinySignal)
→ TinySignal Controller
→ Scheduler.Handle()
→ Runner.MsgHandler()
→ Component.Handle()
→ output() callback
→ Next node via edges
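The chain above can be pictured as a scheduler that hands each component an output callback bound to that node's outgoing edges. The following is a minimal, self-contained sketch of that idea. The scheduler, edge, component, and handler types are hypothetical stand-ins written for illustration; they are not the SDK's internal/scheduler implementation, and the real callback signature may differ.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// handler mirrors the shape of the output callback used in this README's
// examples; the name and exact signature are assumptions.
type handler func(ctx context.Context, port string, msg any) any

// component is a hypothetical, reduced version of the Component interface.
type component interface {
	Handle(ctx context.Context, output handler, port string, msg any) any
}

// edge connects a source port on one node to a target port on another.
type edge struct{ fromNode, fromPort, toNode, toPort string }

// scheduler is an illustrative in-memory router, not the SDK's scheduler.
type scheduler struct {
	nodes map[string]component
	edges []edge
}

// deliver invokes the node's Handle and gives it an output callback that
// forwards messages along every edge leaving the emitting port.
func (s *scheduler) deliver(ctx context.Context, node, port string, msg any) any {
	c, ok := s.nodes[node]
	if !ok {
		return fmt.Errorf("unknown node %q", node)
	}
	output := func(ctx context.Context, outPort string, out any) any {
		for _, e := range s.edges {
			if e.fromNode == node && e.fromPort == outPort {
				if res := s.deliver(ctx, e.toNode, e.toPort, out); res != nil {
					return res
				}
			}
		}
		return nil
	}
	return c.Handle(ctx, output, port, msg)
}

// upper emits the upper-cased input on its "output" port.
type upper struct{}

func (upper) Handle(ctx context.Context, output handler, port string, msg any) any {
	return output(ctx, "output", strings.ToUpper(msg.(string)))
}

// sink records every message it receives.
type sink struct{ got []string }

func (s *sink) Handle(_ context.Context, _ handler, _ string, msg any) any {
	s.got = append(s.got, msg.(string))
	return nil
}

// runFlow wires upper -> sink and injects one message, as a signal would.
func runFlow(input string) []string {
	out := &sink{}
	s := &scheduler{
		nodes: map[string]component{"upper": upper{}, "sink": out},
		edges: []edge{{"upper", "output", "sink", "input"}},
	}
	s.deliver(context.Background(), "upper", "input", input)
	return out.got
}

func main() {
	fmt.Println(runFlow("hello")) // prints [HELLO]
}
```

Binding the callback per node keeps components unaware of the graph: a component only names its own port, and the router decides where the message goes next.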
Port System
Ports enable component communication:
- Source Ports: Output ports that send data
- Target Ports: Input ports that receive data
- System Ports:
  - _reconcile: Triggers node reconciliation
  - _client: Receives Kubernetes client for resource operations
  - _settings: Configuration port
  - _control: Dashboard control port
Each port can have:
- Configuration: JSON schema defining expected input structure
- Response Configuration: Schema for output data structure
Edge Configuration
Edges connect ports between nodes and support:
- JSONPath expressions for data transformation
- Runtime evaluation via the expression evaluator
- Type-safe mapping through shared schema definitions
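To make the edge mapping concrete, here is a small sketch of how an edge could build a target port's input from a source port's output. It assumes a deliberately reduced expression syntax (only dotted paths such as $.user.name) and hypothetical helper names lookup and applyEdge; the SDK's evaluator in pkg/evaluator supports real JSONPath and is not reproduced here.

```go
package main

import (
	"fmt"
	"strings"
)

// lookup resolves a simplified path such as "$.user.name" against nested
// maps. It is a stand-in for a JSONPath engine and handles only dotted keys.
func lookup(data map[string]any, path string) (any, bool) {
	path = strings.TrimPrefix(path, "$")
	path = strings.TrimPrefix(path, ".")
	if path == "" {
		return data, true
	}
	var cur any = data
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// applyEdge evaluates each mapping expression against the source output and
// collects the results under the target field names. Expressions that do not
// resolve are skipped.
func applyEdge(source map[string]any, mapping map[string]string) map[string]any {
	target := make(map[string]any, len(mapping))
	for field, expr := range mapping {
		if v, ok := lookup(source, expr); ok {
			target[field] = v
		}
	}
	return target
}

func main() {
	source := map[string]any{"user": map[string]any{"name": "Ada"}}
	target := applyEdge(source, map[string]string{"name": "$.user.name"})
	fmt.Println(target["name"]) // prints Ada
}
```

Because the mapping is plain data, it can be changed on the edge without rebuilding either component.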
SDK Components
The SDK provides several packages for module developers:
/module/ - Core Interfaces
- Component: Main interface for component implementation
- Port: Port definitions and configuration
- Handler: Callback function type for message routing
/pkg/resource/ - Resource Manager
Unified Kubernetes client providing:
- Node/Module/Flow/Project CRUD operations
- Signal creation for triggering nodes
- Ingress/Service management for exposing ports
- Helm release management
/pkg/schema/ - JSON Schema Generator
- Automatic schema generation from Go structs
- Custom struct tags for UI hints: configurable, shared, propertyOrder, tab, align
- Definition sharing and override mechanism
/pkg/evaluator/ - Expression Evaluator
- JSONPath evaluation for data transformation
- Edge configuration with expressions
/pkg/metrics/ - Observability
- OpenTelemetry integration with spans
- Metrics (gauges, counters)
- Tracker system for flow execution monitoring
/internal/scheduler/ - Message Routing
- Manages component instances
- Routes messages between nodes
- Handles cross-module communication via gRPC
/internal/client/ - Client Pool
- Manages gRPC connections to other modules
- Connection pooling and lifecycle management
/cli/ - Command-Line Tools
- run: Complete operator runtime
- build: Module building and publishing
Communication Patterns
Same-Module Communication
When nodes belong to the same module, messages are routed directly through the scheduler for optimal performance.
Cross-Module Communication
When nodes belong to different modules:
- Scheduler identifies the target module via TinyModule CRD
- Client pool establishes/reuses gRPC connection
- Message is sent to target module's gRPC server
- Target module's scheduler routes to the appropriate component
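The routing decision and connection reuse described above can be sketched as follows. This is an illustration only: conn is a placeholder rather than a real gRPC connection, the router and pool types are hypothetical, and the address http-module:50051 is made up for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a placeholder for a gRPC client connection.
type conn struct{ addr string }

// pool caches one connection per module address.
type pool struct {
	mu    sync.Mutex
	conns map[string]*conn
	dials int // number of connections actually created
}

func newPool() *pool { return &pool{conns: make(map[string]*conn)} }

// get returns the cached connection for addr, dialing only on first use.
func (p *pool) get(addr string) *conn {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.conns[addr]; ok {
		return c
	}
	p.dials++
	c := &conn{addr: addr}
	p.conns[addr] = c
	return c
}

// router decides between in-process delivery and a remote call.
type router struct {
	self    string            // name of the module this process runs
	modules map[string]string // module name -> address, as a TinyModule status might report
	pool    *pool
}

// route describes how a message for targetModule would be delivered.
func (r *router) route(targetModule string) (string, error) {
	if targetModule == r.self {
		return "local", nil
	}
	addr, ok := r.modules[targetModule]
	if !ok {
		return "", fmt.Errorf("module %q not registered", targetModule)
	}
	return "remote via " + r.pool.get(addr).addr, nil
}

func main() {
	r := &router{
		self:    "common-module",
		modules: map[string]string{"http-module": "http-module:50051"},
		pool:    newPool(),
	}
	for _, m := range []string{"common-module", "http-module", "http-module"} {
		how, err := r.route(m)
		fmt.Println(m, "->", how, err)
	}
	// The second http-module message reuses the cached connection.
	fmt.Println("dials:", r.pool.dials)
}
```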
Retry Mechanism
- Transient Errors: Exponential backoff (1s → 30s max)
- Permanent Errors: Marked via the PermanentError wrapper to stop retries
- Context cancellation stops all retries
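The three rules above fit in a short loop. The sketch below is not the SDK's retry code: permanentError is a hypothetical stand-in for the PermanentError wrapper, retry and nextDelay are illustrative names, and the demo uses millisecond delays so it finishes quickly (the documented values would be 1s initial and 30s maximum).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// permanentError is a hypothetical stand-in for the SDK's PermanentError.
type permanentError struct{ err error }

func (p permanentError) Error() string { return p.err.Error() }
func (p permanentError) Unwrap() error { return p.err }

// nextDelay doubles the delay and caps it at max.
func nextDelay(d, max time.Duration) time.Duration {
	d *= 2
	if d > max {
		return max
	}
	return d
}

// retry calls fn until it succeeds, returns a permanent error, or ctx is
// cancelled. Between attempts it waits with exponential backoff.
func retry(ctx context.Context, initial, max time.Duration, fn func() error) error {
	delay := initial
	for {
		err := fn()
		if err == nil {
			return nil
		}
		var perm permanentError
		if errors.As(err, &perm) {
			return err // permanent: do not retry
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // cancellation stops all retries
		case <-time.After(delay):
		}
		delay = nextDelay(delay, max)
	}
}

// demo runs retry against a function that fails twice before succeeding,
// then against one that fails permanently, and reports the attempt counts.
func demo() (transientAttempts, permanentAttempts int) {
	ctx := context.Background()
	retry(ctx, time.Millisecond, 30*time.Millisecond, func() error {
		transientAttempts++
		if transientAttempts < 3 {
			return errors.New("temporary failure")
		}
		return nil
	})
	retry(ctx, time.Millisecond, 30*time.Millisecond, func() error {
		permanentAttempts++
		return permanentError{errors.New("invalid input")}
	})
	return
}

func main() {
	fmt.Println(demo()) // prints 3 1
}
```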
Design Patterns
- Eventual Consistency with Reconciliation: Periodic reconciliation (every 5 minutes) plus signal-based immediate updates
- Leader Election: Leader handles status updates and signal processing; non-leaders execute node logic
- Schema-Driven Configuration: Go structs automatically generate JSON schemas for UI integration
- Expression-Based Transformation: JSONPath expressions enable flexible data mapping without code changes
- Definition Sharing: Components mark fields as shared:true or configurable:true for cross-node type safety
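The first pattern, periodic reconciliation combined with immediate signal-driven updates, is commonly written as a single select loop. The sketch below shows that shape under assumed names (reconcileLoop, the signals channel, and the reason strings are all hypothetical); it does not reproduce the SDK's controllers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// reconcileLoop calls reconcile on every tick and whenever a signal arrives,
// until ctx is cancelled.
func reconcileLoop(ctx context.Context, interval time.Duration, signals <-chan struct{}, reconcile func(reason string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			reconcile("periodic")
		case <-signals:
			reconcile("signal")
		}
	}
}

// demo queues one signal and stops the loop after handling it, so the
// five-minute ticker never fires.
func demo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	signals := make(chan struct{}, 1)
	signals <- struct{}{}
	var reasons []string
	reconcileLoop(ctx, 5*time.Minute, signals, func(reason string) {
		reasons = append(reasons, reason)
		cancel()
	})
	return reasons
}

func main() {
	fmt.Println(demo()) // prints [signal]
}
```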
Project Structure
.
├── api/v1alpha1/ # CRD definitions (TinyNode, TinyModule, TinyFlow, etc.)
├── module/ # Core SDK interfaces for component developers
│ ├── component.go # Component interface
│ ├── node.go # Port definitions
│ └── handler.go # Handler function type
├── pkg/ # Reusable SDK packages
│ ├── resource/ # Kubernetes resource manager
│ ├── schema/ # JSON schema generator
│ ├── evaluator/ # JSONPath expression evaluator
│ ├── errors/ # Error handling utilities
│ └── metrics/ # OpenTelemetry integration
├── internal/ # Internal operator implementation
│ ├── controller/ # Kubernetes controllers
│ ├── scheduler/ # Message routing and execution
│ ├── server/ # gRPC server
│ └── client/ # gRPC client pool
├── cli/ # Command-line tools (run, build)
├── registry/ # Component registration system
├── config/ # Kubernetes manifests and CRD definitions
│ ├── crd/ # CRD YAML files
│ ├── samples/ # Example resources
│ └── rbac/ # RBAC configurations
└── charts/ # Helm charts for deployment
Getting Started
You’ll need a Kubernetes cluster to run against. You can use KIND to get a local cluster for testing, or run against a remote cluster.
Note: Your controller will automatically use the current context in your kubeconfig file (i.e. whatever cluster kubectl cluster-info shows).
Helm Charts
helm repo add tinysystems https://tiny-systems.github.io/module/
helm repo update # if you already added repo before
helm install my-corp-data-processing-tools --set controllerManager.manager.image.repository=registry.mycorp/tools/data-processing tinysystems/tinysystems-operator
Running on the cluster
- Install Instances of Custom Resources:
kubectl apply -f config/samples/
- Build and push your image to the location specified by IMG:
make docker-build docker-push IMG=<some-registry>/operator:tag
- Deploy the controller to the cluster with the image specified by IMG:
make deploy IMG=<some-registry>/operator:tag
Undeploy controller
Undeploy the controller from the cluster:
make undeploy
Contributing
// TODO(user): Add detailed information on how you would like others to contribute to this project
How it works
This project aims to follow the Kubernetes Operator pattern.
It uses Controllers, which provide a reconcile function responsible for synchronizing resources until the desired state is reached on the cluster.
Test It Out
- Install the CRDs into the cluster:
make install
- Run your controller (this will run in the foreground, so switch to a new terminal if you want to leave it running):
make run
NOTE: You can also run this in one step by running: make install run
Modifying the API definitions
If you are editing the API definitions, generate the manifests such as CRs or CRDs using:
make manifests
Create a new API
kubebuilder create api --group operator --version v1alpha1 --kind TinySignal
NOTE: Run make --help for more information on all potential make targets
More information can be found via the Kubebuilder Documentation
Module Development
This SDK provides everything you need to build custom module operators. Here's a complete guide to developing your own module.
Quick Start
- Create a New Go Project
mkdir my-module
cd my-module
go mod init github.com/myorg/my-module
- Add SDK Dependency
go get github.com/tiny-systems/module
- Implement a Component
Create components/hello.go:
package components

import (
	"context"

	"github.com/tiny-systems/module/module"
)

type Hello struct{}

// Configuration for the input port
type HelloInput struct {
	Name string `json:"name" configurable:"true"`
}

// Configuration for the output
type HelloOutput struct {
	Greeting string `json:"greeting" shared:"true"`
}

func (h *Hello) Instance() module.Component {
	return &Hello{}
}

func (h *Hello) GetInfo() module.ComponentInfo {
	return module.ComponentInfo{
		Name:        "hello",
		Description: "Greets a person by name",
		Info:        "Simple greeting component example",
		Tags:        []string{"example", "greeting"},
	}
}

func (h *Hello) Ports() []module.Port {
	return []module.Port{
		{
			Name:          "input",
			Label:         "Input",
			Source:        false, // This is an input port
			Position:      module.Left,
			Configuration: &HelloInput{},
		},
		{
			Name:          "output",
			Label:         "Output",
			Source:        true, // This is an output port
			Position:      module.Right,
			Configuration: &HelloOutput{},
		},
		{
			Name:     "error",
			Label:    "Error",
			Source:   true,
			Position: module.Bottom,
		},
	}
}

func (h *Hello) Handle(ctx context.Context, output module.Handler, port string, message any) any {
	if port == "input" {
		// Parse input configuration
		input := message.(*HelloInput)
		// Create greeting
		greeting := "Hello, " + input.Name + "!"
		// Send to output port
		output(ctx, "output", &HelloOutput{
			Greeting: greeting,
		})
	}
	return nil
}
- Register Component and Create Main
Create main.go:
package main

import (
	"github.com/myorg/my-module/components"
	"github.com/tiny-systems/module/cli"
	"github.com/tiny-systems/module/registry"
)

func main() {
	// Register all components
	registry.Register(&components.Hello{})
	// Run the operator
	cli.Run()
}
- Run Your Module
# Build
go build -o my-module
# Run locally (connects to your current kubectl context)
./my-module run --name=my-module --version=1.0.0 --namespace=tinysystems
- Deploy to Kubernetes
# Build and push Docker image
docker build -t myregistry/my-module:1.0.0 .
docker push myregistry/my-module:1.0.0
# Install using Helm
helm repo add tinysystems https://tiny-systems.github.io/module/
helm install my-module \
--set controllerManager.manager.image.repository=myregistry/my-module \
--set controllerManager.manager.image.tag=1.0.0 \
tinysystems/tinysystems-operator
Component Interface Deep Dive
func (c *MyComponent) GetInfo() module.ComponentInfo {
	return module.ComponentInfo{
		Name:        "my-component",     // Unique identifier
		Description: "Does something",   // Short description
		Info:        "Detailed info...", // Long description
		Tags:        []string{"tag1"},   // Searchable tags
	}
}
Ports define how components connect to each other:
func (c *MyComponent) Ports() []module.Port {
	return []module.Port{
		{
			Name:          "input",        // Unique port name
			Label:         "Input Data",   // Display label
			Source:        false,          // Input port
			Position:      module.Left,    // Visual position
			Configuration: &InputConfig{}, // Expected data structure
		},
		{
			Name:          "output",
			Label:         "Output Data",
			Source:        true, // Output port
			Position:      module.Right,
			Configuration: &OutputConfig{}, // Output data structure
		},
	}
}
Port Positions: module.Top, module.Right, module.Bottom, module.Left
The Handle method is called when a message arrives on a port:
func (c *MyComponent) Handle(
	ctx context.Context,
	output module.Handler,
	port string,
	message any,
) any {
	switch port {
	case "input":
		// Type assert the message
		input := message.(*InputConfig)
		// Do work
		result := processData(input)
		// Send to output port
		output(ctx, "output", &OutputConfig{
			Result: result,
		})
	case "_reconcile":
		// Handle reconciliation (called periodically)
		// Use this for cleanup, state sync, etc.
	}
	return nil
}
Key Points:
- ctx: Context with tracing span and cancellation
- output: Callback function to send data to other ports
- port: Name of the port that received the message
- message: The actual data (type assert to your config struct)
- Return value is currently unused
Configuration Schemas
The SDK automatically generates JSON Schemas from your Go structs. Use struct tags to control the UI:
type Config struct {
	// Basic field
	Name string `json:"name"`
	// Configurable in UI (can reference other node outputs)
	UserID string `json:"userId" configurable:"true"`
	// Shared definition (other nodes can reference this)
	Result string `json:"result" shared:"true"`
	// Control UI layout
	APIKey string `json:"apiKey" propertyOrder:"1" tab:"auth"`
	// Nested object
	Settings struct {
		Timeout int `json:"timeout" configurable:"true"`
	} `json:"settings"`
	// Array
	Items []string `json:"items" configurable:"true"`
}
Struct Tags:
- configurable:"true": Field can accept values from other nodes via expressions
- shared:"true": Field definition is available to other nodes for type-safe mapping
- propertyOrder:"N": Controls field order in UI
- tab:"name": Groups field under a tab in UI
- align:"horizontal": Layout hint for UI
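Struct tags like these are ordinary Go metadata that a generator can read with the reflect package. The sketch below shows only that reading step, using a hypothetical hintsFor helper and fieldHints type; it does not produce JSON Schema and is not how pkg/schema is necessarily implemented.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// Config repeats a few fields from the example above.
type Config struct {
	Name   string `json:"name"`
	UserID string `json:"userId" configurable:"true"`
	Result string `json:"result" shared:"true"`
	APIKey string `json:"apiKey" propertyOrder:"1" tab:"auth"`
}

// fieldHints is a hypothetical summary of the UI-related tags on one field.
type fieldHints struct {
	Name          string
	Configurable  bool
	Shared        bool
	PropertyOrder string
	Tab           string
}

// hintsFor walks the exported struct fields of v (or of the struct v points
// to) and collects the tag values. It returns nil for non-struct input.
func hintsFor(v any) []fieldHints {
	t := reflect.TypeOf(v)
	if t == nil {
		return nil
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}
	hints := make([]fieldHints, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		// Drop options such as ",omitempty" from the json tag.
		name, _, _ := strings.Cut(f.Tag.Get("json"), ",")
		if name == "" {
			name = f.Name
		}
		hints = append(hints, fieldHints{
			Name:          name,
			Configurable:  f.Tag.Get("configurable") == "true",
			Shared:        f.Tag.Get("shared") == "true",
			PropertyOrder: f.Tag.Get("propertyOrder"),
			Tab:           f.Tag.Get("tab"),
		})
	}
	return hints
}

func main() {
	for _, h := range hintsFor(&Config{}) {
		fmt.Printf("%+v\n", h)
	}
}
```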
System Ports
Special ports available to all components:
_reconcile Port
Called periodically (every 5 minutes) and on node changes:
case "_reconcile":
	// Clean up resources
	// Sync state
	// Check for drift
_client Port
Provides Kubernetes client for resource operations:
case "_client":
	client := message.(resource.Manager)
	// Create a signal
	client.CreateSignal(ctx, resource.CreateSignalRequest{
		Node: "target-node",
		Port: "input",
		Data: map[string]any{"key": "value"},
	})
	// Get node information
	node, err := client.GetNode(ctx, "node-name")
_settings Port
Receives initial configuration (no "from" connection required):
case "_settings":
	settings := message.(*MyConfig)
	// Store settings for later use
Error Handling
Return regular errors for automatic retry with exponential backoff:
func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	data, err := fetchFromAPI()
	if err != nil {
		// Will retry automatically
		return err
	}
	// ...
	return nil
}
Use PermanentError to stop retries:
import (
	"fmt"

	"github.com/tiny-systems/module/pkg/errors"
)

func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	if !isValid(msg) {
		// Won't retry - send to error port instead
		output(ctx, "error", errors.PermanentError{
			Err: fmt.Errorf("invalid input"),
		})
		return nil
	}
	// ...
	return nil
}
Using the Resource Manager
Access Kubernetes resources from your component:
import "github.com/tiny-systems/module/pkg/resource"

func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	if port == "_client" {
		// Keep the client for later calls
		c.client = msg.(resource.Manager)
		return nil
	}
	if port == "create-flow" {
		// Create a new flow
		flow, err := c.client.CreateFlow(ctx, resource.CreateFlowRequest{
			Name:    "dynamic-flow",
			Project: "my-project",
		})
		if err != nil {
			return err
		}
		// Create nodes in the flow
		node, err := c.client.CreateNode(ctx, resource.CreateNodeRequest{
			Name:      "node-1",
			Flow:      flow.Name,
			Module:    "http-module",
			Component: "request",
			Settings: map[string]any{
				"url": "https://api.example.com",
			},
		})
		if err != nil {
			return err
		}
		// Trigger the node
		c.client.CreateSignal(ctx, resource.CreateSignalRequest{
			Node: node.Name,
			Port: "trigger",
			Data: map[string]any{},
		})
	}
	return nil
}
Observability
OpenTelemetry is built-in. The context includes a span:
import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	// Get tracer
	tracer := otel.Tracer("my-module")
	// Create child span
	ctx, span := tracer.Start(ctx, "processing")
	defer span.End()
	// Add attributes
	span.SetAttributes(
		attribute.String("input.size", "large"),
	)
	// Do work...
	result := doWork(ctx)
	output(ctx, "output", result)
	return nil
}
Testing Components
package components_test

import (
	"context"
	"testing"

	"github.com/myorg/my-module/components"
)

func TestHello(t *testing.T) {
	component := &components.Hello{}
	var outputData *components.HelloOutput
	// Capture whatever the component sends to its output port
	outputHandler := func(ctx context.Context, port string, data any) any {
		if port == "output" {
			outputData = data.(*components.HelloOutput)
		}
		return nil
	}
	// Send message to input port
	component.Handle(context.Background(), outputHandler, "input", &components.HelloInput{
		Name: "World",
	})
	// Verify output; fail early if nothing was sent to avoid a nil dereference
	if outputData == nil {
		t.Fatal("expected a message on the output port, got none")
	}
	if outputData.Greeting != "Hello, World!" {
		t.Errorf("Expected 'Hello, World!', got '%s'", outputData.Greeting)
	}
}
Best Practices
- Keep Components Focused: Each component should do one thing well
- Use System Ports: Implement _reconcile for cleanup and state sync
- Handle Context Cancellation: Respect ctx.Done() for graceful shutdown
- Leverage Schemas: Use struct tags to create great UI experiences
- Share Definitions: Mark output fields as shared:true for type-safe flows
- Use Permanent Errors: Don't retry validation errors or user mistakes
- Add Observability: Create spans for long operations
- Document with Tags: Use meaningful tags in GetInfo() for discoverability
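As an illustration of the context cancellation practice, a handler that processes many items can check ctx.Done() between items and stop early. The helper names processAll and demo below are hypothetical and the sketch is independent of the SDK.

```go
package main

import (
	"context"
	"fmt"
)

// processAll runs work on each item but checks for cancellation before every
// item. It returns how many items were processed and the context error, if any.
func processAll(ctx context.Context, items []string, work func(string)) (int, error) {
	for i, item := range items {
		select {
		case <-ctx.Done():
			return i, ctx.Err()
		default:
		}
		work(item)
	}
	return len(items), nil
}

// demo cancels the context while the second item is being processed, so the
// third item is never started.
func demo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return processAll(ctx, []string{"a", "b", "c"}, func(item string) {
		if item == "b" {
			cancel()
		}
	})
}

func main() {
	n, err := demo()
	fmt.Println(n, err) // prints 2 context canceled
}
```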
Example Modules
Check out these example modules for reference:
- common-module: Basic utilities (delay, switch, merge)
- http-module: HTTP client/server components
- grpc-module: gRPC service components
CLI Reference
The SDK includes a CLI for running and building modules:
# Run module locally
./my-module run --name=my-module --version=1.0.0 --namespace=tinysystems
# Build (if custom build logic is needed)
./my-module build
# Get help
./my-module --help
License
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Directories
| Path | Synopsis |
|---|---|
| api | |
| api/v1alpha1 | Package v1alpha1 contains API Schema definitions for the operator v1alpha1 API group +kubebuilder:object:generate=true +groupName=operator.tinysystems.io |
| internal | |
| pkg | |
| tools | |