workflow

package
v1.27.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Constants

const END = "END"

END is a pseudo-node indicating that the workflow should terminate. Use it as the target of a transition to mark a terminal path.

Variables

This section is empty.

Functions

func FilterState

func FilterState(state map[string]any, declared []string) map[string]any

FilterState returns a subset of state based on declared field names. A nil or empty declared list passes nothing. The list ["*"] passes everything. Any other list passes only the named fields.
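
The three filtering rules above can be sketched with a small local function. This is an illustrative stand-in, not the package's implementation: the name filterState is hypothetical, and it assumes that "pass nothing" yields an empty map and that "*" is recognized only when it is the sole entry.

```go
package main

import "fmt"

// filterState is a hypothetical local stand-in that mirrors the documented
// rules of FilterState. It is not the package's code.
func filterState(state map[string]any, declared []string) map[string]any {
	out := map[string]any{}
	if len(declared) == 0 {
		return out // nil or empty list: pass nothing (assumed to be an empty map)
	}
	if len(declared) == 1 && declared[0] == "*" {
		for k, v := range state {
			out[k] = v // wildcard: pass everything
		}
		return out
	}
	for _, name := range declared {
		if v, ok := state[name]; ok {
			out[name] = v // named field: copied only when present in state
		}
	}
	return out
}

func main() {
	state := map[string]any{"user": "ada", "count": 3, "secret": "x"}
	fmt.Println(len(filterState(state, nil)))                  // 0
	fmt.Println(len(filterState(state, []string{"*"})))        // 3
	fmt.Println(filterState(state, []string{"user", "count"})) // map[count:3 user:ada]
}
```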

func MergeState

func MergeState(state any, changes any, reducers map[string]Reducer) (map[string]any, error)

MergeState applies changes on top of state, using the provided reducers for fields that have one. Fields without an explicit reducer use replace semantics. State, changes and the result are map[string]any or nil.

Types

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

Flow is the carrier object passed to tasks. It holds the state and control signals for a single step in a workflow execution.

func NewFlow

func NewFlow() *Flow

NewFlow creates a new Flow with initialized maps.

func (*Flow) Get

func (f *Flow) Get(key string, target any) error

Get unmarshals a state field into the target. Use this for complex types (structs, maps, etc.).

func (*Flow) GetBool

func (f *Flow) GetBool(key string) bool

GetBool returns a state field as a bool.

func (*Flow) GetDuration

func (f *Flow) GetDuration(key string) time.Duration

GetDuration returns a state field as a time.Duration.

func (*Flow) GetFloat

func (f *Flow) GetFloat(key string) float64

GetFloat returns a state field as a float64.

func (*Flow) GetInt

func (f *Flow) GetInt(key string) int

GetInt returns a state field as an int.

func (*Flow) GetString

func (f *Flow) GetString(key string) string

GetString returns a state field as a string.

func (*Flow) GetStrings

func (f *Flow) GetStrings(key string) []string

GetStrings returns a state field as a string slice.

func (*Flow) Goto

func (f *Flow) Goto(taskName string)

Goto overrides transition routing. The orchestrator skips condition evaluation and follows the specified task instead.

func (*Flow) GotoRequested

func (f *Flow) GotoRequested() string

GotoRequested returns the task name set by Goto, or an empty string if Goto was not called.

func (*Flow) Has

func (f *Flow) Has(key string) bool

Has reports whether a state field exists.

func (*Flow) Interrupt

func (f *Flow) Interrupt(payload any)

Interrupt pauses the flow execution and requests external input. The payload is propagated up through the subgraph chain and surfaced via State() so the caller can see what data the task needs. The task should return normally after calling Interrupt.

func (*Flow) InterruptRequested

func (f *Flow) InterruptRequested() (map[string]any, bool)

InterruptRequested returns the interrupt payload and true if Interrupt was called.

func (*Flow) MarshalJSON

func (f *Flow) MarshalJSON() ([]byte, error)

MarshalJSON serializes the Flow including private fields.

func (*Flow) ParseState

func (f *Flow) ParseState(target any) error

ParseState unmarshals state fields into the target struct. Fields are matched by their JSON tag names. Fields in state that are not in the struct are ignored.
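
Matching by JSON tag and ignoring extra fields is the default behavior of a round trip through encoding/json, so the effect can be approximated as below. This is a sketch under that assumption; parseState and orderState are hypothetical names, and the package may implement ParseState differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderState is a hypothetical target struct; fields map to state keys
// through their JSON tags.
type orderState struct {
	OrderID string  `json:"orderId"`
	Total   float64 `json:"total"`
}

// parseState is a local stand-in: it marshals the state map to JSON and
// unmarshals it into target, so unknown keys are silently dropped.
func parseState(state map[string]any, target any) error {
	raw, err := json.Marshal(state)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, target)
}

func main() {
	state := map[string]any{"orderId": "A-17", "total": 42.5, "unrelated": true}
	var s orderState
	if err := parseState(state, &s); err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Printf("%+v\n", s) // {OrderID:A-17 Total:42.5}
}
```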

func (*Flow) Retry

func (f *Flow) Retry(maxAttempts int, initialDelay time.Duration, multiplier float64, maxDelay time.Duration) bool

Retry requests the orchestrator to retry this task with exponential backoff. Returns true if a retry will be scheduled (attempts remaining), false if exhausted. When true, the task should return nil. When false, the task should return its error. The delay for attempt N is min(initialDelay * multiplier^N, maxDelay).

Example:

result, err := callExternalAPI(ctx)
if err != nil {
    if flow.Retry(5, 1*time.Second, 2.0, 30*time.Second) {
        return result, nil // retry scheduled, don't report error
    }
    return result, err // retries exhausted, report the error
}
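
To make the delay formula concrete, the sketch below evaluates min(initialDelay * multiplier^N, maxDelay) for the parameters used in the example. The helper backoffDelay is hypothetical, and it assumes that attempt numbering starts at 0, which the documentation does not state.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay is a hypothetical helper that computes
// min(initialDelay * multiplier^attempt, maxDelay).
func backoffDelay(attempt int, initialDelay time.Duration, multiplier float64, maxDelay time.Duration) time.Duration {
	d := float64(initialDelay) * math.Pow(multiplier, float64(attempt))
	if d > float64(maxDelay) {
		return maxDelay // cap applies, including when d overflows to +Inf
	}
	return time.Duration(d)
}

func main() {
	// Same parameters as flow.Retry(5, 1*time.Second, 2.0, 30*time.Second).
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Println(attempt, backoffDelay(attempt, time.Second, 2.0, 30*time.Second))
	}
	// Prints 1s, 2s, 4s, 8s, 16s, then 30s once the cap is reached.
}
```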

func (*Flow) RetryNow

func (f *Flow) RetryNow() bool

RetryNow signals the orchestrator to re-execute this task immediately with no limit. Equivalent to Retry(math.MaxInt32, 0, 0, 0).

func (*Flow) RetryRequested

func (f *Flow) RetryRequested() (maxAttempts int, initialDelay time.Duration, multiplier float64, maxDelay time.Duration, ok bool)

RetryRequested returns the backoff parameters and true if Retry was called. The foreman uses these to compute the sleep delay and check the attempt limit.

func (*Flow) Set

func (f *Flow) Set(key string, value any) error

Set sets a state field and tracks the change. Use this for complex types (structs, maps, etc.).

func (*Flow) SetBool

func (f *Flow) SetBool(key string, value bool)

SetBool sets a state bool field and tracks the change.

func (*Flow) SetChanges

func (f *Flow) SetChanges(source any, snap map[string]any) error

SetChanges marshals the source struct back to state, comparing against the provided snapshot. Only fields whose JSON value differs from the snapshot are recorded as changes. Changed fields are written to both the state and changes maps, so that subsequent reads (including transition condition evaluation) see the updated values.
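
The comparison step can be pictured as a per-field diff of JSON values. The sketch below is a local approximation, not the package's code: changedFields, toJSONMap and the counter struct are hypothetical, and both sides are normalized through encoding/json before comparing so that, for example, an int 1 in the snapshot equals a JSON number 1.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// counter is a hypothetical state struct.
type counter struct {
	Count int    `json:"count"`
	Label string `json:"label"`
}

// toJSONMap normalizes any value into a map of decoded JSON values.
func toJSONMap(v any) (map[string]any, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	out := map[string]any{}
	err = json.Unmarshal(raw, &out)
	return out, err
}

// changedFields returns only the fields of source whose JSON value differs
// from the snapshot, or that the snapshot lacks.
func changedFields(source any, snap map[string]any) (map[string]any, error) {
	current, err := toJSONMap(source)
	if err != nil {
		return nil, err
	}
	before, err := toJSONMap(snap)
	if err != nil {
		return nil, err
	}
	changes := map[string]any{}
	for key, value := range current {
		if old, ok := before[key]; !ok || !reflect.DeepEqual(old, value) {
			changes[key] = value
		}
	}
	return changes, nil
}

func main() {
	snap := map[string]any{"count": 1, "label": "jobs"}
	changes, err := changedFields(counter{Count: 2, Label: "jobs"}, snap)
	fmt.Println(changes, err) // map[count:2] <nil>
}
```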

func (*Flow) SetDuration

func (f *Flow) SetDuration(key string, value time.Duration)

SetDuration sets a state time.Duration field and tracks the change.

func (*Flow) SetFloat

func (f *Flow) SetFloat(key string, value float64)

SetFloat sets a state float64 field and tracks the change.

func (*Flow) SetInt

func (f *Flow) SetInt(key string, value int)

SetInt sets a state int field and tracks the change.

func (*Flow) SetState

func (f *Flow) SetState(source any) error

SetState marshals the source struct fields into state without tracking changes. Fields are matched by their JSON tag names.

func (*Flow) SetString

func (f *Flow) SetString(key string, value string)

SetString sets a state string field and tracks the change.

func (*Flow) SetStrings

func (f *Flow) SetStrings(key string, value []string)

SetStrings sets a state string slice field and tracks the change.

func (*Flow) Sleep

func (f *Flow) Sleep(duration time.Duration)

Sleep tells the orchestrator to wait for the given duration before the next execution.

func (*Flow) SleepRequested

func (f *Flow) SleepRequested() time.Duration

SleepRequested returns the duration set by Sleep, or zero if not set.

func (*Flow) Snapshot

func (f *Flow) Snapshot() map[string]any

Snapshot captures a read-only copy of the flow's current state (including any changes applied so far). Pass the returned snapshot to SetChanges to record only the fields that differ.

func (*Flow) Subgraph

func (f *Flow) Subgraph(workflowURL string, input map[string]any)

Subgraph signals the orchestrator to create and run a child workflow before this step completes. The step is parked until the child finishes - similar to how Interrupt pauses until Resume is called. When the child completes, its output state (filtered through DeclareOutputs) is merged into this step's state and the task is re-executed. On re-entry the task sees the child's output in its state and should return normally without calling Subgraph again.

The input map is merged into the parent's current state using the child graph's reducers, then filtered through the child's DeclareInputs - the same semantics as Continue's additionalState.

func (*Flow) SubgraphRequested

func (f *Flow) SubgraphRequested() (workflowURL string, input map[string]any, ok bool)

SubgraphRequested returns the workflow URL, input state, and true if Subgraph was called.

func (*Flow) UnmarshalJSON

func (f *Flow) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes the Flow including private fields.

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

Graph is the definition of a workflow. It describes the tasks, transitions between them, and reducers for merging state during fan-in.

Use NewGraph to create a new graph, then AddTask, AddTransition, AddTransitionWhen, SetEntryPoint, and SetReducer to build it up.

func NewGraph

func NewGraph(name string) *Graph

NewGraph creates a new workflow graph with the given name.

func (*Graph) AddErrorTransition

func (g *Graph) AddErrorTransition(from, to string)

AddErrorTransition adds a transition that is taken when the source task returns an error. The error is serialized as a TracedError into the state field "onErr" of the target task. Error transitions cannot be combined with When, ForEach, or WithGoto. Both endpoints are auto-registered as tasks if not already present.

func (*Graph) AddSubgraph

func (g *Graph) AddSubgraph(workflowName string)

AddSubgraph registers a child workflow as a subgraph node in the graph. Transitions can target a subgraph name just like a task name. When the foreman reaches a subgraph transition, it creates and runs the child workflow, blocking the parent until the child completes. Duplicate names are ignored.

func (*Graph) AddTask

func (g *Graph) AddTask(taskURL string)

AddTask registers a task node in the graph. The first node added becomes the default entry point unless SetEntryPoint is called explicitly. Duplicate nodes are ignored. The pseudo-node END is not registered.

func (*Graph) AddTransition

func (g *Graph) AddTransition(from, to string)

AddTransition adds an unconditional transition between two nodes. Both endpoints are auto-registered as tasks if not already present.
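
The registration rules described for AddTask and AddTransition (first node becomes the default entry point, duplicates are ignored, END is never registered, endpoints are auto-registered) can be modeled with a toy graph. Everything below is a hypothetical stand-in written for illustration, including the type miniGraph and the task names; in particular, treating an auto-registered endpoint as a candidate for the default entry point is an assumption.

```go
package main

import "fmt"

const end = "END" // mirrors the documented END pseudo-node

// miniGraph is a hypothetical toy model of the registration rules only.
type miniGraph struct {
	nodes       []string
	seen        map[string]bool
	entryPoint  string
	transitions [][2]string
}

func newMiniGraph() *miniGraph {
	return &miniGraph{seen: map[string]bool{}}
}

// addTask registers a node once; END and duplicates are skipped.
func (g *miniGraph) addTask(name string) {
	if name == end || g.seen[name] {
		return
	}
	g.seen[name] = true
	g.nodes = append(g.nodes, name)
	if g.entryPoint == "" {
		g.entryPoint = name // first node added is the default entry point
	}
}

// addTransition auto-registers both endpoints before recording the edge.
func (g *miniGraph) addTransition(from, to string) {
	g.addTask(from)
	g.addTask(to)
	g.transitions = append(g.transitions, [2]string{from, to})
}

func main() {
	g := newMiniGraph()
	g.addTask("validate")
	g.addTransition("validate", "charge") // "charge" is auto-registered
	g.addTransition("charge", end)        // END is a target but not a node
	g.addTask("validate")                 // duplicate, ignored
	fmt.Println(g.nodes, g.entryPoint, len(g.transitions)) // [validate charge] validate 2
}
```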

func (*Graph) AddTransitionForEach

func (g *Graph) AddTransitionForEach(from, to string, forEach string, as string)

AddTransitionForEach adds a dynamic fan-out transition. The foreman iterates over the state field named by forEach and spawns one instance of the target task per element. The as parameter sets the state key name for the current element (defaults to "item" if empty). Both endpoints are auto-registered as tasks if not already present. If the array is empty, no tasks are spawned. When this is the only outgoing transition from a task, an empty array causes the flow to complete at that point.
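
As a rough picture of the fan-out, the sketch below builds one input state per element of the forEach field and stores the element under the as key, falling back to "item". The function fanOut is hypothetical, and copying the whole parent state into each instance is an assumption; the documentation only says that one instance is spawned per element.

```go
package main

import "fmt"

// fanOut is a hypothetical model of dynamic fan-out: it returns one state
// map per element of state[forEach], with the element stored under `as`.
func fanOut(state map[string]any, forEach, as string) []map[string]any {
	if as == "" {
		as = "item" // documented default alias
	}
	items, _ := state[forEach].([]any) // missing or non-array field yields no instances
	instances := make([]map[string]any, 0, len(items))
	for _, item := range items {
		child := make(map[string]any, len(state)+1)
		for k, v := range state {
			child[k] = v // assumption: each instance sees the parent state
		}
		child[as] = item
		instances = append(instances, child)
	}
	return instances
}

func main() {
	state := map[string]any{"batch": "b1", "urls": []any{"u1", "u2"}}
	for _, child := range fanOut(state, "urls", "") {
		fmt.Println(child["batch"], child["item"])
	}
	// Prints "b1 u1" then "b1 u2"; an empty array would spawn nothing.
}
```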

func (*Graph) AddTransitionGoto

func (g *Graph) AddTransitionGoto(from, to string)

AddTransitionGoto adds a transition that is only taken when the task explicitly calls flow.Goto with the target task URL. Both endpoints are auto-registered as tasks if not already present.

func (*Graph) AddTransitionWhen

func (g *Graph) AddTransitionWhen(from, to string, when string)

AddTransitionWhen adds a conditional transition between two nodes. The when expression is evaluated against the flow state to determine if the transition should be taken. Both endpoints are auto-registered as tasks if not already present.

func (*Graph) DeclareInputs

func (g *Graph) DeclareInputs(fields ...string)

DeclareInputs declares which state fields are passed into this graph when used as a subgraph. No arguments means nothing is passed in. Use "*" to pass everything. Named fields pass only those fields.

func (*Graph) DeclareOutputs

func (g *Graph) DeclareOutputs(fields ...string)

DeclareOutputs declares which state fields are returned from this graph on completion. No arguments means nothing is returned. Use "*" to return everything. Named fields return only those fields. Output filtering applies both when used as a subgraph and as a root flow.

func (*Graph) EntryPoint

func (g *Graph) EntryPoint() string

EntryPoint returns the entry point task URL of the graph.

func (*Graph) ErrorTransition

func (g *Graph) ErrorTransition(taskName string) (Transition, bool)

ErrorTransition returns the error transition from the given task, if one exists.

func (*Graph) Inputs

func (g *Graph) Inputs() []string

Inputs returns the declared input fields.

func (*Graph) IsSubgraph

func (g *Graph) IsSubgraph(name string) bool

IsSubgraph returns true if the given name is registered as a subgraph.

func (*Graph) MarshalJSON

func (g *Graph) MarshalJSON() ([]byte, error)

MarshalJSON serializes the graph to JSON.

func (*Graph) Mermaid

func (g *Graph) Mermaid() string

Mermaid returns a Mermaid flowchart representation of the graph.

func (*Graph) Name

func (g *Graph) Name() string

Name returns the name of the graph.

func (*Graph) Nodes

func (g *Graph) Nodes() []Node

Nodes returns the list of nodes in the graph.

func (*Graph) Outputs

func (g *Graph) Outputs() []string

Outputs returns the declared output fields.

func (*Graph) Reducers

func (g *Graph) Reducers() map[string]Reducer

Reducers returns the reducer map for state fields.

func (*Graph) SetEntryPoint

func (g *Graph) SetEntryPoint(taskURL string)

SetEntryPoint sets the entry point of the graph explicitly, overriding the default (first task added).

func (*Graph) SetReducer

func (g *Graph) SetReducer(field string, reducer Reducer)

SetReducer sets the merge strategy for a state field during fan-in.

func (*Graph) SetTimeBudget

func (g *Graph) SetTimeBudget(taskURL string, budget time.Duration)

SetTimeBudget sets the execution time budget for a specific task. If not set, the foreman's DefaultTimeBudget config is used. The task must already be registered.

func (*Graph) TimeBudget

func (g *Graph) TimeBudget(taskURL string) time.Duration

TimeBudget returns the execution time budget for a specific task, or 0 if not set.

func (*Graph) Transitions

func (g *Graph) Transitions() []Transition

Transitions returns the list of transitions in the graph.

func (*Graph) UnmarshalJSON

func (g *Graph) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes the graph from JSON.

func (*Graph) Validate

func (g *Graph) Validate() error

Validate checks the graph for structural errors.

type Node

type Node struct {
	Name       string
	TimeBudget time.Duration
	Subgraph   bool
}

Node describes a task or subgraph node registered in a workflow graph.

type RawFlow

type RawFlow struct {
	Flow
}

RawFlow wraps Flow with additional methods used by the foreman orchestrator. Task endpoints should use Flow directly; RawFlow is for internal orchestration use only.

func NewRawFlow

func NewRawFlow() *RawFlow

NewRawFlow creates a new RawFlow with initialized maps.

func (*RawFlow) ClearChanges

func (f *RawFlow) ClearChanges()

ClearChanges resets the changes map. Called by the orchestrator after persisting changes.

func (*RawFlow) ClearControl

func (f *RawFlow) ClearControl()

ClearControl resets all control signals. Called by the orchestrator after processing them.

func (*RawFlow) RawChanges

func (f *RawFlow) RawChanges() map[string]any

RawChanges returns a copy of the raw changes map.

func (*RawFlow) RawState

func (f *RawFlow) RawState() map[string]any

RawState returns a copy of the raw state map.

func (*RawFlow) SetAttempt

func (f *RawFlow) SetAttempt(attempt int)

SetAttempt sets the attempt counter on the flow. Called by the orchestrator before dispatching a task so that Retry can check whether attempts are exhausted.

func (*RawFlow) SetRawChanges

func (f *RawFlow) SetRawChanges(changes map[string]any)

SetRawChanges replaces the entire changes map with the given raw map.

func (*RawFlow) SetRawState

func (f *RawFlow) SetRawState(state map[string]any)

SetRawState replaces the entire state with the given raw map, without tracking changes.

type Reducer

type Reducer string

Reducer defines how concurrent state modifications from parallel tasks are merged during fan-in.

const (
	ReducerReplace Reducer = "replace" // Last write wins (default)
	ReducerAppend  Reducer = "append"  // Concatenate arrays
	ReducerAdd     Reducer = "add"     // Sum numeric values
	ReducerUnion   Reducer = "union"   // Merge arrays, deduplicate
)

func (Reducer) Reduce

func (r Reducer) Reduce(existing, incoming any) (any, error)

Reduce applies the reducer to merge an incoming value into an existing value. Both existing and incoming are JSON-compatible values (json.RawMessage or native Go types). The result is the merged value.
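
The four strategies can be illustrated on already-decoded JSON values. The function below is a hypothetical approximation and not the package's Reduce: it assumes numbers arrive as float64 and arrays as []any (as encoding/json decodes them), that a nil existing value simply takes the incoming one, and that union compares scalar elements only.

```go
package main

import (
	"errors"
	"fmt"
)

// reduce is a hypothetical stand-in for the four documented strategies.
func reduce(kind string, existing, incoming any) (any, error) {
	if existing == nil {
		return incoming, nil // assumption: nothing to merge with yet
	}
	switch kind {
	case "replace":
		return incoming, nil // last write wins
	case "add":
		a, okA := existing.(float64)
		b, okB := incoming.(float64)
		if !okA || !okB {
			return nil, errors.New("add needs numeric values")
		}
		return a + b, nil
	case "append", "union":
		a, okA := existing.([]any)
		b, okB := incoming.([]any)
		if !okA || !okB {
			return nil, errors.New(kind + " needs arrays")
		}
		merged := append(append([]any{}, a...), b...)
		if kind == "append" {
			return merged, nil
		}
		seen := map[any]bool{} // scalar elements only; nested values are not hashable
		unique := []any{}
		for _, v := range merged {
			if !seen[v] {
				seen[v] = true
				unique = append(unique, v)
			}
		}
		return unique, nil
	}
	return nil, errors.New("unknown reducer " + kind)
}

func main() {
	fmt.Println(reduce("replace", "old", "new"))                    // new <nil>
	fmt.Println(reduce("add", 2.0, 3.0))                            // 5 <nil>
	fmt.Println(reduce("append", []any{"a", "b"}, []any{"b", "c"})) // [a b b c] <nil>
	fmt.Println(reduce("union", []any{"a", "b"}, []any{"b", "c"}))  // [a b c] <nil>
}
```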

type Transition

type Transition struct {
	From     string `json:"from"`
	To       string `json:"to"`
	When     string `json:"when,omitzero"`
	WithGoto bool   `json:"withGoto,omitzero"`
	ForEach  string `json:"forEach,omitzero"` // dynamic fan-out over a state field
	As       string `json:"as,omitzero"`      // alias for the current element during forEach fan-out
	OnError  bool   `json:"onError,omitzero"` // taken when the source task returns an error
}

Transition defines a possible transition between two tasks in a workflow graph.
