workflow

package
v1.4.6 Latest
Published: Mar 3, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var ErrNotConfigured = errors.New("step dependency not configured")

ErrNotConfigured is returned when a step's required dependency (Gateway, Registry, Handler) has not been injected. Callers can check for this with errors.Is(err, ErrNotConfigured).
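The check described above can be sketched as follows. This is a minimal, self-contained illustration: ErrNotConfigured is redeclared locally as a stand-in, and runStep and isNotConfigured are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotConfigured is a local stand-in for the sentinel documented above.
var ErrNotConfigured = errors.New("step dependency not configured")

// runStep is a hypothetical step runner: it fails with a wrapped
// ErrNotConfigured when no handler has been injected.
func runStep(handler func() error) error {
	if handler == nil {
		return fmt.Errorf("code step: %w", ErrNotConfigured)
	}
	return handler()
}

// isNotConfigured reports whether err is, or wraps, the sentinel.
func isNotConfigured(err error) bool {
	return errors.Is(err, ErrNotConfigured)
}

func main() {
	if err := runStep(nil); isNotConfigured(err) {
		fmt.Println("missing dependency:", err)
	}
}
```

Because errors.Is walks the wrap chain, the check still succeeds when a caller adds context with %w.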

Functions

func ValidateDAGDefinition

func ValidateDAGDefinition(def *DAGDefinition) error

ValidateDAGDefinition validates a loaded DAGDefinition

func WithWorkflowStreamEmitter added in v1.0.0

func WithWorkflowStreamEmitter(ctx context.Context, emitter WorkflowStreamEmitter) context.Context

WithWorkflowStreamEmitter stores a WorkflowStreamEmitter in the context.
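Storing a value in a context usually follows the pattern sketched below. The definition of WorkflowStreamEmitter is not shown in this section, so the Emitter function type, the key type, and the helpers withEmitter and emitterFrom are assumptions made purely for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Emitter is a hypothetical stand-in for WorkflowStreamEmitter.
type Emitter func(event string)

// emitterKey is an unexported key type, so other packages cannot collide with it.
type emitterKey struct{}

// withEmitter returns a child context that carries the emitter.
func withEmitter(ctx context.Context, e Emitter) context.Context {
	return context.WithValue(ctx, emitterKey{}, e)
}

// emitterFrom retrieves the emitter, reporting false when none was stored.
func emitterFrom(ctx context.Context) (Emitter, bool) {
	e, ok := ctx.Value(emitterKey{}).(Emitter)
	return e, ok && e != nil
}

// collect runs a fake node that emits two events and returns what was received.
func collect(attach bool) []string {
	var got []string
	ctx := context.Background()
	if attach {
		ctx = withEmitter(ctx, func(ev string) { got = append(got, ev) })
	}
	if emit, ok := emitterFrom(ctx); ok {
		emit("node_start")
		emit("node_end")
	}
	return got
}

func main() {
	fmt.Println(collect(true), collect(false))
}
```

Code that runs deeper in the call chain can then emit events without the emitter being threaded through every function signature.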

Types

type CheckpointDiff

type CheckpointDiff struct {
	Version1       int           `json:"version1"`
	Version2       int           `json:"version2"`
	AddedNodes     []string      `json:"added_nodes"`
	RemovedNodes   []string      `json:"removed_nodes"`
	ChangedNodes   []string      `json:"changed_nodes"`
	TimeDifference time.Duration `json:"time_difference"`
}

CheckpointDiff represents differences between checkpoints.

type CheckpointManager

type CheckpointManager interface {
	SaveCheckpoint(ctx context.Context, checkpoint *EnhancedCheckpoint) error
}

CheckpointManager is the interface for checkpoint integration.

type CheckpointStore

type CheckpointStore interface {
	Save(ctx context.Context, checkpoint *EnhancedCheckpoint) error
	Load(ctx context.Context, checkpointID string) (*EnhancedCheckpoint, error)
	LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
	LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
	ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
	Delete(ctx context.Context, checkpointID string) error
}

CheckpointStore defines storage interface for enhanced checkpoints (Workflow layer).

Note: Two CheckpointStore interfaces exist in the project, operating on different types:

  • agent.CheckpointStore — operates on *agent.Checkpoint (agent state, List/DeleteThread/Rollback)
  • workflow.CheckpointStore (this) — operates on *workflow.EnhancedCheckpoint (DAG node results, time-travel)

They cannot be unified because the checkpoint structs have different fields (agent state vs DAG execution state).

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker is a circuit breaker implementation.

func NewCircuitBreaker

func NewCircuitBreaker(
	nodeID string,
	config CircuitBreakerConfig,
	eventHandler CircuitBreakerEventHandler,
	logger *zap.Logger,
) *CircuitBreaker

NewCircuitBreaker creates a circuit breaker.

func (*CircuitBreaker) AllowRequest

func (cb *CircuitBreaker) AllowRequest() (bool, error)

AllowRequest checks whether a request is allowed to pass.

func (*CircuitBreaker) GetFailures

func (cb *CircuitBreaker) GetFailures() int

GetFailures returns the current failure count.

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitState

GetState returns the current state.

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failure.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a success.

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker.

type CircuitBreakerConfig

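type CircuitBreakerConfig struct {
	// FailureThreshold is the number of consecutive failures that trips the breaker
	FailureThreshold int `json:"failure_threshold"`
	// RecoveryTimeout is how long to wait after tripping before attempting recovery
	RecoveryTimeout Duration `json:"recovery_timeout"`
	// HalfOpenMaxProbes is the number of probe requests allowed in the half-open state
	HalfOpenMaxProbes int `json:"half_open_max_probes"`
	// SuccessThresholdInHalfOpen is the number of consecutive successes in the half-open state required to recover
	SuccessThresholdInHalfOpen int `json:"success_threshold_in_half_open"`
}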

CircuitBreakerConfig is the circuit breaker configuration.

func DefaultCircuitBreakerConfig

func DefaultCircuitBreakerConfig() CircuitBreakerConfig

DefaultCircuitBreakerConfig returns the default circuit breaker configuration.

type CircuitBreakerEvent

type CircuitBreakerEvent struct {
	NodeID    string       `json:"node_id"`
	OldState  CircuitState `json:"old_state"`
	NewState  CircuitState `json:"new_state"`
	Timestamp time.Time    `json:"timestamp"`
	Reason    string       `json:"reason"`
	Failures  int          `json:"failures"`
}

CircuitBreakerEvent is a circuit breaker state-change event.

type CircuitBreakerEventHandler

type CircuitBreakerEventHandler interface {
	OnStateChange(event CircuitBreakerEvent)
}

CircuitBreakerEventHandler is the event handler interface.

type CircuitBreakerRegistry

type CircuitBreakerRegistry struct {
	// contains filtered or unexported fields
}

CircuitBreakerRegistry is a registry that manages the circuit breakers of all nodes.

func NewCircuitBreakerRegistry

func NewCircuitBreakerRegistry(
	config CircuitBreakerConfig,
	eventHandler CircuitBreakerEventHandler,
	logger *zap.Logger,
) *CircuitBreakerRegistry

NewCircuitBreakerRegistry creates a circuit breaker registry.

func (*CircuitBreakerRegistry) GetAllStates

func (r *CircuitBreakerRegistry) GetAllStates() map[string]CircuitState

GetAllStates returns the states of all circuit breakers.

func (*CircuitBreakerRegistry) GetOrCreate

func (r *CircuitBreakerRegistry) GetOrCreate(nodeID string) *CircuitBreaker

GetOrCreate returns the circuit breaker for a node, creating it if necessary.

func (*CircuitBreakerRegistry) ResetAll

func (r *CircuitBreakerRegistry) ResetAll()

ResetAll resets all circuit breakers.

type CircuitState

type CircuitState int

CircuitState represents the state of a circuit breaker. This is an independent definition equivalent to circuitbreaker.State in llm/circuitbreaker/. The workflow package maintains its own copy to avoid depending on the llm package (dependency direction: llm <- workflow, not workflow -> llm).

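const (
	// CircuitClosed is the normal state; requests are allowed through
	CircuitClosed CircuitState = iota
	// CircuitOpen is the tripped state; all requests are rejected
	CircuitOpen
	// CircuitHalfOpen is the half-open state; probe requests are allowed
	CircuitHalfOpen
)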

func (CircuitState) String

func (s CircuitState) String() string
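The three states and the four CircuitBreakerConfig fields suggest a state machine along the following lines. This is a sketch under stated assumptions, not the package's implementation: the lowercase types are local stand-ins, the injectable clock is added for the demo, and the exact transition rules of the real CircuitBreaker may differ.

```go
package main

import (
	"fmt"
	"time"
)

type state int

const (
	closed state = iota
	open
	halfOpen
)

// config mirrors the documented CircuitBreakerConfig fields (stand-in type).
type config struct {
	FailureThreshold           int
	RecoveryTimeout            time.Duration
	HalfOpenMaxProbes          int
	SuccessThresholdInHalfOpen int
}

type breaker struct {
	cfg                         config
	st                          state
	failures, successes, probes int
	openedAt                    time.Time
	now                         func() time.Time // injectable clock (assumption)
}

// allow reports whether a request may pass, moving open -> half-open
// once the recovery timeout has elapsed.
func (b *breaker) allow() bool {
	if b.st == open && b.now().Sub(b.openedAt) >= b.cfg.RecoveryTimeout {
		b.st, b.probes, b.successes = halfOpen, 0, 0
	}
	switch b.st {
	case open:
		return false
	case halfOpen:
		if b.probes >= b.cfg.HalfOpenMaxProbes {
			return false
		}
		b.probes++
	}
	return true
}

// recordFailure trips the breaker on a failed probe or at the threshold.
func (b *breaker) recordFailure() {
	b.failures++
	if b.st == halfOpen || b.failures >= b.cfg.FailureThreshold {
		b.st, b.openedAt = open, b.now()
	}
}

// recordSuccess closes the breaker, after enough probe successes when half-open.
func (b *breaker) recordSuccess() {
	if b.st == halfOpen {
		b.successes++
		if b.successes < b.cfg.SuccessThresholdInHalfOpen {
			return
		}
	}
	b.st, b.failures = closed, 0
}

// demo drives the breaker through a scripted scenario with a fake clock.
func demo() []state {
	clock := time.Unix(0, 0)
	b := &breaker{
		cfg: config{FailureThreshold: 2, RecoveryTimeout: time.Second,
			HalfOpenMaxProbes: 1, SuccessThresholdInHalfOpen: 1},
		now: func() time.Time { return clock },
	}
	var seen []state
	b.recordFailure()
	b.recordFailure() // threshold reached
	seen = append(seen, b.st)
	b.allow() // rejected: recovery timeout has not elapsed
	clock = clock.Add(2 * time.Second)
	b.allow() // admitted as a half-open probe
	seen = append(seen, b.st)
	b.recordSuccess()
	return append(seen, b.st)
}

func main() {
	fmt.Println(demo())
}
```

In this sketch a single failed probe in the half-open state reopens the circuit immediately, which is a common design choice but is not confirmed by the documentation above.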

type CodeStep

type CodeStep struct {
	Handler func(ctx context.Context, input any) (any, error)
}

CodeStep executes custom code via an injected Go handler function.

func (*CodeStep) Execute

func (s *CodeStep) Execute(ctx context.Context, input any) (any, error)

func (*CodeStep) Name

func (s *CodeStep) Name() string

type ConditionFunc

type ConditionFunc func(ctx context.Context, input any) (bool, error)

ConditionFunc evaluates a condition and returns true or false

type DAGBuilder

type DAGBuilder struct {
	// contains filtered or unexported fields
}

DAGBuilder provides a fluent API for constructing DAG workflows

func NewDAGBuilder

func NewDAGBuilder(name string) *DAGBuilder

NewDAGBuilder creates a new DAG builder with the given name

func (*DAGBuilder) AddEdge

func (b *DAGBuilder) AddEdge(from, to string) *DAGBuilder

AddEdge adds a directed edge from one node to another

func (*DAGBuilder) AddNode

func (b *DAGBuilder) AddNode(id string, nodeType NodeType) *NodeBuilder

AddNode adds a node to the graph and returns a NodeBuilder for configuration

func (*DAGBuilder) Build

func (b *DAGBuilder) Build() (*DAGWorkflow, error)

Build validates the DAG and creates a DAGWorkflow
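The checks Build performs are not listed here. One check that DAG validation commonly includes is acyclicity, which can be sketched with Kahn's algorithm over an adjacency map of the same shape that DAGGraph.Edges returns. The function topoOrder is hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// topoOrder returns a topological order of the graph, or an error when the
// edges contain a cycle. Ties are broken alphabetically for determinism.
func topoOrder(edges map[string][]string) ([]string, error) {
	indeg := map[string]int{}
	for from, tos := range edges {
		if _, ok := indeg[from]; !ok {
			indeg[from] = 0
		}
		for _, to := range tos {
			indeg[to]++
		}
	}
	var queue []string
	for n, d := range indeg {
		if d == 0 {
			queue = append(queue, n)
		}
	}
	sort.Strings(queue)

	var order []string
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		order = append(order, n)
		next := append([]string(nil), edges[n]...)
		sort.Strings(next)
		for _, to := range next {
			indeg[to]--
			if indeg[to] == 0 {
				queue = append(queue, to)
			}
		}
	}
	// Nodes on a cycle never reach in-degree zero, so they are never emitted.
	if len(order) != len(indeg) {
		return nil, errors.New("graph contains a cycle")
	}
	return order, nil
}

func main() {
	order, err := topoOrder(map[string][]string{
		"fetch": {"parse"}, "parse": {"store"},
	})
	fmt.Println(order, err)
}
```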

func (*DAGBuilder) SetEntry

func (b *DAGBuilder) SetEntry(nodeID string) *DAGBuilder

SetEntry sets the entry node for the workflow

func (*DAGBuilder) WithDescription

func (b *DAGBuilder) WithDescription(desc string) *DAGBuilder

WithDescription sets the workflow description

func (*DAGBuilder) WithLogger

func (b *DAGBuilder) WithLogger(logger *zap.Logger) *DAGBuilder

WithLogger sets a custom logger

type DAGDefinition

type DAGDefinition struct {
	// Name is the workflow name
	Name string `json:"name" yaml:"name"`
	// Description describes the workflow
	Description string `json:"description" yaml:"description"`
	// Entry is the ID of the entry node
	Entry string `json:"entry" yaml:"entry"`
	// Nodes contains all node definitions
	Nodes []NodeDefinition `json:"nodes" yaml:"nodes"`
	// Metadata stores additional workflow information
	Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

DAGDefinition represents a serializable workflow definition

func FromJSON

func FromJSON(jsonStr string) (*DAGDefinition, error)

FromJSON creates a DAGDefinition from JSON string

func FromYAML

func FromYAML(yamlStr string) (*DAGDefinition, error)

FromYAML creates a DAGDefinition from YAML string

func LoadFromJSONFile

func LoadFromJSONFile(filename string) (*DAGDefinition, error)

LoadFromJSONFile loads a DAGDefinition from a JSON file

func LoadFromYAMLFile

func LoadFromYAMLFile(filename string) (*DAGDefinition, error)

LoadFromYAMLFile loads a DAGDefinition from a YAML file

func (*DAGDefinition) MarshalJSON

func (d *DAGDefinition) MarshalJSON() ([]byte, error)

MarshalJSON serializes a DAGDefinition to JSON

func (*DAGDefinition) MarshalYAML

func (d *DAGDefinition) MarshalYAML() (any, error)

MarshalYAML serializes a DAGDefinition to YAML

func (*DAGDefinition) SaveToJSONFile

func (d *DAGDefinition) SaveToJSONFile(filename string) error

SaveToJSONFile saves a DAGDefinition to a JSON file

func (*DAGDefinition) SaveToYAMLFile

func (d *DAGDefinition) SaveToYAMLFile(filename string) error

SaveToYAMLFile saves a DAGDefinition to a YAML file

func (*DAGDefinition) ToDAGWorkflow added in v1.4.6

func (d *DAGDefinition) ToDAGWorkflow() (*DAGWorkflow, error)

ToDAGWorkflow converts a validated DAGDefinition into an executable DAGWorkflow. Runtime-only handlers (step logic/condition expression parsing) are intentionally represented with safe defaults so structure can execute deterministically.

func (*DAGDefinition) ToJSON

func (d *DAGDefinition) ToJSON() (string, error)

ToJSON converts a DAGDefinition to JSON string

func (*DAGDefinition) ToYAML

func (d *DAGDefinition) ToYAML() (string, error)

ToYAML converts a DAGDefinition to YAML string

func (*DAGDefinition) UnmarshalJSON

func (d *DAGDefinition) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes a DAGDefinition from JSON

func (*DAGDefinition) UnmarshalYAML

func (d *DAGDefinition) UnmarshalYAML(node *yaml.Node) error

UnmarshalYAML deserializes a DAGDefinition from YAML

type DAGExecutor

type DAGExecutor struct {
	// contains filtered or unexported fields
}

DAGExecutor executes DAG workflows with dependency resolution

func NewDAGExecutor

func NewDAGExecutor(checkpointMgr CheckpointManager, logger *zap.Logger) *DAGExecutor

NewDAGExecutor creates a new DAG executor

func (*DAGExecutor) Execute

func (e *DAGExecutor) Execute(ctx context.Context, graph *DAGGraph, input any) (any, error)

Execute runs the DAG workflow with dependency resolution. Concurrent Execute calls on the same executor are serialized by an internal mutex (executeMu), which prevents data races on shared execution state.

func (*DAGExecutor) GetCircuitBreakerStates

func (e *DAGExecutor) GetCircuitBreakerStates() map[string]CircuitState

GetCircuitBreakerStates returns the states of all circuit breakers.

func (*DAGExecutor) GetExecutionID

func (e *DAGExecutor) GetExecutionID() string

GetExecutionID returns the current execution ID

func (*DAGExecutor) GetHistory

func (e *DAGExecutor) GetHistory() *ExecutionHistory

GetHistory returns the execution history for the current execution

func (*DAGExecutor) GetHistoryStore

func (e *DAGExecutor) GetHistoryStore() *ExecutionHistoryStore

GetHistoryStore returns the history store

func (*DAGExecutor) GetNodeResult

func (e *DAGExecutor) GetNodeResult(nodeID string) (any, bool)

GetNodeResult retrieves the result of a completed node

func (*DAGExecutor) SetCircuitBreakerConfig

func (e *DAGExecutor) SetCircuitBreakerConfig(config CircuitBreakerConfig, handler CircuitBreakerEventHandler)

SetCircuitBreakerConfig sets the circuit breaker configuration.

func (*DAGExecutor) SetHistoryStore

func (e *DAGExecutor) SetHistoryStore(store *ExecutionHistoryStore)

SetHistoryStore sets a custom history store

type DAGGraph

type DAGGraph struct {
	// contains filtered or unexported fields
}

DAGGraph represents the workflow structure as a directed acyclic graph

func NewDAGGraph

func NewDAGGraph() *DAGGraph

NewDAGGraph creates a new empty DAG graph

func (*DAGGraph) AddEdge

func (g *DAGGraph) AddEdge(fromID, toID string)

AddEdge adds a directed edge from one node to another

func (*DAGGraph) AddNode

func (g *DAGGraph) AddNode(node *DAGNode)

AddNode adds a node to the graph

func (*DAGGraph) Edges

func (g *DAGGraph) Edges() map[string][]string

Edges returns all edges in the graph

func (*DAGGraph) GetEdges

func (g *DAGGraph) GetEdges(nodeID string) []string

GetEdges retrieves the outgoing edges for a node

func (*DAGGraph) GetEntry

func (g *DAGGraph) GetEntry() string

GetEntry returns the entry node ID

func (*DAGGraph) GetNode

func (g *DAGGraph) GetNode(nodeID string) (*DAGNode, bool)

GetNode retrieves a node by ID

func (*DAGGraph) Nodes

func (g *DAGGraph) Nodes() map[string]*DAGNode

Nodes returns all nodes in the graph

func (*DAGGraph) SetEntry

func (g *DAGGraph) SetEntry(nodeID string)

SetEntry sets the entry node for the graph

type DAGNode

type DAGNode struct {
	// ID is the unique identifier for this node
	ID string
	// Type specifies the node type
	Type NodeType
	// Step is the step to execute (for action nodes)
	Step Step
	// Condition evaluates branching logic (for conditional nodes)
	Condition ConditionFunc
	// LoopConfig defines loop behavior (for loop nodes)
	LoopConfig *LoopConfig
	// SubGraph is a nested workflow (for subgraph nodes)
	SubGraph *DAGGraph
	// ErrorConfig defines error handling behavior
	ErrorConfig *ErrorConfig
	// Metadata stores additional node information
	Metadata map[string]any
}

DAGNode represents a single node in the workflow graph

type DAGWorkflow

type DAGWorkflow struct {
	// contains filtered or unexported fields
}

DAGWorkflow represents a DAG-based workflow

func NewDAGWorkflow

func NewDAGWorkflow(name, description string, graph *DAGGraph) *DAGWorkflow

NewDAGWorkflow creates a new DAG workflow

func (*DAGWorkflow) Description

func (w *DAGWorkflow) Description() string

Description returns the workflow description

func (*DAGWorkflow) Execute

func (w *DAGWorkflow) Execute(ctx context.Context, input any) (any, error)

Execute executes the DAG workflow using DAGExecutor

func (*DAGWorkflow) GetMetadata

func (w *DAGWorkflow) GetMetadata(key string) (any, bool)

GetMetadata retrieves a metadata value

func (*DAGWorkflow) Graph

func (w *DAGWorkflow) Graph() *DAGGraph

Graph returns the underlying DAG graph

func (*DAGWorkflow) Name

func (w *DAGWorkflow) Name() string

Name returns the workflow name

func (*DAGWorkflow) SetExecutor

func (w *DAGWorkflow) SetExecutor(executor *DAGExecutor)

SetExecutor sets a custom executor for the workflow

func (*DAGWorkflow) SetMetadata

func (w *DAGWorkflow) SetMetadata(key string, value any)

SetMetadata sets a metadata value

func (*DAGWorkflow) ToDAGDefinition

func (w *DAGWorkflow) ToDAGDefinition() *DAGDefinition

ToDAGDefinition converts a DAGWorkflow to a DAGDefinition for serialization. Note: this captures only the structure, not the runtime functions (conditions, iterators, steps).

type Duration added in v1.0.0

type Duration struct {
	time.Duration
}

Duration wraps time.Duration with human-readable JSON serialization. JSON output is a string like "30s", "5m", "1h30m" instead of nanoseconds.

func (Duration) MarshalJSON added in v1.0.0

func (d Duration) MarshalJSON() ([]byte, error)

MarshalJSON serializes Duration as a human-readable string.

func (*Duration) UnmarshalJSON added in v1.0.0

func (d *Duration) UnmarshalJSON(b []byte) error

UnmarshalJSON deserializes Duration from a string (e.g. "30s").
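A wrapper of this kind can be sketched as below. The local Duration type and the roundTrip helper are stand-ins written for illustration. This sketch formats with time.Duration.String, so five minutes serializes as "5m0s", and the package's own formatting may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Duration is a local stand-in that embeds time.Duration.
type Duration struct {
	time.Duration
}

// MarshalJSON writes the duration as a quoted string such as "30s".
func (d Duration) MarshalJSON() ([]byte, error) {
	return json.Marshal(d.Duration.String())
}

// UnmarshalJSON parses a quoted duration string such as "30s".
func (d *Duration) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	parsed, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	d.Duration = parsed
	return nil
}

// roundTrip encodes d to JSON and decodes it again (hypothetical helper).
func roundTrip(d time.Duration) (string, time.Duration, error) {
	raw, err := json.Marshal(Duration{d})
	if err != nil {
		return "", 0, err
	}
	var back Duration
	if err := json.Unmarshal(raw, &back); err != nil {
		return "", 0, err
	}
	return string(raw), back.Duration, nil
}

func main() {
	fmt.Println(roundTrip(30 * time.Second))
}
```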

type EnhancedCheckpoint

type EnhancedCheckpoint struct {
	ID             string         `json:"id"`
	WorkflowID     string         `json:"workflow_id"`
	ThreadID       string         `json:"thread_id"`
	Version        int            `json:"version"`
	NodeID         string         `json:"node_id"`
	NodeResults    map[string]any `json:"node_results"`
	Variables      map[string]any `json:"variables"`
	PendingNodes   []string       `json:"pending_nodes"`
	CompletedNodes []string       `json:"completed_nodes"`
	Input          any            `json:"input"`
	CreatedAt      time.Time      `json:"created_at"`
	ParentID       string         `json:"parent_id,omitempty"`
	Metadata       map[string]any `json:"metadata,omitempty"`
	Snapshot       *GraphSnapshot `json:"snapshot,omitempty"`
}

EnhancedCheckpoint represents a workflow checkpoint with full state.

type EnhancedCheckpointManager

type EnhancedCheckpointManager struct {
	// contains filtered or unexported fields
}

EnhancedCheckpointManager manages workflow checkpoints with time-travel.

func NewEnhancedCheckpointManager

func NewEnhancedCheckpointManager(store CheckpointStore, logger *zap.Logger) *EnhancedCheckpointManager

NewEnhancedCheckpointManager creates a new checkpoint manager.

func (*EnhancedCheckpointManager) Compare

func (m *EnhancedCheckpointManager) Compare(ctx context.Context, threadID string, v1, v2 int) (*CheckpointDiff, error)

Compare compares two checkpoint versions.
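How Compare fills in a CheckpointDiff is not specified here. One plausible approach, shown purely as an assumption, is to diff the NodeResults maps of the two versions. The helper diffNodes is hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

// diffNodes classifies node IDs by comparing two NodeResults-style maps.
// Results are sorted so the output is deterministic.
func diffNodes(old, cur map[string]any) (added, removed, changed []string) {
	for id, v := range cur {
		prev, ok := old[id]
		switch {
		case !ok:
			added = append(added, id)
		case !reflect.DeepEqual(prev, v):
			changed = append(changed, id)
		}
	}
	for id := range old {
		if _, ok := cur[id]; !ok {
			removed = append(removed, id)
		}
	}
	sort.Strings(added)
	sort.Strings(removed)
	sort.Strings(changed)
	return added, removed, changed
}

func main() {
	v1 := map[string]any{"fetch": "ok", "parse": 1}
	v2 := map[string]any{"parse": 2, "store": true}
	fmt.Println(diffNodes(v1, v2))
}
```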

func (*EnhancedCheckpointManager) CreateCheckpoint

func (m *EnhancedCheckpointManager) CreateCheckpoint(ctx context.Context, executor *DAGExecutor, graph *DAGGraph, threadID string, input any) (*EnhancedCheckpoint, error)

CreateCheckpoint creates a checkpoint from current execution state.

func (*EnhancedCheckpointManager) GetHistory

func (m *EnhancedCheckpointManager) GetHistory(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)

GetHistory returns checkpoint history for time-travel debugging.

func (*EnhancedCheckpointManager) ResumeFromCheckpoint

func (m *EnhancedCheckpointManager) ResumeFromCheckpoint(ctx context.Context, checkpointID string, graph *DAGGraph) (*DAGExecutor, error)

ResumeFromCheckpoint resumes workflow execution from a checkpoint.

func (*EnhancedCheckpointManager) Rollback

func (m *EnhancedCheckpointManager) Rollback(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)

Rollback rolls back to a specific version.

type ErrorConfig

type ErrorConfig struct {
	// Strategy specifies how to handle errors
	Strategy ErrorStrategy
	// MaxRetries is the maximum number of retry attempts (for retry strategy)
	MaxRetries int
	// RetryDelayMs is the delay between retries in milliseconds
	RetryDelayMs int
	// FallbackValue is the value to use when skipping a failed node
	FallbackValue any
}

ErrorConfig defines error handling behavior for a node

type ErrorDefinition added in v1.0.0

type ErrorDefinition struct {
	// Strategy specifies how to handle errors (fail_fast, skip, retry)
	Strategy string `json:"strategy" yaml:"strategy"`
	// MaxRetries is the maximum number of retry attempts (for retry strategy)
	MaxRetries int `json:"max_retries,omitempty" yaml:"max_retries,omitempty"`
	// RetryDelayMs is the delay between retries in milliseconds
	RetryDelayMs int `json:"retry_delay_ms,omitempty" yaml:"retry_delay_ms,omitempty"`
	// FallbackValue is the value to use when skipping a failed node
	FallbackValue any `json:"fallback_value,omitempty" yaml:"fallback_value,omitempty"`
}

ErrorDefinition represents a serializable error handling configuration

type ErrorStrategy

type ErrorStrategy string

ErrorStrategy defines how errors should be handled

const (
	// ErrorStrategyFailFast stops execution immediately on error
	ErrorStrategyFailFast ErrorStrategy = "fail_fast"
	// ErrorStrategySkip skips the failed node and continues
	ErrorStrategySkip ErrorStrategy = "skip"
	// ErrorStrategyRetry retries the failed node
	ErrorStrategyRetry ErrorStrategy = "retry"
)
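The three strategies and the ErrorConfig fields can be read as the policy sketched below. The types are redeclared locally, and runWithPolicy and flaky are hypothetical helpers. How the executor actually applies these settings is not documented in this section.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type ErrorStrategy string

const (
	ErrorStrategyFailFast ErrorStrategy = "fail_fast"
	ErrorStrategySkip     ErrorStrategy = "skip"
	ErrorStrategyRetry    ErrorStrategy = "retry"
)

// ErrorConfig mirrors the documented fields (local stand-in).
type ErrorConfig struct {
	Strategy      ErrorStrategy
	MaxRetries    int
	RetryDelayMs  int
	FallbackValue any
}

// runWithPolicy runs step once and then applies the configured strategy
// if that first attempt fails.
func runWithPolicy(cfg ErrorConfig, step func() (any, error)) (any, error) {
	out, err := step()
	if err == nil {
		return out, nil
	}
	switch cfg.Strategy {
	case ErrorStrategySkip:
		// Swallow the error and continue with the fallback value.
		return cfg.FallbackValue, nil
	case ErrorStrategyRetry:
		for i := 0; i < cfg.MaxRetries && err != nil; i++ {
			time.Sleep(time.Duration(cfg.RetryDelayMs) * time.Millisecond)
			out, err = step()
		}
	}
	if err != nil {
		return nil, err // fail_fast, or retries exhausted
	}
	return out, nil
}

// flaky returns a step that fails the given number of times, then succeeds.
func flaky(failures int) func() (any, error) {
	calls := 0
	return func() (any, error) {
		calls++
		if calls <= failures {
			return nil, errors.New("transient failure")
		}
		return "ok", nil
	}
}

func main() {
	cfg := ErrorConfig{Strategy: ErrorStrategyRetry, MaxRetries: 2, RetryDelayMs: 10}
	fmt.Println(runWithPolicy(cfg, flaky(2)))
}
```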

type ExecutionContext

type ExecutionContext struct {
	// WorkflowID identifies the workflow being executed
	WorkflowID string `json:"workflow_id,omitempty"`
	// CurrentNode is the ID of the currently executing node
	CurrentNode string `json:"current_node,omitempty"`
	// NodeResults stores the results of completed nodes
	NodeResults map[string]any `json:"node_results,omitempty"`
	// Variables stores workflow variables
	Variables map[string]any `json:"variables,omitempty"`
	// StartTime is when the workflow execution started
	StartTime time.Time `json:"start_time,omitempty"`
	// LastUpdateTime is when the context was last updated
	LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}

ExecutionContext captures the execution state for checkpointing

func NewExecutionContext

func NewExecutionContext(workflowID string) *ExecutionContext

NewExecutionContext creates a new execution context

func (*ExecutionContext) GetNodeResult

func (ec *ExecutionContext) GetNodeResult(nodeID string) (any, bool)

GetNodeResult retrieves the result of a completed node

func (*ExecutionContext) GetVariable

func (ec *ExecutionContext) GetVariable(key string) (any, bool)

GetVariable retrieves a workflow variable

func (*ExecutionContext) SetCurrentNode

func (ec *ExecutionContext) SetCurrentNode(nodeID string)

SetCurrentNode updates the currently executing node

func (*ExecutionContext) SetNodeResult

func (ec *ExecutionContext) SetNodeResult(nodeID string, result any)

SetNodeResult stores the result of a completed node

func (*ExecutionContext) SetVariable

func (ec *ExecutionContext) SetVariable(key string, value any)

SetVariable sets a workflow variable

type ExecutionHistory

type ExecutionHistory struct {
	ExecutionID string           `json:"execution_id"`
	WorkflowID  string           `json:"workflow_id"`
	StartTime   time.Time        `json:"start_time"`
	EndTime     time.Time        `json:"end_time"`
	Duration    time.Duration    `json:"duration"`
	Status      ExecutionStatus  `json:"status"`
	Nodes       []*NodeExecution `json:"nodes"`
	Error       string           `json:"error,omitempty"`
	Metadata    map[string]any   `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

ExecutionHistory records the complete execution path of a workflow

func NewExecutionHistory

func NewExecutionHistory(executionID, workflowID string) *ExecutionHistory

NewExecutionHistory creates a new execution history

func (*ExecutionHistory) Complete

func (h *ExecutionHistory) Complete(err error)

Complete marks the execution as completed

func (*ExecutionHistory) GetNodeByID

func (h *ExecutionHistory) GetNodeByID(nodeID string) *NodeExecution

GetNodeByID returns the execution record for a specific node

func (*ExecutionHistory) GetNodes

func (h *ExecutionHistory) GetNodes() []*NodeExecution

GetNodes returns a copy of the node executions

func (*ExecutionHistory) RecordNodeEnd

func (h *ExecutionHistory) RecordNodeEnd(node *NodeExecution, output any, err error)

RecordNodeEnd records the end of a node execution

func (*ExecutionHistory) RecordNodeStart

func (h *ExecutionHistory) RecordNodeStart(nodeID string, nodeType NodeType, input any) *NodeExecution

RecordNodeStart records the start of a node execution

type ExecutionHistoryStore

type ExecutionHistoryStore struct {
	// contains filtered or unexported fields
}

ExecutionHistoryStore stores and queries execution histories

func NewExecutionHistoryStore

func NewExecutionHistoryStore() *ExecutionHistoryStore

NewExecutionHistoryStore creates a new execution history store

func (*ExecutionHistoryStore) Get

func (s *ExecutionHistoryStore) Get(executionID string) (*ExecutionHistory, bool)

Get retrieves an execution history by ID

func (*ExecutionHistoryStore) ListByStatus

func (s *ExecutionHistoryStore) ListByStatus(status ExecutionStatus) []*ExecutionHistory

ListByStatus returns executions with a specific status

func (*ExecutionHistoryStore) ListByTimeRange

func (s *ExecutionHistoryStore) ListByTimeRange(start, end time.Time) []*ExecutionHistory

ListByTimeRange returns executions within a time range

func (*ExecutionHistoryStore) ListByWorkflow

func (s *ExecutionHistoryStore) ListByWorkflow(workflowID string) []*ExecutionHistory

ListByWorkflow returns all executions for a workflow

func (*ExecutionHistoryStore) Save

func (s *ExecutionHistoryStore) Save(history *ExecutionHistory)

Save saves an execution history

type ExecutionStatus

type ExecutionStatus = types.ExecutionStatus

ExecutionStatus represents the status of an execution. Uses the unified types.ExecutionStatus to avoid cross-layer coupling with agent/persistence.

const (
	// ExecutionStatusRunning indicates the execution is in progress
	ExecutionStatusRunning ExecutionStatus = types.ExecutionStatusRunning
	// ExecutionStatusCompleted indicates the execution completed successfully
	ExecutionStatusCompleted ExecutionStatus = types.ExecutionStatusCompleted
	// ExecutionStatusFailed indicates the execution failed
	ExecutionStatusFailed ExecutionStatus = types.ExecutionStatusFailed
)

type Facade added in v1.4.6

type Facade struct {
	// contains filtered or unexported fields
}

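Facade is the unified external execution entry point for workflow. The API/handler layer should execute a workflow.Workflow through this entry point rather than operating on a concrete executor directly.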

func NewFacade added in v1.4.6

func NewFacade(executor *DAGExecutor) *Facade

NewFacade creates a workflow execution facade.

func (*Facade) ExecuteDAG added in v1.4.6

func (f *Facade) ExecuteDAG(ctx context.Context, wf *DAGWorkflow, input any) (any, error)

ExecuteDAG executes a DAG workflow and serves as the unified external execution entry point for workflow.

type FuncStep

type FuncStep struct {
	// contains filtered or unexported fields
}

FuncStep is a function-based step implementation.

func NewFuncStep

func NewFuncStep(name string, fn StepFunc) *FuncStep

NewFuncStep creates a function step.

func (*FuncStep) Execute

func (s *FuncStep) Execute(ctx context.Context, input any) (any, error)

func (*FuncStep) Name

func (s *FuncStep) Name() string
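A function-backed step is an adapter from a plain function to a step interface. The sketch below assumes that Step consists of the Name and Execute methods shown on the concrete step types and that StepFunc has the same signature as CodeStep.Handler. Neither definition appears in this section, so both are redeclared locally, and doubler and runPipeline are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Step and StepFunc are assumed shapes, redeclared here for the sketch.
type Step interface {
	Name() string
	Execute(ctx context.Context, input any) (any, error)
}

type StepFunc func(ctx context.Context, input any) (any, error)

// funcStep adapts a StepFunc to the Step interface.
type funcStep struct {
	name string
	fn   StepFunc
}

func newFuncStep(name string, fn StepFunc) *funcStep {
	return &funcStep{name: name, fn: fn}
}

func (s *funcStep) Name() string { return s.name }

func (s *funcStep) Execute(ctx context.Context, input any) (any, error) {
	if s.fn == nil {
		return nil, errors.New("func step has no function")
	}
	return s.fn(ctx, input)
}

// doubler builds a step that doubles an int input.
func doubler() Step {
	return newFuncStep("double", func(_ context.Context, in any) (any, error) {
		n, ok := in.(int)
		if !ok {
			return nil, fmt.Errorf("expected int, got %T", in)
		}
		return n * 2, nil
	})
}

// runPipeline feeds each step's output into the next step.
func runPipeline(input any, steps ...Step) (any, error) {
	ctx := context.Background()
	cur := input
	for _, s := range steps {
		out, err := s.Execute(ctx, cur)
		if err != nil {
			return nil, fmt.Errorf("step %q: %w", s.Name(), err)
		}
		cur = out
	}
	return cur, nil
}

func main() {
	fmt.Println(runPipeline(3, doubler(), doubler()))
}
```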

type GraphSnapshot

type GraphSnapshot struct {
	Nodes     map[string]NodeSnapshot `json:"nodes"`
	Edges     map[string][]string     `json:"edges"`
	EntryNode string                  `json:"entry_node"`
}

GraphSnapshot captures the complete graph state.

type HumanInputHandler

type HumanInputHandler interface {
	// RequestInput sends a prompt to a human and waits for a response.
	// inputType hints at the expected response format (e.g. "text", "choice").
	// options provides selectable choices when inputType is "choice".
	RequestInput(ctx context.Context, prompt string, inputType string, options []string) (any, error)
}

HumanInputHandler abstracts human-in-the-loop interaction for workflow steps. Implement this interface to bridge workflow with your HITL management layer.
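An implementation of this interface might look like the following sketch, which answers every prompt with a fixed reply and is mainly useful in tests. The interface is redeclared locally, and cannedHandler and ask are hypothetical names. A real bridge would forward the prompt to an external HITL system.

```go
package main

import (
	"context"
	"fmt"
)

// HumanInputHandler is redeclared locally with the documented method.
type HumanInputHandler interface {
	RequestInput(ctx context.Context, prompt string, inputType string, options []string) (any, error)
}

// cannedHandler replies with a fixed answer instead of asking a person.
type cannedHandler struct {
	answer string
}

func (h cannedHandler) RequestInput(ctx context.Context, prompt, inputType string, options []string) (any, error) {
	// Respect cancellation, as a handler that really waits would have to.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if inputType == "choice" {
		for _, o := range options {
			if o == h.answer {
				return o, nil
			}
		}
		return nil, fmt.Errorf("answer %q is not one of %v", h.answer, options)
	}
	return h.answer, nil
}

// ask is a small helper that calls the handler with a background context.
func ask(h HumanInputHandler, inputType string, options ...string) (any, error) {
	return h.RequestInput(context.Background(), "Approve deployment?", inputType, options)
}

func main() {
	fmt.Println(ask(cannedHandler{answer: "yes"}, "choice", "yes", "no"))
}
```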

type HumanInputStep

type HumanInputStep struct {
	Prompt  string
	Type    string
	Options []string
	Timeout int
	Handler HumanInputHandler // Optional: inject to enable real HITL
}

HumanInputStep waits for human input. When Handler is set, it sends a request to the HITL handler and waits for a response.

func (*HumanInputStep) Execute

func (s *HumanInputStep) Execute(ctx context.Context, input any) (any, error)

func (*HumanInputStep) Name

func (s *HumanInputStep) Name() string

type InMemoryCheckpointStore

type InMemoryCheckpointStore struct {
	// contains filtered or unexported fields
}

InMemoryCheckpointStore provides in-memory storage.

func NewInMemoryCheckpointStore

func NewInMemoryCheckpointStore() *InMemoryCheckpointStore

NewInMemoryCheckpointStore creates a new in-memory store.

func (*InMemoryCheckpointStore) Delete

func (s *InMemoryCheckpointStore) Delete(ctx context.Context, id string) error

func (*InMemoryCheckpointStore) ListVersions

func (s *InMemoryCheckpointStore) ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)

func (*InMemoryCheckpointStore) Load

func (*InMemoryCheckpointStore) LoadLatest

func (s *InMemoryCheckpointStore) LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)

func (*InMemoryCheckpointStore) LoadVersion

func (s *InMemoryCheckpointStore) LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)

func (*InMemoryCheckpointStore) Save

type IteratorFunc

type IteratorFunc func(ctx context.Context, input any) ([]any, error)

IteratorFunc generates a collection of items for iteration

type LLMStep

type LLMStep struct {
	Model       string
	Prompt      string
	Temperature float64
	MaxTokens   int
	Gateway     core.GatewayLike // Optional: inject to enable real LLM calls
}

LLMStep executes an LLM call. When Gateway is set, it performs a real LLM invoke request.

func (*LLMStep) Execute

func (s *LLMStep) Execute(ctx context.Context, input any) (any, error)

func (*LLMStep) Name

func (s *LLMStep) Name() string

type LoopConfig

type LoopConfig struct {
	// Type specifies the loop type (while, for, foreach)
	Type LoopType
	// MaxIterations limits the maximum number of iterations (0 = unlimited)
	MaxIterations int
	// Condition evaluates whether to continue looping (for while loops)
	Condition ConditionFunc
	// Iterator generates items to iterate over (for foreach loops)
	Iterator IteratorFunc
}

LoopConfig defines loop behavior

type LoopDefinition

type LoopDefinition struct {
	// Type is the loop type (while, for, foreach)
	Type string `json:"type" yaml:"type"`
	// MaxIterations limits the maximum number of iterations
	MaxIterations int `json:"max_iterations" yaml:"max_iterations"`
	// Condition is the condition name (for while loops)
	Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
}

LoopDefinition represents a serializable loop configuration

type LoopType

type LoopType string

LoopType defines the type of loop

const (
	// LoopTypeWhile executes while condition is true
	LoopTypeWhile LoopType = "while"
	// LoopTypeFor executes for a fixed number of iterations
	LoopTypeFor LoopType = "for"
	// LoopTypeForEach executes for each item in a collection
	LoopTypeForEach LoopType = "foreach"
)
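The loop types and LoopConfig fields could be interpreted by a driver like the sketch below. The types are redeclared locally, and runLoop and iterations are hypothetical helpers. Treating MaxIterations as the iteration count of a "for" loop is an assumption, and the executor's real semantics may differ.

```go
package main

import (
	"context"
	"fmt"
)

type ConditionFunc func(ctx context.Context, input any) (bool, error)
type IteratorFunc func(ctx context.Context, input any) ([]any, error)
type LoopType string

const (
	LoopTypeWhile   LoopType = "while"
	LoopTypeFor     LoopType = "for"
	LoopTypeForEach LoopType = "foreach"
)

type LoopConfig struct {
	Type          LoopType
	MaxIterations int // 0 = unlimited
	Condition     ConditionFunc
	Iterator      IteratorFunc
}

// runLoop calls body according to cfg and returns how many iterations ran.
func runLoop(ctx context.Context, cfg LoopConfig, input any, body func(item any) error) (int, error) {
	capped := func(n int) bool { return cfg.MaxIterations > 0 && n >= cfg.MaxIterations }
	n := 0
	switch cfg.Type {
	case LoopTypeWhile:
		for !capped(n) {
			ok, err := cfg.Condition(ctx, input)
			if err != nil || !ok {
				return n, err
			}
			if err := body(input); err != nil {
				return n, err
			}
			n++
		}
	case LoopTypeFor:
		for ; n < cfg.MaxIterations; n++ {
			if err := body(n); err != nil {
				return n, err
			}
		}
	case LoopTypeForEach:
		items, err := cfg.Iterator(ctx, input)
		if err != nil {
			return 0, err
		}
		for _, it := range items {
			if capped(n) {
				break
			}
			if err := body(it); err != nil {
				return n, err
			}
			n++
		}
	default:
		return 0, fmt.Errorf("unknown loop type %q", cfg.Type)
	}
	return n, nil
}

// iterations counts loop runs for a no-op body. The condition is always
// true, so pass max > 0 for "while" or the loop will not terminate.
func iterations(t LoopType, items, max int) int {
	cfg := LoopConfig{
		Type:          t,
		MaxIterations: max,
		Condition:     func(context.Context, any) (bool, error) { return true, nil },
		Iterator:      func(context.Context, any) ([]any, error) { return make([]any, items), nil },
	}
	n, _ := runLoop(context.Background(), cfg, nil, func(any) error { return nil })
	return n
}

func main() {
	fmt.Println(iterations(LoopTypeForEach, 5, 3), iterations(LoopTypeWhile, 0, 2))
}
```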

type NodeBuilder

type NodeBuilder struct {
	// contains filtered or unexported fields
}

NodeBuilder provides a fluent API for configuring individual nodes

func (*NodeBuilder) Done

func (nb *NodeBuilder) Done() *DAGBuilder

Done completes node configuration and returns to the DAGBuilder

func (*NodeBuilder) WithCondition

func (nb *NodeBuilder) WithCondition(cond ConditionFunc) *NodeBuilder

WithCondition sets the condition function for a conditional node

func (*NodeBuilder) WithErrorConfig

func (nb *NodeBuilder) WithErrorConfig(config ErrorConfig) *NodeBuilder

WithErrorConfig sets the error handling configuration for a node

func (*NodeBuilder) WithLoop

func (nb *NodeBuilder) WithLoop(config LoopConfig) *NodeBuilder

WithLoop sets the loop configuration for a loop node

func (*NodeBuilder) WithMetadata

func (nb *NodeBuilder) WithMetadata(key string, value any) *NodeBuilder

WithMetadata sets a metadata value

func (*NodeBuilder) WithOnFalse

func (nb *NodeBuilder) WithOnFalse(nodeIDs ...string) *NodeBuilder

WithOnFalse sets the nodes to execute when condition is false

func (*NodeBuilder) WithOnTrue

func (nb *NodeBuilder) WithOnTrue(nodeIDs ...string) *NodeBuilder

WithOnTrue sets the nodes to execute when condition is true

func (*NodeBuilder) WithStep

func (nb *NodeBuilder) WithStep(step Step) *NodeBuilder

WithStep sets the step for an action node

func (*NodeBuilder) WithSubGraph

func (nb *NodeBuilder) WithSubGraph(subGraph *DAGGraph) *NodeBuilder

WithSubGraph sets the subgraph for a subgraph node

type NodeConfig

type NodeConfig struct {
	// LLM node config
	Model       string  `json:"model,omitempty"`
	Prompt      string  `json:"prompt,omitempty"`
	Temperature float64 `json:"temperature,omitempty"`
	MaxTokens   int     `json:"max_tokens,omitempty"`

	// Tool node config
	ToolName   string         `json:"tool_name,omitempty"`
	ToolParams map[string]any `json:"tool_params,omitempty"`

	// Condition node config
	Condition  string `json:"condition,omitempty"`
	Expression string `json:"expression,omitempty"`

	// Loop node config
	LoopType      string `json:"loop_type,omitempty"`
	MaxIterations int    `json:"max_iterations,omitempty"`

	// Code node config
	Code     string `json:"code,omitempty"`
	Language string `json:"language,omitempty"`

	// Human input config
	InputPrompt string   `json:"input_prompt,omitempty"`
	InputType   string   `json:"input_type,omitempty"`
	Options     []string `json:"options,omitempty"`
	Timeout     int      `json:"timeout_seconds,omitempty"`

	// Subflow config
	SubflowID string `json:"subflow_id,omitempty"`
}

NodeConfig contains node-specific configuration.
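Every field in NodeConfig carries `omitempty`, so zero values are dropped when the struct is encoded. The sketch below illustrates that with a local stand-in type (`llmNodeConfig`, not part of the package) that copies only the LLM fields and their tags; the model name is a placeholder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// llmNodeConfig is a local stand-in mirroring only the LLM-related fields
// and JSON tags of NodeConfig. It is not the package type.
type llmNodeConfig struct {
	Model       string  `json:"model,omitempty"`
	Prompt      string  `json:"prompt,omitempty"`
	Temperature float64 `json:"temperature,omitempty"`
	MaxTokens   int     `json:"max_tokens,omitempty"`
}

// encodeConfig marshals the config and returns the JSON as a string.
func encodeConfig(c llmNodeConfig) (string, error) {
	b, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeConfig(llmNodeConfig{
		Model:       "example-model", // placeholder model name
		Prompt:      "Summarize the input",
		Temperature: 0, // zero value: omitted from the output by omitempty
	})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(out)
}
```

One consequence worth keeping in mind: an explicit Temperature of 0 cannot be distinguished from "not set" after a round trip through JSON.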

type NodeDefinition

type NodeDefinition struct {
	// ID is the unique node identifier
	ID string `json:"id" yaml:"id"`
	// Type is the node type
	Type string `json:"type" yaml:"type"`
	// Step is the step name (for action nodes)
	Step string `json:"step,omitempty" yaml:"step,omitempty"`
	// Condition is the condition name (for conditional nodes)
	Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
	// Next lists the next nodes to execute (for action nodes)
	Next []string `json:"next,omitempty" yaml:"next,omitempty"`
	// OnTrue lists nodes to execute when condition is true
	OnTrue []string `json:"on_true,omitempty" yaml:"on_true,omitempty"`
	// OnFalse lists nodes to execute when condition is false
	OnFalse []string `json:"on_false,omitempty" yaml:"on_false,omitempty"`
	// Loop defines loop configuration (for loop nodes)
	Loop *LoopDefinition `json:"loop,omitempty" yaml:"loop,omitempty"`
	// SubGraph defines a nested workflow (for subgraph nodes)
	SubGraph *DAGDefinition `json:"subgraph,omitempty" yaml:"subgraph,omitempty"`
	// Error defines error handling configuration
	Error *ErrorDefinition `json:"error,omitempty" yaml:"error,omitempty"`
	// Metadata stores additional node information
	Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

NodeDefinition represents a serializable node definition
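As an illustration of the serialized form, the sketch below decodes a conditional node from JSON. It uses a local stand-in (`nodeDefinition`) that copies a subset of the fields and JSON tags above; the node IDs and condition name are hypothetical, and the assumption that Type holds a NodeType string such as "condition" is not confirmed by this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nodeDefinition is a local stand-in mirroring a subset of NodeDefinition's
// fields and JSON tags. It is not the package type.
type nodeDefinition struct {
	ID        string   `json:"id"`
	Type      string   `json:"type"`
	Condition string   `json:"condition,omitempty"`
	OnTrue    []string `json:"on_true,omitempty"`
	OnFalse   []string `json:"on_false,omitempty"`
}

// sampleNode is a hypothetical conditional node definition.
const sampleNode = `{
  "id": "check_input",
  "type": "condition",
  "condition": "is_valid",
  "on_true": ["process"],
  "on_false": ["reject", "notify"]
}`

// decodeNode parses a single node definition from JSON.
func decodeNode(data string) (nodeDefinition, error) {
	var def nodeDefinition
	if err := json.Unmarshal([]byte(data), &def); err != nil {
		return nodeDefinition{}, fmt.Errorf("decode node: %w", err)
	}
	return def, nil
}

func main() {
	def, err := decodeNode(sampleNode)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%s (%s): true -> %v, false -> %v\n", def.ID, def.Type, def.OnTrue, def.OnFalse)
}
```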

type NodeExecution

type NodeExecution struct {
	NodeID    string          `json:"node_id"`
	NodeType  NodeType        `json:"node_type"`
	StartTime time.Time       `json:"start_time"`
	EndTime   time.Time       `json:"end_time"`
	Duration  time.Duration   `json:"duration"`
	Status    ExecutionStatus `json:"status"`
	Input     any             `json:"input,omitempty"`
	Output    any             `json:"output,omitempty"`
	Error     string          `json:"error,omitempty"`
}

NodeExecution records the execution of a single node

type NodeSnapshot

type NodeSnapshot struct {
	ID       string `json:"id"`
	Type     string `json:"type"`
	Status   string `json:"status"`
	Input    any    `json:"input,omitempty"`
	Output   any    `json:"output,omitempty"`
	Error    string `json:"error,omitempty"`
	Duration int64  `json:"duration_ms,omitempty"`
}

NodeSnapshot captures a node's state.
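NodeExecution stores its duration as a time.Duration, while NodeSnapshot stores an int64 tagged `duration_ms`. The sketch below shows one way to convert between the two, assuming the snapshot value is in milliseconds as the tag suggests. The types and the `toSnapshot` helper are local stand-ins, not package API.

```go
package main

import (
	"fmt"
	"time"
)

// nodeExecution and nodeSnapshot are local stand-ins that keep only the
// fields needed for the conversion. They are not the package types.
type nodeExecution struct {
	NodeID    string
	StartTime time.Time
	EndTime   time.Time
	Error     string
}

type nodeSnapshot struct {
	ID       string
	Error    string
	Duration int64 // milliseconds, assumed from the duration_ms JSON tag
}

// toSnapshot is a hypothetical helper that derives a snapshot from an
// execution record, converting the elapsed time to whole milliseconds.
func toSnapshot(e nodeExecution) nodeSnapshot {
	return nodeSnapshot{
		ID:       e.NodeID,
		Error:    e.Error,
		Duration: e.EndTime.Sub(e.StartTime).Milliseconds(),
	}
}

// sampleExecution returns a record that ran for 1.5 seconds.
func sampleExecution() nodeExecution {
	start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	return nodeExecution{
		NodeID:    "fetch",
		StartTime: start,
		EndTime:   start.Add(1500 * time.Millisecond),
	}
}

func main() {
	snap := toSnapshot(sampleExecution())
	fmt.Printf("%s took %d ms\n", snap.ID, snap.Duration)
}
```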

type NodeType

type NodeType string

NodeType defines the type of a DAG node

const (
	// NodeTypeAction executes a step
	NodeTypeAction NodeType = "action"
	// NodeTypeCondition performs conditional branching
	NodeTypeCondition NodeType = "condition"
	// NodeTypeLoop performs loop iteration
	NodeTypeLoop NodeType = "loop"
	// NodeTypeParallel executes nodes concurrently
	NodeTypeParallel NodeType = "parallel"
	// NodeTypeSubGraph executes a nested workflow
	NodeTypeSubGraph NodeType = "subgraph"
	// NodeTypeCheckpoint creates a checkpoint
	NodeTypeCheckpoint NodeType = "checkpoint"
)

type PassthroughStep

type PassthroughStep struct{}

PassthroughStep passes input directly to output.

func (*PassthroughStep) Execute

func (s *PassthroughStep) Execute(ctx context.Context, input any) (any, error)

func (*PassthroughStep) Name

func (s *PassthroughStep) Name() string
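The page does not document these two methods beyond their signatures. As a rough sketch of what "passes input directly to output" could look like, the stand-in below returns its input unchanged. The `passthrough` type, the local interface copies, and the name it returns are assumptions, not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// Runnable and Step are local copies of the interfaces documented on this
// page so that the example compiles on its own.
type Runnable interface {
	Execute(ctx context.Context, input any) (any, error)
}

type Step interface {
	Runnable
	Name() string
}

// passthrough is a hypothetical stand-in for PassthroughStep.
type passthrough struct{}

// Execute returns the input unchanged and never fails.
func (*passthrough) Execute(_ context.Context, input any) (any, error) {
	return input, nil
}

// Name returns an assumed step name; the real value is not documented.
func (*passthrough) Name() string { return "passthrough" }

// Compile-time check that the stand-in satisfies Step.
var _ Step = (*passthrough)(nil)

// runPassthrough executes the stand-in with a background context.
func runPassthrough(input any) (any, error) {
	return (&passthrough{}).Execute(context.Background(), input)
}

func main() {
	out, err := runPassthrough("hello")
	fmt.Println(out, err)
}
```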

type Port

type Port struct {
	ID   string `json:"id"`
	Name string `json:"name"`
	Type string `json:"type"` // string, number, boolean, object, array
}

Port represents an input/output port on a node.

type Position

type Position struct {
	X float64 `json:"x"`
	Y float64 `json:"y"`
}

Position represents a node's position on the visual canvas.

type Runnable added in v1.0.0

type Runnable interface {
	Execute(ctx context.Context, input any) (any, error)
}

Runnable is the common execution interface shared by workflow executable nodes. It represents any unit of work that can be executed with input and produce output.

type Step

type Step interface {
	Runnable
	// Name returns the step name.
	Name() string
}

Step is the interface implemented by workflow steps.

type StepFunc

type StepFunc func(ctx context.Context, input any) (any, error)

StepFunc is the function type for a step.
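StepFunc has the same signature as Runnable.Execute but no Name method, so a plain function does not satisfy Step by itself. The sketch below wraps a StepFunc in a small adapter that supplies a name. The `namedStep` adapter and `upperStep` constructor are hypothetical; the package may already offer its own way to do this, which this page does not show.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local copies of the documented types so the example compiles on its own.
type Runnable interface {
	Execute(ctx context.Context, input any) (any, error)
}

type Step interface {
	Runnable
	Name() string
}

type StepFunc func(ctx context.Context, input any) (any, error)

// namedStep is a hypothetical adapter that pairs a StepFunc with a name.
type namedStep struct {
	name string
	fn   StepFunc
}

func (s namedStep) Execute(ctx context.Context, input any) (any, error) {
	return s.fn(ctx, input)
}

func (s namedStep) Name() string { return s.name }

// upperStep builds a Step that upper-cases string input and rejects
// any other input type.
func upperStep() Step {
	return namedStep{name: "upper", fn: func(_ context.Context, input any) (any, error) {
		str, ok := input.(string)
		if !ok {
			return nil, fmt.Errorf("upper: expected string, got %T", input)
		}
		return strings.ToUpper(str), nil
	}}
}

// runStep executes a step with a background context.
func runStep(s Step, input any) (any, error) {
	return s.Execute(context.Background(), input)
}

func main() {
	s := upperStep()
	out, err := runStep(s, "abc")
	fmt.Println(s.Name(), out, err)
}
```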

type Tool

type Tool interface {
	Name() string
	Execute(ctx context.Context, params map[string]any) (any, error)
}

Tool represents an executable tool within a workflow.

type ToolRegistry

type ToolRegistry interface {
	// GetTool returns a Tool by name. Returns nil, false if not found.
	GetTool(name string) (Tool, bool)
	// ExecuteTool looks up and executes a tool in one call.
	ExecuteTool(ctx context.Context, name string, params map[string]any) (any, error)
}

ToolRegistry abstracts tool lookup and execution for workflow steps. Implement this interface to bridge workflow with your tool management layer.
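A minimal in-memory implementation might look like the sketch below. The interfaces are copied locally from this page; `mapRegistry`, `addTool`, and the "not found" error text are illustrative assumptions rather than anything the package provides.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Tool is a local copy of the documented interface.
type Tool interface {
	Name() string
	Execute(ctx context.Context, params map[string]any) (any, error)
}

// mapRegistry is a hypothetical map-backed registry with the same two
// methods as ToolRegistry.
type mapRegistry struct {
	tools map[string]Tool
}

// newMapRegistry indexes the given tools by their names.
func newMapRegistry(tools ...Tool) *mapRegistry {
	r := &mapRegistry{tools: make(map[string]Tool, len(tools))}
	for _, t := range tools {
		r.tools[t.Name()] = t
	}
	return r
}

func (r *mapRegistry) GetTool(name string) (Tool, bool) {
	t, ok := r.tools[name]
	return t, ok
}

func (r *mapRegistry) ExecuteTool(ctx context.Context, name string, params map[string]any) (any, error) {
	t, ok := r.GetTool(name)
	if !ok {
		return nil, fmt.Errorf("tool %q not found", name)
	}
	return t.Execute(ctx, params)
}

// addTool is a toy tool that adds the integer params "a" and "b".
type addTool struct{}

func (addTool) Name() string { return "add" }

func (addTool) Execute(_ context.Context, params map[string]any) (any, error) {
	a, aok := params["a"].(int)
	b, bok := params["b"].(int)
	if !aok || !bok {
		return nil, errors.New("add: params a and b must be ints")
	}
	return a + b, nil
}

// callTool runs a tool through the registry with a background context.
func callTool(r *mapRegistry, name string, params map[string]any) (any, error) {
	return r.ExecuteTool(context.Background(), name, params)
}

func main() {
	r := newMapRegistry(addTool{})
	out, err := callTool(r, "add", map[string]any{"a": 2, "b": 3})
	fmt.Println(out, err)
}
```

A registry shared between concurrently executing nodes would also need synchronization if tools can be registered after construction; this sketch treats the map as read-only once built.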

type ToolStep

type ToolStep struct {
	ToolName string
	Params   map[string]any
	Registry ToolRegistry // Optional: inject to enable real tool execution
}

ToolStep executes a tool call. When Registry is set, it performs a real tool execution.

func (*ToolStep) Execute

func (s *ToolStep) Execute(ctx context.Context, input any) (any, error)

func (*ToolStep) Name

func (s *ToolStep) Name() string
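This page does not state what Execute does when Registry is nil. The sketch below assumes a step of this shape reports a missing dependency by wrapping a sentinel error like the package's ErrNotConfigured, and shows how a caller could detect that with errors.Is. All types here (`guardedToolStep`, the local `errNotConfigured`) are stand-ins, and the tool name is a placeholder.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errNotConfigured is a local stand-in for the package's ErrNotConfigured.
var errNotConfigured = errors.New("step dependency not configured")

// toolRegistry keeps only the method this sketch needs.
type toolRegistry interface {
	ExecuteTool(ctx context.Context, name string, params map[string]any) (any, error)
}

// guardedToolStep is a hypothetical step with the same fields as ToolStep.
type guardedToolStep struct {
	ToolName string
	Params   map[string]any
	Registry toolRegistry
}

// Execute delegates to the registry, or fails with a wrapped sentinel
// error when no registry has been injected (assumed behavior).
func (s *guardedToolStep) Execute(ctx context.Context, _ any) (any, error) {
	if s.Registry == nil {
		return nil, fmt.Errorf("tool step %q: %w", s.ToolName, errNotConfigured)
	}
	return s.Registry.ExecuteTool(ctx, s.ToolName, s.Params)
}

// isNotConfigured reports whether err wraps the sentinel.
func isNotConfigured(err error) bool {
	return errors.Is(err, errNotConfigured)
}

// runUnconfigured executes a step that has no registry set.
func runUnconfigured() error {
	step := &guardedToolStep{ToolName: "search"} // placeholder tool name
	_, err := step.Execute(context.Background(), nil)
	return err
}

func main() {
	err := runUnconfigured()
	fmt.Println(err)
	fmt.Println("not configured:", isNotConfigured(err))
}
```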

type Variable

type Variable struct {
	Name         string `json:"name"`
	Type         string `json:"type"`
	DefaultValue any    `json:"default_value,omitempty"`
	Description  string `json:"description,omitempty"`
}

Variable represents a workflow variable.

type VisualBuilder

type VisualBuilder struct {
	// contains filtered or unexported fields
}

VisualBuilder builds DAG workflows from visual definitions.

func NewVisualBuilder

func NewVisualBuilder() *VisualBuilder

NewVisualBuilder creates a new visual builder.

func (*VisualBuilder) Build

func (b *VisualBuilder) Build(vw *VisualWorkflow) (*DAGWorkflow, error)

Build converts a visual workflow to an executable DAG.

func (*VisualBuilder) RegisterStep

func (b *VisualBuilder) RegisterStep(name string, step Step)

RegisterStep registers a step implementation.

type VisualEdge

type VisualEdge struct {
	ID         string `json:"id"`
	Source     string `json:"source"`
	SourcePort string `json:"source_port,omitempty"`
	Target     string `json:"target"`
	TargetPort string `json:"target_port,omitempty"`
	Label      string `json:"label,omitempty"`
	Condition  string `json:"condition,omitempty"` // For conditional edges
}

VisualEdge represents a connection between nodes.

type VisualNode

type VisualNode struct {
	ID       string         `json:"id"`
	Type     VisualNodeType `json:"type"`
	Label    string         `json:"label"`
	Position Position       `json:"position"`
	Config   NodeConfig     `json:"config"`
	Inputs   []Port         `json:"inputs,omitempty"`
	Outputs  []Port         `json:"outputs,omitempty"`
}

VisualNode represents a node in the visual workflow.

type VisualNodeType

type VisualNodeType string

VisualNodeType defines visual node types.

const (
	VNodeStart     VisualNodeType = "start"
	VNodeEnd       VisualNodeType = "end"
	VNodeLLM       VisualNodeType = "llm"
	VNodeTool      VisualNodeType = "tool"
	VNodeCondition VisualNodeType = "condition"
	VNodeLoop      VisualNodeType = "loop"
	VNodeParallel  VisualNodeType = "parallel"
	VNodeHuman     VisualNodeType = "human_input"
	VNodeCode      VisualNodeType = "code"
	VNodeSubflow   VisualNodeType = "subflow"
)

type VisualWorkflow

type VisualWorkflow struct {
	ID          string         `json:"id"`
	Name        string         `json:"name"`
	Description string         `json:"description"`
	Version     string         `json:"version"`
	Nodes       []VisualNode   `json:"nodes"`
	Edges       []VisualEdge   `json:"edges"`
	Variables   []Variable     `json:"variables,omitempty"`
	Metadata    map[string]any `json:"metadata,omitempty"`
	CreatedAt   time.Time      `json:"created_at"`
	UpdatedAt   time.Time      `json:"updated_at"`
}

VisualWorkflow represents a workflow designed in the visual builder.

func Import

func Import(data []byte) (*VisualWorkflow, error)

Import imports a visual workflow from JSON.

func (*VisualWorkflow) Export

func (vw *VisualWorkflow) Export() ([]byte, error)

Export exports the visual workflow to JSON.
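Import and Export form a JSON round trip. The sketch below approximates that round trip with encoding/json and local stand-in types that copy a subset of the documented fields and tags. The helper functions and sample IDs are hypothetical, and the real functions may perform additional work that this page does not describe.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring a subset of VisualNode, VisualEdge, and
// VisualWorkflow. They are not the package types.
type visualNode struct {
	ID    string `json:"id"`
	Type  string `json:"type"`
	Label string `json:"label"`
}

type visualEdge struct {
	ID     string `json:"id"`
	Source string `json:"source"`
	Target string `json:"target"`
}

type visualWorkflow struct {
	ID    string       `json:"id"`
	Name  string       `json:"name"`
	Nodes []visualNode `json:"nodes"`
	Edges []visualEdge `json:"edges"`
}

// exportWorkflow encodes the workflow as JSON.
func exportWorkflow(vw *visualWorkflow) ([]byte, error) {
	return json.Marshal(vw)
}

// importWorkflow decodes a workflow from JSON.
func importWorkflow(data []byte) (*visualWorkflow, error) {
	var vw visualWorkflow
	if err := json.Unmarshal(data, &vw); err != nil {
		return nil, fmt.Errorf("import workflow: %w", err)
	}
	return &vw, nil
}

// sampleWorkflow returns a two-node workflow with one connecting edge.
func sampleWorkflow() *visualWorkflow {
	return &visualWorkflow{
		ID:   "wf-1",
		Name: "demo",
		Nodes: []visualNode{
			{ID: "n1", Type: "start", Label: "Start"},
			{ID: "n2", Type: "end", Label: "End"},
		},
		Edges: []visualEdge{{ID: "e1", Source: "n1", Target: "n2"}},
	}
}

// roundTrip exports and re-imports a workflow.
func roundTrip(vw *visualWorkflow) (*visualWorkflow, error) {
	data, err := exportWorkflow(vw)
	if err != nil {
		return nil, err
	}
	return importWorkflow(data)
}

func main() {
	got, err := roundTrip(sampleWorkflow())
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%s: %d nodes, %d edges\n", got.Name, len(got.Nodes), len(got.Edges))
}
```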

func (*VisualWorkflow) Validate

func (vw *VisualWorkflow) Validate() error

Validate validates the visual workflow.

type Workflow

type Workflow interface {
	Runnable
	// Name returns the workflow name.
	Name() string
	// Description returns the workflow description.
	Description() string
}

Workflow is the workflow interface. A Workflow is a predefined sequence of steps that provides predictable and consistent execution.

type WorkflowStreamEmitter added in v1.0.0

type WorkflowStreamEmitter func(WorkflowStreamEvent)

WorkflowStreamEmitter is a callback that receives workflow stream events.

type WorkflowStreamEvent added in v1.0.0

type WorkflowStreamEvent struct {
	Type     WorkflowStreamEventType `json:"type"`
	NodeID   string                  `json:"node_id,omitempty"`
	NodeName string                  `json:"node_name,omitempty"`
	Data     any                     `json:"data,omitempty"`
	Error    error                   `json:"-"`
}

WorkflowStreamEvent carries information about a workflow execution event.

type WorkflowStreamEventType added in v1.0.0

type WorkflowStreamEventType string

WorkflowStreamEventType defines the type of workflow stream event.

const (
	// WorkflowEventNodeStart is emitted before a DAG node begins execution.
	WorkflowEventNodeStart WorkflowStreamEventType = "node_start"
	// WorkflowEventNodeComplete is emitted after a DAG node finishes successfully.
	WorkflowEventNodeComplete WorkflowStreamEventType = "node_complete"
	// WorkflowEventNodeError is emitted when a DAG node fails.
	WorkflowEventNodeError WorkflowStreamEventType = "node_error"
	// WorkflowEventStepProgress is emitted for intermediate step progress.
	WorkflowEventStepProgress WorkflowStreamEventType = "step_progress"
	// WorkflowEventToken is emitted for streaming token output from LLM steps.
	WorkflowEventToken WorkflowStreamEventType = "token"
)
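Because WorkflowStreamEmitter is a plain callback, any function with a matching signature can consume events. The sketch below records events in a mutex-guarded slice, on the assumption that nodes running in parallel may call the emitter concurrently. The event types are copied locally from this page; `eventRecorder` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the documented event types so the example compiles on
// its own. Only three of the event constants are reproduced here.
type WorkflowStreamEventType string

const (
	WorkflowEventNodeStart    WorkflowStreamEventType = "node_start"
	WorkflowEventNodeComplete WorkflowStreamEventType = "node_complete"
	WorkflowEventNodeError    WorkflowStreamEventType = "node_error"
)

type WorkflowStreamEvent struct {
	Type   WorkflowStreamEventType
	NodeID string
	Data   any
	Error  error
}

type WorkflowStreamEmitter func(WorkflowStreamEvent)

// eventRecorder is a hypothetical helper that stores every event it sees.
type eventRecorder struct {
	mu     sync.Mutex
	events []WorkflowStreamEvent
}

// Emitter returns a callback that appends events under the mutex.
func (r *eventRecorder) Emitter() WorkflowStreamEmitter {
	return func(ev WorkflowStreamEvent) {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.events = append(r.events, ev)
	}
}

// Types returns the recorded event types in arrival order.
func (r *eventRecorder) Types() []WorkflowStreamEventType {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]WorkflowStreamEventType, 0, len(r.events))
	for _, ev := range r.events {
		out = append(out, ev.Type)
	}
	return out
}

func main() {
	rec := &eventRecorder{}
	emit := rec.Emitter()
	emit(WorkflowStreamEvent{Type: WorkflowEventNodeStart, NodeID: "fetch"})
	emit(WorkflowStreamEvent{Type: WorkflowEventNodeComplete, NodeID: "fetch"})
	fmt.Println(rec.Types())
}
```

With the real package, a callback like this would be attached to a context through WithWorkflowStreamEmitter before the workflow is executed.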
