dwarf

Version: v0.9.0
Published: Jul 5, 2026 | License: Apache-2.0 | Imports: 1 | Imported by: 0

README


Dwarf is a standalone, embeddable workflow-orchestration engine for Go.

You describe a workflow as a graph of tasks and dwarf runs it, dispatching one task at a time per step and persisting state between steps in a SQL database. It also handles the hard parts of durable orchestration: parallel fan-out/fan-in, conditional routing, retries with backoff, timed sleeps, subgraphs, and human-in-the-loop pauses.

Dwarf has no built-in transport. It doesn't know how your tasks are reached (a local function call, an RPC, a message bus) or where your graphs live. You wire it to your world through a few small dependency interfaces, and it handles scheduling, state, durability, and recovery.

package greet_test

import (
    "context"
    "testing"

    "github.com/microbus-io/dwarf"
    "github.com/microbus-io/dwarf/engine"
    "github.com/microbus-io/dwarf/workflow"
)

func TestGreet(t *testing.T) {
    g := workflow.NewGraph("Greet")
    g.SetEndpoint("Hello", "http://example/hello") // node "Hello" dispatches to this endpoint URL
    g.AddTransition("Hello", workflow.END)
    proxy := engine.NewTestProxy()
    proxy.HandleGraph("http://example/greet", g)
    proxy.HandleTask("http://example/hello", func(ctx context.Context, f *workflow.Flow) error {
        f.SetString("greeting", "hello "+f.GetString("name"))
        return nil
    })

    eng := dwarf.NewEngine()
    eng.SetHost(proxy) // TestProxy implements the Host interface
    eng.RunInTest(t)   // SQLite in-memory, auto-cleanup
    // Run returns (flowKey, outcome, err)
    _, out, err := eng.Run(t.Context(), "http://example/greet", map[string]any{"name": "ada"}, nil)
    if err != nil {
        t.Fatal(err)
    }
    if got := out.State["greeting"]; got != "hello ada" {
        t.Errorf("greeting = %v, want %q", got, "hello ada")
    }
}

Why dwarf

  • Durable by construction. Every step is checkpointed to SQL. A crashed worker's in-flight step is recovered by lease expiry; a flow can be inspected, resumed, forked, or continued days later.
  • Parallelism that merges cleanly. Static and dynamic (forEach) fan-out run branches concurrently; fan-in merges their state with per-field reducers (append, add, union, merge, …).
  • Human-in-the-loop. A task can Interrupt to park the flow until external input arrives, and the flow is Resumed later. Typical uses are approvals, manual review, and async callbacks.
  • Fair and prioritized. Two-level scheduling: strict priority bands across the cluster, weighted fairness within a band so one tenant can't starve another.
  • Scales horizontally. Run many replicas against sharded databases; replicas coordinate through fire-and-forget peer signals you publish however you like.
  • OTEL-native observability. Structured logs (slog), 10 dwarf_* metrics, and distributed tracing, all through standard providers you inject.
  • Four SQL dialects. PostgreSQL, MySQL/MariaDB, SQL Server, and SQLite (testing / single-instance).

Dwarf depends only on sequel (SQL), plus the OpenTelemetry API.

Install

go get github.com/microbus-io/dwarf

Requires Go 1.26+.

Packages

| Import | Role | Who imports it |
|---|---|---|
| github.com/microbus-io/dwarf | Thin convenience: NewEngine() | The host process |
| github.com/microbus-io/dwarf/engine | The engine: lifecycle, operations, config, the Host interface | The host process only |
| github.com/microbus-io/dwarf/workflow | Pure types: Graph, Flow, FlowOptions, reducers, error helpers | Any code that defines tasks or graphs |

The split matters: dwarf/workflow is a lightweight type package. Code that defines tasks and graphs imports only dwarf/workflow, never the engine, so the engine's heavy dependencies (SQL drivers, the scheduler) stay out of those builds.

The host model

The engine reaches the outside world through a single Host interface, registered once with SetHost. LoadGraph and ExecuteTask are required; a single-replica host can implement SignalPeers as a no-op.

type Host interface {
    // Required. Fetch a workflow graph by its URL. Called at Create and again whenever a
    // subgraph is spawned; the loaded graph is then frozen on the flow.
    LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)

    // Required. Execute one task. The Flow carrier arrives with its input state populated;
    // the task writes its outputs back to it.
    ExecuteTask(ctx context.Context, taskName string, flow *workflow.Flow) error

    // Optional. Ship one cross-replica coordination signal to the other replicas (no-op for
    // single-replica). op is a routing key; payload is opaque bytes. Peers hand it back via
    // eng.DeliverSignal(ctx, op, payload).
    SignalPeers(ctx context.Context, op string, payload []byte)
}

A standalone host backs LoadGraph with an in-memory registry / file / database, and ExecuteTask with a local function table or an RPC client. A bus-based host (for example a microservice mesh) bridges them to its transport. The engine never learns how tasks are reached.

Production wiring

Each Set* returns an error, so misconfiguration fails loudly at wiring time:

eng := dwarf.NewEngine()
// Each Set* returns an error; errors.Join collects them and is nil only if all succeeded.
err := errors.Join(
    eng.SetDSN("postgres://user:pass@db:5432/dwarf"),
    eng.SetNumShards(2),
    eng.SetWorkers(64),
    eng.SetHost(host),
    eng.SetLogger(slog.Default()),
    eng.SetMeterProvider(otel.GetMeterProvider()),
    eng.SetTracerProvider(otel.GetTracerProvider()),
)
if err != nil {
    log.Fatal(err)
}

err = eng.Startup(ctx)
if err != nil {
    log.Fatal(err)
}
defer eng.Shutdown(ctx)

// Create returns a running flow; there is no separate start call.
flowKey, err := eng.Create(ctx, "checkout", initialState, &workflow.FlowOptions{
    Priority:    10,
    FairnessKey: tenantID,
    Baggage:     actorClaims,
})
if err != nil {
    log.Fatal(err)
}
outcome, err := eng.Await(ctx, flowKey) // wait for the flow's outcome
if err != nil {
    log.Fatal(err)
}

A few Set* methods are live: SetMaxOpenConns, SetWorkersPerConn, SetTimeBudget, and SetDefaultPriority may also be called after Startup for hot reconfiguration. All other Set* methods are construction-time only and must be called before Startup.

Database support

| Database | Use | Notes |
|---|---|---|
| PostgreSQL 13+ | Recommended for production | MVCC, no gap locks; fan-out runs deadlock-free at any concurrency |
| SQL Server | Production | Enable READ_COMMITTED_SNAPSHOT for non-blocking reads |
| MySQL / MariaDB | Production, expect tuning | Prefer READ-COMMITTED isolation to avoid gap locks |
| SQLite | Testing and single-instance development only | Used automatically by RunInTest; do not use in production |

See docs/deployment.md for tuning, sharding, and connection-pool guidance.

Documentation

Full guides live in the repository's docs/ directory.

API reference: pkg.go.dev/github.com/microbus-io/dwarf.

License

Apache License 2.0. See LICENSE.

Documentation

Overview

Package dwarf is a standalone, embeddable workflow-orchestration engine.

Dwarf executes workflow graphs: it dispatches tasks, manages state between steps, and handles fan-out/fan-in, retries, sleeps, conditional routing, subgraphs, and human-in-the-loop interrupts. It is library code with no built-in transport: a host application wires it to its own task execution, graph storage, and observability through a small set of injected dependency interfaces (see the engine package). Apart from the OpenTelemetry API, its only dependency is a SQL database (accessed via sequel).

This root package is a thin convenience: NewEngine returns an *engine.Engine. The real API lives in two sub-packages:

  • github.com/microbus-io/dwarf/engine - the engine: Startup/Shutdown, the Create/Run/Await operations, configuration, and the dependency interfaces. Import this only in the process that hosts the engine.
  • github.com/microbus-io/dwarf/workflow - the pure types: Graph, Flow, FlowOptions, FlowOutcome, and reducers. Import this in code that defines tasks and graphs; it has no heavy dependencies.

A 30-second taste, using the in-process test harness:

proxy := engine.NewTestProxy()
g := workflow.NewGraph("Greet")
g.SetEndpoint("Hello", "http://example/hello") // node "Hello" dispatches to this endpoint URL
g.AddTransition("Hello", workflow.END)
proxy.HandleGraph("http://example/greet", g)
proxy.HandleTask("http://example/hello", func(ctx context.Context, f *workflow.Flow) error {
	f.SetString("greeting", "hello "+f.GetString("name"))
	return nil
})

eng := dwarf.NewEngine()
eng.SetHost(proxy)
eng.RunInTest(t) // SQLite in-memory, auto cleanup

_, out, err := eng.Run(ctx, "http://example/greet", map[string]any{"name": "ada"}, nil)
if err != nil {
	t.Fatal(err)
}
fmt.Println(out.State["greeting"]) // hello ada

See the docs/ directory in the repository for guides on graphs, tasks, scheduling, observability, and deployment.

Index

Functions

func NewEngine

func NewEngine() *engine.Engine

NewEngine creates a new workflow engine with default settings.


Directories

| Path | Synopsis |
|---|---|
| engine | Package engine is the dwarf workflow-orchestration engine. |
| internal/candidatecache | Package candidatecache holds the per-replica bounded set of step candidates produced by the engine's refiller. |
| internal/database | Package database owns the engine's sharded SQL connections: it opens and migrates every shard, routes by 1-based shard index, fans an operation out over all shards, and closes them. |
| internal/keys | Package keys encodes and decodes the engine's composite flow and step keys. |
| internal/lru | Package lru provides a small thread-safe LRU cache with a per-entry TTL. |
| workflow | Package workflow holds the pure data types of the dwarf workflow engine: the building blocks a host uses to define workflows and the carriers it reads and writes when running tasks. |
