engine

package
v0.3.0
Published: Jun 14, 2026 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Overview

Package engine is the dwarf workflow-orchestration engine.

The engine executes workflow graphs against a SQL database, scheduling and dispatching one task at a time per step, persisting state between steps, and driving fan-out/fan-in, retries, sleeps, subgraphs, interrupts, backpressure, and circuit breakers. It owns no transport of its own; a host wires it to the outside world through injected dependency interfaces and calls its operations.

Lifecycle

Build an engine with NewEngine and the With* builder methods, inject at least a GraphLoader and a TaskExecutor, then call Startup, which opens the database, runs migrations, and starts the workers. Shutdown drains the workers and closes the database. In tests, RunInTest replaces Startup/Shutdown with per-test SQLite databases and t.Cleanup.

eng := engine.NewEngine().
	WithDSN("postgres://user:pass@host:5432/dwarf").
	WithGraphLoader(loadGraph).
	WithTaskExecutor(runTask)
if err := eng.Startup(ctx); err != nil { ... }
defer eng.Shutdown(ctx)

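Most With* configuration methods are atomic and may be called after Startup for hot reconfiguration. The exceptions are WithLogger, WithMeterProvider, and WithTracerProvider, which must be set before Startup; calling them afterwards is a no-op.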

Dependency interfaces

The engine reaches the outside world through four injection points:

  • GraphLoader fetches a workflow graph by name (called once at Create; the graph JSON is then frozen on the flow).
  • TaskExecutor executes one task, given the Flow carrier with its state pre-populated.
  • FlowStoppedCallback (optional) is fired when a flow stops, for flows started via StartNotify.
  • PeerNotifier (optional) carries cross-replica coordination signals; nil in single-replica mode.

The flow's opaque baggage (host identity/tenant/context, set in workflow.FlowOptions) rides on the dispatch context of every GraphLoader and TaskExecutor call; read it with workflow.BaggageFrom(ctx).

Operations

Create or CreateTask makes a flow; Start (or StartNotify) runs it; Await blocks until it stops; Run is Create+Start+Await in one call. Snapshot/History/Step/List inspect flows; Resume/ResumeBreak continue a paused flow; Cancel/Restart/RestartFrom/Continue manage the lifecycle; Delete/Purge remove flows. See the repository's docs/ directory for guides.
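
The relationship between Run and the three primitive operations can be sketched as below. This is an illustration only, not the engine's implementation: flowOps, outcome, and fakeEngine are hypothetical stand-ins so the snippet compiles without the dwarf module, and the real methods also take an initial state and *workflow.FlowOptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// outcome is a hypothetical stand-in for workflow.FlowOutcome.
type outcome struct {
	Status string
}

// flowOps is a hypothetical subset of the engine's operations, reduced to
// the three calls that Run is described as combining.
type flowOps interface {
	Create(ctx context.Context, workflowName string) (flowKey string, err error)
	Start(ctx context.Context, flowKey string) error
	Await(ctx context.Context, flowKey string) (*outcome, error)
}

// run chains Create, Start, and Await, stopping at the first error.
func run(ctx context.Context, ops flowOps, workflowName string) (*outcome, error) {
	flowKey, err := ops.Create(ctx, workflowName)
	if err != nil {
		return nil, fmt.Errorf("create: %w", err)
	}
	if err := ops.Start(ctx, flowKey); err != nil {
		return nil, fmt.Errorf("start: %w", err)
	}
	return ops.Await(ctx, flowKey)
}

// fakeEngine records the order of calls so the sketch can run standalone.
type fakeEngine struct {
	calls []string
}

func (f *fakeEngine) Create(ctx context.Context, workflowName string) (string, error) {
	if workflowName == "" {
		return "", errors.New("empty workflow name")
	}
	f.calls = append(f.calls, "create")
	return "flow-1", nil
}

func (f *fakeEngine) Start(ctx context.Context, flowKey string) error {
	f.calls = append(f.calls, "start")
	return nil
}

func (f *fakeEngine) Await(ctx context.Context, flowKey string) (*outcome, error) {
	f.calls = append(f.calls, "await")
	return &outcome{Status: "completed"}, nil
}

func main() {
	eng := &fakeEngine{}
	out, err := run(context.Background(), eng, "greet")
	if err != nil {
		panic(err)
	}
	fmt.Println(eng.calls, out.Status)
}
```

Calling the primitives separately is useful when the flow key is needed before the flow runs, or when the flow should be started later or awaited elsewhere.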

Example

Wire an engine to a graph source and a task runner, then create, start, and await a flow. A real host injects a GraphLoader that fetches graphs (from a registry, file, database, or RPC) and a TaskExecutor that dispatches the named task (local call, RPC, message bus); here a small in-memory registry and a local function stand in for both.

package main

import (
	"context"
	"fmt"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	ctx := context.Background()

	graphs := map[string]*workflow.Graph{}
	g := workflow.NewGraph("greet")
	g.AddTask("hello", "hello")
	g.AddTransition("hello", workflow.END)
	graphs["greet"] = g

	eng := engine.NewEngine().
		WithDSN("postgres://user:pass@localhost:5432/dwarf").
		WithGraphLoader(func(ctx context.Context, name string) (*workflow.Graph, error) {
			return graphs[name], nil
		}).
		WithTaskExecutor(func(ctx context.Context, taskName string, f *workflow.Flow) error {
			f.SetString("greeting", "hello "+f.GetString("name"))
			return nil
		})

	if err := eng.Startup(ctx); err != nil {
		panic(err)
	}
	defer eng.Shutdown(ctx)

	// Run is Create + Start + Await in one call.
	out, err := eng.Run(ctx, "greet", map[string]any{"name": "ada"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.State["greeting"])
}

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the standalone workflow orchestration engine.

func NewEngine

func NewEngine() *Engine

NewEngine creates a new workflow engine.

func (*Engine) Await

func (e *Engine) Await(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)

Await blocks until a flow stops.

func (*Engine) BreakBefore

func (e *Engine) BreakBefore(ctx context.Context, flowKey string, taskName string, enabled bool) error

BreakBefore sets or clears a breakpoint before a named task.

func (*Engine) BreakerTripped

func (e *Engine) BreakerTripped(taskName string) bool

BreakerTripped reports whether the breaker for taskName is currently tripped.

func (*Engine) Cancel

func (e *Engine) Cancel(ctx context.Context, flowKey string, reason string) error

Cancel aborts a flow.

func (*Engine) Continue

func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any, opts *workflow.FlowOptions) (string, error)

Continue creates a new flow from the latest completed flow in a thread.

func (*Engine) Create

func (e *Engine) Create(ctx context.Context, workflowName string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)

Create creates a new flow for a workflow without starting it. opts carries the flow's scheduling (priority/fairness/start-at) and its opaque host Baggage; nil opts uses defaults.

Example

Create separates flow creation from starting it, and accepts FlowOptions for scheduling and the opaque host baggage carried with the flow.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/microbus-io/dwarf/engine"
	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	var eng *engine.Engine // obtained from NewEngine().…Startup(ctx)
	ctx := context.Background()

	flowKey, err := eng.Create(ctx, "greet", map[string]any{"name": "ada"},
		&workflow.FlowOptions{
			Priority:    10,                             // lower runs first
			FairnessKey: "tenant-42",                    // fair scheduling bucket
			StartAt:     time.Now().Add(1 * time.Hour),  // delayed start
			Baggage:     map[string]any{"actor": "ada"}, // read with workflow.BaggageFrom(ctx)
		})
	if err != nil {
		panic(err)
	}

	// The flow sits in "created" until Start; Await blocks until it stops.
	if err := eng.Start(ctx, flowKey); err != nil {
		panic(err)
	}
	out, err := eng.Await(ctx, flowKey)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Status)
}

func (*Engine) CreateTask

func (e *Engine) CreateTask(ctx context.Context, taskName string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)

CreateTask creates a flow for a single task without starting it. opts carries scheduling and the opaque host Baggage (see Create); nil opts uses defaults.

func (*Engine) Delete

func (e *Engine) Delete(ctx context.Context, flowKey string) error

Delete removes a flow and its steps.

func (*Engine) Fingerprint

func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)

Fingerprint returns a fingerprint and status for change detection.

func (*Engine) HandleEnqueue

func (e *Engine) HandleEnqueue(ctx context.Context, shard, stepID int) error

HandleEnqueue processes an inbound doorbell signal from another replica.

func (*Engine) HandleNotifyStatusChange

func (e *Engine) HandleNotifyStatusChange(ctx context.Context, flowKey string, status string) error

HandleNotifyStatusChange processes an inbound status change notification.

func (*Engine) HandleSyncValve

func (e *Engine) HandleSyncValve(ctx context.Context, taskName string, wCong int, tCong time.Time) error

HandleSyncValve processes an inbound valve gossip signal from another replica.

func (*Engine) HandleTripBreaker

func (e *Engine) HandleTripBreaker(ctx context.Context, taskName string) error

HandleTripBreaker processes an inbound breaker trip signal from another replica.

func (*Engine) History

func (e *Engine) History(ctx context.Context, flowKey string) ([]workflow.FlowStep, error)

History returns the step-by-step execution history of a flow.

func (*Engine) HistoryMermaid

func (e *Engine) HistoryMermaid(ctx context.Context, flowKey string, w io.StringWriter) error

HistoryMermaid writes the execution DAG of a flow as a Mermaid diagram.
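
Because the writer is an io.StringWriter, any value with a WriteString method can receive the diagram, for example a *strings.Builder, a *bufio.Writer, or an *os.File. The sketch below shows the in-memory case. writeDiagram is a hypothetical stand-in for HistoryMermaid, and the diagram text it emits is made up for illustration; the engine's actual output is not shown in this document.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// writeDiagram is a hypothetical stand-in for Engine.HistoryMermaid. It only
// demonstrates the io.StringWriter contract; the diagram text is invented.
func writeDiagram(w io.StringWriter) error {
	for _, line := range []string{"graph TD\n", "  hello --> END\n"} {
		if _, err := w.WriteString(line); err != nil {
			return err
		}
	}
	return nil
}

// render collects the diagram in memory and returns it as a string.
func render() (string, error) {
	var sb strings.Builder // *strings.Builder implements io.StringWriter
	if err := writeDiagram(&sb); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	diagram, err := render()
	if err != nil {
		panic(err)
	}
	fmt.Print(diagram)
}
```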

func (*Engine) List

func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)

List queries flows by status, workflow name, or thread key.

func (*Engine) Purge

func (e *Engine) Purge(ctx context.Context, query workflow.Query) (int, error)

Purge deletes flows matching a query.

func (*Engine) Restart

func (e *Engine) Restart(ctx context.Context, flowKey string, stateOverrides any) error

Restart re-executes a flow from the beginning.

func (*Engine) RestartFrom

func (e *Engine) RestartFrom(ctx context.Context, stepKey string, stateOverrides any) error

RestartFrom re-executes a flow from a specific step.

func (*Engine) Resume

func (e *Engine) Resume(ctx context.Context, flowKey string, resumeData any) error

Resume continues a flow paused by flow.Interrupt.

func (*Engine) ResumeBreak

func (e *Engine) ResumeBreak(ctx context.Context, flowKey string, stateOverrides any) error

ResumeBreak continues a flow paused at a BreakBefore breakpoint.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, workflowName string, initialState any, opts *workflow.FlowOptions) (*workflow.FlowOutcome, error)

Run creates, starts, and awaits a flow in one call. opts carries scheduling and the opaque host Baggage; nil opts uses defaults.

func (*Engine) RunInTest

func (e *Engine) RunInTest(t *testing.T)

RunInTest initializes the engine for testing with per-test SQLite databases and registers cleanup via t.Cleanup.

func (*Engine) ShardInfo

func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)

ShardInfo returns health and size summaries for all shards.

func (*Engine) Shutdown

func (e *Engine) Shutdown(ctx context.Context) error

Shutdown stops all worker goroutines and closes database connections.

func (*Engine) Snapshot

func (e *Engine) Snapshot(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)

Snapshot returns the current state and status of a flow.

func (*Engine) Start

func (e *Engine) Start(ctx context.Context, flowKey string) error

Start transitions a created flow to running.

func (*Engine) StartNotify

func (e *Engine) StartNotify(ctx context.Context, flowKey string, notifyHostname string) error

StartNotify starts a flow and registers a hostname for stop notifications.

func (*Engine) Startup

func (e *Engine) Startup(ctx context.Context) error

Startup initializes the engine: opens database connections, runs migrations, and starts worker goroutines.

func (*Engine) Step

func (e *Engine) Step(ctx context.Context, stepKey string) (*workflow.FlowStep, error)

Step returns details of a single step.

func (*Engine) WithDSN

func (e *Engine) WithDSN(dsn string) *Engine

WithDSN sets the SQL data source name. Use "%d" for sharded DSNs. An empty DSN in test mode uses SQLite in-memory.
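
One plausible reading of the "%d" placeholder is that the engine substitutes a shard number into the DSN once per shard. The sketch below illustrates that reading only; it is an assumption, not the engine's code. The helper shardDSNs is hypothetical, and because this document does not say whether shard numbering starts at 0 or 1, the first index is a parameter.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// shardDSNs expands a DSN template once per shard by replacing every "%d"
// with the shard number. A plain string replacement is used instead of
// fmt.Sprintf so that percent-encoded characters in the DSN are left alone.
func shardDSNs(template string, first, numShards int) []string {
	dsns := make([]string, 0, numShards)
	for i := 0; i < numShards; i++ {
		dsns = append(dsns, strings.ReplaceAll(template, "%d", strconv.Itoa(first+i)))
	}
	return dsns
}

func main() {
	for _, dsn := range shardDSNs("postgres://user:pass@host:5432/dwarf_%d", 0, 3) {
		fmt.Println(dsn)
	}
}
```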

func (*Engine) WithDefaultPriority

func (e *Engine) WithDefaultPriority(p int) *Engine

WithDefaultPriority sets the default priority for new flows.

func (*Engine) WithFlowStoppedCallback

func (e *Engine) WithFlowStoppedCallback(cb FlowStoppedCallback) *Engine

WithFlowStoppedCallback sets the callback fired when a flow stops.

func (*Engine) WithGraphLoader

func (e *Engine) WithGraphLoader(gl GraphLoader) *Engine

WithGraphLoader sets the function that fetches workflow graph definitions.

func (*Engine) WithLogger

func (e *Engine) WithLogger(l *slog.Logger) *Engine

WithLogger sets the structured logger. The engine logs through the *Context variants (DebugContext/InfoContext/WarnContext/ErrorContext) so that a handler that reads the context (e.g. the otelslog bridge) can correlate each record with the active step span. A host routes logs to OTEL by passing a logger whose handler bridges there.

Defaults to a discard logger: until a logger is injected, the engine and its sequel DB layer stay silent rather than writing to the application-owned slog.Default(). A nil logger resets to that silent default.

Must be set before Startup. The engine wires the logger into the worker hot path and the shard DBs at startup, so a call after Startup is a no-op; this keeps the hot-path read of the logger lock-free.
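
To show why the *Context variants matter, here is a minimal sketch of a handler that reads the context, using only the standard log/slog package. The stepKey context key and ctxHandler type are hypothetical; a bridge such as otelslog would read the active span from the context instead of a plain value. A logger built this way is the kind of value a host would pass to WithLogger.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
)

// stepKey is a hypothetical context key used only by this sketch.
type stepKey struct{}

// ctxHandler wraps another slog.Handler and copies a value from the context
// onto every record. This only works when callers use the *Context variants.
type ctxHandler struct {
	slog.Handler
}

func (h ctxHandler) Handle(ctx context.Context, r slog.Record) error {
	if step, ok := ctx.Value(stepKey{}).(string); ok {
		r = r.Clone() // avoid mutating state shared with other copies
		r.AddAttrs(slog.String("step", step))
	}
	return h.Handler.Handle(ctx, r)
}

// WithAttrs and WithGroup re-wrap the derived handler so the context lookup
// survives logger.With(...) and logger.WithGroup(...).
func (h ctxHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	return ctxHandler{h.Handler.WithAttrs(attrs)}
}

func (h ctxHandler) WithGroup(name string) slog.Handler {
	return ctxHandler{h.Handler.WithGroup(name)}
}

// logOnce builds the logger, emits one record, and returns the rendered line.
func logOnce(step string) string {
	var buf bytes.Buffer
	base := slog.NewTextHandler(&buf, &slog.HandlerOptions{
		// Drop the timestamp so the output is stable.
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if a.Key == slog.TimeKey && len(groups) == 0 {
				return slog.Attr{}
			}
			return a
		},
	})
	logger := slog.New(ctxHandler{base})
	ctx := context.WithValue(context.Background(), stepKey{}, step)
	logger.InfoContext(ctx, "task dispatched")
	return buf.String()
}

func main() {
	fmt.Print(logOnce("hello")) // prints: level=INFO msg="task dispatched" step=hello
}
```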

func (*Engine) WithMaxOpenConns

func (e *Engine) WithMaxOpenConns(n int) *Engine

WithMaxOpenConns sets the maximum number of open SQL connections per shard.

func (*Engine) WithMeterProvider

func (e *Engine) WithMeterProvider(mp metric.MeterProvider) *Engine

WithMeterProvider sets the OpenTelemetry MeterProvider the engine builds its dwarf_* instruments from. Defaults to the global otel.GetMeterProvider() (the no-op provider unless the host configures the OTEL SDK). The engine creates instruments under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host service's identity. Must be set before Startup; the engine resolves the meter once at startup, so a call after Startup is a no-op.

func (*Engine) WithNumShards

func (e *Engine) WithNumShards(n int) *Engine

WithNumShards sets the number of database shards.

func (*Engine) WithPeerNotifier

func (e *Engine) WithPeerNotifier(pn PeerNotifier) *Engine

WithPeerNotifier sets the cross-replica coordination interface.

func (*Engine) WithTaskExecutor

func (e *Engine) WithTaskExecutor(te TaskExecutor) *Engine

WithTaskExecutor sets the function that executes workflow tasks.

func (*Engine) WithTimeBudget

func (e *Engine) WithTimeBudget(d time.Duration) *Engine

WithTimeBudget sets the maximum duration for a single task execution.

func (*Engine) WithTracerProvider

func (e *Engine) WithTracerProvider(tp trace.TracerProvider) *Engine

WithTracerProvider sets the OpenTelemetry TracerProvider the engine builds its spans from. Defaults to the global otel.GetTracerProvider() (the no-op provider unless the host configures the OTEL SDK). The engine mints the root "workflow" span at Create (persisted to the dwarf-owned trace_parent column) and a per-step span in processStep, parented to the reconstructed root and placed on the TaskExecutor's context so the task's downstream spans nest under it. The host injects only the provider - no span code, no trace_parent handling. Spans are created under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host's identity. Must be set before Startup; the engine resolves the tracer once at startup, so a call after Startup is a no-op.

func (*Engine) WithWorkers

func (e *Engine) WithWorkers(n int) *Engine

WithWorkers sets the number of worker goroutines.

type FlowStoppedCallback

type FlowStoppedCallback func(ctx context.Context, hostname string, outcome *workflow.FlowOutcome)

FlowStoppedCallback is fired when a flow stops (completed, failed, cancelled, interrupted). The hostname is the notify_hostname stored on the flow via StartNotify.

type GraphLoader

type GraphLoader func(ctx context.Context, workflowName string) (*workflow.Graph, error)

GraphLoader fetches a workflow graph definition by name. The flow's opaque baggage rides on ctx; read it with workflow.BaggageFrom(ctx) if loading is identity-dependent (authz, per-actor graphs).
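
A loader backed by an in-memory registry might look like the sketch below. It is self-contained, so Graph is a hypothetical stand-in for workflow.Graph, and registry and errGraphNotFound are illustrative names. Unlike a bare map lookup, which yields a nil graph with a nil error for unknown names, this version returns an explicit error; how the engine treats a nil graph is not stated in this document.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Graph is a hypothetical stand-in for workflow.Graph.
type Graph struct {
	Name string
}

// errGraphNotFound is returned for names that were never registered.
var errGraphNotFound = errors.New("graph not found")

// registry is a concurrency-safe in-memory graph store.
type registry struct {
	mu     sync.RWMutex
	graphs map[string]*Graph
}

func newRegistry() *registry {
	return &registry{graphs: map[string]*Graph{}}
}

// Register stores a graph under its own name.
func (r *registry) Register(g *Graph) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.graphs[g.Name] = g
}

// Load has the shape of the GraphLoader signature, with the stand-in Graph
// type in place of *workflow.Graph.
func (r *registry) Load(ctx context.Context, workflowName string) (*Graph, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	g, ok := r.graphs[workflowName]
	if !ok {
		return nil, fmt.Errorf("%w: %q", errGraphNotFound, workflowName)
	}
	return g, nil
}

func main() {
	reg := newRegistry()
	reg.Register(&Graph{Name: "greet"})

	g, err := reg.Load(context.Background(), "greet")
	fmt.Println(g.Name, err)

	_, err = reg.Load(context.Background(), "missing")
	fmt.Println(errors.Is(err, errGraphNotFound))
}
```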

type PeerNotifier

type PeerNotifier interface {
	Enqueue(ctx context.Context, shard, stepID int)
	SyncValve(ctx context.Context, taskName string, wCong int, tCong time.Time)
	TripBreaker(ctx context.Context, taskName string)
	// NotifyStatusChange tells peer replicas a flow reached a stopped status (completed, failed,
	// cancelled, interrupted) so their Await callers wake and re-check. Without it, an Await on the
	// replica that did not run the flow's final step blocks until its context deadline. The receiving
	// replica routes the signal to HandleNotifyStatusChange.
	NotifyStatusChange(ctx context.Context, flowKey string, status string)
}

PeerNotifier sends cross-replica coordination signals. All methods are fire-and-forget.

Every signal must be delivered to OTHER replicas only, EXCLUDING the calling replica. The engine always applies a signal's effect locally before invoking the corresponding PeerNotifier method (e.g. startNotify rings the local doorbell via handleEnqueue and then calls Enqueue; valveRegulate mutates the local valve and then calls SyncValve). An implementation that echoes the signal back to the sender would cause it to be processed twice on the originating replica — a doubled enqueue, valve cut, breaker trip, or status-change wake. If the underlying transport delivers published messages to the publisher, the implementation is responsible for filtering out self-delivery.
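
The self-delivery filter described above can be sketched with a toy broadcast transport. Everything here is hypothetical scaffolding (the message envelope, the bus, and the replica adapter); only the TripBreaker method signature is taken from the PeerNotifier interface, and the other three methods are omitted for brevity. The idea is to tag each message with the sender's replica ID and drop it on receipt when the ID matches.

```go
package main

import (
	"context"
	"fmt"
)

// message is a hypothetical wire envelope. Tagging it with the sender's
// replica ID is what makes self-delivery detectable.
type message struct {
	From     string
	TaskName string
}

// bus is a toy broadcast transport that delivers every message to every
// subscriber, including the publisher.
type bus struct {
	subs []func(message)
}

func (b *bus) publish(m message) {
	for _, deliver := range b.subs {
		deliver(m)
	}
}

// replica is a hypothetical host-side adapter for one engine replica.
type replica struct {
	id    string
	bus   *bus
	trips int // inbound trips that would be routed to HandleTripBreaker
}

func newReplica(id string, b *bus) *replica {
	r := &replica{id: id, bus: b}
	b.subs = append(b.subs, r.receive)
	return r
}

// TripBreaker publishes the signal to the transport, fire-and-forget.
func (r *replica) TripBreaker(ctx context.Context, taskName string) {
	r.bus.publish(message{From: r.id, TaskName: taskName})
}

// receive drops messages this replica published itself, because the engine
// has already applied their effect locally.
func (r *replica) receive(m message) {
	if m.From == r.id {
		return
	}
	r.trips++ // a real host would call eng.HandleTripBreaker(ctx, m.TaskName)
}

func main() {
	b := &bus{}
	r1, r2 := newReplica("r1", b), newReplica("r2", b)
	r1.TripBreaker(context.Background(), "hello")
	fmt.Println(r1.trips, r2.trips) // prints "0 1"
}
```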

type ShardSummary

type ShardSummary struct {
	Shard     int    `json:"shard,omitzero"`
	Error     string `json:"error,omitzero"`
	LatencyMs int    `json:"latencyMs,omitzero"`
	Steps     int    `json:"steps,omitzero"`
	Flows     int    `json:"flows,omitzero"`
}

ShardSummary is the health/size summary of a single database shard.
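
A host could use these summaries for a simple health check, as in the sketch below. The struct is copied from the definition above (JSON tags omitted) so the snippet runs without the engine package. Treating a non-empty Error as unhealthy, the latency threshold, and the sample data are all assumptions of this sketch, not engine behavior.

```go
package main

import "fmt"

// ShardSummary mirrors the struct documented above so the sketch compiles
// without importing the engine package.
type ShardSummary struct {
	Shard     int
	Error     string
	LatencyMs int
	Steps     int
	Flows     int
}

// unhealthy returns the shards that reported an error or whose latency
// exceeds maxLatencyMs. The threshold is an assumption of this sketch.
func unhealthy(shards []ShardSummary, maxLatencyMs int) []ShardSummary {
	var bad []ShardSummary
	for _, s := range shards {
		if s.Error != "" || s.LatencyMs > maxLatencyMs {
			bad = append(bad, s)
		}
	}
	return bad
}

func main() {
	// Sample data; in a host these would come from eng.ShardInfo(ctx).
	shards := []ShardSummary{
		{Shard: 0, LatencyMs: 3, Steps: 120, Flows: 40},
		{Shard: 1, Error: "connection refused"},
		{Shard: 2, LatencyMs: 250, Steps: 90, Flows: 31},
	}
	for _, s := range unhealthy(shards, 100) {
		fmt.Printf("shard %d: error=%q latency=%dms\n", s.Shard, s.Error, s.LatencyMs)
	}
}
```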

type TaskExecutor

type TaskExecutor func(ctx context.Context, taskName string, flow *workflow.Flow) error

TaskExecutor executes a single task within a workflow. The flow carrier has its state pre-populated; the executor should call the task and let it write changes to the flow. The flow's opaque baggage rides on ctx - read it with workflow.BaggageFrom(ctx) (e.g. to mint a token).
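
For a host that runs tasks as local function calls, the executor can be a dispatch table keyed by task name. The sketch below is self-contained: Flow is a hypothetical stand-in for workflow.Flow reduced to a string map, and dispatcher, handler, and hello are illustrative names. Returning an error for an unregistered task name is a choice made here, not documented engine behavior.

```go
package main

import (
	"context"
	"fmt"
)

// Flow is a hypothetical stand-in for workflow.Flow, reduced to a state map.
type Flow struct {
	State map[string]string
}

// handler is the per-task function shape used by this sketch.
type handler func(ctx context.Context, f *Flow) error

// dispatcher maps task names to local handlers.
type dispatcher map[string]handler

// Execute has the shape of the TaskExecutor signature, with the stand-in
// Flow type. Unknown task names produce an error instead of succeeding.
func (d dispatcher) Execute(ctx context.Context, taskName string, f *Flow) error {
	h, ok := d[taskName]
	if !ok {
		return fmt.Errorf("no handler registered for task %q", taskName)
	}
	return h(ctx, f)
}

// hello reads the pre-populated state and writes its result back to the flow.
func hello(ctx context.Context, f *Flow) error {
	f.State["greeting"] = "hello " + f.State["name"]
	return nil
}

func main() {
	d := dispatcher{"hello": hello}
	f := &Flow{State: map[string]string{"name": "ada"}}
	if err := d.Execute(context.Background(), "hello", f); err != nil {
		panic(err)
	}
	fmt.Println(f.State["greeting"]) // prints "hello ada"
	fmt.Println(d.Execute(context.Background(), "missing", f))
}
```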

type TaskHandler

type TaskHandler func(ctx context.Context, flow *workflow.Flow) error

TaskHandler is the signature for a test task handler. Read the flow's baggage, if any, with workflow.BaggageFrom(ctx).

type TestProxy

type TestProxy struct {
	// contains filtered or unexported fields
}

TestProxy routes graph fetches and task dispatches to registered handlers. It implements both GraphLoader and TaskExecutor for use with Engine.RunInTest.

func NewTestProxy

func NewTestProxy() *TestProxy

NewTestProxy creates a new test proxy with empty handler registries.

func (*TestProxy) ExecuteTask

func (p *TestProxy) ExecuteTask(ctx context.Context, taskName string, flow *workflow.Flow) error

ExecuteTask implements the TaskExecutor signature.

func (*TestProxy) HandleGraph

func (p *TestProxy) HandleGraph(name string, graph *workflow.Graph)

HandleGraph registers a workflow graph under the given name. The name should match the workflow name passed to Engine.Create or Engine.Run.

func (*TestProxy) HandleTask

func (p *TestProxy) HandleTask(name string, handler TaskHandler)

HandleTask registers a task handler under the given name. The name should match the task URL registered via graph.AddTask.

func (*TestProxy) LoadGraph

func (p *TestProxy) LoadGraph(ctx context.Context, workflowName string) (*workflow.Graph, error)

LoadGraph implements the GraphLoader signature.
