Documentation
Overview ¶
Package engine is the dwarf workflow-orchestration engine.
The engine executes workflow graphs against a SQL database, scheduling and dispatching one task at a time per step, persisting state between steps, and driving fan-out/fan-in, retries, sleeps, subgraphs, and interrupts. It owns no transport of its own; a host wires it to the outside world through injected dependency interfaces and calls its operations.
Lifecycle ¶
Build an engine with NewEngine and the Set* methods, register a Host (see SetHost), then Startup (opens the database, runs migrations, starts workers). Shutdown drains the workers and closes the database. In tests, RunInTest replaces Startup/Shutdown with per-test SQLite databases and t.Cleanup.
eng := engine.NewEngine()
if err := eng.SetDSN("postgres://user:pass@host:5432/dwarf"); err != nil {
	return err
}
if err := eng.SetHost(host); err != nil {
	return err
}
if err := eng.Startup(ctx); err != nil {
	return err
}
defer eng.Shutdown(ctx)
Each Set* method returns an error. The live ones (SetNumShards, SetMaxOpenConns, SetTimeBudget, SetDefaultPriority) take effect immediately on a running engine; the construction-time-only ones (SetDSN, SetWorkers, SetHost, SetLogger, SetDebugLogger, SetMeterProvider, SetTracerProvider) return an error if called after Startup.
Host ¶
The engine reaches the outside world through a single injected Host interface (see SetHost):
- LoadGraph fetches a workflow graph by its workflow URL. It is called at Create (the graph JSON is then frozen on the flow) and again when a subgraph is spawned.
- ExecuteTask executes one task, given the Flow carrier with its state pre-populated.
- FlowStopped is fired when a flow stops, for flows created with FlowOptions.NotifyOnStop; the flow's baggage is on the ctx so the host resolves delivery itself. A host with no notification need does nothing.
- SignalPeers ships a cross-replica coordination signal (op + opaque payload bytes) to the other replicas, which hand it back via Engine.DeliverSignal; a single-replica host does nothing.
The flow's opaque baggage (host identity/tenant/context, set in workflow.FlowOptions) rides on the dispatch context of every LoadGraph and ExecuteTask call; read it with workflow.BaggageFrom(ctx).
Operations ¶
Create or CreateTask makes a flow; Start runs it; Await blocks until it stops; Run is Create+Start+Await in one call. Snapshot/History/Step/List inspect flows; Resume/ResumeBreak continue a paused flow; Cancel/Restart/RestartFrom/Continue manage its lifecycle; Delete/Purge handle retention. See the repository's docs/ directory for guides.
Example ¶
Wire an engine to a host, then create, start, and await a flow.
package main

import (
	"context"
	"fmt"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

// exampleHost implements engine.Host. A real host loads graphs from a registry/file/database/RPC and
// dispatches tasks over a local call, RPC, or message bus; here an in-memory registry and a local
// function stand in. LoadGraph and ExecuteTask are required; the remaining Host methods (flow-stop
// notification and the cross-replica signals) are left as no-ops.
type exampleHost struct {
	graphs map[string]*workflow.Graph
}

func (h exampleHost) LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error) {
	g, ok := h.graphs[workflowURL]
	if !ok {
		return nil, fmt.Errorf("unknown workflow %q", workflowURL)
	}
	return g, nil
}

func (h exampleHost) ExecuteTask(ctx context.Context, taskURL string, f *workflow.Flow) error {
	f.SetString("greeting", "hello "+f.GetString("name"))
	return nil
}

func (exampleHost) FlowStopped(context.Context, string, *workflow.FlowOutcome) {}

func (exampleHost) SignalPeers(context.Context, string, []byte) {}

func main() {
	ctx := context.Background()

	graphs := map[string]*workflow.Graph{}
	g := workflow.NewGraph("Greet")
	g.SetEndpoint("Hello", "Hello")
	g.AddTransition("Hello", workflow.END)
	graphs["greet"] = g

	eng := engine.NewEngine()
	if err := eng.SetDSN("postgres://user:pass@localhost:5432/dwarf"); err != nil {
		panic(err)
	}
	if err := eng.SetHost(exampleHost{graphs: graphs}); err != nil {
		panic(err)
	}
	if err := eng.Startup(ctx); err != nil {
		panic(err)
	}
	defer eng.Shutdown(ctx)

	// Run is Create + Start + Await in one call.
	_, out, err := eng.Run(ctx, "greet", map[string]any{"name": "ada"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.State["greeting"])
}
Index ¶
- type Engine
- func (e *Engine) Await(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)
- func (e *Engine) BreakBefore(ctx context.Context, flowKey string, taskName string, enabled bool) error
- func (e *Engine) Cancel(ctx context.Context, flowKey string, reason string) error
- func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any, ...) (string, error)
- func (e *Engine) Create(ctx context.Context, workflowURL string, initialState any, ...) (flowKey string, err error)
- func (e *Engine) CreateTask(ctx context.Context, name, taskURL string, initialState any, ...) (flowKey string, err error)
- func (e *Engine) Delete(ctx context.Context, flowKey string) error
- func (e *Engine) DeliverSignal(ctx context.Context, op string, payload []byte) error
- func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
- func (e *Engine) History(ctx context.Context, flowKey string) ([]workflow.FlowStep, error)
- func (e *Engine) HistoryMermaid(ctx context.Context, flowKey string, w io.StringWriter) error
- func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)
- func (e *Engine) Purge(ctx context.Context, query workflow.Query) (int, error)
- func (e *Engine) Restart(ctx context.Context, flowKey string, stateOverrides any) error
- func (e *Engine) RestartFrom(ctx context.Context, stepKey string, stateOverrides any) error
- func (e *Engine) Resume(ctx context.Context, flowKey string, resumeData any) error
- func (e *Engine) ResumeBreak(ctx context.Context, flowKey string, stateOverrides any) error
- func (e *Engine) Run(ctx context.Context, workflowURL string, initialState any, ...) (flowKey string, outcome *workflow.FlowOutcome, err error)
- func (e *Engine) RunInTest(t *testing.T)
- func (e *Engine) SetDSN(dsn string) error
- func (e *Engine) SetDebugLogger() error
- func (e *Engine) SetDefaultPriority(p int) error
- func (e *Engine) SetHost(h Host) error
- func (e *Engine) SetLogger(l *slog.Logger) error
- func (e *Engine) SetMaxOpenConns(n int) error
- func (e *Engine) SetMeterProvider(mp metric.MeterProvider) error
- func (e *Engine) SetNumShards(num int) error
- func (e *Engine) SetTimeBudget(d time.Duration) error
- func (e *Engine) SetTracerProvider(tp trace.TracerProvider) error
- func (e *Engine) SetWorkers(n int) error
- func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)
- func (e *Engine) Shutdown(ctx context.Context) error
- func (e *Engine) Snapshot(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)
- func (e *Engine) Start(ctx context.Context, flowKey string) error
- func (e *Engine) Startup(ctx context.Context) error
- func (e *Engine) StartupInTest(ctx context.Context, testID string) error
- func (e *Engine) Step(ctx context.Context, stepKey string) (*workflow.FlowStep, error)
- type Host
- type ShardSummary
- type TaskHandler
- type TestProxy
- func (p *TestProxy) AddPeer(peer *Engine)
- func (p *TestProxy) ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error
- func (p *TestProxy) FlowStopped(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome)
- func (p *TestProxy) HandleGraph(name string, graph *workflow.Graph)
- func (p *TestProxy) HandleTask(name string, handler TaskHandler)
- func (p *TestProxy) LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)
- func (p *TestProxy) OnFlowStopped(cb func(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome))
- func (p *TestProxy) SignalPeers(ctx context.Context, op string, payload []byte)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Engine ¶
type Engine struct {
	// contains filtered or unexported fields
}
Engine is the standalone workflow orchestration engine.
func (*Engine) BreakBefore ¶
func (e *Engine) BreakBefore(ctx context.Context, flowKey string, taskName string, enabled bool) error
BreakBefore sets or clears a breakpoint before a named task.
func (*Engine) Continue ¶
func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any, opts *workflow.FlowOptions) (string, error)
Continue creates a new flow from the latest completed flow in a thread.
func (*Engine) Create ¶
func (e *Engine) Create(ctx context.Context, workflowURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)
Create creates a new flow for a workflow without starting it. opts carries the flow's scheduling (priority/fairness/start-at) and its opaque host Baggage; nil opts uses defaults.
Example ¶
Create separates flow creation from starting it, and accepts FlowOptions for scheduling and the opaque host baggage carried with the flow.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	var eng *engine.Engine // obtained from NewEngine().…Startup(ctx)
	ctx := context.Background()

	flowKey, err := eng.Create(ctx, "greet", map[string]any{"name": "ada"},
		&workflow.FlowOptions{
			Priority:    10,                             // lower runs first
			FairnessKey: "tenant-42",                    // fair scheduling bucket
			StartAt:     time.Now().Add(1 * time.Hour),  // delayed start
			Baggage:     map[string]any{"actor": "ada"}, // read with workflow.BaggageFrom(ctx)
		})
	if err != nil {
		panic(err)
	}

	// The flow sits in "created" until Start; Await blocks until it stops.
	if err := eng.Start(ctx, flowKey); err != nil {
		panic(err)
	}
	out, err := eng.Await(ctx, flowKey)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Status)
}
func (*Engine) CreateTask ¶
func (e *Engine) CreateTask(ctx context.Context, name, taskURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)
CreateTask creates a flow for a single task without starting it. name is the node's display name, shown in diagrams and history; it is required and must be non-empty. It comes before taskURL to follow the name-first convention of NewGraph(name), graph.SetEndpoint(name, url), and flow.Subtask(name, url). taskURL is the task's dispatch URL. opts carries scheduling and the opaque host Baggage (see Create); nil opts uses defaults.
func (*Engine) DeliverSignal ¶ added in v0.4.0
func (e *Engine) DeliverSignal(ctx context.Context, op string, payload []byte) error
DeliverSignal processes an inbound peer signal. The host calls it with the op routing key and the payload bytes it received from a peer (the JSON encoding of what the engine handed that peer's SignalPeers). It delegates by op to the matching internal handler. op and payload are opaque to the host; only the engine interprets them.
func (*Engine) Fingerprint ¶
func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
Fingerprint returns a fingerprint and status for change detection.
func (*Engine) HistoryMermaid ¶
func (e *Engine) HistoryMermaid(ctx context.Context, flowKey string, w io.StringWriter) error
HistoryMermaid writes the execution DAG of a flow as a Mermaid diagram.
func (*Engine) List ¶
func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)
List queries flows by status, workflow name, or thread key.
func (*Engine) RestartFrom ¶
func (e *Engine) RestartFrom(ctx context.Context, stepKey string, stateOverrides any) error
RestartFrom re-executes a flow from a specific step.
func (*Engine) ResumeBreak ¶
func (e *Engine) ResumeBreak(ctx context.Context, flowKey string, stateOverrides any) error
ResumeBreak continues a flow paused at a BreakBefore breakpoint.
func (*Engine) Run ¶
func (e *Engine) Run(ctx context.Context, workflowURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, outcome *workflow.FlowOutcome, err error)
Run creates, starts, and awaits a flow in one call, returning the new flow's key alongside its outcome (the key is the flow's identity, not part of the outcome). opts carries scheduling and the opaque host Baggage; nil opts uses defaults. On error, flowKey is "" and outcome is nil.
func (*Engine) RunInTest ¶
func (e *Engine) RunInTest(t *testing.T)
RunInTest initializes the engine for testing with per-test SQLite databases and registers cleanup via t.Cleanup.
func (*Engine) SetDSN ¶ added in v0.4.0
func (e *Engine) SetDSN(dsn string) error
SetDSN sets the SQL data source name. Use "%d" for sharded DSNs; an empty DSN in test mode uses SQLite in-memory. Construction-time only - the shards are opened against the DSN at Startup, so changing it on a running engine (which would require reopening live connections) is rejected.
func (*Engine) SetDebugLogger ¶ added in v0.4.0
func (e *Engine) SetDebugLogger() error
SetDebugLogger is a convenience that wires a human-readable text logger to stderr at debug level - shorthand for SetLogger(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))). It is meant for development and test runs where you want to see the engine's (and its sequel DB layer's) internal logging without standing up an OTEL pipeline. Output goes to stderr, not stdout, so it never mixes with a program's data stream - the standard convention for diagnostic logs. Because it routes through SetLogger, it counts as an explicitly-set logger, so it also reaches sequel via the engine's existing SetLogger wiring (sequel's migration logs appear too). Construction-time only.
func (*Engine) SetDefaultPriority ¶ added in v0.4.0
func (e *Engine) SetDefaultPriority(p int) error
SetDefaultPriority sets the default priority for new flows. Live: read fresh on each Create, so it takes effect on a running engine immediately.
func (*Engine) SetHost ¶ added in v0.4.0
func (e *Engine) SetHost(h Host) error
SetHost registers the host the engine reaches the outside world through: it loads graphs, executes tasks, and (optionally) receives flow-stop notifications and carries cross-replica coordination signals. A host must implement LoadGraph and ExecuteTask; the remaining Host methods may be no-ops. Construction-time only.
func (*Engine) SetLogger ¶ added in v0.4.0
func (e *Engine) SetLogger(l *slog.Logger) error
SetLogger sets the structured logger. The engine logs through the *Context variants (DebugContext/InfoContext/WarnContext/ErrorContext) so a handler that reads the context - e.g. the otelslog bridge - can correlate each record with the active step span. A host routes logs to OTEL by passing a logger whose handler bridges there. Defaults to a discard logger: until a logger is injected the engine (and its sequel DB layer) stay silent rather than writing to the application-owned slog.Default(). A nil logger resets to that silent default. Construction-time only - the engine wires the logger into the worker hot path and the shard DBs at Startup, and reads it lock-free thereafter.
func (*Engine) SetMaxOpenConns ¶ added in v0.4.0
func (e *Engine) SetMaxOpenConns(n int) error
SetMaxOpenConns sets the maximum number of open SQL connections per shard and re-applies it to every live shard pool, so it takes effect on a running engine immediately. (sequel's pool setters are hot/atomic.) Before Startup it records the value, applied as each shard opens.
func (*Engine) SetMeterProvider ¶ added in v0.4.0
func (e *Engine) SetMeterProvider(mp metric.MeterProvider) error
SetMeterProvider sets the OpenTelemetry MeterProvider the engine builds its dwarf_* instruments from. Defaults to the global otel.GetMeterProvider() (the no-op provider unless the host configures the OTEL SDK). The engine creates instruments under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host service's identity. Construction-time only - the engine resolves the meter once at Startup.
func (*Engine) SetNumShards ¶ added in v0.4.0
func (e *Engine) SetNumShards(num int) error
SetNumShards sets the number of database shards and, on a running engine, brings any added shards online (open + migrate) in one call, returning any error from opening/migrating them. New flows spread onto the added shards immediately; existing flows stay on their original shard.
The count may only grow at runtime: a value at or below the current live shard count records the new target but removes nothing (old shards drain naturally; an actual reduction takes effect only on a restart, where Startup opens just numShards shards). Concurrency-safe - the open+migrate work is serialized internally and runs off the hot path. Before Startup it simply records the target (no shards are open yet). It takes no ctx because the underlying open+migrate path is not ctx-cancellable.
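The grow-only rule can be modeled in a few lines. This is a hypothetical sketch, not the engine's placement logic: the router type is invented, and round-robin placement of new flows is an assumption. It shows the three documented properties: growing brings new shards into use for new flows, existing flows stay pinned to their original shard, and a lower target is recorded without removing anything.

```go
package main

import (
	"fmt"
	"sync"
)

// router is a hypothetical model of grow-only sharding.
type router struct {
	mu     sync.Mutex
	live   int            // shards currently open
	target int            // last requested count (may be below live)
	next   int            // round-robin cursor for new flows (assumed strategy)
	pinned map[string]int // flowKey -> shard, fixed at creation
}

func newRouter(n int) *router {
	return &router{live: n, target: n, pinned: map[string]int{}}
}

// setNumShards records the target and only ever adds shards while running.
func (r *router) setNumShards(n int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.target = n
	if n > r.live {
		r.live = n // the real engine would open and migrate the new shards here
	}
}

// create assigns a new flow to a shard and pins it there.
func (r *router) create(flowKey string) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	s := r.next % r.live
	r.next++
	r.pinned[flowKey] = s
	return s
}

func (r *router) shardOf(flowKey string) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.pinned[flowKey]
}

func main() {
	r := newRouter(2)
	fmt.Println(r.create("a"), r.create("b"))
	r.setNumShards(4) // grow: new flows can land on the added shards
	fmt.Println(r.create("c"), r.create("d"))
	r.setNumShards(1) // shrink is recorded but removes nothing
	fmt.Println(r.live, r.target, r.shardOf("a"), r.shardOf("d"))
}
```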
func (*Engine) SetTimeBudget ¶ added in v0.4.0
func (e *Engine) SetTimeBudget(d time.Duration) error
SetTimeBudget sets the maximum duration for a single task execution. Live: read fresh on each dispatch, so it takes effect on a running engine immediately.
func (*Engine) SetTracerProvider ¶ added in v0.4.0
func (e *Engine) SetTracerProvider(tp trace.TracerProvider) error
SetTracerProvider sets the OpenTelemetry TracerProvider the engine builds its spans from. Defaults to the global otel.GetTracerProvider() (the no-op provider unless the host configures the OTEL SDK). The engine mints the root "workflow" span at Create (persisted to the dwarf-owned trace_parent column) and a per-step span in processStep, parented to the reconstructed root and placed on the context passed to Host.ExecuteTask so the task's downstream spans nest under it. The host injects only the provider; it writes no span code and does no trace_parent handling. Spans are created under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host's identity. Construction-time only - the engine resolves the tracer once at Startup.
func (*Engine) SetWorkers ¶ added in v0.4.0
func (e *Engine) SetWorkers(n int) error
SetWorkers sets the number of worker goroutines. Construction-time only: the pool is spawned at Startup at this count, and live resizing (spawning/retiring workers and resizing the candidate cache) is not supported, so a call on a running engine is rejected.
func (*Engine) ShardInfo ¶
func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)
ShardInfo returns health and size summaries for all shards.
func (*Engine) Start ¶
func (e *Engine) Start(ctx context.Context, flowKey string) error
Start transitions a created flow to running. Whether the flow notifies the host on stop is set at Create via FlowOptions.NotifyOnStop.
func (*Engine) Startup ¶
func (e *Engine) Startup(ctx context.Context) error
Startup initializes the engine: opens database connections, runs migrations, and starts worker goroutines.
func (*Engine) StartupInTest ¶ added in v0.4.0
func (e *Engine) StartupInTest(ctx context.Context, testID string) error
StartupInTest initializes the engine against isolated, throwaway test databases - one per shard, keyed by testID - instead of opening the configured DSN. It is for a host that is itself running under test (so it has no *testing.T to hand RunInTest) but must still convey "use a disposable, isolated database" down to sequel. The testID must be stable for the test run and shared by every replica that should see the same database (e.g. a per-test isolation key shared by all replicas in one test app); distinct test runs pass distinct ids for isolation. The engine never learns the host's notion of "test mode" - it only receives this concrete instruction. Unlike RunInTest there is no cleanup hook; the host drives teardown by calling Shutdown (e.g. from its own shutdown lifecycle).
type Host ¶ added in v0.4.0
type Host interface {
	// LoadGraph fetches a workflow graph definition by its URL (the addressable resolve key passed to
	// Create). The flow's opaque baggage rides on ctx; read it with workflow.BaggageFrom(ctx) if loading
	// is identity-dependent (authz, per-actor graphs).
	LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)

	// ExecuteTask executes a single task within a workflow. taskURL is the task's dispatch URL (the real
	// downstream address), not the graph node name. The flow carrier has its state pre-populated; the
	// executor should call the task and let it write changes to the flow. The flow's opaque baggage rides
	// on ctx - read it with workflow.BaggageFrom(ctx) (e.g. to mint a token).
	ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error

	// FlowStopped is fired when a flow stops (completed, failed, cancelled, interrupted), but only for a
	// flow created with FlowOptions.NotifyOnStop=true. flowKey identifies the stopped flow (it is not part
	// of the outcome). The flow's opaque baggage rides on ctx (read it with workflow.BaggageFrom(ctx)); the
	// host decides where/how to deliver the notification from it - the engine traffics in no delivery
	// address. Optional: a host with no notification need does nothing.
	FlowStopped(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome)

	// SignalPeers delivers a cross-replica coordination signal to the other replicas. op is an opaque
	// routing key (usable as a topic); payload is opaque bytes the engine already serialized. The host
	// ships (op, payload) to peers and on the receiving side calls Engine.DeliverSignal(ctx, op,
	// payload). See the cross-replica signal contract in the Host documentation. A single-replica host
	// does nothing here.
	SignalPeers(ctx context.Context, op string, payload []byte)
}
Host is the contract between the dwarf engine and the surrounding host application. The engine owns no transport of its own; it reaches workflow graphs and tasks, reports flow stops, and carries cross-replica coordination signals exclusively through the host. Register it once via Engine.SetHost.
A host MUST implement LoadGraph and ExecuteTask. The remaining two methods are optional: an implementation may do nothing in them when it has no flow-stop notification need (FlowStopped) or runs single-replica with no cross-replica coordination (SignalPeers).
Cross-replica signal contract (SignalPeers): the engine funnels all of its coordination signals (work doorbells, flow-stop wakes) through this one method. op is a routing key the host may use as a topic/subject; payload is opaque bytes the engine already serialized. The host delivers (op, payload) to OTHER replicas, EXCLUDING the calling replica, and on the receiving side hands them back via Engine.DeliverSignal(ctx, op, payload) - it is a pure pipe that never inspects either. The engine always applies a signal's effect locally before calling SignalPeers, so an implementation that echoes the signal back to the sender would cause it to be processed twice on the originating replica (a doubled enqueue or status-change wake); if the transport delivers published messages to the publisher, the implementation must filter out self-delivery. Because the host never branches on op or inspects payload, adding a new engine signal kind requires no host change.
type ShardSummary ¶
type ShardSummary struct {
	Shard     int    `json:"shard,omitzero"`
	Error     string `json:"error,omitzero"`
	LatencyMs int    `json:"latencyMs,omitzero"`
	Steps     int    `json:"steps,omitzero"`
	Flows     int    `json:"flows,omitzero"`
}
ShardSummary is the health/size summary of a single database shard.
type TaskHandler ¶
TaskHandler is the signature for a test task handler. Read the flow's baggage, if any, with workflow.BaggageFrom(ctx).
type TestProxy ¶
type TestProxy struct {
	// contains filtered or unexported fields
}
TestProxy routes graph fetches and task dispatches to registered handlers. It implements the Host interface for use with Engine.SetHost / Engine.RunInTest: LoadGraph and ExecuteTask dispatch to the registered handlers, FlowStopped invokes an optional callback set via OnFlowStopped, and SignalPeers relays to the peer engines registered with AddPeer (none by default, i.e. single-replica). For a multi-replica test, give each replica its own proxy and AddPeer the other engines.
func NewTestProxy ¶
func NewTestProxy() *TestProxy
NewTestProxy creates a new test proxy with empty handler registries.
func (*TestProxy) AddPeer ¶ added in v0.4.0
func (p *TestProxy) AddPeer(peer *Engine)
AddPeer registers a peer engine that SignalPeers relays to, standing in for the bus in a single-process multi-replica test. Add every OTHER replica's engine (not this proxy's own), so the engine's self-exclusion contract holds. Call before Startup/RunInTest.
func (*TestProxy) ExecuteTask ¶
func (p *TestProxy) ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error
ExecuteTask implements Host.
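func (*TestProxy) FlowStopped ¶ added in v0.4.0
func (p *TestProxy) FlowStopped(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome)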
FlowStopped implements Host; it invokes the callback set via OnFlowStopped, if any.
func (*TestProxy) HandleGraph ¶
func (p *TestProxy) HandleGraph(name string, graph *workflow.Graph)
HandleGraph registers a workflow graph under the given name. The name should match the workflow URL passed to Engine.Create or Engine.Run.
func (*TestProxy) HandleTask ¶
func (p *TestProxy) HandleTask(name string, handler TaskHandler)
HandleTask registers a task handler under the given name. The name should match the task URL registered via graph.SetEndpoint.
func (*TestProxy) OnFlowStopped ¶ added in v0.4.0
func (p *TestProxy) OnFlowStopped(cb func(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome))
OnFlowStopped registers a callback invoked by FlowStopped (only for flows created with FlowOptions.NotifyOnStop). Nil (the default) makes FlowStopped a no-op. The flow's baggage is on the ctx.
func (*TestProxy) SignalPeers ¶ added in v0.4.0
func (p *TestProxy) SignalPeers(ctx context.Context, op string, payload []byte)
SignalPeers implements Host; it relays the signal to every peer engine registered with AddPeer. With no peers (the default) it is a single-replica no-op. The relay is async to mirror bus semantics and avoid synchronous reentrancy into a peer mid-processStep.