engine

package
v0.6.3
Published: Jun 22, 2026 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Overview

Package engine is the dwarf workflow-orchestration engine.

The engine executes workflow graphs against a SQL database: it schedules steps, dispatches one task per step, persists state between steps, and drives fan-out/fan-in, retries, sleeps, subgraphs, and interrupts. It owns no transport of its own; a host wires it to the outside world through an injected Host interface and calls its operations.

Lifecycle

Build an engine with NewEngine and the Set* methods, register a Host (see SetHost), then call Startup, which opens the database, runs migrations, and starts the workers. Shutdown drains the workers and closes the database. In tests, RunInTest replaces Startup/Shutdown with per-test SQLite databases and t.Cleanup.

eng := engine.NewEngine()
eng.SetDSN("postgres://user:pass@host:5432/dwarf")
eng.SetHost(host)
if err := eng.Startup(ctx); err != nil { ... }
defer eng.Shutdown(ctx)

Each Set* method returns an error. The live ones (SetNumShards, SetMaxOpenConns, SetTimeBudget, SetDefaultPriority) take effect immediately on a running engine; the construction-time-only ones (SetDSN, SetWorkers, SetHost, SetLogger, SetDebugLogger, SetMeterProvider, SetTracerProvider) return an error if called after Startup.

Host

The engine reaches the outside world through a single injected Host interface (see SetHost):

  • LoadGraph fetches a workflow graph by its URL. It is called at Create (the graph JSON is then frozen on the flow) and again on subgraph spawn.
  • ExecuteTask executes one task, given the Flow carrier with its state pre-populated.
  • FlowStopped is fired when a flow stops, but only for flows created with FlowOptions.NotifyOnStop; the flow's baggage is on the ctx so the host resolves delivery itself. A host that needs no notifications can leave it as a no-op.
  • SignalPeers ships a cross-replica coordination signal (op + opaque payload bytes) to the other replicas, which hand it back via Engine.DeliverSignal; a single-replica host does nothing.

The flow's opaque baggage (host identity/tenant/context, set in workflow.FlowOptions) rides on the dispatch context of every LoadGraph and ExecuteTask call; read it with workflow.BaggageFrom(ctx).

Operations

Create or CreateTask makes a flow; Start runs it; Await blocks until it stops; Run is Create+Start+Await in one call. Snapshot/History/Step/List inspect flows; Resume/ResumeBreak continue a paused flow; Cancel/Restart/RestartFrom/Continue manage the lifecycle; Delete/Purge remove flows. See the repository's docs/ directory for guides.

Example

Wire an engine to a host, then create, start, and await a flow.

package main

import (
	"context"
	"fmt"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

// exampleHost implements engine.Host. A real host loads graphs from a registry/file/database/RPC and
// dispatches tasks over a local call, RPC, or message bus; here an in-memory registry and a local
// function stand in. LoadGraph and ExecuteTask are required; the remaining Host methods (flow-stop
// notification and the cross-replica signals) are left as no-ops.
type exampleHost struct {
	graphs map[string]*workflow.Graph
}

func (h exampleHost) LoadGraph(ctx context.Context, name string) (*workflow.Graph, error) {
	return h.graphs[name], nil
}
func (h exampleHost) ExecuteTask(ctx context.Context, taskName string, f *workflow.Flow) error {
	f.SetString("greeting", "hello "+f.GetString("name"))
	return nil
}
func (exampleHost) FlowStopped(context.Context, string, *workflow.FlowOutcome) {}
func (exampleHost) SignalPeers(context.Context, string, []byte)                {}

func main() {
	ctx := context.Background()

	graphs := map[string]*workflow.Graph{}
	g := workflow.NewGraph("Greet")
	g.SetEndpoint("Hello", "Hello")
	g.AddTransition("Hello", workflow.END)
	graphs["greet"] = g

	eng := engine.NewEngine()
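	// Every Set* method returns an error; check it rather than discarding it.
	if err := eng.SetDSN("postgres://user:pass@localhost:5432/dwarf"); err != nil {
		panic(err)
	}
	if err := eng.SetHost(exampleHost{graphs: graphs}); err != nil {
		panic(err)
	}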

	if err := eng.Startup(ctx); err != nil {
		panic(err)
	}
	defer eng.Shutdown(ctx)

	// Run is Create + Start + Await in one call.
	_, out, err := eng.Run(ctx, "greet", map[string]any{"name": "ada"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.State["greeting"])
}

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the standalone workflow orchestration engine.

func NewEngine

func NewEngine() *Engine

NewEngine creates a new workflow engine.

func (*Engine) Await

func (e *Engine) Await(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)

Await blocks until a flow stops.

func (*Engine) BreakBefore

func (e *Engine) BreakBefore(ctx context.Context, flowKey string, taskName string, enabled bool) error

BreakBefore sets or clears a breakpoint before a named task.

func (*Engine) Cancel

func (e *Engine) Cancel(ctx context.Context, flowKey string, reason string) error

Cancel aborts a flow.

func (*Engine) Continue

func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any, opts *workflow.FlowOptions) (string, error)

Continue creates a new flow from the latest completed flow in a thread.

func (*Engine) Create

func (e *Engine) Create(ctx context.Context, workflowURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)

Create creates a new flow for a workflow without starting it. opts carries the flow's scheduling (priority/fairness/start-at) and its opaque host Baggage; nil opts uses defaults.

Example

Create separates flow creation from starting it, and accepts FlowOptions for scheduling and the opaque host baggage carried with the flow.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	var eng *engine.Engine // obtained from NewEngine().…Startup(ctx)
	ctx := context.Background()

	flowKey, err := eng.Create(ctx, "greet", map[string]any{"name": "ada"},
		&workflow.FlowOptions{
			Priority:    10,                             // lower runs first
			FairnessKey: "tenant-42",                    // fair scheduling bucket
			StartAt:     time.Now().Add(1 * time.Hour),  // delayed start
			Baggage:     map[string]any{"actor": "ada"}, // read with workflow.BaggageFrom(ctx)
		})
	if err != nil {
		panic(err)
	}

	// The flow sits in "created" until Start; Await blocks until it stops.
	if err := eng.Start(ctx, flowKey); err != nil {
		panic(err)
	}
	out, err := eng.Await(ctx, flowKey)
	if err != nil {
		panic(err) // out is nil on error, so do not dereference it
	}
	fmt.Println(out.Status)
}

func (*Engine) CreateTask

func (e *Engine) CreateTask(ctx context.Context, name, taskURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)

CreateTask creates a flow for a single task without starting it. name is the node's display name (shown in diagrams/history; required, non-empty), placed before taskURL to match NewGraph(name)/graph.SetEndpoint(name, url)/flow.Subtask(name, url). taskURL is the task's dispatch URL. opts carries scheduling and the opaque host Baggage (see Create); nil opts uses defaults.

func (*Engine) Delete

func (e *Engine) Delete(ctx context.Context, flowKey string) error

Delete removes a flow and its steps.

func (*Engine) DeliverSignal added in v0.4.0

func (e *Engine) DeliverSignal(ctx context.Context, op string, payload []byte) error

DeliverSignal processes an inbound peer signal. The host calls it with the op routing key and the payload bytes it received from a peer (the JSON encoding of what the engine handed that peer's SignalPeers). It delegates by op to the matching internal handler. op and payload are opaque to the host; only the engine interprets them.

func (*Engine) Fingerprint

func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)

Fingerprint returns a fingerprint and status for change detection.

func (*Engine) History

func (e *Engine) History(ctx context.Context, flowKey string) ([]workflow.FlowStep, error)

History returns the step-by-step execution history of a flow.

func (*Engine) HistoryMermaid

func (e *Engine) HistoryMermaid(ctx context.Context, flowKey string, w io.StringWriter) error

HistoryMermaid writes the execution DAG of a flow as a Mermaid diagram.

func (*Engine) List

func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)

List queries flows by status, workflow name, or thread key.

func (*Engine) Purge

func (e *Engine) Purge(ctx context.Context, query workflow.Query) (int, error)

Purge deletes flows matching a query.

func (*Engine) Restart

func (e *Engine) Restart(ctx context.Context, flowKey string, stateOverrides any) error

Restart re-executes a flow from the beginning.

func (*Engine) RestartFrom

func (e *Engine) RestartFrom(ctx context.Context, stepKey string, stateOverrides any) error

RestartFrom re-executes a flow from a specific step.

func (*Engine) Resume

func (e *Engine) Resume(ctx context.Context, flowKey string, resumeData any) error

Resume continues a flow paused by flow.Interrupt.

func (*Engine) ResumeBreak

func (e *Engine) ResumeBreak(ctx context.Context, flowKey string, stateOverrides any) error

ResumeBreak continues a flow paused at a BreakBefore breakpoint.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, workflowURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, outcome *workflow.FlowOutcome, err error)

Run creates, starts, and awaits a flow in one call, returning the new flow's key alongside its outcome (the key is the flow's identity, not part of the outcome). opts carries scheduling and the opaque host Baggage; nil opts uses defaults. On error, flowKey is "" and outcome is nil.

func (*Engine) RunInTest

func (e *Engine) RunInTest(t *testing.T)

RunInTest initializes the engine for testing with per-test SQLite databases and registers cleanup via t.Cleanup.

func (*Engine) SetDSN added in v0.4.0

func (e *Engine) SetDSN(dsn string) error

SetDSN sets the SQL data source name. Use "%d" for sharded DSNs; an empty DSN in test mode uses SQLite in-memory. Construction-time only - the shards are opened against the DSN at Startup, so changing it on a running engine (which would require reopening live connections) is rejected.
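To make the "%d" placeholder concrete, the sketch below expands a DSN template into one DSN per shard. This is an illustration under assumptions, not the engine's implementation: the helper shardDSNs is hypothetical, and the 1-based shard numbering is a guess, since this page does not state how the engine numbers shards or performs the substitution.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// shardDSNs is a hypothetical helper that substitutes the shard number for each
// "%d" in the template. Numbering from 1 is an assumption made for this sketch.
func shardDSNs(template string, numShards int) []string {
	dsns := make([]string, 0, numShards)
	for shard := 1; shard <= numShards; shard++ {
		dsns = append(dsns, strings.ReplaceAll(template, "%d", strconv.Itoa(shard)))
	}
	return dsns
}

func main() {
	for _, dsn := range shardDSNs("postgres://user:pass@host:5432/dwarf_%d", 3) {
		fmt.Println(dsn)
	}
}
```

The sketch uses strings.ReplaceAll rather than fmt.Sprintf so that other percent sequences in a DSN, such as a URL-encoded password, are left alone.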

func (*Engine) SetDebugLogger added in v0.4.0

func (e *Engine) SetDebugLogger() error

SetDebugLogger is a convenience that wires a human-readable text logger to stderr at debug level - shorthand for SetLogger(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))). It is meant for development and test runs where you want to see the engine's (and its sequel DB layer's) internal logging without standing up an OTEL pipeline. Output goes to stderr, not stdout, so it never mixes with a program's data stream - the standard convention for diagnostic logs. Because it routes through SetLogger, it counts as an explicitly-set logger, so it also reaches sequel via the engine's existing SetLogger wiring (sequel's migration logs appear too). Construction-time only.

func (*Engine) SetDefaultPriority added in v0.4.0

func (e *Engine) SetDefaultPriority(p int) error

SetDefaultPriority sets the default priority for new flows. Live: read fresh on each Create, so it takes effect on a running engine immediately.

func (*Engine) SetHost added in v0.4.0

func (e *Engine) SetHost(h Host) error

SetHost registers the host the engine reaches the outside world through: it loads graphs, executes tasks, and (optionally) receives flow-stop notifications and carries cross-replica coordination signals. A host must implement LoadGraph and ExecuteTask; the remaining Host methods may be no-ops. Construction-time only.

func (*Engine) SetLogger added in v0.4.0

func (e *Engine) SetLogger(l *slog.Logger) error

SetLogger sets the structured logger. The engine logs through the *Context variants (DebugContext/InfoContext/WarnContext/ErrorContext) so a handler that reads the context - e.g. the otelslog bridge - can correlate each record with the active step span. A host routes logs to OTEL by passing a logger whose handler bridges there. Defaults to a discard logger: until a logger is injected the engine (and its sequel DB layer) stay silent rather than writing to the application-owned slog.Default(). A nil logger resets to that silent default. Construction-time only - the engine wires the logger into the worker hot path and the shard DBs at Startup, and reads it lock-free thereafter.

func (*Engine) SetMaxOpenConns added in v0.4.0

func (e *Engine) SetMaxOpenConns(n int) error

SetMaxOpenConns sets the maximum number of open SQL connections per shard and re-applies it to every live shard pool, so it takes effect on a running engine immediately. (sequel's pool setters are hot/atomic.) Before Startup it records the value, applied as each shard opens.

func (*Engine) SetMeterProvider added in v0.4.0

func (e *Engine) SetMeterProvider(mp metric.MeterProvider) error

SetMeterProvider sets the OpenTelemetry MeterProvider the engine builds its dwarf_* instruments from. Defaults to the global otel.GetMeterProvider() (the no-op provider unless the host configures the OTEL SDK). The engine creates instruments under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host service's identity. Construction-time only - the engine resolves the meter once at Startup.

func (*Engine) SetNumShards added in v0.4.0

func (e *Engine) SetNumShards(num int) error

SetNumShards sets the number of database shards and, on a running engine, brings any added shards online (open + migrate) in one call, returning any error from opening/migrating them. New flows spread onto the added shards immediately; existing flows stay on their original shard.

The count may only grow at runtime: a value at or below the current live shard count records the new target but removes nothing (old shards drain naturally; an actual reduction takes effect only on a restart, where Startup opens just numShards shards). Concurrency-safe - the open+migrate work is serialized internally and runs off the hot path. Before Startup it simply records the target (no shards are open yet). It takes no ctx because the underlying open+migrate path is not ctx-cancellable.

func (*Engine) SetTimeBudget added in v0.4.0

func (e *Engine) SetTimeBudget(d time.Duration) error

SetTimeBudget sets the maximum duration for a single task execution. Live: read fresh on each dispatch, so it takes effect on a running engine immediately.

func (*Engine) SetTracerProvider added in v0.4.0

func (e *Engine) SetTracerProvider(tp trace.TracerProvider) error

SetTracerProvider sets the OpenTelemetry TracerProvider the engine builds its spans from. Defaults to the global otel.GetTracerProvider() (the no-op provider unless the host configures the OTEL SDK). The engine mints the root "workflow" span at Create (persisted to the dwarf-owned trace_parent column) and a per-step span when it processes each step. The per-step span is parented to the reconstructed root and placed on the context passed to Host.ExecuteTask, so the task's downstream spans nest under it. The host injects only the provider - no span code, no trace_parent handling. Spans are created under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host's identity. Construction-time only - the engine resolves the tracer once at Startup.

func (*Engine) SetWorkers added in v0.4.0

func (e *Engine) SetWorkers(n int) error

SetWorkers sets the number of worker goroutines. Construction-time only: the pool is spawned at Startup at this count, and live resizing (spawning/retiring workers and resizing the candidate cache) is not supported, so a call on a running engine is rejected.

func (*Engine) ShardInfo

func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)

ShardInfo returns health and size summaries for all shards.

func (*Engine) Shutdown

func (e *Engine) Shutdown(ctx context.Context) error

Shutdown stops all worker goroutines and closes database connections.

func (*Engine) Snapshot

func (e *Engine) Snapshot(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)

Snapshot returns the current state and status of a flow.

func (*Engine) Start

func (e *Engine) Start(ctx context.Context, flowKey string) error

Start transitions a created flow to running. Whether the flow notifies the host on stop is set at Create via FlowOptions.NotifyOnStop.

func (*Engine) Startup

func (e *Engine) Startup(ctx context.Context) error

Startup initializes the engine: opens database connections, runs migrations, and starts worker goroutines.

func (*Engine) StartupInTest added in v0.4.0

func (e *Engine) StartupInTest(ctx context.Context, testID string) error

StartupInTest initializes the engine against isolated, throwaway test databases - one per shard, keyed by testID - instead of opening the configured DSN. It is for a host that is itself running under test (so it has no *testing.T to hand RunInTest) but must still convey "use a disposable, isolated database" down to sequel. The testID must be stable for the test run and shared by every replica that should see the same database (e.g. a per-test isolation key shared by all replicas in one test app); distinct test runs pass distinct ids for isolation. The engine never learns the host's notion of "test mode" - it only receives this concrete instruction. Unlike RunInTest there is no cleanup hook; the host drives teardown by calling Shutdown (e.g. from its own shutdown lifecycle).

func (*Engine) Step

func (e *Engine) Step(ctx context.Context, stepKey string) (*workflow.FlowStep, error)

Step returns details of a single step.

type Host added in v0.4.0

type Host interface {
	// LoadGraph fetches a workflow graph definition by its URL (the addressable resolve key passed to
	// Create). The flow's opaque baggage rides on ctx; read it with workflow.BaggageFrom(ctx) if loading
	// is identity-dependent (authz, per-actor graphs).
	LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)

	// ExecuteTask executes a single task within a workflow. taskURL is the task's dispatch URL (the real
	// downstream address), not the graph node name. The flow carrier has its state pre-populated; the
	// executor should call the task and let it write changes to the flow. The flow's opaque baggage rides
	// on ctx - read it with workflow.BaggageFrom(ctx) (e.g. to mint a token).
	ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error

	// FlowStopped is fired when a flow stops (completed, failed, cancelled, interrupted), but only for a
	// flow created with FlowOptions.NotifyOnStop=true. flowKey identifies the stopped flow (it is not part
	// of the outcome). The flow's opaque baggage rides on ctx (read it with workflow.BaggageFrom(ctx)); the
	// host decides where/how to deliver the notification from it - the engine traffics in no delivery
	// address. Optional: a host with no notification need does nothing.
	FlowStopped(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome)

	// SignalPeers delivers a cross-replica coordination signal to the other replicas. op is an opaque
	// routing key (usable as a topic); payload is opaque bytes the engine already serialized. The host
	// ships (op, payload) to peers and on the receiving side calls Engine.DeliverSignal(ctx, op,
	// payload). See the cross-replica signal contract above. A single-replica host does nothing here.
	SignalPeers(ctx context.Context, op string, payload []byte)
}

Host is the contract between the dwarf engine and the surrounding host application. The engine owns no transport of its own; it reaches workflow graphs and tasks, reports flow stops, and carries cross-replica coordination signals exclusively through the host. Register it once via Engine.SetHost.

A host MUST implement LoadGraph and ExecuteTask. The remaining two methods are optional: an implementation may leave FlowStopped as a no-op when it needs no flow-stop notifications, and SignalPeers as a no-op when it runs as a single replica with no cross-replica coordination.

Cross-replica signal contract (SignalPeers): the engine funnels all of its coordination signals (work doorbells, flow-stop wakes) through this one method. op is a routing key the host may use as a topic/subject; payload is opaque bytes the engine already serialized. The host delivers (op, payload) to OTHER replicas, EXCLUDING the calling replica, and on the receiving side hands them back via Engine.DeliverSignal(ctx, op, payload) - it is a pure pipe that never inspects either. The engine always applies a signal's effect locally before calling SignalPeers, so an implementation that echoes the signal back to the sender would cause it to be processed twice on the originating replica (a doubled enqueue or status-change wake); if the transport delivers published messages to the publisher, the implementation must filter out self-delivery. Because the host never branches on op or inspects payload, adding a new engine signal kind requires no host change.

type ShardSummary

type ShardSummary struct {
	Shard     int    `json:"shard,omitzero"`
	Error     string `json:"error,omitzero"`
	LatencyMs int    `json:"latencyMs,omitzero"`
	Steps     int    `json:"steps,omitzero"`
	Flows     int    `json:"flows,omitzero"`
}

ShardSummary is the health/size summary of a single database shard.

type TaskHandler

type TaskHandler func(ctx context.Context, flow *workflow.Flow) error

TaskHandler is the signature for a test task handler. Read the flow's baggage, if any, with workflow.BaggageFrom(ctx).

type TestProxy

type TestProxy struct {
	// contains filtered or unexported fields
}

TestProxy routes graph fetches and task dispatches to registered handlers. It implements the Host interface for use with Engine.SetHost / Engine.RunInTest: LoadGraph and ExecuteTask dispatch to the registered handlers, FlowStopped invokes an optional callback set via OnFlowStopped, and SignalPeers relays to the peer engines registered with AddPeer (none by default, i.e. single-replica). For a multi-replica test, give each replica its own proxy and AddPeer the other engines.

func NewTestProxy

func NewTestProxy() *TestProxy

NewTestProxy creates a new test proxy with empty handler registries.

func (*TestProxy) AddPeer added in v0.4.0

func (p *TestProxy) AddPeer(peer *Engine)

AddPeer registers a peer engine that SignalPeers relays to, standing in for the bus in a single-process multi-replica test. Add every OTHER replica's engine (not this proxy's own), so the engine's self-exclusion contract holds. Call before Startup/RunInTest.

func (*TestProxy) ExecuteTask

func (p *TestProxy) ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error

ExecuteTask implements Host.

func (*TestProxy) FlowStopped added in v0.4.0

func (p *TestProxy) FlowStopped(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome)

FlowStopped implements Host; it invokes the callback set via OnFlowStopped, if any.

func (*TestProxy) HandleGraph

func (p *TestProxy) HandleGraph(name string, graph *workflow.Graph)

HandleGraph registers a workflow graph under the given name. The name should match the workflow URL passed to Engine.Create or Engine.Run.

func (*TestProxy) HandleTask

func (p *TestProxy) HandleTask(name string, handler TaskHandler)

HandleTask registers a task handler under the given name. The name should match the task URL registered via graph.SetEndpoint.

func (*TestProxy) LoadGraph

func (p *TestProxy) LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)

LoadGraph implements Host.

func (*TestProxy) OnFlowStopped added in v0.4.0

func (p *TestProxy) OnFlowStopped(cb func(ctx context.Context, flowKey string, outcome *workflow.FlowOutcome))

OnFlowStopped registers a callback invoked by FlowStopped (only for flows created with FlowOptions.NotifyOnStop). Nil (the default) makes FlowStopped a no-op. The flow's baggage is on the ctx.

func (*TestProxy) SignalPeers added in v0.4.0

func (p *TestProxy) SignalPeers(ctx context.Context, op string, payload []byte)

SignalPeers implements Host; it relays the signal to every peer engine registered with AddPeer. With no peers (the default) it is a single-replica no-op. The relay is async to mirror bus semantics and avoid synchronous reentrancy into a peer mid-processStep.
