Documentation ¶
Overview ¶
Package engine is the dwarf workflow-orchestration engine.
The engine executes workflow graphs against a SQL database, scheduling and dispatching one task at a time per step, persisting state between steps, and driving fan-out/fan-in, retries, sleeps, subgraphs, interrupts, backpressure, and circuit breakers. It owns no transport of its own; a host wires it to the outside world through injected dependency interfaces and calls its operations.
Lifecycle ¶
Build an engine with NewEngine and the With* builder methods, inject at least a GraphLoader and a TaskExecutor, then Startup (opens the database, runs migrations, starts workers). Shutdown drains the workers and closes the database. In tests, RunInTest replaces Startup/Shutdown with per-test SQLite databases and t.Cleanup.
eng := engine.NewEngine().
	WithDSN("postgres://user:pass@host:5432/dwarf").
	WithGraphLoader(loadGraph).
	WithTaskExecutor(runTask)
if err := eng.Startup(ctx); err != nil { ... }
defer eng.Shutdown(ctx)
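The With* configuration methods are atomic and may be called after Startup for hot reconfiguration, except where a method states otherwise: WithMeterProvider and WithTracerProvider must be set before Startup.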
Dependency interfaces ¶
The engine reaches the outside world through four injection points:
- GraphLoader fetches a workflow graph by name (called once at Create; the graph JSON is then frozen on the flow).
- TaskExecutor executes one task, given the Flow carrier with its state pre-populated.
- FlowStoppedCallback (optional) is fired when a flow stops, for flows started via StartNotify.
- PeerNotifier (optional) carries cross-replica coordination signals; nil in single-replica mode.
The flow's opaque baggage (host identity/tenant/context, set in workflow.FlowOptions) rides on the dispatch context of every GraphLoader and TaskExecutor call; read it with workflow.BaggageFrom(ctx).
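The mechanism can be pictured as an ordinary context value. The following is a minimal sketch of that idea, not the library's implementation: baggageKey, withBaggage, baggageFrom, and describeCaller are hypothetical names invented for illustration, and the real accessor is workflow.BaggageFrom(ctx), whose return type is not shown in this document.

```go
package main

import (
	"context"
	"fmt"
)

// baggageKey is a hypothetical private context key, not part of the dwarf API.
type baggageKey struct{}

// withBaggage returns a child context that carries the opaque host baggage.
func withBaggage(ctx context.Context, baggage any) context.Context {
	return context.WithValue(ctx, baggageKey{}, baggage)
}

// baggageFrom returns the baggage stored on ctx, or nil if none was set.
func baggageFrom(ctx context.Context) any {
	return ctx.Value(baggageKey{})
}

// describeCaller stands in for a task executor that reads the actor from the baggage.
func describeCaller(ctx context.Context, taskName string) string {
	bag, _ := baggageFrom(ctx).(map[string]any)
	actor, ok := bag["actor"].(string)
	if !ok {
		actor = "unknown"
	}
	return taskName + " dispatched for " + actor
}

// demo runs the executor once with baggage and once without.
func demo() (withBag, withoutBag string) {
	ctx := context.Background()
	withBag = describeCaller(withBaggage(ctx, map[string]any{"actor": "ada"}), "hello")
	withoutBag = describeCaller(ctx, "hello")
	return withBag, withoutBag
}

func main() {
	a, b := demo()
	fmt.Println(a)
	fmt.Println(b)
}
```

Because the baggage is opaque to the engine, the host decides its shape; the map used here is only one possibility.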
Operations ¶
Create or CreateTask makes a flow; Start (or StartNotify) runs it; Await blocks until it stops; Run is Create+Start+Await in one call. Snapshot/History/Step/List inspect flows; Resume/ResumeBreak continue a paused flow; Cancel/Restart/RestartFrom/Continue manage the lifecycle; Delete/Purge handle retention. See the repository's docs/ directory for guides.
Example ¶
Wire an engine to a graph source and a task runner, then create, start, and await a flow. A real host injects a GraphLoader that fetches graphs (from a registry, file, database, or RPC) and a TaskExecutor that dispatches the named task (local call, RPC, message bus); here a small in-memory registry and a local function stand in for both.
package main

import (
	"context"
	"fmt"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	ctx := context.Background()

	// A small in-memory registry stands in for a real graph source.
	graphs := map[string]*workflow.Graph{}
	g := workflow.NewGraph("greet")
	g.AddTask("hello", "hello")
	g.AddTransition("hello", workflow.END)
	graphs["greet"] = g

	eng := engine.NewEngine().
		WithDSN("postgres://user:pass@localhost:5432/dwarf").
		WithGraphLoader(func(ctx context.Context, name string) (*workflow.Graph, error) {
			// Report unknown names instead of returning a nil graph.
			g, ok := graphs[name]
			if !ok {
				return nil, fmt.Errorf("unknown graph %q", name)
			}
			return g, nil
		}).
		WithTaskExecutor(func(ctx context.Context, taskName string, f *workflow.Flow) error {
			// A local function stands in for dispatching the named task.
			f.SetString("greeting", "hello "+f.GetString("name"))
			return nil
		})
	if err := eng.Startup(ctx); err != nil {
		panic(err)
	}
	defer eng.Shutdown(ctx)

	// Run is Create + Start + Await in one call.
	out, err := eng.Run(ctx, "greet", map[string]any{"name": "ada"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.State["greeting"])
}
Index ¶
- type Engine
- func (e *Engine) Await(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)
- func (e *Engine) BreakBefore(ctx context.Context, flowKey string, taskName string, enabled bool) error
- func (e *Engine) BreakerTripped(taskName string) bool
- func (e *Engine) Cancel(ctx context.Context, flowKey string, reason string) error
- func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any, ...) (string, error)
- func (e *Engine) Create(ctx context.Context, workflowName string, initialState any, ...) (flowKey string, err error)
- func (e *Engine) CreateTask(ctx context.Context, taskName string, initialState any, ...) (flowKey string, err error)
- func (e *Engine) Delete(ctx context.Context, flowKey string) error
- func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
- func (e *Engine) HandleEnqueue(ctx context.Context, shard, stepID int) error
- func (e *Engine) HandleNotifyStatusChange(ctx context.Context, flowKey string, status string) error
- func (e *Engine) HandleSyncValve(ctx context.Context, taskName string, wCong int, tCong time.Time) error
- func (e *Engine) HandleTripBreaker(ctx context.Context, taskName string) error
- func (e *Engine) History(ctx context.Context, flowKey string) ([]workflow.FlowStep, error)
- func (e *Engine) HistoryMermaid(ctx context.Context, flowKey string, w io.StringWriter) error
- func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)
- func (e *Engine) Purge(ctx context.Context, query workflow.Query) (int, error)
- func (e *Engine) Restart(ctx context.Context, flowKey string, stateOverrides any) error
- func (e *Engine) RestartFrom(ctx context.Context, stepKey string, stateOverrides any) error
- func (e *Engine) Resume(ctx context.Context, flowKey string, resumeData any) error
- func (e *Engine) ResumeBreak(ctx context.Context, flowKey string, stateOverrides any) error
- func (e *Engine) Run(ctx context.Context, workflowName string, initialState any, ...) (*workflow.FlowOutcome, error)
- func (e *Engine) RunInTest(t *testing.T)
- func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)
- func (e *Engine) Shutdown(ctx context.Context) error
- func (e *Engine) Snapshot(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)
- func (e *Engine) Start(ctx context.Context, flowKey string) error
- func (e *Engine) StartNotify(ctx context.Context, flowKey string, notifyHostname string) error
- func (e *Engine) Startup(ctx context.Context) error
- func (e *Engine) Step(ctx context.Context, stepKey string) (*workflow.FlowStep, error)
- func (e *Engine) WithDSN(dsn string) *Engine
- func (e *Engine) WithDefaultPriority(p int) *Engine
- func (e *Engine) WithFlowStoppedCallback(cb FlowStoppedCallback) *Engine
- func (e *Engine) WithGraphLoader(gl GraphLoader) *Engine
- func (e *Engine) WithLogger(l *slog.Logger) *Engine
- func (e *Engine) WithMaxOpenConns(n int) *Engine
- func (e *Engine) WithMeterProvider(mp metric.MeterProvider) *Engine
- func (e *Engine) WithNumShards(n int) *Engine
- func (e *Engine) WithPeerNotifier(pn PeerNotifier) *Engine
- func (e *Engine) WithTaskExecutor(te TaskExecutor) *Engine
- func (e *Engine) WithTimeBudget(d time.Duration) *Engine
- func (e *Engine) WithTracerProvider(tp trace.TracerProvider) *Engine
- func (e *Engine) WithWorkers(n int) *Engine
- type FlowStoppedCallback
- type GraphLoader
- type PeerNotifier
- type ShardSummary
- type TaskExecutor
- type TaskHandler
- type TestProxy
- func (p *TestProxy) ExecuteTask(ctx context.Context, taskName string, flow *workflow.Flow) error
- func (p *TestProxy) HandleGraph(name string, graph *workflow.Graph)
- func (p *TestProxy) HandleTask(name string, handler TaskHandler)
- func (p *TestProxy) LoadGraph(ctx context.Context, workflowName string) (*workflow.Graph, error)
Types ¶
type Engine ¶
type Engine struct {
	// contains filtered or unexported fields
}
Engine is the standalone workflow orchestration engine.
func (*Engine) BreakBefore ¶
func (e *Engine) BreakBefore(ctx context.Context, flowKey string, taskName string, enabled bool) error
BreakBefore sets or clears a breakpoint before a named task.
func (*Engine) BreakerTripped ¶
BreakerTripped reports whether the breaker for taskName is currently tripped.
func (*Engine) Continue ¶
func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any, opts *workflow.FlowOptions) (string, error)
Continue creates a new flow from the latest completed flow in a thread.
func (*Engine) Create ¶
func (e *Engine) Create(ctx context.Context, workflowName string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)
Create creates a new flow for a workflow without starting it. opts carries the flow's scheduling (priority/fairness/start-at) and its opaque host Baggage; nil opts uses defaults.
Example ¶
Create separates flow creation from starting it, and accepts FlowOptions for scheduling and the opaque host baggage carried with the flow.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	var eng *engine.Engine // obtained from NewEngine().…Startup(ctx)
	ctx := context.Background()

	flowKey, err := eng.Create(ctx, "greet", map[string]any{"name": "ada"},
		&workflow.FlowOptions{
			Priority:    10,                             // lower runs first
			FairnessKey: "tenant-42",                    // fair scheduling bucket
			StartAt:     time.Now().Add(1 * time.Hour),  // delayed start
			Baggage:     map[string]any{"actor": "ada"}, // read with workflow.BaggageFrom(ctx)
		})
	if err != nil {
		panic(err)
	}

	// The flow sits in "created" until Start; Await blocks until it stops.
	if err := eng.Start(ctx, flowKey); err != nil {
		panic(err)
	}
	// Check the Await error before dereferencing the outcome.
	out, err := eng.Await(ctx, flowKey)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Status)
}
func (*Engine) CreateTask ¶
func (e *Engine) CreateTask(ctx context.Context, taskName string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)
CreateTask creates a flow for a single task without starting it. opts carries scheduling and the opaque host Baggage (see Create); nil opts uses defaults.
func (*Engine) Fingerprint ¶
func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
Fingerprint returns a fingerprint and status for change detection.
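One way a host might use this for change detection is to remember the last fingerprint per flow and act only when it differs. The sketch below assumes that usage pattern; watcher, fakeSource, and demo are hypothetical names, the "running" status string is an illustrative placeholder, and only the Fingerprint method signature comes from this document.

```go
package main

import (
	"context"
	"fmt"
)

// fingerprinter mirrors the documented Fingerprint method signature.
type fingerprinter interface {
	Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
}

// watcher remembers the last fingerprint seen for each flow.
type watcher struct {
	src  fingerprinter
	last map[string]string
}

// changed reports whether the flow's fingerprint differs from the one last observed.
// The first observation of a flow always counts as a change.
func (w *watcher) changed(ctx context.Context, flowKey string) (bool, string, error) {
	fp, status, err := w.src.Fingerprint(ctx, flowKey)
	if err != nil {
		return false, "", err
	}
	if prev, seen := w.last[flowKey]; seen && prev == fp {
		return false, status, nil
	}
	w.last[flowKey] = fp
	return true, status, nil
}

// fakeSource is a hypothetical stand-in that returns canned values.
type fakeSource struct{ fp, status string }

func (f *fakeSource) Fingerprint(ctx context.Context, flowKey string) (string, string, error) {
	return f.fp, f.status, nil
}

// demo polls three times: first sight, no change, then a change.
func demo() []bool {
	ctx := context.Background()
	src := &fakeSource{fp: "v1", status: "running"}
	w := &watcher{src: src, last: map[string]string{}}
	var results []bool
	for i := 0; i < 3; i++ {
		if i == 2 {
			src.fp, src.status = "v2", "completed" // simulate the flow advancing
		}
		ok, _, _ := w.changed(ctx, "flow-1")
		results = append(results, ok)
	}
	return results
}

func main() {
	fmt.Println(demo()) // prints [true false true]
}
```

A caller would typically follow a reported change with Snapshot or History to fetch the details, so the cheap fingerprint check gates the more expensive reads.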
func (*Engine) HandleEnqueue ¶
HandleEnqueue processes an inbound doorbell signal from another replica.
func (*Engine) HandleNotifyStatusChange ¶
HandleNotifyStatusChange processes an inbound status change notification.
func (*Engine) HandleSyncValve ¶
func (e *Engine) HandleSyncValve(ctx context.Context, taskName string, wCong int, tCong time.Time) error
HandleSyncValve processes an inbound valve gossip signal from another replica.
func (*Engine) HandleTripBreaker ¶
HandleTripBreaker processes an inbound breaker trip signal from another replica.
func (*Engine) HistoryMermaid ¶
HistoryMermaid writes the execution DAG of a flow as a Mermaid diagram.
func (*Engine) List ¶
func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)
List queries flows by status, workflow name, or thread key.
func (*Engine) RestartFrom ¶
RestartFrom re-executes a flow from a specific step.
func (*Engine) ResumeBreak ¶
ResumeBreak continues a flow paused at a BreakBefore breakpoint.
func (*Engine) Run ¶
func (e *Engine) Run(ctx context.Context, workflowName string, initialState any, opts *workflow.FlowOptions) (*workflow.FlowOutcome, error)
Run creates, starts, and awaits a flow in one call. opts carries scheduling and the opaque host Baggage; nil opts uses defaults.
func (*Engine) RunInTest ¶
RunInTest initializes the engine for testing with per-test SQLite databases and registers cleanup via t.Cleanup.
func (*Engine) ShardInfo ¶
func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)
ShardInfo returns health and size summaries for all shards.
func (*Engine) StartNotify ¶
StartNotify starts a flow and registers a hostname for stop notifications.
func (*Engine) Startup ¶
Startup initializes the engine: opens database connections, runs migrations, and starts worker goroutines.
func (*Engine) WithDSN ¶
WithDSN sets the SQL data source name. Use "%d" for sharded DSNs. An empty DSN in test mode uses SQLite in-memory.
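As an illustration of what a sharded DSN template could expand to, the sketch below substitutes a shard index for "%d". The function shardDSNs is hypothetical, and the zero-based numbering and the database naming are assumptions; the document does not specify how the engine numbers shards.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// shardDSNs expands a DSN template into one DSN per shard by substituting "%d".
// Assumption: shards are numbered from 0; the engine's own numbering may differ.
func shardDSNs(template string, numShards int) []string {
	dsns := make([]string, 0, numShards)
	for shard := 0; shard < numShards; shard++ {
		// A plain string replacement leaves any other percent signs in the DSN alone.
		dsns = append(dsns, strings.ReplaceAll(template, "%d", strconv.Itoa(shard)))
	}
	return dsns
}

func main() {
	for _, dsn := range shardDSNs("postgres://user:pass@host:5432/dwarf_%d", 3) {
		fmt.Println(dsn)
	}
}
```

The shard count itself would be configured separately with WithNumShards.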
func (*Engine) WithDefaultPriority ¶
WithDefaultPriority sets the default priority for new flows.
func (*Engine) WithFlowStoppedCallback ¶
func (e *Engine) WithFlowStoppedCallback(cb FlowStoppedCallback) *Engine
WithFlowStoppedCallback sets the callback fired when a flow stops.
func (*Engine) WithGraphLoader ¶
func (e *Engine) WithGraphLoader(gl GraphLoader) *Engine
WithGraphLoader sets the function that fetches workflow graph definitions.
func (*Engine) WithLogger ¶
WithLogger sets the structured logger. The engine logs through the *Context variants (DebugContext/InfoContext/WarnContext/ErrorContext) so a handler that reads the context - e.g. the otelslog bridge - can correlate each record with the active step span. Defaults to slog.Default(); a host routes logs to OTEL by passing a logger whose handler bridges there. A nil logger is treated as slog.Default().
func (*Engine) WithMaxOpenConns ¶
WithMaxOpenConns sets the maximum number of open SQL connections per shard.
func (*Engine) WithMeterProvider ¶
func (e *Engine) WithMeterProvider(mp metric.MeterProvider) *Engine
WithMeterProvider sets the OpenTelemetry MeterProvider the engine builds its dwarf_* instruments from. Defaults to the global otel.GetMeterProvider() (the no-op provider unless the host configures the OTEL SDK). The engine creates instruments under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host service's identity. Must be set before Startup.
func (*Engine) WithNumShards ¶
WithNumShards sets the number of database shards.
func (*Engine) WithPeerNotifier ¶
func (e *Engine) WithPeerNotifier(pn PeerNotifier) *Engine
WithPeerNotifier sets the cross-replica coordination interface.
func (*Engine) WithTaskExecutor ¶
func (e *Engine) WithTaskExecutor(te TaskExecutor) *Engine
WithTaskExecutor sets the function that executes workflow tasks.
func (*Engine) WithTimeBudget ¶
WithTimeBudget sets the maximum duration for a single task execution.
func (*Engine) WithTracerProvider ¶
func (e *Engine) WithTracerProvider(tp trace.TracerProvider) *Engine
WithTracerProvider sets the OpenTelemetry TracerProvider the engine builds its spans from. Defaults to the global otel.GetTracerProvider() (the no-op provider unless the host configures the OTEL SDK). The engine mints the root "workflow" span at Create (persisted to the dwarf-owned trace_parent column) and a per-step span in processStep, parented to the reconstructed root and placed on the TaskExecutor's context so the task's downstream spans nest under it. The host injects only the provider - no span code, no trace_parent handling. Spans are created under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host's identity. Must be set before Startup.
func (*Engine) WithWorkers ¶
WithWorkers sets the number of worker goroutines.
type FlowStoppedCallback ¶
type FlowStoppedCallback func(ctx context.Context, hostname string, outcome *workflow.FlowOutcome)
FlowStoppedCallback is fired when a flow stops (completed, failed, cancelled, interrupted). The hostname is the notify_hostname stored on the flow via StartNotify.
type GraphLoader ¶
GraphLoader fetches a workflow graph definition by name. The flow's opaque baggage rides on ctx; read it with workflow.BaggageFrom(ctx) if loading is identity-dependent (authz, per-actor graphs).
type PeerNotifier ¶
type PeerNotifier interface {
	Enqueue(ctx context.Context, shard, stepID int)
	SyncValve(ctx context.Context, taskName string, wCong int, tCong time.Time)
	TripBreaker(ctx context.Context, taskName string)
	// NotifyStatusChange tells peer replicas a flow reached a stopped status (completed, failed,
	// cancelled, interrupted) so their Await callers wake and re-check. Without it, an Await on the
	// replica that did not run the flow's final step blocks until its context deadline. The receiving
	// replica routes the signal to HandleNotifyStatusChange.
	NotifyStatusChange(ctx context.Context, flowKey string, status string)
}
PeerNotifier sends cross-replica coordination signals. All methods are fire-and-forget.
Every signal must be delivered to OTHER replicas only, EXCLUDING the calling replica. The engine always applies a signal's effect locally before invoking the corresponding PeerNotifier method (e.g. startNotify rings the local doorbell via handleEnqueue and then calls Enqueue; valveRegulate mutates the local valve and then calls SyncValve). An implementation that echoes the signal back to the sender would cause it to be processed twice on the originating replica — a doubled enqueue, valve cut, breaker trip, or status-change wake. If the underlying transport delivers published messages to the publisher, the implementation is responsible for filtering out self-delivery.
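A minimal sketch of self-delivery filtering, assuming a broadcast transport that echoes messages back to the publisher. The message envelope, the in-memory bus, and the replica type are hypothetical stand-ins; a real implementation would tag messages with a per-replica identifier on its actual transport and route accepted signals to the matching Handle* method.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical wire envelope; sender lets receivers drop their own echoes.
type message struct {
	sender   string
	taskName string
}

// bus is an in-memory stand-in for a transport that delivers every published
// message to every subscriber, including the publisher.
type bus struct {
	mu   sync.Mutex
	subs []func(message)
}

func (b *bus) subscribe(fn func(message)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs = append(b.subs, fn)
}

func (b *bus) publish(m message) {
	b.mu.Lock()
	subs := append([]func(message){}, b.subs...)
	b.mu.Unlock()
	for _, fn := range subs {
		fn(m)
	}
}

// replica counts the breaker-trip signals it accepts from peers.
type replica struct {
	id    string
	bus   *bus
	trips int
}

func newReplica(id string, b *bus) *replica {
	r := &replica{id: id, bus: b}
	b.subscribe(func(m message) {
		if m.sender == r.id {
			return // self-delivery: the effect was already applied locally
		}
		r.trips++ // a real host would call HandleTripBreaker on its engine here
	})
	return r
}

// tripBreaker mirrors the shape of PeerNotifier.TripBreaker: fire-and-forget.
func (r *replica) tripBreaker(taskName string) {
	r.bus.publish(message{sender: r.id, taskName: taskName})
}

func main() {
	b := &bus{}
	a, c := newReplica("a", b), newReplica("c", b)
	a.tripBreaker("hello")
	fmt.Println(a.trips, c.trips) // prints 0 1: the sender drops its echo, the peer accepts
}
```

The same sender check applies to Enqueue, SyncValve, and NotifyStatusChange, since each would otherwise be processed twice on the originating replica.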
type ShardSummary ¶
type ShardSummary struct {
	Shard     int    `json:"shard,omitzero"`
	Error     string `json:"error,omitzero"`
	LatencyMs int    `json:"latencyMs,omitzero"`
	Steps     int    `json:"steps,omitzero"`
	Flows     int    `json:"flows,omitzero"`
}
ShardSummary is the health/size summary of a single database shard.
type TaskExecutor ¶
TaskExecutor executes a single task within a workflow. The flow carrier has its state pre-populated; the executor should call the task and let it write changes to the flow. The flow's opaque baggage rides on ctx - read it with workflow.BaggageFrom(ctx) (e.g. to mint a token).
type TaskHandler ¶
TaskHandler is the signature for a test task handler. Read the flow's baggage, if any, with workflow.BaggageFrom(ctx).
type TestProxy ¶
type TestProxy struct {
	// contains filtered or unexported fields
}
TestProxy routes graph fetches and task dispatches to registered handlers. It implements both GraphLoader and TaskExecutor for use with Engine.RunInTest.
func NewTestProxy ¶
func NewTestProxy() *TestProxy
NewTestProxy creates a new test proxy with empty handler registries.
func (*TestProxy) ExecuteTask ¶
ExecuteTask implements the TaskExecutor signature.
func (*TestProxy) HandleGraph ¶
HandleGraph registers a workflow graph under the given name. The name should match the workflow name passed to Engine.Create or Engine.Run.
func (*TestProxy) HandleTask ¶
func (p *TestProxy) HandleTask(name string, handler TaskHandler)
HandleTask registers a task handler under the given name. The name should match the task URL registered via graph.AddTask.
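The routing idea behind a proxy like this can be sketched as a name-to-handler registry. The code below is a simplified stand-in, not TestProxy itself: flow, taskHandler, and proxy are hypothetical types, and the handler shape is an assumption because the TaskHandler signature is not shown in this document.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// flow is a hypothetical stand-in for *workflow.Flow, reduced to a state map.
type flow struct{ state map[string]any }

// taskHandler is an assumed handler shape; the real TaskHandler signature may differ.
type taskHandler func(ctx context.Context, f *flow) error

// proxy routes task dispatches to handlers registered by name.
type proxy struct {
	mu    sync.RWMutex
	tasks map[string]taskHandler
}

func newProxy() *proxy { return &proxy{tasks: map[string]taskHandler{}} }

// handleTask registers a handler under the given task name.
func (p *proxy) handleTask(name string, h taskHandler) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.tasks[name] = h
}

// executeTask looks up the handler for taskName and runs it against the flow.
func (p *proxy) executeTask(ctx context.Context, taskName string, f *flow) error {
	p.mu.RLock()
	h, ok := p.tasks[taskName]
	p.mu.RUnlock()
	if !ok {
		return fmt.Errorf("no handler registered for task %q", taskName)
	}
	return h(ctx, f)
}

// demo registers one handler, dispatches to it, then dispatches to an unknown task.
func demo() (greeting any, unknownErr error) {
	p := newProxy()
	p.handleTask("hello", func(ctx context.Context, f *flow) error {
		f.state["greeting"] = fmt.Sprintf("hello %v", f.state["name"])
		return nil
	})
	f := &flow{state: map[string]any{"name": "ada"}}
	if err := p.executeTask(context.Background(), "hello", f); err != nil {
		return nil, err
	}
	return f.state["greeting"], p.executeTask(context.Background(), "missing", f)
}

func main() {
	greeting, err := demo()
	fmt.Println(greeting)
	fmt.Println(err)
}
```

Returning an error for an unregistered name is a design choice of this sketch; it makes a misspelled task name fail loudly in a test rather than silently doing nothing.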