dwarf

Version v0.6.3
Published: Jun 22, 2026 License: Apache-2.0 Imports: 1 Imported by: 0

README

Dwarf is a standalone, embeddable workflow-orchestration engine for Go.

You describe a workflow as a graph of tasks; dwarf runs it — dispatching one task at a time per step, persisting state between steps in a SQL database, and handling the hard parts of durable orchestration: parallel fan-out/fan-in, conditional routing, retries with backoff, timed sleeps, subgraphs, and human-in-the-loop pauses.

Dwarf has no built-in transport. It doesn't know how your tasks are reached (a local function call, an RPC, a message bus) or where your graphs live. You wire it to your world through a few small dependency interfaces, and it handles scheduling, state, durability, and recovery.

package greet_test

import (
    "context"
    "testing"
    "github.com/microbus-io/dwarf"
    "github.com/microbus-io/dwarf/engine"
    "github.com/microbus-io/dwarf/workflow"
)

func TestGreet(t *testing.T) {
    g := workflow.NewGraph("Greet")
    g.SetEndpoint("Hello", "http://example/hello") // node "Hello" dispatches to this endpoint URL
    g.AddTransition("Hello", workflow.END)

    proxy := engine.NewTestProxy()
    proxy.HandleGraph("http://example/greet", g)
    proxy.HandleTask("http://example/hello", func(ctx context.Context, f *workflow.Flow) error {
        f.SetString("greeting", "hello "+f.GetString("name"))
        return nil
    })

    eng := dwarf.NewEngine()
    if err := eng.SetHost(proxy); err != nil { // TestProxy implements the Host interface
        t.Fatal(err)
    }
    eng.RunInTest(t) // SQLite in-memory, auto-cleanup

    _, out, err := eng.Run(t.Context(), "http://example/greet", map[string]any{"name": "ada"}, nil) // Run returns (flowKey, outcome, err)
    if err != nil {
        t.Fatal(err)
    }
    t.Log(out.State["greeting"]) // hello ada
}

Why dwarf

  • Durable by construction. Every step is checkpointed to SQL. A crashed worker's in-flight step is recovered by lease expiry; a flow can be inspected, resumed, restarted, or continued days later.
  • Parallelism that merges cleanly. Static and dynamic (forEach) fan-out run branches concurrently; fan-in merges their state with per-field reducers (append, add, union, merge, …).
  • Human-in-the-loop. A task can Interrupt to park the flow for external input and Resume later — approvals, manual review, async callbacks.
  • Fair and prioritized. Two-level scheduling: strict priority bands across the cluster, weighted fairness within a band so one tenant can't starve another.
  • Scales horizontally. Run many replicas against sharded databases; replicas coordinate through fire-and-forget peer signals you publish however you like.
  • OTEL-native observability. Structured logs (slog), 10 dwarf_* metrics, and distributed tracing, all through standard providers you inject.
  • Four SQL dialects. PostgreSQL, MySQL/MariaDB, SQL Server, and SQLite (testing / single-instance).
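Fan-in with per-field reducers can be pictured as folding each branch's writes into the state as it was before the fan-out, combining a field with its reducer when it has one. This is a minimal sketch under stated assumptions: `Reducer`, `appendReducer`, `addReducer`, and `mergeBranches` are hypothetical names, each branch is assumed to carry only the fields it wrote, and fields without a reducer fall back to last-writer-wins in branch order. Dwarf's real reducers live in the workflow package and may behave differently.

```go
package main

import "fmt"

// Reducer combines the value merged so far (acc, nil if none) with one branch's value.
// Hypothetical type for illustration; dwarf's reducers live in its workflow package.
type Reducer func(acc, v any) any

// appendReducer concatenates []any values.
func appendReducer(acc, v any) any {
	a, _ := acc.([]any)
	b, _ := v.([]any)
	out := make([]any, 0, len(a)+len(b))
	out = append(out, a...)
	return append(out, b...)
}

// addReducer sums float64 values.
func addReducer(acc, v any) any {
	a, _ := acc.(float64)
	b, _ := v.(float64)
	return a + b
}

// mergeBranches folds each branch's writes into a copy of the pre-fan-out state.
// Fields with a reducer are combined; other fields are last-writer-wins in branch order.
func mergeBranches(base map[string]any, branches []map[string]any, reducers map[string]Reducer) map[string]any {
	out := make(map[string]any, len(base))
	for k, v := range base {
		out[k] = v
	}
	for _, br := range branches {
		for k, v := range br {
			if r, ok := reducers[k]; ok {
				out[k] = r(out[k], v)
			} else {
				out[k] = v
			}
		}
	}
	return out
}

func main() {
	base := map[string]any{"items": []any{}, "total": 0.0}
	branches := []map[string]any{
		{"items": []any{"a"}, "total": 2.0},
		{"items": []any{"b"}, "total": 3.0, "status": "ok"},
	}
	reducers := map[string]Reducer{"items": appendReducer, "total": addReducer}
	merged := mergeBranches(base, branches, reducers)
	fmt.Println(merged["items"], merged["total"], merged["status"]) // [a b] 5 ok
}
```

Without reducers, two branches writing the same field would silently overwrite each other. A reducer turns that conflict into a well-defined combination.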

Dwarf depends only on sequel (SQL), plus the OpenTelemetry API.

Install

go get github.com/microbus-io/dwarf

Requires Go 1.26+.

Packages

  • github.com/microbus-io/dwarf: a thin convenience providing NewEngine(). Imported by the host process.
  • github.com/microbus-io/dwarf/engine: the engine itself (lifecycle, operations, configuration, and the Host interface). Imported by the host process only.
  • github.com/microbus-io/dwarf/workflow: pure types (Graph, Flow, FlowOptions, reducers, error helpers). Imported by any code that defines tasks or graphs.

The split matters: dwarf/workflow is a lightweight type package. Code that defines tasks and graphs imports only dwarf/workflow, never the engine, so the engine's heavy dependencies (SQL drivers, the scheduler) stay out of those builds.

The host model

The engine reaches the outside world through a single Host interface, registered once with SetHost. Only the first two methods are required; the other two can be no-ops when the host doesn't need stop notifications or runs a single replica.

type Host interface {
    // Required. Fetch the workflow graph identified by workflowURL. Called at Create (the
    // graph is then frozen on the flow) and again whenever a subgraph is spawned.
    LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)

    // Required. Execute one task. The Flow carrier arrives with its input state populated;
    // the task writes its outputs back to it.
    ExecuteTask(ctx context.Context, taskName string, flow *workflow.Flow) error

    // Optional. Fired when a flow stops, for flows created with FlowOptions.NotifyOnStop.
    // The flow's baggage is on ctx; resolve where to deliver from it (the engine carries no address).
    FlowStopped(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome)

    // Optional. Ship one cross-replica coordination signal to the other replicas (no-op for
    // single-replica). op is a routing key; payload is opaque bytes. Peers hand it back via
    // eng.DeliverSignal(ctx, op, payload).
    SignalPeers(ctx context.Context, op string, payload []byte)
}

A standalone host backs LoadGraph with an in-memory registry / file / database, and ExecuteTask with a local function table or an RPC client. A bus-based host (for example a microservice mesh) bridges them to its transport. The engine never learns how tasks are reached.

Production wiring

Each Set* method returns an error. There is no fluent With* builder: giving up the chained return lets every setter surface its error, so misconfiguration fails loudly at wiring time.

eng := dwarf.NewEngine()
check := func(err error) {
    if err != nil {
        log.Fatal(err)
    }
}
check(eng.SetDSN("postgres://user:pass@db:5432/dwarf"))
check(eng.SetNumShards(2))
check(eng.SetWorkers(64))
check(eng.SetHost(host)) // host is your Host implementation
check(eng.SetLogger(slog.Default()))
check(eng.SetMeterProvider(otel.GetMeterProvider()))
check(eng.SetTracerProvider(otel.GetTracerProvider()))

check(eng.Startup(ctx))
defer eng.Shutdown(ctx)

// initialState, tenantID, and actorClaims come from the calling application.
flowKey, err := eng.Create(ctx, "checkout", initialState, &workflow.FlowOptions{
    Priority:    10,
    FairnessKey: tenantID,
    Baggage:     actorClaims,
})
check(err) // don't start a flow that failed to be created
eng.Start(ctx, flowKey)
outcome, err := eng.Await(ctx, flowKey)
check(err)
log.Printf("flow %s stopped: %+v", flowKey, outcome)

The Set* methods are atomic and may be called after Startup for hot reconfiguration.

Database support

  • PostgreSQL 13+: recommended for production. MVCC with no gap locks, so fan-out runs deadlock-free at any concurrency.
  • SQL Server: production. Enable READ_COMMITTED_SNAPSHOT for non-blocking reads.
  • MySQL / MariaDB: production, but expect tuning. Prefer READ-COMMITTED isolation to drop gap locks.
  • SQLite: testing and single-instance development only. Used automatically by RunInTest; do not run it in production.

See docs/deployment.md for tuning, sharding, and connection-pool guidance.

Documentation

Full guides live in the repository's docs/ directory.

API reference: pkg.go.dev/github.com/microbus-io/dwarf.

License

Apache License 2.0. See LICENSE.

Documentation

Overview

Package dwarf is a standalone, embeddable workflow-orchestration engine.

Dwarf executes workflow graphs: it dispatches tasks, manages state between steps, and handles fan-out/fan-in, retries, sleeps, conditional routing, subgraphs, and human-in-the-loop interrupts. It is library code with no built-in transport: a host application wires it to its own task execution, graph storage, and observability through a small set of injected dependency interfaces (see the engine package). It depends only on a SQL database (via sequel) and the OpenTelemetry API.

This root package is a thin convenience: NewEngine returns an *engine.Engine. The real API lives in two sub-packages:

  • github.com/microbus-io/dwarf/engine - the engine: Startup/Shutdown, the Create/Run/Await operations, configuration, and the dependency interfaces. Import this only in the process that hosts the engine.
  • github.com/microbus-io/dwarf/workflow - the pure types: Graph, Flow, FlowOptions, FlowOutcome, and reducers. Import this in code that defines tasks and graphs; it has no heavy dependencies.

See the docs/ directory in the repository for guides on graphs, tasks, scheduling, observability, and deployment.

Functions

func NewEngine

func NewEngine() *engine.Engine

NewEngine creates a new workflow engine with default settings.

Directories

  • engine: Package engine is the dwarf workflow-orchestration engine.
  • workflow: Package workflow holds the pure data types of the dwarf workflow engine: the building blocks a host uses to define workflows and the carriers it reads and writes when running tasks.
