Documentation ¶
Overview ¶
Package workflow holds the pure data types of the dwarf workflow engine: the building blocks a host uses to define workflows and the carriers it reads and writes when running tasks.
It has no heavy dependencies and no knowledge of the database or the engine's runtime, so code that defines tasks and graphs imports only this package, never the engine.
Defining a workflow ¶
A Graph is a directed graph of tasks and transitions. Build one with NewGraph and the Add* methods:
g := workflow.NewGraph("Checkout")
g.SetEndpoint("Reserve", "inventory.reserve")
g.SetEndpoint("Charge", "billing.charge")
g.AddTransition("Reserve", "Charge")
g.AddTransition("Charge", workflow.END)
Transitions can be unconditional, conditional (AddTransitionWhen / AddTransitionSwitch), static fan-out across named tasks (AddTransitionFanOut), dynamic fan-out over an array (AddTransitionForEach), an error handler (AddTransitionOnError), or an explicit jump target (AddTransitionGoto). The linear AddTransitionChain wires a run of tasks. When parallel branches converge at a fan-in, per-field reducers (SetReducer, see the Reducer constants) merge their changes.
Running a task ¶
A task receives a *Flow: the engine pre-populates it with the step's input state, the task reads inputs and writes outputs with the typed accessors (Get/Set and friends), and may emit control signals - Retry, Sleep, Goto, Interrupt (human-in-the-loop), or Subgraph (call another workflow). Writes to reducer-managed fields are deltas, not accumulated values.
func charge(ctx context.Context, f *workflow.Flow) error {
	amount := f.GetFloat("amount")
	if amount <= 0 {
		return errors.New("nothing to charge")
	}
	f.SetString("receipt", chargeCard(amount))
	return nil
}
The engine never inspects an error's status code or text: a task that wants to back off (rate limit, transient unavailability) reads its own signal and arms Retry. An error returned to the engine is terminal for that attempt - routed via the graph's onError transition if one exists, else it fails the step.
FlowOutcome, FlowStep, FlowSummary, and Query are the read-side result types returned by the engine's inspection operations.
Index ¶
- Constants
- func BaggageFrom(ctx context.Context) any
- func ContextWithBaggage(ctx context.Context, baggage any) context.Context
- func MergeState(state any, changes any, reducers map[string]Reducer) (map[string]any, error)
- type Flow
- func (f *Flow) Attempt() int
- func (f *Flow) Clear()
- func (f *Flow) CreatedAt() time.Time
- func (f *Flow) Delete(keys ...string)
- func (f *Flow) FlowKey() string
- func (f *Flow) Get(key string, target any) error
- func (f *Flow) GetBool(key string) bool
- func (f *Flow) GetDuration(key string) time.Duration
- func (f *Flow) GetFloat(key string) float64
- func (f *Flow) GetInt(key string) int
- func (f *Flow) GetString(key string) string
- func (f *Flow) GetStrings(key string) []string
- func (f *Flow) Goto(taskName string)
- func (f *Flow) GotoRequested() string
- func (f *Flow) Has(key string) bool
- func (f *Flow) Interrupt(payload any, out any) (yield bool, err error)
- func (f *Flow) InterruptRequested() (map[string]any, bool)
- func (f *Flow) MarshalJSON() ([]byte, error)
- func (f *Flow) ParseState(target any) error
- func (f *Flow) Retry(initialDelay time.Duration, delayMultiplier float64, ...) bool
- func (f *Flow) RetryRequested() (initialDelay time.Duration, multiplier float64, maxDelay time.Duration, ...)
- func (f *Flow) Set(key string, value any) error
- func (f *Flow) SetBool(key string, value bool)
- func (f *Flow) SetChanges(source any, snap map[string]any) error
- func (f *Flow) SetDuration(key string, value time.Duration)
- func (f *Flow) SetFloat(key string, value float64)
- func (f *Flow) SetInt(key string, value int)
- func (f *Flow) SetState(source any) error
- func (f *Flow) SetString(key string, value string)
- func (f *Flow) SetStrings(key string, value []string)
- func (f *Flow) Sleep(duration time.Duration)
- func (f *Flow) SleepRequested() time.Duration
- func (f *Flow) Snapshot() map[string]any
- func (f *Flow) StepCreatedAt() time.Time
- func (f *Flow) StepKey() string
- func (f *Flow) Subgraph(workflowURL string, in any, out any) (yield bool, err error)
- func (f *Flow) SubgraphRequested() (url string, input map[string]any, taskName string, ok bool)
- func (f *Flow) Subtask(name, taskURL string, in any, out any) (yield bool, err error)
- func (f *Flow) Transform(pairs ...string)
- func (f *Flow) UnmarshalJSON(data []byte) error
- func (f *Flow) UpdatedAt() time.Time
- type FlowOptions
- type FlowOutcome
- type FlowRenderer
- func (r *FlowRenderer) Render() (string, error)
- func (r *FlowRenderer) WithAttentionColors(fill, text string) *FlowRenderer
- func (r *FlowRenderer) WithErrorColors(fill, text string) *FlowRenderer
- func (r *FlowRenderer) WithLeftRight() *FlowRenderer
- func (r *FlowRenderer) WithLinks(paramName string) *FlowRenderer
- func (r *FlowRenderer) WithPrimaryColors(fill, text string) *FlowRenderer
- func (r *FlowRenderer) WithSecondaryColors(fill, text string) *FlowRenderer
- func (r *FlowRenderer) WithTitle(text string) *FlowRenderer
- func (r *FlowRenderer) WithTopDown() *FlowRenderer
- type FlowStep
- type FlowSummary
- type Graph
- func (g *Graph) AddTransition(from, to string)
- func (g *Graph) AddTransitionChain(names ...string)
- func (g *Graph) AddTransitionFanOut(from string, to ...string)
- func (g *Graph) AddTransitionForEach(from, to string, forEach string, as string)
- func (g *Graph) AddTransitionGoto(from, to string)
- func (g *Graph) AddTransitionOnError(from, to string)
- func (g *Graph) AddTransitionSwitch(from, to string, when string)
- func (g *Graph) AddTransitionWhen(from, to string, when string)
- func (g *Graph) Annotate(name string, note string)
- func (g *Graph) Annotation(name string) string
- func (g *Graph) EntryPoint() string
- func (g *Graph) ErrorTransition(name string) (Transition, bool)
- func (g *Graph) FanInFor(fanOutSource string) string
- func (g *Graph) HasFanIn() bool
- func (g *Graph) IsFanIn(name string) bool
- func (g *Graph) IsFanOutSource(name string) bool
- func (g *Graph) MarshalJSON() ([]byte, error)
- func (g *Graph) Name() string
- func (g *Graph) Nodes() []Node
- func (g *Graph) Reducers() map[string]Reducer
- func (g *Graph) SetEndpoint(name, url string)
- func (g *Graph) SetEntryPoint(name string)
- func (g *Graph) SetFanIn(name string)
- func (g *Graph) SetReducer(field string, reducer Reducer)
- func (g *Graph) Transitions() []Transition
- func (g *Graph) URLOf(name string) string
- func (g *Graph) UnmarshalJSON(data []byte) error
- func (g *Graph) Validate() error
- type GraphRenderer
- func (r *GraphRenderer) Render() (string, error)
- func (r *GraphRenderer) WithAnnotationColor(color string) *GraphRenderer
- func (r *GraphRenderer) WithLeftRight() *GraphRenderer
- func (r *GraphRenderer) WithLinks(paramName string) *GraphRenderer
- func (r *GraphRenderer) WithPrimaryColors(fill, text string) *GraphRenderer
- func (r *GraphRenderer) WithSecondaryColors(fill, text string) *GraphRenderer
- func (r *GraphRenderer) WithTitleLabel(show bool) *GraphRenderer
- func (r *GraphRenderer) WithTopDown() *GraphRenderer
- type Node
- type Query
- type RawFlow
- func (f *RawFlow) ClearChanges()
- func (f *RawFlow) ClearControl()
- func (f *RawFlow) RawChanges() map[string]any
- func (f *RawFlow) RawState() map[string]any
- func (f *RawFlow) SetAttempt(attempt int)
- func (f *RawFlow) SetCreatedAt(createdAt time.Time)
- func (f *RawFlow) SetFlowKey(flowKey string)
- func (f *RawFlow) SetInterruptResolution(resumeData map[string]any)
- func (f *RawFlow) SetRawChanges(changes map[string]any)
- func (f *RawFlow) SetRawState(state map[string]any)
- func (f *RawFlow) SetStepCreatedAt(stepCreatedAt time.Time)
- func (f *RawFlow) SetStepKey(stepKey string)
- func (f *RawFlow) SetSubgraphResolution(result map[string]any, errStr string)
- func (f *RawFlow) SetUpdatedAt(updatedAt time.Time)
- type Reducer
- type Transition
Constants ¶
const (
	StatusCreated     = "created"     // Flow/step exists but has not been started
	StatusPending     = "pending"     // Step is awaiting execution
	StatusRunning     = "running"     // Flow is actively executing a task
	StatusInterrupted = "interrupted" // Flow is paused, waiting for external input
	StatusCompleted   = "completed"   // Flow has finished successfully
	StatusFailed      = "failed"      // Flow has failed with an error
	StatusCancelled   = "cancelled"   // Flow was cancelled by the user
)
const END = "END"
END is a pseudo-node indicating that the workflow should terminate. Use it as the target of a transition to mark a terminal path.
Variables ¶
This section is empty.
Functions ¶
func BaggageFrom ¶
BaggageFrom returns the flow's opaque baggage carried on ctx, or nil if none. The value is the JSON-decoded form the host set in FlowOptions.Baggage at Create (typically map[string]any). It lives in the workflow package so task code can read it without importing the engine.
func ContextWithBaggage ¶
ContextWithBaggage returns a copy of ctx carrying the flow's opaque baggage. The engine calls this when dispatching to the host's LoadGraph/ExecuteTask (and at the create-time LoadGraph call); hosts read the value back with BaggageFrom. Set the baggage itself via FlowOptions.Baggage at Create, not here.
func MergeState ¶
MergeState applies changes on top of state, using the provided reducers for fields that have one. Fields without a registered reducer use replace semantics (last write wins).
Example ¶
Reducers merge the changes of parallel branches at a fan-in. MergeState applies them to fold a delta onto a base state.
package main

import (
	"encoding/json"
	"fmt"

	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	reducers := map[string]workflow.Reducer{
		"items": workflow.ReducerAppend, // concatenate arrays
		"total": workflow.ReducerAdd,    // sum numbers
	}
	base := map[string]any{"items": []any{"a"}, "total": 1}
	delta := map[string]any{"items": []any{"b"}, "total": 2}
	// Reducer outputs are raw JSON, ready to be re-serialized as the next step's state.
	merged, _ := workflow.MergeState(base, delta, reducers)
	out, _ := json.Marshal(merged)
	fmt.Println(string(out))
}
Output: {"items":["a","b"],"total":3}
Types ¶
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
Flow is the carrier object passed to tasks. It holds the state and control signals for a single step in a workflow execution.
Example ¶
A Flow carries state to and from a task. Tasks read inputs and write outputs with typed accessors.
package main

import (
	"fmt"

	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	f := workflow.NewFlow()
	f.SetString("name", "ada")
	f.SetInt("count", 3)
	fmt.Println(f.GetString("name"), f.GetInt("count"))
}
Output: ada 3
func (*Flow) Attempt ¶ added in v0.6.0
Attempt returns the zero-based retry attempt counter for the current step: 0 on the first execution, incremented by the orchestrator on each Retry. A task can gate on it to bound retries by count (e.g. "if flow.Attempt() < 3") instead of, or alongside, Retry's time-based horizon.
func (*Flow) Clear ¶
func (f *Flow) Clear()
Clear removes every state field. Equivalent to Delete on every current key. Useful at workflow boundaries (e.g. a task that builds a fresh subgraph input from a curated subset of parent state) or anywhere a task wants a blank slate before populating it.
func (*Flow) CreatedAt ¶
CreatedAt returns the wall-clock time at which the flow was created. Useful for tasks that want to implement their own elapsed-time guard (e.g. "if time.Since(flow.CreatedAt()) > 24h then return an error to fail the workflow"). Zero when called outside a dispatched task or when the orchestrator has not populated it.
func (*Flow) Delete ¶
Delete removes the listed state fields. Each becomes JSON null in changes (so the next merge drops the field for Replace, contributes the reducer's identity for sum*/list*/set*) and is removed from the local state map so subsequent reads in this task see it as absent.
func (*Flow) FlowKey ¶ added in v0.4.0
FlowKey returns the external key of the flow this task is executing in, in the form "{shard}-{flowID}-{token}". Useful for correlating logs/traces or calling back into the engine (e.g. History, Snapshot) for the task's own flow. Empty when called outside a dispatched task.
func (*Flow) Get ¶
Get unmarshals a state field into the target. Use this for complex types (structs, maps, etc.).
func (*Flow) GetDuration ¶
GetDuration returns a state field as a time.Duration.
func (*Flow) GetStrings ¶
GetStrings returns a state field as a string slice.
func (*Flow) Goto ¶
Goto overrides transition routing. The orchestrator skips condition evaluation and follows the specified task instead.
func (*Flow) GotoRequested ¶
GotoRequested returns the task name set by Goto, or empty if not set.
func (*Flow) Has ¶
Has reports whether a state field exists. A cleared slot (JSON null) reads as absent.
func (*Flow) Interrupt ¶
Interrupt parks the flow to await external input, or returns the resume data once it has arrived.
On the first call (not yet resumed) it records the interrupt request with the given payload - propagated up the surgraph chain and surfaced to the awaiting caller so it can see what data the task needs - and returns yield=true. The task must return immediately.
On re-entry after Resume it unmarshals the resume data into out with yield=false and does not re-arm; the task proceeds. The payload is any JSON-marshalable value (a struct or a map[string]any); out is a pointer (a *struct or *map[string]any) the resume data is unmarshaled into by JSON tag, or nil to ignore it. The returned err is non-nil only if the payload fails to marshal (or out fails to unmarshal); interrupt itself has no failure mode, so err is otherwise always nil. Symmetric with Subgraph: any in, pointer out.
var resume ResumeData
yield, err := flow.Interrupt(map[string]any{"request": "userInput"}, &resume)
if err != nil {
	return err // payload failed to marshal, or resume data failed to unmarshal
}
if yield {
	return nil // parked, awaiting Resume
}
// proceed with resume
func (*Flow) InterruptRequested ¶
InterruptRequested returns the interrupt payload and true if Interrupt was called.
func (*Flow) MarshalJSON ¶
MarshalJSON serializes the Flow including private fields.
func (*Flow) ParseState ¶
ParseState unmarshals state fields into the target struct. Fields are matched by their JSON tag names. Fields in state that are not in the struct are ignored.
func (*Flow) Retry ¶
func (f *Flow) Retry(initialDelay time.Duration, delayMultiplier float64, maxIntervalDelay time.Duration, giveUpAfter time.Duration) bool
Retry asks the orchestrator to re-execute this task with exponential backoff. The bound is wall-clock, not a count: Retry returns true (the caller should return nil) while the next attempt would still land within giveUpAfter of the step's first creation, and false (the caller should return its error) once the horizon is reached. It also returns false when the next backoff delay alone would overshoot the horizon, so the step fails immediately instead of parking for a wait that is already known to be futile. Pass giveUpAfter <= 0 for unlimited retries.
The delay before attempt N is min(initialDelay * delayMultiplier^N, maxIntervalDelay); pass a zero initialDelay for immediate retries, and a zero maxIntervalDelay for no per-interval cap. To hold the delay constant (e.g. honoring a provider's Retry-After carried in initialDelay), pass delayMultiplier 1.0. Sleep, if also set, is added on top as a floor.
Retry carries no condition of its own - it is the single retry primitive, called inside whatever error branch the task decides is retryable. Keeping the condition explicit at the call site avoids the "retry on every error" trap (most errors - validation, bad input, business rejections - should not be retried). Gate it on whatever your task considers transient:
result, err := callExternalAPI(ctx)
if err != nil {
	if isTransient(err) && flow.Retry(1*time.Second, 2.0, 30*time.Second, 1*time.Hour) {
		return result, nil // transient failure: retry scheduled, don't report error
	}
	return result, err // non-retryable, or horizon exceeded
}
To bound by count instead of (or in addition to) time, gate on Attempt: pass giveUpAfter 0 and check flow.Attempt() at the call site.
func (*Flow) RetryRequested ¶
func (f *Flow) RetryRequested() (initialDelay time.Duration, multiplier float64, maxDelay time.Duration, ok bool)
RetryRequested returns the backoff parameters and true if Retry was called. The orchestrator uses these to compute the re-dispatch delay; the give-up decision is made client-side in Retry, so only the backoff shape crosses this boundary.
func (*Flow) Set ¶
Set sets a state field and tracks the change. Use this for complex types (structs, maps, etc.).
func (*Flow) SetChanges ¶
SetChanges marshals the source struct back to state, comparing against the provided snapshot. Only fields whose JSON value differs from the snapshot are recorded as changes. Changed fields are written to both the state and changes maps, so that subsequent reads (including transition condition evaluation) see the updated values.
func (*Flow) SetDuration ¶
SetDuration sets a state time.Duration field and tracks the change.
func (*Flow) SetState ¶
SetState marshals the source struct fields into state without tracking changes. Fields are matched by their JSON tag names.
func (*Flow) SetStrings ¶
SetStrings sets a state string slice field and tracks the change.
func (*Flow) Sleep ¶
Sleep tells the orchestrator to wait for the given duration before the next execution.
func (*Flow) SleepRequested ¶
SleepRequested returns the duration set by Sleep, or zero if not set.
func (*Flow) Snapshot ¶
Snapshot captures a read-only copy of the flow's current state (including any changes applied so far). Pass the returned snapshot to SetChanges to record only the fields that differ.
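The Snapshot and SetChanges pairing amounts to a diff: take a copy of state, let the task mutate a typed struct, then record only the fields whose JSON value changed. The sketch below shows that diff with the standard library alone. It is an assumption-laden illustration, not the package's implementation: toMap, diffAgainst, and the order type are hypothetical, and fields removed from the struct are not handled.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// toMap round-trips v through JSON so values compare in their decoded form
// (for example, every number becomes a float64).
func toMap(v any) (map[string]any, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var m map[string]any
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, err
	}
	return m, nil
}

// diffAgainst returns the fields of source whose JSON value differs from snap.
func diffAgainst(source any, snap map[string]any) (map[string]any, error) {
	current, err := toMap(source)
	if err != nil {
		return nil, err
	}
	before, err := toMap(snap)
	if err != nil {
		return nil, err
	}
	changes := map[string]any{}
	for key, value := range current {
		old, ok := before[key]
		if !ok || !reflect.DeepEqual(old, value) {
			changes[key] = value
		}
	}
	return changes, nil
}

// order is a hypothetical typed view of the task's state.
type order struct {
	Amount  float64 `json:"amount"`
	Receipt string  `json:"receipt"`
}

func main() {
	snap := map[string]any{"amount": 12.5, "receipt": ""}
	updated := order{Amount: 12.5, Receipt: "r-001"}

	changes, err := diffAgainst(updated, snap)
	if err != nil {
		panic(err)
	}
	out, _ := json.Marshal(changes)
	fmt.Println(string(out)) // {"receipt":"r-001"}
}
```

Only receipt is reported because amount is unchanged. Normalizing both sides through JSON avoids false positives from Go-level type differences such as int versus float64.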
func (*Flow) StepCreatedAt ¶ added in v0.6.0
StepCreatedAt returns the wall-clock time at which this step was first created, preserved across retries of the step. It anchors Retry's giveUpAfter horizon, and a task can read it directly to implement a custom elapsed-time guard. Zero when called outside a dispatched task.
func (*Flow) StepKey ¶ added in v0.4.0
StepKey returns the external key of the step this task is executing, in the form "{shard}-{stepID}-{token}". Useful for correlating logs/traces or calling back into the engine (e.g. Step) for the task's own step. Empty when called outside a dispatched task.
func (*Flow) Subgraph ¶
Subgraph runs a child workflow and unmarshals its result once it completes, parking the step in between.
Semantically a function call: only the explicit in argument crosses the boundary into the child, and only the explicit out crosses back. The parent's state does NOT auto-cross either direction. in is any JSON-marshalable value (a struct or a map[string]any) and becomes the child's initial state field-by-field; a nil in means "no arguments" (the child starts with empty state). A caller that wants the parent's full state to cross can pass flow.Snapshot() as in. The out argument is a pointer (a *struct or *map[string]any) into which the child's final_state is unmarshaled by JSON tag; pass nil to ignore the result. Using a typed struct reads only the fields you declare, with type safety.
On the first call (child not yet run) it arms the subgraph park with the child workflow URL and in and returns yield=true; the task must return immediately.
On re-entry after the child terminates it unmarshals the child's final_state into out, returns yield=false, and sets err if the child failed. Does not re-arm on re-entry.
var out ChildOut
yield, err := flow.Subgraph(childURL, ChildIn{Value: value}, &out)
if yield {
	return nil // parked, child running
}
if err != nil {
	if flow.Retry(time.Second, 2.0, 30*time.Second, time.Hour) {
		return nil
	}
	return errors.Trace(err)
}
// read fields from out
func (*Flow) SubgraphRequested ¶
SubgraphRequested returns the request URL, input state, the subtask name, and true if Subgraph or Subtask was called. taskName discriminates: non-empty means Subtask (the engine synthesizes a single-task graph named taskName), empty means a regular Subgraph (the engine loads the graph by URL).
func (*Flow) Subtask ¶ added in v0.5.0
Subtask launches a single task as an isolated child flow, the task-level sibling of Subgraph. The engine synthesizes a trivial one-node graph (named name) around taskURL instead of loading a graph, so any task endpoint can be invoked without a graph definition; everything after launch - parking, re-entry, the out-pointer result, cancel/interrupt propagation - is identical to Subgraph. name is required (it is the node's display name in diagrams/history) and must be non-empty. It is a thin wrapper over Subgraph: the request, single-park guard, input normalization, and re-entry all flow through Subgraph; the non-empty name is what marks the pending request as a subtask.
Pass a task URL, not a graph URL: a graph URL would be wrapped as a one-node graph and dispatched as a task, failing at dispatch. (Symmetrically, Subgraph with a task URL fails in LoadGraph.) The typed per-service client makes this mistake impossible; the raw primitives are the dynamic-only escape hatch.
func (*Flow) Transform ¶
Transform clears all state, then re-introduces the listed fields under new names. Arguments are (newKey, oldKey) pairs; the value previously stored under oldKey is captured before the clear and re-set under newKey. Old keys that were absent or already null are skipped (the new key is not introduced as null). Panics on an odd number of arguments.
Typical use: a small task immediately upstream of a subgraph node that reshapes parent state into the subgraph's expected input.
flow.Transform("subInput1", "parentVarA", "subInput2", "parentVarB")
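The rename-and-drop semantics of Transform can be modeled on a plain map. This is a minimal sketch, assuming state behaves like map[string]any; the transform function here is hypothetical and returns a new map, whereas the real method mutates the flow in place.

```go
package main

import "fmt"

// transform keeps only the listed fields, renamed. Arguments are
// (newKey, oldKey) pairs. Old keys that are absent or nil are skipped,
// and an odd number of arguments panics.
func transform(state map[string]any, pairs ...string) map[string]any {
	if len(pairs)%2 != 0 {
		panic("transform: odd number of arguments")
	}
	out := make(map[string]any, len(pairs)/2)
	for i := 0; i < len(pairs); i += 2 {
		newKey, oldKey := pairs[i], pairs[i+1]
		// Values are read from the original state, i.e. before anything is cleared.
		if value, ok := state[oldKey]; ok && value != nil {
			out[newKey] = value
		}
	}
	return out
}

func main() {
	state := map[string]any{
		"parentVarA": 1,
		"parentVarB": nil, // already null: skipped
		"other":      "x", // not listed: dropped
	}
	out := transform(state, "subInput1", "parentVarA", "subInput2", "parentVarB")
	fmt.Println(out) // map[subInput1:1]
}
```

Reading every old value before building the result is what makes swaps such as ("a", "b", "b", "a") safe in this sketch.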
func (*Flow) UnmarshalJSON ¶
UnmarshalJSON deserializes the Flow including private fields.
type FlowOptions ¶
type FlowOptions struct {
	// Priority orders flows competing for workers; an explicit priority is >= 1,
	// lower runs first. Zero means "unset" and uses the engine's
	// DefaultPriority config.
	Priority int `json:"priority,omitzero"`
	// FairnessKey groups flows for fair scheduling, typically a tenant.
	// Empty derives it from baggage, else the "" bucket.
	FairnessKey string `json:"fairnessKey,omitzero"`
	// FairnessWeight is the relative dispatch share of the fairness key.
	// Zero uses a weight of 1.
	FairnessWeight float64 `json:"fairnessWeight,omitzero"`
	// StartAt delays execution of the flow's entry step until the given UTC time.
	// Zero or a past time means run as soon as the flow is started. Sets the
	// entry step's not_before column; the flow can still be created and started
	// immediately, but no worker will pick the step up before StartAt.
	StartAt time.Time `json:"startAt,omitzero"`
	// NotifyOnStop requests that the host's FlowStopped callback fire when this flow stops
	// (completed/failed/cancelled/interrupted). The engine persists the intent and, at stop time,
	// invokes FlowStopped with the flow's Baggage on the context - the host decides where/how to deliver
	// the notification from that baggage. When false (the default) FlowStopped is never called for the
	// flow. Set at Create; plain Start then runs the flow.
	NotifyOnStop bool `json:"notifyOnStop,omitzero"`
	// Baggage is opaque, host-defined context (identity/claims, tenant, locale, ...) carried with the
	// flow. The engine never interprets it: it is set once here, stored on the flow, inherited by
	// subgraphs and Continue, and delivered to every Host LoadGraph/ExecuteTask call via the dispatch
	// context - read it with BaggageFrom(ctx). Any JSON-marshalable value; the host receives the
	// JSON-decoded form (typically map[string]any), exactly like flow state.
	Baggage any `json:"baggage,omitzero"`
}
FlowOptions sets flow-level properties at Create or Run: scheduling (priority, fairness, start time) plus the opaque host Baggage. A nil *FlowOptions, or any zero field, uses the engine's defaults.
type FlowOutcome ¶
type FlowOutcome struct {
	// Status is the flow's current lifecycle status: created, running, interrupted, completed, failed, or cancelled.
	Status string `json:"status,omitzero"`
	// State is the flow's accumulated state. For terminal statuses this is the final_state;
	// for running and interrupted flows it is the merged snapshot of the current step.
	State map[string]any `json:"state,omitzero"`
	// Error is the task error string. Populated when Status is "failed".
	Error string `json:"error,omitzero"`
	// InterruptPayload is the raw payload from flow.Interrupt(payload). Populated when Status is "interrupted".
	InterruptPayload map[string]any `json:"interruptPayload,omitzero"`
	// CancelReason is the reason string passed to Cancel(flowKey, reason). Populated when Status is "cancelled".
	CancelReason string `json:"cancelReason,omitzero"`
}
FlowOutcome carries the status and side-channel signals of a flow at a moment in time. Returned by Snapshot, Await, and Run, and fired as the payload of the OnFlowStopped event. Side-channel fields are populated only for the matching Status; for example InterruptPayload is populated only when Status is "interrupted".
type FlowRenderer ¶
type FlowRenderer struct {
// contains filtered or unexported fields
}
FlowRenderer renders the execution history of a flow as a Mermaid flowchart.
func NewFlowRenderer ¶
func NewFlowRenderer(steps []FlowStep) *FlowRenderer
NewFlowRenderer creates a renderer for a flow's execution history.
func (*FlowRenderer) Render ¶
func (r *FlowRenderer) Render() (string, error)
Render returns the Mermaid flowchart representation.
func (*FlowRenderer) WithAttentionColors ¶
func (r *FlowRenderer) WithAttentionColors(fill, text string) *FlowRenderer
func (*FlowRenderer) WithErrorColors ¶
func (r *FlowRenderer) WithErrorColors(fill, text string) *FlowRenderer
func (*FlowRenderer) WithLeftRight ¶
func (r *FlowRenderer) WithLeftRight() *FlowRenderer
func (*FlowRenderer) WithLinks ¶
func (r *FlowRenderer) WithLinks(paramName string) *FlowRenderer
func (*FlowRenderer) WithPrimaryColors ¶
func (r *FlowRenderer) WithPrimaryColors(fill, text string) *FlowRenderer
func (*FlowRenderer) WithSecondaryColors ¶
func (r *FlowRenderer) WithSecondaryColors(fill, text string) *FlowRenderer
func (*FlowRenderer) WithTitle ¶
func (r *FlowRenderer) WithTitle(text string) *FlowRenderer
func (*FlowRenderer) WithTopDown ¶
func (r *FlowRenderer) WithTopDown() *FlowRenderer
type FlowStep ¶
type FlowStep struct {
	StepKey   string `json:"stepKey,omitzero"`
	StepID    int    `json:"stepID,omitzero"`
	StepDepth int    `json:"stepDepth,omitzero"`
	TaskName  string `json:"taskName,omitzero"`
	Attempt   int    `json:"attempt,omitzero"`
	// PredecessorID and SuccessorID are the shard-local step ids of this step's neighbors in
	// the execution DAG. 0 means no such edge (entry / exit step).
	PredecessorID int `json:"predecessorID,omitzero"`
	SuccessorID   int `json:"successorID,omitzero"`
	// PrevKey and NextKey are the external step keys of the resolved navigation neighbors,
	// ready for use as ?step= links. Populated only by the Step endpoint.
	PrevKey          string         `json:"prevKey,omitzero"`
	NextKey          string         `json:"nextKey,omitzero"`
	Subgraph         bool           `json:"subgraph,omitzero"`
	SubWorkflowURL   string         `json:"subWorkflowURL,omitzero"`
	SubWorkflowName  string         `json:"subWorkflowName,omitzero"`
	SubHistory       []FlowStep     `json:"subHistory,omitzero"`
	State            map[string]any `json:"state,omitzero"`
	Changes          map[string]any `json:"changes,omitzero"`
	InterruptPayload map[string]any `json:"interruptPayload,omitzero"`
	Status           string         `json:"status,omitzero"`
	// Parked reports whether the step is currently held out of the selection band (a subgraph caller
	// waiting on its child, or a breaker-parked backlog step). A terminal step is never parked.
	Parked    bool      `json:"parked,omitzero"`
	Error     string    `json:"error,omitzero"`
	CreatedAt time.Time `json:"createdAt,omitzero"`
	// StartedAt is when the worker first dispatched the current attempt of this step.
	// Use HasStarted to gate reads; on a not-yet-leased row it carries the INSERT-time default.
	StartedAt time.Time `json:"startedAt,omitzero"`
	UpdatedAt time.Time `json:"updatedAt,omitzero"`
}
FlowStep is a single step in a flow's execution history.
func (FlowStep) Duration ¶
Duration is the wall-clock time from CreatedAt to UpdatedAt. Returns zero when either timestamp is missing or the delta is negative.
func (FlowStep) HasStarted ¶
HasStarted reports whether StartedAt is a real dispatch timestamp rather than the INSERT-time default. True for running and any terminal status; false for created/pending.
type FlowSummary ¶
type FlowSummary struct {
	FlowKey      string    `json:"flowKey,omitzero"`
	ThreadKey    string    `json:"threadKey,omitzero"`
	WorkflowURL  string    `json:"workflowURL,omitzero"`
	WorkflowName string    `json:"workflowName,omitzero"`
	Status       string    `json:"status,omitzero"`
	TaskName     string    `json:"taskName,omitzero"`
	Error        string    `json:"error,omitzero"`
	CancelReason string    `json:"cancelReason,omitzero"`
	CreatedAt    time.Time `json:"createdAt,omitzero"`
	// StartedAt is when this attempt began dispatching (Start, or a Restart/RestartFrom
	// rewind). Distinct from CreatedAt, which is the row's INSERT moment and is only
	// reset on full Restart. Use StartedAt for duration metrics; CreatedAt for "when did
	// this flow first appear."
	StartedAt time.Time `json:"startedAt,omitzero"`
	UpdatedAt time.Time `json:"updatedAt,omitzero"`
	// Priority is the flow's scheduling priority (>= 1, lower runs first), resolved at Create.
	Priority int `json:"priority,omitzero"`
	// FairnessKey is the flow's scheduling fairness bucket, resolved at Create.
	FairnessKey string `json:"fairnessKey,omitzero"`
}
FlowSummary is a summary of a flow for listing purposes.
func (FlowSummary) Duration ¶
func (f FlowSummary) Duration() time.Duration
Duration is the wall-clock time from StartedAt to UpdatedAt.
type Graph ¶
type Graph struct {
// contains filtered or unexported fields
}
Graph is the definition of a workflow. It describes the tasks, transitions between them, and reducers for merging state during fan-in.
Example ¶
Build a linear workflow graph and validate it.
package main

import (
	"fmt"

	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	g := workflow.NewGraph("Checkout")
	g.SetEndpoint("Reserve", "inventory.reserve")
	g.SetEndpoint("Charge", "billing.charge")
	g.AddTransitionChain("Reserve", "Charge", workflow.END)
	fmt.Println("name:", g.Name())
	fmt.Println("entry:", g.EntryPoint())
	fmt.Println("valid:", g.Validate() == nil)
}
Output:
name: Checkout
entry: Reserve
valid: true
func NewGraph ¶
NewGraph creates a new workflow graph with the given display name. The name is a human-friendly label (surfaced in rendering and Validate error messages); it is NOT the resolve key. The value passed to Create/Run and to the host's LoadGraph is a separate opaque URL that the engine stores on the flow (workflow_url) - it is never kept on the graph itself.
func (*Graph) AddTransition ¶
AddTransition adds an unconditional transition between two nodes. Both endpoints are auto-registered as tasks if not already present (see autoRegister).
func (*Graph) AddTransitionChain ¶ added in v0.5.0
AddTransitionChain wires an unconditional transition between each consecutive pair of names: AddTransitionChain("A", "B", "C") is AddTransition("A", "B") followed by AddTransition("B", "C"). It is a convenience for linear segments; fewer than two names is a no-op. END belongs last (a node after END would produce an invalid transition out of END). Mix with the other AddTransition* methods for branching, conditions, and loops.
func (*Graph) AddTransitionFanOut ¶ added in v0.6.2
AddTransitionFanOut wires an unconditional transition from one source to each of several destinations: AddTransitionFanOut("A", "B", "C") is AddTransition("A", "B") followed by AddTransition("A", "C"), so B and C both fire and run in parallel. It is a convenience for static fan-out; no destinations is a no-op. It creates only the outgoing edges - if the branches later rejoin at a node, that node still needs SetFanIn (and usually a reducer) wired separately. Distinct from AddTransitionForEach, which fans out dynamically over a runtime collection rather than across statically-named nodes.
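The difference between the chain and fan-out conveniences is easiest to see as the list of edges each one expands to. The sketch below models that expansion with plain values. The edge type and the chain and fanOut helpers are hypothetical and only mirror the expansions described for AddTransitionChain and AddTransitionFanOut.

```go
package main

import "fmt"

// edge is a hypothetical stand-in for an unconditional transition.
type edge struct{ From, To string }

// chain links each consecutive pair of names: A->B, B->C, ...
// Fewer than two names yields no edges.
func chain(names ...string) []edge {
	var edges []edge
	for i := 0; i+1 < len(names); i++ {
		edges = append(edges, edge{names[i], names[i+1]})
	}
	return edges
}

// fanOut links one source to every destination: A->B, A->C, ...
// No destinations yields no edges.
func fanOut(from string, to ...string) []edge {
	var edges []edge
	for _, dest := range to {
		edges = append(edges, edge{from, dest})
	}
	return edges
}

func main() {
	fmt.Println(chain("A", "B", "C"))  // [{A B} {B C}]
	fmt.Println(fanOut("A", "B", "C")) // [{A B} {A C}]
	fmt.Println(len(chain("A")))       // 0
}
```

Given the same three names, chain produces a sequence while fanOut produces two edges out of the same source, which is what lets B and C run in parallel.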
func (*Graph) AddTransitionForEach ¶
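AddTransitionForEach adds a dynamic fan-out transition: the fan-out runs over the elements of an array held in a state field (Transition.ForEach), and the current element is exposed under an alias (Transition.As).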
func (*Graph) AddTransitionGoto ¶
AddTransitionGoto adds a transition that is only taken when the source task calls flow.Goto with a target that resolves to this transition's destination.
func (*Graph) AddTransitionOnError ¶
AddTransitionOnError adds a transition that is taken when the source task returns an error.
func (*Graph) AddTransitionSwitch ¶
AddTransitionSwitch adds a first-match-wins transition between two nodes. Multiple Switch transitions from the same source are evaluated in registration order, and only the first whose 'when' expression evaluates to true fires; the rest are skipped. If no Switch matches, the flow ends at the source node, so the last Switch from a node is typically a catch-all with when="true". Only one branch ever runs, so a downstream SetFanIn is not required.
A node that uses Switch transitions must declare every successful-path outgoing transition as Switch (the validator rejects mixing Switch with When/plain/ForEach/Goto from the same source). OnError transitions are orthogonal and remain allowed.
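First-match-wins selection can be modelled in a few lines. The sketch below is an assumption-laden illustration rather than the engine's evaluator: switchCase and pickSwitch are hypothetical names, and a Go predicate stands in for the 'when' expression, whose syntax is not shown in this section.

```go
package main

import "fmt"

// switchCase is a hypothetical stand-in for one Switch transition: a
// destination plus a predicate playing the role of the 'when' expression.
type switchCase struct {
	To   string
	When func(state map[string]any) bool
}

// pickSwitch returns the destination of the first case whose predicate holds,
// scanning in registration order. ok is false when nothing matches, which is
// the situation in which the flow would end at the source node.
func pickSwitch(cases []switchCase, state map[string]any) (to string, ok bool) {
	for _, c := range cases {
		if c.When(state) {
			return c.To, true
		}
	}
	return "", false
}

func main() {
	cases := []switchCase{
		{To: "Refund", When: func(s map[string]any) bool { return s["amount"].(float64) < 0 }},
		{To: "Review", When: func(s map[string]any) bool { return s["amount"].(float64) > 1000 }},
		{To: "Charge", When: func(map[string]any) bool { return true }}, // catch-all, like when="true"
	}
	to, ok := pickSwitch(cases, map[string]any{"amount": 2500.0})
	fmt.Println(to, ok)
}
```

Registration order matters: moving the catch-all to the front would shadow every later case, which is why it is typically registered last.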
func (*Graph) AddTransitionWhen ¶
AddTransitionWhen adds a conditional transition between two nodes, taken only when its 'when' expression (Transition.When) evaluates to true.
func (*Graph) Annotate ¶
Annotate attaches a short note to a node. The note renders as a teal, borderless text label directly beneath the node in the Mermaid diagram.
func (*Graph) Annotation ¶
Annotation returns the note attached to a node via Annotate, or "" if none.
func (*Graph) EntryPoint ¶
EntryPoint returns the node name of the entry point of the graph.
func (*Graph) ErrorTransition ¶
func (g *Graph) ErrorTransition(name string) (Transition, bool)
ErrorTransition returns the error transition from the given node name, if one exists.
func (*Graph) FanInFor ¶
FanInFor returns the fan-in node that pops the frame pushed by a fan-out at the named source, or "" if the source is not a fan-out. Populated by Validate.
func (*Graph) IsFanOutSource ¶
IsFanOutSource reports whether the named node has 2+ non-goto/non-error outgoing transitions, or any forEach outgoing transition. Switch transitions are exclusive (only one branch ever fires) and therefore do not count toward fan-out.
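The counting rule can be restated as code. The following sketch re-implements it over a local copy of the relevant Transition fields; the transition type and the isFanOutSource function are hypothetical and only mirror the description, not the package's source.

```go
package main

import "fmt"

// transition mirrors only the Transition fields the rule needs.
type transition struct {
	From, To string
	WithGoto bool
	ForEach  string
	OnError  bool
	Switch   bool
}

// isFanOutSource applies the documented rule: any forEach edge makes the node
// a fan-out source; otherwise it needs two or more outgoing edges that are
// not goto, error, or switch edges.
func isFanOutSource(ts []transition, name string) bool {
	parallel := 0
	for _, t := range ts {
		if t.From != name || t.WithGoto || t.OnError || t.Switch {
			continue
		}
		if t.ForEach != "" {
			return true
		}
		parallel++
	}
	return parallel >= 2
}

func main() {
	ts := []transition{
		{From: "Reserve", To: "Charge"},
		{From: "Reserve", To: "Notify"},
		{From: "Reserve", To: "Rollback", OnError: true},
		{From: "Charge", To: "END"},
		{From: "Batch", To: "Item", ForEach: "items"},
	}
	for _, n := range []string{"Reserve", "Charge", "Batch"} {
		fmt.Println(n, isFanOutSource(ts, n))
	}
}
```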
func (*Graph) MarshalJSON ¶
MarshalJSON serializes the graph to JSON.
func (*Graph) SetEndpoint ¶ added in v0.5.0
SetEndpoint binds a node (identified by its graph name) to the given dispatch URL, creating the node if it does not exist and updating its URL if it does. The name is the node's identity in the graph (used by transitions, fan-in, goto, breakpoints); the URL is the opaque downstream endpoint that the engine hands to the host's ExecuteTask and uses as the grouping key for the breaker, valve, and saturation. The first node bound becomes the default entry point unless SetEntryPoint is called explicitly. The pseudo-node END is not registered.
The same URL may be bound under multiple names. This is how a workflow author reuses the same task code at distinct positions in the graph with different downstream transitions per position.
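The name-versus-URL distinction is easier to see in a toy model. The sketch below is not the Graph type: registry, newRegistry, and setEndpoint are hypothetical, the node names and URL are made up, and the literal "END" is an assumed spelling of the pseudo-node name.

```go
package main

import "fmt"

// end is an assumed spelling of the END pseudo-node name.
const end = "END"

// registry is a toy model of name-to-URL binding.
type registry struct {
	urls  map[string]string // node name -> dispatch URL
	entry string            // default entry point: the first node bound
}

func newRegistry() *registry {
	return &registry{urls: map[string]string{}}
}

// setEndpoint creates the node or updates its URL. The first node bound
// becomes the default entry point, and the pseudo-node is never registered.
func (r *registry) setEndpoint(name, url string) {
	if name == end {
		return
	}
	if r.entry == "" {
		r.entry = name
	}
	r.urls[name] = url
}

func main() {
	r := newRegistry()
	r.setEndpoint("NotifyBuyer", "mail.send")
	r.setEndpoint("NotifySeller", "mail.send") // same URL at a second position
	r.setEndpoint(end, "ignored")
	fmt.Println("entry:", r.entry)
	fmt.Println("nodes:", len(r.urls))
}
```

Keying by name rather than by URL is what lets each position carry its own outgoing transitions while both positions dispatch to the same task code.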
func (*Graph) SetEntryPoint ¶
SetEntryPoint sets the entry point of the graph explicitly, overriding the default (first task added). The argument is a node name.
func (*Graph) SetFanIn ¶
SetFanIn marks a node as a fan-in nexus. Calling it opts the graph into the lineage validator.
func (*Graph) SetReducer ¶
SetReducer sets the merge strategy for a state field during fan-in.
func (*Graph) Transitions ¶
func (g *Graph) Transitions() []Transition
Transitions returns the list of transitions in the graph. The returned slice shares the graph's underlying storage; callers must not mutate it. The graph is treated as immutable after Validate, so read-only iteration is safe.
func (*Graph) URLOf ¶
URLOf returns the dispatch URL for a node identified by name. Returns the empty string if the name is not registered. END maps to itself.
func (*Graph) UnmarshalJSON ¶
UnmarshalJSON deserializes the graph from JSON.
type GraphRenderer ¶
type GraphRenderer struct {
// contains filtered or unexported fields
}
GraphRenderer renders a workflow Graph to a Mermaid flowchart. Configure via the With* builder methods, then call Render.
func NewGraphRenderer ¶
func NewGraphRenderer(g *Graph) *GraphRenderer
NewGraphRenderer creates a renderer for the given graph with default styling.
func (*GraphRenderer) Render ¶
func (r *GraphRenderer) Render() (string, error)
Render returns a fully-styled Mermaid flowchart representation of the graph.
func (*GraphRenderer) WithAnnotationColor ¶
func (r *GraphRenderer) WithAnnotationColor(color string) *GraphRenderer
WithAnnotationColor overrides the color of annotation text.
func (*GraphRenderer) WithLeftRight ¶
func (r *GraphRenderer) WithLeftRight() *GraphRenderer
WithLeftRight renders the diagram left-to-right.
func (*GraphRenderer) WithLinks ¶
func (r *GraphRenderer) WithLinks(paramName string) *GraphRenderer
WithLinks enables click directives on every task node.
func (*GraphRenderer) WithPrimaryColors ¶
func (r *GraphRenderer) WithPrimaryColors(fill, text string) *GraphRenderer
WithPrimaryColors overrides the primary brand pair.
func (*GraphRenderer) WithSecondaryColors ¶
func (r *GraphRenderer) WithSecondaryColors(fill, text string) *GraphRenderer
WithSecondaryColors overrides the secondary surface pair.
func (*GraphRenderer) WithTitleLabel ¶
func (r *GraphRenderer) WithTitleLabel(show bool) *GraphRenderer
WithTitleLabel toggles the Mermaid frontmatter title (the graph's display name) rendered as a caption above the chart.
func (*GraphRenderer) WithTopDown ¶
func (r *GraphRenderer) WithTopDown() *GraphRenderer
WithTopDown renders the diagram top-to-bottom.
type Node ¶
Node describes a task or subgraph node registered in a workflow graph. Name is the node's identifier within the graph and the value stored on step rows (dwarf_steps.task_name). URL is the dispatch target the engine calls when the node is reached.
type Query ¶
type Query struct {
Status string `json:"status,omitzero"`
WorkflowURL string `json:"workflowURL,omitzero"`
// WorkflowName filters to flows whose graph display name (the human-friendly name set via
// NewGraph and denormalized onto the flow row) equals this value. Distinct from WorkflowURL,
// which matches the resolve key. Empty disables the filter; composes with WorkflowURL.
WorkflowName string `json:"workflowName,omitzero"`
ThreadKey string `json:"threadKey,omitzero"`
// TaskName filters to flows whose current step is on the named task.
TaskName string `json:"taskName,omitzero"`
// FairnessKey filters to flows with this scheduling fairness key. The host typically sets the
// fairness key to the tenant, so this is how "list flows for tenant X" is expressed. Empty
// disables the filter.
FairnessKey string `json:"fairnessKey,omitzero"`
// Priority filters to flows at this scheduling priority band. Zero disables the filter
// (valid priorities are >= 1).
Priority int `json:"priority,omitzero"`
// OlderThan filters to flows whose updated_at is older than this duration relative to now.
// Zero disables the filter.
OlderThan time.Duration `json:"olderThan,omitzero"`
// NewerThan filters to flows whose updated_at is within this duration of now.
// Zero disables the filter. Composes with OlderThan to express "between X and Y ago."
NewerThan time.Duration `json:"newerThan,omitzero"`
// Shard restricts the query to a single 1-based shard. Zero queries all shards.
Shard int `json:"shard,omitzero"`
// Cursor is the opaque pagination cursor returned as NextCursor by the previous List call.
Cursor string `json:"cursor,omitzero"`
// Search is a case-insensitive substring matched against workflow_url, workflow_name, current
// task_name, error, and cancel_reason. SQL LIKE wildcards (%, _) in the value are honored.
Search string `json:"search,omitzero"`
Limit int `json:"limit,omitzero"`
}
Query specifies filtering and pagination options for listing or purging flows.
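How OlderThan and NewerThan compose into a "between X and Y ago" window can be sketched with plain time arithmetic. This is an illustration under stated assumptions: flowRow, inWindow, and exampleMatches are hypothetical, and the exact inclusive or exclusive treatment of the bounds is a guess, not something this section specifies.

```go
package main

import (
	"fmt"
	"time"
)

// flowRow is a hypothetical stand-in for a listed flow: a key plus updated_at.
type flowRow struct {
	Key       string
	UpdatedAt time.Time
}

// inWindow applies the two age filters to one timestamp. A zero duration
// disables that filter. The boundary handling is an assumption of the sketch.
func inWindow(now, updatedAt time.Time, olderThan, newerThan time.Duration) bool {
	age := now.Sub(updatedAt)
	if olderThan > 0 && age <= olderThan {
		return false // updated too recently
	}
	if newerThan > 0 && age > newerThan {
		return false // updated too long ago
	}
	return true
}

// exampleMatches filters three sample rows to "between 1h and 24h ago".
func exampleMatches() []string {
	now := time.Date(2025, time.January, 10, 12, 0, 0, 0, time.UTC)
	rows := []flowRow{
		{Key: "fresh", UpdatedAt: now.Add(-30 * time.Minute)},
		{Key: "stale", UpdatedAt: now.Add(-3 * time.Hour)},
		{Key: "ancient", UpdatedAt: now.Add(-48 * time.Hour)},
	}
	var keys []string
	for _, r := range rows {
		if inWindow(now, r.UpdatedAt, time.Hour, 24*time.Hour) {
			keys = append(keys, r.Key)
		}
	}
	return keys
}

func main() {
	fmt.Println(exampleMatches())
}
```

Note that OlderThan is the smaller duration and NewerThan the larger one when expressing a window; swapping them would describe an empty range.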
type RawFlow ¶
type RawFlow struct {
Flow
}
RawFlow wraps Flow with additional methods used by the orchestrator. Task endpoints should use Flow directly; RawFlow is for internal orchestration use only.
func NewRawFlow ¶
func NewRawFlow() *RawFlow
NewRawFlow creates a new RawFlow with initialized maps.
func (*RawFlow) ClearChanges ¶
func (f *RawFlow) ClearChanges()
ClearChanges resets the changes map. Called by the orchestrator after persisting changes.
func (*RawFlow) ClearControl ¶
func (f *RawFlow) ClearControl()
ClearControl resets all control signals. Called by the orchestrator after processing them.
func (*RawFlow) RawChanges ¶
RawChanges returns a copy of the raw changes map.
func (*RawFlow) SetAttempt ¶
SetAttempt sets the attempt counter on the flow. Called by the orchestrator before dispatching a task so that Retry can check whether attempts are exhausted.
func (*RawFlow) SetCreatedAt ¶ added in v0.6.0
SetCreatedAt records the flow row's createdAt. Called by the orchestrator before dispatching a task so the task can read it via Flow.CreatedAt().
func (*RawFlow) SetFlowKey ¶ added in v0.4.0
SetFlowKey records the external key of the flow being dispatched, so the task can read it via Flow.FlowKey(). Called by the orchestrator before dispatching a task.
func (*RawFlow) SetInterruptResolution ¶
SetInterruptResolution records that an interrupt park has resolved, with the resume data materialized from the step row's resume_data column, so flow.Interrupt returns it (with yield=false) on re-entry instead of re-arming. The orchestrator calls this only when the step row's interrupt_done is set; an un-resumed step leaves the flow's default (not resolved).
func (*RawFlow) SetRawChanges ¶
SetRawChanges replaces the entire changes map with the given raw map.
func (*RawFlow) SetRawState ¶
SetRawState replaces the entire state with the given raw map, without tracking changes.
func (*RawFlow) SetStepCreatedAt ¶ added in v0.6.0
SetStepCreatedAt records the step row's createdAt, preserved across retries. Called by the orchestrator before dispatching a task so Retry can measure its giveUpAfter horizon and the task can read it via Flow.StepCreatedAt().
func (*RawFlow) SetStepKey ¶ added in v0.4.0
SetStepKey records the external key of the step being dispatched, so the task can read it via Flow.StepKey(). Called by the orchestrator before dispatching a task.
func (*RawFlow) SetSubgraphResolution ¶
SetSubgraphResolution records that a subgraph park has resolved, with the child's final_state (result) and error materialized from the step row's subgraph_result / subgraph_error columns, so flow.Subgraph returns them (with yield=false) on re-entry instead of re-arming. The orchestrator calls this only when the step row's subgraph_done is set.
func (*RawFlow) SetUpdatedAt ¶ added in v0.6.0
SetUpdatedAt records the flow row's updatedAt. Called by the orchestrator before dispatching a task so the task can read it via Flow.UpdatedAt().
type Reducer ¶
type Reducer string
Reducer defines how concurrent state modifications from parallel tasks are merged during fan-in.
const (
ReducerReplace Reducer = "replace" // Last write wins (default)
ReducerAppend Reducer = "append" // Concatenate arrays
ReducerAdd Reducer = "add" // Sum numeric values
ReducerMin Reducer = "min" // Smaller of two numeric values
ReducerMax Reducer = "max" // Larger of two numeric values
ReducerUnion Reducer = "union" // Merge arrays, deduplicate
ReducerMerge Reducer = "merge" // Merge objects, new key wins
ReducerAnd Reducer = "and" // Logical AND of booleans
ReducerOr Reducer = "or" // Logical OR of booleans
ReducerConcat Reducer = "concat" // Concatenate strings
)
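To make the merge semantics concrete, here is a minimal sketch of four of the reducers applied to a pair of values. It is not the engine's merge code: the merge function and the lowercase constants are hypothetical, numbers are assumed to be float64, arrays are assumed to be []any with comparable elements, and falling back to replace on a type mismatch is a choice made for this sketch only.

```go
package main

import "fmt"

// Local copies of a few reducer names from the constants listed above.
const (
	reducerReplace = "replace"
	reducerAdd     = "add"
	reducerAppend  = "append"
	reducerUnion   = "union"
)

// merge folds one branch's value into the accumulated value for a field.
// Unsupported reducer/type combinations fall back to replace in this sketch.
func merge(reducer string, acc, next any) any {
	switch reducer {
	case reducerAdd:
		a, okA := acc.(float64)
		b, okB := next.(float64)
		if okA && okB {
			return a + b
		}
	case reducerAppend, reducerUnion:
		a, okA := acc.([]any)
		b, okB := next.([]any)
		if !okA || !okB {
			break
		}
		joined := append(append([]any{}, a...), b...)
		if reducer == reducerAppend {
			return joined
		}
		// Union: keep the first occurrence of each element, in order.
		seen := map[any]bool{}
		out := []any{}
		for _, v := range joined {
			if !seen[v] {
				seen[v] = true
				out = append(out, v)
			}
		}
		return out
	}
	return next // replace: last write wins
}

func main() {
	fmt.Println(merge(reducerAdd, 2.0, 3.5))
	fmt.Println(merge(reducerAppend, []any{"a"}, []any{"b", "a"}))
	fmt.Println(merge(reducerUnion, []any{"a"}, []any{"b", "a"}))
	fmt.Println(merge(reducerReplace, "old", "new"))
}
```

The same pairwise fold extends to more than two branches by applying merge repeatedly, which is consistent with branches writing deltas rather than accumulated totals.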
type Transition ¶
type Transition struct {
From string `json:"from"`
To string `json:"to"`
When string `json:"when,omitzero"`
WithGoto bool `json:"withGoto,omitzero"`
ForEach string `json:"forEach,omitzero"` // dynamic fan-out over a state field
As string `json:"as,omitzero"` // alias for the current element during forEach fan-out
OnError bool `json:"onError,omitzero"` // taken when the source task returns an error
Switch bool `json:"switch,omitzero"` // first-match-wins among siblings; never fans out
}
Transition defines a possible transition between two nodes in a workflow graph. From and To are node names, not URLs.