Documentation
¶
Index ¶
- Variables
- func ValidateDAGDefinition(def *DAGDefinition) error
- func WithWorkflowStreamEmitter(ctx context.Context, emitter WorkflowStreamEmitter) context.Context
- type CheckpointDiff
- type CheckpointManager
- type CheckpointStore
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerEvent
- type CircuitBreakerEventHandler
- type CircuitBreakerRegistry
- type CircuitState
- type CodeStep
- type ConditionFunc
- type DAGBuilder
- func (b *DAGBuilder) AddEdge(from, to string) *DAGBuilder
- func (b *DAGBuilder) AddNode(id string, nodeType NodeType) *NodeBuilder
- func (b *DAGBuilder) Build() (*DAGWorkflow, error)
- func (b *DAGBuilder) SetEntry(nodeID string) *DAGBuilder
- func (b *DAGBuilder) WithDescription(desc string) *DAGBuilder
- func (b *DAGBuilder) WithLogger(logger *zap.Logger) *DAGBuilder
- type DAGDefinition
- func (d *DAGDefinition) MarshalJSON() ([]byte, error)
- func (d *DAGDefinition) MarshalYAML() (any, error)
- func (d *DAGDefinition) SaveToJSONFile(filename string) error
- func (d *DAGDefinition) SaveToYAMLFile(filename string) error
- func (d *DAGDefinition) ToDAGWorkflow() (*DAGWorkflow, error)
- func (d *DAGDefinition) ToJSON() (string, error)
- func (d *DAGDefinition) ToYAML() (string, error)
- func (d *DAGDefinition) UnmarshalJSON(data []byte) error
- func (d *DAGDefinition) UnmarshalYAML(node *yaml.Node) error
- type DAGExecutor
- func (e *DAGExecutor) Execute(ctx context.Context, graph *DAGGraph, input any) (any, error)
- func (e *DAGExecutor) GetCircuitBreakerStates() map[string]CircuitState
- func (e *DAGExecutor) GetExecutionID() string
- func (e *DAGExecutor) GetHistory() *ExecutionHistory
- func (e *DAGExecutor) GetHistoryStore() *ExecutionHistoryStore
- func (e *DAGExecutor) GetNodeResult(nodeID string) (any, bool)
- func (e *DAGExecutor) SetCircuitBreakerConfig(config CircuitBreakerConfig, handler CircuitBreakerEventHandler)
- func (e *DAGExecutor) SetHistoryStore(store *ExecutionHistoryStore)
- type DAGGraph
- func (g *DAGGraph) AddEdge(fromID, toID string)
- func (g *DAGGraph) AddNode(node *DAGNode)
- func (g *DAGGraph) Edges() map[string][]string
- func (g *DAGGraph) GetEdges(nodeID string) []string
- func (g *DAGGraph) GetEntry() string
- func (g *DAGGraph) GetNode(nodeID string) (*DAGNode, bool)
- func (g *DAGGraph) Nodes() map[string]*DAGNode
- func (g *DAGGraph) SetEntry(nodeID string)
- type DAGNode
- type DAGWorkflow
- func (w *DAGWorkflow) Description() string
- func (w *DAGWorkflow) Execute(ctx context.Context, input any) (any, error)
- func (w *DAGWorkflow) GetMetadata(key string) (any, bool)
- func (w *DAGWorkflow) Graph() *DAGGraph
- func (w *DAGWorkflow) Name() string
- func (w *DAGWorkflow) SetExecutor(executor *DAGExecutor)
- func (w *DAGWorkflow) SetMetadata(key string, value any)
- func (w *DAGWorkflow) ToDAGDefinition() *DAGDefinition
- type Duration
- type EnhancedCheckpoint
- type EnhancedCheckpointManager
- func (m *EnhancedCheckpointManager) Compare(ctx context.Context, threadID string, v1, v2 int) (*CheckpointDiff, error)
- func (m *EnhancedCheckpointManager) CreateCheckpoint(ctx context.Context, executor *DAGExecutor, graph *DAGGraph, threadID string, ...) (*EnhancedCheckpoint, error)
- func (m *EnhancedCheckpointManager) GetHistory(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
- func (m *EnhancedCheckpointManager) ResumeFromCheckpoint(ctx context.Context, checkpointID string, graph *DAGGraph) (*DAGExecutor, error)
- func (m *EnhancedCheckpointManager) Rollback(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
- type ErrorConfig
- type ErrorDefinition
- type ErrorStrategy
- type ExecutionContext
- func (ec *ExecutionContext) GetNodeResult(nodeID string) (any, bool)
- func (ec *ExecutionContext) GetVariable(key string) (any, bool)
- func (ec *ExecutionContext) SetCurrentNode(nodeID string)
- func (ec *ExecutionContext) SetNodeResult(nodeID string, result any)
- func (ec *ExecutionContext) SetVariable(key string, value any)
- type ExecutionHistory
- func (h *ExecutionHistory) Complete(err error)
- func (h *ExecutionHistory) GetNodeByID(nodeID string) *NodeExecution
- func (h *ExecutionHistory) GetNodes() []*NodeExecution
- func (h *ExecutionHistory) RecordNodeEnd(node *NodeExecution, output any, err error)
- func (h *ExecutionHistory) RecordNodeStart(nodeID string, nodeType NodeType, input any) *NodeExecution
- type ExecutionHistoryStore
- func (s *ExecutionHistoryStore) Get(executionID string) (*ExecutionHistory, bool)
- func (s *ExecutionHistoryStore) ListByStatus(status ExecutionStatus) []*ExecutionHistory
- func (s *ExecutionHistoryStore) ListByTimeRange(start, end time.Time) []*ExecutionHistory
- func (s *ExecutionHistoryStore) ListByWorkflow(workflowID string) []*ExecutionHistory
- func (s *ExecutionHistoryStore) Save(history *ExecutionHistory)
- type ExecutionStatus
- type Facade
- type FuncStep
- type GraphSnapshot
- type HumanInputHandler
- type HumanInputStep
- type InMemoryCheckpointStore
- func (s *InMemoryCheckpointStore) Delete(ctx context.Context, id string) error
- func (s *InMemoryCheckpointStore) ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) Load(ctx context.Context, id string) (*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) Save(ctx context.Context, cp *EnhancedCheckpoint) error
- type IteratorFunc
- type LLMStep
- type LoopConfig
- type LoopDefinition
- type LoopType
- type NodeBuilder
- func (nb *NodeBuilder) Done() *DAGBuilder
- func (nb *NodeBuilder) WithCondition(cond ConditionFunc) *NodeBuilder
- func (nb *NodeBuilder) WithErrorConfig(config ErrorConfig) *NodeBuilder
- func (nb *NodeBuilder) WithLoop(config LoopConfig) *NodeBuilder
- func (nb *NodeBuilder) WithMetadata(key string, value any) *NodeBuilder
- func (nb *NodeBuilder) WithOnFalse(nodeIDs ...string) *NodeBuilder
- func (nb *NodeBuilder) WithOnTrue(nodeIDs ...string) *NodeBuilder
- func (nb *NodeBuilder) WithStep(step Step) *NodeBuilder
- func (nb *NodeBuilder) WithSubGraph(subGraph *DAGGraph) *NodeBuilder
- type NodeConfig
- type NodeDefinition
- type NodeExecution
- type NodeSnapshot
- type NodeType
- type PassthroughStep
- type Port
- type Position
- type Runnable
- type Step
- type StepFunc
- type Tool
- type ToolRegistry
- type ToolStep
- type Variable
- type VisualBuilder
- type VisualEdge
- type VisualNode
- type VisualNodeType
- type VisualWorkflow
- type Workflow
- type WorkflowStreamEmitter
- type WorkflowStreamEvent
- type WorkflowStreamEventType
Constants ¶
This section is empty.
Variables ¶
var ErrNotConfigured = errors.New("step dependency not configured")
ErrNotConfigured is returned when a step's required dependency (Gateway, Registry, Handler) has not been injected. Callers can check for this with errors.Is(err, ErrNotConfigured).
Functions ¶
func ValidateDAGDefinition ¶
func ValidateDAGDefinition(def *DAGDefinition) error
ValidateDAGDefinition validates a loaded DAGDefinition
func WithWorkflowStreamEmitter ¶ added in v1.0.0
func WithWorkflowStreamEmitter(ctx context.Context, emitter WorkflowStreamEmitter) context.Context
WithWorkflowStreamEmitter stores a WorkflowStreamEmitter in the context.
Types ¶
type CheckpointDiff ¶
type CheckpointDiff struct {
Version1 int `json:"version1"`
Version2 int `json:"version2"`
AddedNodes []string `json:"added_nodes"`
RemovedNodes []string `json:"removed_nodes"`
ChangedNodes []string `json:"changed_nodes"`
TimeDifference time.Duration `json:"time_difference"`
}
CheckpointDiff represents differences between checkpoints.
type CheckpointManager ¶
type CheckpointManager interface {
SaveCheckpoint(ctx context.Context, checkpoint *EnhancedCheckpoint) error
}
CheckpointManager interface for checkpoint integration
type CheckpointStore ¶
type CheckpointStore interface {
Save(ctx context.Context, checkpoint *EnhancedCheckpoint) error
Load(ctx context.Context, checkpointID string) (*EnhancedCheckpoint, error)
LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
Delete(ctx context.Context, checkpointID string) error
}
CheckpointStore defines storage interface for enhanced checkpoints (Workflow layer).
Note: Two CheckpointStore interfaces exist in the project, operating on different types:
- agent.CheckpointStore — operates on *agent.Checkpoint (agent state, List/DeleteThread/Rollback)
- workflow.CheckpointStore (this) — operates on *workflow.EnhancedCheckpoint (DAG node results, time-travel)
They cannot be unified because the checkpoint structs have different fields (agent state vs DAG execution state).
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker 熔断器实现
func NewCircuitBreaker ¶
func NewCircuitBreaker( nodeID string, config CircuitBreakerConfig, eventHandler CircuitBreakerEventHandler, logger *zap.Logger, ) *CircuitBreaker
NewCircuitBreaker 创建熔断器
func (*CircuitBreaker) AllowRequest ¶
func (cb *CircuitBreaker) AllowRequest() (bool, error)
AllowRequest 检查是否允许请求通过
func (*CircuitBreaker) GetFailures ¶
func (cb *CircuitBreaker) GetFailures() int
GetFailures 获取当前失败次数
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
// FailureThreshold 连续失败次数阈值,达到后触发熔断
FailureThreshold int `json:"failure_threshold"`
// RecoveryTimeout 熔断后等待恢复的时间
RecoveryTimeout Duration `json:"recovery_timeout"`
// HalfOpenMaxProbes 半开状态允许的探测请求数
HalfOpenMaxProbes int `json:"half_open_max_probes"`
// SuccessThresholdInHalfOpen 半开状态下连续成功多少次后恢复
SuccessThresholdInHalfOpen int `json:"success_threshold_in_half_open"`
}
CircuitBreakerConfig 熔断器配置
func DefaultCircuitBreakerConfig ¶
func DefaultCircuitBreakerConfig() CircuitBreakerConfig
DefaultCircuitBreakerConfig 默认熔断器配置
type CircuitBreakerEvent ¶
type CircuitBreakerEvent struct {
NodeID string `json:"node_id"`
OldState CircuitState `json:"old_state"`
NewState CircuitState `json:"new_state"`
Timestamp time.Time `json:"timestamp"`
Reason string `json:"reason"`
Failures int `json:"failures"`
}
CircuitBreakerEvent 熔断器状态变更事件
type CircuitBreakerEventHandler ¶
type CircuitBreakerEventHandler interface {
OnStateChange(event CircuitBreakerEvent)
}
CircuitBreakerEventHandler 事件处理器接口
type CircuitBreakerRegistry ¶
type CircuitBreakerRegistry struct {
// contains filtered or unexported fields
}
CircuitBreakerRegistry 熔断器注册表,管理所有节点的熔断器
func NewCircuitBreakerRegistry ¶
func NewCircuitBreakerRegistry( config CircuitBreakerConfig, eventHandler CircuitBreakerEventHandler, logger *zap.Logger, ) *CircuitBreakerRegistry
NewCircuitBreakerRegistry 创建熔断器注册表
func (*CircuitBreakerRegistry) GetAllStates ¶
func (r *CircuitBreakerRegistry) GetAllStates() map[string]CircuitState
GetAllStates 获取所有熔断器状态
func (*CircuitBreakerRegistry) GetOrCreate ¶
func (r *CircuitBreakerRegistry) GetOrCreate(nodeID string) *CircuitBreaker
GetOrCreate 获取或创建节点的熔断器
func (*CircuitBreakerRegistry) ResetAll ¶
func (r *CircuitBreakerRegistry) ResetAll()
ResetAll 重置所有熔断器
type CircuitState ¶
type CircuitState int
CircuitState represents the state of a circuit breaker. This is an independent definition equivalent to circuitbreaker.State in llm/circuitbreaker/. The workflow package maintains its own copy to avoid depending on the llm package (dependency direction: llm <- workflow, not workflow -> llm).
const ( // CircuitClosed 正常状态,允许请求通过 CircuitClosed CircuitState = iota // CircuitOpen 熔断状态,拒绝所有请求 CircuitOpen // CircuitHalfOpen 半开状态,允许探测请求 CircuitHalfOpen )
func (CircuitState) String ¶
func (s CircuitState) String() string
type ConditionFunc ¶
ConditionFunc evaluates a condition and returns true or false
type DAGBuilder ¶
type DAGBuilder struct {
// contains filtered or unexported fields
}
DAGBuilder provides a fluent API for constructing DAG workflows
func NewDAGBuilder ¶
func NewDAGBuilder(name string) *DAGBuilder
NewDAGBuilder creates a new DAG builder with the given name
func (*DAGBuilder) AddEdge ¶
func (b *DAGBuilder) AddEdge(from, to string) *DAGBuilder
AddEdge adds a directed edge from one node to another
func (*DAGBuilder) AddNode ¶
func (b *DAGBuilder) AddNode(id string, nodeType NodeType) *NodeBuilder
AddNode adds a node to the graph and returns a NodeBuilder for configuration
func (*DAGBuilder) Build ¶
func (b *DAGBuilder) Build() (*DAGWorkflow, error)
Build validates the DAG and creates a DAGWorkflow
func (*DAGBuilder) SetEntry ¶
func (b *DAGBuilder) SetEntry(nodeID string) *DAGBuilder
SetEntry sets the entry node for the workflow
func (*DAGBuilder) WithDescription ¶
func (b *DAGBuilder) WithDescription(desc string) *DAGBuilder
WithDescription sets the workflow description
func (*DAGBuilder) WithLogger ¶
func (b *DAGBuilder) WithLogger(logger *zap.Logger) *DAGBuilder
WithLogger sets a custom logger
type DAGDefinition ¶
type DAGDefinition struct {
// Name is the workflow name
Name string `json:"name" yaml:"name"`
// Description describes the workflow
Description string `json:"description" yaml:"description"`
// Entry is the ID of the entry node
Entry string `json:"entry" yaml:"entry"`
// Nodes contains all node definitions
Nodes []NodeDefinition `json:"nodes" yaml:"nodes"`
// Metadata stores additional workflow information
Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
DAGDefinition represents a serializable workflow definition
func FromJSON ¶
func FromJSON(jsonStr string) (*DAGDefinition, error)
FromJSON creates a DAGDefinition from JSON string
func FromYAML ¶
func FromYAML(yamlStr string) (*DAGDefinition, error)
FromYAML creates a DAGDefinition from YAML string
func LoadFromJSONFile ¶
func LoadFromJSONFile(filename string) (*DAGDefinition, error)
LoadFromJSONFile loads a DAGDefinition from a JSON file
func LoadFromYAMLFile ¶
func LoadFromYAMLFile(filename string) (*DAGDefinition, error)
LoadFromYAMLFile loads a DAGDefinition from a YAML file
func (*DAGDefinition) MarshalJSON ¶
func (d *DAGDefinition) MarshalJSON() ([]byte, error)
MarshalJSON serializes a DAGDefinition to JSON
func (*DAGDefinition) MarshalYAML ¶
func (d *DAGDefinition) MarshalYAML() (any, error)
MarshalYAML serializes a DAGDefinition to YAML
func (*DAGDefinition) SaveToJSONFile ¶
func (d *DAGDefinition) SaveToJSONFile(filename string) error
SaveToJSONFile saves a DAGDefinition to a JSON file
func (*DAGDefinition) SaveToYAMLFile ¶
func (d *DAGDefinition) SaveToYAMLFile(filename string) error
SaveToYAMLFile saves a DAGDefinition to a YAML file
func (*DAGDefinition) ToDAGWorkflow ¶ added in v1.4.6
func (d *DAGDefinition) ToDAGWorkflow() (*DAGWorkflow, error)
ToDAGWorkflow converts a validated DAGDefinition into an executable DAGWorkflow. Runtime-only handlers (step logic/condition expression parsing) are intentionally represented with safe defaults so structure can execute deterministically.
func (*DAGDefinition) ToJSON ¶
func (d *DAGDefinition) ToJSON() (string, error)
ToJSON converts a DAGDefinition to JSON string
func (*DAGDefinition) ToYAML ¶
func (d *DAGDefinition) ToYAML() (string, error)
ToYAML converts a DAGDefinition to YAML string
func (*DAGDefinition) UnmarshalJSON ¶
func (d *DAGDefinition) UnmarshalJSON(data []byte) error
UnmarshalJSON deserializes a DAGDefinition from JSON
func (*DAGDefinition) UnmarshalYAML ¶
func (d *DAGDefinition) UnmarshalYAML(node *yaml.Node) error
UnmarshalYAML deserializes a DAGDefinition from YAML
type DAGExecutor ¶
type DAGExecutor struct {
// contains filtered or unexported fields
}
DAGExecutor executes DAG workflows with dependency resolution
func NewDAGExecutor ¶
func NewDAGExecutor(checkpointMgr CheckpointManager, logger *zap.Logger) *DAGExecutor
NewDAGExecutor creates a new DAG executor
func (*DAGExecutor) Execute ¶
Execute runs the DAG workflow with dependency resolution. Bug fix (P0): executeMu ensures that concurrent Execute() calls on the same executor are serialized, preventing data races on shared execution state.
func (*DAGExecutor) GetCircuitBreakerStates ¶
func (e *DAGExecutor) GetCircuitBreakerStates() map[string]CircuitState
GetCircuitBreakerStates 获取所有熔断器状态
func (*DAGExecutor) GetExecutionID ¶
func (e *DAGExecutor) GetExecutionID() string
GetExecutionID returns the current execution ID
func (*DAGExecutor) GetHistory ¶
func (e *DAGExecutor) GetHistory() *ExecutionHistory
GetHistory returns the execution history for the current execution
func (*DAGExecutor) GetHistoryStore ¶
func (e *DAGExecutor) GetHistoryStore() *ExecutionHistoryStore
GetHistoryStore returns the history store
func (*DAGExecutor) GetNodeResult ¶
func (e *DAGExecutor) GetNodeResult(nodeID string) (any, bool)
GetNodeResult retrieves the result of a completed node
func (*DAGExecutor) SetCircuitBreakerConfig ¶
func (e *DAGExecutor) SetCircuitBreakerConfig(config CircuitBreakerConfig, handler CircuitBreakerEventHandler)
SetCircuitBreakerConfig 设置熔断器配置
func (*DAGExecutor) SetHistoryStore ¶
func (e *DAGExecutor) SetHistoryStore(store *ExecutionHistoryStore)
SetHistoryStore sets a custom history store
type DAGGraph ¶
type DAGGraph struct {
// contains filtered or unexported fields
}
DAGGraph represents the workflow structure as a directed acyclic graph
type DAGNode ¶
type DAGNode struct {
// ID is the unique identifier for this node
ID string
// Type specifies the node type
Type NodeType
// Step is the step to execute (for action nodes)
Step Step
// Condition evaluates branching logic (for conditional nodes)
Condition ConditionFunc
// LoopConfig defines loop behavior (for loop nodes)
LoopConfig *LoopConfig
// SubGraph is a nested workflow (for subgraph nodes)
SubGraph *DAGGraph
// ErrorConfig defines error handling behavior
ErrorConfig *ErrorConfig
// Metadata stores additional node information
Metadata map[string]any
}
DAGNode represents a single node in the workflow graph
type DAGWorkflow ¶
type DAGWorkflow struct {
// contains filtered or unexported fields
}
DAGWorkflow represents a DAG-based workflow
func NewDAGWorkflow ¶
func NewDAGWorkflow(name, description string, graph *DAGGraph) *DAGWorkflow
NewDAGWorkflow creates a new DAG workflow
func (*DAGWorkflow) Description ¶
func (w *DAGWorkflow) Description() string
Description returns the workflow description
func (*DAGWorkflow) GetMetadata ¶
func (w *DAGWorkflow) GetMetadata(key string) (any, bool)
GetMetadata retrieves a metadata value
func (*DAGWorkflow) Graph ¶
func (w *DAGWorkflow) Graph() *DAGGraph
Graph returns the underlying DAG graph
func (*DAGWorkflow) SetExecutor ¶
func (w *DAGWorkflow) SetExecutor(executor *DAGExecutor)
SetExecutor sets a custom executor for the workflow
func (*DAGWorkflow) SetMetadata ¶
func (w *DAGWorkflow) SetMetadata(key string, value any)
SetMetadata sets a metadata value
func (*DAGWorkflow) ToDAGDefinition ¶
func (w *DAGWorkflow) ToDAGDefinition() *DAGDefinition
ToDAGDefinition converts a DAGWorkflow to a DAGDefinition for serialization Note: This only captures the structure, not the runtime functions (conditions, iterators, steps)
type Duration ¶ added in v1.0.0
Duration wraps time.Duration with human-readable JSON serialization. JSON output is a string like "30s", "5m", "1h30m" instead of nanoseconds.
func (Duration) MarshalJSON ¶ added in v1.0.0
MarshalJSON serializes Duration as a human-readable string.
func (*Duration) UnmarshalJSON ¶ added in v1.0.0
UnmarshalJSON deserializes Duration from a string (e.g. "30s").
type EnhancedCheckpoint ¶
type EnhancedCheckpoint struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
ThreadID string `json:"thread_id"`
Version int `json:"version"`
NodeID string `json:"node_id"`
NodeResults map[string]any `json:"node_results"`
Variables map[string]any `json:"variables"`
PendingNodes []string `json:"pending_nodes"`
CompletedNodes []string `json:"completed_nodes"`
Input any `json:"input"`
CreatedAt time.Time `json:"created_at"`
ParentID string `json:"parent_id,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Snapshot *GraphSnapshot `json:"snapshot,omitempty"`
}
EnhancedCheckpoint represents a workflow checkpoint with full state.
type EnhancedCheckpointManager ¶
type EnhancedCheckpointManager struct {
// contains filtered or unexported fields
}
EnhancedCheckpointManager manages workflow checkpoints with time-travel.
func NewEnhancedCheckpointManager ¶
func NewEnhancedCheckpointManager(store CheckpointStore, logger *zap.Logger) *EnhancedCheckpointManager
NewEnhancedCheckpointManager creates a new checkpoint manager.
func (*EnhancedCheckpointManager) Compare ¶
func (m *EnhancedCheckpointManager) Compare(ctx context.Context, threadID string, v1, v2 int) (*CheckpointDiff, error)
Compare compares two checkpoint versions.
func (*EnhancedCheckpointManager) CreateCheckpoint ¶
func (m *EnhancedCheckpointManager) CreateCheckpoint(ctx context.Context, executor *DAGExecutor, graph *DAGGraph, threadID string, input any) (*EnhancedCheckpoint, error)
CreateCheckpoint creates a checkpoint from current execution state.
func (*EnhancedCheckpointManager) GetHistory ¶
func (m *EnhancedCheckpointManager) GetHistory(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
GetHistory returns checkpoint history for time-travel debugging.
func (*EnhancedCheckpointManager) ResumeFromCheckpoint ¶
func (m *EnhancedCheckpointManager) ResumeFromCheckpoint(ctx context.Context, checkpointID string, graph *DAGGraph) (*DAGExecutor, error)
ResumeFromCheckpoint resumes workflow execution from a checkpoint.
func (*EnhancedCheckpointManager) Rollback ¶
func (m *EnhancedCheckpointManager) Rollback(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
Rollback rolls back to a specific version.
type ErrorConfig ¶
type ErrorConfig struct {
// Strategy specifies how to handle errors
Strategy ErrorStrategy
// MaxRetries is the maximum number of retry attempts (for retry strategy)
MaxRetries int
// RetryDelayMs is the delay between retries in milliseconds
RetryDelayMs int
// FallbackValue is the value to use when skipping a failed node
FallbackValue any
}
ErrorConfig defines error handling behavior for a node
type ErrorDefinition ¶ added in v1.0.0
type ErrorDefinition struct {
// Strategy specifies how to handle errors (fail_fast, skip, retry)
Strategy string `json:"strategy" yaml:"strategy"`
// MaxRetries is the maximum number of retry attempts (for retry strategy)
MaxRetries int `json:"max_retries,omitempty" yaml:"max_retries,omitempty"`
// RetryDelayMs is the delay between retries in milliseconds
RetryDelayMs int `json:"retry_delay_ms,omitempty" yaml:"retry_delay_ms,omitempty"`
// FallbackValue is the value to use when skipping a failed node
FallbackValue any `json:"fallback_value,omitempty" yaml:"fallback_value,omitempty"`
}
ErrorDefinition represents a serializable error handling configuration
type ErrorStrategy ¶
type ErrorStrategy string
ErrorStrategy defines how errors should be handled
const ( // ErrorStrategyFailFast stops execution immediately on error ErrorStrategyFailFast ErrorStrategy = "fail_fast" // ErrorStrategySkip skips the failed node and continues ErrorStrategySkip ErrorStrategy = "skip" // ErrorStrategyRetry retries the failed node ErrorStrategyRetry ErrorStrategy = "retry" )
type ExecutionContext ¶
type ExecutionContext struct {
// WorkflowID identifies the workflow being executed
WorkflowID string `json:"workflow_id,omitempty"`
// CurrentNode is the ID of the currently executing node
CurrentNode string `json:"current_node,omitempty"`
// NodeResults stores the results of completed nodes
NodeResults map[string]any `json:"node_results,omitempty"`
// Variables stores workflow variables
Variables map[string]any `json:"variables,omitempty"`
// StartTime is when the workflow execution started
StartTime time.Time `json:"start_time,omitempty"`
// LastUpdateTime is when the context was last updated
LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}
ExecutionContext captures the execution state for checkpointing
func NewExecutionContext ¶
func NewExecutionContext(workflowID string) *ExecutionContext
NewExecutionContext creates a new execution context
func (*ExecutionContext) GetNodeResult ¶
func (ec *ExecutionContext) GetNodeResult(nodeID string) (any, bool)
GetNodeResult retrieves the result of a completed node
func (*ExecutionContext) GetVariable ¶
func (ec *ExecutionContext) GetVariable(key string) (any, bool)
GetVariable retrieves a workflow variable
func (*ExecutionContext) SetCurrentNode ¶
func (ec *ExecutionContext) SetCurrentNode(nodeID string)
SetCurrentNode updates the currently executing node
func (*ExecutionContext) SetNodeResult ¶
func (ec *ExecutionContext) SetNodeResult(nodeID string, result any)
SetNodeResult stores the result of a completed node
func (*ExecutionContext) SetVariable ¶
func (ec *ExecutionContext) SetVariable(key string, value any)
SetVariable sets a workflow variable
type ExecutionHistory ¶
type ExecutionHistory struct {
ExecutionID string `json:"execution_id"`
WorkflowID string `json:"workflow_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration time.Duration `json:"duration"`
Status ExecutionStatus `json:"status"`
Nodes []*NodeExecution `json:"nodes"`
Error string `json:"error,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
// contains filtered or unexported fields
}
ExecutionHistory records the complete execution path of a workflow
func NewExecutionHistory ¶
func NewExecutionHistory(executionID, workflowID string) *ExecutionHistory
NewExecutionHistory creates a new execution history
func (*ExecutionHistory) Complete ¶
func (h *ExecutionHistory) Complete(err error)
Complete marks the execution as completed
func (*ExecutionHistory) GetNodeByID ¶
func (h *ExecutionHistory) GetNodeByID(nodeID string) *NodeExecution
GetNodeByID returns the execution record for a specific node
func (*ExecutionHistory) GetNodes ¶
func (h *ExecutionHistory) GetNodes() []*NodeExecution
GetNodes returns a copy of the node executions
func (*ExecutionHistory) RecordNodeEnd ¶
func (h *ExecutionHistory) RecordNodeEnd(node *NodeExecution, output any, err error)
RecordNodeEnd records the end of a node execution
func (*ExecutionHistory) RecordNodeStart ¶
func (h *ExecutionHistory) RecordNodeStart(nodeID string, nodeType NodeType, input any) *NodeExecution
RecordNodeStart records the start of a node execution
type ExecutionHistoryStore ¶
type ExecutionHistoryStore struct {
// contains filtered or unexported fields
}
ExecutionHistoryStore stores and queries execution histories
func NewExecutionHistoryStore ¶
func NewExecutionHistoryStore() *ExecutionHistoryStore
NewExecutionHistoryStore creates a new execution history store
func (*ExecutionHistoryStore) Get ¶
func (s *ExecutionHistoryStore) Get(executionID string) (*ExecutionHistory, bool)
Get retrieves an execution history by ID
func (*ExecutionHistoryStore) ListByStatus ¶
func (s *ExecutionHistoryStore) ListByStatus(status ExecutionStatus) []*ExecutionHistory
ListByStatus returns executions with a specific status
func (*ExecutionHistoryStore) ListByTimeRange ¶
func (s *ExecutionHistoryStore) ListByTimeRange(start, end time.Time) []*ExecutionHistory
ListByTimeRange returns executions within a time range
func (*ExecutionHistoryStore) ListByWorkflow ¶
func (s *ExecutionHistoryStore) ListByWorkflow(workflowID string) []*ExecutionHistory
ListByWorkflow returns all executions for a workflow
func (*ExecutionHistoryStore) Save ¶
func (s *ExecutionHistoryStore) Save(history *ExecutionHistory)
Save saves an execution history
type ExecutionStatus ¶
type ExecutionStatus = types.ExecutionStatus
ExecutionStatus represents the status of an execution. Uses the unified types.ExecutionStatus to avoid cross-layer coupling with agent/persistence.
const ( // ExecutionStatusRunning indicates the execution is in progress ExecutionStatusRunning ExecutionStatus = types.ExecutionStatusRunning // ExecutionStatusCompleted indicates the execution completed successfully ExecutionStatusCompleted ExecutionStatus = types.ExecutionStatusCompleted // ExecutionStatusFailed indicates the execution failed ExecutionStatusFailed ExecutionStatus = types.ExecutionStatusFailed )
type Facade ¶ added in v1.4.6
type Facade struct {
// contains filtered or unexported fields
}
Facade 是 workflow 对外统一执行入口。 API/handler 层应通过该入口执行 workflow.Workflow,而不是直接操作具体 executor。
func NewFacade ¶ added in v1.4.6
func NewFacade(executor *DAGExecutor) *Facade
NewFacade 创建 workflow 执行门面。
func (*Facade) ExecuteDAG ¶ added in v1.4.6
ExecuteDAG 执行 DAG workflow,作为 workflow 对外统一执行入口。
type GraphSnapshot ¶
type GraphSnapshot struct {
Nodes map[string]NodeSnapshot `json:"nodes"`
Edges map[string][]string `json:"edges"`
EntryNode string `json:"entry_node"`
}
GraphSnapshot captures the complete graph state.
type HumanInputHandler ¶
type HumanInputHandler interface {
// RequestInput sends a prompt to a human and waits for a response.
// inputType hints at the expected response format (e.g. "text", "choice").
// options provides selectable choices when inputType is "choice".
RequestInput(ctx context.Context, prompt string, inputType string, options []string) (any, error)
}
HumanInputHandler abstracts human-in-the-loop interaction for workflow steps. Implement this interface to bridge workflow with your HITL management layer.
type HumanInputStep ¶
type HumanInputStep struct {
Prompt string
Type string
Options []string
Timeout int
Handler HumanInputHandler // Optional: inject to enable real HITL
}
HumanInputStep waits for human input. When Handler is set, it sends a request to the HITL handler and waits for a response.
func (*HumanInputStep) Name ¶
func (s *HumanInputStep) Name() string
type InMemoryCheckpointStore ¶
type InMemoryCheckpointStore struct {
// contains filtered or unexported fields
}
InMemoryCheckpointStore provides in-memory storage.
func NewInMemoryCheckpointStore ¶
func NewInMemoryCheckpointStore() *InMemoryCheckpointStore
NewInMemoryCheckpointStore creates a new in-memory store.
func (*InMemoryCheckpointStore) Delete ¶
func (s *InMemoryCheckpointStore) Delete(ctx context.Context, id string) error
func (*InMemoryCheckpointStore) ListVersions ¶
func (s *InMemoryCheckpointStore) ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) Load ¶
func (s *InMemoryCheckpointStore) Load(ctx context.Context, id string) (*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) LoadLatest ¶
func (s *InMemoryCheckpointStore) LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) LoadVersion ¶
func (s *InMemoryCheckpointStore) LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) Save ¶
func (s *InMemoryCheckpointStore) Save(ctx context.Context, cp *EnhancedCheckpoint) error
type IteratorFunc ¶
IteratorFunc generates a collection of items for iteration
type LLMStep ¶
type LLMStep struct {
Model string
Prompt string
Temperature float64
MaxTokens int
Gateway core.GatewayLike // Optional: inject to enable real LLM calls
}
LLMStep executes an LLM call. When Gateway is set, it performs a real LLM invoke request.
type LoopConfig ¶
type LoopConfig struct {
// Type specifies the loop type (while, for, foreach)
Type LoopType
// MaxIterations limits the maximum number of iterations (0 = unlimited)
MaxIterations int
// Condition evaluates whether to continue looping (for while loops)
Condition ConditionFunc
// Iterator generates items to iterate over (for foreach loops)
Iterator IteratorFunc
}
LoopConfig defines loop behavior
type LoopDefinition ¶
type LoopDefinition struct {
// Type is the loop type (while, for, foreach)
Type string `json:"type" yaml:"type"`
// MaxIterations limits the maximum number of iterations
MaxIterations int `json:"max_iterations" yaml:"max_iterations"`
// Condition is the condition name (for while loops)
Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
}
LoopDefinition represents a serializable loop configuration
type NodeBuilder ¶
type NodeBuilder struct {
// contains filtered or unexported fields
}
NodeBuilder provides a fluent API for configuring individual nodes
func (*NodeBuilder) Done ¶
func (nb *NodeBuilder) Done() *DAGBuilder
Done completes node configuration and returns to the DAGBuilder
func (*NodeBuilder) WithCondition ¶
func (nb *NodeBuilder) WithCondition(cond ConditionFunc) *NodeBuilder
WithCondition sets the condition function for a conditional node
func (*NodeBuilder) WithErrorConfig ¶
func (nb *NodeBuilder) WithErrorConfig(config ErrorConfig) *NodeBuilder
WithErrorConfig sets the error handling configuration for a node
func (*NodeBuilder) WithLoop ¶
func (nb *NodeBuilder) WithLoop(config LoopConfig) *NodeBuilder
WithLoop sets the loop configuration for a loop node
func (*NodeBuilder) WithMetadata ¶
func (nb *NodeBuilder) WithMetadata(key string, value any) *NodeBuilder
WithMetadata sets a metadata value
func (*NodeBuilder) WithOnFalse ¶
func (nb *NodeBuilder) WithOnFalse(nodeIDs ...string) *NodeBuilder
WithOnFalse sets the nodes to execute when condition is false
func (*NodeBuilder) WithOnTrue ¶
func (nb *NodeBuilder) WithOnTrue(nodeIDs ...string) *NodeBuilder
WithOnTrue sets the nodes to execute when condition is true
func (*NodeBuilder) WithStep ¶
func (nb *NodeBuilder) WithStep(step Step) *NodeBuilder
WithStep sets the step for an action node
func (*NodeBuilder) WithSubGraph ¶
func (nb *NodeBuilder) WithSubGraph(subGraph *DAGGraph) *NodeBuilder
WithSubGraph sets the subgraph for a subgraph node
type NodeConfig ¶
type NodeConfig struct {
// LLM node config
Model string `json:"model,omitempty"`
Prompt string `json:"prompt,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
// Tool node config
ToolName string `json:"tool_name,omitempty"`
ToolParams map[string]any `json:"tool_params,omitempty"`
// Condition node config
Condition string `json:"condition,omitempty"`
Expression string `json:"expression,omitempty"`
// Loop node config
LoopType string `json:"loop_type,omitempty"`
MaxIterations int `json:"max_iterations,omitempty"`
// Code node config
Code string `json:"code,omitempty"`
Language string `json:"language,omitempty"`
// Human input config
InputPrompt string `json:"input_prompt,omitempty"`
InputType string `json:"input_type,omitempty"`
Options []string `json:"options,omitempty"`
Timeout int `json:"timeout_seconds,omitempty"`
// Subflow config
SubflowID string `json:"subflow_id,omitempty"`
}
NodeConfig contains node-specific configuration.
type NodeDefinition ¶
type NodeDefinition struct {
// ID is the unique node identifier
ID string `json:"id" yaml:"id"`
// Type is the node type
Type string `json:"type" yaml:"type"`
// Step is the step name (for action nodes)
Step string `json:"step,omitempty" yaml:"step,omitempty"`
// Condition is the condition name (for conditional nodes)
Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
// Next lists the next nodes to execute (for action nodes)
Next []string `json:"next,omitempty" yaml:"next,omitempty"`
// OnTrue lists nodes to execute when condition is true
OnTrue []string `json:"on_true,omitempty" yaml:"on_true,omitempty"`
// OnFalse lists nodes to execute when condition is false
OnFalse []string `json:"on_false,omitempty" yaml:"on_false,omitempty"`
// Loop defines loop configuration (for loop nodes)
Loop *LoopDefinition `json:"loop,omitempty" yaml:"loop,omitempty"`
// SubGraph defines a nested workflow (for subgraph nodes)
SubGraph *DAGDefinition `json:"subgraph,omitempty" yaml:"subgraph,omitempty"`
// Error defines error handling configuration
Error *ErrorDefinition `json:"error,omitempty" yaml:"error,omitempty"`
// Metadata stores additional node information
Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
NodeDefinition represents a serializable node definition
type NodeExecution ¶
type NodeExecution struct {
NodeID string `json:"node_id"`
NodeType NodeType `json:"node_type"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration time.Duration `json:"duration"`
Status ExecutionStatus `json:"status"`
Input any `json:"input,omitempty"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
}
NodeExecution records the execution of a single node
type NodeSnapshot ¶
type NodeSnapshot struct {
ID string `json:"id"`
Type string `json:"type"`
Status string `json:"status"`
Input any `json:"input,omitempty"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Duration int64 `json:"duration_ms,omitempty"`
}
NodeSnapshot captures a node's state.
type NodeType ¶
type NodeType string
NodeType defines the type of a DAG node
const ( // NodeTypeAction executes a step NodeTypeAction NodeType = "action" // NodeTypeCondition performs conditional branching NodeTypeCondition NodeType = "condition" // NodeTypeLoop performs loop iteration NodeTypeLoop NodeType = "loop" // NodeTypeParallel executes nodes concurrently NodeTypeParallel NodeType = "parallel" // NodeTypeSubGraph executes a nested workflow NodeTypeSubGraph NodeType = "subgraph" // NodeTypeCheckpoint creates a checkpoint NodeTypeCheckpoint NodeType = "checkpoint" )
type PassthroughStep ¶
type PassthroughStep struct{}
PassthroughStep passes input directly to output.
func (*PassthroughStep) Name ¶
func (s *PassthroughStep) Name() string
type Port ¶
type Port struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"` // string, number, boolean, object, array
}
Port represents an input/output port on a node.
type Runnable ¶ added in v1.0.0
Runnable is the common execution interface shared by workflow executable nodes. It represents any unit of work that can be executed with input and produce output.
type Tool ¶
type Tool interface {
Name() string
Execute(ctx context.Context, params map[string]any) (any, error)
}
Tool represents an executable tool within a workflow.
type ToolRegistry ¶
type ToolRegistry interface {
// GetTool returns a Tool by name. Returns nil, false if not found.
GetTool(name string) (Tool, bool)
// ExecuteTool looks up and executes a tool in one call.
ExecuteTool(ctx context.Context, name string, params map[string]any) (any, error)
}
ToolRegistry abstracts tool lookup and execution for workflow steps. Implement this interface to bridge workflow with your tool management layer.
type ToolStep ¶
type ToolStep struct {
ToolName string
Params map[string]any
Registry ToolRegistry // Optional: inject to enable real tool execution
}
ToolStep executes a tool call. When Registry is set, it performs a real tool execution.
type Variable ¶
type Variable struct {
Name string `json:"name"`
Type string `json:"type"`
DefaultValue any `json:"default_value,omitempty"`
Description string `json:"description,omitempty"`
}
Variable represents a workflow variable.
type VisualBuilder ¶
type VisualBuilder struct {
// contains filtered or unexported fields
}
VisualBuilder builds DAG workflows from visual definitions.
func NewVisualBuilder ¶
func NewVisualBuilder() *VisualBuilder
NewVisualBuilder creates a new visual builder.
func (*VisualBuilder) Build ¶
func (b *VisualBuilder) Build(vw *VisualWorkflow) (*DAGWorkflow, error)
Build converts a visual workflow to executable DAG.
func (*VisualBuilder) RegisterStep ¶
func (b *VisualBuilder) RegisterStep(name string, step Step)
RegisterStep registers a step implementation.
type VisualEdge ¶
type VisualEdge struct {
ID string `json:"id"`
Source string `json:"source"`
SourcePort string `json:"source_port,omitempty"`
Target string `json:"target"`
TargetPort string `json:"target_port,omitempty"`
Label string `json:"label,omitempty"`
Condition string `json:"condition,omitempty"` // For conditional edges
}
VisualEdge represents a connection between nodes.
type VisualNode ¶
type VisualNode struct {
ID string `json:"id"`
Type VisualNodeType `json:"type"`
Label string `json:"label"`
Position Position `json:"position"`
Config NodeConfig `json:"config"`
Inputs []Port `json:"inputs,omitempty"`
Outputs []Port `json:"outputs,omitempty"`
}
VisualNode represents a node in the visual workflow.
type VisualNodeType ¶
type VisualNodeType string
VisualNodeType defines visual node types.
const ( VNodeStart VisualNodeType = "start" VNodeEnd VisualNodeType = "end" VNodeLLM VisualNodeType = "llm" VNodeTool VisualNodeType = "tool" VNodeCondition VisualNodeType = "condition" VNodeLoop VisualNodeType = "loop" VNodeParallel VisualNodeType = "parallel" VNodeHuman VisualNodeType = "human_input" VNodeCode VisualNodeType = "code" VNodeSubflow VisualNodeType = "subflow" )
type VisualWorkflow ¶
type VisualWorkflow struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Version string `json:"version"`
Nodes []VisualNode `json:"nodes"`
Edges []VisualEdge `json:"edges"`
Variables []Variable `json:"variables,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
VisualWorkflow represents a workflow designed in visual builder.
func Import ¶
func Import(data []byte) (*VisualWorkflow, error)
Import imports visual workflow from JSON.
func (*VisualWorkflow) Export ¶
func (vw *VisualWorkflow) Export() ([]byte, error)
Export exports visual workflow to JSON.
func (*VisualWorkflow) Validate ¶
func (vw *VisualWorkflow) Validate() error
Validate validates the visual workflow.
type Workflow ¶
type Workflow interface {
Runnable
// Name 返回工作流名称
Name() string
// Description 返回工作流描述
Description() string
}
Workflow 工作流接口 Workflow 是预定义的步骤序列,提供可预测和一致的执行
type WorkflowStreamEmitter ¶ added in v1.0.0
type WorkflowStreamEmitter func(WorkflowStreamEvent)
WorkflowStreamEmitter is a callback that receives workflow stream events.
type WorkflowStreamEvent ¶ added in v1.0.0
type WorkflowStreamEvent struct {
Type WorkflowStreamEventType `json:"type"`
NodeID string `json:"node_id,omitempty"`
NodeName string `json:"node_name,omitempty"`
Data any `json:"data,omitempty"`
Error error `json:"-"`
}
WorkflowStreamEvent carries information about a workflow execution event.
type WorkflowStreamEventType ¶ added in v1.0.0
type WorkflowStreamEventType string
WorkflowStreamEventType defines the type of workflow stream event.
const ( // WorkflowEventNodeStart is emitted before a DAG node begins execution. WorkflowEventNodeStart WorkflowStreamEventType = "node_start" // WorkflowEventNodeComplete is emitted after a DAG node finishes successfully. WorkflowEventNodeComplete WorkflowStreamEventType = "node_complete" // WorkflowEventNodeError is emitted when a DAG node fails. WorkflowEventNodeError WorkflowStreamEventType = "node_error" // WorkflowEventStepProgress is emitted for intermediate step progress. WorkflowEventStepProgress WorkflowStreamEventType = "step_progress" // WorkflowEventToken is emitted for streaming token output from LLM steps. WorkflowEventToken WorkflowStreamEventType = "token" )