Documentation ¶
Overview ¶
Package engine is the dwarf workflow-orchestration engine.
The engine executes workflow graphs against a SQL database, scheduling and dispatching one task at a time per step, persisting state between steps, and driving fan-out/fan-in, retries, sleeps, subgraphs, and interrupts. It owns no transport of its own; a host wires it to the outside world through injected dependency interfaces and calls its operations.
Lifecycle ¶
Build an engine with NewEngine and the Set* methods, register a Host (see SetHost), then Startup (opens the database, runs migrations, starts workers). Shutdown drains the workers and closes the database. In tests, RunInTest replaces Startup/Shutdown with per-test SQLite databases and t.Cleanup.
eng := engine.NewEngine()
eng.SetDSN("postgres://user:pass@host:5432/dwarf")
eng.SetHost(host)
err := eng.Startup(ctx)
if err != nil { ... }
defer eng.Shutdown(ctx)
Each Set* method returns an error. The live ones (SetNumShards, SetMaxOpenConns, SetWorkersPerConn, SetTimeBudget, SetDefaultPriority) take effect immediately on a running engine; the construction-time-only ones (SetDSN, SetWorkers, SetHost, SetLogger, SetMeterProvider, SetTracerProvider) return an error if called after Startup.
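The split between live and construction-time-only setters can be sketched with a small stand-in type. This is a minimal illustration of the pattern, not the engine's implementation: miniEngine, errAfterStartup, and the fields are hypothetical names, and only the setter names SetWorkers and SetDefaultPriority are borrowed from the text above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// miniEngine is a hypothetical stand-in, NOT the dwarf Engine. It models one
// construction-time-only setting and one live setting.
type miniEngine struct {
	mu              sync.Mutex
	started         bool
	workers         int // construction-time only
	defaultPriority int // live
}

// errAfterStartup is an illustrative error value; the real engine's error is
// not shown in this documentation.
var errAfterStartup = errors.New("setting cannot change after Startup")

// SetWorkers is rejected once the engine is running.
func (e *miniEngine) SetWorkers(n int) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.started {
		return errAfterStartup
	}
	e.workers = n
	return nil
}

// SetDefaultPriority is accepted both before and after Startup.
func (e *miniEngine) SetDefaultPriority(p int) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.defaultPriority = p
	return nil
}

// Startup flips the stand-in into the running state.
func (e *miniEngine) Startup() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.started = true
}

// demo returns the errors from a construction-time call before Startup,
// a live call after Startup, and a construction-time call after Startup.
func demo() (before, live, after error) {
	e := &miniEngine{}
	before = e.SetWorkers(8)
	e.Startup()
	live = e.SetDefaultPriority(5)
	after = e.SetWorkers(16)
	return before, live, after
}

func main() {
	before, live, after := demo()
	fmt.Println(before, live, after)
}
```

Because every Set* method returns an error, a caller that configures the engine in several steps may want to check each result rather than discard it, especially for calls that could land after Startup.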
Host ¶
The engine reaches the outside world through a single injected Host interface (see SetHost):
- LoadGraph fetches a workflow graph by name. It is called at Create (after which the graph JSON is frozen on the flow) and on subgraph spawn.
- ExecuteTask executes one task, given the Flow carrier with its state pre-populated.
- FlowStopped is fired when a flow stops, for flows created with FlowOptions.NotifyOnStop; the flow's baggage is on the ctx so the host resolves delivery itself. A host with no notification need does nothing.
- SignalPeers ships a cross-replica coordination signal (op + opaque payload bytes) to the other replicas, which hand it back via Engine.DeliverSignal; a single-replica host does nothing.
The flow's opaque baggage (host identity/tenant/context, set in workflow.FlowOptions) rides on the dispatch context of every LoadGraph and ExecuteTask call; read it with workflow.BaggageFrom(ctx).
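Carrying a value on the dispatch context generally follows the standard context.WithValue pattern. The sketch below shows that pattern with local helpers; withBaggage and baggageFrom are hypothetical stand-ins, and the real workflow.BaggageFrom signature and return type are not shown on this page, so the map type here is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// baggageKey is an unexported key type, the usual way to avoid collisions
// between packages that store values on a context.
type baggageKey struct{}

// withBaggage is a hypothetical helper that attaches baggage to a context.
func withBaggage(ctx context.Context, baggage map[string]any) context.Context {
	return context.WithValue(ctx, baggageKey{}, baggage)
}

// baggageFrom is a hypothetical stand-in for workflow.BaggageFrom. It returns
// a nil map when the context carries no baggage.
func baggageFrom(ctx context.Context) map[string]any {
	baggage, _ := ctx.Value(baggageKey{}).(map[string]any)
	return baggage
}

// actorOf is the kind of lookup a host might do at the top of LoadGraph or
// ExecuteTask. Indexing a nil map is safe in Go and yields the zero value.
func actorOf(ctx context.Context) string {
	actor, _ := baggageFrom(ctx)["actor"].(string)
	return actor
}

// demoActor resolves the actor once with baggage and once without.
func demoActor() (withBag, withoutBag string) {
	ctx := context.Background()
	withBag = actorOf(withBaggage(ctx, map[string]any{"actor": "ada"}))
	withoutBag = actorOf(ctx)
	return withBag, withoutBag
}

func main() {
	withBag, withoutBag := demoActor()
	fmt.Printf("%q %q\n", withBag, withoutBag)
}
```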
Operations ¶
Create makes a flow and runs it; Await blocks until it stops; Run is Create+Await in one call. Snapshot, History, Step, and List inspect flows; Resume continues a paused flow; Cancel and Continue manage lifecycle; Fork clones a terminal flow from a chosen step into a new flow for non-destructive recovery; Delete and Purge remove flows for retention. See the repository's docs/ directory for guides.
Example ¶
Wire an engine to a host, then create, start, and await a flow.
package main

import (
	"context"
	"fmt"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

// exampleHost implements engine.Host. A real host loads graphs from a registry/file/database/RPC and
// dispatches tasks over a local call, RPC, or message bus; here an in-memory registry and a local
// function stand in. LoadGraph and ExecuteTask are required; the remaining Host methods (flow-stop
// notification and the cross-replica signals) are left as no-ops.
type exampleHost struct {
	graphs map[string]*workflow.Graph
}

func (h exampleHost) LoadGraph(ctx context.Context, name string) (*workflow.Graph, error) {
	return h.graphs[name], nil
}

func (h exampleHost) ExecuteTask(ctx context.Context, taskName string, f *workflow.Flow) error {
	f.SetString("greeting", "hello "+f.GetString("name"))
	return nil
}

func (exampleHost) FlowStopped(context.Context, string, *workflow.FlowOutcome) {}

func (exampleHost) SignalPeers(context.Context, string, []byte) {}

func main() {
	ctx := context.Background()

	graphs := map[string]*workflow.Graph{}
	g := workflow.NewGraph("Greet")
	g.SetEndpoint("Hello", "Hello")
	g.AddTransition("Hello", workflow.END)
	graphs["greet"] = g

	eng := engine.NewEngine()
	eng.SetDSN("postgres://user:pass@localhost:5432/dwarf")
	eng.SetHost(exampleHost{graphs: graphs})
	err := eng.Startup(ctx)
	if err != nil {
		panic(err)
	}
	defer eng.Shutdown(ctx)

	// Run is Create + Await in one call.
	_, out, err := eng.Run(ctx, "greet", map[string]any{"name": "ada"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.State["greeting"])
}
Index ¶
- type Engine
- func (e *Engine) Await(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)
- func (e *Engine) Cancel(ctx context.Context, flowKey string, reason string) error
- func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any) (string, error)
- func (e *Engine) Create(ctx context.Context, workflowURL string, initialState any, ...) (flowKey string, err error)
- func (e *Engine) Delete(ctx context.Context, flowKey string) error
- func (e *Engine) DeliverSignal(ctx context.Context, op string, payload []byte) error
- func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
- func (e *Engine) Fork(ctx context.Context, stepKey string, stateOverrides any) (string, error)
- func (e *Engine) History(ctx context.Context, flowKey string) ([]workflow.FlowStep, error)
- func (e *Engine) HistoryMermaid(ctx context.Context, flowKey string, w io.StringWriter) error
- func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)
- func (e *Engine) Purge(ctx context.Context, query workflow.Query) (int, error)
- func (e *Engine) Resume(ctx context.Context, flowKey string, resumeData any) error
- func (e *Engine) Run(ctx context.Context, workflowURL string, initialState any, ...) (flowKey string, outcome *workflow.FlowOutcome, err error)
- func (e *Engine) RunInTest(t *testing.T)
- func (e *Engine) SetDSN(dsn string) error
- func (e *Engine) SetDebugLogger() error
- func (e *Engine) SetDefaultPriority(p int) error
- func (e *Engine) SetHost(h Host) error
- func (e *Engine) SetInTest(name string) error
- func (e *Engine) SetLogger(l *slog.Logger) error
- func (e *Engine) SetMaxOpenConns(n int) error
- func (e *Engine) SetMeterProvider(mp metric.MeterProvider) error
- func (e *Engine) SetNumShards(num int) error
- func (e *Engine) SetTimeBudget(d time.Duration) error
- func (e *Engine) SetTracerProvider(tp trace.TracerProvider) error
- func (e *Engine) SetWorkers(n int) error
- func (e *Engine) SetWorkersPerConn(n int) error
- func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)
- func (e *Engine) Shutdown(ctx context.Context) error
- func (e *Engine) Snapshot(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)
- func (e *Engine) Startup(ctx context.Context) error
- func (e *Engine) Step(ctx context.Context, stepKey string) (*workflow.FlowStep, error)
- type Host
- type ShardSummary
- type TaskHandler
- type TestProxy
- func (p *TestProxy) AddPeer(peer *Engine)
- func (p *TestProxy) ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error
- func (p *TestProxy) FlowStopped(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome)
- func (p *TestProxy) HandleGraph(name string, graph *workflow.Graph)
- func (p *TestProxy) HandleTask(name string, handler TaskHandler)
- func (p *TestProxy) LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)
- func (p *TestProxy) OnFlowStopped(cb func(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome))
- func (p *TestProxy) SignalPeers(ctx context.Context, op string, payload []byte)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Engine ¶
type Engine struct {
	// contains filtered or unexported fields
}
Engine is the standalone workflow orchestration engine.
func (*Engine) Continue ¶
func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any) (string, error)
Continue creates a new flow from the latest completed flow in a thread, inheriting that flow's policy (scheduling, baggage, notify-on-stop) - it does not take FlowOptions. For a turn with different policy, use Create with FlowOptions.ThreadKey.
func (*Engine) Create ¶
func (e *Engine) Create(ctx context.Context, workflowURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)
Create creates a new flow for a workflow and starts it, returning the running flow's key. opts carries the flow's policy (scheduling, NotifyOnStop, DeleteOnCompletion, Baggage, ThreadKey); nil uses defaults. For a flow that must wait for an external trigger, have the entry task call flow.Interrupt and resume it with Resume (which, unlike a separate start, also delivers a payload).
Example ¶
Create makes and runs a flow, and accepts FlowOptions for scheduling, notifications, thread membership, and the opaque host baggage carried with the flow.
package main

import (
	"context"
	"fmt"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	var eng *engine.Engine // obtained from NewEngine().…Startup(ctx)
	ctx := context.Background()

	flowKey, err := eng.Create(ctx, "greet", map[string]any{"name": "ada"},
		&workflow.FlowOptions{
			Priority:    10,                             // lower runs first
			FairnessKey: "tenant-42",                    // fair scheduling bucket
			Baggage:     map[string]any{"actor": "ada"}, // read with workflow.BaggageFrom(ctx)
		})
	if err != nil {
		panic(err)
	}

	// Create runs the flow immediately; Await blocks until it stops.
	out, err := eng.Await(ctx, flowKey)
	if err != nil {
		panic(err) // out is not safe to dereference when Await fails
	}
	fmt.Println(out.Status)
}
func (*Engine) DeliverSignal ¶ added in v0.4.0
func (e *Engine) DeliverSignal(ctx context.Context, op string, payload []byte) error
DeliverSignal processes an inbound peer signal. The host calls it with the op routing key and the payload bytes it received from a peer (the JSON encoding of what the engine handed that peer's SignalPeers). It delegates by op to the matching internal handler. op and payload are opaque to the host; only the engine interprets them.
func (*Engine) Fingerprint ¶
func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
Fingerprint returns a fingerprint and status for change detection.
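One way to use Fingerprint for change detection is to remember the last value seen per flow and compare on each poll. The sketch below assumes that an unchanged fingerprint means an unchanged flow; fingerprinter, watcher, and scripted are hypothetical names, and the "running" status string is a placeholder rather than a documented value.

```go
package main

import (
	"context"
	"fmt"
)

// fingerprinter is the slice of the Engine API this sketch relies on; the
// method signature matches Engine.Fingerprint.
type fingerprinter interface {
	Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
}

// watcher remembers the last fingerprint seen for each flow.
type watcher struct {
	src  fingerprinter
	last map[string]string
}

// changed reports whether the flow's fingerprint differs from the one seen on
// the previous call. The first observation of a flow counts as a change.
func (w *watcher) changed(ctx context.Context, flowKey string) (bool, string, error) {
	fp, status, err := w.src.Fingerprint(ctx, flowKey)
	if err != nil {
		return false, "", err
	}
	prev, seen := w.last[flowKey]
	w.last[flowKey] = fp
	return !seen || prev != fp, status, nil
}

// scripted is a fake engine that replays a fixed sequence of fingerprints.
type scripted struct {
	fps []string
	i   int
}

func (s *scripted) Fingerprint(ctx context.Context, flowKey string) (string, string, error) {
	fp := s.fps[s.i%len(s.fps)]
	s.i++
	return fp, "running", nil // placeholder status
}

// demoChanges polls three times against fingerprints "a", "a", "b" and counts
// how many polls reported a change.
func demoChanges() int {
	w := &watcher{src: &scripted{fps: []string{"a", "a", "b"}}, last: map[string]string{}}
	count := 0
	for i := 0; i < 3; i++ {
		ok, _, err := w.changed(context.Background(), "flow-1")
		if err == nil && ok {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(demoChanges())
}
```

A caller would typically fetch the heavier Snapshot or History only when changed returns true.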
func (*Engine) Fork ¶ added in v0.8.0
func (e *Engine) Fork(ctx context.Context, stepKey string, stateOverrides any) (string, error)
Fork clones a terminal flow's prefix up to the given step into a new, self-contained running flow and re-executes from that step with optional stateOverrides applied to it. The original flow is never modified. The fork inherits the original's scheduling and baggage (it does not take FlowOptions). Returns the new flow's key.
func (*Engine) HistoryMermaid ¶
func (e *Engine) HistoryMermaid(ctx context.Context, flowKey string, w io.StringWriter) error
HistoryMermaid writes the execution DAG of a flow as a Mermaid diagram.
func (*Engine) List ¶
func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)
List queries flows by status, workflow name, or thread key.
func (*Engine) Purge ¶
func (e *Engine) Purge(ctx context.Context, query workflow.Query) (int, error)
Purge deletes flows matching a query, their subflows, and their step history. No more than 1000 flows are deleted at a time. Iterate to delete more.
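Since a single call removes a bounded batch, deleting everything that matches a query means calling Purge in a loop. The sketch below assumes the returned int is the number of flows deleted by that call and stops when it reaches zero; purger, query, and fakeStore are hypothetical stand-ins (the fields of workflow.Query are not shown on this page).

```go
package main

import (
	"context"
	"fmt"
)

// query is a stand-in for workflow.Query; its single field is illustrative.
type query struct{ Status string }

// purger is the slice of the Engine API this sketch relies on, with the
// stand-in query type substituted for workflow.Query.
type purger interface {
	Purge(ctx context.Context, q query) (int, error)
}

// purgeAll calls Purge until a call deletes nothing, returning the total.
// It stops early if the context is cancelled or Purge fails.
func purgeAll(ctx context.Context, p purger, q query) (int, error) {
	total := 0
	for {
		if err := ctx.Err(); err != nil {
			return total, err
		}
		n, err := p.Purge(ctx, q)
		total += n
		if err != nil {
			return total, err
		}
		if n == 0 {
			return total, nil
		}
	}
}

// fakeStore deletes at most 1000 matching flows per call, like the text above.
type fakeStore struct {
	remaining int
	calls     int
}

func (f *fakeStore) Purge(ctx context.Context, q query) (int, error) {
	f.calls++
	n := f.remaining
	if n > 1000 {
		n = 1000
	}
	f.remaining -= n
	return n, nil
}

// demoPurge purges 2500 matching flows and reports the total and call count.
func demoPurge() (total, calls int) {
	store := &fakeStore{remaining: 2500}
	total, _ = purgeAll(context.Background(), store, query{Status: "completed"})
	return total, store.calls
}

func main() {
	total, calls := demoPurge()
	fmt.Println(total, calls)
}
```

Stopping on a zero count costs one extra call compared with stopping on a short batch, but it does not depend on the batch limit staying at 1000.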
func (*Engine) Run ¶
func (e *Engine) Run(ctx context.Context, workflowURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, outcome *workflow.FlowOutcome, err error)
Run creates, starts, and awaits a flow in one call, returning the new flow's key alongside its outcome (the key is the flow's identity, not part of the outcome). opts carries scheduling and the opaque host Baggage; nil opts uses defaults. On error, flowKey is "" and outcome is nil.
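The Create-then-Await composition and its error contract can be sketched over a narrow interface. This is an illustration under assumptions, not the engine's code: creatorAwaiter, outcome, and fakeEngine are hypothetical stand-ins, the opts parameter is omitted for brevity, and the "completed" status is a placeholder string.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// outcome is a stand-in for workflow.FlowOutcome with a single field.
type outcome struct{ Status string }

// creatorAwaiter mirrors the shape of Engine.Create and Engine.Await, minus
// the FlowOptions parameter.
type creatorAwaiter interface {
	Create(ctx context.Context, workflowURL string, initialState any) (string, error)
	Await(ctx context.Context, flowKey string) (*outcome, error)
}

// run composes Create and Await. On any error it returns an empty key and a
// nil outcome, matching the contract described above.
func run(ctx context.Context, eng creatorAwaiter, workflowURL string, initialState any) (string, *outcome, error) {
	flowKey, err := eng.Create(ctx, workflowURL, initialState)
	if err != nil {
		return "", nil, err
	}
	out, err := eng.Await(ctx, flowKey)
	if err != nil {
		return "", nil, err
	}
	return flowKey, out, nil
}

// fakeEngine knows a fixed set of workflows and completes them instantly.
type fakeEngine struct{ known map[string]bool }

func (f fakeEngine) Create(ctx context.Context, workflowURL string, initialState any) (string, error) {
	if !f.known[workflowURL] {
		return "", errors.New("unknown workflow: " + workflowURL)
	}
	return "flow-1", nil
}

func (f fakeEngine) Await(ctx context.Context, flowKey string) (*outcome, error) {
	return &outcome{Status: "completed"}, nil
}

// demoRun runs one workflow against the fake and flattens the result.
func demoRun(workflowURL string) (flowKey, status string, failed bool) {
	eng := fakeEngine{known: map[string]bool{"greet": true}}
	key, out, err := run(context.Background(), eng, workflowURL, map[string]any{"name": "ada"})
	if err != nil {
		return key, "", true
	}
	return key, out.Status, false
}

func main() {
	fmt.Println(demoRun("greet"))
	fmt.Println(demoRun("missing"))
}
```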
func (*Engine) RunInTest ¶
func (e *Engine) RunInTest(t *testing.T)
RunInTest initializes the engine for testing with per-test isolated databases (keyed by t.Name()) and registers cleanup via t.Cleanup.
Unless the caller wired its own logger, the engine logs to stderr at Info by default (rather than the production discard default) so a CI failure has engine-level clues - flow-status transitions show where a flow got stuck, and the Error logs surface wedge sweeps / poll / refill faults. stderr, not t.Log, because a `go test` timeout panic drops the failing test's buffered t.Log output but not stderr. Override the level with DWARF_TEST_LOG_LEVEL (e.g. "error" to quiet local runs, "debug" for the full play-by-play), or call SetLogger/SetDebugLogger before RunInTest to take over entirely.
func (*Engine) SetDSN ¶ added in v0.4.0
func (e *Engine) SetDSN(dsn string) error
SetDSN sets the SQL data source name. Use "%d" for sharded DSNs; an empty DSN in test mode uses SQLite in-memory. Construction-time only - the shards are opened against the DSN at Startup, so changing it on a running engine (which would require reopening live connections) is rejected.
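To make the "%d" placeholder concrete, the sketch below expands a DSN template into one DSN per shard. It is purely illustrative: shardDSNs is a hypothetical helper, the database name pattern dwarf_%d is made up, and the index the engine actually substitutes (for example zero-based or one-based) is not stated on this page, so the first index is left as a parameter.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// shardDSNs expands every "%d" in template into a shard index, starting at
// firstIndex. strings.ReplaceAll is used instead of fmt.Sprintf so that other
// percent sequences in the DSN (such as URL-encoded characters) are untouched.
func shardDSNs(template string, firstIndex, numShards int) []string {
	dsns := make([]string, 0, numShards)
	for i := 0; i < numShards; i++ {
		dsns = append(dsns, strings.ReplaceAll(template, "%d", strconv.Itoa(firstIndex+i)))
	}
	return dsns
}

// demoDSNs expands a hypothetical three-shard template from index 0.
func demoDSNs() []string {
	return shardDSNs("postgres://user:pass@host:5432/dwarf_%d", 0, 3)
}

func main() {
	for _, dsn := range demoDSNs() {
		fmt.Println(dsn)
	}
}
```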
func (*Engine) SetDebugLogger ¶ added in v0.4.0
func (e *Engine) SetDebugLogger() error
SetDebugLogger is a convenience that wires a human-readable text logger to stderr at debug level - shorthand for SetLogger(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))). It is meant for development and test runs where you want to see the engine's (and its sequel DB layer's) internal logging without standing up an OTEL pipeline. Output goes to stderr, not stdout, so it never mixes with a program's data stream - the standard convention for diagnostic logs. Because it routes through SetLogger, it counts as an explicitly-set logger, so it also reaches sequel via the engine's existing SetLogger wiring (sequel's migration logs appear too). Construction-time only.
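The effect of the handler level can be checked with the standard library alone. The sketch below builds the same kind of text logger described above but writes to a buffer instead of os.Stderr so the result can be inspected; newTextLogger and the log message are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log/slog"
)

// newTextLogger builds a human-readable text logger at the given level. With
// os.Stderr and slog.LevelDebug it matches the expression quoted above.
func newTextLogger(w io.Writer, level slog.Level) *slog.Logger {
	return slog.New(slog.NewTextHandler(w, &slog.HandlerOptions{Level: level}))
}

// debugRecordEmitted logs one debug record and reports whether the handler
// wrote anything for it.
func debugRecordEmitted(level slog.Level) bool {
	var buf bytes.Buffer
	newTextLogger(&buf, level).Debug("step claimed", "flow", "abc")
	return buf.Len() > 0
}

// debugLevelEmits and infoLevelEmits fix the level for easy comparison.
func debugLevelEmits() bool { return debugRecordEmitted(slog.LevelDebug) }
func infoLevelEmits() bool  { return debugRecordEmitted(slog.LevelInfo) }

func main() {
	fmt.Println(debugLevelEmits(), infoLevelEmits())
}
```

A logger built at Info level drops debug records, which is why a debug-level handler is needed to see the full play-by-play.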
func (*Engine) SetDefaultPriority ¶ added in v0.4.0
func (e *Engine) SetDefaultPriority(p int) error
SetDefaultPriority sets the default priority for new flows. Live: read fresh on each Create, so it takes effect on a running engine immediately.
func (*Engine) SetHost ¶ added in v0.4.0
func (e *Engine) SetHost(h Host) error
SetHost registers the host the engine reaches the outside world through: it loads graphs, executes tasks, and (optionally) receives flow-stop notifications and carries cross-replica coordination signals. A host must implement LoadGraph and ExecuteTask; the remaining Host methods may be no-ops. Construction-time only.
func (*Engine) SetInTest ¶ added in v0.8.1
func (e *Engine) SetInTest(name string) error
SetInTest puts the engine into test mode keyed by name, so a subsequent Startup opens per-name isolated, auto-dropped databases (via sequel.CreateTestingDatabase) instead of the configured ones. Construction-time only. It is the *testing.T-free counterpart to RunInTest, for a host running under an external test harness that has no *testing.T but a stable isolation key such as its plane: every engine sharing the same name resolves to the same isolated databases, so the replicas of a multi-replica test app converge on one set. RunInTest(t) is SetInTest(t.Name()) plus Startup and a t.Cleanup shutdown.
func (*Engine) SetLogger ¶ added in v0.4.0
func (e *Engine) SetLogger(l *slog.Logger) error
SetLogger sets the structured logger. The engine logs through the *Context variants (DebugContext/InfoContext/WarnContext/ErrorContext) so a handler that reads the context - e.g. the otelslog bridge - can correlate each record with the active step span. A host routes logs to OTEL by passing a logger whose handler bridges there. Defaults to a discard logger: until a logger is injected the engine (and its sequel DB layer) stay silent rather than writing to the application-owned slog.Default(). A nil logger resets to that silent default. Construction-time only - the engine wires the logger into the worker hot path and the shard DBs at Startup, and reads it lock-free thereafter.
func (*Engine) SetMaxOpenConns ¶ added in v0.4.0
func (e *Engine) SetMaxOpenConns(n int) error
SetMaxOpenConns sets the per-shard hard ceiling on open SQL connections.
func (*Engine) SetMeterProvider ¶ added in v0.4.0
func (e *Engine) SetMeterProvider(mp metric.MeterProvider) error
SetMeterProvider sets the OpenTelemetry MeterProvider the engine builds its dwarf_* instruments from. Defaults to the global otel.GetMeterProvider() (the no-op provider unless the host configures the OTEL SDK). The engine creates instruments under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host service's identity. Construction-time only - the engine resolves the meter once at Startup.
func (*Engine) SetNumShards ¶ added in v0.4.0
func (e *Engine) SetNumShards(num int) error
SetNumShards sets the number of database shards and, on a running engine, brings any added shards online (open + migrate) in one call, returning any error from opening/migrating them. New flows spread onto the added shards immediately; existing flows stay on their original shard.
The count may only grow at runtime: a value at or below the current live shard count records the new target but removes nothing (old shards drain naturally; an actual reduction takes effect only on a restart, where Startup opens just numShards shards). Concurrency-safe - the open+migrate work is serialized internally and runs off the hot path. Before Startup it simply records the target (no shards are open yet). It takes no ctx because the underlying open+migrate path is not ctx-cancellable.
func (*Engine) SetTimeBudget ¶ added in v0.4.0
func (e *Engine) SetTimeBudget(d time.Duration) error
SetTimeBudget sets the default duration for a single task execution, used by any flow that does not override it via FlowOptions.TimeBudget. Live: read fresh on each Create (existing flows keep the budget frozen at their own Create).
func (*Engine) SetTracerProvider ¶ added in v0.4.0
func (e *Engine) SetTracerProvider(tp trace.TracerProvider) error
SetTracerProvider sets the OpenTelemetry TracerProvider the engine builds its spans from. Defaults to the global otel.GetTracerProvider() (the no-op provider unless the host configures the OTEL SDK). The engine mints the root "workflow" span at Create (persisted to the dwarf-owned trace_parent column) and a per-step span in processStep, parented to the reconstructed root and placed on the TaskExecutor's context so the task's downstream spans nest under it. The host injects only the provider - no span code, no trace_parent handling. Spans are created under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host's identity. Construction-time only - the engine resolves the tracer once at Startup.
func (*Engine) SetWorkers ¶ added in v0.4.0
func (e *Engine) SetWorkers(n int) error
SetWorkers sets the number of worker goroutines. Construction-time only: the pool is spawned at Startup at this count, and live resizing (spawning/retiring workers and resizing the candidate cache) is not supported, so a call on a running engine is rejected.
func (*Engine) SetWorkersPerConn ¶ added in v0.8.0
func (e *Engine) SetWorkersPerConn(n int) error
SetWorkersPerConn sets the assumed number of worker goroutines that share one database connection. The pool size is derived FROM the number of workers, not the other way around.
func (*Engine) ShardInfo ¶
func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)
ShardInfo returns health and size summaries for all shards.
type Host ¶ added in v0.4.0
type Host interface {
	// LoadGraph fetches a workflow graph definition by its URL (the addressable resolve key passed to
	// Create). The flow's opaque baggage rides on ctx; read it with workflow.BaggageFrom(ctx) if loading
	// is identity-dependent (authz, per-actor graphs).
	LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)

	// ExecuteTask executes a single task within a workflow. taskURL is the task's dispatch URL (the real
	// downstream address), not the graph node name. The flow carrier has its state pre-populated; the
	// executor should call the task and let it write changes to the flow. The flow's opaque baggage rides
	// on ctx - read it with workflow.BaggageFrom(ctx) (e.g. to mint a token).
	ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error

	// FlowStopped is fired when a flow stops (completed, failed, cancelled, interrupted), but only for a
	// flow created with FlowOptions.NotifyOnStop=true. flowKey identifies the stopped flow (it is not part
	// of the outcome). The flow's opaque baggage rides on ctx (read it with workflow.BaggageFrom(ctx)); the
	// host decides where/how to deliver the notification from it - the engine traffics in no delivery
	// address. Optional: a host with no notification need does nothing.
	FlowStopped(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome)

	// SignalPeers delivers a cross-replica coordination signal to the other replicas. op is an opaque
	// routing key (usable as a topic); payload is opaque bytes the engine already serialized. The host
	// ships (op, payload) to peers and on the receiving side calls Engine.DeliverSignal(ctx, op,
	// payload). See the cross-replica signal contract above. A single-replica host does nothing here.
	SignalPeers(ctx context.Context, op string, payload []byte)
}
Host is the contract between the dwarf engine and the surrounding host application. The engine owns no transport of its own; it reaches workflow graphs and tasks, reports flow stops, and carries cross-replica coordination signals exclusively through the host. Register it once via Engine.SetHost.
A host MUST implement LoadGraph and ExecuteTask. The remaining two methods are optional: an implementation may do nothing in them when it has no flow-stop notification need (FlowStopped) or runs single-replica with no cross-replica coordination (SignalPeers).
Cross-replica signal contract (SignalPeers): the engine funnels all of its coordination signals (work doorbells, flow-stop wakes) through this one method. op is a routing key the host may use as a topic/subject; payload is opaque bytes the engine already serialized. The host delivers (op, payload) to OTHER replicas, EXCLUDING the calling replica, and on the receiving side hands them back via Engine.DeliverSignal(ctx, op, payload) - it is a pure pipe that never inspects either. The engine always applies a signal's effect locally before calling SignalPeers, so an implementation that echoes the signal back to the sender would cause it to be processed twice on the originating replica (a doubled enqueue or status-change wake); if the transport delivers published messages to the publisher, the implementation must filter out self-delivery. Because the host never branches on op or inspects payload, adding a new engine signal kind requires no host change.
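The self-exclusion rule can be sketched with an in-memory bus standing in for a real transport. Everything here is hypothetical scaffolding (bus, replicaHost, countingEngine, and the op name "example.op"); only the SignalPeers and DeliverSignal method shapes follow the signatures documented on this page. Delivery is synchronous to keep the sketch short.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// receiver is the slice of the Engine API a transport needs on the receiving side.
type receiver interface {
	DeliverSignal(ctx context.Context, op string, payload []byte) error
}

// bus is a toy in-process transport keyed by replica ID.
type bus struct {
	mu       sync.Mutex
	replicas map[string]receiver
}

// join registers a replica's engine under its ID.
func (b *bus) join(id string, r receiver) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.replicas[id] = r
}

// publish hands (op, payload) to every replica except the sender. It never
// inspects op or payload; it is a pure pipe.
func (b *bus) publish(ctx context.Context, senderID, op string, payload []byte) {
	b.mu.Lock()
	peers := make([]receiver, 0, len(b.replicas))
	for id, r := range b.replicas {
		if id != senderID { // filter out self-delivery
			peers = append(peers, r)
		}
	}
	b.mu.Unlock()
	for _, r := range peers {
		_ = r.DeliverSignal(ctx, op, payload) // a real host would likely log this error
	}
}

// replicaHost shows only the SignalPeers part of a Host implementation.
type replicaHost struct {
	id  string
	bus *bus
}

func (h replicaHost) SignalPeers(ctx context.Context, op string, payload []byte) {
	h.bus.publish(ctx, h.id, op, payload)
}

// countingEngine is a fake engine that counts delivered signals.
type countingEngine struct{ delivered int }

func (c *countingEngine) DeliverSignal(ctx context.Context, op string, payload []byte) error {
	c.delivered++
	return nil
}

// demoFanout sends one signal from replica "a" and returns per-replica counts.
func demoFanout() (sender, peerB, peerC int) {
	b := &bus{replicas: map[string]receiver{}}
	ea, eb, ec := &countingEngine{}, &countingEngine{}, &countingEngine{}
	b.join("a", ea)
	b.join("b", eb)
	b.join("c", ec)
	host := replicaHost{id: "a", bus: b}
	host.SignalPeers(context.Background(), "example.op", []byte(`{}`))
	return ea.delivered, eb.delivered, ec.delivered
}

func main() {
	fmt.Println(demoFanout())
}
```

With a broker that echoes published messages back to the publisher, the same filter would typically move to the receiving side, for example by tagging each message with the sender's replica ID and dropping messages that carry the local ID.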
type ShardSummary ¶
type ShardSummary struct {
	Shard     int    `json:"shard,omitzero"`
	Error     string `json:"error,omitzero"`
	LatencyMs int    `json:"latencyMs,omitzero"`
	Steps     int    `json:"steps,omitzero"`
	Flows     int    `json:"flows,omitzero"`
}
ShardSummary is the health/size summary of a single database shard.
type TaskHandler ¶
TaskHandler is the signature for a test task handler. Read the flow's baggage, if any, with workflow.BaggageFrom(ctx).
type TestProxy ¶
type TestProxy struct {
	// contains filtered or unexported fields
}
TestProxy routes graph fetches and task dispatches to registered handlers. It implements the Host interface for use with Engine.SetHost / Engine.RunInTest: LoadGraph and ExecuteTask dispatch to the registered handlers, FlowStopped invokes an optional callback set via OnFlowStopped, and SignalPeers relays to the peer engines registered with AddPeer (none by default, i.e. single-replica). For a multi-replica test, give each replica its own proxy and AddPeer the other engines.
func NewTestProxy ¶
func NewTestProxy() *TestProxy
NewTestProxy creates a new test proxy with empty handler registries.
func (*TestProxy) AddPeer ¶ added in v0.4.0
func (p *TestProxy) AddPeer(peer *Engine)
AddPeer registers a peer engine that SignalPeers relays to, standing in for the bus in a single-process multi-replica test. Add every OTHER replica's engine (not this proxy's own), so the engine's self-exclusion contract holds. Call before Startup/RunInTest.
func (*TestProxy) ExecuteTask ¶
func (p *TestProxy) ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error
ExecuteTask implements Host.
func (*TestProxy) FlowStopped ¶ added in v0.4.0
func (p *TestProxy) FlowStopped(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome)
FlowStopped implements Host; it invokes the callback set via OnFlowStopped, if any.
func (*TestProxy) HandleGraph ¶
func (p *TestProxy) HandleGraph(name string, graph *workflow.Graph)
HandleGraph registers a workflow graph under the given name. The name should match the workflow URL passed to Engine.Create or Engine.Run.
func (*TestProxy) HandleTask ¶
func (p *TestProxy) HandleTask(name string, handler TaskHandler)
HandleTask registers a task handler under the given name. The name should match the task URL registered via graph.SetEndpoint.
func (*TestProxy) OnFlowStopped ¶ added in v0.4.0
func (p *TestProxy) OnFlowStopped(cb func(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome))
OnFlowStopped registers a callback invoked by FlowStopped (only for flows created with FlowOptions.NotifyOnStop). Nil (the default) makes FlowStopped a no-op. The flow's baggage is on the ctx.
func (*TestProxy) SignalPeers ¶ added in v0.4.0
func (p *TestProxy) SignalPeers(ctx context.Context, op string, payload []byte)
SignalPeers implements Host; it relays the signal to every peer engine registered with AddPeer. With no peers (the default) it is a single-replica no-op. The relay is async to mirror bus semantics and avoid synchronous reentrancy into a peer mid-processStep.