Documentation ¶
Index ¶
- Constants
- func FilterState(state map[string]any, declared []string) map[string]any
- func MergeState(state any, changes any, reducers map[string]Reducer) (map[string]any, error)
- type Flow
- func (f *Flow) Get(key string, target any) error
- func (f *Flow) GetBool(key string) bool
- func (f *Flow) GetDuration(key string) time.Duration
- func (f *Flow) GetFloat(key string) float64
- func (f *Flow) GetInt(key string) int
- func (f *Flow) GetString(key string) string
- func (f *Flow) GetStrings(key string) []string
- func (f *Flow) Goto(taskName string)
- func (f *Flow) GotoRequested() string
- func (f *Flow) Has(key string) bool
- func (f *Flow) Interrupt(payload any)
- func (f *Flow) InterruptRequested() (map[string]any, bool)
- func (f *Flow) MarshalJSON() ([]byte, error)
- func (f *Flow) ParseState(target any) error
- func (f *Flow) Retry(maxAttempts int, initialDelay time.Duration, multiplier float64, ...) bool
- func (f *Flow) RetryNow() bool
- func (f *Flow) RetryRequested() (maxAttempts int, initialDelay time.Duration, multiplier float64, ...)
- func (f *Flow) Set(key string, value any) error
- func (f *Flow) SetBool(key string, value bool)
- func (f *Flow) SetChanges(source any, snap map[string]any) error
- func (f *Flow) SetDuration(key string, value time.Duration)
- func (f *Flow) SetFloat(key string, value float64)
- func (f *Flow) SetInt(key string, value int)
- func (f *Flow) SetState(source any) error
- func (f *Flow) SetString(key string, value string)
- func (f *Flow) SetStrings(key string, value []string)
- func (f *Flow) Sleep(duration time.Duration)
- func (f *Flow) SleepRequested() time.Duration
- func (f *Flow) Snapshot() map[string]any
- func (f *Flow) Subgraph(workflowURL string, input map[string]any)
- func (f *Flow) SubgraphRequested() (workflowURL string, input map[string]any, ok bool)
- func (f *Flow) UnmarshalJSON(data []byte) error
- type Graph
- func (g *Graph) AddErrorTransition(from, to string)
- func (g *Graph) AddSubgraph(workflowName string)
- func (g *Graph) AddTask(taskURL string)
- func (g *Graph) AddTransition(from, to string)
- func (g *Graph) AddTransitionForEach(from, to string, forEach string, as string)
- func (g *Graph) AddTransitionGoto(from, to string)
- func (g *Graph) AddTransitionWhen(from, to string, when string)
- func (g *Graph) DeclareInputs(fields ...string)
- func (g *Graph) DeclareOutputs(fields ...string)
- func (g *Graph) EntryPoint() string
- func (g *Graph) ErrorTransition(taskName string) (Transition, bool)
- func (g *Graph) Inputs() []string
- func (g *Graph) IsSubgraph(name string) bool
- func (g *Graph) MarshalJSON() ([]byte, error)
- func (g *Graph) Mermaid() string
- func (g *Graph) Name() string
- func (g *Graph) Nodes() []Node
- func (g *Graph) Outputs() []string
- func (g *Graph) Reducers() map[string]Reducer
- func (g *Graph) SetEntryPoint(taskURL string)
- func (g *Graph) SetReducer(field string, reducer Reducer)
- func (g *Graph) SetTimeBudget(taskURL string, budget time.Duration)
- func (g *Graph) TimeBudget(taskURL string) time.Duration
- func (g *Graph) Transitions() []Transition
- func (g *Graph) UnmarshalJSON(data []byte) error
- func (g *Graph) Validate() error
- type Node
- type RawFlow
- func (f *RawFlow) ClearChanges()
- func (f *RawFlow) ClearControl()
- func (f *RawFlow) RawChanges() map[string]any
- func (f *RawFlow) RawState() map[string]any
- func (f *RawFlow) SetAttempt(attempt int)
- func (f *RawFlow) SetRawChanges(changes map[string]any)
- func (f *RawFlow) SetRawState(state map[string]any)
- type Reducer
- type Transition
Constants ¶
const END = "END"
END is a pseudo-node indicating that the workflow should terminate. Use it as the target of a transition to mark a terminal path.
Functions ¶
func FilterState ¶
FilterState returns the subset of state selected by the declared field names. A nil or empty declared list passes nothing. The single entry ["*"] passes everything. Any other list passes only the named fields.
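The three filtering rules can be illustrated with a small stand-alone sketch. The function filterState below is a hypothetical re-implementation written only for illustration; it is not the package's code, and details such as returning an empty map rather than nil, or treating "*" mixed with other names as a wildcard, are assumptions.

```go
package main

import "fmt"

// filterState is a hypothetical stand-in that mirrors the documented rules.
// It is not the package's implementation.
func filterState(state map[string]any, declared []string) map[string]any {
	out := map[string]any{}
	for _, name := range declared {
		if name == "*" {
			// Wildcard (assumed to win over any other names): copy everything.
			for k, v := range state {
				out[k] = v
			}
			return out
		}
		// Named field: copy it only if it is present in the state.
		if v, ok := state[name]; ok {
			out[name] = v
		}
	}
	// A nil or empty declared list falls through with nothing copied.
	return out
}

func main() {
	state := map[string]any{"user": "ada", "count": 3, "secret": "x"}
	fmt.Println(len(filterState(state, nil)))                   // nothing passes
	fmt.Println(len(filterState(state, []string{"*"})))         // everything passes
	fmt.Println(len(filterState(state, []string{"user", "x"}))) // only named fields that exist
}
```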
func MergeState ¶
MergeState applies changes on top of state, using the provided reducers for fields that have one. Fields without an explicit reducer use replace semantics. State, changes and the result are map[string]any or nil.
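As a rough illustration of "reducer if present, otherwise replace", the sketch below merges two maps. Everything in it is an assumption made for the example: mergeState, reduceFunc and sumInts are hypothetical names, and reducers are modeled as functions even though the package's Reducer is a string type whose values are not listed here.

```go
package main

import "fmt"

// reduceFunc is a hypothetical stand-in for the package's Reducer type.
type reduceFunc func(current, change any) any

// sumInts is an illustrative reducer that adds two int values.
func sumInts(current, change any) any {
	a, _ := current.(int)
	b, _ := change.(int)
	return a + b
}

// mergeState copies state, then applies each change: through the field's
// reducer when one is registered, otherwise by replacing the old value.
func mergeState(state, changes map[string]any, reducers map[string]reduceFunc) map[string]any {
	out := make(map[string]any, len(state)+len(changes))
	for k, v := range state {
		out[k] = v
	}
	for k, v := range changes {
		if r, ok := reducers[k]; ok {
			out[k] = r(out[k], v)
		} else {
			out[k] = v // replace semantics
		}
	}
	return out
}

func main() {
	state := map[string]any{"total": 10, "status": "running"}
	changes := map[string]any{"total": 5, "status": "done"}
	merged := mergeState(state, changes, map[string]reduceFunc{"total": sumInts})
	fmt.Println(merged["total"], merged["status"]) // prints: 15 done
}
```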
Types ¶
type Flow ¶
type Flow struct {
	// contains filtered or unexported fields
}
Flow is the carrier object passed to tasks. It holds the state and control signals for a single step in a workflow execution.
func (*Flow) Get ¶
Get unmarshals a state field into the target. Use this for complex types (structs, maps, etc.).
func (*Flow) GetDuration ¶
GetDuration returns a state field as a time.Duration.
func (*Flow) GetStrings ¶
GetStrings returns a state field as a string slice.
func (*Flow) Goto ¶
Goto overrides transition routing. The orchestrator skips condition evaluation and follows the specified task instead.
func (*Flow) GotoRequested ¶
GotoRequested returns the task URL set by Goto, or an empty string if Goto was not called.
func (*Flow) Interrupt ¶
Interrupt pauses the flow execution and requests external input. The payload is propagated up through the subgraph chain and surfaced via State() so the caller can see what data the task needs. The task should return normally after calling Interrupt.
func (*Flow) InterruptRequested ¶
InterruptRequested returns the interrupt payload and true if Interrupt was called.
func (*Flow) MarshalJSON ¶
MarshalJSON serializes the Flow including private fields.
func (*Flow) ParseState ¶
ParseState unmarshals state fields into the target struct. Fields are matched by their JSON tag names. Fields in state that are not in the struct are ignored.
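The matching rules described here are the same ones encoding/json applies when decoding into a struct, so the behavior can be approximated with a JSON round trip. This is a sketch under that assumption; parseState and orderState are hypothetical names, and the package may implement ParseState differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderState is an illustrative target struct; fields are matched by JSON tag.
type orderState struct {
	OrderID string  `json:"orderId"`
	Total   float64 `json:"total"`
}

// parseState is a hypothetical stand-in: it encodes the state map as JSON and
// decodes it into target. Keys with no matching struct field are ignored.
func parseState(state map[string]any, target any) error {
	raw, err := json.Marshal(state)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, target)
}

func main() {
	state := map[string]any{"orderId": "A-17", "total": 42.5, "unrelated": true}
	var s orderState
	if err := parseState(state, &s); err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(s.OrderID, s.Total) // prints: A-17 42.5
}
```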
func (*Flow) Retry ¶
func (f *Flow) Retry(maxAttempts int, initialDelay time.Duration, multiplier float64, maxDelay time.Duration) bool
Retry requests the orchestrator to retry this task with exponential backoff. Returns true if a retry will be scheduled (attempts remaining), false if exhausted. When true, the task should return nil. When false, the task should return its error. The delay for attempt N is min(initialDelay * multiplier^N, maxDelay).
Example:
    result, err := callExternalAPI(ctx)
    if err != nil {
        if flow.Retry(5, 1*time.Second, 2.0, 30*time.Second) {
            return result, nil // retry scheduled, don't report error
        }
        return result, err // retries exhausted, report the error
    }
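To make the delay formula concrete, the following sketch evaluates min(initialDelay * multiplier^N, maxDelay) for the parameters used in the example. The helper backoffDelay is hypothetical and not part of the package, and counting attempts from N = 0 is an assumption, since the starting index is not stated here.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay evaluates min(initialDelay * multiplier^n, maxDelay).
// Hypothetical helper for illustration only.
func backoffDelay(n int, initialDelay time.Duration, multiplier float64, maxDelay time.Duration) time.Duration {
	d := float64(initialDelay) * math.Pow(multiplier, float64(n))
	// Compare in floating point so a very large product cannot overflow int64.
	if d > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(d)
}

func main() {
	// With (1s, 2.0, 30s) the delays double until the cap applies:
	// 1s, 2s, 4s, 8s, 16s, 30s.
	for n := 0; n < 6; n++ {
		fmt.Println(n, backoffDelay(n, 1*time.Second, 2.0, 30*time.Second))
	}
}
```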
func (*Flow) RetryNow ¶
RetryNow signals the orchestrator to re-execute this task immediately, with no limit on the number of attempts. Equivalent to Retry(math.MaxInt32, 0, 0, 0).
func (*Flow) RetryRequested ¶
func (f *Flow) RetryRequested() (maxAttempts int, initialDelay time.Duration, multiplier float64, maxDelay time.Duration, ok bool)
RetryRequested returns the backoff parameters and true if Retry was called. The foreman uses these to compute the sleep delay and check the attempt limit.
func (*Flow) Set ¶
Set sets a state field and tracks the change. Use this for complex types (structs, maps, etc.).
func (*Flow) SetChanges ¶
SetChanges marshals the source struct back to state, comparing against the provided snapshot. Only fields whose JSON value differs from the snapshot are recorded as changes. Changed fields are written to both the state and changes maps, so that subsequent reads (including transition condition evaluation) see the updated values.
func (*Flow) SetDuration ¶
SetDuration sets a state time.Duration field and tracks the change.
func (*Flow) SetState ¶
SetState marshals the source struct fields into state without tracking changes. Fields are matched by their JSON tag names.
func (*Flow) SetStrings ¶
SetStrings sets a state string slice field and tracks the change.
func (*Flow) Sleep ¶
Sleep tells the orchestrator to wait for the given duration before the next execution.
func (*Flow) SleepRequested ¶
SleepRequested returns the duration set by Sleep, or zero if not set.
func (*Flow) Snapshot ¶
Snapshot captures a read-only copy of the flow's current state (including any changes applied so far). Pass the returned snapshot to SetChanges to record only the fields that differ.
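The snapshot-then-diff pattern can be sketched without the package: take a copy of the state, modify a typed struct, and keep only the fields whose JSON value changed. The names diffAgainstSnapshot, toJSONMap and reviewState are hypothetical, and comparing values after a JSON round trip is an assumption about how "JSON value differs" could be decided.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// reviewState is an illustrative typed view of the flow state.
type reviewState struct {
	Title    string `json:"title"`
	Approved bool   `json:"approved"`
	Score    int    `json:"score"`
}

// toJSONMap round-trips a value through JSON so that both sides of the
// comparison use the same decoded types (for example, numbers as float64).
func toJSONMap(v any) (map[string]any, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var m map[string]any
	err = json.Unmarshal(raw, &m)
	return m, err
}

// diffAgainstSnapshot returns the fields of source whose JSON value differs
// from the snapshot. Hypothetical helper, not the package's SetChanges.
func diffAgainstSnapshot(source any, snap map[string]any) (map[string]any, error) {
	current, err := toJSONMap(source)
	if err != nil {
		return nil, err
	}
	before, err := toJSONMap(snap)
	if err != nil {
		return nil, err
	}
	changes := map[string]any{}
	for k, v := range current {
		if old, ok := before[k]; !ok || !reflect.DeepEqual(old, v) {
			changes[k] = v
		}
	}
	return changes, nil
}

func main() {
	snap := map[string]any{"title": "Draft", "approved": false, "score": 3}
	s := reviewState{Title: "Draft", Approved: true, Score: 3}
	changes, err := diffAgainstSnapshot(s, snap)
	if err != nil {
		fmt.Println("diff failed:", err)
		return
	}
	fmt.Println(changes) // prints: map[approved:true]
}
```

In a task, the equivalent sequence would presumably be Snapshot, ParseState, mutate the struct, then SetChanges with the snapshot, so that only the modified fields are tracked.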
func (*Flow) Subgraph ¶
Subgraph signals the orchestrator to create and run a child workflow before this step completes. The step is parked until the child finishes - similar to how Interrupt pauses until Resume is called. When the child completes, its output state (filtered through DeclareOutputs) is merged into this step's state and the task is re-executed. On re-entry the task sees the child's output in its state and should return normally without calling Subgraph again.
The input map is merged into the parent's current state using the child graph's reducers, then filtered through the child's DeclareInputs - the same semantics as Continue's additionalState.
func (*Flow) SubgraphRequested ¶
SubgraphRequested returns the workflow URL, input state, and true if Subgraph was called.
func (*Flow) UnmarshalJSON ¶
UnmarshalJSON deserializes the Flow including private fields.
type Graph ¶
type Graph struct {
	// contains filtered or unexported fields
}
Graph is the definition of a workflow. It describes the tasks, transitions between them, and reducers for merging state during fan-in.
Use NewGraph to create a new graph, then AddTask, AddTransition, AddTransitionWhen, SetEntryPoint, and SetReducer to build it up.
func (*Graph) AddErrorTransition ¶
AddErrorTransition adds a transition that is taken when the source task returns an error. The error is serialized as a TracedError into the state field "onErr" of the target task. Error transitions cannot be combined with When, ForEach, or WithGoto. Both endpoints are auto-registered as tasks if not already present.
func (*Graph) AddSubgraph ¶
AddSubgraph registers a child workflow as a subgraph node in the graph. Transitions can target a subgraph name just like a task name. When the foreman reaches a subgraph transition, it creates and runs the child workflow, blocking the parent until the child completes. Duplicate names are ignored.
func (*Graph) AddTask ¶
AddTask registers a task node in the graph. The first node added becomes the default entry point unless SetEntryPoint is called explicitly. Duplicate nodes are ignored. The pseudo-node END is not registered.
func (*Graph) AddTransition ¶
AddTransition adds an unconditional transition between two nodes. Both endpoints are auto-registered as tasks if not already present.
func (*Graph) AddTransitionForEach ¶
AddTransitionForEach adds a dynamic fan-out transition. The foreman iterates over the state field named by forEach and spawns one instance of the target task per element. The as parameter sets the state key name for the current element (defaults to "item" if empty). Both endpoints are auto-registered as tasks if not already present. If the array is empty, no tasks are spawned. When this is the only outgoing transition from a task, an empty array causes the flow to complete at that point.
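The fan-out rule can be pictured as building one state map per element. The sketch below is a toy model only: fanOut is a hypothetical function, and the idea that each spawned instance receives a copy of the parent state plus the current element is an assumption, not something confirmed by this page.

```go
package main

import "fmt"

// fanOut builds one state map per element of state[forEach], storing the
// element under the key named by as ("item" when as is empty).
// Hypothetical model of the documented behavior, not the foreman's code.
func fanOut(state map[string]any, forEach, as string) []map[string]any {
	if as == "" {
		as = "item" // documented default
	}
	items, _ := state[forEach].([]any) // a missing or non-array field yields no items
	spawned := make([]map[string]any, 0, len(items))
	for _, it := range items {
		child := make(map[string]any, len(state)+1)
		for k, v := range state {
			child[k] = v // assumption: each instance sees the parent state
		}
		child[as] = it
		spawned = append(spawned, child)
	}
	return spawned
}

func main() {
	state := map[string]any{"urls": []any{"a.example", "b.example"}}
	for _, child := range fanOut(state, "urls", "") {
		fmt.Println(child["item"])
	}
	// An empty array spawns nothing.
	fmt.Println(len(fanOut(map[string]any{"urls": []any{}}, "urls", "url")))
}
```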
func (*Graph) AddTransitionGoto ¶
AddTransitionGoto adds a transition that is only taken when the task explicitly calls flow.Goto with the target task URL. Both endpoints are auto-registered as tasks if not already present.
func (*Graph) AddTransitionWhen ¶
AddTransitionWhen adds a conditional transition between two nodes. The when expression is evaluated against the flow state to determine if the transition should be taken. Both endpoints are auto-registered as tasks if not already present.
func (*Graph) DeclareInputs ¶
DeclareInputs declares which state fields are passed into this graph when used as a subgraph. No arguments means nothing is passed in. Use "*" to pass everything. Named fields pass only those fields.
func (*Graph) DeclareOutputs ¶
DeclareOutputs declares which state fields are returned from this graph on completion. No arguments means nothing is returned. Use "*" to return everything. Named fields return only those fields. Output filtering applies both when used as a subgraph and as a root flow.
func (*Graph) EntryPoint ¶
EntryPoint returns the entry point task URL of the graph.
func (*Graph) ErrorTransition ¶
func (g *Graph) ErrorTransition(taskName string) (Transition, bool)
ErrorTransition returns the error transition from the given task, if one exists.
func (*Graph) IsSubgraph ¶
IsSubgraph returns true if the given name is registered as a subgraph.
func (*Graph) MarshalJSON ¶
MarshalJSON serializes the graph to JSON.
func (*Graph) SetEntryPoint ¶
SetEntryPoint sets the entry point of the graph explicitly, overriding the default (first task added).
func (*Graph) SetReducer ¶
SetReducer sets the merge strategy for a state field during fan-in.
func (*Graph) SetTimeBudget ¶
SetTimeBudget sets the execution time budget for a specific task. If not set, the foreman's DefaultTimeBudget config is used. The task must already be registered.
func (*Graph) TimeBudget ¶
TimeBudget returns the execution time budget for a specific task, or 0 if not set.
func (*Graph) Transitions ¶
func (g *Graph) Transitions() []Transition
Transitions returns the list of transitions in the graph.
func (*Graph) UnmarshalJSON ¶
UnmarshalJSON deserializes the graph from JSON.
type RawFlow ¶
type RawFlow struct {
	Flow
}
RawFlow wraps Flow with additional methods used by the foreman orchestrator. Task endpoints should use Flow directly; RawFlow is for internal orchestration use only.
func NewRawFlow ¶
func NewRawFlow() *RawFlow
NewRawFlow creates a new RawFlow with initialized maps.
func (*RawFlow) ClearChanges ¶
func (f *RawFlow) ClearChanges()
ClearChanges resets the changes map. Called by the orchestrator after persisting changes.
func (*RawFlow) ClearControl ¶
func (f *RawFlow) ClearControl()
ClearControl resets all control signals. Called by the orchestrator after processing them.
func (*RawFlow) RawChanges ¶
RawChanges returns a copy of the raw changes map.
func (*RawFlow) SetAttempt ¶
SetAttempt sets the attempt counter on the flow. Called by the orchestrator before dispatching a task so that Retry can check whether attempts are exhausted.
func (*RawFlow) SetRawChanges ¶
SetRawChanges replaces the entire changes map with the given raw map.
func (*RawFlow) SetRawState ¶
SetRawState replaces the entire state with the given raw map, without tracking changes.
type Reducer ¶
type Reducer string
Reducer defines how concurrent state modifications from parallel tasks are merged during fan-in.
type Transition ¶
type Transition struct {
	From     string `json:"from"`
	To       string `json:"to"`
	When     string `json:"when,omitzero"`
	WithGoto bool   `json:"withGoto,omitzero"`
	ForEach  string `json:"forEach,omitzero"` // dynamic fan-out over a state field
	As       string `json:"as,omitzero"`      // alias for the current element during forEach fan-out
	OnError  bool   `json:"onError,omitzero"` // taken when the source task returns an error
}
Transition defines a possible transition between two tasks in a workflow graph.