Documentation
¶
Overview ¶
包 workflow 提供工作流编排与执行能力。
该包包含 AgentFlow 在 `workflow` 目录下的核心实现。
Index ¶
- func RegisterChannel[T any](sg *StateGraph, channel *Channel[T])
- func ValidateDAGDefinition(def *DAGDefinition) error
- type AgentAdapter
- type AgentAdapterOption
- type AgentExecutor
- type AgentInterface
- type AgentRouter
- type AgentStep
- type AgentStepOption
- type Aggregator
- type AggregatorFunc
- type Annotation
- type ChainWorkflow
- type Channel
- type ChannelOption
- type CheckpointDiff
- type CheckpointManager
- type CheckpointStore
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerEvent
- type CircuitBreakerEventHandler
- type CircuitBreakerRegistry
- type CircuitState
- type CodeStep
- type ConditionFunc
- type ConditionalAgentStep
- func (c *ConditionalAgentStep) Default(agent AgentExecutor) *ConditionalAgentStep
- func (c *ConditionalAgentStep) Execute(ctx context.Context, input any) (any, error)
- func (c *ConditionalAgentStep) Name() string
- func (c *ConditionalAgentStep) When(check func(ctx context.Context, input any) bool, agent AgentExecutor) *ConditionalAgentStep
- type DAGBuilder
- func (b *DAGBuilder) AddEdge(from, to string) *DAGBuilder
- func (b *DAGBuilder) AddNode(id string, nodeType NodeType) *NodeBuilder
- func (b *DAGBuilder) Build() (*DAGWorkflow, error)
- func (b *DAGBuilder) SetEntry(nodeID string) *DAGBuilder
- func (b *DAGBuilder) WithDescription(desc string) *DAGBuilder
- func (b *DAGBuilder) WithLogger(logger *zap.Logger) *DAGBuilder
- type DAGDefinition
- func (d *DAGDefinition) MarshalJSON() ([]byte, error)
- func (d *DAGDefinition) MarshalYAML() (any, error)
- func (d *DAGDefinition) SaveToJSONFile(filename string) error
- func (d *DAGDefinition) SaveToYAMLFile(filename string) error
- func (d *DAGDefinition) ToJSON() (string, error)
- func (d *DAGDefinition) ToYAML() (string, error)
- func (d *DAGDefinition) UnmarshalJSON(data []byte) error
- func (d *DAGDefinition) UnmarshalYAML(node *yaml.Node) error
- type DAGExecutor
- func (e *DAGExecutor) Execute(ctx context.Context, graph *DAGGraph, input any) (any, error)
- func (e *DAGExecutor) GetCircuitBreakerStates() map[string]CircuitState
- func (e *DAGExecutor) GetExecutionID() string
- func (e *DAGExecutor) GetHistory() *ExecutionHistory
- func (e *DAGExecutor) GetHistoryStore() *ExecutionHistoryStore
- func (e *DAGExecutor) GetNodeResult(nodeID string) (any, bool)
- func (e *DAGExecutor) SetCircuitBreakerConfig(config CircuitBreakerConfig, handler CircuitBreakerEventHandler)
- func (e *DAGExecutor) SetHistoryStore(store *ExecutionHistoryStore)
- type DAGGraph
- func (g *DAGGraph) AddEdge(fromID, toID string)
- func (g *DAGGraph) AddNode(node *DAGNode)
- func (g *DAGGraph) Edges() map[string][]string
- func (g *DAGGraph) GetEdges(nodeID string) []string
- func (g *DAGGraph) GetEntry() string
- func (g *DAGGraph) GetNode(nodeID string) (*DAGNode, bool)
- func (g *DAGGraph) Nodes() map[string]*DAGNode
- func (g *DAGGraph) SetEntry(nodeID string)
- type DAGNode
- type DAGWorkflow
- func (w *DAGWorkflow) Description() string
- func (w *DAGWorkflow) Execute(ctx context.Context, input any) (any, error)
- func (w *DAGWorkflow) GetMetadata(key string) (any, bool)
- func (w *DAGWorkflow) Graph() *DAGGraph
- func (w *DAGWorkflow) Name() string
- func (w *DAGWorkflow) SetExecutor(executor *DAGExecutor)
- func (w *DAGWorkflow) SetMetadata(key string, value any)
- func (w *DAGWorkflow) ToDAGDefinition() *DAGDefinition
- type EnhancedCheckpoint
- type EnhancedCheckpointManager
- func (m *EnhancedCheckpointManager) Compare(ctx context.Context, threadID string, v1, v2 int) (*CheckpointDiff, error)
- func (m *EnhancedCheckpointManager) CreateCheckpoint(ctx context.Context, executor *DAGExecutor, graph *DAGGraph, threadID string, input any) (*EnhancedCheckpoint, error)
- func (m *EnhancedCheckpointManager) GetHistory(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
- func (m *EnhancedCheckpointManager) ResumeFromCheckpoint(ctx context.Context, checkpointID string, graph *DAGGraph) (*DAGExecutor, error)
- func (m *EnhancedCheckpointManager) Rollback(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
- type ErrorConfig
- type ErrorStrategy
- type ExecutionContext
- func (ec *ExecutionContext) GetNodeResult(nodeID string) (any, bool)
- func (ec *ExecutionContext) GetVariable(key string) (any, bool)
- func (ec *ExecutionContext) SetCurrentNode(nodeID string)
- func (ec *ExecutionContext) SetNodeResult(nodeID string, result any)
- func (ec *ExecutionContext) SetVariable(key string, value any)
- type ExecutionHistory
- func (h *ExecutionHistory) Complete(err error)
- func (h *ExecutionHistory) GetNodeByID(nodeID string) *NodeExecution
- func (h *ExecutionHistory) GetNodes() []*NodeExecution
- func (h *ExecutionHistory) RecordNodeEnd(node *NodeExecution, output any, err error)
- func (h *ExecutionHistory) RecordNodeStart(nodeID string, nodeType NodeType, input any) *NodeExecution
- type ExecutionHistoryStore
- func (s *ExecutionHistoryStore) Get(executionID string) (*ExecutionHistory, bool)
- func (s *ExecutionHistoryStore) ListByStatus(status ExecutionStatus) []*ExecutionHistory
- func (s *ExecutionHistoryStore) ListByTimeRange(start, end time.Time) []*ExecutionHistory
- func (s *ExecutionHistoryStore) ListByWorkflow(workflowID string) []*ExecutionHistory
- func (s *ExecutionHistoryStore) Save(history *ExecutionHistory)
- type ExecutionStatus
- type FuncAggregator
- type FuncHandler
- type FuncRouter
- type FuncStep
- type FuncTask
- type GraphSnapshot
- type Handler
- type HandlerFunc
- type HumanInputHandler
- type HumanInputStep
- type InMemoryCheckpointStore
- func (s *InMemoryCheckpointStore) Delete(ctx context.Context, id string) error
- func (s *InMemoryCheckpointStore) ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) Load(ctx context.Context, id string) (*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) Save(ctx context.Context, cp *EnhancedCheckpoint) error
- type IteratorFunc
- type LLMStep
- type LoopConfig
- type LoopDefinition
- type LoopType
- type NativeAgentAdapter
- type NodeBuilder
- func (nb *NodeBuilder) Done() *DAGBuilder
- func (nb *NodeBuilder) WithCondition(cond ConditionFunc) *NodeBuilder
- func (nb *NodeBuilder) WithErrorConfig(config ErrorConfig) *NodeBuilder
- func (nb *NodeBuilder) WithLoop(config LoopConfig) *NodeBuilder
- func (nb *NodeBuilder) WithMetadata(key string, value any) *NodeBuilder
- func (nb *NodeBuilder) WithOnFalse(nodeIDs ...string) *NodeBuilder
- func (nb *NodeBuilder) WithOnTrue(nodeIDs ...string) *NodeBuilder
- func (nb *NodeBuilder) WithStep(step Step) *NodeBuilder
- func (nb *NodeBuilder) WithSubGraph(subGraph *DAGGraph) *NodeBuilder
- type NodeConfig
- type NodeDefinition
- type NodeExecution
- type NodeOutput
- type NodeSnapshot
- type NodeType
- type ParallelAgentStep
- type ParallelWorkflow
- type PassthroughStep
- type Port
- type Position
- type Reducer
- type Router
- type RouterFunc
- type RoutingWorkflow
- func (w *RoutingWorkflow) Description() string
- func (w *RoutingWorkflow) Execute(ctx context.Context, input any) (any, error)
- func (w *RoutingWorkflow) Name() string
- func (w *RoutingWorkflow) RegisterHandler(route string, handler Handler)
- func (w *RoutingWorkflow) Routes() []string
- func (w *RoutingWorkflow) SetDefaultRoute(route string)
- type StateGraph
- type StateSnapshot
- type Step
- type StepFunc
- type Task
- type TaskFunc
- type TaskResult
- type Tool
- type ToolRegistry
- type ToolStep
- type Variable
- type VisualBuilder
- type VisualEdge
- type VisualNode
- type VisualNodeType
- type VisualWorkflow
- type Workflow
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RegisterChannel ¶
func RegisterChannel[T any](sg *StateGraph, channel *Channel[T])
RegisterChannel registers a channel with the state graph.
func ValidateDAGDefinition ¶
func ValidateDAGDefinition(def *DAGDefinition) error
ValidateDAGDefinition validates a loaded DAGDefinition
Types ¶
type AgentAdapter ¶
type AgentAdapter struct {
// contains filtered or unexported fields
}
AgentAdapter adapts an AgentInterface to the AgentExecutor interface, allowing any agent implementation to be used in workflow steps.
func NewAgentAdapter ¶
func NewAgentAdapter(agent AgentInterface, opts ...AgentAdapterOption) *AgentAdapter
NewAgentAdapter creates an AgentAdapter that implements AgentExecutor.
type AgentAdapterOption ¶
type AgentAdapterOption func(*AgentAdapter)
AgentAdapterOption configures an AgentAdapter.
func WithAgentInputMapper ¶
func WithAgentInputMapper(mapper func(any) (string, error)) AgentAdapterOption
WithAgentInputMapper sets a custom function to convert workflow input to agent string input.
func WithAgentOutputMapper ¶
func WithAgentOutputMapper(mapper func(string) (any, error)) AgentAdapterOption
WithAgentOutputMapper sets a custom function to convert agent string output to workflow output.
type AgentExecutor ¶
type AgentExecutor interface {
// Execute executes the agent with the given input.
Execute(ctx context.Context, input any) (any, error)
// ID returns the agent's unique identifier.
ID() string
// Name returns the agent's name.
Name() string
}
AgentExecutor defines the interface for agent execution. This allows workflow to use agents without direct dependency on agent package.
type AgentInterface ¶
type AgentInterface interface {
// Execute runs the agent with a string prompt and returns a string response.
Execute(ctx context.Context, input string) (string, error)
// ID returns the agent's unique identifier.
ID() string
// Name returns the agent's display name.
Name() string
}
AgentInterface is a minimal agent contract defined in the workflow package to avoid importing the agent package (which would cause circular imports). The agent.Agent interface uses (ctx, *Input) -> (*Output, error), but this interface uses plain string I/O for simplicity. Callers can use AgentAdapterOption functions to customize input/output conversion.
type AgentRouter ¶
type AgentRouter struct {
// contains filtered or unexported fields
}
AgentRouter routes tasks to appropriate agents based on criteria.
func NewAgentRouter ¶
func NewAgentRouter(selector func(ctx context.Context, input any, agents map[string]AgentExecutor) (AgentExecutor, error)) *AgentRouter
NewAgentRouter creates a new AgentRouter.
func (*AgentRouter) Execute ¶
Execute implements the Step interface by routing to the appropriate agent.
func (*AgentRouter) RegisterAgent ¶
func (r *AgentRouter) RegisterAgent(agent AgentExecutor)
RegisterAgent registers an agent with the router.
type AgentStep ¶
type AgentStep struct {
// contains filtered or unexported fields
}
AgentStep wraps an AgentExecutor as a workflow Step. This allows agents to be used as steps in workflow chains.
func NewAgentStep ¶
func NewAgentStep(agent AgentExecutor, opts ...AgentStepOption) *AgentStep
NewAgentStep creates a new AgentStep from an AgentExecutor.
type AgentStepOption ¶
type AgentStepOption func(*AgentStep)
AgentStepOption configures an AgentStep.
func WithInputMapper ¶
func WithInputMapper(mapper func(any) (any, error)) AgentStepOption
WithInputMapper sets a function to transform input before agent execution.
func WithOutputMapper ¶
func WithOutputMapper(mapper func(any) (any, error)) AgentStepOption
WithOutputMapper sets a function to transform output after agent execution.
func WithStepName ¶
func WithStepName(name string) AgentStepOption
WithStepName sets a custom name for the step.
type Aggregator ¶
type Aggregator interface {
// Aggregate 聚合结果
Aggregate(ctx context.Context, results []TaskResult) (any, error)
}
Aggregator 聚合器接口,将多个任务的结果聚合为最终输出。
type AggregatorFunc ¶
type AggregatorFunc func(ctx context.Context, results []TaskResult) (any, error)
AggregatorFunc 聚合器函数类型
type Annotation ¶
Annotation provides type-safe state definition.
func NewAnnotation ¶
func NewAnnotation[T any](name string, defaultVal T, reducer Reducer[T]) Annotation[T]
NewAnnotation creates a new annotation.
func (Annotation[T]) CreateChannel ¶
func (a Annotation[T]) CreateChannel() *Channel[T]
CreateChannel creates a channel from an annotation.
type ChainWorkflow ¶
type ChainWorkflow struct {
// contains filtered or unexported fields
}
ChainWorkflow 提示词链工作流:将任务分解为固定的步骤序列,每个步骤处理前一步的输出。
func NewChainWorkflow ¶
func NewChainWorkflow(name, description string, steps ...Step) *ChainWorkflow
NewChainWorkflow 创建提示词链工作流
func (*ChainWorkflow) Description ¶
func (w *ChainWorkflow) Description() string
func (*ChainWorkflow) Name ¶
func (w *ChainWorkflow) Name() string
type Channel ¶
type Channel[T any] struct {
// contains filtered or unexported fields
}
Channel represents a state channel with optional reducer.
func GetChannel ¶
func GetChannel[T any](sg *StateGraph, name string) (*Channel[T], error)
GetChannel retrieves a typed channel by name.
func NewChannel ¶
func NewChannel[T any](name string, initial T, opts ...ChannelOption[T]) *Channel[T]
NewChannel creates a new state channel.
type ChannelOption ¶
ChannelOption configures a channel.
func WithHistory ¶
func WithHistory[T any](max int) ChannelOption[T]
WithHistory enables history tracking with max entries.
func WithReducer ¶
func WithReducer[T any](r Reducer[T]) ChannelOption[T]
WithReducer sets a custom reducer for the channel.
type CheckpointDiff ¶
type CheckpointDiff struct {
Version1 int `json:"version1"`
Version2 int `json:"version2"`
AddedNodes []string `json:"added_nodes"`
RemovedNodes []string `json:"removed_nodes"`
ChangedNodes []string `json:"changed_nodes"`
TimeDifference time.Duration `json:"time_difference"`
}
CheckpointDiff represents differences between checkpoints.
type CheckpointManager ¶
CheckpointManager is the interface for checkpoint integration.
type CheckpointStore ¶
type CheckpointStore interface {
Save(ctx context.Context, checkpoint *EnhancedCheckpoint) error
Load(ctx context.Context, checkpointID string) (*EnhancedCheckpoint, error)
LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
Delete(ctx context.Context, checkpointID string) error
}
CheckpointStore defines storage interface for enhanced checkpoints (Workflow layer).
Note: Two CheckpointStore interfaces exist in the project, operating on different types:
- agent.CheckpointStore — operates on *agent.Checkpoint (agent state, List/DeleteThread/Rollback)
- workflow.CheckpointStore (this) — operates on *workflow.EnhancedCheckpoint (DAG node results, time-travel)
They cannot be unified because the checkpoint structs have different fields (agent state vs DAG execution state).
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker 熔断器实现
func NewCircuitBreaker ¶
func NewCircuitBreaker(nodeID string, config CircuitBreakerConfig, eventHandler CircuitBreakerEventHandler, logger *zap.Logger) *CircuitBreaker
NewCircuitBreaker 创建熔断器
func (*CircuitBreaker) AllowRequest ¶
func (cb *CircuitBreaker) AllowRequest() (bool, error)
AllowRequest 检查是否允许请求通过
func (*CircuitBreaker) GetFailures ¶
func (cb *CircuitBreaker) GetFailures() int
GetFailures 获取当前失败次数
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
// FailureThreshold 连续失败次数阈值,达到后触发熔断
FailureThreshold int `json:"failure_threshold"`
// RecoveryTimeout 熔断后等待恢复的时间
RecoveryTimeout time.Duration `json:"recovery_timeout"`
// HalfOpenMaxProbes 半开状态允许的探测请求数
HalfOpenMaxProbes int `json:"half_open_max_probes"`
// SuccessThresholdInHalfOpen 半开状态下连续成功多少次后恢复
SuccessThresholdInHalfOpen int `json:"success_threshold_in_half_open"`
}
CircuitBreakerConfig 熔断器配置
func DefaultCircuitBreakerConfig ¶
func DefaultCircuitBreakerConfig() CircuitBreakerConfig
DefaultCircuitBreakerConfig 默认熔断器配置
type CircuitBreakerEvent ¶
type CircuitBreakerEvent struct {
NodeID string `json:"node_id"`
OldState CircuitState `json:"old_state"`
NewState CircuitState `json:"new_state"`
Timestamp time.Time `json:"timestamp"`
Reason string `json:"reason"`
Failures int `json:"failures"`
}
CircuitBreakerEvent 熔断器状态变更事件
type CircuitBreakerEventHandler ¶
type CircuitBreakerEventHandler interface {
OnStateChange(event CircuitBreakerEvent)
}
CircuitBreakerEventHandler 事件处理器接口
type CircuitBreakerRegistry ¶
type CircuitBreakerRegistry struct {
// contains filtered or unexported fields
}
CircuitBreakerRegistry 熔断器注册表,管理所有节点的熔断器
func NewCircuitBreakerRegistry ¶
func NewCircuitBreakerRegistry(config CircuitBreakerConfig, eventHandler CircuitBreakerEventHandler, logger *zap.Logger) *CircuitBreakerRegistry
NewCircuitBreakerRegistry 创建熔断器注册表
func (*CircuitBreakerRegistry) GetAllStates ¶
func (r *CircuitBreakerRegistry) GetAllStates() map[string]CircuitState
GetAllStates 获取所有熔断器状态
func (*CircuitBreakerRegistry) GetOrCreate ¶
func (r *CircuitBreakerRegistry) GetOrCreate(nodeID string) *CircuitBreaker
GetOrCreate 获取或创建节点的熔断器
func (*CircuitBreakerRegistry) ResetAll ¶
func (r *CircuitBreakerRegistry) ResetAll()
ResetAll 重置所有熔断器
type CircuitState ¶
type CircuitState int
CircuitState represents the state of a circuit breaker. This is an independent definition equivalent to circuitbreaker.State in llm/circuitbreaker/. The workflow package maintains its own copy to avoid depending on the llm package (the workflow package must not import llm).
const (
// CircuitClosed 正常状态,允许请求通过
CircuitClosed CircuitState = iota
// CircuitOpen 熔断状态,拒绝所有请求
CircuitOpen
// CircuitHalfOpen 半开状态,允许探测请求
CircuitHalfOpen
)
func (CircuitState) String ¶
func (s CircuitState) String() string
type ConditionFunc ¶
ConditionFunc evaluates a condition and returns true or false
type ConditionalAgentStep ¶
type ConditionalAgentStep struct {
// contains filtered or unexported fields
}
ConditionalAgentStep executes different agents based on conditions.
func NewConditionalAgentStep ¶
func NewConditionalAgentStep() *ConditionalAgentStep
NewConditionalAgentStep creates a conditional agent step.
func (*ConditionalAgentStep) Default ¶
func (c *ConditionalAgentStep) Default(agent AgentExecutor) *ConditionalAgentStep
Default sets the default agent when no conditions match.
func (*ConditionalAgentStep) Name ¶
func (c *ConditionalAgentStep) Name() string
Name implements the Step interface.
func (*ConditionalAgentStep) When ¶
func (c *ConditionalAgentStep) When(check func(ctx context.Context, input any) bool, agent AgentExecutor) *ConditionalAgentStep
When adds a condition-agent pair.
type DAGBuilder ¶
type DAGBuilder struct {
// contains filtered or unexported fields
}
DAGBuilder provides a fluent API for constructing DAG workflows
func NewDAGBuilder ¶
func NewDAGBuilder(name string) *DAGBuilder
NewDAGBuilder creates a new DAG builder with the given name
func (*DAGBuilder) AddEdge ¶
func (b *DAGBuilder) AddEdge(from, to string) *DAGBuilder
AddEdge adds a directed edge from one node to another
func (*DAGBuilder) AddNode ¶
func (b *DAGBuilder) AddNode(id string, nodeType NodeType) *NodeBuilder
AddNode adds a node to the graph and returns a NodeBuilder for configuration
func (*DAGBuilder) Build ¶
func (b *DAGBuilder) Build() (*DAGWorkflow, error)
Build validates the DAG and creates a DAGWorkflow
func (*DAGBuilder) SetEntry ¶
func (b *DAGBuilder) SetEntry(nodeID string) *DAGBuilder
SetEntry sets the entry node for the workflow
func (*DAGBuilder) WithDescription ¶
func (b *DAGBuilder) WithDescription(desc string) *DAGBuilder
WithDescription sets the workflow description
func (*DAGBuilder) WithLogger ¶
func (b *DAGBuilder) WithLogger(logger *zap.Logger) *DAGBuilder
WithLogger sets a custom logger
type DAGDefinition ¶
type DAGDefinition struct {
// Name is the workflow name
Name string `json:"name" yaml:"name"`
// Description describes the workflow
Description string `json:"description" yaml:"description"`
// Entry is the ID of the entry node
Entry string `json:"entry" yaml:"entry"`
// Nodes contains all node definitions
Nodes []NodeDefinition `json:"nodes" yaml:"nodes"`
// Metadata stores additional workflow information
Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
DAGDefinition represents a serializable workflow definition
func FromJSON ¶
func FromJSON(jsonStr string) (*DAGDefinition, error)
FromJSON creates a DAGDefinition from JSON string
func FromYAML ¶
func FromYAML(yamlStr string) (*DAGDefinition, error)
FromYAML creates a DAGDefinition from YAML string
func LoadFromJSONFile ¶
func LoadFromJSONFile(filename string) (*DAGDefinition, error)
LoadFromJSONFile loads a DAGDefinition from a JSON file
func LoadFromYAMLFile ¶
func LoadFromYAMLFile(filename string) (*DAGDefinition, error)
LoadFromYAMLFile loads a DAGDefinition from a YAML file
func (*DAGDefinition) MarshalJSON ¶
func (d *DAGDefinition) MarshalJSON() ([]byte, error)
MarshalJSON serializes a DAGDefinition to JSON
func (*DAGDefinition) MarshalYAML ¶
func (d *DAGDefinition) MarshalYAML() (any, error)
MarshalYAML serializes a DAGDefinition to YAML
func (*DAGDefinition) SaveToJSONFile ¶
func (d *DAGDefinition) SaveToJSONFile(filename string) error
SaveToJSONFile saves a DAGDefinition to a JSON file
func (*DAGDefinition) SaveToYAMLFile ¶
func (d *DAGDefinition) SaveToYAMLFile(filename string) error
SaveToYAMLFile saves a DAGDefinition to a YAML file
func (*DAGDefinition) ToJSON ¶
func (d *DAGDefinition) ToJSON() (string, error)
ToJSON converts a DAGDefinition to JSON string
func (*DAGDefinition) ToYAML ¶
func (d *DAGDefinition) ToYAML() (string, error)
ToYAML converts a DAGDefinition to YAML string
func (*DAGDefinition) UnmarshalJSON ¶
func (d *DAGDefinition) UnmarshalJSON(data []byte) error
UnmarshalJSON deserializes a DAGDefinition from JSON
func (*DAGDefinition) UnmarshalYAML ¶
func (d *DAGDefinition) UnmarshalYAML(node *yaml.Node) error
UnmarshalYAML deserializes a DAGDefinition from YAML
type DAGExecutor ¶
type DAGExecutor struct {
// contains filtered or unexported fields
}
DAGExecutor executes DAG workflows with dependency resolution
func NewDAGExecutor ¶
func NewDAGExecutor(checkpointMgr CheckpointManager, logger *zap.Logger) *DAGExecutor
NewDAGExecutor creates a new DAG executor
func (*DAGExecutor) GetCircuitBreakerStates ¶
func (e *DAGExecutor) GetCircuitBreakerStates() map[string]CircuitState
GetCircuitBreakerStates 获取所有熔断器状态
func (*DAGExecutor) GetExecutionID ¶
func (e *DAGExecutor) GetExecutionID() string
GetExecutionID returns the current execution ID
func (*DAGExecutor) GetHistory ¶
func (e *DAGExecutor) GetHistory() *ExecutionHistory
GetHistory returns the execution history for the current execution
func (*DAGExecutor) GetHistoryStore ¶
func (e *DAGExecutor) GetHistoryStore() *ExecutionHistoryStore
GetHistoryStore returns the history store
func (*DAGExecutor) GetNodeResult ¶
func (e *DAGExecutor) GetNodeResult(nodeID string) (any, bool)
GetNodeResult retrieves the result of a completed node
func (*DAGExecutor) SetCircuitBreakerConfig ¶
func (e *DAGExecutor) SetCircuitBreakerConfig(config CircuitBreakerConfig, handler CircuitBreakerEventHandler)
SetCircuitBreakerConfig 设置熔断器配置
func (*DAGExecutor) SetHistoryStore ¶
func (e *DAGExecutor) SetHistoryStore(store *ExecutionHistoryStore)
SetHistoryStore sets a custom history store
type DAGGraph ¶
type DAGGraph struct {
// contains filtered or unexported fields
}
DAGGraph represents the workflow structure as a directed acyclic graph
type DAGNode ¶
type DAGNode struct {
// ID is the unique identifier for this node
ID string
// Type specifies the node type
Type NodeType
// Step is the step to execute (for action nodes)
Step Step
// Condition evaluates branching logic (for conditional nodes)
Condition ConditionFunc
// LoopConfig defines loop behavior (for loop nodes)
LoopConfig *LoopConfig
// SubGraph is a nested workflow (for subgraph nodes)
SubGraph *DAGGraph
// ErrorConfig defines error handling behavior
ErrorConfig *ErrorConfig
// Metadata stores additional node information
Metadata map[string]any
}
DAGNode represents a single node in the workflow graph
type DAGWorkflow ¶
type DAGWorkflow struct {
// contains filtered or unexported fields
}
DAGWorkflow represents a DAG-based workflow
func NewDAGWorkflow ¶
func NewDAGWorkflow(name, description string, graph *DAGGraph) *DAGWorkflow
NewDAGWorkflow creates a new DAG workflow
func (*DAGWorkflow) Description ¶
func (w *DAGWorkflow) Description() string
Description returns the workflow description
func (*DAGWorkflow) GetMetadata ¶
func (w *DAGWorkflow) GetMetadata(key string) (any, bool)
GetMetadata retrieves a metadata value
func (*DAGWorkflow) Graph ¶
func (w *DAGWorkflow) Graph() *DAGGraph
Graph returns the underlying DAG graph
func (*DAGWorkflow) SetExecutor ¶
func (w *DAGWorkflow) SetExecutor(executor *DAGExecutor)
SetExecutor sets a custom executor for the workflow
func (*DAGWorkflow) SetMetadata ¶
func (w *DAGWorkflow) SetMetadata(key string, value any)
SetMetadata sets a metadata value
func (*DAGWorkflow) ToDAGDefinition ¶
func (w *DAGWorkflow) ToDAGDefinition() *DAGDefinition
ToDAGDefinition converts a DAGWorkflow to a DAGDefinition for serialization. Note: this only captures the structure, not the runtime functions (conditions, iterators, steps).
type EnhancedCheckpoint ¶
type EnhancedCheckpoint struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
ThreadID string `json:"thread_id"`
Version int `json:"version"`
NodeID string `json:"node_id"`
NodeResults map[string]any `json:"node_results"`
Variables map[string]any `json:"variables"`
PendingNodes []string `json:"pending_nodes"`
CompletedNodes []string `json:"completed_nodes"`
Input any `json:"input"`
CreatedAt time.Time `json:"created_at"`
ParentID string `json:"parent_id,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Snapshot *GraphSnapshot `json:"snapshot,omitempty"`
}
EnhancedCheckpoint represents a workflow checkpoint with full state.
type EnhancedCheckpointManager ¶
type EnhancedCheckpointManager struct {
// contains filtered or unexported fields
}
EnhancedCheckpointManager manages workflow checkpoints with time-travel.
func NewEnhancedCheckpointManager ¶
func NewEnhancedCheckpointManager(store CheckpointStore, logger *zap.Logger) *EnhancedCheckpointManager
NewEnhancedCheckpointManager creates a new checkpoint manager.
func (*EnhancedCheckpointManager) Compare ¶
func (m *EnhancedCheckpointManager) Compare(ctx context.Context, threadID string, v1, v2 int) (*CheckpointDiff, error)
Compare compares two checkpoint versions.
func (*EnhancedCheckpointManager) CreateCheckpoint ¶
func (m *EnhancedCheckpointManager) CreateCheckpoint(ctx context.Context, executor *DAGExecutor, graph *DAGGraph, threadID string, input any) (*EnhancedCheckpoint, error)
CreateCheckpoint creates a checkpoint from current execution state.
func (*EnhancedCheckpointManager) GetHistory ¶
func (m *EnhancedCheckpointManager) GetHistory(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
GetHistory returns checkpoint history for time-travel debugging.
func (*EnhancedCheckpointManager) ResumeFromCheckpoint ¶
func (m *EnhancedCheckpointManager) ResumeFromCheckpoint(ctx context.Context, checkpointID string, graph *DAGGraph) (*DAGExecutor, error)
ResumeFromCheckpoint resumes workflow execution from a checkpoint.
func (*EnhancedCheckpointManager) Rollback ¶
func (m *EnhancedCheckpointManager) Rollback(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
Rollback rolls back to a specific version.
type ErrorConfig ¶
type ErrorConfig struct {
// Strategy specifies how to handle errors
Strategy ErrorStrategy
// MaxRetries is the maximum number of retry attempts (for retry strategy)
MaxRetries int
// RetryDelayMs is the delay between retries in milliseconds
RetryDelayMs int
// FallbackValue is the value to use when skipping a failed node
FallbackValue any
}
ErrorConfig defines error handling behavior for a node
type ErrorStrategy ¶
type ErrorStrategy string
ErrorStrategy defines how errors should be handled
const (
// ErrorStrategyFailFast stops execution immediately on error
ErrorStrategyFailFast ErrorStrategy = "fail_fast"
// ErrorStrategySkip skips the failed node and continues
ErrorStrategySkip ErrorStrategy = "skip"
// ErrorStrategyRetry retries the failed node
ErrorStrategyRetry ErrorStrategy = "retry"
)
type ExecutionContext ¶
type ExecutionContext struct {
// WorkflowID identifies the workflow being executed
WorkflowID string `json:"workflow_id,omitempty"`
// CurrentNode is the ID of the currently executing node
CurrentNode string `json:"current_node,omitempty"`
// NodeResults stores the results of completed nodes
NodeResults map[string]any `json:"node_results,omitempty"`
// Variables stores workflow variables
Variables map[string]any `json:"variables,omitempty"`
// StartTime is when the workflow execution started
StartTime time.Time `json:"start_time,omitempty"`
// LastUpdateTime is when the context was last updated
LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}
ExecutionContext captures the execution state for checkpointing
func NewExecutionContext ¶
func NewExecutionContext(workflowID string) *ExecutionContext
NewExecutionContext creates a new execution context
func (*ExecutionContext) GetNodeResult ¶
func (ec *ExecutionContext) GetNodeResult(nodeID string) (any, bool)
GetNodeResult retrieves the result of a completed node
func (*ExecutionContext) GetVariable ¶
func (ec *ExecutionContext) GetVariable(key string) (any, bool)
GetVariable retrieves a workflow variable
func (*ExecutionContext) SetCurrentNode ¶
func (ec *ExecutionContext) SetCurrentNode(nodeID string)
SetCurrentNode updates the currently executing node
func (*ExecutionContext) SetNodeResult ¶
func (ec *ExecutionContext) SetNodeResult(nodeID string, result any)
SetNodeResult stores the result of a completed node
func (*ExecutionContext) SetVariable ¶
func (ec *ExecutionContext) SetVariable(key string, value any)
SetVariable sets a workflow variable
type ExecutionHistory ¶
type ExecutionHistory struct {
ExecutionID string `json:"execution_id"`
WorkflowID string `json:"workflow_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration time.Duration `json:"duration"`
Status ExecutionStatus `json:"status"`
Nodes []*NodeExecution `json:"nodes"`
Error string `json:"error,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
// contains filtered or unexported fields
}
ExecutionHistory records the complete execution path of a workflow
func NewExecutionHistory ¶
func NewExecutionHistory(executionID, workflowID string) *ExecutionHistory
NewExecutionHistory creates a new execution history
func (*ExecutionHistory) Complete ¶
func (h *ExecutionHistory) Complete(err error)
Complete marks the execution as completed
func (*ExecutionHistory) GetNodeByID ¶
func (h *ExecutionHistory) GetNodeByID(nodeID string) *NodeExecution
GetNodeByID returns the execution record for a specific node
func (*ExecutionHistory) GetNodes ¶
func (h *ExecutionHistory) GetNodes() []*NodeExecution
GetNodes returns a copy of the node executions
func (*ExecutionHistory) RecordNodeEnd ¶
func (h *ExecutionHistory) RecordNodeEnd(node *NodeExecution, output any, err error)
RecordNodeEnd records the end of a node execution
func (*ExecutionHistory) RecordNodeStart ¶
func (h *ExecutionHistory) RecordNodeStart(nodeID string, nodeType NodeType, input any) *NodeExecution
RecordNodeStart records the start of a node execution
type ExecutionHistoryStore ¶
type ExecutionHistoryStore struct {
// contains filtered or unexported fields
}
ExecutionHistoryStore stores and queries execution histories
func NewExecutionHistoryStore ¶
func NewExecutionHistoryStore() *ExecutionHistoryStore
NewExecutionHistoryStore creates a new execution history store
func (*ExecutionHistoryStore) Get ¶
func (s *ExecutionHistoryStore) Get(executionID string) (*ExecutionHistory, bool)
Get retrieves an execution history by ID
func (*ExecutionHistoryStore) ListByStatus ¶
func (s *ExecutionHistoryStore) ListByStatus(status ExecutionStatus) []*ExecutionHistory
ListByStatus returns executions with a specific status
func (*ExecutionHistoryStore) ListByTimeRange ¶
func (s *ExecutionHistoryStore) ListByTimeRange(start, end time.Time) []*ExecutionHistory
ListByTimeRange returns executions within a time range
func (*ExecutionHistoryStore) ListByWorkflow ¶
func (s *ExecutionHistoryStore) ListByWorkflow(workflowID string) []*ExecutionHistory
ListByWorkflow returns all executions for a workflow
func (*ExecutionHistoryStore) Save ¶
func (s *ExecutionHistoryStore) Save(history *ExecutionHistory)
Save saves an execution history
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus represents the status of an execution
const (
// ExecutionStatusRunning indicates the execution is in progress
ExecutionStatusRunning ExecutionStatus = "running"
// ExecutionStatusCompleted indicates the execution completed successfully
ExecutionStatusCompleted ExecutionStatus = "completed"
// ExecutionStatusFailed indicates the execution failed
ExecutionStatusFailed ExecutionStatus = "failed"
)
type FuncAggregator ¶
type FuncAggregator struct {
// contains filtered or unexported fields
}
FuncAggregator 函数聚合器
func NewFuncAggregator ¶
func NewFuncAggregator(fn AggregatorFunc) *FuncAggregator
NewFuncAggregator 创建函数聚合器
func (*FuncAggregator) Aggregate ¶
func (a *FuncAggregator) Aggregate(ctx context.Context, results []TaskResult) (any, error)
type FuncHandler ¶
type FuncHandler struct {
// contains filtered or unexported fields
}
FuncHandler 函数处理器
func NewFuncHandler ¶
func NewFuncHandler(name string, fn HandlerFunc) *FuncHandler
NewFuncHandler 创建函数处理器
func (*FuncHandler) Name ¶
func (h *FuncHandler) Name() string
type FuncRouter ¶
type FuncRouter struct {
// contains filtered or unexported fields
}
FuncRouter 函数路由器
type GraphSnapshot ¶
type GraphSnapshot struct {
Nodes map[string]NodeSnapshot `json:"nodes"`
Edges map[string][]string `json:"edges"`
EntryNode string `json:"entry_node"`
}
GraphSnapshot captures the complete graph state.
type Handler ¶
type Handler interface {
// Handle 处理输入
Handle(ctx context.Context, input any) (any, error)
// Name 返回处理器名称
Name() string
}
Handler 处理器接口
type HandlerFunc ¶
HandlerFunc 处理器函数类型
type HumanInputHandler ¶
type HumanInputHandler interface {
// RequestInput sends a prompt to a human and waits for a response.
// inputType hints at the expected response format (e.g. "text", "choice").
// options provides selectable choices when inputType is "choice".
RequestInput(ctx context.Context, prompt string, inputType string, options []string) (any, error)
}
HumanInputHandler abstracts human-in-the-loop interaction for workflow steps. Implement this interface to bridge workflow with your HITL management layer.
type HumanInputStep ¶
type HumanInputStep struct {
Prompt string
Type string
Options []string
Timeout int
Handler HumanInputHandler // Optional: inject to enable real HITL
}
HumanInputStep waits for human input. When Handler is set, it sends a request to the HITL handler and waits for a response. When Handler is nil, it returns a placeholder map (backward compatible).
func (*HumanInputStep) Name ¶
func (s *HumanInputStep) Name() string
type InMemoryCheckpointStore ¶
type InMemoryCheckpointStore struct {
// contains filtered or unexported fields
}
InMemoryCheckpointStore provides in-memory storage.
func NewInMemoryCheckpointStore ¶
func NewInMemoryCheckpointStore() *InMemoryCheckpointStore
NewInMemoryCheckpointStore creates a new in-memory store.
func (*InMemoryCheckpointStore) Delete ¶
func (s *InMemoryCheckpointStore) Delete(ctx context.Context, id string) error
func (*InMemoryCheckpointStore) ListVersions ¶
func (s *InMemoryCheckpointStore) ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) Load ¶
func (s *InMemoryCheckpointStore) Load(ctx context.Context, id string) (*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) LoadLatest ¶
func (s *InMemoryCheckpointStore) LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) LoadVersion ¶
func (s *InMemoryCheckpointStore) LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) Save ¶
func (s *InMemoryCheckpointStore) Save(ctx context.Context, cp *EnhancedCheckpoint) error
type IteratorFunc ¶
IteratorFunc generates a collection of items for iteration
type LLMStep ¶
type LLMStep struct {
Model string
Prompt string
Temperature float64
MaxTokens int
Provider llm.Provider // Optional: inject to enable real LLM calls
}
LLMStep executes an LLM call. When Provider is set, it performs a real LLM completion request. When Provider is nil, it returns a placeholder map (backward compatible).
type LoopConfig ¶
type LoopConfig struct {
// Type specifies the loop type (while, for, foreach)
Type LoopType
// MaxIterations limits the maximum number of iterations (0 = unlimited)
MaxIterations int
// Condition evaluates whether to continue looping (for while loops)
Condition ConditionFunc
// Iterator generates items to iterate over (for foreach loops)
Iterator IteratorFunc
}
LoopConfig defines loop behavior
type LoopDefinition ¶
type LoopDefinition struct {
// Type is the loop type (while, for, foreach)
Type string `json:"type" yaml:"type"`
// MaxIterations limits the maximum number of iterations
MaxIterations int `json:"max_iterations" yaml:"max_iterations"`
// Condition is the condition name (for while loops)
Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
}
LoopDefinition represents a serializable loop configuration
type NativeAgentAdapter ¶
type NativeAgentAdapter struct {
// contains filtered or unexported fields
}
NativeAgentAdapter adapts an agent.Agent (with *Input/*Output signatures) to the AgentExecutor interface used by workflow steps.
Input conversion (any -> *agent.Input):
- *agent.Input: passed through directly
- string: wrapped as Input.Content
- map[string]any: Content extracted from "content" key, rest goes to Context
- other types: converted to string via fmt.Sprintf and set as Content
Output: returns the *agent.Output directly (callers can type-assert).
func NewNativeAgentAdapter ¶
func NewNativeAgentAdapter(a agent.Agent) *NativeAgentAdapter
NewNativeAgentAdapter creates an adapter that bridges agent.Agent to AgentExecutor.
func (*NativeAgentAdapter) Execute ¶
Execute implements AgentExecutor. It converts the workflow input to *agent.Input, calls agent.Execute, and returns the *agent.Output.
func (*NativeAgentAdapter) ID ¶
func (n *NativeAgentAdapter) ID() string
ID implements AgentExecutor.
func (*NativeAgentAdapter) Name ¶
func (n *NativeAgentAdapter) Name() string
Name implements AgentExecutor.
type NodeBuilder ¶
type NodeBuilder struct {
// contains filtered or unexported fields
}
NodeBuilder provides a fluent API for configuring individual nodes
func (*NodeBuilder) Done ¶
func (nb *NodeBuilder) Done() *DAGBuilder
Done completes node configuration and returns to the DAGBuilder
func (*NodeBuilder) WithCondition ¶
func (nb *NodeBuilder) WithCondition(cond ConditionFunc) *NodeBuilder
WithCondition sets the condition function for a conditional node
func (*NodeBuilder) WithErrorConfig ¶
func (nb *NodeBuilder) WithErrorConfig(config ErrorConfig) *NodeBuilder
WithErrorConfig sets the error handling configuration for a node
func (*NodeBuilder) WithLoop ¶
func (nb *NodeBuilder) WithLoop(config LoopConfig) *NodeBuilder
WithLoop sets the loop configuration for a loop node
func (*NodeBuilder) WithMetadata ¶
func (nb *NodeBuilder) WithMetadata(key string, value any) *NodeBuilder
WithMetadata sets a metadata value
func (*NodeBuilder) WithOnFalse ¶
func (nb *NodeBuilder) WithOnFalse(nodeIDs ...string) *NodeBuilder
WithOnFalse sets the nodes to execute when condition is false
func (*NodeBuilder) WithOnTrue ¶
func (nb *NodeBuilder) WithOnTrue(nodeIDs ...string) *NodeBuilder
WithOnTrue sets the nodes to execute when condition is true
func (*NodeBuilder) WithStep ¶
func (nb *NodeBuilder) WithStep(step Step) *NodeBuilder
WithStep sets the step for an action node
func (*NodeBuilder) WithSubGraph ¶
func (nb *NodeBuilder) WithSubGraph(subGraph *DAGGraph) *NodeBuilder
WithSubGraph sets the subgraph for a subgraph node
type NodeConfig ¶
type NodeConfig struct {
// LLM node config
Model string `json:"model,omitempty"`
Prompt string `json:"prompt,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
// Tool node config
ToolName string `json:"tool_name,omitempty"`
ToolParams map[string]any `json:"tool_params,omitempty"`
// Condition node config
Condition string `json:"condition,omitempty"`
Expression string `json:"expression,omitempty"`
// Loop node config
LoopType string `json:"loop_type,omitempty"`
MaxIterations int `json:"max_iterations,omitempty"`
// Code node config
Code string `json:"code,omitempty"`
Language string `json:"language,omitempty"`
// Human input config
InputPrompt string `json:"input_prompt,omitempty"`
InputType string `json:"input_type,omitempty"`
Options []string `json:"options,omitempty"`
Timeout int `json:"timeout_seconds,omitempty"`
// Subflow config
SubflowID string `json:"subflow_id,omitempty"`
}
NodeConfig contains node-specific configuration.
type NodeDefinition ¶
type NodeDefinition struct {
// ID is the unique node identifier
ID string `json:"id" yaml:"id"`
// Type is the node type
Type string `json:"type" yaml:"type"`
// Step is the step name (for action nodes)
Step string `json:"step,omitempty" yaml:"step,omitempty"`
// Condition is the condition name (for conditional nodes)
Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
// Next lists the next nodes to execute (for action nodes)
Next []string `json:"next,omitempty" yaml:"next,omitempty"`
// OnTrue lists nodes to execute when condition is true
OnTrue []string `json:"on_true,omitempty" yaml:"on_true,omitempty"`
// OnFalse lists nodes to execute when condition is false
OnFalse []string `json:"on_false,omitempty" yaml:"on_false,omitempty"`
// Loop defines loop configuration (for loop nodes)
Loop *LoopDefinition `json:"loop,omitempty" yaml:"loop,omitempty"`
// SubGraph defines a nested workflow (for subgraph nodes)
SubGraph *DAGDefinition `json:"subgraph,omitempty" yaml:"subgraph,omitempty"`
// Metadata stores additional node information
Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
NodeDefinition represents a serializable node definition
type NodeExecution ¶
type NodeExecution struct {
NodeID string `json:"node_id"`
NodeType NodeType `json:"node_type"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration time.Duration `json:"duration"`
Status ExecutionStatus `json:"status"`
Input any `json:"input,omitempty"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
}
NodeExecution records the execution of a single node
type NodeOutput ¶
NodeOutput represents output from a graph node.
type NodeSnapshot ¶
type NodeSnapshot struct {
ID string `json:"id"`
Type string `json:"type"`
Status string `json:"status"`
Input any `json:"input,omitempty"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Duration int64 `json:"duration_ms,omitempty"`
}
NodeSnapshot captures a node's state.
type NodeType ¶
type NodeType string
NodeType defines the type of a DAG node
const (
// NodeTypeAction executes a step
NodeTypeAction NodeType = "action"
// NodeTypeCondition performs conditional branching
NodeTypeCondition NodeType = "condition"
// NodeTypeLoop performs loop iteration
NodeTypeLoop NodeType = "loop"
// NodeTypeParallel executes nodes concurrently
NodeTypeParallel NodeType = "parallel"
// NodeTypeSubGraph executes a nested workflow
NodeTypeSubGraph NodeType = "subgraph"
// NodeTypeCheckpoint creates a checkpoint
NodeTypeCheckpoint NodeType = "checkpoint"
)
type ParallelAgentStep ¶
type ParallelAgentStep struct {
// contains filtered or unexported fields
}
ParallelAgentStep executes multiple agents in parallel.
func NewParallelAgentStep ¶
func NewParallelAgentStep(agents []AgentExecutor, merger func([]any) (any, error)) *ParallelAgentStep
NewParallelAgentStep creates a step that executes agents in parallel.
func (*ParallelAgentStep) Name ¶
func (p *ParallelAgentStep) Name() string
Name implements the Step interface.
type ParallelWorkflow ¶
type ParallelWorkflow struct {
// contains filtered or unexported fields
}
ParallelWorkflow 并行工作流。将任务分割为多个子任务并行执行,然后聚合结果。
func NewParallelWorkflow ¶
func NewParallelWorkflow(name, description string, aggregator Aggregator, tasks ...Task) *ParallelWorkflow
NewParallelWorkflow 创建并行工作流
func (*ParallelWorkflow) Description ¶
func (w *ParallelWorkflow) Description() string
func (*ParallelWorkflow) Name ¶
func (w *ParallelWorkflow) Name() string
type PassthroughStep ¶
type PassthroughStep struct{}
PassthroughStep passes input directly to output.
func (*PassthroughStep) Name ¶
func (s *PassthroughStep) Name() string
type Port ¶
type Port struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"` // string, number, boolean, object, array
}
Port represents an input/output port on a node.
type Reducer ¶
type Reducer[T any] func(current T, update T) T
Reducer defines how to merge state updates from multiple nodes.
func AppendReducer ¶
AppendReducer appends slices together.
func LastValueReducer ¶
LastValueReducer returns the most recent value (default).
func MaxReducer ¶
MaxReducer keeps the maximum value.
func MergeMapReducer ¶
func MergeMapReducer[K comparable, V any]() Reducer[map[K]V]
MergeMapReducer merges maps, with update values taking precedence.
type RouterFunc ¶
RouterFunc 路由函数类型
type RoutingWorkflow ¶
type RoutingWorkflow struct {
// contains filtered or unexported fields
}
RoutingWorkflow 路由工作流。根据输入分类,将任务路由到专门的处理器。
func NewRoutingWorkflow ¶
func NewRoutingWorkflow(name, description string, router Router) *RoutingWorkflow
NewRoutingWorkflow 创建路由工作流
func (*RoutingWorkflow) Description ¶
func (w *RoutingWorkflow) Description() string
func (*RoutingWorkflow) Name ¶
func (w *RoutingWorkflow) Name() string
func (*RoutingWorkflow) RegisterHandler ¶
func (w *RoutingWorkflow) RegisterHandler(route string, handler Handler)
RegisterHandler 注册处理器
func (*RoutingWorkflow) SetDefaultRoute ¶
func (w *RoutingWorkflow) SetDefaultRoute(route string)
SetDefaultRoute 设置默认路由
type StateGraph ¶
type StateGraph struct {
// contains filtered or unexported fields
}
StateGraph manages multiple channels as a unified state.
func (*StateGraph) ApplyNodeOutput ¶
func (sg *StateGraph) ApplyNodeOutput(output NodeOutput) error
ApplyNodeOutput applies a node's output to the state graph.
func (*StateGraph) Snapshot ¶
func (sg *StateGraph) Snapshot() StateSnapshot
Snapshot creates a snapshot of the current state.
type StateSnapshot ¶
type StateSnapshot struct {
Values map[string]any `json:"values"`
Versions map[string]uint64 `json:"versions"`
}
StateSnapshot captures the current state of all channels.
type Step ¶
type Step interface {
// Execute 执行步骤
Execute(ctx context.Context, input any) (any, error)
// Name 返回步骤名称
Name() string
}
Step 工作流步骤接口
type Task ¶
type Task interface {
// Execute 执行任务
Execute(ctx context.Context, input any) (any, error)
// Name 返回任务名称
Name() string
}
Task 并行任务接口
type Tool ¶
type Tool interface {
Name() string
Execute(ctx context.Context, params map[string]any) (any, error)
}
Tool represents an executable tool within a workflow.
type ToolRegistry ¶
type ToolRegistry interface {
// GetTool returns a Tool by name. Returns nil, false if not found.
GetTool(name string) (Tool, bool)
// ExecuteTool looks up and executes a tool in one call.
ExecuteTool(ctx context.Context, name string, params map[string]any) (any, error)
}
ToolRegistry abstracts tool lookup and execution for workflow steps. Implement this interface to bridge workflow with your tool management layer.
type ToolStep ¶
type ToolStep struct {
ToolName string
Params map[string]any
Registry ToolRegistry // Optional: inject to enable real tool execution
}
ToolStep executes a tool call. When Registry is set, it performs a real tool execution. When Registry is nil, it returns a placeholder map (backward compatible).
type Variable ¶
type Variable struct {
Name string `json:"name"`
Type string `json:"type"`
DefaultValue any `json:"default_value,omitempty"`
Description string `json:"description,omitempty"`
}
Variable represents a workflow variable.
type VisualBuilder ¶
type VisualBuilder struct {
// contains filtered or unexported fields
}
VisualBuilder builds DAG workflows from visual definitions.
func NewVisualBuilder ¶
func NewVisualBuilder() *VisualBuilder
NewVisualBuilder creates a new visual builder.
func (*VisualBuilder) Build ¶
func (b *VisualBuilder) Build(vw *VisualWorkflow) (*DAGWorkflow, error)
Build converts a visual workflow to an executable DAG.
func (*VisualBuilder) RegisterStep ¶
func (b *VisualBuilder) RegisterStep(name string, step Step)
RegisterStep registers a step implementation.
type VisualEdge ¶
type VisualEdge struct {
ID string `json:"id"`
Source string `json:"source"`
SourcePort string `json:"source_port,omitempty"`
Target string `json:"target"`
TargetPort string `json:"target_port,omitempty"`
Label string `json:"label,omitempty"`
Condition string `json:"condition,omitempty"` // For conditional edges
}
VisualEdge represents a connection between nodes.
type VisualNode ¶
type VisualNode struct {
ID string `json:"id"`
Type VisualNodeType `json:"type"`
Label string `json:"label"`
Position Position `json:"position"`
Config NodeConfig `json:"config"`
Inputs []Port `json:"inputs,omitempty"`
Outputs []Port `json:"outputs,omitempty"`
}
VisualNode represents a node in the visual workflow.
type VisualNodeType ¶
type VisualNodeType string
VisualNodeType defines visual node types.
const (
VNodeStart VisualNodeType = "start"
VNodeEnd VisualNodeType = "end"
VNodeLLM VisualNodeType = "llm"
VNodeTool VisualNodeType = "tool"
VNodeCondition VisualNodeType = "condition"
VNodeLoop VisualNodeType = "loop"
VNodeParallel VisualNodeType = "parallel"
VNodeHuman VisualNodeType = "human_input"
VNodeCode VisualNodeType = "code"
VNodeSubflow VisualNodeType = "subflow"
)
type VisualWorkflow ¶
type VisualWorkflow struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Version string `json:"version"`
Nodes []VisualNode `json:"nodes"`
Edges []VisualEdge `json:"edges"`
Variables []Variable `json:"variables,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
VisualWorkflow represents a workflow designed in the visual builder.
func Import ¶
func Import(data []byte) (*VisualWorkflow, error)
Import imports a visual workflow from JSON.
func (*VisualWorkflow) Export ¶
func (vw *VisualWorkflow) Export() ([]byte, error)
Export exports the visual workflow to JSON.
func (*VisualWorkflow) Validate ¶
func (vw *VisualWorkflow) Validate() error
Validate validates the visual workflow.