workflow

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

包 workflow 提供工作流编排与执行能力。

该包包含 AgentFlow 在 `workflow` 目录下的核心实现。

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterChannel

func RegisterChannel[T any](sg *StateGraph, channel *Channel[T])

RegisterChannel registers a channel with the state graph.

func ValidateDAGDefinition

func ValidateDAGDefinition(def *DAGDefinition) error

ValidateDAGDefinition validates a loaded DAGDefinition

Types

type AgentAdapter

type AgentAdapter struct {
	// contains filtered or unexported fields
}

AgentAdapter adapts an AgentInterface to the AgentExecutor interface, allowing any agent implementation to be used in workflow steps.

func NewAgentAdapter

func NewAgentAdapter(agent AgentInterface, opts ...AgentAdapterOption) *AgentAdapter

NewAgentAdapter creates an AgentAdapter that implements AgentExecutor.

func (*AgentAdapter) Execute

func (a *AgentAdapter) Execute(ctx context.Context, input any) (any, error)

Execute implements AgentExecutor. It converts the workflow input to a string, calls the agent, and converts the string output back.

func (*AgentAdapter) ID

func (a *AgentAdapter) ID() string

ID implements AgentExecutor.

func (*AgentAdapter) Name

func (a *AgentAdapter) Name() string

Name implements AgentExecutor.

type AgentAdapterOption

type AgentAdapterOption func(*AgentAdapter)

AgentAdapterOption configures an AgentAdapter.

func WithAgentInputMapper

func WithAgentInputMapper(mapper func(any) (string, error)) AgentAdapterOption

WithAgentInputMapper sets a custom function to convert workflow input to agent string input.

func WithAgentOutputMapper

func WithAgentOutputMapper(mapper func(string) (any, error)) AgentAdapterOption

WithAgentOutputMapper sets a custom function to convert agent string output to workflow output.

type AgentExecutor

type AgentExecutor interface {
	// Execute executes the agent with the given input.
	Execute(ctx context.Context, input any) (any, error)
	// ID returns the agent's unique identifier.
	ID() string
	// Name returns the agent's name.
	Name() string
}

AgentExecutor defines the interface for agent execution. This allows workflow to use agents without direct dependency on agent package.

type AgentInterface

type AgentInterface interface {
	// Execute runs the agent with a string prompt and returns a string response.
	Execute(ctx context.Context, input string) (string, error)
	// ID returns the agent's unique identifier.
	ID() string
	// Name returns the agent's display name.
	Name() string
}

AgentInterface is a minimal agent contract defined in the workflow package to avoid importing the agent package (which would cause circular imports). The agent.Agent interface uses (ctx, *Input) -> (*Output, error), but this interface uses plain string I/O for simplicity. Callers can use AgentAdapterOption functions to customize input/output conversion.

type AgentRouter

type AgentRouter struct {
	// contains filtered or unexported fields
}

AgentRouter routes tasks to appropriate agents based on criteria.

func NewAgentRouter

func NewAgentRouter(selector func(ctx context.Context, input any, agents map[string]AgentExecutor) (AgentExecutor, error)) *AgentRouter

NewAgentRouter creates a new AgentRouter.

func (*AgentRouter) Execute

func (r *AgentRouter) Execute(ctx context.Context, input any) (any, error)

Execute implements the Step interface by routing to the appropriate agent.

func (*AgentRouter) Name

func (r *AgentRouter) Name() string

Name implements the Step interface.

func (*AgentRouter) RegisterAgent

func (r *AgentRouter) RegisterAgent(agent AgentExecutor)

RegisterAgent registers an agent with the router.

type AgentStep

type AgentStep struct {
	// contains filtered or unexported fields
}

AgentStep wraps an AgentExecutor as a workflow Step. This allows agents to be used as steps in workflow chains.

func NewAgentStep

func NewAgentStep(agent AgentExecutor, opts ...AgentStepOption) *AgentStep

NewAgentStep creates a new AgentStep from an AgentExecutor.

func (*AgentStep) AgentID

func (s *AgentStep) AgentID() string

AgentID returns the underlying agent's ID.

func (*AgentStep) Execute

func (s *AgentStep) Execute(ctx context.Context, input any) (any, error)

Execute implements the Step interface.

func (*AgentStep) Name

func (s *AgentStep) Name() string

Name implements the Step interface.

type AgentStepOption

type AgentStepOption func(*AgentStep)

AgentStepOption configures an AgentStep.

func WithInputMapper

func WithInputMapper(mapper func(any) (any, error)) AgentStepOption

WithInputMapper sets a function to transform input before agent execution.

func WithOutputMapper

func WithOutputMapper(mapper func(any) (any, error)) AgentStepOption

WithOutputMapper sets a function to transform output after agent execution.

func WithStepName

func WithStepName(name string) AgentStepOption

WithStepName sets a custom name for the step.

type Aggregator

type Aggregator interface {
	// Aggregate 聚合结果
	Aggregate(ctx context.Context, results []TaskResult) (any, error)
}

Aggregator 聚合器接口 将多个任务的结果聚合为最终输出

type AggregatorFunc

type AggregatorFunc func(ctx context.Context, results []TaskResult) (any, error)

AggregatorFunc 聚合器函数类型

type Annotation

type Annotation[T any] struct {
	Name    string
	Default T
	Reducer Reducer[T]
}

Annotation provides type-safe state definition.

func NewAnnotation

func NewAnnotation[T any](name string, defaultVal T, reducer Reducer[T]) Annotation[T]

NewAnnotation creates a new annotation.

func (Annotation[T]) CreateChannel

func (a Annotation[T]) CreateChannel() *Channel[T]

CreateChannel creates a channel from an annotation.

type ChainWorkflow

type ChainWorkflow struct {
	// contains filtered or unexported fields
}

ChainWorkflow 提示词链工作流 将任务分解为固定的步骤序列,每个步骤处理前一步的输出

func NewChainWorkflow

func NewChainWorkflow(name, description string, steps ...Step) *ChainWorkflow

NewChainWorkflow 创建提示词链工作流

func (*ChainWorkflow) AddStep

func (w *ChainWorkflow) AddStep(step Step)

AddStep 添加步骤

func (*ChainWorkflow) Description

func (w *ChainWorkflow) Description() string

func (*ChainWorkflow) Execute

func (w *ChainWorkflow) Execute(ctx context.Context, input any) (any, error)

Execute 执行提示词链 按顺序执行每个步骤,将前一步的输出作为下一步的输入

func (*ChainWorkflow) Name

func (w *ChainWorkflow) Name() string

func (*ChainWorkflow) Steps

func (w *ChainWorkflow) Steps() []Step

Steps 返回所有步骤

type Channel

type Channel[T any] struct {
	// contains filtered or unexported fields
}

Channel represents a state channel with optional reducer.

func GetChannel

func GetChannel[T any](sg *StateGraph, name string) (*Channel[T], error)

GetChannel retrieves a typed channel by name.

func NewChannel

func NewChannel[T any](name string, initial T, opts ...ChannelOption[T]) *Channel[T]

NewChannel creates a new state channel.

func (*Channel[T]) Get

func (c *Channel[T]) Get() T

Get returns the current value.

func (*Channel[T]) History

func (c *Channel[T]) History() []T

History returns the value history.

func (*Channel[T]) Update

func (c *Channel[T]) Update(update T) T

Update applies an update using the reducer.

func (*Channel[T]) Version

func (c *Channel[T]) Version() uint64

Version returns the current version number.

type ChannelOption

type ChannelOption[T any] func(*Channel[T])

ChannelOption configures a channel.

func WithHistory

func WithHistory[T any](max int) ChannelOption[T]

WithHistory enables history tracking with max entries.

func WithReducer

func WithReducer[T any](r Reducer[T]) ChannelOption[T]

WithReducer sets a custom reducer for the channel.

type CheckpointDiff

type CheckpointDiff struct {
	Version1       int           `json:"version1"`
	Version2       int           `json:"version2"`
	AddedNodes     []string      `json:"added_nodes"`
	RemovedNodes   []string      `json:"removed_nodes"`
	ChangedNodes   []string      `json:"changed_nodes"`
	TimeDifference time.Duration `json:"time_difference"`
}

CheckpointDiff represents differences between checkpoints.

type CheckpointManager

type CheckpointManager interface {
	SaveCheckpoint(ctx context.Context, checkpoint any) error
}

CheckpointManager interface for checkpoint integration

type CheckpointStore

type CheckpointStore interface {
	Save(ctx context.Context, checkpoint *EnhancedCheckpoint) error
	Load(ctx context.Context, checkpointID string) (*EnhancedCheckpoint, error)
	LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
	LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
	ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
	Delete(ctx context.Context, checkpointID string) error
}

CheckpointStore defines storage interface for enhanced checkpoints (Workflow layer).

Note: Two CheckpointStore interfaces exist in the project, operating on different types:

  • agent.CheckpointStore — operates on *agent.Checkpoint (agent state, List/DeleteThread/Rollback)
  • workflow.CheckpointStore (this) — operates on *workflow.EnhancedCheckpoint (DAG node results, time-travel)

They cannot be unified because the checkpoint structs have different fields (agent state vs DAG execution state).

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker 熔断器实现

func NewCircuitBreaker

func NewCircuitBreaker(
	nodeID string,
	config CircuitBreakerConfig,
	eventHandler CircuitBreakerEventHandler,
	logger *zap.Logger,
) *CircuitBreaker

NewCircuitBreaker 创建熔断器

func (*CircuitBreaker) AllowRequest

func (cb *CircuitBreaker) AllowRequest() (bool, error)

AllowRequest 检查是否允许请求通过

func (*CircuitBreaker) GetFailures

func (cb *CircuitBreaker) GetFailures() int

GetFailures 获取当前失败次数

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitState

GetState 获取当前状态

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure 记录失败

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess 记录成功

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset 重置熔断器

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	// FailureThreshold 连续失败次数阈值,达到后触发熔断
	FailureThreshold int `json:"failure_threshold"`
	// RecoveryTimeout 熔断后等待恢复的时间
	RecoveryTimeout time.Duration `json:"recovery_timeout"`
	// HalfOpenMaxProbes 半开状态允许的探测请求数
	HalfOpenMaxProbes int `json:"half_open_max_probes"`
	// SuccessThresholdInHalfOpen 半开状态下连续成功多少次后恢复
	SuccessThresholdInHalfOpen int `json:"success_threshold_in_half_open"`
}

CircuitBreakerConfig 熔断器配置

func DefaultCircuitBreakerConfig

func DefaultCircuitBreakerConfig() CircuitBreakerConfig

DefaultCircuitBreakerConfig 默认熔断器配置

type CircuitBreakerEvent

type CircuitBreakerEvent struct {
	NodeID    string       `json:"node_id"`
	OldState  CircuitState `json:"old_state"`
	NewState  CircuitState `json:"new_state"`
	Timestamp time.Time    `json:"timestamp"`
	Reason    string       `json:"reason"`
	Failures  int          `json:"failures"`
}

CircuitBreakerEvent 熔断器状态变更事件

type CircuitBreakerEventHandler

type CircuitBreakerEventHandler interface {
	OnStateChange(event CircuitBreakerEvent)
}

CircuitBreakerEventHandler 事件处理器接口

type CircuitBreakerRegistry

type CircuitBreakerRegistry struct {
	// contains filtered or unexported fields
}

CircuitBreakerRegistry 熔断器注册表,管理所有节点的熔断器

func NewCircuitBreakerRegistry

func NewCircuitBreakerRegistry(
	config CircuitBreakerConfig,
	eventHandler CircuitBreakerEventHandler,
	logger *zap.Logger,
) *CircuitBreakerRegistry

NewCircuitBreakerRegistry 创建熔断器注册表

func (*CircuitBreakerRegistry) GetAllStates

func (r *CircuitBreakerRegistry) GetAllStates() map[string]CircuitState

GetAllStates 获取所有熔断器状态

func (*CircuitBreakerRegistry) GetOrCreate

func (r *CircuitBreakerRegistry) GetOrCreate(nodeID string) *CircuitBreaker

GetOrCreate 获取或创建节点的熔断器

func (*CircuitBreakerRegistry) ResetAll

func (r *CircuitBreakerRegistry) ResetAll()

ResetAll 重置所有熔断器

type CircuitState

type CircuitState int

CircuitState represents the state of a circuit breaker. This is an independent definition equivalent to circuitbreaker.State in llm/circuitbreaker/. The workflow package maintains its own copy to avoid depending on the llm package (dependency direction: llm <- workflow, not workflow -> llm).

const (
	// CircuitClosed 正常状态,允许请求通过
	CircuitClosed CircuitState = iota
	// CircuitOpen 熔断状态,拒绝所有请求
	CircuitOpen
	// CircuitHalfOpen 半开状态,允许探测请求
	CircuitHalfOpen
)

func (CircuitState) String

func (s CircuitState) String() string

type CodeStep

type CodeStep struct {
	Handler func(ctx context.Context, input any) (any, error)
}

CodeStep executes custom code via an injected Go handler function.

func (*CodeStep) Execute

func (s *CodeStep) Execute(ctx context.Context, input any) (any, error)

func (*CodeStep) Name

func (s *CodeStep) Name() string

type ConditionFunc

type ConditionFunc func(ctx context.Context, input any) (bool, error)

ConditionFunc evaluates a condition and returns true or false

type ConditionalAgentStep

type ConditionalAgentStep struct {
	// contains filtered or unexported fields
}

ConditionalAgentStep executes different agents based on conditions.

func NewConditionalAgentStep

func NewConditionalAgentStep() *ConditionalAgentStep

NewConditionalAgentStep creates a conditional agent step.

func (*ConditionalAgentStep) Default

Default sets the default agent when no conditions match.

func (*ConditionalAgentStep) Execute

func (c *ConditionalAgentStep) Execute(ctx context.Context, input any) (any, error)

Execute implements the Step interface.

func (*ConditionalAgentStep) Name

func (c *ConditionalAgentStep) Name() string

Name implements the Step interface.

func (*ConditionalAgentStep) When

func (c *ConditionalAgentStep) When(check func(ctx context.Context, input any) bool, agent AgentExecutor) *ConditionalAgentStep

When adds a condition-agent pair.

type DAGBuilder

type DAGBuilder struct {
	// contains filtered or unexported fields
}

DAGBuilder provides a fluent API for constructing DAG workflows

func NewDAGBuilder

func NewDAGBuilder(name string) *DAGBuilder

NewDAGBuilder creates a new DAG builder with the given name

func (*DAGBuilder) AddEdge

func (b *DAGBuilder) AddEdge(from, to string) *DAGBuilder

AddEdge adds a directed edge from one node to another

func (*DAGBuilder) AddNode

func (b *DAGBuilder) AddNode(id string, nodeType NodeType) *NodeBuilder

AddNode adds a node to the graph and returns a NodeBuilder for configuration

func (*DAGBuilder) Build

func (b *DAGBuilder) Build() (*DAGWorkflow, error)

Build validates the DAG and creates a DAGWorkflow

func (*DAGBuilder) SetEntry

func (b *DAGBuilder) SetEntry(nodeID string) *DAGBuilder

SetEntry sets the entry node for the workflow

func (*DAGBuilder) WithDescription

func (b *DAGBuilder) WithDescription(desc string) *DAGBuilder

WithDescription sets the workflow description

func (*DAGBuilder) WithLogger

func (b *DAGBuilder) WithLogger(logger *zap.Logger) *DAGBuilder

WithLogger sets a custom logger

type DAGDefinition

type DAGDefinition struct {
	// Name is the workflow name
	Name string `json:"name" yaml:"name"`
	// Description describes the workflow
	Description string `json:"description" yaml:"description"`
	// Entry is the ID of the entry node
	Entry string `json:"entry" yaml:"entry"`
	// Nodes contains all node definitions
	Nodes []NodeDefinition `json:"nodes" yaml:"nodes"`
	// Metadata stores additional workflow information
	Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

DAGDefinition represents a serializable workflow definition

func FromJSON

func FromJSON(jsonStr string) (*DAGDefinition, error)

FromJSON creates a DAGDefinition from JSON string

func FromYAML

func FromYAML(yamlStr string) (*DAGDefinition, error)

FromYAML creates a DAGDefinition from YAML string

func LoadFromJSONFile

func LoadFromJSONFile(filename string) (*DAGDefinition, error)

LoadFromJSONFile loads a DAGDefinition from a JSON file

func LoadFromYAMLFile

func LoadFromYAMLFile(filename string) (*DAGDefinition, error)

LoadFromYAMLFile loads a DAGDefinition from a YAML file

func (*DAGDefinition) MarshalJSON

func (d *DAGDefinition) MarshalJSON() ([]byte, error)

MarshalJSON serializes a DAGDefinition to JSON

func (*DAGDefinition) MarshalYAML

func (d *DAGDefinition) MarshalYAML() (any, error)

MarshalYAML serializes a DAGDefinition to YAML

func (*DAGDefinition) SaveToJSONFile

func (d *DAGDefinition) SaveToJSONFile(filename string) error

SaveToJSONFile saves a DAGDefinition to a JSON file

func (*DAGDefinition) SaveToYAMLFile

func (d *DAGDefinition) SaveToYAMLFile(filename string) error

SaveToYAMLFile saves a DAGDefinition to a YAML file

func (*DAGDefinition) ToJSON

func (d *DAGDefinition) ToJSON() (string, error)

ToJSON converts a DAGDefinition to JSON string

func (*DAGDefinition) ToYAML

func (d *DAGDefinition) ToYAML() (string, error)

ToYAML converts a DAGDefinition to YAML string

func (*DAGDefinition) UnmarshalJSON

func (d *DAGDefinition) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes a DAGDefinition from JSON

func (*DAGDefinition) UnmarshalYAML

func (d *DAGDefinition) UnmarshalYAML(node *yaml.Node) error

UnmarshalYAML deserializes a DAGDefinition from YAML

type DAGExecutor

type DAGExecutor struct {
	// contains filtered or unexported fields
}

DAGExecutor executes DAG workflows with dependency resolution

func NewDAGExecutor

func NewDAGExecutor(checkpointMgr CheckpointManager, logger *zap.Logger) *DAGExecutor

NewDAGExecutor creates a new DAG executor

func (*DAGExecutor) Execute

func (e *DAGExecutor) Execute(ctx context.Context, graph *DAGGraph, input any) (any, error)

Execute runs the DAG workflow with dependency resolution

func (*DAGExecutor) GetCircuitBreakerStates

func (e *DAGExecutor) GetCircuitBreakerStates() map[string]CircuitState

GetCircuitBreakerStates 获取所有熔断器状态

func (*DAGExecutor) GetExecutionID

func (e *DAGExecutor) GetExecutionID() string

GetExecutionID returns the current execution ID

func (*DAGExecutor) GetHistory

func (e *DAGExecutor) GetHistory() *ExecutionHistory

GetHistory returns the execution history for the current execution

func (*DAGExecutor) GetHistoryStore

func (e *DAGExecutor) GetHistoryStore() *ExecutionHistoryStore

GetHistoryStore returns the history store

func (*DAGExecutor) GetNodeResult

func (e *DAGExecutor) GetNodeResult(nodeID string) (any, bool)

GetNodeResult retrieves the result of a completed node

func (*DAGExecutor) SetCircuitBreakerConfig

func (e *DAGExecutor) SetCircuitBreakerConfig(config CircuitBreakerConfig, handler CircuitBreakerEventHandler)

SetCircuitBreakerConfig 设置熔断器配置

func (*DAGExecutor) SetHistoryStore

func (e *DAGExecutor) SetHistoryStore(store *ExecutionHistoryStore)

SetHistoryStore sets a custom history store

type DAGGraph

type DAGGraph struct {
	// contains filtered or unexported fields
}

DAGGraph represents the workflow structure as a directed acyclic graph

func NewDAGGraph

func NewDAGGraph() *DAGGraph

NewDAGGraph creates a new empty DAG graph

func (*DAGGraph) AddEdge

func (g *DAGGraph) AddEdge(fromID, toID string)

AddEdge adds a directed edge from one node to another

func (*DAGGraph) AddNode

func (g *DAGGraph) AddNode(node *DAGNode)

AddNode adds a node to the graph

func (*DAGGraph) Edges

func (g *DAGGraph) Edges() map[string][]string

Edges returns all edges in the graph

func (*DAGGraph) GetEdges

func (g *DAGGraph) GetEdges(nodeID string) []string

GetEdges retrieves the outgoing edges for a node

func (*DAGGraph) GetEntry

func (g *DAGGraph) GetEntry() string

GetEntry returns the entry node ID

func (*DAGGraph) GetNode

func (g *DAGGraph) GetNode(nodeID string) (*DAGNode, bool)

GetNode retrieves a node by ID

func (*DAGGraph) Nodes

func (g *DAGGraph) Nodes() map[string]*DAGNode

Nodes returns all nodes in the graph

func (*DAGGraph) SetEntry

func (g *DAGGraph) SetEntry(nodeID string)

SetEntry sets the entry node for the graph

type DAGNode

type DAGNode struct {
	// ID is the unique identifier for this node
	ID string
	// Type specifies the node type
	Type NodeType
	// Step is the step to execute (for action nodes)
	Step Step
	// Condition evaluates branching logic (for conditional nodes)
	Condition ConditionFunc
	// LoopConfig defines loop behavior (for loop nodes)
	LoopConfig *LoopConfig
	// SubGraph is a nested workflow (for subgraph nodes)
	SubGraph *DAGGraph
	// ErrorConfig defines error handling behavior
	ErrorConfig *ErrorConfig
	// Metadata stores additional node information
	Metadata map[string]any
}

DAGNode represents a single node in the workflow graph

type DAGWorkflow

type DAGWorkflow struct {
	// contains filtered or unexported fields
}

DAGWorkflow represents a DAG-based workflow

func NewDAGWorkflow

func NewDAGWorkflow(name, description string, graph *DAGGraph) *DAGWorkflow

NewDAGWorkflow creates a new DAG workflow

func (*DAGWorkflow) Description

func (w *DAGWorkflow) Description() string

Description returns the workflow description

func (*DAGWorkflow) Execute

func (w *DAGWorkflow) Execute(ctx context.Context, input any) (any, error)

Execute executes the DAG workflow using DAGExecutor

func (*DAGWorkflow) GetMetadata

func (w *DAGWorkflow) GetMetadata(key string) (any, bool)

GetMetadata retrieves a metadata value

func (*DAGWorkflow) Graph

func (w *DAGWorkflow) Graph() *DAGGraph

Graph returns the underlying DAG graph

func (*DAGWorkflow) Name

func (w *DAGWorkflow) Name() string

Name returns the workflow name

func (*DAGWorkflow) SetExecutor

func (w *DAGWorkflow) SetExecutor(executor *DAGExecutor)

SetExecutor sets a custom executor for the workflow

func (*DAGWorkflow) SetMetadata

func (w *DAGWorkflow) SetMetadata(key string, value any)

SetMetadata sets a metadata value

func (*DAGWorkflow) ToDAGDefinition

func (w *DAGWorkflow) ToDAGDefinition() *DAGDefinition

ToDAGDefinition converts a DAGWorkflow to a DAGDefinition for serialization Note: This only captures the structure, not the runtime functions (conditions, iterators, steps)

type EnhancedCheckpoint

type EnhancedCheckpoint struct {
	ID             string         `json:"id"`
	WorkflowID     string         `json:"workflow_id"`
	ThreadID       string         `json:"thread_id"`
	Version        int            `json:"version"`
	NodeID         string         `json:"node_id"`
	NodeResults    map[string]any `json:"node_results"`
	Variables      map[string]any `json:"variables"`
	PendingNodes   []string       `json:"pending_nodes"`
	CompletedNodes []string       `json:"completed_nodes"`
	Input          any            `json:"input"`
	CreatedAt      time.Time      `json:"created_at"`
	ParentID       string         `json:"parent_id,omitempty"`
	Metadata       map[string]any `json:"metadata,omitempty"`
	Snapshot       *GraphSnapshot `json:"snapshot,omitempty"`
}

EnhancedCheckpoint represents a workflow checkpoint with full state.

type EnhancedCheckpointManager

type EnhancedCheckpointManager struct {
	// contains filtered or unexported fields
}

EnhancedCheckpointManager manages workflow checkpoints with time-travel.

func NewEnhancedCheckpointManager

func NewEnhancedCheckpointManager(store CheckpointStore, logger *zap.Logger) *EnhancedCheckpointManager

NewEnhancedCheckpointManager creates a new checkpoint manager.

func (*EnhancedCheckpointManager) Compare

func (m *EnhancedCheckpointManager) Compare(ctx context.Context, threadID string, v1, v2 int) (*CheckpointDiff, error)

Compare compares two checkpoint versions.

func (*EnhancedCheckpointManager) CreateCheckpoint

func (m *EnhancedCheckpointManager) CreateCheckpoint(ctx context.Context, executor *DAGExecutor, graph *DAGGraph, threadID string, input any) (*EnhancedCheckpoint, error)

CreateCheckpoint creates a checkpoint from current execution state.

func (*EnhancedCheckpointManager) GetHistory

func (m *EnhancedCheckpointManager) GetHistory(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)

GetHistory returns checkpoint history for time-travel debugging.

func (*EnhancedCheckpointManager) ResumeFromCheckpoint

func (m *EnhancedCheckpointManager) ResumeFromCheckpoint(ctx context.Context, checkpointID string, graph *DAGGraph) (*DAGExecutor, error)

ResumeFromCheckpoint resumes workflow execution from a checkpoint.

func (*EnhancedCheckpointManager) Rollback

func (m *EnhancedCheckpointManager) Rollback(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)

Rollback rolls back to a specific version.

type ErrorConfig

type ErrorConfig struct {
	// Strategy specifies how to handle errors
	Strategy ErrorStrategy
	// MaxRetries is the maximum number of retry attempts (for retry strategy)
	MaxRetries int
	// RetryDelayMs is the delay between retries in milliseconds
	RetryDelayMs int
	// FallbackValue is the value to use when skipping a failed node
	FallbackValue any
}

ErrorConfig defines error handling behavior for a node

type ErrorStrategy

type ErrorStrategy string

ErrorStrategy defines how errors should be handled

const (
	// ErrorStrategyFailFast stops execution immediately on error
	ErrorStrategyFailFast ErrorStrategy = "fail_fast"
	// ErrorStrategySkip skips the failed node and continues
	ErrorStrategySkip ErrorStrategy = "skip"
	// ErrorStrategyRetry retries the failed node
	ErrorStrategyRetry ErrorStrategy = "retry"
)

type ExecutionContext

type ExecutionContext struct {
	// WorkflowID identifies the workflow being executed
	WorkflowID string `json:"workflow_id,omitempty"`
	// CurrentNode is the ID of the currently executing node
	CurrentNode string `json:"current_node,omitempty"`
	// NodeResults stores the results of completed nodes
	NodeResults map[string]any `json:"node_results,omitempty"`
	// Variables stores workflow variables
	Variables map[string]any `json:"variables,omitempty"`
	// StartTime is when the workflow execution started
	StartTime time.Time `json:"start_time,omitempty"`
	// LastUpdateTime is when the context was last updated
	LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}

ExecutionContext captures the execution state for checkpointing

func NewExecutionContext

func NewExecutionContext(workflowID string) *ExecutionContext

NewExecutionContext creates a new execution context

func (*ExecutionContext) GetNodeResult

func (ec *ExecutionContext) GetNodeResult(nodeID string) (any, bool)

GetNodeResult retrieves the result of a completed node

func (*ExecutionContext) GetVariable

func (ec *ExecutionContext) GetVariable(key string) (any, bool)

GetVariable retrieves a workflow variable

func (*ExecutionContext) SetCurrentNode

func (ec *ExecutionContext) SetCurrentNode(nodeID string)

SetCurrentNode updates the currently executing node

func (*ExecutionContext) SetNodeResult

func (ec *ExecutionContext) SetNodeResult(nodeID string, result any)

SetNodeResult stores the result of a completed node

func (*ExecutionContext) SetVariable

func (ec *ExecutionContext) SetVariable(key string, value any)

SetVariable sets a workflow variable

type ExecutionHistory

type ExecutionHistory struct {
	ExecutionID string           `json:"execution_id"`
	WorkflowID  string           `json:"workflow_id"`
	StartTime   time.Time        `json:"start_time"`
	EndTime     time.Time        `json:"end_time"`
	Duration    time.Duration    `json:"duration"`
	Status      ExecutionStatus  `json:"status"`
	Nodes       []*NodeExecution `json:"nodes"`
	Error       string           `json:"error,omitempty"`
	Metadata    map[string]any   `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

ExecutionHistory records the complete execution path of a workflow

func NewExecutionHistory

func NewExecutionHistory(executionID, workflowID string) *ExecutionHistory

NewExecutionHistory creates a new execution history

func (*ExecutionHistory) Complete

func (h *ExecutionHistory) Complete(err error)

Complete marks the execution as completed

func (*ExecutionHistory) GetNodeByID

func (h *ExecutionHistory) GetNodeByID(nodeID string) *NodeExecution

GetNodeByID returns the execution record for a specific node

func (*ExecutionHistory) GetNodes

func (h *ExecutionHistory) GetNodes() []*NodeExecution

GetNodes returns a copy of the node executions

func (*ExecutionHistory) RecordNodeEnd

func (h *ExecutionHistory) RecordNodeEnd(node *NodeExecution, output any, err error)

RecordNodeEnd records the end of a node execution

func (*ExecutionHistory) RecordNodeStart

func (h *ExecutionHistory) RecordNodeStart(nodeID string, nodeType NodeType, input any) *NodeExecution

RecordNodeStart records the start of a node execution

type ExecutionHistoryStore

type ExecutionHistoryStore struct {
	// contains filtered or unexported fields
}

ExecutionHistoryStore stores and queries execution histories

func NewExecutionHistoryStore

func NewExecutionHistoryStore() *ExecutionHistoryStore

NewExecutionHistoryStore creates a new execution history store

func (*ExecutionHistoryStore) Get

func (s *ExecutionHistoryStore) Get(executionID string) (*ExecutionHistory, bool)

Get retrieves an execution history by ID

func (*ExecutionHistoryStore) ListByStatus

func (s *ExecutionHistoryStore) ListByStatus(status ExecutionStatus) []*ExecutionHistory

ListByStatus returns executions with a specific status

func (*ExecutionHistoryStore) ListByTimeRange

func (s *ExecutionHistoryStore) ListByTimeRange(start, end time.Time) []*ExecutionHistory

ListByTimeRange returns executions within a time range

func (*ExecutionHistoryStore) ListByWorkflow

func (s *ExecutionHistoryStore) ListByWorkflow(workflowID string) []*ExecutionHistory

ListByWorkflow returns all executions for a workflow

func (*ExecutionHistoryStore) Save

func (s *ExecutionHistoryStore) Save(history *ExecutionHistory)

Save saves an execution history

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus represents the status of an execution

const (
	// ExecutionStatusRunning indicates the execution is in progress
	ExecutionStatusRunning ExecutionStatus = "running"
	// ExecutionStatusCompleted indicates the execution completed successfully
	ExecutionStatusCompleted ExecutionStatus = "completed"
	// ExecutionStatusFailed indicates the execution failed
	ExecutionStatusFailed ExecutionStatus = "failed"
)

type FuncAggregator

type FuncAggregator struct {
	// contains filtered or unexported fields
}

FuncAggregator 函数聚合器

func NewFuncAggregator

func NewFuncAggregator(fn AggregatorFunc) *FuncAggregator

NewFuncAggregator 创建函数聚合器

func (*FuncAggregator) Aggregate

func (a *FuncAggregator) Aggregate(ctx context.Context, results []TaskResult) (any, error)

type FuncHandler

type FuncHandler struct {
	// contains filtered or unexported fields
}

FuncHandler 函数处理器

func NewFuncHandler

func NewFuncHandler(name string, fn HandlerFunc) *FuncHandler

NewFuncHandler 创建函数处理器

func (*FuncHandler) Handle

func (h *FuncHandler) Handle(ctx context.Context, input any) (any, error)

func (*FuncHandler) Name

func (h *FuncHandler) Name() string

type FuncRouter

type FuncRouter struct {
	// contains filtered or unexported fields
}

FuncRouter 函数路由器

func NewFuncRouter

func NewFuncRouter(fn RouterFunc) *FuncRouter

NewFuncRouter 创建函数路由器

func (*FuncRouter) Route

func (r *FuncRouter) Route(ctx context.Context, input any) (string, error)

type FuncStep

type FuncStep struct {
	// contains filtered or unexported fields
}

FuncStep 函数步骤实现

func NewFuncStep

func NewFuncStep(name string, fn StepFunc) *FuncStep

NewFuncStep 创建函数步骤

func (*FuncStep) Execute

func (s *FuncStep) Execute(ctx context.Context, input any) (any, error)

func (*FuncStep) Name

func (s *FuncStep) Name() string

type FuncTask

type FuncTask struct {
	// contains filtered or unexported fields
}

FuncTask 函数任务

func NewFuncTask

func NewFuncTask(name string, fn TaskFunc) *FuncTask

NewFuncTask 创建函数任务

func (*FuncTask) Execute

func (t *FuncTask) Execute(ctx context.Context, input any) (any, error)

func (*FuncTask) Name

func (t *FuncTask) Name() string

type GraphSnapshot

type GraphSnapshot struct {
	Nodes     map[string]NodeSnapshot `json:"nodes"`
	Edges     map[string][]string     `json:"edges"`
	EntryNode string                  `json:"entry_node"`
}

GraphSnapshot captures the complete graph state.

type Handler

type Handler interface {
	// Handle 处理输入
	Handle(ctx context.Context, input any) (any, error)
	// Name 返回处理器名称
	Name() string
}

Handler 处理器接口

type HandlerFunc

type HandlerFunc func(ctx context.Context, input any) (any, error)

HandlerFunc 处理器函数类型

type HumanInputHandler

type HumanInputHandler interface {
	// RequestInput sends a prompt to a human and waits for a response.
	// inputType hints at the expected response format (e.g. "text", "choice").
	// options provides selectable choices when inputType is "choice".
	RequestInput(ctx context.Context, prompt string, inputType string, options []string) (any, error)
}

HumanInputHandler abstracts human-in-the-loop interaction for workflow steps. Implement this interface to bridge workflow with your HITL management layer.

type HumanInputStep

type HumanInputStep struct {
	Prompt  string
	Type    string
	Options []string
	Timeout int
	Handler HumanInputHandler // Optional: inject to enable real HITL
}

HumanInputStep waits for human input. When Handler is set, it sends a request to the HITL handler and waits for a response. When Handler is nil, it returns a placeholder map (backward compatible).

func (*HumanInputStep) Execute

func (s *HumanInputStep) Execute(ctx context.Context, input any) (any, error)

func (*HumanInputStep) Name

func (s *HumanInputStep) Name() string

type InMemoryCheckpointStore

type InMemoryCheckpointStore struct {
	// contains filtered or unexported fields
}

InMemoryCheckpointStore provides in-memory storage.

func NewInMemoryCheckpointStore

func NewInMemoryCheckpointStore() *InMemoryCheckpointStore

NewInMemoryCheckpointStore creates a new in-memory store.

func (*InMemoryCheckpointStore) Delete

func (s *InMemoryCheckpointStore) Delete(ctx context.Context, id string) error

func (*InMemoryCheckpointStore) ListVersions

func (s *InMemoryCheckpointStore) ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)

func (*InMemoryCheckpointStore) Load

func (*InMemoryCheckpointStore) LoadLatest

func (s *InMemoryCheckpointStore) LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)

func (*InMemoryCheckpointStore) LoadVersion

func (s *InMemoryCheckpointStore) LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)

func (*InMemoryCheckpointStore) Save

type IteratorFunc

type IteratorFunc func(ctx context.Context, input any) ([]any, error)

IteratorFunc generates a collection of items for iteration

type LLMStep

type LLMStep struct {
	Model       string
	Prompt      string
	Temperature float64
	MaxTokens   int
	Provider    llm.Provider // Optional: inject to enable real LLM calls
}

LLMStep executes an LLM call. When Provider is set, it performs a real LLM completion request. When Provider is nil, it returns a placeholder map (backward compatible).

func (*LLMStep) Execute

func (s *LLMStep) Execute(ctx context.Context, input any) (any, error)

func (*LLMStep) Name

func (s *LLMStep) Name() string

type LoopConfig

type LoopConfig struct {
	// Type specifies the loop type (while, for, foreach)
	Type LoopType
	// MaxIterations limits the maximum number of iterations (0 = unlimited)
	MaxIterations int
	// Condition evaluates whether to continue looping (for while loops)
	Condition ConditionFunc
	// Iterator generates items to iterate over (for foreach loops)
	Iterator IteratorFunc
}

LoopConfig defines loop behavior

type LoopDefinition

type LoopDefinition struct {
	// Type is the loop type (while, for, foreach)
	Type string `json:"type" yaml:"type"`
	// MaxIterations limits the maximum number of iterations
	MaxIterations int `json:"max_iterations" yaml:"max_iterations"`
	// Condition is the condition name (for while loops)
	Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
}

LoopDefinition represents a serializable loop configuration

type LoopType

type LoopType string

LoopType defines the type of loop

const (
	// LoopTypeWhile executes while condition is true
	LoopTypeWhile LoopType = "while"
	// LoopTypeFor executes for a fixed number of iterations
	LoopTypeFor LoopType = "for"
	// LoopTypeForEach executes for each item in a collection
	LoopTypeForEach LoopType = "foreach"
)

type NativeAgentAdapter

type NativeAgentAdapter struct {
	// contains filtered or unexported fields
}

NativeAgentAdapter adapts an agent.Agent (with *Input/*Output signatures) to the AgentExecutor interface used by workflow steps.

Input conversion (any -> *agent.Input):

  • *agent.Input: passed through directly
  • string: wrapped as Input.Content
  • map[string]any: Content extracted from "content" key, rest goes to Context
  • other types: converted to string via fmt.Sprintf and set as Content

Output: returns the *agent.Output directly (callers can type-assert).

func NewNativeAgentAdapter

func NewNativeAgentAdapter(a agent.Agent) *NativeAgentAdapter

NewNativeAgentAdapter creates an adapter that bridges agent.Agent to AgentExecutor.

func (*NativeAgentAdapter) Execute

func (n *NativeAgentAdapter) Execute(ctx context.Context, input any) (any, error)

Execute implements AgentExecutor. It converts the workflow input to *agent.Input, calls agent.Execute, and returns the *agent.Output.

func (*NativeAgentAdapter) ID

func (n *NativeAgentAdapter) ID() string

ID implements AgentExecutor.

func (*NativeAgentAdapter) Name

func (n *NativeAgentAdapter) Name() string

Name implements AgentExecutor.

type NodeBuilder

type NodeBuilder struct {
	// contains filtered or unexported fields
}

NodeBuilder provides a fluent API for configuring individual nodes

func (*NodeBuilder) Done

func (nb *NodeBuilder) Done() *DAGBuilder

Done completes node configuration and returns to the DAGBuilder

func (*NodeBuilder) WithCondition

func (nb *NodeBuilder) WithCondition(cond ConditionFunc) *NodeBuilder

WithCondition sets the condition function for a conditional node

func (*NodeBuilder) WithErrorConfig

func (nb *NodeBuilder) WithErrorConfig(config ErrorConfig) *NodeBuilder

WithErrorConfig sets the error handling configuration for a node

func (*NodeBuilder) WithLoop

func (nb *NodeBuilder) WithLoop(config LoopConfig) *NodeBuilder

WithLoop sets the loop configuration for a loop node

func (*NodeBuilder) WithMetadata

func (nb *NodeBuilder) WithMetadata(key string, value any) *NodeBuilder

WithMetadata sets a metadata value

func (*NodeBuilder) WithOnFalse

func (nb *NodeBuilder) WithOnFalse(nodeIDs ...string) *NodeBuilder

WithOnFalse sets the nodes to execute when condition is false

func (*NodeBuilder) WithOnTrue

func (nb *NodeBuilder) WithOnTrue(nodeIDs ...string) *NodeBuilder

WithOnTrue sets the nodes to execute when condition is true

func (*NodeBuilder) WithStep

func (nb *NodeBuilder) WithStep(step Step) *NodeBuilder

WithStep sets the step for an action node

func (*NodeBuilder) WithSubGraph

func (nb *NodeBuilder) WithSubGraph(subGraph *DAGGraph) *NodeBuilder

WithSubGraph sets the subgraph for a subgraph node

type NodeConfig

type NodeConfig struct {
	// LLM node config
	Model       string  `json:"model,omitempty"`
	Prompt      string  `json:"prompt,omitempty"`
	Temperature float64 `json:"temperature,omitempty"`
	MaxTokens   int     `json:"max_tokens,omitempty"`

	// Tool node config
	ToolName   string         `json:"tool_name,omitempty"`
	ToolParams map[string]any `json:"tool_params,omitempty"`

	// Condition node config
	Condition  string `json:"condition,omitempty"`
	Expression string `json:"expression,omitempty"`

	// Loop node config
	LoopType      string `json:"loop_type,omitempty"`
	MaxIterations int    `json:"max_iterations,omitempty"`

	// Code node config
	Code     string `json:"code,omitempty"`
	Language string `json:"language,omitempty"`

	// Human input config
	InputPrompt string   `json:"input_prompt,omitempty"`
	InputType   string   `json:"input_type,omitempty"`
	Options     []string `json:"options,omitempty"`
	Timeout     int      `json:"timeout_seconds,omitempty"`

	// Subflow config
	SubflowID string `json:"subflow_id,omitempty"`
}

NodeConfig contains node-specific configuration.

type NodeDefinition

type NodeDefinition struct {
	// ID is the unique node identifier
	ID string `json:"id" yaml:"id"`
	// Type is the node type
	Type string `json:"type" yaml:"type"`
	// Step is the step name (for action nodes)
	Step string `json:"step,omitempty" yaml:"step,omitempty"`
	// Condition is the condition name (for conditional nodes)
	Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
	// Next lists the next nodes to execute (for action nodes)
	Next []string `json:"next,omitempty" yaml:"next,omitempty"`
	// OnTrue lists nodes to execute when condition is true
	OnTrue []string `json:"on_true,omitempty" yaml:"on_true,omitempty"`
	// OnFalse lists nodes to execute when condition is false
	OnFalse []string `json:"on_false,omitempty" yaml:"on_false,omitempty"`
	// Loop defines loop configuration (for loop nodes)
	Loop *LoopDefinition `json:"loop,omitempty" yaml:"loop,omitempty"`
	// SubGraph defines a nested workflow (for subgraph nodes)
	SubGraph *DAGDefinition `json:"subgraph,omitempty" yaml:"subgraph,omitempty"`
	// Metadata stores additional node information
	Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

NodeDefinition represents a serializable node definition

type NodeExecution

type NodeExecution struct {
	NodeID    string          `json:"node_id"`
	NodeType  NodeType        `json:"node_type"`
	StartTime time.Time       `json:"start_time"`
	EndTime   time.Time       `json:"end_time"`
	Duration  time.Duration   `json:"duration"`
	Status    ExecutionStatus `json:"status"`
	Input     any             `json:"input,omitempty"`
	Output    any             `json:"output,omitempty"`
	Error     string          `json:"error,omitempty"`
}

NodeExecution records the execution of a single node

type NodeOutput

type NodeOutput struct {
	Updates map[string]any `json:"updates"`
	NodeID  string         `json:"node_id"`
}

NodeOutput represents output from a graph node.

type NodeSnapshot

type NodeSnapshot struct {
	ID       string `json:"id"`
	Type     string `json:"type"`
	Status   string `json:"status"`
	Input    any    `json:"input,omitempty"`
	Output   any    `json:"output,omitempty"`
	Error    string `json:"error,omitempty"`
	Duration int64  `json:"duration_ms,omitempty"`
}

NodeSnapshot captures a node's state.

type NodeType

type NodeType string

NodeType defines the type of a DAG node

const (
	// NodeTypeAction executes a step
	NodeTypeAction NodeType = "action"
	// NodeTypeCondition performs conditional branching
	NodeTypeCondition NodeType = "condition"
	// NodeTypeLoop performs loop iteration
	NodeTypeLoop NodeType = "loop"
	// NodeTypeParallel executes nodes concurrently
	NodeTypeParallel NodeType = "parallel"
	// NodeTypeSubGraph executes a nested workflow
	NodeTypeSubGraph NodeType = "subgraph"
	// NodeTypeCheckpoint creates a checkpoint
	NodeTypeCheckpoint NodeType = "checkpoint"
)

type ParallelAgentStep

type ParallelAgentStep struct {
	// contains filtered or unexported fields
}

ParallelAgentStep executes multiple agents in parallel.

func NewParallelAgentStep

func NewParallelAgentStep(agents []AgentExecutor, merger func([]any) (any, error)) *ParallelAgentStep

NewParallelAgentStep creates a step that executes agents in parallel.

func (*ParallelAgentStep) Execute

func (p *ParallelAgentStep) Execute(ctx context.Context, input any) (any, error)

Execute runs all agents in parallel and merges results.

func (*ParallelAgentStep) Name

func (p *ParallelAgentStep) Name() string

Name implements the Step interface.

type ParallelWorkflow

type ParallelWorkflow struct {
	// contains filtered or unexported fields
}

ParallelWorkflow 并行工作流 将任务分割为多个子任务并行执行,然后聚合结果

func NewParallelWorkflow

func NewParallelWorkflow(name, description string, aggregator Aggregator, tasks ...Task) *ParallelWorkflow

NewParallelWorkflow 创建并行工作流

func (*ParallelWorkflow) AddTask

func (w *ParallelWorkflow) AddTask(task Task)

AddTask 添加任务

func (*ParallelWorkflow) Description

func (w *ParallelWorkflow) Description() string

func (*ParallelWorkflow) Execute

func (w *ParallelWorkflow) Execute(ctx context.Context, input any) (any, error)

Execute 执行并行工作流 1. 并行执行所有任务 2. 收集所有结果 3. 使用聚合器聚合结果

func (*ParallelWorkflow) Name

func (w *ParallelWorkflow) Name() string

func (*ParallelWorkflow) Tasks

func (w *ParallelWorkflow) Tasks() []Task

Tasks 返回所有任务

type PassthroughStep

type PassthroughStep struct{}

PassthroughStep passes input directly to output.

func (*PassthroughStep) Execute

func (s *PassthroughStep) Execute(ctx context.Context, input any) (any, error)

func (*PassthroughStep) Name

func (s *PassthroughStep) Name() string

type Port

type Port struct {
	ID   string `json:"id"`
	Name string `json:"name"`
	Type string `json:"type"` // string, number, boolean, object, array
}

Port represents an input/output port on a node.

type Position

type Position struct {
	X float64 `json:"x"`
	Y float64 `json:"y"`
}

Position represents node position in visual canvas.

type Reducer

type Reducer[T any] func(current T, update T) T

Reducer defines how to merge state updates from multiple nodes.

func AppendReducer

func AppendReducer[T any]() Reducer[[]T]

AppendReducer appends slices together.

func LastValueReducer

func LastValueReducer[T any]() Reducer[T]

LastValueReducer returns the most recent value (default).

func MaxReducer

func MaxReducer[T ~int | ~int64 | ~float64]() Reducer[T]

MaxReducer keeps the maximum value.

func MergeMapReducer

func MergeMapReducer[K comparable, V any]() Reducer[map[K]V]

MergeMapReducer merges maps, with update values taking precedence.

func SumReducer

func SumReducer[T ~int | ~int64 | ~float64]() Reducer[T]

SumReducer sums numeric values.

type Router

type Router interface {
	// Route 路由决策,返回路由键
	Route(ctx context.Context, input any) (string, error)
}

Router 路由器接口 根据输入决定使用哪个处理器

type RouterFunc

type RouterFunc func(ctx context.Context, input any) (string, error)

RouterFunc 路由函数类型

type RoutingWorkflow

type RoutingWorkflow struct {
	// contains filtered or unexported fields
}

RoutingWorkflow 路由工作流 根据输入分类,将任务路由到专门的处理器

func NewRoutingWorkflow

func NewRoutingWorkflow(name, description string, router Router) *RoutingWorkflow

NewRoutingWorkflow 创建路由工作流

func (*RoutingWorkflow) Description

func (w *RoutingWorkflow) Description() string

func (*RoutingWorkflow) Execute

func (w *RoutingWorkflow) Execute(ctx context.Context, input any) (any, error)

Execute 执行路由工作流 1. 使用路由器决定路由 2. 查找对应的处理器 3. 执行处理器

func (*RoutingWorkflow) Name

func (w *RoutingWorkflow) Name() string

func (*RoutingWorkflow) RegisterHandler

func (w *RoutingWorkflow) RegisterHandler(route string, handler Handler)

RegisterHandler 注册处理器

func (*RoutingWorkflow) Routes

func (w *RoutingWorkflow) Routes() []string

Routes 返回所有已注册的路由

func (*RoutingWorkflow) SetDefaultRoute

func (w *RoutingWorkflow) SetDefaultRoute(route string)

SetDefaultRoute 设置默认路由

type StateGraph

type StateGraph struct {
	// contains filtered or unexported fields
}

StateGraph manages multiple channels as a unified state.

func NewStateGraph

func NewStateGraph() *StateGraph

NewStateGraph creates a new state graph.

func (*StateGraph) ApplyNodeOutput

func (sg *StateGraph) ApplyNodeOutput(output NodeOutput) error

ApplyNodeOutput applies a node's output to the state graph.

func (*StateGraph) Snapshot

func (sg *StateGraph) Snapshot() StateSnapshot

Snapshot creates a snapshot of the current state.

type StateSnapshot

type StateSnapshot struct {
	Values   map[string]any    `json:"values"`
	Versions map[string]uint64 `json:"versions"`
}

StateSnapshot captures the current state of all channels.

type Step

type Step interface {
	// Execute 执行步骤
	Execute(ctx context.Context, input any) (any, error)
	// Name 返回步骤名称
	Name() string
}

Step 工作流步骤接口

type StepFunc

type StepFunc func(ctx context.Context, input any) (any, error)

StepFunc 步骤函数类型

type Task

type Task interface {
	// Execute 执行任务
	Execute(ctx context.Context, input any) (any, error)
	// Name 返回任务名称
	Name() string
}

Task 并行任务接口

type TaskFunc

type TaskFunc func(ctx context.Context, input any) (any, error)

TaskFunc 任务函数类型

type TaskResult

type TaskResult struct {
	TaskName string
	Result   any
	Error    error
}

TaskResult 任务结果

type Tool

type Tool interface {
	Name() string
	Execute(ctx context.Context, params map[string]any) (any, error)
}

Tool represents an executable tool within a workflow.

type ToolRegistry

type ToolRegistry interface {
	// GetTool returns a Tool by name. Returns nil, false if not found.
	GetTool(name string) (Tool, bool)
	// ExecuteTool looks up and executes a tool in one call.
	ExecuteTool(ctx context.Context, name string, params map[string]any) (any, error)
}

ToolRegistry abstracts tool lookup and execution for workflow steps. Implement this interface to bridge workflow with your tool management layer.

type ToolStep

type ToolStep struct {
	ToolName string
	Params   map[string]any
	Registry ToolRegistry // Optional: inject to enable real tool execution
}

ToolStep executes a tool call. When Registry is set, it performs a real tool execution. When Registry is nil, it returns a placeholder map (backward compatible).

func (*ToolStep) Execute

func (s *ToolStep) Execute(ctx context.Context, input any) (any, error)

func (*ToolStep) Name

func (s *ToolStep) Name() string

type Variable

type Variable struct {
	Name         string `json:"name"`
	Type         string `json:"type"`
	DefaultValue any    `json:"default_value,omitempty"`
	Description  string `json:"description,omitempty"`
}

Variable represents a workflow variable.

type VisualBuilder

type VisualBuilder struct {
	// contains filtered or unexported fields
}

VisualBuilder builds DAG workflows from visual definitions.

func NewVisualBuilder

func NewVisualBuilder() *VisualBuilder

NewVisualBuilder creates a new visual builder.

func (*VisualBuilder) Build

func (b *VisualBuilder) Build(vw *VisualWorkflow) (*DAGWorkflow, error)

Build converts a visual workflow to executable DAG.

func (*VisualBuilder) RegisterStep

func (b *VisualBuilder) RegisterStep(name string, step Step)

RegisterStep registers a step implementation.

type VisualEdge

type VisualEdge struct {
	ID         string `json:"id"`
	Source     string `json:"source"`
	SourcePort string `json:"source_port,omitempty"`
	Target     string `json:"target"`
	TargetPort string `json:"target_port,omitempty"`
	Label      string `json:"label,omitempty"`
	Condition  string `json:"condition,omitempty"` // For conditional edges
}

VisualEdge represents a connection between nodes.

type VisualNode

type VisualNode struct {
	ID       string         `json:"id"`
	Type     VisualNodeType `json:"type"`
	Label    string         `json:"label"`
	Position Position       `json:"position"`
	Config   NodeConfig     `json:"config"`
	Inputs   []Port         `json:"inputs,omitempty"`
	Outputs  []Port         `json:"outputs,omitempty"`
}

VisualNode represents a node in the visual workflow.

type VisualNodeType

type VisualNodeType string

VisualNodeType defines visual node types.

const (
	VNodeStart     VisualNodeType = "start"
	VNodeEnd       VisualNodeType = "end"
	VNodeLLM       VisualNodeType = "llm"
	VNodeTool      VisualNodeType = "tool"
	VNodeCondition VisualNodeType = "condition"
	VNodeLoop      VisualNodeType = "loop"
	VNodeParallel  VisualNodeType = "parallel"
	VNodeHuman     VisualNodeType = "human_input"
	VNodeCode      VisualNodeType = "code"
	VNodeSubflow   VisualNodeType = "subflow"
)

type VisualWorkflow

type VisualWorkflow struct {
	ID          string         `json:"id"`
	Name        string         `json:"name"`
	Description string         `json:"description"`
	Version     string         `json:"version"`
	Nodes       []VisualNode   `json:"nodes"`
	Edges       []VisualEdge   `json:"edges"`
	Variables   []Variable     `json:"variables,omitempty"`
	Metadata    map[string]any `json:"metadata,omitempty"`
	CreatedAt   time.Time      `json:"created_at"`
	UpdatedAt   time.Time      `json:"updated_at"`
}

VisualWorkflow represents a workflow designed in visual builder.

func Import

func Import(data []byte) (*VisualWorkflow, error)

Import imports visual workflow from JSON.

func (*VisualWorkflow) Export

func (vw *VisualWorkflow) Export() ([]byte, error)

Export exports visual workflow to JSON.

func (*VisualWorkflow) Validate

func (vw *VisualWorkflow) Validate() error

Validate validates the visual workflow.

type Workflow

type Workflow interface {
	// Execute 执行工作流
	Execute(ctx context.Context, input any) (any, error)
	// Name 返回工作流名称
	Name() string
	// Description 返回工作流描述
	Description() string
}

Workflow 工作流接口 Workflow 是预定义的步骤序列,提供可预测和一致的执行

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL