workflow

package
v0.6.3 Latest
Published: Jun 22, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package workflow holds the pure data types of the dwarf workflow engine: the building blocks a host uses to define workflows and the carriers it reads and writes when running tasks.

It has no heavy dependencies and no knowledge of the database or the engine's runtime, so code that defines tasks and graphs imports only this package, never the engine.

Defining a workflow

A Graph is a directed graph of tasks and transitions. Build one with NewGraph, bind each task to its endpoint with SetEndpoint, and wire the tasks together with the AddTransition* methods:

g := workflow.NewGraph("Checkout")
g.SetEndpoint("Reserve", "inventory.reserve")
g.SetEndpoint("Charge", "billing.charge")
g.AddTransition("Reserve", "Charge")
g.AddTransition("Charge", workflow.END)

Transitions can be unconditional, conditional (AddTransitionWhen / AddTransitionSwitch), static fan-out across named tasks (AddTransitionFanOut), dynamic fan-out over an array (AddTransitionForEach), an error handler (AddTransitionOnError), or an explicit jump target (AddTransitionGoto). The linear AddTransitionChain wires a run of tasks. When parallel branches converge at a fan-in, per-field reducers (SetReducer, see the Reducer constants) merge their changes.

Running a task

A task receives a *Flow: the engine pre-populates it with the step's input state, the task reads inputs and writes outputs with the typed accessors (Get/Set and friends), and may emit control signals - Retry, Sleep, Goto, Interrupt (human-in-the-loop), or Subgraph (call another workflow). Writes to reducer-managed fields are deltas, not accumulated values.

func charge(ctx context.Context, f *workflow.Flow) error {
	amount := f.GetFloat("amount")
	if amount <= 0 {
		return errors.New("nothing to charge")
	}
	f.SetString("receipt", chargeCard(amount))
	return nil
}

The engine never inspects an error's status code or text: a task that wants to back off (rate limit, transient unavailability) reads its own signal and arms Retry. An error returned to the engine is terminal for that attempt - routed via the graph's onError transition if one exists, else it fails the step.

FlowOutcome, FlowStep, FlowSummary, and Query are the read-side result types returned by the engine's inspection operations.

Constants

const (
	StatusCreated     = "created"     // Flow/step exists but has not been started
	StatusPending     = "pending"     // Step is awaiting execution
	StatusRunning     = "running"     // Flow is actively executing a task
	StatusInterrupted = "interrupted" // Flow is paused, waiting for external input
	StatusCompleted   = "completed"   // Flow has finished successfully
	StatusFailed      = "failed"      // Flow has failed with an error
	StatusCancelled   = "cancelled"   // Flow was cancelled by the user
)

const END = "END"

END is a pseudo-node indicating that the workflow should terminate. Use it as the target of a transition to mark a terminal path.

Variables

This section is empty.

Functions

func BaggageFrom

func BaggageFrom(ctx context.Context) any

BaggageFrom returns the flow's opaque baggage carried on ctx, or nil if none. The value is the JSON-decoded form the host set in FlowOptions.Baggage at Create (typically map[string]any). It lives in the workflow package so task code can read it without importing the engine.

func ContextWithBaggage

func ContextWithBaggage(ctx context.Context, baggage any) context.Context

ContextWithBaggage returns a copy of ctx carrying the flow's opaque baggage. The engine calls this when dispatching to the host's LoadGraph/ExecuteTask (and at the create-time LoadGraph call); hosts read the value back with BaggageFrom. Set the baggage itself via FlowOptions.Baggage at Create, not here.

func MergeState

func MergeState(state any, changes any, reducers map[string]Reducer) (map[string]any, error)

MergeState applies changes on top of state, using the provided reducers for fields that have one. Fields without a registered reducer use replace semantics (last write wins).

Example

Reducers merge the changes of parallel branches at a fan-in. MergeState applies them to fold a delta onto a base state.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	reducers := map[string]workflow.Reducer{
		"items": workflow.ReducerAppend, // concatenate arrays
		"total": workflow.ReducerAdd,    // sum numbers
	}
	base := map[string]any{"items": []any{"a"}, "total": 1}
	delta := map[string]any{"items": []any{"b"}, "total": 2}

	// Reducer outputs are raw JSON, ready to be re-serialized as the next step's state.
	merged, _ := workflow.MergeState(base, delta, reducers)
	out, _ := json.Marshal(merged)
	fmt.Println(string(out))
}
Output:
{"items":["a","b"],"total":3}
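The merge rule can also be modeled without the package. The sketch below is a standalone approximation, not the package's implementation: the reducer type, appendReducer, addReducer, merge, and fanIn are hypothetical names, and the two branch deltas are invented for illustration. It folds the deltas of two parallel branches onto a base state, using a reducer where one is registered and replace semantics otherwise.

```go
package main

import "fmt"

// reducer folds one branch's delta into the accumulated value.
// Hypothetical type; the real package exposes Reducer constants instead.
type reducer func(acc, delta any) any

// appendReducer concatenates arrays.
func appendReducer(acc, delta any) any {
	a, _ := acc.([]any)
	d, _ := delta.([]any)
	return append(append([]any{}, a...), d...)
}

// addReducer sums numbers (float64, as JSON decoding would produce).
func addReducer(acc, delta any) any {
	a, _ := acc.(float64)
	d, _ := delta.(float64)
	return a + d
}

// merge applies changes on top of state: the registered reducer if there is
// one, otherwise replace semantics (last write wins).
func merge(state, changes map[string]any, reducers map[string]reducer) map[string]any {
	out := make(map[string]any, len(state)+len(changes))
	for k, v := range state {
		out[k] = v
	}
	for k, v := range changes {
		if r, ok := reducers[k]; ok {
			out[k] = r(out[k], v)
		} else {
			out[k] = v
		}
	}
	return out
}

// fanIn folds the deltas of two hypothetical parallel branches onto a base.
func fanIn() map[string]any {
	reducers := map[string]reducer{"items": appendReducer, "total": addReducer}
	base := map[string]any{"items": []any{"a"}, "total": 1.0, "note": "base"}
	branch1 := map[string]any{"items": []any{"b"}, "total": 2.0}
	branch2 := map[string]any{"items": []any{"c"}, "total": 3.0, "note": "from branch 2"}
	return merge(merge(base, branch1, reducers), branch2, reducers)
}

func main() {
	fmt.Println(fanIn())
}
```

Because each branch writes a delta rather than an accumulated value, the order in which the branches are folded does not affect "total"; it does affect "note", which has no reducer and keeps whichever write is applied last.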

Types

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

Flow is the carrier object passed to tasks. It holds the state and control signals for a single step in a workflow execution.

Example

A Flow carries state to and from a task. Tasks read inputs and write outputs with typed accessors.

package main

import (
	"fmt"

	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	f := workflow.NewFlow()
	f.SetString("name", "ada")
	f.SetInt("count", 3)

	fmt.Println(f.GetString("name"), f.GetInt("count"))
}
Output:
ada 3

func NewFlow

func NewFlow() *Flow

NewFlow creates a new Flow with initialized maps.

func (*Flow) Attempt added in v0.6.0

func (f *Flow) Attempt() int

Attempt returns the zero-based retry attempt counter for the current step: 0 on the first execution, incremented by the orchestrator on each Retry. A task can gate on it to bound retries by count (e.g. "if flow.Attempt() < 3") instead of, or alongside, Retry's time-based horizon.

func (*Flow) Clear

func (f *Flow) Clear()

Clear removes every state field. Equivalent to Delete on every current key. Useful at workflow boundaries (e.g. a task that builds a fresh subgraph input from a curated subset of parent state) or anywhere a task wants a blank slate before populating it.

func (*Flow) CreatedAt

func (f *Flow) CreatedAt() time.Time

CreatedAt returns the wall-clock time at which the flow was created. Useful for tasks that want to implement their own elapsed-time guard (e.g. "if time.Since(flow.CreatedAt()) > 24h then return an error to fail the workflow"). Zero when called outside a dispatched task or when the orchestrator has not populated it.

func (*Flow) Delete

func (f *Flow) Delete(keys ...string)

Delete removes the listed state fields. Each becomes JSON null in changes (so the next merge drops the field for Replace, contributes the reducer's identity for sum*/list*/set*) and is removed from the local state map so subsequent reads in this task see it as absent.
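The two effects of Delete (a null recorded in changes, and the field disappearing from local reads) can be illustrated with a toy carrier. This is a minimal sketch under stated assumptions: the carrier type and its del, has, and changesJSON methods are hypothetical stand-ins, not the Flow type or its internals.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// carrier is a toy stand-in for the state and changes a Flow tracks.
type carrier struct {
	state   map[string]any
	changes map[string]any
}

// newCarrier wraps an initial state with an empty change set.
func newCarrier(state map[string]any) *carrier {
	return &carrier{state: state, changes: map[string]any{}}
}

// del removes keys from local state and records each as null in changes.
func (c *carrier) del(keys ...string) {
	for _, k := range keys {
		delete(c.state, k)
		c.changes[k] = nil
	}
}

// has reports whether a field is present and not null.
func (c *carrier) has(key string) bool {
	v, ok := c.state[key]
	return ok && v != nil
}

// changesJSON renders the recorded changes as JSON.
func (c *carrier) changesJSON() string {
	b, _ := json.Marshal(c.changes)
	return string(b)
}

func main() {
	c := newCarrier(map[string]any{"cart": "abc", "coupon": "SAVE10"})
	c.del("coupon")
	fmt.Println(c.has("coupon"), c.changesJSON())
}
```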

func (*Flow) FlowKey added in v0.4.0

func (f *Flow) FlowKey() string

FlowKey returns the external key of the flow this task is executing in, in the form "{shard}-{flowID}-{token}". Useful for correlating logs/traces or calling back into the engine (e.g. History, Snapshot) for the task's own flow. Empty when called outside a dispatched task.

func (*Flow) Get

func (f *Flow) Get(key string, target any) error

Get unmarshals a state field into the target. Use this for complex types (structs, maps, etc.).

func (*Flow) GetBool

func (f *Flow) GetBool(key string) bool

GetBool returns a state field as a bool.

func (*Flow) GetDuration

func (f *Flow) GetDuration(key string) time.Duration

GetDuration returns a state field as a time.Duration.

func (*Flow) GetFloat

func (f *Flow) GetFloat(key string) float64

GetFloat returns a state field as a float64.

func (*Flow) GetInt

func (f *Flow) GetInt(key string) int

GetInt returns a state field as an int.

func (*Flow) GetString

func (f *Flow) GetString(key string) string

GetString returns a state field as a string.

func (*Flow) GetStrings

func (f *Flow) GetStrings(key string) []string

GetStrings returns a state field as a string slice.

func (*Flow) Goto

func (f *Flow) Goto(taskName string)

Goto overrides transition routing. The orchestrator skips condition evaluation and follows the specified task instead.

func (*Flow) GotoRequested

func (f *Flow) GotoRequested() string

GotoRequested returns the task name set by Goto, or empty if not set.

func (*Flow) Has

func (f *Flow) Has(key string) bool

Has reports whether a state field exists. A cleared slot (JSON null) reads as absent.

func (*Flow) Interrupt

func (f *Flow) Interrupt(payload any, out any) (yield bool, err error)

Interrupt parks the flow to await external input, or returns the resume data once it has arrived.

On the first call (not yet resumed) it records the interrupt request with the given payload - propagated up the surgraph chain and surfaced to the awaiting caller so it can see what data the task needs - and returns yield=true. The task must return immediately.

On re-entry after Resume it unmarshals the resume data into out with yield=false and does not re-arm; the task proceeds. The payload is any JSON-marshalable value (a struct or a map[string]any); out is a pointer (a *struct or *map[string]any) the resume data is unmarshaled into by JSON tag, or nil to ignore it. The returned err is non-nil only if the payload fails to marshal (or out fails to unmarshal); interrupt itself has no failure mode, so err is otherwise always nil. Symmetric with Subgraph: any in, pointer out.

var resume ResumeData
yield, err := flow.Interrupt(map[string]any{"request": "userInput"}, &resume)
if yield {
    return nil // parked, awaiting Resume
}
// proceed with resume
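The park-then-resume protocol can be modeled in isolation to show why the same call site serves both the first execution and the re-entry. The sketch below is an assumption-laden toy, not the engine: parkedStep, its fields, and the approval type are hypothetical, and the resume data is injected by hand where the real engine would deliver it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parkedStep is a toy model of the interrupt bookkeeping for one step.
type parkedStep struct {
	pending []byte // marshaled payload recorded by the first call
	resume  []byte // resume data; nil until the (modeled) Resume arrives
}

// approval is a hypothetical shape for the resume data.
type approval struct {
	Approved bool `json:"approved"`
}

// interrupt yields on the first call and unmarshals resume data on re-entry.
func (s *parkedStep) interrupt(payload, out any) (bool, error) {
	if s.resume == nil {
		b, err := json.Marshal(payload)
		if err != nil {
			return false, err
		}
		s.pending = b
		return true, nil // parked: the task must return immediately
	}
	if out != nil {
		if err := json.Unmarshal(s.resume, out); err != nil {
			return false, err
		}
	}
	return false, nil // resumed: the task proceeds
}

func main() {
	s := &parkedStep{}
	var a approval

	yield, _ := s.interrupt(map[string]any{"request": "approval"}, &a)
	fmt.Println("first call yields:", yield)

	s.resume = []byte(`{"approved":true}`) // stands in for Resume
	yield, _ = s.interrupt(map[string]any{"request": "approval"}, &a)
	fmt.Println("re-entry yields:", yield, "approved:", a.Approved)
}
```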

func (*Flow) InterruptRequested

func (f *Flow) InterruptRequested() (map[string]any, bool)

InterruptRequested returns the interrupt payload and true if Interrupt was called.

func (*Flow) MarshalJSON

func (f *Flow) MarshalJSON() ([]byte, error)

MarshalJSON serializes the Flow including private fields.

func (*Flow) ParseState

func (f *Flow) ParseState(target any) error

ParseState unmarshals state fields into the target struct. Fields are matched by their JSON tag names. Fields in state that are not in the struct are ignored.
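The matching rule (by JSON tag, with unknown state fields ignored) is the same one encoding/json applies by default. A minimal sketch, assuming state behaves like a map[string]any; parseState and the order type are hypothetical names and this is not the package's code:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// order declares only the fields the task cares about.
type order struct {
	Amount  float64 `json:"amount"`
	Receipt string  `json:"receipt"`
}

// parseState round-trips the state map through JSON into target, so fields
// are matched by JSON tag and state fields missing from target are ignored.
func parseState(state map[string]any, target any) error {
	raw, err := json.Marshal(state)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, target)
}

func main() {
	state := map[string]any{"amount": 12.5, "customer": "ada"}
	var o order
	if err := parseState(state, &o); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", o)
}
```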

func (*Flow) Retry

func (f *Flow) Retry(initialDelay time.Duration, delayMultiplier float64, maxIntervalDelay time.Duration, giveUpAfter time.Duration) bool

Retry asks the orchestrator to re-execute this task with exponential backoff. The bound is wall-clock, not a count: Retry returns true (the caller should return nil) while the next attempt would still land within giveUpAfter of the step's first creation, and false (the caller should return its error) once the horizon is reached. It also returns false when the next backoff delay alone would overshoot the horizon, so the step is not parked for a wait that is already known to end in failure. Pass giveUpAfter <= 0 for unlimited retry.

The delay before attempt N is min(initialDelay * delayMultiplier^N, maxIntervalDelay); pass a zero initialDelay for immediate retries, and a zero maxIntervalDelay for no per-interval cap. To hold the delay constant (e.g. honoring a provider's Retry-After carried in initialDelay), pass delayMultiplier 1.0. Sleep, if also set, is added on top as a floor.

Retry carries no condition of its own - it is the single retry primitive, called inside whatever error branch the task decides is retryable. Keeping the condition explicit at the call site avoids the "retry on every error" trap (most errors - validation, bad input, business rejections - should not be retried). Gate it on whatever your task considers transient:

result, err := callExternalAPI(ctx)
if err != nil {
    if isTransient(err) && flow.Retry(1*time.Second, 2.0, 30*time.Second, 1*time.Hour) {
        return result, nil // transient failure: retry scheduled, don't report error
    }
    return result, err // non-retryable, or horizon exceeded
}

To bound by count instead of (or in addition to) time, gate on Attempt: pass giveUpAfter 0 and check flow.Attempt() at the call site.
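The delay formula and the give-up horizon described above can be worked through with a small standalone calculation. This is a sketch, not the package's implementation: backoffDelay and withinHorizon are hypothetical helpers, the attempt index is assumed to be zero-based as with Attempt, and the behavior exactly at the horizon boundary is a guess.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay models min(initialDelay * delayMultiplier^attempt, maxIntervalDelay).
// A zero maxIntervalDelay means no per-interval cap.
func backoffDelay(initialDelay time.Duration, delayMultiplier float64, attempt int, maxIntervalDelay time.Duration) time.Duration {
	d := time.Duration(float64(initialDelay) * math.Pow(delayMultiplier, float64(attempt)))
	if maxIntervalDelay > 0 && d > maxIntervalDelay {
		return maxIntervalDelay
	}
	return d
}

// withinHorizon models the give-up decision: retry only while the next
// attempt would still land within giveUpAfter of the step's first creation.
// elapsed is the time since the step was first created. Treating the exact
// boundary as "reached" is an assumption.
func withinHorizon(elapsed, nextDelay, giveUpAfter time.Duration) bool {
	if giveUpAfter <= 0 {
		return true // unlimited retry
	}
	return elapsed+nextDelay < giveUpAfter
}

func main() {
	// Same parameters as the example above: 1s initial, x2, capped at 30s.
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Println(attempt, backoffDelay(time.Second, 2.0, attempt, 30*time.Second))
	}
	// 59 minutes in, a 30s delay still fits inside a one-hour horizon.
	fmt.Println(withinHorizon(59*time.Minute, 30*time.Second, time.Hour))
}
```

With these parameters the delay doubles from one second until it reaches the 30-second cap on the sixth execution, then stays there until the horizon ends the retries.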

func (*Flow) RetryRequested

func (f *Flow) RetryRequested() (initialDelay time.Duration, multiplier float64, maxDelay time.Duration, ok bool)

RetryRequested returns the backoff parameters and true if Retry was called. The orchestrator uses these to compute the re-dispatch delay; the give-up decision is made client-side in Retry, so only the backoff shape crosses this boundary.

func (*Flow) Set

func (f *Flow) Set(key string, value any) error

Set sets a state field and tracks the change. Use this for complex types (structs, maps, etc.).

func (*Flow) SetBool

func (f *Flow) SetBool(key string, value bool)

SetBool sets a state bool field and tracks the change.

func (*Flow) SetChanges

func (f *Flow) SetChanges(source any, snap map[string]any) error

SetChanges marshals the source struct back to state, comparing against the provided snapshot. Only fields whose JSON value differs from the snapshot are recorded as changes. Changed fields are written to both the state and changes maps, so that subsequent reads (including transition condition evaluation) see the updated values.
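The "differs by JSON value" comparison can be sketched on plain maps. This is an illustrative model only: diffAgainstSnapshot is a hypothetical helper, the real method takes a struct as its source, and fields that exist only in the snapshot are not handled here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// diffAgainstSnapshot returns the fields of current whose JSON encoding
// differs from the snapshot's, plus fields the snapshot does not have.
func diffAgainstSnapshot(current, snap map[string]any) (map[string]any, error) {
	changes := make(map[string]any)
	for k, v := range current {
		cur, err := json.Marshal(v)
		if err != nil {
			return nil, err
		}
		if old, ok := snap[k]; ok {
			prev, err := json.Marshal(old)
			if err != nil {
				return nil, err
			}
			if bytes.Equal(cur, prev) {
				continue // same JSON value: not a change
			}
		}
		changes[k] = v
	}
	return changes, nil
}

func main() {
	snap := map[string]any{"amount": 12.5, "receipt": ""}
	current := map[string]any{"amount": 12.5, "receipt": "r-123"}
	changes, err := diffAgainstSnapshot(current, snap)
	fmt.Println(changes, err)
}
```

Comparing encoded JSON rather than Go values means an int 1 and a float64 1 count as equal, which matters because state that has been through JSON decoding holds numbers as float64.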

func (*Flow) SetDuration

func (f *Flow) SetDuration(key string, value time.Duration)

SetDuration sets a state time.Duration field and tracks the change.

func (*Flow) SetFloat

func (f *Flow) SetFloat(key string, value float64)

SetFloat sets a state float64 field and tracks the change.

func (*Flow) SetInt

func (f *Flow) SetInt(key string, value int)

SetInt sets a state int field and tracks the change.

func (*Flow) SetState

func (f *Flow) SetState(source any) error

SetState marshals the source struct fields into state without tracking changes. Fields are matched by their JSON tag names.

func (*Flow) SetString

func (f *Flow) SetString(key string, value string)

SetString sets a state string field and tracks the change.

func (*Flow) SetStrings

func (f *Flow) SetStrings(key string, value []string)

SetStrings sets a state string slice field and tracks the change.

func (*Flow) Sleep

func (f *Flow) Sleep(duration time.Duration)

Sleep tells the orchestrator to wait for the given duration before the next execution.

func (*Flow) SleepRequested

func (f *Flow) SleepRequested() time.Duration

SleepRequested returns the duration set by Sleep, or zero if not set.

func (*Flow) Snapshot

func (f *Flow) Snapshot() map[string]any

Snapshot captures a read-only copy of the flow's current state (including any changes applied so far). Pass the returned snapshot to SetChanges to record only the fields that differ.

func (*Flow) StepCreatedAt added in v0.6.0

func (f *Flow) StepCreatedAt() time.Time

StepCreatedAt returns the wall-clock time at which this step was first created, preserved across retries of the step. It anchors Retry's giveUpAfter horizon, and a task can read it directly to implement a custom elapsed-time guard. Zero when called outside a dispatched task.

func (*Flow) StepKey added in v0.4.0

func (f *Flow) StepKey() string

StepKey returns the external key of the step this task is executing, in the form "{shard}-{stepID}-{token}". Useful for correlating logs/traces or calling back into the engine (e.g. Step) for the task's own step. Empty when called outside a dispatched task.

func (*Flow) Subgraph

func (f *Flow) Subgraph(workflowURL string, in any, out any) (yield bool, err error)

Subgraph runs a child workflow and unmarshals its result once it completes, parking the step in between.

Semantically a function call: only the explicit in argument crosses the boundary into the child, and only the explicit out crosses back. The parent's state does NOT auto-cross either direction. in is any JSON-marshalable value (a struct or a map[string]any) and becomes the child's initial state field-by-field; a nil in means "no arguments" (the child starts with empty state). A caller that wants the parent's full state to cross can pass flow.Snapshot() as in. The out argument is a pointer (a *struct or *map[string]any) into which the child's final_state is unmarshaled by JSON tag; pass nil to ignore the result. Using a typed struct reads only the fields you declare, with type safety.

On the first call (child not yet run) it arms the subgraph park with the child workflow URL and in and returns yield=true; the task must return immediately.

On re-entry after the child terminates it unmarshals the child's final_state into out, returns yield=false, and sets err if the child failed. Does not re-arm on re-entry.

var out ChildOut
yield, err := flow.Subgraph(childURL, ChildIn{Value: value}, &out)
if yield {
    return nil // parked, child running
}
if err != nil {
    if flow.Retry(time.Second, 2.0, 30*time.Second, time.Hour) {
        return nil
    }
    return errors.Trace(err)
}
// read fields from out

func (*Flow) SubgraphRequested

func (f *Flow) SubgraphRequested() (url string, input map[string]any, taskName string, ok bool)

SubgraphRequested returns the request URL, input state, the subtask name, and true if Subgraph or Subtask was called. taskName discriminates: non-empty means Subtask (the engine synthesizes a single-task graph named taskName), empty means a regular Subgraph (the engine loads the graph by URL).

func (*Flow) Subtask added in v0.5.0

func (f *Flow) Subtask(name, taskURL string, in any, out any) (yield bool, err error)

Subtask launches a single task as an isolated child flow, the task-level sibling of Subgraph. The engine synthesizes a trivial one-node graph (named name) around taskURL instead of loading a graph, so any task endpoint can be invoked without a graph definition; everything after launch - parking, re-entry, the out-pointer result, cancel/interrupt propagation - is identical to Subgraph. name is required (it is the node's display name in diagrams/history) and must be non-empty. It is a thin wrapper over Subgraph: the request, single-park guard, input normalization, and re-entry all flow through Subgraph; the non-empty name is what marks the pending request as a subtask.

Pass a task URL, not a graph URL: a graph URL would be wrapped as a one-node graph and dispatched as a task, failing at dispatch. (Symmetrically, Subgraph with a task URL fails in LoadGraph.) The typed per-service client makes this mistake impossible; the raw primitives are the dynamic-only escape hatch.

func (*Flow) Transform

func (f *Flow) Transform(pairs ...string)

Transform clears all state, then re-introduces the listed fields under new names. Arguments are (newKey, oldKey) pairs; the value previously stored under oldKey is captured before the clear and re-set under newKey. Old keys that were absent or already null are skipped (the new key is not introduced as null). Panics on an odd number of arguments.

Typical use: a small task immediately upstream of a subgraph node that reshapes parent state into the subgraph's expected input.

flow.Transform("subInput1", "parentVarA", "subInput2", "parentVarB")
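The capture, clear, and rename sequence can be modeled on a plain map. A minimal sketch, assuming state behaves like a map[string]any; the transform function is a hypothetical stand-in that returns a new map instead of mutating a Flow:

```go
package main

import "fmt"

// transform keeps only the listed fields, renamed. Arguments are
// (newKey, oldKey) pairs; absent or null old keys are skipped.
func transform(state map[string]any, pairs ...string) map[string]any {
	if len(pairs)%2 != 0 {
		panic("transform: odd number of arguments")
	}
	out := make(map[string]any)
	for i := 0; i < len(pairs); i += 2 {
		newKey, oldKey := pairs[i], pairs[i+1]
		v, ok := state[oldKey]
		if !ok || v == nil {
			continue // the new key is not introduced as null
		}
		out[newKey] = v
	}
	return out
}

func main() {
	state := map[string]any{"parentVarA": 1, "parentVarB": nil, "other": "x"}
	fmt.Println(transform(state, "subInput1", "parentVarA", "subInput2", "parentVarB"))
}
```

In this run only subInput1 survives: parentVarB is null, so subInput2 is skipped, and "other" is dropped because it was not listed.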

func (*Flow) UnmarshalJSON

func (f *Flow) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes the Flow including private fields.

func (*Flow) UpdatedAt

func (f *Flow) UpdatedAt() time.Time

UpdatedAt returns the wall-clock time of the flow row's last status transition. Useful for tasks that want to gate on "how long since the flow last advanced." Zero when called outside a dispatched task or when the orchestrator has not populated it.

type FlowOptions

type FlowOptions struct {
	// Priority orders flows competing for workers; an explicit priority is >= 1,
	// lower runs first. Zero means "unset" and uses the engine's
	// DefaultPriority config.
	Priority int `json:"priority,omitzero"`
	// FairnessKey groups flows for fair scheduling, typically a tenant.
	// Empty derives it from baggage, else the "" bucket.
	FairnessKey string `json:"fairnessKey,omitzero"`
	// FairnessWeight is the relative dispatch share of the fairness key.
	// Zero uses a weight of 1.
	FairnessWeight float64 `json:"fairnessWeight,omitzero"`
	// StartAt delays execution of the flow's entry step until the given UTC time.
	// Zero or a past time means run as soon as the flow is started. Sets the
	// entry step's not_before column; the flow can still be created and started
	// immediately, but no worker will pick the step up before StartAt.
	StartAt time.Time `json:"startAt,omitzero"`
	// NotifyOnStop requests that the host's FlowStopped callback fire when this flow stops
	// (completed/failed/cancelled/interrupted). The engine persists the intent and, at stop time,
	// invokes FlowStopped with the flow's Baggage on the context - the host decides where/how to deliver
	// the notification from that baggage. When false (the default) FlowStopped is never called for the
	// flow. Set at Create; plain Start then runs the flow.
	NotifyOnStop bool `json:"notifyOnStop,omitzero"`
	// Baggage is opaque, host-defined context (identity/claims, tenant, locale, ...) carried with the
	// flow. The engine never interprets it: it is set once here, stored on the flow, inherited by
	// subgraphs and Continue, and delivered to every Host LoadGraph/ExecuteTask call via the dispatch
	// context - read it with BaggageFrom(ctx). Any JSON-marshalable value; the host receives the
	// JSON-decoded form (typically map[string]any), exactly like flow state.
	Baggage any `json:"baggage,omitzero"`
}

FlowOptions sets flow-level properties at Create or Run: scheduling (priority, fairness, start time) plus the opaque host Baggage. A nil *FlowOptions, or any zero field, uses the engine's defaults.

type FlowOutcome

type FlowOutcome struct {
	// Status is the flow's current lifecycle status: created, running, interrupted, completed, failed, or cancelled.
	Status string `json:"status,omitzero"`
	// State is the flow's accumulated state. For terminal statuses this is the final_state;
	// for running and interrupted flows it is the merged snapshot of the current step.
	State map[string]any `json:"state,omitzero"`
	// Error is the task error string. Populated when Status is "failed".
	Error string `json:"error,omitzero"`
	// InterruptPayload is the raw payload from flow.Interrupt(payload). Populated when Status is "interrupted".
	InterruptPayload map[string]any `json:"interruptPayload,omitzero"`
	// CancelReason is the reason string passed to Cancel(flowKey, reason). Populated when Status is "cancelled".
	CancelReason string `json:"cancelReason,omitzero"`
}

FlowOutcome carries the status and side-channel signals of a flow at a moment in time. Returned by Snapshot, Await, and Run, and fired as the payload of the OnFlowStopped event. Side-channel fields are populated only for the matching Status; for example InterruptPayload is populated only when Status is "interrupted".

type FlowRenderer

type FlowRenderer struct {
	// contains filtered or unexported fields
}

FlowRenderer renders the execution history of a flow as a Mermaid flowchart.

func NewFlowRenderer

func NewFlowRenderer(steps []FlowStep) *FlowRenderer

NewFlowRenderer creates a renderer for a flow's execution history.

func (*FlowRenderer) Render

func (r *FlowRenderer) Render() (string, error)

Render returns the Mermaid flowchart representation.

func (*FlowRenderer) WithAttentionColors

func (r *FlowRenderer) WithAttentionColors(fill, text string) *FlowRenderer

func (*FlowRenderer) WithErrorColors

func (r *FlowRenderer) WithErrorColors(fill, text string) *FlowRenderer

func (*FlowRenderer) WithLeftRight

func (r *FlowRenderer) WithLeftRight() *FlowRenderer

func (*FlowRenderer) WithLinks

func (r *FlowRenderer) WithLinks(paramName string) *FlowRenderer

func (*FlowRenderer) WithPrimaryColors

func (r *FlowRenderer) WithPrimaryColors(fill, text string) *FlowRenderer

func (*FlowRenderer) WithSecondaryColors

func (r *FlowRenderer) WithSecondaryColors(fill, text string) *FlowRenderer

func (*FlowRenderer) WithTitle

func (r *FlowRenderer) WithTitle(text string) *FlowRenderer

func (*FlowRenderer) WithTopDown

func (r *FlowRenderer) WithTopDown() *FlowRenderer

type FlowStep

type FlowStep struct {
	StepKey   string `json:"stepKey,omitzero"`
	StepID    int    `json:"stepID,omitzero"`
	StepDepth int    `json:"stepDepth,omitzero"`
	TaskName  string `json:"taskName,omitzero"`
	Attempt   int    `json:"attempt,omitzero"`
	// PredecessorID and SuccessorID are the shard-local step ids of this step's neighbors in
	// the execution DAG. 0 means no such edge (entry / exit step).
	PredecessorID int `json:"predecessorID,omitzero"`
	SuccessorID   int `json:"successorID,omitzero"`
	// PrevKey and NextKey are the external step keys of the resolved navigation neighbors,
	// ready for use as ?step= links. Populated only by the Step endpoint.
	PrevKey          string         `json:"prevKey,omitzero"`
	NextKey          string         `json:"nextKey,omitzero"`
	Subgraph         bool           `json:"subgraph,omitzero"`
	SubWorkflowURL   string         `json:"subWorkflowURL,omitzero"`
	SubWorkflowName  string         `json:"subWorkflowName,omitzero"`
	SubHistory       []FlowStep     `json:"subHistory,omitzero"`
	State            map[string]any `json:"state,omitzero"`
	Changes          map[string]any `json:"changes,omitzero"`
	InterruptPayload map[string]any `json:"interruptPayload,omitzero"`
	Status           string         `json:"status,omitzero"`
	// Parked reports whether the step is currently held out of the selection band (a subgraph caller
	// waiting on its child, or a breaker-parked backlog step). A terminal step is never parked.
	Parked    bool      `json:"parked,omitzero"`
	Error     string    `json:"error,omitzero"`
	CreatedAt time.Time `json:"createdAt,omitzero"`
	// StartedAt is when the worker first dispatched the current attempt of this step.
	// Use HasStarted to gate reads; on a not-yet-leased row it carries the INSERT-time default.
	StartedAt time.Time `json:"startedAt,omitzero"`
	UpdatedAt time.Time `json:"updatedAt,omitzero"`
}

FlowStep is a single step in a flow's execution history.

func (FlowStep) Duration

func (s FlowStep) Duration() time.Duration

Duration is the wall-clock time from CreatedAt to UpdatedAt. Returns zero when either timestamp is missing or the delta is negative.

func (FlowStep) HasStarted

func (s FlowStep) HasStarted() bool

HasStarted reports whether StartedAt is a real dispatch timestamp rather than the INSERT-time default. True for running and any terminal status; false for created/pending.

type FlowSummary

type FlowSummary struct {
	FlowKey      string    `json:"flowKey,omitzero"`
	ThreadKey    string    `json:"threadKey,omitzero"`
	WorkflowURL  string    `json:"workflowURL,omitzero"`
	WorkflowName string    `json:"workflowName,omitzero"`
	Status       string    `json:"status,omitzero"`
	TaskName     string    `json:"taskName,omitzero"`
	Error        string    `json:"error,omitzero"`
	CancelReason string    `json:"cancelReason,omitzero"`
	CreatedAt    time.Time `json:"createdAt,omitzero"`
	// StartedAt is when this attempt began dispatching (Start, or a Restart/RestartFrom
	// rewind). Distinct from CreatedAt, which is the row's INSERT moment and is only
	// reset on full Restart. Use StartedAt for duration metrics; CreatedAt for "when did
	// this flow first appear."
	StartedAt time.Time `json:"startedAt,omitzero"`
	UpdatedAt time.Time `json:"updatedAt,omitzero"`
	// Priority is the flow's scheduling priority (>= 1, lower runs first), resolved at Create.
	Priority int `json:"priority,omitzero"`
	// FairnessKey is the flow's scheduling fairness bucket, resolved at Create.
	FairnessKey string `json:"fairnessKey,omitzero"`
}

FlowSummary is a summary of a flow for listing purposes.

func (FlowSummary) Duration

func (f FlowSummary) Duration() time.Duration

Duration is the wall-clock time from StartedAt to UpdatedAt.

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

Graph is the definition of a workflow. It describes the tasks, transitions between them, and reducers for merging state during fan-in.

Example

Build a linear workflow graph and validate it.

package main

import (
	"fmt"

	"github.com/microbus-io/dwarf/workflow"
)

func main() {
	g := workflow.NewGraph("Checkout")
	g.SetEndpoint("Reserve", "inventory.reserve")
	g.SetEndpoint("Charge", "billing.charge")
	g.AddTransitionChain("Reserve", "Charge", workflow.END)

	fmt.Println("name:", g.Name())
	fmt.Println("entry:", g.EntryPoint())
	fmt.Println("valid:", g.Validate() == nil)
}
Output:
name: Checkout
entry: Reserve
valid: true

func NewGraph

func NewGraph(name string) *Graph

NewGraph creates a new workflow graph with the given display name. The name is a human-friendly label (surfaced in rendering and Validate error messages); it is NOT the resolve key. The value passed to Create/Run and to the host's LoadGraph is a separate opaque URL that the engine stores on the flow (workflow_url) - it is never kept on the graph itself.

func (*Graph) AddTransition

func (g *Graph) AddTransition(from, to string)

AddTransition adds an unconditional transition between two nodes. Both endpoints are auto-registered as tasks if not already present (see autoRegister).

func (*Graph) AddTransitionChain added in v0.5.0

func (g *Graph) AddTransitionChain(names ...string)

AddTransitionChain wires an unconditional transition between each consecutive pair of names: AddTransitionChain("A", "B", "C") is AddTransition("A", "B") followed by AddTransition("B", "C"). It is a convenience for linear segments; fewer than two names is a no-op. END belongs last (a node after END would produce an invalid transition out of END). Mix with the other AddTransition* methods for branching, conditions, and loops.

func (*Graph) AddTransitionFanOut added in v0.6.2

func (g *Graph) AddTransitionFanOut(from string, to ...string)

AddTransitionFanOut wires an unconditional transition from one source to each of several destinations: AddTransitionFanOut("A", "B", "C") is AddTransition("A", "B") followed by AddTransition("A", "C"), so B and C both fire and run in parallel. It is a convenience for static fan-out; no destinations is a no-op. It creates only the outgoing edges - if the branches later rejoin at a node, that node still needs SetFanIn (and usually a reducer) wired separately. Distinct from AddTransitionForEach, which fans out dynamically over a runtime collection rather than across statically-named nodes.
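The difference between the two convenience methods is easiest to see in the edges each one expands to. The sketch below is a standalone illustration: the edge type and the chainEdges and fanOutEdges helpers are hypothetical and do not touch a real Graph.

```go
package main

import "fmt"

// edge is one unconditional transition.
type edge struct {
	from, to string
}

// chainEdges links each consecutive pair of names, as AddTransitionChain is
// documented to do. Fewer than two names yields no edges.
func chainEdges(names ...string) []edge {
	var edges []edge
	for i := 0; i+1 < len(names); i++ {
		edges = append(edges, edge{from: names[i], to: names[i+1]})
	}
	return edges
}

// fanOutEdges links one source to every destination, as AddTransitionFanOut
// is documented to do. No destinations yields no edges.
func fanOutEdges(from string, to ...string) []edge {
	var edges []edge
	for _, t := range to {
		edges = append(edges, edge{from: from, to: t})
	}
	return edges
}

func main() {
	fmt.Println(chainEdges("A", "B", "C"))  // A->B, B->C
	fmt.Println(fanOutEdges("A", "B", "C")) // A->B, A->C
}
```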

func (*Graph) AddTransitionForEach

func (g *Graph) AddTransitionForEach(from, to string, forEach string, as string)

AddTransitionForEach adds a dynamic fan-out transition.

func (*Graph) AddTransitionGoto

func (g *Graph) AddTransitionGoto(from, to string)

AddTransitionGoto adds a transition that is only taken when the source task calls flow.Goto with a target that resolves to this transition's destination.

func (*Graph) AddTransitionOnError

func (g *Graph) AddTransitionOnError(from, to string)

AddTransitionOnError adds a transition that is taken when the source task returns an error.

func (*Graph) AddTransitionSwitch

func (g *Graph) AddTransitionSwitch(from, to string, when string)

AddTransitionSwitch adds a first-match-wins transition between two nodes. Multiple Switch transitions from the same source are evaluated in registration order and only the first whose 'when' expression evaluates true fires; the rest are skipped. If no Switch matches the flow ends at the source node, so the last Switch from a node is typically a catch-all with when="true". Only one branch ever runs, so a downstream SetFanIn is not required.

A node that uses Switch transitions must declare every successful-path outgoing transition as Switch (the validator rejects mixing Switch with When/plain/ForEach/Goto from the same source). OnError transitions are orthogonal and remain allowed.
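First-match-wins routing can be sketched as an ordered scan. This model is an assumption in several respects: the real 'when' is a string expression evaluated by the engine, whereas here it is a Go predicate, and switchCase, firstMatch, checkoutCases, and the node names are all hypothetical.

```go
package main

import "fmt"

// switchCase pairs a destination with its condition.
type switchCase struct {
	to   string
	when func(state map[string]any) bool
}

// firstMatch scans cases in registration order and returns the first whose
// condition holds. ok is false when nothing matches (the flow would end).
func firstMatch(cases []switchCase, state map[string]any) (to string, ok bool) {
	for _, c := range cases {
		if c.when(state) {
			return c.to, true
		}
	}
	return "", false
}

// checkoutCases builds three cases ending in a catch-all.
func checkoutCases() []switchCase {
	amount := func(s map[string]any) float64 {
		v, _ := s["amount"].(float64)
		return v
	}
	return []switchCase{
		{to: "ManualReview", when: func(s map[string]any) bool { return amount(s) > 1000 }},
		{to: "Charge", when: func(s map[string]any) bool { return amount(s) > 0 }},
		{to: "Reject", when: func(map[string]any) bool { return true }}, // catch-all
	}
}

func main() {
	for _, a := range []float64{1500, 20, 0} {
		to, _ := firstMatch(checkoutCases(), map[string]any{"amount": a})
		fmt.Println(a, "->", to)
	}
}
```

An amount of 1500 satisfies both of the first two conditions, but only ManualReview is taken, which is why registration order matters and why the catch-all belongs last.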

func (*Graph) AddTransitionWhen

func (g *Graph) AddTransitionWhen(from, to string, when string)

AddTransitionWhen adds a conditional transition between two nodes.

func (*Graph) Annotate

func (g *Graph) Annotate(name string, note string)

Annotate attaches a short note to a node. The note renders as a teal, borderless text label directly beneath the node in the Mermaid diagram.

func (*Graph) Annotation

func (g *Graph) Annotation(name string) string

Annotation returns the note attached to a node via Annotate, or "" if none.

func (*Graph) EntryPoint

func (g *Graph) EntryPoint() string

EntryPoint returns the node name of the entry point of the graph.

func (*Graph) ErrorTransition

func (g *Graph) ErrorTransition(name string) (Transition, bool)

ErrorTransition returns the error transition from the given node name, if one exists.

func (*Graph) FanInFor

func (g *Graph) FanInFor(fanOutSource string) string

FanInFor returns the fan-in node that pops the frame pushed by a fan-out at the named source, or "" if the source is not a fan-out. The mapping is populated by Validate.

func (*Graph) HasFanIn

func (g *Graph) HasFanIn() bool

HasFanIn reports whether the graph declares any fan-in nexus.

func (*Graph) IsFanIn

func (g *Graph) IsFanIn(name string) bool

IsFanIn reports whether the named node is a fan-in nexus.

func (*Graph) IsFanOutSource

func (g *Graph) IsFanOutSource(name string) bool

IsFanOutSource reports whether the named node has 2+ non-goto/non-error outgoing transitions, or any forEach outgoing transition. Switch transitions are exclusive (only one branch ever fires) and therefore do not count toward fan-out.
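The rule can be sketched as a small predicate over a transition list. The transition struct below mirrors only the documented Transition fields this rule needs, and isFanOutSource is a hypothetical free function written from the description above, not the package's code.

```go
package main

import "fmt"

// transition mirrors the documented Transition fields relevant to this rule.
type transition struct {
	From, To string
	WithGoto bool
	ForEach  string
	OnError  bool
	Switch   bool
}

// isFanOutSource applies the documented rule: goto, error, and Switch edges
// are ignored; any remaining forEach edge makes the source a fan-out;
// otherwise two or more remaining edges are required.
func isFanOutSource(ts []transition, name string) bool {
	count := 0
	for _, t := range ts {
		if t.From != name || t.WithGoto || t.OnError || t.Switch {
			continue
		}
		if t.ForEach != "" {
			return true
		}
		count++
	}
	return count >= 2
}

func main() {
	ts := []transition{
		{From: "A", To: "B"},
		{From: "A", To: "C"},
		{From: "A", To: "Recover", OnError: true},
		{From: "B", To: "D", ForEach: "items"},
		{From: "C", To: "X", Switch: true},
		{From: "C", To: "Y", Switch: true},
	}
	fmt.Println(isFanOutSource(ts, "A"), isFanOutSource(ts, "B"), isFanOutSource(ts, "C"))
	// prints: true true false
}
```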

func (*Graph) MarshalJSON

func (g *Graph) MarshalJSON() ([]byte, error)

MarshalJSON serializes the graph to JSON.

func (*Graph) Name

func (g *Graph) Name() string

Name returns the graph's display name.

func (*Graph) Nodes

func (g *Graph) Nodes() []Node

Nodes returns the list of nodes in the graph.

func (*Graph) Reducers

func (g *Graph) Reducers() map[string]Reducer

Reducers returns the reducer map for state fields.

func (*Graph) SetEndpoint added in v0.5.0

func (g *Graph) SetEndpoint(name, url string)

SetEndpoint binds a node (identified by its graph name) to the given dispatch URL, creating the node if it does not exist and updating its URL if it does. The name is the node's identity in the graph (used by transitions, fan-in, goto, breakpoints). The URL is the opaque downstream endpoint that the engine hands to the host's ExecuteTask and uses as the grouping key for the breaker, valve, and saturation. The first node bound becomes the default entry point unless SetEntryPoint is called explicitly. The pseudo-node END is not registered.

The same URL may be bound under multiple names. This is how a workflow author reuses the same task code at distinct positions in the graph with different downstream transitions per position.
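The name-to-URL bookkeeping described above can be sketched with a stand-in registry. Everything in this block is hypothetical: the registry type, its methods, the node names, the URL "notify.send", and the string value chosen for the END pseudo-node are assumptions for illustration only.

```go
package main

import "fmt"

// end is a hypothetical stand-in for the package's END pseudo-node.
const end = "END"

// registry is a hypothetical stand-in for the graph's name-to-URL mapping.
type registry struct {
	urls  map[string]string
	entry string
}

// setEndpoint creates or updates a binding. The first name bound becomes
// the default entry point; the pseudo-node is never registered.
func (r *registry) setEndpoint(name, url string) {
	if name == end {
		return
	}
	if r.urls == nil {
		r.urls = map[string]string{}
	}
	if r.entry == "" {
		r.entry = name
	}
	r.urls[name] = url
}

// urlOf returns "" for unknown names and maps the pseudo-node to itself.
func (r *registry) urlOf(name string) string {
	if name == end {
		return end
	}
	return r.urls[name]
}

func main() {
	var r registry
	// One task implementation, two positions in the graph.
	r.setEndpoint("NotifyCustomer", "notify.send")
	r.setEndpoint("NotifyOps", "notify.send")
	fmt.Println(r.entry, r.urlOf("NotifyOps"), r.urlOf("Missing") == "")
	// prints: NotifyCustomer notify.send true
}
```

Keeping identity (the name) separate from dispatch (the URL) is what lets each position carry its own outgoing transitions while sharing task code.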

func (*Graph) SetEntryPoint

func (g *Graph) SetEntryPoint(name string)

SetEntryPoint sets the entry point of the graph explicitly, overriding the default (the first node bound with SetEndpoint). The argument is a node name.

func (*Graph) SetFanIn

func (g *Graph) SetFanIn(name string)

SetFanIn marks a node as a fan-in nexus. Marking any node opts the graph into the lineage validator.

func (*Graph) SetReducer

func (g *Graph) SetReducer(field string, reducer Reducer)

SetReducer sets the merge strategy for a state field during fan-in.

func (*Graph) Transitions

func (g *Graph) Transitions() []Transition

Transitions returns the list of transitions in the graph. The returned slice shares the graph's underlying storage; callers must not mutate it. The graph is treated as immutable after Validate, so read-only iteration is safe.

func (*Graph) URLOf

func (g *Graph) URLOf(name string) string

URLOf returns the dispatch URL for a node identified by name. Returns the empty string if the name is not registered. END maps to itself.

func (*Graph) UnmarshalJSON

func (g *Graph) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes the graph from JSON.

func (*Graph) Validate

func (g *Graph) Validate() error

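Validate checks the graph for structural errors, such as a source node that mixes Switch transitions with When, plain, ForEach, or Goto transitions. It also populates the fan-out to fan-in mapping reported by FanInFor. The graph is treated as immutable after Validate.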

type GraphRenderer

type GraphRenderer struct {
	// contains filtered or unexported fields
}

GraphRenderer renders a workflow Graph to a Mermaid flowchart. Configure via the With* builder methods, then call Render.

func NewGraphRenderer

func NewGraphRenderer(g *Graph) *GraphRenderer

NewGraphRenderer creates a renderer for the given graph with default styling.

func (*GraphRenderer) Render

func (r *GraphRenderer) Render() (string, error)

Render returns a fully-styled Mermaid flowchart representation of the graph.

func (*GraphRenderer) WithAnnotationColor

func (r *GraphRenderer) WithAnnotationColor(color string) *GraphRenderer

WithAnnotationColor overrides the color of annotation text.

func (*GraphRenderer) WithLeftRight

func (r *GraphRenderer) WithLeftRight() *GraphRenderer

WithLeftRight renders the diagram left-to-right.

func (*GraphRenderer) WithLinks

func (r *GraphRenderer) WithLinks(paramName string) *GraphRenderer

WithLinks enables click directives on every task node.

func (*GraphRenderer) WithPrimaryColors

func (r *GraphRenderer) WithPrimaryColors(fill, text string) *GraphRenderer

WithPrimaryColors overrides the primary brand color pair (fill and text).

func (*GraphRenderer) WithSecondaryColors

func (r *GraphRenderer) WithSecondaryColors(fill, text string) *GraphRenderer

WithSecondaryColors overrides the secondary surface color pair (fill and text).

func (*GraphRenderer) WithTitleLabel

func (r *GraphRenderer) WithTitleLabel(show bool) *GraphRenderer

WithTitleLabel toggles the Mermaid frontmatter title (the graph's display name) rendered as a caption above the chart.

func (*GraphRenderer) WithTopDown

func (r *GraphRenderer) WithTopDown() *GraphRenderer

WithTopDown renders the diagram top-to-bottom.

type Node

type Node struct {
	Name string
	URL  string
}

Node describes a task or subgraph node registered in a workflow graph. Name is the node's identifier within the graph and the value stored on step rows (dwarf_steps.task_name). URL is the dispatch target the engine calls when the node is reached.

type Query

type Query struct {
	Status      string `json:"status,omitzero"`
	WorkflowURL string `json:"workflowURL,omitzero"`
	// WorkflowName filters to flows whose graph display name (the human-friendly name set via
	// NewGraph and denormalized onto the flow row) equals this value. Distinct from WorkflowURL,
	// which matches the resolve key. Empty disables the filter; composes with WorkflowURL.
	WorkflowName string `json:"workflowName,omitzero"`
	ThreadKey    string `json:"threadKey,omitzero"`
	// TaskName filters to flows whose current step is on the named task.
	TaskName string `json:"taskName,omitzero"`
	// FairnessKey filters to flows with this scheduling fairness key. The host typically sets the
	// fairness key to the tenant, so this is how "list flows for tenant X" is expressed. Empty
	// disables the filter.
	FairnessKey string `json:"fairnessKey,omitzero"`
	// Priority filters to flows at this scheduling priority band. Zero disables the filter
	// (valid priorities are >= 1).
	Priority int `json:"priority,omitzero"`
	// OlderThan filters to flows whose updated_at is older than this duration relative to now.
	// Zero disables the filter.
	OlderThan time.Duration `json:"olderThan,omitzero"`
	// NewerThan filters to flows whose updated_at is within this duration of now.
	// Zero disables the filter. Composes with OlderThan to express "between X and Y ago."
	NewerThan time.Duration `json:"newerThan,omitzero"`
	// Shard restricts the query to a single 1-based shard. Zero queries all shards.
	Shard int `json:"shard,omitzero"`
	// Cursor is the opaque pagination cursor returned as NextCursor by the previous List call.
	Cursor string `json:"cursor,omitzero"`
	// Search is a case-insensitive substring matched against workflow_url, workflow_name, current
	// task_name, error, and cancel_reason. SQL LIKE wildcards (%, _) in the value are honored.
	Search string `json:"search,omitzero"`
	Limit  int    `json:"limit,omitzero"`
}

Query specifies filtering and pagination options for listing or purging flows.

type RawFlow

type RawFlow struct {
	Flow
}

RawFlow wraps Flow with additional methods used by the orchestrator. Task endpoints should use Flow directly; RawFlow is for internal orchestration use only.

func NewRawFlow

func NewRawFlow() *RawFlow

NewRawFlow creates a new RawFlow with initialized maps.

func (*RawFlow) ClearChanges

func (f *RawFlow) ClearChanges()

ClearChanges resets the changes map. Called by the orchestrator after persisting changes.

func (*RawFlow) ClearControl

func (f *RawFlow) ClearControl()

ClearControl resets all control signals. Called by the orchestrator after processing them.

func (*RawFlow) RawChanges

func (f *RawFlow) RawChanges() map[string]any

RawChanges returns a copy of the raw changes map.

func (*RawFlow) RawState

func (f *RawFlow) RawState() map[string]any

RawState returns a copy of the raw state map.

func (*RawFlow) SetAttempt

func (f *RawFlow) SetAttempt(attempt int)

SetAttempt sets the attempt counter on the flow. Called by the orchestrator before dispatching a task so that Retry can check whether attempts are exhausted.

func (*RawFlow) SetCreatedAt added in v0.6.0

func (f *RawFlow) SetCreatedAt(createdAt time.Time)

SetCreatedAt records the flow row's createdAt. Called by the orchestrator before dispatching a task so the task can read it via Flow.CreatedAt().

func (*RawFlow) SetFlowKey added in v0.4.0

func (f *RawFlow) SetFlowKey(flowKey string)

SetFlowKey records the external key of the flow being dispatched, so the task can read it via Flow.FlowKey(). Called by the orchestrator before dispatching a task.

func (*RawFlow) SetInterruptResolution

func (f *RawFlow) SetInterruptResolution(resumeData map[string]any)

SetInterruptResolution records that an interrupt park has resolved, with the resume data materialized from the step row's resume_data column, so flow.Interrupt returns it (with yield=false) on re-entry instead of re-arming. The orchestrator calls this only when the step row's interrupt_done is set; an un-resumed step leaves the flow's default (not resolved).

func (*RawFlow) SetRawChanges

func (f *RawFlow) SetRawChanges(changes map[string]any)

SetRawChanges replaces the entire changes map with the given raw map.

func (*RawFlow) SetRawState

func (f *RawFlow) SetRawState(state map[string]any)

SetRawState replaces the entire state with the given raw map, without tracking changes.

func (*RawFlow) SetStepCreatedAt added in v0.6.0

func (f *RawFlow) SetStepCreatedAt(stepCreatedAt time.Time)

SetStepCreatedAt records the step row's createdAt, preserved across retries. Called by the orchestrator before dispatching a task so Retry can measure its giveUpAfter horizon and the task can read it via Flow.StepCreatedAt().

func (*RawFlow) SetStepKey added in v0.4.0

func (f *RawFlow) SetStepKey(stepKey string)

SetStepKey records the external key of the step being dispatched, so the task can read it via Flow.StepKey(). Called by the orchestrator before dispatching a task.

func (*RawFlow) SetSubgraphResolution

func (f *RawFlow) SetSubgraphResolution(result map[string]any, errStr string)

SetSubgraphResolution records that a subgraph park has resolved, with the child's final_state (result) and error materialized from the step row's subgraph_result / subgraph_error columns, so flow.Subgraph returns them (with yield=false) on re-entry instead of re-arming. The orchestrator calls this only when the step row's subgraph_done is set.

func (*RawFlow) SetUpdatedAt added in v0.6.0

func (f *RawFlow) SetUpdatedAt(updatedAt time.Time)

SetUpdatedAt records the flow row's updatedAt. Called by the orchestrator before dispatching a task so the task can read it via Flow.UpdatedAt().

type Reducer

type Reducer string

Reducer defines how concurrent state modifications from parallel tasks are merged during fan-in.

const (
	ReducerReplace Reducer = "replace" // Last write wins (default)
	ReducerAppend  Reducer = "append"  // Concatenate arrays
	ReducerAdd     Reducer = "add"     // Sum numeric values
	ReducerMin     Reducer = "min"     // Smaller of two numeric values
	ReducerMax     Reducer = "max"     // Larger of two numeric values
	ReducerUnion   Reducer = "union"   // Merge arrays, deduplicate
	ReducerMerge   Reducer = "merge"   // Merge objects, new key wins
	ReducerAnd     Reducer = "and"     // Logical AND of booleans
	ReducerOr      Reducer = "or"      // Logical OR of booleans
	ReducerConcat  Reducer = "concat"  // Concatenate strings
)

func (Reducer) Reduce

func (r Reducer) Reduce(existing, incoming any) (any, error)

Reduce applies the reducer to merge an incoming value into an existing value.
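The merge semantics listed in the constants can be sketched on plain Go values. The reduceSketch function is a hypothetical stand-in covering three strategies, not the package's Reduce: the value types it accepts and its error cases are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// reduceSketch models three of the documented strategies. It is not the
// package's Reduce; accepted types and error handling are assumptions.
func reduceSketch(kind string, existing, incoming any) (any, error) {
	switch kind {
	case "replace": // last write wins
		return incoming, nil
	case "add": // sum numeric values
		a, okA := existing.(float64)
		b, okB := incoming.(float64)
		if !okA || !okB {
			return nil, errors.New("add: both values must be float64 in this sketch")
		}
		return a + b, nil
	case "append": // concatenate arrays
		a, okA := existing.([]any)
		b, okB := incoming.([]any)
		if !okA || !okB {
			return nil, errors.New("append: both values must be []any in this sketch")
		}
		out := make([]any, 0, len(a)+len(b))
		out = append(out, a...)
		return append(out, b...), nil
	}
	return nil, fmt.Errorf("unsupported reducer %q", kind)
}

func main() {
	// Two parallel branches each contribute a value; the fan-in folds them.
	total, _ := reduceSketch("add", 10.0, 2.5)
	items, _ := reduceSketch("append", []any{"a"}, []any{"b", "c"})
	fmt.Println(total, items) // prints: 12.5 [a b c]
}
```

With "add" and "append" the order in which branches arrive does not change the sum or the set of elements, whereas "replace" keeps whichever value is merged last.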

type Transition

type Transition struct {
	From     string `json:"from"`
	To       string `json:"to"`
	When     string `json:"when,omitzero"`
	WithGoto bool   `json:"withGoto,omitzero"`
	ForEach  string `json:"forEach,omitzero"` // dynamic fan-out over a state field
	As       string `json:"as,omitzero"`      // alias for the current element during forEach fan-out
	OnError  bool   `json:"onError,omitzero"` // taken when the source task returns an error
	Switch   bool   `json:"switch,omitzero"`  // first-match-wins among siblings; never fans out
}

Transition defines a possible transition between two nodes in a workflow graph. From and To are node names, not URLs.
