workflow

v0.0.0-...-dac86b4
Published: Feb 16, 2026 License: MIT Imports: 26 Imported by: 0

README

Workflow Engine

A production-grade, configuration-driven workflow orchestration engine built on CrisisTextLine/modular v1.11.11. Define entire applications in YAML -- from API servers to multi-service chat platforms -- with 65+ module types, dynamic hot-reload, AI-powered generation, and a visual builder UI.

What It Does

The workflow engine turns YAML configuration files into running applications. No code changes needed. The same codebase can operate as:

  • A RESTful API server with JWT authentication and middleware chains
  • An event-driven pipeline with Kafka messaging and state machines
  • A multi-service platform with Docker Compose, reverse proxies, and observability
  • An AI-assisted workflow builder with drag-and-drop visual editing

modules:
  - name: http-server
    type: http.server
    config:
      address: ":8080"
  - name: router
    type: chimux.router
  - name: auth
    type: auth.jwt
    config:
      secret: "${JWT_SECRET}"
  - name: orders-api
    type: api.handler
    config:
      resourceName: orders
      operations: [list, get, create, update]

workflows:
  http:
    routes:
      - method: GET
        path: /api/orders
        handler: orders-api
        middleware: [auth]

Features

65+ Module Types Across 10 Categories

| Category | Count | Types |
|---|---|---|
| HTTP | 10 | http.server, http.router, http.handler, http.middleware.{auth, cors, logging, ratelimit, requestid, securityheaders}, http.proxy, http.simple_proxy |
| Messaging | 6 | messaging.broker, messaging.broker.eventbus, messaging.handler, messaging.nats, messaging.kafka, notification.slack |
| State Machine | 4 | statemachine.engine, state.tracker, state.connector, processing.step |
| Pipeline Steps | 12 | step.validate, step.transform, step.conditional, step.set, step.log, step.publish, step.http_call, step.delegate, step.request_parse, step.db_query, step.db_exec, step.json_response |
| API & CQRS | 3 | api.handler, api.command, api.query |
| Modular Framework | 10 | httpserver, httpclient, chimux, scheduler, auth, eventbus, cache, database, eventlogger, jsonschema |
| Storage/Persistence | 8 | database.workflow, persistence.store, storage.s3, storage.gcs, storage.local, storage.sqlite, static.fileserver, database.modular |
| Observability | 5 | metrics.collector, health.checker, observability.otel, log.collector, eventlogger.modular |
| Auth | 3 | auth.jwt, auth.modular, auth.user-store |
| Other | 6 | data.transformer, webhook.sender, dynamic.component, secrets.vault, secrets.aws, workflow.registry |
| Triggers | 5 | http, schedule, event, eventbus, mock |

Security
  • JWT Authentication with user registration, login, token generation/validation, role-based claims, and bcrypt password hashing
  • PII Encryption at Rest using AES-256-GCM with SHA-256 key derivation -- configurable field-level encryption integrated with PersistenceStore and Kafka payloads
  • Middleware Chain: CORS, rate limiting, request ID propagation, auth enforcement
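The field-level encryption named above can be illustrated with the Go standard library alone. This is a minimal sketch, not the engine's implementation: the function names (deriveKey, encryptField, decryptField), the base64 envelope, and the nonce-prefix layout are assumptions made for the example. Only the use of AES-256-GCM with a SHA-256-derived key comes from the feature list.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"errors"
	"fmt"
)

// deriveKey hashes a passphrase with SHA-256, yielding the 32 bytes AES-256 needs.
func deriveKey(passphrase string) []byte {
	sum := sha256.Sum256([]byte(passphrase))
	return sum[:]
}

// newGCM builds an AES-GCM AEAD from a 32-byte key.
func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// encryptField seals one field value and returns base64(nonce || ciphertext).
// The envelope layout is an assumption of this sketch.
func encryptField(key []byte, plaintext string) (string, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return "", err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return "", err
	}
	sealed := gcm.Seal(nonce, nonce, []byte(plaintext), nil)
	return base64.StdEncoding.EncodeToString(sealed), nil
}

// decryptField reverses encryptField and fails if the key or data is wrong.
func decryptField(key []byte, encoded string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return "", err
	}
	gcm, err := newGCM(key)
	if err != nil {
		return "", err
	}
	if len(raw) < gcm.NonceSize() {
		return "", errors.New("ciphertext too short")
	}
	nonce, ciphertext := raw[:gcm.NonceSize()], raw[gcm.NonceSize():]
	plain, err := gcm.Open(nil, nonce, ciphertext, nil)
	if err != nil {
		return "", err
	}
	return string(plain), nil
}

func main() {
	key := deriveKey("example-passphrase") // hypothetical passphrase
	enc, err := encryptField(key, "555-0100")
	if err != nil {
		panic(err)
	}
	dec, err := decryptField(key, enc)
	if err != nil {
		panic(err)
	}
	fmt.Println(dec)
}
```

Because GCM is authenticated, decrypting with a different key returns an error instead of garbage, which is the property a field-level scheme usually relies on to detect tampering.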
Dynamic Component Hot-Reload

Load Go components at runtime without restarting the server. The Yaegi interpreter provides:

  • Sandboxed execution with stdlib-only import validation
  • File watcher for automatic hot-reload on save
  • Component registry with full lifecycle management (init, start, stop)
  • HTTP API: POST/GET/DELETE /api/dynamic/components
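The stdlib-only import check in the list above could be approximated with go/parser. The sketch below is an assumption about how such a check might work, not the code in dynamic/: the function name nonStdlibImports is hypothetical, and the rule it applies (a dot in the first path element means a third-party module) is a common heuristic, so a module path without a dot would slip through.

```go
package main

import (
	"fmt"
	"go/parser"
	"go/token"
	"strconv"
	"strings"
)

// nonStdlibImports parses only the import section of src and returns every
// import path whose first element contains a dot (for example "github.com").
// Standard library paths such as "net/http" never do.
func nonStdlibImports(src string) ([]string, error) {
	fset := token.NewFileSet()
	file, err := parser.ParseFile(fset, "component.go", src, parser.ImportsOnly)
	if err != nil {
		return nil, err
	}
	var rejected []string
	for _, imp := range file.Imports {
		path, err := strconv.Unquote(imp.Path.Value)
		if err != nil {
			return nil, err
		}
		first, _, _ := strings.Cut(path, "/")
		if strings.Contains(first, ".") {
			rejected = append(rejected, path)
		}
	}
	return rejected, nil
}

func main() {
	// Hypothetical component source with one disallowed import.
	src := "package component\n\nimport (\n\t\"strings\"\n\t\"github.com/example/thing\"\n)\n"
	rejected, err := nonStdlibImports(src)
	if err != nil {
		panic(err)
	}
	fmt.Println(rejected)
}
```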
AI-Powered Workflow Generation

Hybrid AI integration with two providers, plus a validation loop and natural-language generation:

  • Anthropic Claude -- direct API with tool use for component generation and validation
  • GitHub Copilot SDK -- session-based integration for development workflows
  • Automatic validation loop with compile-test-retry cycle
  • Natural language to YAML workflow generation
EventBus Integration
  • Native EventBus bridge adapting MessageBroker to EventBus
  • Workflow lifecycle events: workflow.started, workflow.completed, workflow.failed, step.started, step.completed, step.failed
  • EventBus trigger for native subscription-based workflow activation
  • Topic filtering and async mode support
Visual Workflow Builder (ReactFlow UI)
  • Drag-and-drop node palette with all 65+ module types across categorized sections
  • Property panel for node configuration with type-specific fields
  • YAML import/export with round-trip fidelity
  • Undo/redo, validation (local + server), Zustand state management
Observability
  • Prometheus metrics collection with 6 pre-registered metric vectors
  • Grafana dashboards for platform monitoring
  • OpenTelemetry tracing via OTLP/HTTP export
  • Health check endpoints: /health, /ready, /live
  • Request ID propagation via X-Request-ID header
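To show what the health endpoints and request ID propagation look like from a client's point of view, here is a small net/http sketch. It is an illustration under assumptions, not the engine's handlers: the names withRequestID, newHandler, and probe are hypothetical, the response body "ok" is invented, and generating an ID when the header is absent is a guess at reasonable behavior. The paths and the X-Request-ID header come from the list above.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newRequestID returns 16 random hex characters.
func newRequestID() string {
	b := make([]byte, 8)
	_, _ = rand.Read(b)
	return hex.EncodeToString(b)
}

// withRequestID echoes an incoming X-Request-ID, or generates one if missing.
func withRequestID(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		id := r.Header.Get("X-Request-ID")
		if id == "" {
			id = newRequestID()
		}
		w.Header().Set("X-Request-ID", id)
		next.ServeHTTP(w, r)
	})
}

// newHandler serves the three health paths behind the request ID middleware.
func newHandler() http.Handler {
	mux := http.NewServeMux()
	for _, path := range []string{"/health", "/ready", "/live"} {
		mux.HandleFunc(path, func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
			fmt.Fprintln(w, "ok")
		})
	}
	return withRequestID(mux)
}

// probe issues an in-memory GET and reports the status and echoed request ID.
func probe(h http.Handler, path, requestID string) (int, string) {
	req := httptest.NewRequest(http.MethodGet, path, nil)
	if requestID != "" {
		req.Header.Set("X-Request-ID", requestID)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Header().Get("X-Request-ID")
}

func main() {
	code, id := probe(newHandler(), "/health", "abc-123")
	fmt.Println(code, id) // prints "200 abc-123"
}
```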
Dynamic Field Mapping

Schema-agnostic field resolution for REST API handlers:

  • FieldMapping type with fallback chains and primary/resolve/set operations
  • Configurable field aliases in YAML (fieldMapping, transitionMap, summaryFields)
  • Runtime field resolution from workflow context
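A fallback chain with primary/resolve/set operations can be pictured as an ordered list of candidate field names. The sketch below is an assumption about the idea only: the fieldChain type, its method names, and the example field names are hypothetical and do not reflect the actual FieldMapping API.

```go
package main

import "fmt"

// fieldChain is a hypothetical stand-in for a field mapping entry:
// the first name is the primary field, the rest are fallbacks.
type fieldChain []string

// primary returns the canonical field name, or "" for an empty chain.
func (c fieldChain) primary() string {
	if len(c) == 0 {
		return ""
	}
	return c[0]
}

// resolve returns the value of the first name in the chain present in record.
func (c fieldChain) resolve(record map[string]any) (any, bool) {
	for _, name := range c {
		if v, ok := record[name]; ok {
			return v, true
		}
	}
	return nil, false
}

// set always writes to the primary field.
func (c fieldChain) set(record map[string]any, value any) {
	if len(c) > 0 {
		record[c[0]] = value
	}
}

func main() {
	status := fieldChain{"status", "state", "conversation_status"}
	legacy := map[string]any{"state": "active"} // record using an older field name

	v, ok := status.resolve(legacy)
	fmt.Println(v, ok) // prints "active true"

	status.set(legacy, "closed")
	fmt.Println(legacy["status"]) // prints "closed"
}
```

Reading through fallbacks while writing only to the primary field lets a handler accept records in more than one shape without knowing the schema in advance.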

Quick Start

Requirements
  • Go 1.26+
  • Node.js 18+ (for UI development)
Run the Server
# Clone the repository
git clone https://github.com/GoCodeAlone/workflow.git
cd workflow

# Build and run with an example config
go build -o server ./cmd/server
./server -config example/order-processing-pipeline.yaml

# Or run directly
go run ./cmd/server -config example/order-processing-pipeline.yaml

The server starts on :8080 by default. Override with -addr :9090.

Run the Visual Builder
cd ui
npm install
npm run dev

The Vite dev server opens at http://localhost:5173 with hot module replacement.

Run with Docker
# Chat platform (multi-service with Kafka, Prometheus, Grafana)
cd example/chat-platform
docker compose up

# E-commerce app
cd example/ecommerce-app
docker compose up

Example Applications

Chat Platform -- Production-Grade Mental Health Support

A 73-file, multi-service platform demonstrating the full capabilities of the engine. Located in example/chat-platform/.

Architecture:

Browser -> [gateway:8080] -> reverse proxy -> [api:8081]          (auth, CRUD, admin)
                                           -> [conversation:8082] (chat, state machine)

[conversation] <-> [Kafka] <-> [conversation]  (event-driven messaging)
[prometheus] -> [grafana]                       (observability)

Highlights:

  • 6 Docker Compose services: gateway, API, conversation, Kafka, Prometheus, Grafana
  • 18 dynamic components: AI summarizer, PII encryptor, risk tagger, conversation router, survey engine, escalation handler, keyword matcher, and more
  • Full SPA with role-based views: admin, responder, and supervisor dashboards
  • Conversation state machine with 13 states and 18 transitions (queued -> assigned -> active -> wrap_up -> closed)
  • Real-time risk assessment with keyword pattern matching across 5 categories (self-harm, suicidal ideation, crisis, substance abuse, domestic violence)
  • PII masking in UI, field-level encryption at rest
  • Webchat widget, SMS providers (Twilio, AWS, partner webhooks)
  • Seed data system for users, affiliates, programs, keywords, and surveys
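The conversation state machine in the highlights can be pictured as a transition table. The sketch below covers only the five states named above and the four transitions between them; the full set of 13 states and 18 transitions is not listed in this README, so nothing beyond that path is modeled. The names transitions, canTransition, and walk are hypothetical.

```go
package main

import "fmt"

// transitions lists the allowed next states for each state on the
// queued -> assigned -> active -> wrap_up -> closed path.
var transitions = map[string][]string{
	"queued":   {"assigned"},
	"assigned": {"active"},
	"active":   {"wrap_up"},
	"wrap_up":  {"closed"},
}

// canTransition reports whether moving from one state to another is allowed.
func canTransition(from, to string) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

// walk validates a sequence of states and returns the first illegal step.
func walk(states ...string) error {
	for i := 1; i < len(states); i++ {
		if !canTransition(states[i-1], states[i]) {
			return fmt.Errorf("illegal transition %s -> %s", states[i-1], states[i])
		}
	}
	return nil
}

func main() {
	fmt.Println(walk("queued", "assigned", "active", "wrap_up", "closed")) // prints "<nil>"
	fmt.Println(walk("queued", "active"))                                  // prints "illegal transition queued -> active"
}
```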
Order Processing Pipeline

A 10+ module workflow demonstrating module composition with HTTP servers, routers, handlers, data transformers, state machines, message brokers, and observability. See example/order-processing-pipeline.yaml.

100+ Example Configurations

The example/ directory contains configurations covering:

  • API gateways and reverse proxies
  • Event-driven and scheduled workflows
  • State machine lifecycle management
  • Data transformation and webhook delivery
  • Multi-workflow composition
  • Real-time messaging
  • Dependency injection patterns
  • Multi-tenant scenarios

Each example includes a companion .md file documenting its architecture and usage.

Architecture

cmd/server/          Server binary, HTTP mux, graceful shutdown
  main.go            Entry point with CLI flags and AI provider init

config/              YAML config structs (WorkflowConfig, ModuleConfig)
module/              65+ built-in module implementations
handlers/            5 workflow handler types:
                       HTTP, Messaging, StateMachine, Scheduler, Integration
dynamic/             Yaegi-based hot-reload system
ai/                  AI integration layer
  llm/                 Anthropic Claude direct API with tool use
  copilot/             GitHub Copilot SDK with session management
  service.go           Provider selection and orchestration
  deploy.go            Validation loop and deployment to dynamic components
ui/                  React + ReactFlow + Zustand visual builder (Vite, TypeScript)
example/             100+ YAML configs and 2 full application examples
mock/                Test helpers and mock implementations

Core flow:

  1. StdEngine loads YAML config via BuildFromConfig()
  2. Each module definition is matched to a factory (65+ built-in types) and instantiated
  3. Modules register with the modular Application (dependency injection, service registry)
  4. Workflow handlers (HTTP, Messaging, StateMachine, Scheduler, Integration) configure workflows
  5. Triggers (HTTP endpoints, EventBus subscriptions, cron schedules) start the system
  6. TriggerWorkflow() dispatches incoming events to the correct handler, emitting lifecycle events

Key interfaces:

  • modular.Module -- all components implement Name(), Dependencies(), Configure()
  • WorkflowHandler -- CanHandle(), ConfigureWorkflow(), ExecuteWorkflow()
  • Trigger -- Name(), Start(ctx), Stop(ctx)
Adding a New Module Type
  1. Implement the module in module/
  2. Register it in engine.go's BuildFromConfig switch statement
  3. Add an example YAML config in example/
Adding a New Workflow Handler
  1. Implement the WorkflowHandler interface in handlers/
  2. Register with engine.RegisterWorkflowHandler() in cmd/server/main.go
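As a rough picture of step 1, the sketch below implements the three WorkflowHandler methods for a made-up "echo" workflow type. Several things are assumptions: the local workflowHandler interface replaces modular.Application with any so the example builds without the modular dependency, echoHandler and its prefix option are hypothetical, and the dispatch function (first handler whose CanHandle returns true) is a guess at how an engine might route, not the engine's actual logic.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// workflowHandler mirrors the documented WorkflowHandler methods.
// Assumption: app is typed any here; the real method takes modular.Application.
type workflowHandler interface {
	CanHandle(workflowType string) bool
	ConfigureWorkflow(app any, workflowConfig any) error
	ExecuteWorkflow(ctx context.Context, workflowType string, action string, data map[string]any) (map[string]any, error)
}

// echoHandler is a hypothetical handler for workflows of type "echo".
type echoHandler struct{ prefix string }

func (h *echoHandler) CanHandle(workflowType string) bool { return workflowType == "echo" }

// ConfigureWorkflow reads an optional "prefix" string from the config map.
func (h *echoHandler) ConfigureWorkflow(_ any, workflowConfig any) error {
	cfg, ok := workflowConfig.(map[string]any)
	if !ok {
		return errors.New("echo: workflow config must be a map")
	}
	if p, ok := cfg["prefix"].(string); ok {
		h.prefix = p
	}
	return nil
}

// ExecuteWorkflow returns the prefixed action and the input unchanged.
func (h *echoHandler) ExecuteWorkflow(_ context.Context, _ string, action string, data map[string]any) (map[string]any, error) {
	return map[string]any{"action": h.prefix + action, "input": data}, nil
}

// dispatch runs the first handler that accepts workflowType (assumed routing rule).
func dispatch(ctx context.Context, handlers []workflowHandler, workflowType, action string, data map[string]any) (map[string]any, error) {
	for _, h := range handlers {
		if h.CanHandle(workflowType) {
			return h.ExecuteWorkflow(ctx, workflowType, action, data)
		}
	}
	return nil, fmt.Errorf("no handler for workflow type %q", workflowType)
}

func main() {
	h := &echoHandler{}
	if err := h.ConfigureWorkflow(nil, map[string]any{"prefix": "did:"}); err != nil {
		panic(err)
	}
	out, err := dispatch(context.Background(), []workflowHandler{h}, "echo", "ping", nil)
	fmt.Println(out["action"], err) // prints "did:ping <nil>"
}
```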

Testing

# All Go tests
go test ./...

# With race detection
go test -race ./...

# With coverage
go test -cover ./...

# Single test
go test -v -run TestName .

# UI component tests (Vitest)
cd ui && npm test

# UI E2E tests (Playwright)
cd ui && npx playwright test

# Lint
go fmt ./...
golangci-lint run
cd ui && npm run lint

Test coverage targets: root package 80%+, module 80%+, dynamic 80%+, AI packages 85%+.

Technology

| Component | Technology |
|---|---|
| Language | Go 1.26 |
| Framework | CrisisTextLine/modular v1.11.11 |
| UI | React, ReactFlow, Zustand, Vite, TypeScript |
| Hot-Reload | Yaegi Go interpreter |
| Messaging | Apache Kafka (Sarama), NATS, EventBus |
| Database | SQLite (modernc), PostgreSQL (pgx) |
| Storage | AWS S3 |
| Auth | JWT (golang-jwt), OAuth2 |
| Encryption | AES-256-GCM, bcrypt |
| Metrics | Prometheus, Grafana |
| Tracing | OpenTelemetry (OTLP/HTTP) |
| AI | Anthropic Claude API, GitHub Copilot SDK |
| Containers | Docker multi-stage builds, Docker Compose |
| Testing | Go testing, Vitest, Playwright |

Roadmap

See ROADMAP.md for the full development history (Phases 1-6 complete) and planned work including JSON Schema config validation, performance benchmarks, Helm charts, and security hardening.

License

MIT

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Engine

type Engine interface {
	RegisterWorkflowHandler(handler WorkflowHandler)
	RegisterTrigger(trigger module.Trigger)
	AddModuleType(moduleType string, factory ModuleFactory)
	BuildFromConfig(cfg *config.WorkflowConfig) error
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	TriggerWorkflow(ctx context.Context, workflowType string, action string, data map[string]any) error
}

type EngineBuilderFunc

type EngineBuilderFunc func(cfg *config.WorkflowConfig, logger *slog.Logger) (*StdEngine, modular.Application, error)

EngineBuilderFunc is called by the manager to create and configure an engine from a parsed workflow config. The caller is responsible for registering workflow handlers, dynamic components, and other setup. The function must call BuildFromConfig on the engine before returning.

type ManagedEngine

type ManagedEngine struct {
	WorkflowID uuid.UUID
	Engine     *StdEngine
	App        modular.Application
	Status     string // "running", "stopped", "error"
	StartedAt  time.Time
	Error      error
	// contains filtered or unexported fields
}

ManagedEngine holds a running workflow engine along with its metadata.

func (*ManagedEngine) GetEngine

func (me *ManagedEngine) GetEngine() module.TriggerWorkflower

GetEngine returns the underlying engine, satisfying the module.triggerableEngine interface so the CrossWorkflowRouter can trigger workflows via duck-typing.

type ModuleFactory

type ModuleFactory func(name string, config map[string]any) modular.Module

ModuleFactory is a function that creates a module from a name and configuration
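The factory pattern behind ModuleFactory and AddModuleType can be sketched as a map from type name to constructor. Everything below is a simplified stand-in: the local module interface keeps only Name() in place of modular.Module, and registry, addModuleType, build, and the "example.greeter" type are hypothetical names chosen for the illustration.

```go
package main

import "fmt"

// module is a reduced stand-in for modular.Module (assumption: Name() only).
type module interface{ Name() string }

// moduleFactory has the same shape as the documented ModuleFactory,
// with the stand-in module type as its result.
type moduleFactory func(name string, config map[string]any) module

// greeter is a hypothetical module configured with a greeting string.
type greeter struct{ name, greeting string }

func (g *greeter) Name() string { return g.name }

// registry maps a module type string to the factory that builds it.
type registry map[string]moduleFactory

// addModuleType registers (or replaces) the factory for moduleType.
func (r registry) addModuleType(moduleType string, factory moduleFactory) {
	r[moduleType] = factory
}

// build looks up the factory for moduleType and instantiates a module.
func (r registry) build(moduleType, name string, config map[string]any) (module, error) {
	factory, ok := r[moduleType]
	if !ok {
		return nil, fmt.Errorf("unknown module type %q", moduleType)
	}
	return factory(name, config), nil
}

func main() {
	reg := registry{}
	reg.addModuleType("example.greeter", func(name string, cfg map[string]any) module {
		greeting, _ := cfg["greeting"].(string)
		return &greeter{name: name, greeting: greeting}
	})

	m, err := reg.build("example.greeter", "hello", map[string]any{"greeting": "hi"})
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Name()) // prints "hello"
}
```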

type PipelineAdder

type PipelineAdder interface {
	AddPipeline(name string, p *module.Pipeline)
}

PipelineAdder is implemented by workflow handlers that can receive named pipelines. This allows the engine to add pipelines without importing the handlers package.

type RoutePipelineSetter

type RoutePipelineSetter interface {
	SetRoutePipeline(routePath string, pipeline *module.Pipeline)
}

RoutePipelineSetter is implemented by handlers (QueryHandler, CommandHandler) that support per-route pipelines.

type StartStopModule

type StartStopModule interface {
	modular.Module
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

StartStopModule extends the basic Module interface with lifecycle methods

type StdEngine

type StdEngine struct {
	// contains filtered or unexported fields
}

StdEngine represents the workflow execution engine

func NewStdEngine

func NewStdEngine(app modular.Application, logger modular.Logger) *StdEngine

NewStdEngine creates a new workflow engine

func (*StdEngine) AddModuleType

func (e *StdEngine) AddModuleType(moduleType string, factory ModuleFactory)

AddModuleType registers a factory function for a module type

func (*StdEngine) AddStepType

func (e *StdEngine) AddStepType(stepType string, factory module.StepFactory)

AddStepType registers a pipeline step factory for the given step type.

func (*StdEngine) App

func (e *StdEngine) App() modular.Application

App returns the underlying modular.Application.

func (*StdEngine) BuildFromConfig

func (e *StdEngine) BuildFromConfig(cfg *config.WorkflowConfig) error

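BuildFromConfig builds the engine from a parsed workflow configuration: each module definition is matched to a registered factory and instantiated, and the workflow handlers configure their workflows.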

func (*StdEngine) GetApp

func (e *StdEngine) GetApp() modular.Application

GetApp returns the underlying modular Application.

func (*StdEngine) GetStepRegistry

func (e *StdEngine) GetStepRegistry() *module.StepRegistry

GetStepRegistry returns the engine's pipeline step registry.

func (*StdEngine) RegisterTrigger

func (e *StdEngine) RegisterTrigger(trigger module.Trigger)

RegisterTrigger registers a trigger with the engine

func (*StdEngine) RegisterWorkflowHandler

func (e *StdEngine) RegisterWorkflowHandler(handler WorkflowHandler)

RegisterWorkflowHandler adds a workflow handler to the engine

func (*StdEngine) SecretsResolver

func (e *StdEngine) SecretsResolver() *secrets.MultiResolver

SecretsResolver returns the engine's multi-provider secrets resolver.

func (*StdEngine) SetDynamicLoader

func (e *StdEngine) SetDynamicLoader(loader *dynamic.Loader)

SetDynamicLoader sets the dynamic component loader on the engine. When set, dynamic.component modules can load from source files via the "source" config key.

func (*StdEngine) SetDynamicRegistry

func (e *StdEngine) SetDynamicRegistry(registry *dynamic.ComponentRegistry)

SetDynamicRegistry sets the dynamic component registry on the engine.

func (*StdEngine) Start

func (e *StdEngine) Start(ctx context.Context) error

Start starts all modules and triggers

func (*StdEngine) Stop

func (e *StdEngine) Stop(ctx context.Context) error

Stop stops all modules and triggers

func (*StdEngine) TriggerWorkflow

func (e *StdEngine) TriggerWorkflow(ctx context.Context, workflowType string, action string, data map[string]any) error

TriggerWorkflow dispatches a trigger event for the given workflow type and action, with its data, to the handler that accepts that workflow type.

type WorkflowEngineManager

type WorkflowEngineManager struct {
	// contains filtered or unexported fields
}

WorkflowEngineManager manages multiple concurrent workflow engine instances.

func NewWorkflowEngineManager

func NewWorkflowEngineManager(wfStore store.WorkflowStore, linkStore store.CrossWorkflowLinkStore, logger *slog.Logger, engineBuilder EngineBuilderFunc) *WorkflowEngineManager

NewWorkflowEngineManager creates a new manager for workflow engine instances. The engineBuilder function is called to create each new engine instance, allowing the caller to register handlers and configure the dynamic system.

func (*WorkflowEngineManager) DeployWorkflow

func (m *WorkflowEngineManager) DeployWorkflow(ctx context.Context, workflowID uuid.UUID) error

DeployWorkflow loads config from the store, creates an isolated engine, and starts it.

func (*WorkflowEngineManager) GetStatus

func (m *WorkflowEngineManager) GetStatus(workflowID uuid.UUID) (*WorkflowStatus, error)

GetStatus returns the runtime status of a workflow.

func (*WorkflowEngineManager) ListActive

func (m *WorkflowEngineManager) ListActive() []WorkflowStatus

ListActive returns the status of all running workflows.

func (*WorkflowEngineManager) ReloadWorkflow

func (m *WorkflowEngineManager) ReloadWorkflow(ctx context.Context, workflowID uuid.UUID) error

ReloadWorkflow stops and redeploys a workflow.

func (*WorkflowEngineManager) Router

Router returns the cross-workflow event router.

func (*WorkflowEngineManager) StopAll

func (m *WorkflowEngineManager) StopAll(ctx context.Context) error

StopAll gracefully stops all running engines.

func (*WorkflowEngineManager) StopWorkflow

func (m *WorkflowEngineManager) StopWorkflow(ctx context.Context, workflowID uuid.UUID) error

StopWorkflow gracefully stops a running engine.

type WorkflowHandler

type WorkflowHandler interface {
	// CanHandle returns true if this handler can process the given workflow type
	CanHandle(workflowType string) bool

	// ConfigureWorkflow sets up the workflow from configuration
	ConfigureWorkflow(app modular.Application, workflowConfig any) error

	// ExecuteWorkflow executes a workflow with the given action and input data
	ExecuteWorkflow(ctx context.Context, workflowType string, action string, data map[string]any) (map[string]any, error)
}

WorkflowHandler interface for handling different workflow types

type WorkflowStatus

type WorkflowStatus struct {
	WorkflowID  uuid.UUID     `json:"workflow_id"`
	Status      string        `json:"status"`
	StartedAt   time.Time     `json:"started_at"`
	Uptime      time.Duration `json:"uptime"`
	Error       string        `json:"error,omitempty"`
	ModuleCount int           `json:"module_count"`
}

WorkflowStatus describes the current runtime state of a managed workflow.
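When this struct is served as JSON, two details of encoding/json are worth knowing: time.Duration is encoded as an integer number of nanoseconds, and the empty Error string is dropped because of omitempty. The sketch below demonstrates both with a local copy of the struct. The copy uses a plain string for WorkflowID so it builds without the uuid package, and the sample values are invented.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// workflowStatus copies the documented fields and JSON tags.
// Assumption: WorkflowID is a string here; the real field is a uuid.UUID.
type workflowStatus struct {
	WorkflowID  string        `json:"workflow_id"`
	Status      string        `json:"status"`
	StartedAt   time.Time     `json:"started_at"`
	Uptime      time.Duration `json:"uptime"`
	Error       string        `json:"error,omitempty"`
	ModuleCount int           `json:"module_count"`
}

// sampleStatus returns a made-up status for a healthy workflow.
func sampleStatus() workflowStatus {
	return workflowStatus{
		WorkflowID:  "00000000-0000-0000-0000-000000000001",
		Status:      "running",
		StartedAt:   time.Date(2026, 2, 16, 0, 0, 0, 0, time.UTC),
		Uptime:      90 * time.Second,
		ModuleCount: 4,
	}
}

// encode marshals a status to its JSON text.
func encode(s workflowStatus) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

func main() {
	out, err := encode(sampleStatus())
	if err != nil {
		panic(err)
	}
	// "uptime" appears as 90000000000 (nanoseconds) and "error" is absent.
	fmt.Println(out)
}
```

A client that wants seconds has to divide uptime by 1e9 itself.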

Directories

Path Synopsis
Package admin provides the built-in workflow admin UI configuration.
ai
examples
Package examples contains complete examples of AI-generated workflows.
llm
auth
cmd
server command
wfctl command
Package debug provides interactive workflow debugging with breakpoint support, step-through execution, and state inspection.
Package handlers provides workflow handling capabilities
Package mock provides common mock implementations for testing
Package module defines core interfaces for the workflow engine
sla
sdk
aws
gcp
Package sandbox provides Docker-based sandboxed execution for CI/CD pipeline steps.
Package schema provides JSON Schema generation and validation for workflow configuration files.
