module v0.1.150
Published: Dec 19, 2025 License: MIT

README

TinySystems Module SDK

A Kubebuilder-based Kubernetes operator SDK for building flow-based workflow engines. This SDK provides the complete infrastructure for developing modular operators that can be composed into visual workflows.

Overview

TinySystems Module SDK enables developers to create module operators (like common-module, http-module, grpc-module) that bring specific functionality into a Kubernetes-native flow engine. Each module provides reusable components that can be connected through a port-based architecture to create complex workflows.

Key Features
  • Port-Based Component Architecture: Visual programming model with input/output ports
  • JSON Schema-Driven Configuration: Automatic schema generation with UI hints
  • Message Routing & Retry Logic: Intelligent routing with exponential backoff
  • Multi-Module Communication: gRPC-based inter-module communication
  • Expression-Based Data Transformation: JSONPath expressions for flexible data mapping
  • Kubernetes-Native: Everything is a CRD with standard controller patterns
  • OpenTelemetry Integration: Built-in observability with tracing and metrics
  • CLI Tools: Complete tooling for running and building modules

Architecture

Core Concepts

The SDK is built around several key abstractions:

Custom Resource Definitions (CRDs)
  • TinyNode: Core execution unit representing a component instance in a flow

    • Contains module name, version, component type, port configurations, and edges
    • Status includes module/component info, port schemas, and error state
  • TinyModule: Registry of available modules for service discovery

    • References container image
    • Status contains module address (gRPC), version, and list of components
  • TinyFlow: Logical grouping of nodes representing a workflow

    • Uses labels to associate nodes together
  • TinyProject: Top-level organizational unit grouping multiple flows

  • TinySignal: External trigger mechanism for node execution

    • Specifies target node, port, and data payload
    • Works like webhooks or manual triggers
  • TinyTracker: Execution monitoring for detailed tracing

  • TinyWidgetPage: Custom UI dashboard pages for visualization

Component System

Components are the building blocks of workflows. Each component:

  • Implements the Component interface (module/component.go)
  • Defines ports for input/output with positions (Top/Right/Bottom/Left)
  • Handles messages through the Handle() method
  • Provides JSON Schema configuration for each port
Message Flow
External Trigger (TinySignal)
  → TinySignal Controller
    → Scheduler.Handle()
      → Runner.MsgHandler()
        → Component.Handle()
          → output() callback
            → Next node via edges
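
The last hop of this chain, a component calling its output callback so that the message reaches the next node, can be sketched in isolation. This is a minimal illustration, not the SDK's scheduler: the `Handler` type is redeclared locally with the shape used in this README's examples, and `node`, `wire`, and `runFlow` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Handler mirrors the callback shape used in this README's examples.
// It is redeclared here so the sketch compiles on its own.
type Handler func(ctx context.Context, port string, data any) any

// node is a hypothetical stand-in for a component instance.
type node struct {
	name   string
	handle func(ctx context.Context, output Handler, port string, msg any) any
}

// wire returns an output callback that forwards messages emitted on
// fromPort to the next node's toPort, imitating a single edge.
func wire(fromPort string, next *node, toPort string, sink Handler) Handler {
	return func(ctx context.Context, port string, data any) any {
		if port != fromPort {
			return nil // no edge attached to this port
		}
		return next.handle(ctx, sink, toPort, data)
	}
}

// runFlow pushes one message through upper -> collect and returns
// whatever arrived at the collecting node.
func runFlow(in string) string {
	var got string
	collect := &node{name: "collect", handle: func(_ context.Context, _ Handler, _ string, msg any) any {
		got, _ = msg.(string)
		return nil
	}}
	upper := &node{name: "upper", handle: func(ctx context.Context, output Handler, port string, msg any) any {
		s, ok := msg.(string)
		if !ok {
			return fmt.Errorf("port %q: unexpected message type %T", port, msg)
		}
		return output(ctx, "output", strings.ToUpper(s))
	}}
	noop := func(context.Context, string, any) any { return nil }
	upper.handle(context.Background(), wire("output", collect, "input", noop), "input", in)
	return got
}

func main() {
	fmt.Println(runFlow("hello"))
}
```

The component never learns which node sits behind its output port; the callback it receives carries that knowledge.
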
Port System

Ports enable component communication:

  • Source Ports: Output ports that send data
  • Target Ports: Input ports that receive data
  • System Ports:
    • _reconcile: Triggers node reconciliation
    • _client: Receives Kubernetes client for resource operations
    • _settings: Configuration port
    • _control: Dashboard control port

Each port can have:

  • Configuration: JSON schema defining expected input structure
  • Response Configuration: Schema for output data structure
Edge Configuration

Edges connect ports between nodes and support:

  • JSONPath expressions for data transformation
  • Runtime evaluation via the expression evaluator
  • Type-safe mapping through shared schema definitions
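
To make the idea of an edge mapping concrete, here is a rough sketch of applying per-field expressions to a source message. It does not use the SDK's evaluator, and its JSONPath dialect is not shown in this README; the sketch substitutes a deliberately tiny dotted-path lookup (`$.a.b`), and `lookup` and `applyEdge` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// lookup resolves a simplified "$.a.b" path against nested maps.
// Real JSONPath supports far more (arrays, filters, wildcards).
func lookup(data map[string]any, path string) (any, error) {
	path = strings.TrimPrefix(path, "$.")
	var cur any = data
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, fmt.Errorf("%q: cannot descend into %T", key, cur)
		}
		cur, ok = m[key]
		if !ok {
			return nil, fmt.Errorf("%q: key not found", key)
		}
	}
	return cur, nil
}

// applyEdge builds the target port's input: each target field is filled
// from the source message using its own path expression.
func applyEdge(mapping map[string]string, source map[string]any) (map[string]any, error) {
	out := make(map[string]any, len(mapping))
	for field, expr := range mapping {
		v, err := lookup(source, expr)
		if err != nil {
			return nil, fmt.Errorf("field %q: %w", field, err)
		}
		out[field] = v
	}
	return out, nil
}

func main() {
	source := map[string]any{"user": map[string]any{"name": "Ada", "id": 7}}
	out, err := applyEdge(map[string]string{"name": "$.user.name"}, source)
	fmt.Println(out, err)
}
```

Because the mapping is data rather than code, changing which upstream field feeds a port does not require rebuilding the module.
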
SDK Components

The SDK provides several packages for module developers:

/module/ - Core Interfaces
  • Component: Main interface for component implementation
  • Port: Port definitions and configuration
  • Handler: Callback function type for message routing
/pkg/resource/ - Resource Manager

Unified Kubernetes client providing:

  • Node/Module/Flow/Project CRUD operations
  • Signal creation for triggering nodes
  • Ingress/Service management for exposing ports
  • Helm release management
/pkg/schema/ - JSON Schema Generator
  • Automatic schema generation from Go structs
  • Custom struct tags for UI hints: configurable, shared, propertyOrder, tab, align
  • Definition sharing and override mechanism
/pkg/evaluator/ - Expression Evaluator
  • JSONPath evaluation for data transformation
  • Edge configuration with expressions
/pkg/metrics/ - Observability
  • OpenTelemetry integration with spans
  • Metrics (gauges, counters)
  • Tracker system for flow execution monitoring
/internal/scheduler/ - Message Routing
  • Manages component instances
  • Routes messages between nodes
  • Handles cross-module communication via gRPC
/internal/client/ - Client Pool
  • Manages gRPC connections to other modules
  • Connection pooling and lifecycle management
/cli/ - Command-Line Tools
  • run: Complete operator runtime
  • build: Module building and publishing
Communication Patterns
Same-Module Communication

When nodes belong to the same module, the scheduler routes messages directly between them, with no gRPC call involved.

Cross-Module Communication

When nodes belong to different modules:

  1. Scheduler identifies the target module via TinyModule CRD
  2. Client pool establishes/reuses gRPC connection
  3. Message is sent to target module's gRPC server
  4. Target module's scheduler routes to the appropriate component
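
The steps above can be sketched as a routing decision plus a connection cache. This is an illustration under stated assumptions, not the SDK's internal code: `resolve`, `route`, and `pool` are hypothetical names, the registry map stands in for the address published in TinyModule status, and the dial function is injected so the sketch needs no network or gRPC dependency.

```go
package main

import (
	"fmt"
	"sync"
)

// route describes where a message for a given module should go.
type route struct {
	Local   bool
	Address string // remote address, set only when Local is false
}

// resolve picks in-process delivery when the target is this module;
// otherwise it looks the target up in the registry.
func resolve(self, target string, registry map[string]string) (route, error) {
	if target == self {
		return route{Local: true}, nil
	}
	addr, ok := registry[target]
	if !ok {
		return route{}, fmt.Errorf("module %q is not registered", target)
	}
	return route{Address: addr}, nil
}

// pool caches one connection per address and dials only on first use.
type pool[C any] struct {
	mu    sync.Mutex
	conns map[string]C
	dial  func(addr string) (C, error)
}

// get returns the cached connection for addr, dialing if none exists yet.
func (p *pool[C]) get(addr string) (C, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.conns[addr]; ok {
		return c, nil
	}
	c, err := p.dial(addr)
	if err != nil {
		var zero C
		return zero, err
	}
	if p.conns == nil {
		p.conns = make(map[string]C)
	}
	p.conns[addr] = c
	return c, nil
}

func main() {
	registry := map[string]string{"grpc-module": "grpc-module:50051"}
	r, err := resolve("http-module", "grpc-module", registry)
	fmt.Println(r, err)

	p := &pool[string]{dial: func(addr string) (string, error) { return "conn to " + addr, nil }}
	conn, _ := p.get(r.Address)
	fmt.Println(conn)
}
```
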
Retry Mechanism
  • Transient Errors: Exponential backoff (1s → 30s max)
  • Permanent Errors: Marked via PermanentError wrapper to stop retries
  • Context cancellation stops all retries
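
A minimal sketch of these three rules, assuming a doubling backoff with a cap. The SDK's actual retry loop is internal and may differ; `permanentError` here is a local stand-in for the SDK's PermanentError wrapper, and `retry`, `nextDelay`, and `demo` are hypothetical names. The demo uses millisecond delays so it finishes quickly; the README's values would be `time.Second` and `30*time.Second`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// permanentError is a local stand-in for the SDK's PermanentError wrapper.
type permanentError struct{ Err error }

func (e permanentError) Error() string { return e.Err.Error() }
func (e permanentError) Unwrap() error { return e.Err }

// nextDelay doubles the delay and caps it at limit.
func nextDelay(d, limit time.Duration) time.Duration {
	if d *= 2; d > limit {
		return limit
	}
	return d
}

// retry calls fn until it succeeds, fails permanently, or ctx is cancelled.
func retry(ctx context.Context, initial, limit time.Duration, fn func() error) error {
	delay := initial
	for {
		err := fn()
		if err == nil {
			return nil
		}
		var perm permanentError
		if errors.As(err, &perm) {
			return err // retrying cannot help
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
			delay = nextDelay(delay, limit)
		}
	}
}

// demo fails `failures` times before succeeding; if permanent is true the
// failure is wrapped so that retry gives up immediately.
func demo(failures int, permanent bool) (attempts int, err error) {
	err = retry(context.Background(), time.Millisecond, 4*time.Millisecond, func() error {
		attempts++
		if attempts > failures {
			return nil
		}
		if permanent {
			return permanentError{Err: errors.New("invalid input")}
		}
		return errors.New("temporarily unavailable")
	})
	return attempts, err
}

func main() {
	fmt.Println(demo(2, false)) // third attempt succeeds
	fmt.Println(demo(2, true))  // gives up after the first attempt
}
```
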
Design Patterns
  1. Eventual Consistency with Reconciliation: Periodic reconciliation (every 5 minutes) plus signal-based immediate updates
  2. Leader Election: Leader handles status updates and signal processing; non-leaders execute node logic
  3. Schema-Driven Configuration: Go structs automatically generate JSON schemas for UI integration
  4. Expression-Based Transformation: JSONPath expressions enable flexible data mapping without code changes
  5. Definition Sharing: Components mark fields as shared:true or configurable:true for cross-node type safety
Project Structure
.
├── api/v1alpha1/          # CRD definitions (TinyNode, TinyModule, TinyFlow, etc.)
├── module/                # Core SDK interfaces for component developers
│   ├── component.go       # Component interface
│   ├── node.go           # Port definitions
│   └── handler.go        # Handler function type
├── pkg/                   # Reusable SDK packages
│   ├── resource/         # Kubernetes resource manager
│   ├── schema/           # JSON schema generator
│   ├── evaluator/        # JSONPath expression evaluator
│   ├── errors/           # Error handling utilities
│   └── metrics/          # OpenTelemetry integration
├── internal/              # Internal operator implementation
│   ├── controller/       # Kubernetes controllers
│   ├── scheduler/        # Message routing and execution
│   ├── server/           # gRPC server
│   └── client/           # gRPC client pool
├── cli/                   # Command-line tools (run, build)
├── registry/              # Component registration system
├── config/                # Kubernetes manifests and CRD definitions
│   ├── crd/              # CRD YAML files
│   ├── samples/          # Example resources
│   └── rbac/             # RBAC configurations
└── charts/                # Helm charts for deployment

Getting Started

You’ll need a Kubernetes cluster to run against. You can use KIND to get a local cluster for testing, or run against a remote cluster. Note: Your controller will automatically use the current context in your kubeconfig file (i.e. whatever cluster kubectl cluster-info shows).

Helm charts

helm repo add tinysystems https://tiny-systems.github.io/module/
helm repo update # if you already added repo before
helm install my-corp-data-processing-tools --set controllerManager.manager.image.repository=registry.mycorp/tools/data-processing  tinysystems/tinysystems-operator
Running on the cluster
  1. Install instances of Custom Resources:
kubectl apply -f config/samples/
  2. Build and push your image to the location specified by IMG:
make docker-build docker-push IMG=<some-registry>/operator:tag
  3. Deploy the controller to the cluster with the image specified by IMG:
make deploy IMG=<some-registry>/operator:tag
Undeploy controller

Undeploy the controller from the cluster:

make undeploy

How it works

This project aims to follow the Kubernetes Operator pattern.

It uses Controllers, which provide a reconcile function responsible for synchronizing resources until the desired state is reached on the cluster.
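
As a rough illustration of what "synchronizing until the desired state is reached" means, the sketch below compares a desired set of named specs with the actual set and applies the difference. It uses plain maps instead of Kubernetes objects, and `diff` and `reconcile` are hypothetical names; a real controller would issue API calls where this code mutates a map.

```go
package main

import (
	"fmt"
	"sort"
)

// diff reports which named resources must be created, updated, or removed
// to make actual match desired. Results are sorted for stable output.
func diff(desired, actual map[string]string) (create, update, remove []string) {
	for name, spec := range desired {
		cur, ok := actual[name]
		switch {
		case !ok:
			create = append(create, name)
		case cur != spec:
			update = append(update, name)
		}
	}
	for name := range actual {
		if _, ok := desired[name]; !ok {
			remove = append(remove, name)
		}
	}
	sort.Strings(create)
	sort.Strings(update)
	sort.Strings(remove)
	return create, update, remove
}

// reconcile applies the diff to actual and reports whether anything changed.
// Calling it again on a converged state is a no-op.
func reconcile(desired, actual map[string]string) bool {
	create, update, remove := diff(desired, actual)
	for _, name := range create {
		actual[name] = desired[name]
	}
	for _, name := range update {
		actual[name] = desired[name]
	}
	for _, name := range remove {
		delete(actual, name)
	}
	return len(create)+len(update)+len(remove) > 0
}

func main() {
	desired := map[string]string{"node-a": "v2", "node-b": "v1"}
	actual := map[string]string{"node-a": "v1", "node-c": "v1"}
	fmt.Println(diff(desired, actual))
	fmt.Println(reconcile(desired, actual), reconcile(desired, actual))
}
```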

Test It Out
  1. Install the CRDs into the cluster:
make install
  2. Run your controller (this will run in the foreground, so switch to a new terminal if you want to leave it running):
make run

NOTE: You can also run this in one step by running: make install run

Modifying the API definitions

If you are editing the API definitions, generate the manifests such as CRs or CRDs using:

make manifests
Create new api
kubebuilder create api --group operator --version v1alpha1 --kind TinySignal

NOTE: Run make --help for more information on all potential make targets

More information can be found via the Kubebuilder Documentation

Module Development

This SDK provides everything you need to build custom module operators. Here's a complete guide to developing your own module.

Quick Start
  1. Create a New Go Project
mkdir my-module
cd my-module
go mod init github.com/myorg/my-module
  2. Add SDK Dependency
go get github.com/tiny-systems/module
  3. Implement a Component

Create components/hello.go:

package components

import (
    "context"
    "github.com/tiny-systems/module/module"
)

type Hello struct{}

// Configuration for the input port
type HelloInput struct {
    Name string `json:"name" configurable:"true"`
}

// Configuration for the output
type HelloOutput struct {
    Greeting string `json:"greeting" shared:"true"`
}

func (h *Hello) Instance() module.Component {
    return &Hello{}
}

func (h *Hello) GetInfo() module.ComponentInfo {
    return module.ComponentInfo{
        Name:        "hello",
        Description: "Greets a person by name",
        Info:        "Simple greeting component example",
        Tags:        []string{"example", "greeting"},
    }
}

func (h *Hello) Ports() []module.Port {
    return []module.Port{
        {
            Name:          "input",
            Label:         "Input",
            Source:        false, // This is an input port
            Position:      module.Left,
            Configuration: &HelloInput{},
        },
        {
            Name:          "output",
            Label:         "Output",
            Source:        true, // This is an output port
            Position:      module.Right,
            Configuration: &HelloOutput{},
        },
        {
            Name:     "error",
            Label:    "Error",
            Source:   true,
            Position: module.Bottom,
        },
    }
}

func (h *Hello) Handle(ctx context.Context, output module.Handler, port string, message any) any {
    if port != "input" {
        return nil
    }

    // Check the message type instead of panicking on an unexpected payload
    input, ok := message.(*HelloInput)
    if !ok {
        return nil
    }

    // Create greeting
    greeting := "Hello, " + input.Name + "!"

    // Send to output port
    output(ctx, "output", &HelloOutput{
        Greeting: greeting,
    })
    return nil
}
  4. Register Component and Create Main

Create main.go:

package main

import (
    "github.com/myorg/my-module/components"
    "github.com/tiny-systems/module/cli"
    "github.com/tiny-systems/module/registry"
)

func main() {
    // Register all components
    registry.Register(&components.Hello{})

    // Run the operator
    cli.Run()
}
  5. Run Your Module
# Build
go build -o my-module

# Run locally (connects to your current kubectl context)
./my-module run --name=my-module --version=1.0.0 --namespace=tinysystems
  6. Deploy to Kubernetes
# Build and push Docker image
docker build -t myregistry/my-module:1.0.0 .
docker push myregistry/my-module:1.0.0

# Install using Helm
helm repo add tinysystems https://tiny-systems.github.io/module/
helm install my-module \
  --set controllerManager.manager.image.repository=myregistry/my-module \
  --set controllerManager.manager.image.tag=1.0.0 \
  tinysystems/tinysystems-operator
Component Interface Deep Dive
GetInfo() - Component Metadata
func (c *MyComponent) GetInfo() module.ComponentInfo {
    return module.ComponentInfo{
        Name:        "my-component",      // Unique identifier
        Description: "Does something",     // Short description
        Info:        "Detailed info...",   // Long description
        Tags:        []string{"tag1"},     // Searchable tags
    }
}
Ports() - Define Component Ports

Ports define how components connect to each other:

func (c *MyComponent) Ports() []module.Port {
    return []module.Port{
        {
            Name:          "input",           // Unique port name
            Label:         "Input Data",      // Display label
            Source:        false,             // Input port
            Position:      module.Left,       // Visual position
            Configuration: &InputConfig{},    // Expected data structure
        },
        {
            Name:          "output",
            Label:         "Output Data",
            Source:        true,            // Output port
            Position:      module.Right,
            Configuration: &OutputConfig{}, // Output data structure
        },
    }
}

Port Positions: module.Top, module.Right, module.Bottom, module.Left

Handle() - Process Messages

The Handle method is called when a message arrives on a port:

func (c *MyComponent) Handle(
    ctx context.Context,
    output module.Handler,
    port string,
    message any,
) any {
    switch port {
    case "input":
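        // Type assert the message; the two-value form avoids a panic
        input, ok := message.(*InputConfig)
        if !ok {
            return nil // unexpected payload type, nothing to process
        }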

        // Do work
        result := processData(input)

        // Send to output port
        output(ctx, "output", &OutputConfig{
            Result: result,
        })

    case "_reconcile":
        // Handle reconciliation (called periodically)
        // Use this for cleanup, state sync, etc.
    }

    return nil
}

Key Points:

  • ctx: Context with tracing span and cancellation
  • output: Callback function to send data to other ports
  • port: Name of the port that received the message
  • message: The actual data (type assert to your config struct)
  • Return value is currently unused
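
Since every message arrives as `any`, a small generic helper can centralize the type assertion and turn a mismatch into an error rather than a panic. This is one possible pattern, not something the SDK provides; `decode`, `Input`, and `handle` are hypothetical names.

```go
package main

import "fmt"

// Input is a hypothetical port configuration struct.
type Input struct {
	Name string
}

// decode asserts msg to T and reports a descriptive error on mismatch.
func decode[T any](port string, msg any) (T, error) {
	v, ok := msg.(T)
	if !ok {
		var zero T
		return zero, fmt.Errorf("port %q: expected %T, got %T", port, zero, msg)
	}
	return v, nil
}

// handle shows the helper inside a port switch.
func handle(port string, msg any) (string, error) {
	switch port {
	case "input":
		in, err := decode[*Input](port, msg)
		if err != nil {
			return "", err
		}
		return "Hello, " + in.Name + "!", nil
	default:
		return "", nil // ports this component does not care about
	}
}

func main() {
	fmt.Println(handle("input", &Input{Name: "World"}))
	fmt.Println(handle("input", "not a struct"))
}
```
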
Configuration Schemas

The SDK automatically generates JSON Schemas from your Go structs. Use struct tags to control the UI:

type Config struct {
    // Basic field
    Name string `json:"name"`

    // Configurable in UI (can reference other node outputs)
    UserID string `json:"userId" configurable:"true"`

    // Shared definition (other nodes can reference this)
    Result string `json:"result" shared:"true"`

    // Control UI layout
    APIKey string `json:"apiKey" propertyOrder:"1" tab:"auth"`

    // Nested object
    Settings struct {
        Timeout int `json:"timeout" configurable:"true"`
    } `json:"settings"`

    // Array
    Items []string `json:"items" configurable:"true"`
}

Struct Tags:

  • configurable:"true": Field can accept values from other nodes via expressions
  • shared:"true": Field definition is available to other nodes for type-safe mapping
  • propertyOrder:"N": Controls field order in UI
  • tab:"name": Groups field under a tab in UI
  • align:"horizontal": Layout hint for UI
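
Go exposes struct tags through the standard `reflect` package, which is how a generator can discover these hints. The sketch below only lists the fields carrying a given tag; it is not the SDK's schema generator, and `taggedFields` is a hypothetical helper. It assumes plain `json:"name"` tags without options such as `omitempty`.

```go
package main

import (
	"fmt"
	"reflect"
)

// Config reuses a few fields from the example above.
type Config struct {
	Name   string `json:"name"`
	UserID string `json:"userId" configurable:"true"`
	Result string `json:"result" shared:"true"`
	APIKey string `json:"apiKey" propertyOrder:"1" tab:"auth"`
}

// taggedFields returns the JSON names of the struct fields on which
// the given tag is set to "true". v may be a struct or a pointer to one.
func taggedFields(v any, tag string) []string {
	t := reflect.TypeOf(v)
	if t == nil {
		return nil
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}
	var names []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.Tag.Get(tag) == "true" {
			names = append(names, f.Tag.Get("json"))
		}
	}
	return names
}

func main() {
	fmt.Println("configurable:", taggedFields(&Config{}, "configurable"))
	fmt.Println("shared:", taggedFields(&Config{}, "shared"))
}
```
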
System Ports

Special ports available to all components:

_reconcile Port

Called periodically (every 5 minutes) and on node changes:

case "_reconcile":
    // Clean up resources
    // Sync state
    // Check for drift
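
For intuition about the periodic delivery, the sketch below drives a handler with `_reconcile` messages from a tick channel until the context is cancelled. How the SDK schedules reconciliation internally is not shown in this README; `reconcileLoop`, `handleFunc`, and `countReconciles` are hypothetical names, and the demo sends ticks by hand so that it runs instantly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// handleFunc is a simplified handler shape used only by this sketch.
type handleFunc func(ctx context.Context, port string, msg any) any

// reconcileLoop delivers one _reconcile message per tick until ctx is done.
func reconcileLoop(ctx context.Context, ticks <-chan time.Time, handle handleFunc) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticks:
			handle(ctx, "_reconcile", nil)
		}
	}
}

// countReconciles drives the loop with n manual ticks and reports how many
// _reconcile messages the handler saw.
func countReconciles(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	ticks := make(chan time.Time) // unbuffered: each send waits for the loop
	done := make(chan struct{})
	seen := 0
	go func() {
		defer close(done)
		reconcileLoop(ctx, ticks, func(_ context.Context, port string, _ any) any {
			if port == "_reconcile" {
				seen++
			}
			return nil
		})
	}()
	for i := 0; i < n; i++ {
		ticks <- time.Now()
	}
	cancel()
	<-done // the loop has exited, so reading seen is safe
	return seen
}

func main() {
	fmt.Println(countReconciles(3))
	// With a real timer the ticks would come from a ticker, for example:
	//   t := time.NewTicker(5 * time.Minute)
	//   defer t.Stop()
	//   reconcileLoop(ctx, t.C, handle)
}
```
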
_client Port

Provides Kubernetes client for resource operations:

case "_client":
    client := message.(resource.Manager)

    // Create a signal
    client.CreateSignal(ctx, resource.CreateSignalRequest{
        Node: "target-node",
        Port: "input",
        Data: map[string]any{"key": "value"},
    })
    
    // Get node information
    node, err := client.GetNode(ctx, "node-name")
_settings Port

Receives initial configuration (no "from" connection required):

case "_settings":
    settings := message.(*MyConfig)
    // Store settings for later use
Error Handling
Transient Errors

Return regular errors for automatic retry with exponential backoff:

func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    data, err := fetchFromAPI()
    if err != nil {
        // Will retry automatically
        return err
    }
    // ...
}
Permanent Errors

Use PermanentError to stop retries:

import (
    "context"
    "fmt"

    "github.com/tiny-systems/module/module"
    "github.com/tiny-systems/module/pkg/errors"
)

func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    if !isValid(msg) {
        // Won't retry - send to error port instead
        output(ctx, "error", errors.PermanentError{
            Err: fmt.Errorf("invalid input"),
        })
        return nil
    }
    // ...
}
Using the Resource Manager

Access Kubernetes resources from your component:

import "github.com/tiny-systems/module/pkg/resource"

func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    if port == "_client" {
        c.client = msg.(resource.Manager)
        return nil
    }

    if port == "create-flow" {
        // Create a new flow
        flow, err := c.client.CreateFlow(ctx, resource.CreateFlowRequest{
            Name:    "dynamic-flow",
            Project: "my-project",
        })
        if err != nil {
            return err
        }

        // Create nodes in the flow
        node, err := c.client.CreateNode(ctx, resource.CreateNodeRequest{
            Name:      "node-1",
            Flow:      flow.Name,
            Module:    "http-module",
            Component: "request",
            Settings: map[string]any{
                "url": "https://api.example.com",
            },
        })
        if err != nil {
            return err
        }

        // Trigger the node
        c.client.CreateSignal(ctx, resource.CreateSignalRequest{
            Node: node.Name,
            Port: "trigger",
            Data: map[string]any{},
        })
    }

    return nil
}
Observability

OpenTelemetry is built-in. The context includes a span:

import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"

    "github.com/tiny-systems/module/module"
)

func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
    // Get tracer
    tracer := otel.Tracer("my-module")

    // Create child span
    ctx, span := tracer.Start(ctx, "processing")
    defer span.End()

    // Add attributes
    span.SetAttributes(
        attribute.String("input.size", "large"),
    )

    // Do work...
    result := doWork(ctx)

    output(ctx, "output", result)
    return nil
}
Testing Components
package components_test

import (
    "context"
    "testing"
    "github.com/myorg/my-module/components"
)

func TestHello(t *testing.T) {
    component := &components.Hello{}

    var outputData *components.HelloOutput
    outputHandler := func(ctx context.Context, port string, data any) any {
        if port == "output" {
            outputData = data.(*components.HelloOutput)
        }
        return nil
    }

    // Send message to input port
    component.Handle(context.Background(), outputHandler, "input", &components.HelloInput{
        Name: "World",
    })

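    // Verify output; fail early if the handler was never called
    if outputData == nil {
        t.Fatal("expected a message on the output port, got none")
    }
    if outputData.Greeting != "Hello, World!" {
        t.Errorf("expected %q, got %q", "Hello, World!", outputData.Greeting)
    }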
}
Best Practices
  1. Keep Components Focused: Each component should do one thing well
  2. Use System Ports: Implement _reconcile for cleanup and state sync
  3. Handle Context Cancellation: Respect ctx.Done() for graceful shutdown
  4. Leverage Schemas: Use struct tags to create great UI experiences
  5. Share Definitions: Mark output fields as shared:true for type-safe flows
  6. Use Permanent Errors: Don't retry validation errors or user mistakes
  7. Add Observability: Create spans for long operations
  8. Document with Tags: Use meaningful tags in GetInfo() for discoverability
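
Practice 3 is the one most easily forgotten in loops. A minimal sketch of checking `ctx.Done()` between units of work, using only the standard library; `processAll` and `demoCancel` are hypothetical names, and the work function stands in for whatever a component does per item.

```go
package main

import (
	"context"
	"fmt"
)

// processAll runs work on each item and stops early once ctx is cancelled.
// It returns how many items were processed and the cancellation cause, if any.
func processAll(ctx context.Context, items []string, work func(string)) (int, error) {
	for i, item := range items {
		select {
		case <-ctx.Done():
			return i, ctx.Err()
		default: // not cancelled, keep going
		}
		work(item)
	}
	return len(items), nil
}

// demoCancel cancels the context while the second item is being handled
// and reports how far processAll got and whether it was interrupted.
func demoCancel() (processed int, interrupted bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	handled := 0
	n, err := processAll(ctx, []string{"a", "b", "c", "d"}, func(string) {
		handled++
		if handled == 2 {
			cancel()
		}
	})
	return n, err != nil
}

func main() {
	fmt.Println(demoCancel())
}
```

The check happens between items, so an item that has already started is allowed to finish; work that can block for a long time should also pass `ctx` down to the blocking call.
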
Example Modules

Check out these example modules for reference:

  • common-module: Basic utilities (delay, switch, merge)
  • http-module: HTTP client/server components
  • grpc-module: gRPC service components
CLI Reference

The SDK includes a CLI for running and building modules:

# Run module locally
./my-module run --name=my-module --version=1.0.0 --namespace=tinysystems

# Build (if custom build logic is needed)
./my-module build

# Get help
./my-module --help

License

Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Directories

Path Synopsis
api
v1alpha1
Package v1alpha1 contains API Schema definitions for the operator v1alpha1 API group +kubebuilder:object:generate=true +groupName=operator.tinysystems.io
internal
pkg
tools
