Documentation
¶
Overview ¶
Package engine is the dwarf workflow-orchestration engine.
The engine executes workflow graphs against a SQL database, scheduling and dispatching one task at a time per step, persisting state between steps, and driving fan-out/fan-in, retries, sleeps, subgraphs, and interrupts. It owns no transport of its own; a host wires it to the outside world through injected dependency interfaces and calls its operations.
Lifecycle ¶
Build an engine with NewEngine and the Set* methods, register a Host (see SetHost), then Startup (opens the database, runs migrations, starts workers). Shutdown drains the workers and closes the database. In tests, RunInTest replaces Startup/Shutdown with per-test SQLite databases and t.Cleanup.
eng := engine.NewEngine()
eng.SetDSN("postgres://user:pass@host:5432/dwarf")
eng.SetHost(host)
err := eng.Startup(ctx)
if err != nil { ... }
defer eng.Shutdown(ctx)
Each Set* method returns an error. The live ones (SetMaxOpenConns, SetWorkersPerConn, SetTimeBudget, SetDefaultPriority) take effect immediately on a running engine; the construction-time-only ones (SetDSN, SetWorkers, SetNumShards, SetHost, SetLogger, SetMeterProvider, SetTracerProvider) return an error if called after Startup.
Host ¶
The engine reaches the outside world through a single injected Host interface (see SetHost):
- LoadGraph fetches a workflow graph by name (called at Create; the graph JSON is then frozen on the flow), and on subgraph spawn.
- ExecuteTask executes one task, given the Flow carrier with its state pre-populated.
- SignalPeers ships a cross-replica coordination signal (op + opaque payload bytes) to the other replicas, which hand it back via Engine.DeliverSignal; a single-replica host does nothing.
The flow's opaque baggage (host identity/tenant/context, set in workflow.FlowOptions) rides on the dispatch context of every LoadGraph and ExecuteTask call; read it with workflow.BaggageFrom(ctx).
Operations ¶
Create makes a flow and runs it; Await blocks until it stops; Run is Create+Await in one call. Snapshot/History/Step/List inspect; Resume continues a paused flow; Cancel/Continue manage lifecycle; Fork clones a terminal flow from a chosen step into a new flow for non-destructive recovery; Delete/Purge retain. See the repository's docs/ directory for guides.
Security model ¶
Flow and step keys ("{shard}-{id}-{token}") are unguessable bearer capabilities, not authorization. Holding a flow key is by itself sufficient to act on that one flow — Resume, Cancel, Fork, Continue, Delete, and every introspection call — with no further check: the sole gate is the key (the numeric id plus its random flow_token). The engine performs no authentication, authorization, or rate limiting and has no notion of caller identity; its only vantage is the flow reference and the task URL, so ownership and tenancy are invisible to it. Authorizing an operation is therefore the host's responsibility: before calling the engine, verify the authenticated principal may act on the flow — typically from the baggage the host set at Create (see workflow.FlowOptions.Baggage), or the host's own record mapping a principal to the keys it was issued.
The token defends only against reference forgery and id enumeration: flow ids are sequential, so without the token a caller cannot fabricate a key for a flow it was never handed. That is defense in depth, not access control — a leaked, logged, or shared key grants its bearer full write access to that one flow, so treat a key like a password. The engine does not emit keys to traces or logs (telemetry carries only a token-free "{shard}-{id}" correlation id), and there is deliberately no operation that resolves a correlation id back to a key, which would be a capability-minting oracle.
List and Search return keys, tokens included. Exposing them to a principal is equivalent to granting the write capability for every flow they return, so a host must gate them by ownership and never surface them to less-than-fully-trusted callers. Key exposure is also transitive across an execution tree: Step and History navigation resolve and return the keys of neighboring steps, crossing flow boundaries into parent and subgraph-child flows, so a single step key reaches the whole tree (each neighbor key both discloses that step's state and can seed a Fork). Authorizing introspection by one flow's ownership is therefore insufficient when the caller holds any step key in that tree; treat the tree (its root) as the authorization unit, or restrict these surfaces to fully-trusted operators. The inbound peer entry point DeliverSignal is unauthenticated by the engine; authenticating replica-to-replica transport is the host's responsibility. Operations on an unknown or mismatched key return a uniform not-found (no existence oracle), but that is a hardening detail, not a substitute for host authorization.
Resource limits ¶
The engine imposes no size or count limits — not on initial state, baggage, the frozen graph JSON, interrupt/resume payloads, forEach fan-out width (one step row per array element), or subgraph nesting depth. This is deliberate, the same division of labor as backpressure and time budgets: state size is workload-defined (a document-processing workflow may legitimately carry tens of megabytes per flow), and no single cap fits both that and a small control flow. Bounding resource use is therefore the host's job — it holds the caller identity and tenancy the engine cannot see. A host enforces quotas where it has that context: reject an over-large initial state or Baggage before Create, cap forEach input arrays in author space, and bound its own retention (Purge deletes at most 4096 roots per call, so a retention job loops). For a pass-through host that adds no policy of its own, this obligation flows through to the application using that host. (Deep subgraph nesting is bounded storage, not a crash vector: Fork clones the tree iteratively, so nesting depth costs no goroutine stack.)
Example ¶
Wire an engine to a host, then create, start, and await a flow.
package main
import (
"context"
"fmt"
"github.com/microbus-io/dwarf/engine"
"github.com/microbus-io/dwarf/workflow"
)
// exampleHost implements engine.Host. A real host loads graphs from a registry/file/database/RPC and
// dispatches tasks over a local call, RPC, or message bus; here an in-memory registry and a local
// function stand in. LoadGraph and ExecuteTask are required; the remaining Host methods (flow-stop
// notification and the cross-replica signals) are left as no-ops.
type exampleHost struct {
graphs map[string]*workflow.Graph
}
func (h exampleHost) LoadGraph(ctx context.Context, name string) (*workflow.Graph, error) {
return h.graphs[name], nil
}
func (h exampleHost) ExecuteTask(ctx context.Context, taskName string, f *workflow.Flow) error {
f.SetString("greeting", "hello "+f.GetString("name"))
return nil
}
func (exampleHost) SignalPeers(context.Context, string, []byte) {}
func main() {
ctx := context.Background()
graphs := map[string]*workflow.Graph{}
g := workflow.NewGraph("Greet")
g.SetEndpoint("Hello", "Hello")
g.AddTransition("Hello", workflow.END)
graphs["greet"] = g
eng := engine.NewEngine()
eng.SetDSN("postgres://user:pass@localhost:5432/dwarf")
eng.SetHost(exampleHost{graphs: graphs})
err := eng.Startup(ctx)
if err != nil {
panic(err)
}
defer eng.Shutdown(ctx)
// Run is Create + Await in one call.
_, out, err := eng.Run(ctx, "greet", map[string]any{"name": "ada"}, nil)
if err != nil {
panic(err)
}
fmt.Println(out.State["greeting"])
}
Output:
Index ¶
- type Engine
- func (e *Engine) Await(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)
- func (e *Engine) Cancel(ctx context.Context, flowKey string, reason string) error
- func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any) (string, error)
- func (e *Engine) Create(ctx context.Context, workflowURL string, initialState any, ...) (flowKey string, err error)
- func (e *Engine) Delete(ctx context.Context, flowKey string) error
- func (e *Engine) DeliverSignal(ctx context.Context, op string, payload []byte) error
- func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
- func (e *Engine) Fork(ctx context.Context, stepKey string, stateOverrides any) (string, error)
- func (e *Engine) History(ctx context.Context, flowKey string) ([]workflow.FlowStep, error)
- func (e *Engine) HistoryMermaid(ctx context.Context, flowKey string, w io.StringWriter) error
- func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)
- func (e *Engine) Purge(ctx context.Context, query workflow.Query) (int, error)
- func (e *Engine) Resume(ctx context.Context, flowKey string, resumeData any) error
- func (e *Engine) Run(ctx context.Context, workflowURL string, initialState any, ...) (flowKey string, outcome *workflow.FlowOutcome, err error)
- func (e *Engine) RunInTest(t *testing.T)
- func (e *Engine) SetDSN(dsn string) error
- func (e *Engine) SetDebugLogger() error
- func (e *Engine) SetDefaultPriority(p int) error
- func (e *Engine) SetHost(h Host) error
- func (e *Engine) SetInTest(name string) error
- func (e *Engine) SetLogger(l *slog.Logger) error
- func (e *Engine) SetMaxOpenConns(n int) error
- func (e *Engine) SetMeterProvider(mp metric.MeterProvider) error
- func (e *Engine) SetNumShards(num int) error
- func (e *Engine) SetTimeBudget(d time.Duration) error
- func (e *Engine) SetTracerProvider(tp trace.TracerProvider) error
- func (e *Engine) SetWorkers(n int) error
- func (e *Engine) SetWorkersPerConn(n int) error
- func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)
- func (e *Engine) Shutdown(ctx context.Context) error
- func (e *Engine) Snapshot(ctx context.Context, flowKey string) (*workflow.FlowOutcome, error)
- func (e *Engine) Startup(ctx context.Context) error
- func (e *Engine) Step(ctx context.Context, stepKey string) (*workflow.FlowStep, error)
- type Host
- type ShardSummary
- type TaskHandler
- type TestProxy
- func (p *TestProxy) AddPeer(peer *Engine)
- func (p *TestProxy) ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error
- func (p *TestProxy) HandleGraph(name string, graph *workflow.Graph)
- func (p *TestProxy) HandleTask(name string, handler TaskHandler)
- func (p *TestProxy) LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)
- func (p *TestProxy) SignalPeers(ctx context.Context, op string, payload []byte)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the standalone workflow orchestration engine.
func (*Engine) Continue ¶
func (e *Engine) Continue(ctx context.Context, threadKey string, additionalState any) (string, error)
Continue creates a new flow from the latest completed flow in a thread, inheriting that flow's policy (scheduling, baggage) - it does not take FlowOptions. For a turn with different policy, use Create with FlowOptions.ThreadKey.
func (*Engine) Create ¶
func (e *Engine) Create(ctx context.Context, workflowURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, err error)
Create creates a new flow for a workflow and starts it, returning the running flow's key. opts carries the flow's policy (scheduling, DeleteOnCompletion, Baggage, ThreadKey); nil uses defaults. For a flow that must wait for an external trigger, have the entry task call flow.Interrupt and resume it with Resume (which, unlike a separate start, also delivers a payload).
Example ¶
Create makes and runs a flow, and accepts FlowOptions for scheduling, notifications, thread membership, and the opaque host baggage carried with the flow.
package main
import (
"context"
"fmt"
"github.com/microbus-io/dwarf/engine"
"github.com/microbus-io/dwarf/workflow"
)
func main() {
var eng *engine.Engine // obtained from NewEngine().…Startup(ctx)
ctx := context.Background()
flowKey, err := eng.Create(ctx, "greet", map[string]any{"name": "ada"},
&workflow.FlowOptions{
Priority: 10, // lower runs first
FairnessKey: "tenant-42", // fair scheduling bucket
Baggage: map[string]any{"actor": "ada"}, // read with workflow.BaggageFrom(ctx)
})
if err != nil {
panic(err)
}
// Create runs the flow immediately; Await blocks until it stops.
out, _ := eng.Await(ctx, flowKey)
fmt.Println(out.Status)
}
Output:
func (*Engine) DeliverSignal ¶ added in v0.4.0
DeliverSignal processes an inbound peer signal. The host calls it with the op routing key and the payload bytes it received from a peer (the JSON encoding of what the engine handed that peer's SignalPeers). It delegates by op to the matching internal handler. op and payload are opaque to the host; only the engine interprets them.
Trust boundary: the host MUST authenticate the peer channel; a signal admitted here is trusted.
func (*Engine) Fingerprint ¶
func (e *Engine) Fingerprint(ctx context.Context, flowKey string) (fingerprint string, status string, err error)
Fingerprint returns a fingerprint and status for change detection.
func (*Engine) Fork ¶ added in v0.8.0
Fork clones a terminal flow's prefix up to the given step into a new, self-contained running flow and re-executes from that step with optional stateOverrides applied to it. The original flow is never modified. The fork inherits the original's scheduling and baggage (it does not take FlowOptions). Returns the new flow's key.
func (*Engine) HistoryMermaid ¶
HistoryMermaid writes the execution DAG of a flow as a Mermaid diagram.
func (*Engine) List ¶
func (e *Engine) List(ctx context.Context, query workflow.Query) ([]workflow.FlowSummary, string, error)
List queries flows by status, workflow name, or thread key.
func (*Engine) Purge ¶
Purge marks flows matching a query (and their subgraph subtrees) for deletion; a background reaper removes them shortly after. Marked flows are excluded from List/History immediately. Returns the count of roots marked - no more than 4096 per call; iterate to mark more. Running flows are skipped.
func (*Engine) Run ¶
func (e *Engine) Run(ctx context.Context, workflowURL string, initialState any, opts *workflow.FlowOptions) (flowKey string, outcome *workflow.FlowOutcome, err error)
Run creates, starts, and awaits a flow in one call, returning the new flow's key alongside its outcome (the key is the flow's identity, not part of the outcome). opts carries scheduling and the opaque host Baggage; nil opts uses defaults.
Error semantics differ by phase. A create failure returns flowKey "" and a nil outcome - no flow exists. An await failure - most commonly the caller's ctx expiring before the flow stops - leaves the flow running (it is durable and not bound to this call) and returns its flowKey with a nil outcome and the error, so the caller keeps a handle to Await/Snapshot/Cancel it later. Run never cancels the flow on the caller's behalf; a caller that wants the flow torn down on timeout calls Cancel explicitly.
func (*Engine) RunInTest ¶
RunInTest initializes the engine for testing with per-test isolated databases (keyed by t.Name()) and registers cleanup via t.Cleanup.
Unless the caller wired its own logger, the engine logs to stderr at Info by default (rather than the production discard default) so a CI failure has engine-level clues - flow-status transitions show where a flow got stuck, and the Error logs surface wedge sweeps / poll / refill faults. stderr, not t.Log, because a `go test` timeout panic drops the failing test's buffered t.Log output but not stderr. Override the level with DWARF_TEST_LOG_LEVEL (e.g. "error" to quiet local runs, "debug" for the full play-by-play), or call SetLogger/SetDebugLogger before RunInTest to take over entirely.
func (*Engine) SetDSN ¶ added in v0.4.0
SetDSN sets the SQL data source name. Use "%d" for sharded DSNs; an empty DSN in test mode uses SQLite in-memory. Construction-time only - the shards are opened against the DSN at Startup, so changing it on a running engine (which would require reopening live connections) is rejected.
func (*Engine) SetDebugLogger ¶ added in v0.4.0
SetDebugLogger is a convenience that wires a human-readable text logger to stderr at debug level - shorthand for SetLogger(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))). It is meant for development and test runs where you want to see the engine's (and its sequel DB layer's) internal logging without standing up an OTEL pipeline. Output goes to stderr, not stdout, so it never mixes with a program's data stream - the standard convention for diagnostic logs. Because it routes through SetLogger, it counts as an explicitly-set logger, so it also reaches sequel via the engine's existing SetLogger wiring (sequel's migration logs appear too). Construction-time only.
func (*Engine) SetDefaultPriority ¶ added in v0.4.0
SetDefaultPriority sets the default priority for new flows. Live: read fresh on each Create, so it takes effect on a running engine immediately.
func (*Engine) SetHost ¶ added in v0.4.0
SetHost registers the host the engine reaches the outside world through: it loads graphs, executes tasks, and (optionally) receives flow-stop notifications and carries cross-replica coordination signals. A host must implement LoadGraph and ExecuteTask; the remaining Host methods may be no-ops. Construction-time only.
func (*Engine) SetInTest ¶ added in v0.8.1
SetInTest puts the engine into test mode keyed by name, so a subsequent Startup opens per-name isolated, auto-dropped databases (via sequel.CreateTestingDatabase) instead of the configured ones. Construction- time only. It is the *testing.T-free counterpart to RunInTest, for a host running under an external test harness that has no *testing.T but a stable per-test isolation key: every engine sharing the same name resolves to the same isolated databases, so the replicas of a multi-replica test app converge on one set. RunInTest(t) is SetInTest(t.Name()) plus Startup and a t.Cleanup shutdown.
func (*Engine) SetLogger ¶ added in v0.4.0
SetLogger sets the structured logger. The engine logs through the *Context variants (DebugContext/InfoContext/WarnContext/ErrorContext) so a handler that reads the context - e.g. the otelslog bridge - can correlate each record with the active step span. A host routes logs to OTEL by passing a logger whose handler bridges there. Defaults to a discard logger: until a logger is injected the engine (and its sequel DB layer) stay silent rather than writing to the application-owned slog.Default(). A nil logger resets to that silent default. Construction-time only - the engine resolves the logger once at Startup.
func (*Engine) SetMaxOpenConns ¶ added in v0.4.0
SetMaxOpenConns sets the per-shard hard ceiling on open SQL connections.
func (*Engine) SetMeterProvider ¶ added in v0.4.0
func (e *Engine) SetMeterProvider(mp metric.MeterProvider) error
SetMeterProvider sets the OpenTelemetry MeterProvider the engine builds its dwarf_* instruments from. Defaults to the global otel.GetMeterProvider() (the no-op provider unless the host configures the OTEL SDK). The engine creates instruments under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host service's identity. Construction-time only - the engine resolves the meter once at Startup.
func (*Engine) SetNumShards ¶ added in v0.4.0
SetNumShards sets the number of database shards (must be at least 1). Construction-time only: shards are opened and migrated at Startup at this count, and the count is immutable for the engine's life, so a call on a running engine is rejected. Changing the shard count requires a coordinated restart (a maintenance window): each flow key encodes its shard, so a live/piecemeal change would leave flows on a newly-added shard unroutable (404) on any replica still at the old count.
func (*Engine) SetTimeBudget ¶ added in v0.4.0
SetTimeBudget sets the default duration for a single task execution, used by any flow that does not override it via FlowOptions.TimeBudget. Live: read fresh on each Create (existing flows keep the budget frozen at their own Create).
func (*Engine) SetTracerProvider ¶ added in v0.4.0
func (e *Engine) SetTracerProvider(tp trace.TracerProvider) error
SetTracerProvider sets the OpenTelemetry TracerProvider the engine builds its spans from. Defaults to the global otel.GetTracerProvider() (the no-op provider unless the host configures the OTEL SDK). The engine emits one span per flow and one span per task, nested to mirror the call structure, under the "github.com/microbus-io/dwarf" scope; the provider's Resource carries the host's identity. The host injects only the provider - it writes no span or context code. Construction-time only - the engine resolves the tracer once at Startup.
func (*Engine) SetWorkers ¶ added in v0.4.0
SetWorkers sets the number of worker goroutines. Construction-time only: the pool size is fixed at Startup, so a call on a running engine is rejected.
func (*Engine) SetWorkersPerConn ¶ added in v0.8.0
SetWorkersPerConn sets the assumed number of worker goroutines that share one database connection. The pool size is derived FROM the number of workers, not the other way around.
func (*Engine) ShardInfo ¶
func (e *Engine) ShardInfo(ctx context.Context) ([]ShardSummary, error)
ShardInfo returns health and size summaries for all shards.
func (*Engine) Shutdown ¶
Shutdown stops all worker goroutines and closes database connections. Idempotent: a call on an engine that is not running (never started, or already shut down) is a no-op, so it is safe to defer and to call more than once.
type Host ¶ added in v0.4.0
type Host interface {
// LoadGraph fetches a workflow graph definition by its URL (the addressable resolve key passed to
// Create). The flow's opaque baggage rides on ctx; read it with workflow.BaggageFrom(ctx) if loading
// is identity-dependent (authz, per-actor graphs). The engine validates the returned graph
// (graph.Validate) at Create and at subgraph spawn, so the host need not: returning (nil, nil) yields a
// 404 and a structurally invalid graph a 400, rather than a later dispatch-time failure.
LoadGraph(ctx context.Context, workflowURL string) (*workflow.Graph, error)
// ExecuteTask executes a single task within a workflow. taskURL is the task's dispatch URL (the real
// downstream address), not the graph node name. The flow carrier has its state pre-populated; the
// executor should call the task and let it write changes to the flow. The flow's opaque baggage rides
// on ctx - read it with workflow.BaggageFrom(ctx) (e.g. to mint a token).
//
// Execution is at-least-once and may be concurrent: a task can run more than once, and if a worker's
// lease is lost while it is still running (a task that overruns its ctx deadline, or a forward DB-clock
// step past the lease) a second worker re-runs it in parallel. The engine guarantees the flow's
// persisted state reflects exactly one execution, but exactly-once side effects are the task's
// responsibility - tasks must be idempotent. Honor the ctx deadline (it bounds the step's time budget);
// a task that ignores it can only be recovered by lease expiry, not cancelled.
ExecuteTask(ctx context.Context, taskURL string, flow *workflow.Flow) error
// SignalPeers delivers a cross-replica coordination signal to the other replicas. op is an opaque
// routing key (usable as a topic); payload is opaque bytes the engine already serialized. The host
// ships (op, payload) to peers and on the receiving side calls Engine.DeliverSignal(ctx, op,
// payload). See the cross-replica signal contract above. A single-replica host does nothing here.
SignalPeers(ctx context.Context, op string, payload []byte)
}
Host is the contract between the dwarf engine and the surrounding host application. The engine owns no transport of its own; it reaches workflow graphs and tasks and carries cross-replica coordination signals exclusively through the host. Register it once via Engine.SetHost.
A host MUST implement LoadGraph and ExecuteTask. SignalPeers is optional: an implementation may do nothing in it when it runs single-replica with no cross-replica coordination.
Cross-replica signal contract (SignalPeers): the engine funnels all of its coordination signals (work doorbells, flow-stop wakes) through this one method. op is a routing key the host may use as a topic/subject; payload is opaque bytes the engine already serialized. The host delivers (op, payload) to OTHER replicas, EXCLUDING the calling replica, and on the receiving side hands them back via Engine.DeliverSignal(ctx, op, payload) - it is a pure pipe that never inspects either. The engine always applies a signal's effect locally before calling SignalPeers, so an implementation that echoes the signal back to the sender would cause it to be processed twice on the originating replica (a doubled enqueue or status-change wake); if the transport delivers published messages to the publisher, the implementation must filter out self-delivery. Because the host never branches on op or inspects payload, adding a new engine signal kind requires no host change.
type ShardSummary ¶
type ShardSummary struct {
Shard int `json:"shard,omitzero"`
Error string `json:"error,omitzero"`
LatencyMs int `json:"latencyMs,omitzero"`
Steps int `json:"steps,omitzero"`
Flows int `json:"flows,omitzero"`
}
ShardSummary is the health/size summary of a single database shard.
type TaskHandler ¶
TaskHandler is the signature for a test task handler. Read the flow's baggage, if any, with workflow.BaggageFrom(ctx).
type TestProxy ¶
type TestProxy struct {
// contains filtered or unexported fields
}
TestProxy routes graph fetches and task dispatches to registered handlers. It implements the Host interface for use with Engine.SetHost / Engine.RunInTest: LoadGraph and ExecuteTask dispatch to the registered handlers, and SignalPeers relays to the peer engines registered with AddPeer (none by default, i.e. single-replica). For a multi-replica test, give each replica its own proxy and AddPeer the other engines.
func NewTestProxy ¶
func NewTestProxy() *TestProxy
NewTestProxy creates a new test proxy with empty handler registries.
func (*TestProxy) AddPeer ¶ added in v0.4.0
AddPeer registers a peer engine that SignalPeers relays to, standing in for the bus in a single-process multi-replica test. Add every OTHER replica's engine (not this proxy's own), so the engine's self-exclusion contract holds. Call before Startup/RunInTest.
func (*TestProxy) ExecuteTask ¶
ExecuteTask implements Host.
func (*TestProxy) HandleGraph ¶
HandleGraph registers a workflow graph under the given name. The name should match the workflow URL passed to Engine.Create or Engine.Run.
func (*TestProxy) HandleTask ¶
func (p *TestProxy) HandleTask(name string, handler TaskHandler)
HandleTask registers a task handler under the given name. The name should match the task URL registered via graph.SetEndpoint.
func (*TestProxy) SignalPeers ¶ added in v0.4.0
SignalPeers implements Host; it relays the signal to every peer engine registered with AddPeer. With no peers (the default) it is a single-replica no-op. The relay is async to mirror bus semantics and avoid synchronous reentrancy into a peer mid-processStep.