workflow

package module
v0.8.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultRegistry = NewRegistry()

Functions

func GraphToDOT added in v0.8.3

func GraphToDOT(g *Graph) string

GraphToDOT renders an in-memory Graph to Graphviz DOT.

func GraphToMermaid added in v0.8.3

func GraphToMermaid(g *Graph) string

GraphToMermaid renders an in-memory Graph to a Mermaid flowchart.

func RegisterUnit

func RegisterUnit(name string, unit ExecutableUnit)

RegisterUnit keeps backward compatibility with the old API.

func RegisterUnitFactory added in v0.8.3

func RegisterUnitFactory(name string, factory UnitFactory)

RegisterUnitFactory registers a factory for a unit.

func Test_Json_To_Graph

func Test_Json_To_Graph()

func ValidateLayoutBasic added in v0.8.3

func ValidateLayoutBasic(def *WorkflowDefinition, layout map[string]any) error

ValidateLayoutBasic performs a minimal structural check for layout JSON. Accepted forms:

1) { "<nodeId>": { "x": number, "y": number, ... }, ... }
2) { "nodes": { "<nodeId>": { "x": number, "y": number } }, "subflows": { "<whileId>": { "nodes": {...} } } }

Subflow keys must refer to WhileUnit nodes.

func WorkflowToDOT added in v0.8.3

func WorkflowToDOT(def *WorkflowDefinition) string

WorkflowToDOT renders a WorkflowDefinition to Graphviz DOT.

func WorkflowToMermaid added in v0.8.3

func WorkflowToMermaid(def *WorkflowDefinition) string

WorkflowToMermaid renders a WorkflowDefinition to a Mermaid flowchart.

Types

type BranchFunc

type BranchFunc func(result *ExecutionResult, state ContextMap) string

type BranchMode added in v0.8.3

type BranchMode string
const (
	BranchAll   BranchMode = "all"
	BranchFirst BranchMode = "first"
)

type ChannelSink added in v0.8.3

type ChannelSink struct {
	// contains filtered or unexported fields
}

func NewChannelSink added in v0.8.3

func NewChannelSink(buf int) *ChannelSink

func (*ChannelSink) Channel added in v0.8.3

func (s *ChannelSink) Channel() <-chan Event

func (*ChannelSink) Close added in v0.8.3

func (s *ChannelSink) Close()

func (*ChannelSink) Emit added in v0.8.3

func (s *ChannelSink) Emit(ctx context.Context, e Event)

type ComposerDocument added in v0.8.3

type ComposerDocument struct {
	Definition *WorkflowDefinition `json:"definition"`
	Layout     json.RawMessage     `json:"layout,omitempty"`
}

ComposerDocument stores workflow definition plus optional UI layout metadata. Layout is intentionally kept as opaque JSON to avoid tight coupling with the UI.

func ParseComposerJSON added in v0.8.3

func ParseComposerJSON(data []byte) (*ComposerDocument, error)

func (*ComposerDocument) ValidateBasic added in v0.8.3

func (d *ComposerDocument) ValidateBasic(layoutValidator LayoutValidator) error

ValidateBasic validates the definition and (optionally) the layout.

type ContextMap

type ContextMap map[string]*ExecutionResult

type ControlSignal added in v0.8.3

type ControlSignal string
const (
	ControlBreak    ControlSignal = "break"
	ControlContinue ControlSignal = "continue"
	ControlGoto     ControlSignal = "goto"
)

type ControlSignalError added in v0.8.3

type ControlSignalError struct {
	Signal ControlSignal
	Target string
}

func (*ControlSignalError) Error added in v0.8.3

func (e *ControlSignalError) Error() string

type Duration added in v0.8.3

type Duration struct {
	time.Duration
}

Duration supports JSON input as "1s"/"500ms" or a number (milliseconds).

func (Duration) MarshalJSON added in v0.8.3

func (d Duration) MarshalJSON() ([]byte, error)

func (*Duration) UnmarshalJSON added in v0.8.3

func (d *Duration) UnmarshalJSON(b []byte) error

type EdgeEnv added in v0.8.3

type EdgeEnv struct {
	State  map[string]*ExecutionResult
	Result *ExecutionResult
	Node   string
}

type EdgeSpec added in v0.8.3

type EdgeSpec struct {
	From  string `json:"from"`
	To    string `json:"to"`
	When  string `json:"when,omitempty"`
	Label string `json:"label,omitempty"`
	Order int    `json:"order,omitempty"`
}

type Engine added in v0.8.3

type Engine struct {
	Registry    *Registry
	Store       StateStore
	Concurrency int
	Logger      *slog.Logger
	FailFast    bool
}

func NewEngine added in v0.8.3

func NewEngine() *Engine

func (*Engine) Resume added in v0.8.3

func (e *Engine) Resume(ctx context.Context, def *WorkflowDefinition, runID string, opts *RunOptions) (*ExecutionState, error)

func (*Engine) Run added in v0.8.3

type Event added in v0.8.3

type Event struct {
	Type       EventType `json:"type,omitempty"`
	WorkflowID string    `json:"workflow_id,omitempty"`
	RunID      string    `json:"run_id,omitempty"`
	NodeID     string    `json:"node_id,omitempty"`
	Time       time.Time `json:"time,omitempty"`
	Data       any       `json:"data,omitempty"`
	Error      string    `json:"error,omitempty"`
}

type EventSink added in v0.8.3

type EventSink interface {
	Emit(ctx context.Context, e Event)
}

type EventType added in v0.8.3

type EventType string
const (
	EventRunStarted   EventType = "run_started"
	EventRunFinished  EventType = "run_finished"
	EventNodeStarted  EventType = "node_started"
	EventNodeFinished EventType = "node_finished"
	EventNodeFailed   EventType = "node_failed"
	EventNodeSkipped  EventType = "node_skipped"
	EventEdgeError    EventType = "edge_error"
	EventOutput       EventType = "output"
)

type ExecutableUnit

type ExecutableUnit interface {
	GetUnitMeta() *Unit
	Execute(ctx context.Context, state ContextMap, self *Node) (*ExecutionResult, error)
}

func FindUnit

func FindUnit(name string) (ExecutableUnit, bool)

FindUnit returns a fresh unit instance if registered.

type ExecutionResult

type ExecutionResult struct {
	NodeName      string        `json:"node_name,omitempty"`
	Data          any           `json:"data,omitempty"`
	Stream        bool          `json:"stream,omitempty"`
	Raw           any           `json:"raw,omitempty"`
	Error         string        `json:"error,omitempty"`
	Control       ControlSignal `json:"control,omitempty"`
	ControlTarget string        `json:"control_target,omitempty"`
}

func SimpleResult

func SimpleResult(data any) *ExecutionResult

type ExecutionState added in v0.8.3

type ExecutionState struct {
	WorkflowID string                `json:"workflow_id,omitempty"`
	RunID      string                `json:"run_id,omitempty"`
	Status     RunStatus             `json:"status,omitempty"`
	StartedAt  time.Time             `json:"started_at,omitempty"`
	UpdatedAt  time.Time             `json:"updated_at,omitempty"`
	Nodes      map[string]*NodeState `json:"nodes,omitempty"`
}

func (*ExecutionState) Clone added in v0.8.3

func (s *ExecutionState) Clone() *ExecutionState

type Graph

type Graph struct {
	Nodes map[string]*Node
	Edges map[string][]string
	Hooks struct {
		Before func(name string, state ContextMap)
		After  func(name string, result any, err error, state ContextMap)
	}
	// contains filtered or unexported fields
}

func BuildGraphFromJSON

func BuildGraphFromJSON(data []byte) (*Graph, error)

BuildGraphFromJSON builds a Graph from JSON data. A Graph represents a directed graph structure with nodes and edges.

func NewDSLGraph

func NewDSLGraph() *Graph

func (*Graph) AddBranch

func (g *Graph) AddBranch(name string, exec NodeFunc, branch BranchFunc)

func (*Graph) AddEdge

func (g *Graph) AddEdge(from, to string)

func (*Graph) AddNode

func (g *Graph) AddNode(name string, node *Node)

func (*Graph) Branch

func (g *Graph) Branch(name string, fn NodeFunc, branch BranchFunc) *Graph

func (*Graph) Loop

func (g *Graph) Loop(name string, fn NodeFunc, cond LoopCondFunc) *Graph

func (*Graph) OnAfter

func (g *Graph) OnAfter(fn func(string, any, error, ContextMap)) *Graph

func (*Graph) OnBefore

func (g *Graph) OnBefore(fn func(string, ContextMap)) *Graph

func (*Graph) Parallel

func (g *Graph) Parallel(name string, fns ...NodeFunc) *Graph

func (*Graph) Run

func (g *Graph) Run(ctx context.Context, start string, state ContextMap) error

func (*Graph) RunWithDSL

func (g *Graph) RunWithDSL(ctx context.Context, state ContextMap) error

func (*Graph) StartWith

func (g *Graph) StartWith(name string, fn NodeFunc) *Graph

func (*Graph) Then

func (g *Graph) Then(name string, fn NodeFunc) *Graph

type GraphJSON

type GraphJSON struct {
	Nodes map[string]NodeJSON `json:"nodes"`
	Edges map[string][]string `json:"edges"`
}

type Input

type Input struct {
	Data      any    `json:"data,omitempty"`      // final output
	DataType  string `json:"data_type,omitempty"` // plaintext, json, json_array, socket
	Slottable bool   `json:"slottable,omitempty"` // whether this input is slottable
}

type JoinPolicy added in v0.8.3

type JoinPolicy string
const (
	JoinAny JoinPolicy = "any"
	JoinAll JoinPolicy = "all"
)

type LayoutValidator added in v0.8.3

type LayoutValidator interface {
	Validate(def *WorkflowDefinition, layout map[string]any) error
}

LayoutValidator can validate layout structure against a workflow definition. Layout is provided as a generic JSON object to keep storage flexible.

type LayoutValidatorFunc added in v0.8.3

type LayoutValidatorFunc func(def *WorkflowDefinition, layout map[string]any) error

func (LayoutValidatorFunc) Validate added in v0.8.3

func (f LayoutValidatorFunc) Validate(def *WorkflowDefinition, layout map[string]any) error

type LoopCondFunc

type LoopCondFunc func(state ContextMap) bool

type Node

type Node struct {
	ID           string
	Name         string
	Input        *Input
	Params       map[string]any
	Execute      NodeFunc
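	Branch       BranchFunc            // optional branch function
	Parallel     bool                  // whether this is a parallel node
	LoopCond     func(ContextMap) bool // optional loop condition
	ExportFields []string              // exported fields, available for downstream nodes to reference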
}

type NodeFunc

type NodeFunc func(ctx context.Context, state ContextMap, self *Node) (*ExecutionResult, error)

type NodeJSON

type NodeJSON struct {
	UnitID string         `json:"unit_id"`
	Name   string         `json:"name"`
	Input  *Input         `json:"input,omitempty"`
	Params map[string]any `json:"params,omitempty"`
}

type NodeOptions added in v0.8.3

type NodeOptions struct {
	Timeout         Duration   `json:"timeout,omitempty"`
	Retries         int        `json:"retries,omitempty"`
	RetryBackoff    Duration   `json:"retry_backoff,omitempty"`
	RetryBackoffMax Duration   `json:"retry_backoff_max,omitempty"`
	Join            JoinPolicy `json:"join,omitempty"`
	ContinueOnError bool       `json:"continue_on_error,omitempty"`
	BranchMode      BranchMode `json:"branch_mode,omitempty"`
}

type NodeRole added in v0.8.3

type NodeRole string
const (
	RoleExec    NodeRole = "exec"
	RoleConfig  NodeRole = "config"
	RoleTrigger NodeRole = "trigger"
)

type NodeSpec added in v0.8.3

type NodeSpec struct {
	ID           string         `json:"id,omitempty"`
	Name         string         `json:"name,omitempty"`
	Unit         string         `json:"unit,omitempty"`
	UnitID       string         `json:"unit_id,omitempty"`
	Input        *Input         `json:"input,omitempty"`
	Params       map[string]any `json:"params,omitempty"`
	ExportFields []string       `json:"export_fields,omitempty"`
	Options      *NodeOptions   `json:"options,omitempty"`
	Role         NodeRole       `json:"role,omitempty"`
	RunMode      RunMode        `json:"run_mode,omitempty"`
	DependsOn    []string       `json:"depends_on,omitempty"`
}

type NodeState added in v0.8.3

type NodeState struct {
	ID         string           `json:"id,omitempty"`
	Name       string           `json:"name,omitempty"`
	Status     NodeStatus       `json:"status,omitempty"`
	Attempts   int              `json:"attempts,omitempty"`
	StartedAt  time.Time        `json:"started_at,omitempty"`
	FinishedAt time.Time        `json:"finished_at,omitempty"`
	Result     *ExecutionResult `json:"result,omitempty"`
	Error      string           `json:"error,omitempty"`
}

type NodeStatus added in v0.8.3

type NodeStatus string
const (
	NodePending   NodeStatus = "pending"
	NodeQueued    NodeStatus = "queued"
	NodeRunning   NodeStatus = "running"
	NodeSucceeded NodeStatus = "succeeded"
	NodeFailed    NodeStatus = "failed"
	NodeSkipped   NodeStatus = "skipped"
)

type Registry added in v0.8.3

type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry added in v0.8.3

func NewRegistry() *Registry

func (*Registry) Names added in v0.8.3

func (r *Registry) Names() []string

func (*Registry) New added in v0.8.3

func (r *Registry) New(name string) (ExecutableUnit, bool)

func (*Registry) Register added in v0.8.3

func (r *Registry) Register(name string, factory UnitFactory)

func (*Registry) RegisterInstance added in v0.8.3

func (r *Registry) RegisterInstance(name string, unit ExecutableUnit)

RegisterInstance registers a unit by inferring a safe factory via reflection. Prefer Register with an explicit factory for full control.

type RunMode added in v0.8.3

type RunMode string
const (
	RunModeLazy   RunMode = "lazy"
	RunModeEager  RunMode = "eager"
	RunModeManual RunMode = "manual"
)

type RunOptions added in v0.8.3

type RunOptions struct {
	RunID         string
	Start         []string
	Concurrency   int
	Store         StateStore
	Sink          EventSink
	FailFast      *bool
	AllowCycles   bool
	StopOnControl bool
	SeedState     ContextMap
	SeedExports   map[string][]string
	ResumeState   *ExecutionState
}

type RunStatus added in v0.8.3

type RunStatus string
const (
	RunRunning             RunStatus = "running"
	RunSucceeded           RunStatus = "succeeded"
	RunFailed              RunStatus = "failed"
	RunCancelled           RunStatus = "cancelled"
	RunCompletedWithErrors RunStatus = "completed_with_errors"
)

type StateStore added in v0.8.3

type StateStore interface {
	Save(ctx context.Context, state *ExecutionState) error
	Load(ctx context.Context, workflowID, runID string) (*ExecutionState, error)
}

type Unit

type Unit struct {
	ID          string `json:"id"`
	UnitName    string `json:"unit_name,omitempty"`
	DisplayName string `json:"display_name,omitempty"` // display name shown in the UI (e.g. "Call API")
	Status      string `json:"status,omitempty"`
	ErrMsg      string `json:"err_msg,omitempty"`    // stores the error if the unit failed
	OutputRef   string `json:"output_ref,omitempty"` // name of the exported output field (for UI display)
}

type UnitFactory added in v0.8.3

type UnitFactory func() ExecutableUnit

type WorkflowDefinition added in v0.8.3

type WorkflowDefinition struct {
	ID    string               `json:"id,omitempty"`
	Name  string               `json:"name,omitempty"`
	Start []string             `json:"start,omitempty"`
	Nodes map[string]*NodeSpec `json:"nodes,omitempty"`
	Edges []EdgeSpec           `json:"edges,omitempty"`
}

func ParseWorkflowJSON added in v0.8.3

func ParseWorkflowJSON(data []byte) (*WorkflowDefinition, error)

func (*WorkflowDefinition) UnmarshalJSON added in v0.8.3

func (w *WorkflowDefinition) UnmarshalJSON(data []byte) error

func (*WorkflowDefinition) ValidateBasic added in v0.8.3

func (w *WorkflowDefinition) ValidateBasic() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL