core

package
v0.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package core provides the lifecycle manager implementation.

This file provides coordinated lifecycle management for GoAgent components:

  • Priority-based startup and shutdown ordering
  • Dependency resolution
  • Health aggregation
  • Graceful shutdown with timeout support

Package core provides panic handling infrastructure with hot-pluggable implementations.

This file defines interfaces and registry for panic recovery customization:

  • PanicHandler - Custom panic recovery strategies
  • PanicMetricsCollector - Statistics and monitoring
  • PanicLogger - Specialized logging
  • PanicHandlerRegistry - Thread-safe hot-swapping

Package core provides plugin bridge utilities for dynamic component loading.

This file addresses the fundamental tension between:

  • Compile-time type safety (Go generics)
  • Runtime flexibility (plugin systems, dynamic registration)

The solution introduces a layered architecture:

  1. DynamicRunnable - Runtime-safe interface using any types
  2. TypedAdapter - Converts between generic and dynamic versions
  3. PluginRegistry - Manages dynamically loaded components

Package core provides enhanced plugin registry with dependency injection.

This file extends the basic PluginRegistry with:

  • Version management (multiple versions of same plugin)
  • Dependency injection container
  • Configuration management
  • Lifecycle integration

Index

Constants

View Source
const (
	ChunkTypeText     = execution.ChunkTypeText
	ChunkTypeError    = execution.ChunkTypeError
	ChunkTypeJSON     = execution.ChunkTypeJSON
	ChunkTypeProgress = execution.ChunkTypeProgress
	ChunkTypeBinary   = execution.ChunkTypeBinary
	ChunkTypeControl  = execution.ChunkTypeControl
	ChunkTypeStatus   = execution.ChunkTypeStatus

	StreamStateRunning  = execution.StreamStateRunning
	StreamStatePaused   = execution.StreamStatePaused
	StreamStateError    = execution.StreamStateError
	StreamStateComplete = execution.StreamStateComplete
	StreamStateClosed   = execution.StreamStateClosed
)

Constants for internal use

Variables

View Source
var (
	// Agent 相关错误
	ErrAgentNotFound        = errors.New("agent not found")
	ErrAgentAlreadyExists   = errors.New("agent already exists")
	ErrAgentExecutionFailed = errors.New("agent execution failed")
	ErrInvalidAgentInput    = errors.New("invalid agent input")
	ErrNotImplemented       = errors.New("method not implemented")

	// Chain 相关错误
	ErrChainNotFound       = errors.New("chain not found")
	ErrChainAlreadyExists  = errors.New("chain already exists")
	ErrChainProcessFailed  = errors.New("chain process failed")
	ErrInvalidChainInput   = errors.New("invalid chain input")
	ErrStepExecutionFailed = errors.New("step execution failed")

	// Orchestrator 相关错误
	ErrOrchestratorNotReady       = errors.New("orchestrator not ready")
	ErrInvalidOrchestratorRequest = errors.New("invalid orchestrator request")
	ErrExecutionFailed            = errors.New("execution failed")
	ErrExecutionTimeout           = errors.New("execution timeout")

	// Tool 相关错误
	ErrToolNotFound        = errors.New("tool not found")
	ErrToolAlreadyExists   = errors.New("tool already exists")
	ErrToolExecutionFailed = errors.New("tool execution failed")
	ErrInvalidToolInput    = errors.New("invalid tool input")

	// Memory 相关错误
	ErrMemoryStoreFailed    = errors.New("memory store failed")
	ErrMemoryRetrieveFailed = errors.New("memory retrieve failed")
	ErrMemoryNotFound       = errors.New("memory not found")

	// LLM 相关错误
	ErrLLMNotAvailable    = errors.New("LLM not available")
	ErrLLMRequestFailed   = errors.New("LLM request failed")
	ErrLLMResponseInvalid = errors.New("LLM response invalid")
)

错误定义

View Source
var (
	NewBaseMiddleware           = middleware.NewBaseMiddleware
	NewMiddlewareChain          = middleware.NewMiddlewareChain
	NewLoggingMiddleware        = middleware.NewLoggingMiddleware
	NewTimingMiddleware         = middleware.NewTimingMiddleware
	NewCacheMiddleware          = middleware.NewCacheMiddleware
	NewRetryMiddleware          = middleware.NewRetryMiddleware
	NewTransformMiddleware      = middleware.NewTransformMiddleware
	NewCircuitBreakerMiddleware = middleware.NewCircuitBreakerMiddleware
	NewRateLimiterMiddleware    = middleware.NewRateLimiterMiddleware
	NewValidationMiddleware     = middleware.NewValidationMiddleware
	NewAuthenticationMiddleware = middleware.NewAuthenticationMiddleware
	NewDynamicPromptMiddleware  = middleware.NewDynamicPromptMiddleware
	NewToolSelectorMiddleware   = middleware.NewToolSelectorMiddleware
)

Constructor functions for internal use

View Source
var (
	DefaultStreamOptions = execution.DefaultStreamOptions
	NewTextChunk         = execution.NewTextChunk
	NewProgressChunk     = execution.NewProgressChunk
	NewErrorChunk        = execution.NewErrorChunk
)

Functions for internal use

View Source
var (
	NewAgentState = state.NewAgentState
)

Constructor functions for internal use

Functions

func Collect added in v0.2.0

func Collect[T any](gen Generator[T]) ([]T, error)

Collect 收集 Generator 的所有输出到切片

参数:

  • gen - Generator 实例

返回:

  • 所有数据的切片和第一个错误

注意:

  • 遇到第一个错误时停止收集
  • 适用于小数据集,大数据集建议流式处理

示例:

results, err := Collect(gen)
if err != nil {
    return err
}
fmt.Println(results)

func IsFastInvoker

func IsFastInvoker(agent Agent) bool

IsFastInvoker 检查 Agent 是否支持快速调用

使用示例:

if core.IsFastInvoker(agent) {
    // 使用快速调用路径
    output, err := agent.(core.FastInvoker).InvokeFast(ctx, input)
} else {
    // 使用标准路径
    output, err := agent.Invoke(ctx, input)
}

func PutChainInput

func PutChainInput(input *ChainInput)

PutChainInput returns a ChainInput to the object pool

func PutChainOutput

func PutChainOutput(output *ChainOutput)

PutChainOutput returns a ChainOutput to the object pool

func RegisterTyped

func RegisterTyped[I, O any](r *PluginRegistry, name string, runnable Runnable[I, O], metadata *PluginMetadata) error

RegisterTyped 注册类型安全的组件(自动适配为 DynamicRunnable)

func ResolveTyped

func ResolveTyped[T any](c *Container, name string) (T, error)

ResolveTyped resolves a service with type assertion.

func SetGlobalMetricsCollector

func SetGlobalMetricsCollector(collector PanicMetricsCollector)

SetGlobalMetricsCollector sets the global metrics collector.

func SetGlobalPanicHandler

func SetGlobalPanicHandler(handler PanicHandler)

SetGlobalPanicHandler sets the global panic handler.

func SetGlobalPanicLogger

func SetGlobalPanicLogger(logger PanicLogger)

SetGlobalPanicLogger sets the global panic logger.

func ToChannel added in v0.2.0

func ToChannel[T any](ctx context.Context, gen Generator[T], bufferSize int) <-chan StreamChunk[T]

ToChannel 将 Generator 转换为 Channel(向后兼容)

参数:

  • ctx - 上下文(用于取消)
  • gen - Generator 实例
  • bufferSize - channel 缓冲区大小(0 表示无缓冲)

返回:

  • 只读 channel,包含 StreamChunk

注意:

  • 此函数会启动 goroutine 消费 Generator
  • 当 ctx 取消或 Generator 结束时,channel 自动关闭

示例:

ch := ToChannel(ctx, gen, 10)
for item := range ch {
    if item.Error != nil {
        return item.Error
    }
    fmt.Println(item.Data)
}

Types

type Agent

type Agent interface {
	// 继承 Runnable 接口,Agent 是一个可执行的组件
	Runnable[*AgentInput, *AgentOutput]

	// Agent 特有方法
	// Name 返回 Agent 的名称
	Name() string

	// Description 返回 Agent 的描述
	Description() string

	// Capabilities 返回 Agent 的能力列表
	Capabilities() []string
}

Agent 定义通用 AI Agent 接口

Agent 是一个 Runnable[*AgentInput, *AgentOutput],具有推理能力的智能体,能够: - 接收输入并进行处理(通过 Runnable.Invoke) - 调用工具获取额外信息 - 使用 LLM 进行推理 - 返回结构化输出 - 支持流式处理、批量执行、管道连接等 Runnable 特性

Note: This is a generic version of the Agent interface. For cross-package compatibility, consider using interfaces.Agent.

type AgentAction

type AgentAction struct {
	Tool      string                 // 工具名称
	ToolInput map[string]interface{} // 工具输入
	Log       string                 // 日志信息
}

AgentAction Agent 执行的操作

type AgentExecutor

type AgentExecutor struct {
	// contains filtered or unexported fields
}

AgentExecutor 执行 Agent 的辅助结构

提供额外的执行逻辑,如重试、超时控制等

func NewAgentExecutor

func NewAgentExecutor(agent Agent, options ...ExecutorOption) *AgentExecutor

NewAgentExecutor 创建 Agent 执行器

func (*AgentExecutor) Execute

func (e *AgentExecutor) Execute(ctx context.Context, input *AgentInput) (*AgentOutput, error)

Execute 执行 Agent,支持重试和超时

type AgentInput

type AgentInput struct {
	// 任务描述
	Task        string                 `json:"task"`        // 任务描述
	Instruction string                 `json:"instruction"` // 具体指令
	Context     map[string]interface{} `json:"context"`     // 上下文信息

	// 执行选项
	Options AgentOptions `json:"options"` // 执行选项

	// 元数据
	SessionID string    `json:"session_id"` // 会话 ID
	Timestamp time.Time `json:"timestamp"`  // 时间戳
}

AgentInput Agent 输入

type AgentOptions

type AgentOptions struct {
	// LLM 配置
	Temperature float64 `json:"temperature,omitempty"` // LLM 温度参数
	MaxTokens   int     `json:"max_tokens,omitempty"`  // 最大 token 数
	Model       string  `json:"model,omitempty"`       // LLM 模型

	// 工具配置
	EnableTools  bool     `json:"enable_tools,omitempty"`   // 是否启用工具
	AllowedTools []string `json:"allowed_tools,omitempty"`  // 允许的工具列表
	MaxToolCalls int      `json:"max_tool_calls,omitempty"` // 最大工具调用次数

	// 记忆配置
	EnableMemory     bool `json:"enable_memory,omitempty"`      // 是否启用记忆
	LoadHistory      bool `json:"load_history,omitempty"`       // 是否加载历史
	SaveToMemory     bool `json:"save_to_memory,omitempty"`     // 是否保存到记忆
	MaxHistoryLength int  `json:"max_history_length,omitempty"` // 最大历史长度

	// 超时配置
	Timeout time.Duration `json:"timeout,omitempty"` // 超时时间
}

AgentOptions Agent 执行选项

func DefaultAgentOptions

func DefaultAgentOptions() AgentOptions

DefaultAgentOptions 返回默认的 Agent 选项

type AgentOutput

type AgentOutput struct {
	// 执行结果
	Result  interface{} `json:"result"`  // 结果数据
	Status  string      `json:"status"`  // 状态: "success", "failed", "partial"
	Message string      `json:"message"` // 结果消息

	// 推理过程
	ReasoningSteps []ReasoningStep `json:"reasoning_steps"` // 推理步骤
	ToolCalls      []ToolCall      `json:"tool_calls"`      // 工具调用记录

	// Token 使用统计
	TokenUsage *interfaces.TokenUsage `json:"token_usage,omitempty"` // LLM Token 使用统计

	// 元数据
	Latency   time.Duration          `json:"latency"`   // 执行延迟
	Timestamp time.Time              `json:"timestamp"` // 时间戳
	Metadata  map[string]interface{} `json:"metadata"`  // 额外元数据
}

AgentOutput Agent 输出

func TryInvokeFast

func TryInvokeFast(ctx context.Context, agent Agent, input *AgentInput) (*AgentOutput, error)

TryInvokeFast 尝试使用快速调用,如果不支持则回退到标准 Invoke

这是一个便捷函数,用于在不确定 Agent 是否支持 FastInvoker 时使用

使用示例:

output, err := core.TryInvokeFast(ctx, agent, input)

等价于:

var output *core.AgentOutput
var err error
if fastAgent, ok := agent.(core.FastInvoker); ok {
    output, err = fastAgent.InvokeFast(ctx, input)
} else {
    output, err = agent.Invoke(ctx, input)
}

type AgentState

type AgentState = state.AgentState

Type aliases for internal use within core package

type BackoffPolicy

type BackoffPolicy interface {
	// NextDelay 计算下一次重试的延迟
	NextDelay(attempt int) int64 // 返回毫秒数
}

BackoffPolicy 退避策略

type BaseAgent

type BaseAgent struct {
	*BaseRunnable[*AgentInput, *AgentOutput]
	// contains filtered or unexported fields
}

BaseAgent 提供 Agent 的基础实现

BaseAgent 实现了 Agent 接口,包括完整的 Runnable 接口支持 具体的执行逻辑需要通过组合或继承来实现

func NewBaseAgent

func NewBaseAgent(name, description string, capabilities []string) *BaseAgent

NewBaseAgent 创建基础 Agent

func (*BaseAgent) Batch

func (a *BaseAgent) Batch(ctx context.Context, inputs []*AgentInput) ([]*AgentOutput, error)

Batch 批量执行 Agent 使用 BaseRunnable 的默认批处理实现

func (*BaseAgent) Capabilities

func (a *BaseAgent) Capabilities() []string

Capabilities 返回 Agent 能力列表

func (*BaseAgent) Description

func (a *BaseAgent) Description() string

Description 返回 Agent 描述

func (*BaseAgent) Invoke

func (a *BaseAgent) Invoke(ctx context.Context, input *AgentInput) (*AgentOutput, error)

Invoke 执行 Agent 这是 Runnable 接口的核心方法,需要由具体 Agent 实现

func (*BaseAgent) InvokeFast

func (a *BaseAgent) InvokeFast(ctx context.Context, input *AgentInput) (*AgentOutput, error)

InvokeFast 快速调用(绕过中间件)

用于热路径优化,直接调用 Execute 方法 性能提升:避免接口调用和中间件开销

注意:此方法不会触发 Runnable 的回调和中间件逻辑 仅在性能关键路径且不需要额外处理时使用

func (*BaseAgent) Name

func (a *BaseAgent) Name() string

Name 返回 Agent 名称

func (*BaseAgent) Pipe

func (a *BaseAgent) Pipe(next Runnable[*AgentOutput, any]) Runnable[*AgentInput, any]

Pipe 连接到另一个 Runnable 将当前 Agent 的输出连接到下一个 Runnable 的输入

func (*BaseAgent) RunGenerator added in v0.2.0

func (a *BaseAgent) RunGenerator(ctx context.Context, input *AgentInput) Generator[*AgentOutput]

RunGenerator 使用 Generator 模式执行 Agent(实验性功能)

相比 Stream 方法的优势:

  • 零内存分配(无 channel、goroutine 开销)
  • 支持早期终止(通过 for-range break)
  • 更简洁的迭代语法

参数:

  • ctx - 上下文
  • input - 输入数据

返回:

  • Generator,产生 *AgentOutput 和 error

示例:

for output, err := range agent.RunGenerator(ctx, input) {
    if err != nil {
        return err
    }
    fmt.Println("Step:", output.Status)
}

注意:

  • 这是实验性 API,可能在未来版本中调整
  • 默认实现调用 Invoke 并产生单个结果
  • 具体 Agent 可以重写此方法实现真正的流式处理
  • 如需向后兼容,使用 Stream 方法

func (*BaseAgent) Stream

func (a *BaseAgent) Stream(ctx context.Context, input *AgentInput) (<-chan StreamChunk[*AgentOutput], error)

Stream 流式执行 Agent 默认实现将 Invoke 的结果包装成单个流块

func (*BaseAgent) WithCallbacks

func (a *BaseAgent) WithCallbacks(callbacks ...Callback) Runnable[*AgentInput, *AgentOutput]

WithCallbacks 添加回调处理器 返回一个新的 Agent 实例,包含指定的回调

func (*BaseAgent) WithConfig

func (a *BaseAgent) WithConfig(config RunnableConfig) Runnable[*AgentInput, *AgentOutput]

WithConfig 配置 Agent 返回一个新的 Agent 实例,使用指定的配置

type BaseCallback

type BaseCallback struct{}

BaseCallback 提供回调的默认实现(什么都不做)

用户可以嵌入 BaseCallback 并只重写需要的方法

func NewBaseCallback

func NewBaseCallback() *BaseCallback

NewBaseCallback 创建基础回调

func (*BaseCallback) OnAgentAction

func (b *BaseCallback) OnAgentAction(ctx context.Context, action *AgentAction) error

func (*BaseCallback) OnAgentFinish

func (b *BaseCallback) OnAgentFinish(ctx context.Context, output interface{}) error

func (*BaseCallback) OnChainEnd

func (b *BaseCallback) OnChainEnd(ctx context.Context, chainName string, output interface{}) error

func (*BaseCallback) OnChainError

func (b *BaseCallback) OnChainError(ctx context.Context, chainName string, err error) error

func (*BaseCallback) OnChainStart

func (b *BaseCallback) OnChainStart(ctx context.Context, chainName string, input interface{}) error

func (*BaseCallback) OnEnd

func (b *BaseCallback) OnEnd(ctx context.Context, output interface{}) error

func (*BaseCallback) OnError

func (b *BaseCallback) OnError(ctx context.Context, err error) error

func (*BaseCallback) OnLLMEnd

func (b *BaseCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error

func (*BaseCallback) OnLLMError

func (b *BaseCallback) OnLLMError(ctx context.Context, err error) error

func (*BaseCallback) OnLLMStart

func (b *BaseCallback) OnLLMStart(ctx context.Context, prompts []string, model string) error

func (*BaseCallback) OnStart

func (b *BaseCallback) OnStart(ctx context.Context, input interface{}) error

func (*BaseCallback) OnToolEnd

func (b *BaseCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error

func (*BaseCallback) OnToolError

func (b *BaseCallback) OnToolError(ctx context.Context, toolName string, err error) error

func (*BaseCallback) OnToolStart

func (b *BaseCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error

type BaseChain

type BaseChain struct {
	*BaseRunnable[*ChainInput, *ChainOutput]
	// contains filtered or unexported fields
}

BaseChain 提供 Chain 的基础实现

实现了完整的 Runnable 接口,包括: - Invoke: 执行链式处理 - Stream: 流式执行(将步骤结果作为流输出) - Batch: 批量执行多个输入 - Pipe: 管道连接 - WithCallbacks: 回调支持

func NewBaseChain

func NewBaseChain(name string, steps []Step) *BaseChain

NewBaseChain 创建基础 Chain

func (*BaseChain) Batch

func (c *BaseChain) Batch(ctx context.Context, inputs []*ChainInput) ([]*ChainOutput, error)

Batch 批量执行(实现 Runnable 接口)

func (*BaseChain) Invoke

func (c *BaseChain) Invoke(ctx context.Context, input *ChainInput) (*ChainOutput, error)

Invoke 执行链式处理(实现 Runnable 接口)

func (*BaseChain) Name

func (c *BaseChain) Name() string

Name 返回 Chain 名称

func (*BaseChain) Pipe

func (c *BaseChain) Pipe(next Runnable[*ChainOutput, any]) Runnable[*ChainInput, any]

Pipe 连接到另一个 Runnable(实现 Runnable 接口)

func (*BaseChain) Steps

func (c *BaseChain) Steps() int

Steps 返回步骤数量

func (*BaseChain) Stream

func (c *BaseChain) Stream(ctx context.Context, input *ChainInput) (<-chan StreamChunk[*ChainOutput], error)

Stream 流式执行(实现 Runnable 接口)

每个步骤执行完成后,立即发送一个流块

func (*BaseChain) WithCallbacks

func (c *BaseChain) WithCallbacks(callbacks ...Callback) Runnable[*ChainInput, *ChainOutput]

WithCallbacks 添加回调(重写返回类型)

func (*BaseChain) WithConfig

func (c *BaseChain) WithConfig(config RunnableConfig) Runnable[*ChainInput, *ChainOutput]

WithConfig 配置 Runnable(重写返回类型)

type BaseLifecycle

type BaseLifecycle struct {
	// contains filtered or unexported fields
}

BaseLifecycle provides a no-op implementation of Lifecycle interface. Components can embed this to only override methods they need.

func NewBaseLifecycle

func NewBaseLifecycle(name string) *BaseLifecycle

NewBaseLifecycle creates a new BaseLifecycle.

func (*BaseLifecycle) HealthCheck

func (b *BaseLifecycle) HealthCheck(ctx context.Context) interfaces.HealthStatus

HealthCheck returns healthy by default.

func (*BaseLifecycle) Init

func (b *BaseLifecycle) Init(ctx context.Context, config interface{}) error

Init does nothing by default.

func (*BaseLifecycle) Start

func (b *BaseLifecycle) Start(ctx context.Context) error

Start does nothing by default.

func (*BaseLifecycle) Stop

func (b *BaseLifecycle) Stop(ctx context.Context) error

Stop does nothing by default.

type BaseMiddleware

type BaseMiddleware = middleware.BaseMiddleware

Type aliases for internal use within core package

type BaseOrchestrator

type BaseOrchestrator struct {
	// contains filtered or unexported fields
}

BaseOrchestrator 提供编排器的基础实现

func NewBaseOrchestrator

func NewBaseOrchestrator(name string) *BaseOrchestrator

NewBaseOrchestrator 创建基础编排器

func (*BaseOrchestrator) Execute

Execute returns ErrNotImplemented.

Concrete orchestrator implementations must override this method. Using composition: embed BaseOrchestrator and implement Execute.

func (*BaseOrchestrator) GetAgent

func (o *BaseOrchestrator) GetAgent(name string) (Agent, bool)

GetAgent 获取 Agent

func (*BaseOrchestrator) GetChain

func (o *BaseOrchestrator) GetChain(name string) (Chain, bool)

GetChain 获取 Chain

func (*BaseOrchestrator) GetTool

func (o *BaseOrchestrator) GetTool(name string) (Tool, bool)

GetTool 获取 Tool

func (*BaseOrchestrator) Name

func (o *BaseOrchestrator) Name() string

Name 返回编排器名称

func (*BaseOrchestrator) RegisterAgent

func (o *BaseOrchestrator) RegisterAgent(name string, agent Agent) error

RegisterAgent 注册 Agent

func (*BaseOrchestrator) RegisterChain

func (o *BaseOrchestrator) RegisterChain(name string, chain Chain) error

RegisterChain 注册 Chain

func (*BaseOrchestrator) RegisterTool

func (o *BaseOrchestrator) RegisterTool(name string, tool Tool) error

RegisterTool 注册 Tool

type BasePlugin

type BasePlugin struct {
	*BaseLifecycle
	// contains filtered or unexported fields
}

BasePlugin provides a default implementation of the Plugin interface.

func NewBasePlugin

func NewBasePlugin(name, version string) *BasePlugin

NewBasePlugin creates a new base plugin.

func (*BasePlugin) AddAgent

func (p *BasePlugin) AddAgent(agent DynamicRunnable)

AddAgent registers an agent with the plugin.

func (*BasePlugin) AddTool

func (p *BasePlugin) AddTool(tool DynamicRunnable)

AddTool registers a tool with the plugin.

func (*BasePlugin) GetAgents

func (p *BasePlugin) GetAgents() []DynamicRunnable

GetAgents returns registered agents.

func (*BasePlugin) GetMiddleware

func (p *BasePlugin) GetMiddleware() []interface{}

GetMiddleware returns registered middleware.

func (*BasePlugin) GetTools

func (p *BasePlugin) GetTools() []DynamicRunnable

GetTools returns registered tools.

func (*BasePlugin) Name

func (p *BasePlugin) Name() string

Name returns the plugin name.

func (*BasePlugin) Version

func (p *BasePlugin) Version() string

Version returns the plugin version.

type BaseRunnable

type BaseRunnable[I, O any] struct {
	// contains filtered or unexported fields
}

BaseRunnable 提供 Runnable 的基础实现

实现了通用的功能如批处理、回调等 具体的 Invoke 和 Stream 需要由子类实现

func NewBaseRunnable

func NewBaseRunnable[I, O any]() *BaseRunnable[I, O]

NewBaseRunnable 创建基础 Runnable

func (*BaseRunnable[I, O]) Batch

func (r *BaseRunnable[I, O]) Batch(ctx context.Context, inputs []I, invoker func(context.Context, I) (O, error)) ([]O, error)

Batch 默认的批处理实现

使用 goroutines 并发执行多个输入

func (*BaseRunnable[I, O]) GetConfig

func (r *BaseRunnable[I, O]) GetConfig() RunnableConfig

GetConfig 获取配置

func (*BaseRunnable[I, O]) WithCallbacks

func (r *BaseRunnable[I, O]) WithCallbacks(callbacks ...Callback) *BaseRunnable[I, O]

WithCallbacks 添加回调

func (*BaseRunnable[I, O]) WithConfig

func (r *BaseRunnable[I, O]) WithConfig(config RunnableConfig) *BaseRunnable[I, O]

WithConfig 配置 Runnable

type Callback

type Callback interface {
	// 通用回调
	OnStart(ctx context.Context, input interface{}) error
	OnEnd(ctx context.Context, output interface{}) error
	OnError(ctx context.Context, err error) error

	// LLM 回调
	OnLLMStart(ctx context.Context, prompts []string, model string) error
	OnLLMEnd(ctx context.Context, output string, tokenUsage int) error
	OnLLMError(ctx context.Context, err error) error

	// Chain 回调
	OnChainStart(ctx context.Context, chainName string, input interface{}) error
	OnChainEnd(ctx context.Context, chainName string, output interface{}) error
	OnChainError(ctx context.Context, chainName string, err error) error

	// Tool 回调
	OnToolStart(ctx context.Context, toolName string, input interface{}) error
	OnToolEnd(ctx context.Context, toolName string, output interface{}) error
	OnToolError(ctx context.Context, toolName string, err error) error

	// Agent 回调
	OnAgentAction(ctx context.Context, action *AgentAction) error
	OnAgentFinish(ctx context.Context, output interface{}) error
}

Callback 定义回调处理器接口

借鉴 LangChain 的回调系统,提供灵活的监控和调试能力 所有回调方法都返回 error,如果返回非 nil 错误,执行将中止

type CallbackManager

type CallbackManager struct {
	// contains filtered or unexported fields
}

CallbackManager 管理多个回调处理器

func NewCallbackManager

func NewCallbackManager(callbacks ...Callback) *CallbackManager

NewCallbackManager 创建回调管理器

func (*CallbackManager) AddCallback

func (m *CallbackManager) AddCallback(cb Callback)

AddCallback 添加回调

func (*CallbackManager) OnEnd

func (m *CallbackManager) OnEnd(ctx context.Context, output interface{}) error

OnEnd 触发 OnEnd 回调

func (*CallbackManager) OnError

func (m *CallbackManager) OnError(ctx context.Context, err error) error

OnError 触发 OnError 回调

func (*CallbackManager) OnStart

func (m *CallbackManager) OnStart(ctx context.Context, input interface{}) error

OnStart 触发 OnStart 回调

func (*CallbackManager) RemoveCallback

func (m *CallbackManager) RemoveCallback(cb Callback)

RemoveCallback 移除回调

func (*CallbackManager) TriggerCallbacks

func (m *CallbackManager) TriggerCallbacks(fn func(Callback) error) error

TriggerCallbacks 触发所有回调的指定方法

type Chain

type Chain interface {
	Runnable[*ChainInput, *ChainOutput]

	// Name 返回 Chain 的名称
	Name() string

	// Steps 返回包含的步骤数量
	Steps() int
}

Chain 定义链式处理接口

Chain 是一种串行执行的处理模式,适用于: - 多步骤的数据处理流程 - 需要按顺序执行的分析任务 - 每个步骤依赖前一步骤的输出

Chain 现在直接实现 Runnable 接口,享受其所有能力: - Invoke: 单次执行 - Stream: 流式执行 - Batch: 批量执行 - Pipe: 管道连接 - WithCallbacks: 回调支持

type ChainInput

type ChainInput struct {
	// 输入数据
	Data interface{} `json:"data"` // 主要输入数据

	// 变量和上下文
	Vars map[string]interface{} `json:"vars,omitempty"` // 变量集合,用于在步骤间传递上下文

	// 执行控制
	Options ChainOptions `json:"options,omitempty"` // 执行选项
}

ChainInput Chain 输入

func GetChainInput

func GetChainInput() *ChainInput

GetChainInput retrieves a ChainInput from the object pool

type ChainOptions

type ChainOptions struct {
	// 执行控制
	StopOnError bool          `json:"stop_on_error,omitempty"` // 出错时是否停止
	Timeout     time.Duration `json:"timeout,omitempty"`       // 超时时间
	Parallel    bool          `json:"parallel,omitempty"`      // 是否并行执行(如果可能)

	// 步骤控制
	SkipSteps []int `json:"skip_steps,omitempty"` // 跳过的步骤编号
	OnlySteps []int `json:"only_steps,omitempty"` // 仅执行的步骤编号

	// 额外选项
	Extra map[string]interface{} `json:"extra,omitempty"` // 额外选项
}

ChainOptions Chain 执行选项

func DefaultChainOptions

func DefaultChainOptions() ChainOptions

DefaultChainOptions 返回默认的 Chain 选项

type ChainOutput

type ChainOutput struct {
	// 输出数据
	Data interface{} `json:"data"` // 最终输出数据

	// 执行信息
	StepsExecuted []StepExecution `json:"steps_executed"` // 执行的步骤详情
	TotalLatency  time.Duration   `json:"total_latency"`  // 总耗时
	Status        string          `json:"status"`         // 执行状态: "success", "failed", "partial"

	// 额外结果
	Metadata map[string]interface{} `json:"metadata,omitempty"` // 额外元数据
}

ChainOutput Chain 输出

func GetChainOutput

func GetChainOutput() *ChainOutput

GetChainOutput retrieves a ChainOutput from the object pool

type ChainableAgent

type ChainableAgent struct {
	*BaseAgent
	// contains filtered or unexported fields
}

ChainableAgent 可链式调用的 Agent

允许将多个 Agent 串联起来,前一个的输出作为后一个的输入

func NewChainableAgent

func NewChainableAgent(name, description string, agents ...Agent) *ChainableAgent

NewChainableAgent 创建可链式调用的 Agent

func (*ChainableAgent) Invoke

func (c *ChainableAgent) Invoke(ctx context.Context, input *AgentInput) (*AgentOutput, error)

Invoke 顺序调用所有 Agent(使用快速路径优化)

func (*ChainableAgent) InvokeFast

func (c *ChainableAgent) InvokeFast(ctx context.Context, input *AgentInput) (*AgentOutput, error)

InvokeFast 快速调用链(绕过回调)

用于嵌套链场景的性能优化

type ChunkMetadata

type ChunkMetadata = execution.ChunkMetadata

Type aliases for internal use within core package

type ChunkTransformFunc

type ChunkTransformFunc = execution.ChunkTransformFunc

Type aliases for internal use within core package

type ChunkType

type ChunkType = execution.ChunkType

Type aliases for internal use within core package

type Container

type Container struct {
	// contains filtered or unexported fields
}

Container provides dependency injection for plugins.

Plugins can request shared resources like Logger, Store, LLM clients, etc. through the container instead of passing them as parameters.

func NewContainer

func NewContainer() *Container

NewContainer creates a new dependency injection container.

func (*Container) MustResolve

func (c *Container) MustResolve(name string) interface{}

MustResolve resolves a service or panics.

func (*Container) Register

func (c *Container) Register(name string, instance interface{})

Register registers a service instance.

func (*Container) RegisterFactory

func (c *Container) RegisterFactory(name string, factory func() (interface{}, error))

RegisterFactory registers a factory function for lazy initialization.

func (*Container) Resolve

func (c *Container) Resolve(name string) (interface{}, error)

Resolve retrieves a service by name. If the service is registered as a factory, it will be created on first access.

type CostTrackingCallback

type CostTrackingCallback struct {
	*BaseCallback
	// contains filtered or unexported fields
}

CostTrackingCallback 成本追踪回调

追踪 LLM 调用的成本

func NewCostTrackingCallback

func NewCostTrackingCallback(pricing map[string]float64) *CostTrackingCallback

NewCostTrackingCallback 创建成本追踪回调

func (*CostTrackingCallback) GetTotalCost

func (c *CostTrackingCallback) GetTotalCost() float64

GetTotalCost 获取总成本

func (*CostTrackingCallback) GetTotalTokens

func (c *CostTrackingCallback) GetTotalTokens() int

GetTotalTokens 获取总 token 数

func (*CostTrackingCallback) OnLLMEnd

func (c *CostTrackingCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error

func (*CostTrackingCallback) Reset

func (c *CostTrackingCallback) Reset()

Reset 重置统计

type DefaultLifecycleManager

type DefaultLifecycleManager struct {
	// contains filtered or unexported fields
}

DefaultLifecycleManager implements interfaces.LifecycleManager.

Features:

  • Priority-based startup (lower priority starts first)
  • Reverse priority shutdown (higher priority stops first)
  • Dependency resolution for components implementing DependencyAware
  • Concurrent health checks
  • Graceful shutdown with configurable timeout

func GlobalLifecycleManager

func GlobalLifecycleManager() *DefaultLifecycleManager

GlobalLifecycleManager returns the global lifecycle manager instance.

func NewLifecycleManager

func NewLifecycleManager() *DefaultLifecycleManager

NewLifecycleManager creates a new lifecycle manager with default config.

func NewLifecycleManagerWithConfig

func NewLifecycleManagerWithConfig(config LifecycleManagerConfig) *DefaultLifecycleManager

NewLifecycleManagerWithConfig creates a new lifecycle manager with custom config.

func (*DefaultLifecycleManager) GetState

GetState returns the current state of a component.

func (*DefaultLifecycleManager) HealthCheckAll

HealthCheckAll returns aggregated health of all components.

func (*DefaultLifecycleManager) InitAll

func (m *DefaultLifecycleManager) InitAll(ctx context.Context) error

InitAll initializes all registered components in priority order.

func (*DefaultLifecycleManager) Register

func (m *DefaultLifecycleManager) Register(name string, component interfaces.Lifecycle, priority int) error

Register adds a component to be managed.

func (*DefaultLifecycleManager) RegisterWithConfig

func (m *DefaultLifecycleManager) RegisterWithConfig(name string, component interfaces.Lifecycle, priority int, config interface{}) error

RegisterWithConfig registers a component with its configuration.

func (*DefaultLifecycleManager) ShutdownSignal

func (m *DefaultLifecycleManager) ShutdownSignal() <-chan struct{}

ShutdownSignal returns a channel that closes when shutdown is signaled.

func (*DefaultLifecycleManager) SignalShutdown

func (m *DefaultLifecycleManager) SignalShutdown()

SignalShutdown signals all components to begin shutdown.

func (*DefaultLifecycleManager) StartAll

func (m *DefaultLifecycleManager) StartAll(ctx context.Context) error

StartAll starts all initialized components in priority order.

func (*DefaultLifecycleManager) StopAll

func (m *DefaultLifecycleManager) StopAll(ctx context.Context) error

StopAll stops all running components in reverse priority order.

func (*DefaultLifecycleManager) Unregister

func (m *DefaultLifecycleManager) Unregister(name string) error

Unregister removes a component from management.

func (*DefaultLifecycleManager) WaitForShutdown

func (m *DefaultLifecycleManager) WaitForShutdown(ctx context.Context) error

WaitForShutdown blocks until all components are stopped.

type DefaultPanicHandler

type DefaultPanicHandler struct{}

DefaultPanicHandler is the default panic handler implementation.

Behavior:

  • Converts all panics to AgentError with CodeInternal
  • Preserves panic value and stack trace in error context
  • Compatible with existing error handling infrastructure

func (*DefaultPanicHandler) HandlePanic

func (h *DefaultPanicHandler) HandlePanic(ctx context.Context, component, operation string, panicValue interface{}, stackTrace string) error

HandlePanic implements PanicHandler interface.

type DynamicRunnable

type DynamicRunnable interface {
	// Invoke 执行动态输入,返回动态输出
	InvokeDynamic(ctx context.Context, input any) (any, error)

	// Stream 流式执行
	StreamDynamic(ctx context.Context, input any) (<-chan DynamicStreamChunk, error)

	// Batch 批量执行
	BatchDynamic(ctx context.Context, inputs []any) ([]any, error)

	// TypeInfo 返回类型信息,用于运行时验证
	TypeInfo() TypeInfo
}

DynamicRunnable 是插件兼容的动态 Runnable 接口

设计理念:

  • 在插件边界使用 any 类型,保证运行时兼容性
  • 通过 InputSchema/OutputSchema 提供运行时类型信息
  • 支持 JSON 序列化的输入输出,便于跨进程通信

使用场景:

  • Go 原生插件 (.so)
  • 动态注册的第三方组件
  • RPC/HTTP 调用的远程组件

type DynamicStreamChunk

type DynamicStreamChunk struct {
	Data  any   `json:"data,omitempty"`
	Error error `json:"-"`
	Done  bool  `json:"done"`
}

DynamicStreamChunk 动态流式输出块

type DynamicToTypedAdapter

type DynamicToTypedAdapter[I, O any] struct {
	// contains filtered or unexported fields
}

DynamicToTypedAdapter 将 DynamicRunnable 转换为泛型 Runnable

用于在类型安全的代码中使用动态加载的组件

func NewDynamicToTypedAdapter

func NewDynamicToTypedAdapter[I, O any](dynamic DynamicRunnable) *DynamicToTypedAdapter[I, O]

NewDynamicToTypedAdapter 创建动态到类型安全的适配器

func (*DynamicToTypedAdapter[I, O]) Batch

func (a *DynamicToTypedAdapter[I, O]) Batch(ctx context.Context, inputs []I) ([]O, error)

Batch 实现批量执行

func (*DynamicToTypedAdapter[I, O]) Invoke

func (a *DynamicToTypedAdapter[I, O]) Invoke(ctx context.Context, input I) (O, error)

Invoke 实现 Runnable[I, O] 接口

func (*DynamicToTypedAdapter[I, O]) Pipe

func (a *DynamicToTypedAdapter[I, O]) Pipe(next Runnable[O, any]) Runnable[I, any]

Pipe 连接到另一个 Runnable

func (*DynamicToTypedAdapter[I, O]) Stream

func (a *DynamicToTypedAdapter[I, O]) Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)

Stream 实现流式执行

func (*DynamicToTypedAdapter[I, O]) WithCallbacks

func (a *DynamicToTypedAdapter[I, O]) WithCallbacks(callbacks ...Callback) Runnable[I, O]

WithCallbacks 添加回调

func (*DynamicToTypedAdapter[I, O]) WithConfig

func (a *DynamicToTypedAdapter[I, O]) WithConfig(config RunnableConfig) Runnable[I, O]

WithConfig 配置 Runnable

func (*DynamicToTypedAdapter[I, O]) WithValidator

func (a *DynamicToTypedAdapter[I, O]) WithValidator(validator func(any) error) *DynamicToTypedAdapter[I, O]

WithValidator 添加运行时验证器

type EnhancedPluginRegistry

type EnhancedPluginRegistry struct {
	// contains filtered or unexported fields
}

EnhancedPluginRegistry extends PluginRegistry with version management and DI.

func GlobalEnhancedPluginRegistry

func GlobalEnhancedPluginRegistry() *EnhancedPluginRegistry

GlobalEnhancedPluginRegistry returns the global enhanced plugin registry.

func NewEnhancedPluginRegistry

func NewEnhancedPluginRegistry() *EnhancedPluginRegistry

NewEnhancedPluginRegistry creates a new enhanced plugin registry.

func (*EnhancedPluginRegistry) Container

func (r *EnhancedPluginRegistry) Container() *Container

Container returns the dependency injection container.

func (*EnhancedPluginRegistry) GetAllAgents

func (r *EnhancedPluginRegistry) GetAllAgents() map[string][]DynamicRunnable

GetAllAgents returns all agents from all registered plugins.

func (*EnhancedPluginRegistry) GetAllTools

func (r *EnhancedPluginRegistry) GetAllTools() map[string][]DynamicRunnable

GetAllTools returns all tools from all registered plugins.

func (*EnhancedPluginRegistry) GetLatestPlugin

func (r *EnhancedPluginRegistry) GetLatestPlugin(name string) (Plugin, string, error)

GetLatestPlugin retrieves the latest version of a plugin.

func (*EnhancedPluginRegistry) GetPlugin

func (r *EnhancedPluginRegistry) GetPlugin(name, version string) (Plugin, error)

GetPlugin retrieves a specific version of a plugin.

func (*EnhancedPluginRegistry) InitializeAll

func (r *EnhancedPluginRegistry) InitializeAll(ctx context.Context) error

InitializeAll initializes all plugins in dependency order.

func (*EnhancedPluginRegistry) Lifecycle

Lifecycle returns the lifecycle manager.

func (*EnhancedPluginRegistry) ListPlugins

func (r *EnhancedPluginRegistry) ListPlugins() []string

ListPlugins returns all registered plugin names.

func (*EnhancedPluginRegistry) ListVersions

func (r *EnhancedPluginRegistry) ListVersions(name string) []string

ListVersions returns all versions of a plugin.

func (*EnhancedPluginRegistry) RegisterPlugin

func (r *EnhancedPluginRegistry) RegisterPlugin(plugin Plugin, config *PluginConfig) error

RegisterPlugin registers a plugin with optional version.

func (*EnhancedPluginRegistry) StartAll

func (r *EnhancedPluginRegistry) StartAll(ctx context.Context) error

StartAll starts all initialized plugins.

func (*EnhancedPluginRegistry) StopAll

func (r *EnhancedPluginRegistry) StopAll(ctx context.Context) error

StopAll stops all running plugins.

type ExecutionStep

type ExecutionStep struct {
	// 步骤信息
	Step        int    `json:"step"`        // 步骤编号
	Name        string `json:"name"`        // 步骤名称
	Type        string `json:"type"`        // 类型: "agent", "chain", "tool", "decision"
	Description string `json:"description"` // 描述

	// 执行信息
	ComponentName string      `json:"component_name"` // 组件名称
	Input         interface{} `json:"input"`          // 输入
	Output        interface{} `json:"output"`         // 输出
	Status        string      `json:"status"`         // 状态: "pending", "running", "success", "failed", "skipped"

	// 时间信息
	StartTime time.Time     `json:"start_time"` // 开始时间
	EndTime   time.Time     `json:"end_time"`   // 结束时间
	Duration  time.Duration `json:"duration"`   // 耗时

	// 错误信息
	Error        string `json:"error,omitempty"`         // 错误信息
	RetryCount   int    `json:"retry_count,omitempty"`   // 重试次数
	PartialError string `json:"partial_error,omitempty"` // 部分错误

	// 元数据
	Metadata map[string]interface{} `json:"metadata,omitempty"` // 额外元数据
}

ExecutionStep 执行步骤

type ExecutorOption

type ExecutorOption func(*AgentExecutor)

ExecutorOption 执行器选项函数

func WithMaxRetries

func WithMaxRetries(maxRetries int) ExecutorOption

WithMaxRetries 设置最大重试次数

func WithStopOnError

func WithStopOnError(stop bool) ExecutorOption

WithStopOnError 设置是否在错误时停止

func WithTimeout

func WithTimeout(timeout time.Duration) ExecutorOption

WithTimeout 设置超时时间

type FastInvoker

type FastInvoker interface {
	// InvokeFast 快速执行 Agent,跳过回调和中间件
	//
	// 参数:
	//   - ctx: 执行上下文(用于取消和超时控制)
	//   - input: Agent 输入
	//
	// 返回:
	//   - output: Agent 输出
	//   - error: 执行错误
	//
	// 性能特性:
	//   - 无回调触发
	//   - 无中间件执行
	//   - 最小化内存分配
	//   - 优化的执行路径
	InvokeFast(ctx context.Context, input *AgentInput) (*AgentOutput, error)
}

FastInvoker 定义快速调用接口

FastInvoker 提供绕过回调和中间件的高性能执行路径,适用于:

  • Chain 内部调用
  • 嵌套 Agent 调用
  • 高频循环场景
  • 性能关键路径

性能收益:

  • 延迟降低 4-6%
  • 内存分配减少 5-8%
  • 无回调遍历开销
  • 无虚拟方法分派开销

注意事项:

  • InvokeFast 不触发任何回调(OnStart/OnFinish/OnLLM*/OnTool*)
  • 不执行中间件逻辑
  • 适用于内部调用,外部入口应使用标准 Invoke 方法
  • 调试时建议使用标准 Invoke 以获得完整追踪信息

使用示例:

// 检查 Agent 是否支持快速调用
if fastAgent, ok := agent.(FastInvoker); ok {
    output, err := fastAgent.InvokeFast(ctx, input)
} else {
    output, err := agent.Invoke(ctx, input)
}

参考文档:docs/guides/INVOKE_FAST_OPTIMIZATION.md

type FunctionalLifecycle

type FunctionalLifecycle struct {
	// contains filtered or unexported fields
}

FunctionalLifecycle allows creating Lifecycle implementations using functions.

func NewFunctionalLifecycle

func NewFunctionalLifecycle(name string, opts ...FunctionalLifecycleOption) *FunctionalLifecycle

NewFunctionalLifecycle creates a new FunctionalLifecycle.

func (*FunctionalLifecycle) HealthCheck

HealthCheck calls the configured health function.

func (*FunctionalLifecycle) Init

func (f *FunctionalLifecycle) Init(ctx context.Context, config interface{}) error

Init calls the configured init function.

func (*FunctionalLifecycle) Start

func (f *FunctionalLifecycle) Start(ctx context.Context) error

Start calls the configured start function.

func (*FunctionalLifecycle) Stop

Stop calls the configured stop function.

type FunctionalLifecycleOption

type FunctionalLifecycleOption func(*FunctionalLifecycle)

FunctionalLifecycleOption configures a FunctionalLifecycle.

func WithHealthFunc

WithHealthFunc sets the HealthCheck function.

func WithInitFunc

func WithInitFunc(fn func(context.Context, interface{}) error) FunctionalLifecycleOption

WithInitFunc sets the Init function.

func WithStartFunc

func WithStartFunc(fn func(context.Context) error) FunctionalLifecycleOption

WithStartFunc sets the Start function.

func WithStopFunc

func WithStopFunc(fn func(context.Context) error) FunctionalLifecycleOption

WithStopFunc sets the Stop function.

type Generator added in v0.2.0

type Generator[T any] iter.Seq2[T, error]

Generator 定义生成器类型(基于 Go 1.25 iter.Seq2)

Generator 提供惰性求值的流式处理能力,相比 Channel 有以下优势:

  • 零内存分配(无 channel、goroutine 开销)
  • 支持早期终止(通过 yield 返回值)
  • 统一的迭代接口(for-range 循环)

示例:

gen := agent.RunGenerator(ctx, input)
for output, err := range gen {
    if err != nil {
        return err
    }
    fmt.Println(output)
}

func Filter added in v0.2.0

func Filter[T any](gen Generator[T], predicate func(T) bool) Generator[T]

Filter 过滤 Generator 输出

参数:

  • gen - Generator 实例
  • predicate - 过滤条件函数

返回:

  • 新的 Generator,仅包含满足条件的元素

示例:

filtered := Filter(gen, func(data T) bool {
    return data.Score > 0.5
})

func FromChannel added in v0.2.0

func FromChannel[T any](ch <-chan StreamChunk[T]) Generator[T]

FromChannel 将 Channel 转换为 Generator

参数:

  • ch - 输入 channel

返回:

  • Generator 实例

示例:

gen := FromChannel(ch)
for data, err := range gen {
    // 处理数据
}

func GeneratorFunc added in v0.2.0

func GeneratorFunc[T any](fn func(yield func(T, error) bool)) Generator[T]

GeneratorFunc 将函数转换为 Generator

参数:

  • fn - 生成器函数,接受 yield 函数作为参数

返回:

  • Generator 实例

示例:

gen := GeneratorFunc(func(yield func(T, error) bool) {
    for i := 0; i < 10; i++ {
        if !yield(data, nil) {
            return  // 早期终止
        }
    }
})

func Map added in v0.2.0

func Map[T, R any](gen Generator[T], mapper func(T) R) Generator[R]

Map 映射 Generator 输出

参数:

  • gen - Generator 实例
  • mapper - 映射函数

返回:

  • 新的 Generator,包含映射后的元素

示例:

mapped := Map(gen, func(data T) R {
    return transform(data)
})

func Take added in v0.2.0

func Take[T any](gen Generator[T], n int) Generator[T]

Take 从 Generator 中取前 n 个元素

参数:

  • gen - Generator 实例
  • n - 要取的元素数量

返回:

  • 新的 Generator,最多产生 n 个元素

示例:

first5 := Take(gen, 5)
for data, err := range first5 {
    // 最多处理 5 个元素
}

type Handler

type Handler = middleware.Handler

Type aliases for internal use within core package

type Interrupt

type Interrupt struct {
	// ID is a unique identifier for this interrupt
	ID string

	// Type specifies what kind of interrupt this is
	Type InterruptType

	// Priority indicates how urgently this needs to be handled
	Priority InterruptPriority

	// Message describes what human action is needed
	Message string

	// Context provides additional context for the interrupt
	Context map[string]interface{}

	// State is a snapshot of the agent state at interrupt time
	State state.State

	// CreatedAt is when the interrupt was created
	CreatedAt time.Time

	// ExpiresAt is when the interrupt expires (optional)
	ExpiresAt *time.Time

	// Metadata holds additional interrupt metadata
	Metadata map[string]interface{}
}

Interrupt represents a point where human intervention is needed.

type InterruptConfig

type InterruptConfig struct {
	// EnableAutoSave saves state before each interrupt
	EnableAutoSave bool

	// DefaultTimeout is the default timeout for interrupts
	DefaultTimeout time.Duration

	// RequireReason requires a reason for all responses
	RequireReason bool

	// EnableHistory keeps history of all interrupts
	EnableHistory bool
}

InterruptConfig configures interrupt behavior.

func DefaultInterruptConfig

func DefaultInterruptConfig() *InterruptConfig

DefaultInterruptConfig returns default interrupt configuration.

type InterruptManager

type InterruptManager struct {
	// contains filtered or unexported fields
}

InterruptManager manages interrupts and their lifecycle.

func NewInterruptManager

func NewInterruptManager(checkpointer checkpoint.Checkpointer) *InterruptManager

NewInterruptManager creates a new interrupt manager.

func (*InterruptManager) CancelInterrupt

func (m *InterruptManager) CancelInterrupt(interruptID string) error

CancelInterrupt cancels a pending interrupt.

func (*InterruptManager) CreateInterrupt

func (m *InterruptManager) CreateInterrupt(ctx context.Context, interrupt *Interrupt) (*Interrupt, *InterruptResponse, error)

CreateInterrupt creates a new interrupt and waits for human response. Returns the created interrupt (with ID assigned) and the response.

func (*InterruptManager) GetInterrupt

func (m *InterruptManager) GetInterrupt(interruptID string) (*Interrupt, error)

GetInterrupt retrieves an interrupt by ID.

func (*InterruptManager) ListPendingInterrupts

func (m *InterruptManager) ListPendingInterrupts() []*Interrupt

ListPendingInterrupts returns all interrupts awaiting response.

func (*InterruptManager) OnInterruptCreated

func (m *InterruptManager) OnInterruptCreated(fn func(*Interrupt))

OnInterruptCreated sets a hook for when interrupts are created.

func (*InterruptManager) OnInterruptResolved

func (m *InterruptManager) OnInterruptResolved(fn func(*Interrupt, *InterruptResponse))

OnInterruptResolved sets a hook for when interrupts are resolved.

func (*InterruptManager) RespondToInterrupt

func (m *InterruptManager) RespondToInterrupt(interruptID string, response *InterruptResponse) error

RespondToInterrupt provides a response to an existing interrupt.

type InterruptPriority

type InterruptPriority string

InterruptPriority defines the priority/severity of an interrupt.

const (
	// InterruptPriorityLow can be handled asynchronously
	InterruptPriorityLow InterruptPriority = "low"

	// InterruptPriorityMedium should be handled in a reasonable timeframe
	InterruptPriorityMedium InterruptPriority = "medium"

	// InterruptPriorityHigh requires immediate attention
	InterruptPriorityHigh InterruptPriority = "high"

	// InterruptPriorityCritical blocks execution until resolved
	InterruptPriorityCritical InterruptPriority = "critical"
)

type InterruptResponse

type InterruptResponse struct {
	// InterruptID is the ID of the interrupt being responded to
	InterruptID string

	// Approved indicates if the action was approved
	Approved bool

	// Input contains any human-provided input/feedback
	Input map[string]interface{}

	// Reason explains why the decision was made
	Reason string

	// RespondedAt is when the response was provided
	RespondedAt time.Time

	// RespondedBy identifies who provided the response
	RespondedBy string
}

InterruptResponse represents the human response to an interrupt.

type InterruptRule

type InterruptRule struct {
	// Name identifies the rule
	Name string

	// Condition evaluates if an interrupt should be triggered
	Condition func(ctx context.Context, state state.State) bool

	// CreateInterrupt creates the interrupt if condition is met
	CreateInterrupt func(ctx context.Context, state state.State) *Interrupt
}

InterruptRule defines a condition that triggers an interrupt.

type InterruptType

type InterruptType string

InterruptType defines the type of interrupt.

const (
	// InterruptTypeApproval requires human approval to continue
	InterruptTypeApproval InterruptType = "approval"

	// InterruptTypeInput requires human input/feedback
	InterruptTypeInput InterruptType = "input"

	// InterruptTypeReview requires human review before proceeding
	InterruptTypeReview InterruptType = "review"

	// InterruptTypeDecision requires human decision making
	InterruptTypeDecision InterruptType = "decision"
)

type InterruptableExecutor

type InterruptableExecutor struct {
	// contains filtered or unexported fields
}

InterruptableExecutor wraps execution with interrupt capability.

func NewInterruptableExecutor

func NewInterruptableExecutor(manager *InterruptManager, checkpointer checkpoint.Checkpointer) *InterruptableExecutor

NewInterruptableExecutor creates a new interruptable executor.

func (*InterruptableExecutor) AddInterruptRule

func (e *InterruptableExecutor) AddInterruptRule(rule InterruptRule)

AddInterruptRule adds a rule that triggers interrupts.

func (*InterruptableExecutor) CheckInterrupts

func (e *InterruptableExecutor) CheckInterrupts(ctx context.Context, state state.State) ([]*Interrupt, error)

CheckInterrupts evaluates all rules and creates interrupts if needed.

func (*InterruptableExecutor) ExecuteWithInterrupts

func (e *InterruptableExecutor) ExecuteWithInterrupts(
	ctx context.Context,
	state state.State,
	fn func(context.Context, state.State) error,
) error

ExecuteWithInterrupts executes a function with interrupt checking.

type LegacyStreamChunk

type LegacyStreamChunk = execution.LegacyStreamChunk

Type aliases for internal use within core package

type LifecycleManagerConfig

type LifecycleManagerConfig struct {
	// DefaultInitTimeout is the default timeout for Init operations
	DefaultInitTimeout time.Duration

	// DefaultStartTimeout is the default timeout for Start operations
	DefaultStartTimeout time.Duration

	// DefaultStopTimeout is the default timeout for Stop operations
	DefaultStopTimeout time.Duration

	// HealthCheckInterval is the interval between automatic health checks
	HealthCheckInterval time.Duration

	// ContinueOnError determines whether to continue if a component fails
	ContinueOnError bool
}

LifecycleManagerConfig configures the lifecycle manager.

func DefaultLifecycleManagerConfig

func DefaultLifecycleManagerConfig() LifecycleManagerConfig

DefaultLifecycleManagerConfig returns sensible defaults.

type Logger

type Logger interface {
	Info(msg string, fields ...interface{})
	Error(msg string, fields ...interface{})
	Debug(msg string, fields ...interface{})
}

Logger 日志接口

type LoggingCallback

type LoggingCallback struct {
	*BaseCallback
	// contains filtered or unexported fields
}

LoggingCallback 日志记录回调

将所有事件记录到日志

func NewLoggingCallback

func NewLoggingCallback(logger Logger, verbose bool) *LoggingCallback

NewLoggingCallback 创建日志回调

func (*LoggingCallback) OnEnd

func (l *LoggingCallback) OnEnd(ctx context.Context, output interface{}) error

func (*LoggingCallback) OnError

func (l *LoggingCallback) OnError(ctx context.Context, err error) error

func (*LoggingCallback) OnLLMEnd

func (l *LoggingCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error

func (*LoggingCallback) OnLLMStart

func (l *LoggingCallback) OnLLMStart(ctx context.Context, prompts []string, model string) error

func (*LoggingCallback) OnStart

func (l *LoggingCallback) OnStart(ctx context.Context, input interface{}) error

func (*LoggingCallback) OnToolEnd

func (l *LoggingCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error

func (*LoggingCallback) OnToolStart

func (l *LoggingCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error

type MetricsCallback

type MetricsCallback struct {
	*BaseCallback
	// contains filtered or unexported fields
}

MetricsCallback 指标收集回调

收集执行指标(延迟、调用次数等)

func NewMetricsCallback

func NewMetricsCallback(metrics MetricsCollector) *MetricsCallback

NewMetricsCallback 创建指标回调

func (*MetricsCallback) OnLLMEnd

func (m *MetricsCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error

func (*MetricsCallback) OnLLMStart

func (m *MetricsCallback) OnLLMStart(ctx context.Context, prompts []string, model string) error

func (*MetricsCallback) OnToolEnd

func (m *MetricsCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error

func (*MetricsCallback) OnToolStart

func (m *MetricsCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error

type MetricsCollector

type MetricsCollector interface {
	IncrementCounter(name string, value int64, tags map[string]string)
	RecordHistogram(name string, value float64, tags map[string]string)
	RecordGauge(name string, value float64, tags map[string]string)
}

MetricsCollector 指标收集器接口

type Middleware

type Middleware = middleware.Middleware

Type aliases for internal use within core package

type MiddlewareChain

type MiddlewareChain = middleware.MiddlewareChain

Type aliases for internal use within core package

type MiddlewareRequest

type MiddlewareRequest = middleware.MiddlewareRequest

Type aliases for internal use within core package

type MiddlewareResponse

type MiddlewareResponse = middleware.MiddlewareResponse

Type aliases for internal use within core package

type NoOpMetricsCollector

type NoOpMetricsCollector struct{}

NoOpMetricsCollector is a no-op metrics collector (default).

This is the default implementation that does nothing. Replace with a real implementation (e.g., PrometheusMetricsCollector) in production.

func (*NoOpMetricsCollector) RecordPanic

func (c *NoOpMetricsCollector) RecordPanic(ctx context.Context, component, operation string, panicValue interface{})

RecordPanic implements PanicMetricsCollector interface (no-op).

type NoOpPanicLogger

type NoOpPanicLogger struct{}

NoOpPanicLogger is a no-op logger (default).

This is the default implementation that does nothing. Replace with a real implementation (e.g., StructuredPanicLogger) in production.

func (*NoOpPanicLogger) LogPanic

func (l *NoOpPanicLogger) LogPanic(ctx context.Context, component, operation string, panicValue interface{}, stackTrace string, recoveredError error)

LogPanic implements PanicLogger interface (no-op).

type Orchestrator

type Orchestrator interface {
	// Execute 执行编排任务
	Execute(ctx context.Context, request *OrchestratorRequest) (*OrchestratorResponse, error)

	// RegisterAgent 注册 Agent
	RegisterAgent(name string, agent Agent) error

	// RegisterChain 注册 Chain
	RegisterChain(name string, chain Chain) error

	// RegisterTool 注册 Tool
	RegisterTool(name string, tool Tool) error

	// Name 返回编排器名称
	Name() string
}

Orchestrator 定义编排器接口

Orchestrator 负责协调多个 Agent、Chain 和 Tool 的执行,适用于: - 复杂的多步骤工作流 - 需要多个 Agent 协作的场景 - 动态决策和条件分支

type OrchestratorOptions

type OrchestratorOptions struct {
	// 日志选项
	EnableLogging bool   `json:"enable_logging"` // 是否启用日志
	LogLevel      string `json:"log_level"`      // 日志级别

	// 监控选项
	EnableMetrics bool `json:"enable_metrics"` // 是否启用指标
	EnableTracing bool `json:"enable_tracing"` // 是否启用追踪

	// 回调
	OnStepStart    func(step ExecutionStep) `json:"-"` // 步骤开始回调
	OnStepComplete func(step ExecutionStep) `json:"-"` // 步骤完成回调
	OnError        func(err error)          `json:"-"` // 错误回调

	// 额外选项
	Extra map[string]interface{} `json:"extra,omitempty"` // 额外选项
}

OrchestratorOptions 编排器选项

func DefaultOrchestratorOptions

func DefaultOrchestratorOptions() OrchestratorOptions

DefaultOrchestratorOptions 返回默认编排选项

type OrchestratorRequest

type OrchestratorRequest struct {
	// 任务信息
	TaskID      string                 `json:"task_id"`     // 任务 ID
	TaskType    string                 `json:"task_type"`   // 任务类型
	Description string                 `json:"description"` // 任务描述
	Parameters  map[string]interface{} `json:"parameters"`  // 任务参数

	// 执行策略
	Strategy OrchestratorStrategy `json:"strategy"` // 编排策略

	// 选项
	Options OrchestratorOptions `json:"options"` // 执行选项

	// 元数据
	SessionID string    `json:"session_id"` // 会话 ID
	Timestamp time.Time `json:"timestamp"`  // 时间戳
}

OrchestratorRequest 编排器请求

type OrchestratorResponse

type OrchestratorResponse struct {
	// 执行结果
	Result  interface{} `json:"result"`  // 最终结果
	Status  string      `json:"status"`  // 状态: "success", "failed", "partial"
	Message string      `json:"message"` // 结果消息

	// 执行过程
	ExecutionPlan  []ExecutionStep `json:"execution_plan"`  // 执行计划
	ExecutionSteps []ExecutionStep `json:"execution_steps"` // 实际执行的步骤

	// 性能指标
	TotalLatency time.Duration `json:"total_latency"` // 总延迟
	StartTime    time.Time     `json:"start_time"`    // 开始时间
	EndTime      time.Time     `json:"end_time"`      // 结束时间

	// 元数据
	Metadata map[string]interface{} `json:"metadata"` // 额外元数据
}

OrchestratorResponse 编排器响应

type OrchestratorStrategy

type OrchestratorStrategy struct {
	// 执行模式
	Mode string `json:"mode"` // "sequential", "parallel", "hybrid"

	// 重试策略
	EnableRetry  bool `json:"enable_retry"`  // 是否启用重试
	MaxRetries   int  `json:"max_retries"`   // 最大重试次数
	RetryBackoff int  `json:"retry_backoff"` // 重试退避时间(秒)

	// 失败处理
	FailurePolicy string `json:"failure_policy"` // "stop", "continue", "rollback"

	// 超时策略
	GlobalTimeout time.Duration `json:"global_timeout"` // 全局超时
	StepTimeout   time.Duration `json:"step_timeout"`   // 单步超时
}

OrchestratorStrategy 编排策略

func DefaultOrchestratorStrategy

func DefaultOrchestratorStrategy() OrchestratorStrategy

DefaultOrchestratorStrategy 返回默认编排策略

type PanicHandler

type PanicHandler interface {
	// HandlePanic processes a recovered panic and returns an appropriate error.
	//
	// Parameters:
	//   - ctx: The execution context
	//   - component: The component where panic occurred (e.g., "runnable", "lifecycle_manager")
	//   - operation: The operation being performed (e.g., "invoke", "init", "start")
	//   - panicValue: The value passed to panic()
	//   - stackTrace: The stack trace at the point of panic
	//
	// Returns:
	//   - An error representing the panic, suitable for returning to callers
	HandlePanic(ctx context.Context, component, operation string, panicValue interface{}, stackTrace string) error
}

PanicHandler defines the interface for handling recovered panics.

Implementations can provide custom recovery strategies, such as:

  • Converting panics to domain-specific errors
  • Adding custom context information
  • Implementing retry logic
  • Graceful degradation

type PanicHandlerRegistry

type PanicHandlerRegistry struct {
	// contains filtered or unexported fields
}

PanicHandlerRegistry manages panic handling implementations with thread-safe hot-swapping.

Features:

  • Thread-safe read/write operations
  • Lock-free reads using atomic pointers
  • Hot-swappable implementations at runtime
  • Global singleton with sensible defaults

func GlobalPanicHandlerRegistry

func GlobalPanicHandlerRegistry() *PanicHandlerRegistry

GlobalPanicHandlerRegistry returns the global panic handler registry instance.

This singleton is used by all panic recovery code in the system. Customize by calling SetHandler, SetMetricsCollector, or SetLogger.

func NewPanicHandlerRegistry

func NewPanicHandlerRegistry() *PanicHandlerRegistry

NewPanicHandlerRegistry creates a new registry with default implementations.

func (*PanicHandlerRegistry) GetHandler

func (r *PanicHandlerRegistry) GetHandler() PanicHandler

GetHandler returns the current panic handler (lock-free read).

func (*PanicHandlerRegistry) GetLogger

func (r *PanicHandlerRegistry) GetLogger() PanicLogger

GetLogger returns the current panic logger (lock-free read).

func (*PanicHandlerRegistry) GetMetricsCollector

func (r *PanicHandlerRegistry) GetMetricsCollector() PanicMetricsCollector

GetMetricsCollector returns the current metrics collector (lock-free read).

func (*PanicHandlerRegistry) HandlePanic

func (r *PanicHandlerRegistry) HandlePanic(ctx context.Context, component, operation string, panicValue interface{}) error

HandlePanic is a convenience method that orchestrates the full panic handling flow: 1. Convert panic to error using handler 2. Record metrics 3. Log the event

This is the recommended way to handle panics with all customizations applied.

func (*PanicHandlerRegistry) SetHandler

func (r *PanicHandlerRegistry) SetHandler(handler PanicHandler)

SetHandler replaces the panic handler (thread-safe).

func (*PanicHandlerRegistry) SetLogger

func (r *PanicHandlerRegistry) SetLogger(logger PanicLogger)

SetLogger replaces the panic logger (thread-safe).

func (*PanicHandlerRegistry) SetMetricsCollector

func (r *PanicHandlerRegistry) SetMetricsCollector(collector PanicMetricsCollector)

SetMetricsCollector replaces the metrics collector (thread-safe).

type PanicLogger

type PanicLogger interface {
	// LogPanic logs a panic event with all relevant context.
	//
	// Parameters:
	//   - ctx: The execution context
	//   - component: The component where panic occurred
	//   - operation: The operation being performed
	//   - panicValue: The value passed to panic()
	//   - stackTrace: The stack trace at the point of panic
	//   - recoveredError: The error returned from HandlePanic
	LogPanic(ctx context.Context, component, operation string, panicValue interface{}, stackTrace string, recoveredError error)
}

PanicLogger defines the interface for logging panic events.

Implementations can provide specialized logging:

  • Structured logging (JSON, logfmt)
  • Different log levels based on panic type
  • Integration with centralized logging systems
  • Alert triggering for critical panics

type PanicMetricsCollector

type PanicMetricsCollector interface {
	// RecordPanic records that a panic occurred.
	//
	// Parameters:
	//   - ctx: The execution context
	//   - component: The component where panic occurred
	//   - operation: The operation being performed
	//   - panicValue: The value passed to panic()
	RecordPanic(ctx context.Context, component, operation string, panicValue interface{})
}

PanicMetricsCollector defines the interface for collecting panic statistics.

Implementations can integrate with monitoring systems:

  • Prometheus counters and histograms
  • StatsD metrics
  • Custom telemetry systems
  • Application performance monitoring (APM)

type Plugin

type Plugin interface {
	interfaces.Lifecycle

	// Name returns the plugin's unique identifier.
	Name() string

	// Version returns the plugin's version string.
	Version() string

	// GetTools returns tools provided by this plugin.
	GetTools() []DynamicRunnable

	// GetAgents returns agents provided by this plugin.
	GetAgents() []DynamicRunnable

	// GetMiddleware returns middleware provided by this plugin.
	GetMiddleware() []interface{}
}

Plugin represents a loadable plugin that can provide tools, agents, or other components.

Plugins implement lifecycle management and can access shared resources through the dependency injection container.

type PluginConfig

type PluginConfig struct {
	// Name is the plugin identifier
	Name string

	// Version is the plugin version
	Version string

	// Config is plugin-specific configuration
	Config map[string]interface{}

	// Dependencies lists other plugins this plugin depends on
	Dependencies []PluginDependency

	// Enabled determines if the plugin should be loaded
	Enabled bool
}

PluginConfig contains configuration for a plugin instance.

type PluginDependency

type PluginDependency struct {
	// Name of the required plugin
	Name string

	// VersionConstraint (e.g., ">=1.0.0", "~1.2.0")
	VersionConstraint string

	// Optional marks this dependency as optional
	Optional bool
}

PluginDependency specifies a dependency on another plugin.

type PluginMetadata

type PluginMetadata struct {
	Name        string   `json:"name"`
	Version     string   `json:"version"`
	Description string   `json:"description"`
	Author      string   `json:"author,omitempty"`
	Tags        []string `json:"tags,omitempty"`
	TypeInfo    TypeInfo `json:"type_info"`
}

PluginMetadata 插件元数据

type PluginRegistry

type PluginRegistry struct {
	// contains filtered or unexported fields
}

PluginRegistry 插件注册中心

管理动态加载的组件,提供:

  • 组件注册和发现
  • 类型信息查询
  • 版本管理

func GlobalPluginRegistry

func GlobalPluginRegistry() *PluginRegistry

GlobalPluginRegistry 获取全局插件注册中心

func NewPluginRegistry

func NewPluginRegistry() *PluginRegistry

NewPluginRegistry 创建插件注册中心

func (*PluginRegistry) Get

func (r *PluginRegistry) Get(name string) (DynamicRunnable, error)

Get 获取插件

func (*PluginRegistry) GetMetadata

func (r *PluginRegistry) GetMetadata(name string) (*PluginMetadata, error)

GetMetadata 获取插件元数据

func (*PluginRegistry) List

func (r *PluginRegistry) List() []string

List 列出所有插件

func (*PluginRegistry) Register

func (r *PluginRegistry) Register(name string, plugin DynamicRunnable, metadata *PluginMetadata) error

Register 注册插件

func (*PluginRegistry) Unregister

func (r *PluginRegistry) Unregister(name string) error

Unregister 注销插件

type ReasoningStep

type ReasoningStep struct {
	Step        int           `json:"step"`        // 步骤编号
	Action      string        `json:"action"`      // 执行的操作
	Description string        `json:"description"` // 操作描述
	Result      string        `json:"result"`      // 操作结果
	Duration    time.Duration `json:"duration"`    // 耗时
	Success     bool          `json:"success"`     // 是否成功
	Error       string        `json:"error"`       // 错误信息
}

ReasoningStep 推理步骤

type RetryPolicy

type RetryPolicy struct {
	MaxRetries int           // 最大重试次数
	Backoff    BackoffPolicy // 退避策略
}

RetryPolicy 重试策略

type Runnable

type Runnable[I, O any] interface {
	// Invoke 执行单个输入并返回输出
	Invoke(ctx context.Context, input I) (O, error)

	// Stream 流式执行,返回输出通道
	// 调用者负责从通道读取并处理结果
	Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)

	// Batch 批量执行多个输入
	Batch(ctx context.Context, inputs []I) ([]O, error)

	// Pipe 连接到另一个 Runnable,形成管道
	// 当前 Runnable 的输出成为下一个 Runnable 的输入
	Pipe(next Runnable[O, any]) Runnable[I, any]

	// WithCallbacks 添加回调处理器
	WithCallbacks(callbacks ...Callback) Runnable[I, O]

	// WithConfig 配置 Runnable
	WithConfig(config RunnableConfig) Runnable[I, O]
}

Runnable 定义统一的可执行接口

借鉴 LangChain 的 Runnable 设计,提供统一的执行接口,支持: - 单个输入执行 (Invoke) - 流式执行 (Stream) - 批量执行 (Batch) - 管道连接 (Pipe) - 回调支持 (WithCallbacks)

泛型参数:

  • I: 输入类型
  • O: 输出类型

func GetTyped

func GetTyped[I, O any](r *PluginRegistry, name string) (Runnable[I, O], error)

GetTyped 获取类型安全的插件包装

type RunnableConfig

type RunnableConfig struct {
	// Callbacks 回调处理器列表
	Callbacks []Callback

	// Tags 标签,用于标识和分类
	Tags []string

	// Metadata 元数据
	Metadata map[string]interface{}

	// MaxConcurrency 最大并发数(用于 Batch)
	MaxConcurrency int

	// RetryPolicy 重试策略
	RetryPolicy *RetryPolicy
}

RunnableConfig Runnable 配置

type RunnableFunc

type RunnableFunc[I, O any] struct {
	*BaseRunnable[I, O]
	// contains filtered or unexported fields
}

RunnableFunc 函数式 Runnable

将普通函数包装成 Runnable

func NewRunnableFunc

func NewRunnableFunc[I, O any](fn func(context.Context, I) (O, error)) *RunnableFunc[I, O]

NewRunnableFunc 创建函数式 Runnable

func (*RunnableFunc[I, O]) Batch

func (f *RunnableFunc[I, O]) Batch(ctx context.Context, inputs []I) ([]O, error)

Batch 批量执行

func (*RunnableFunc[I, O]) Invoke

func (f *RunnableFunc[I, O]) Invoke(ctx context.Context, input I) (O, error)

Invoke 执行函数

func (*RunnableFunc[I, O]) Pipe

func (f *RunnableFunc[I, O]) Pipe(next Runnable[O, any]) Runnable[I, any]

Pipe 连接到另一个 Runnable

func (*RunnableFunc[I, O]) Stream

func (f *RunnableFunc[I, O]) Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)

Stream 流式执行(默认实现:包装成单个块)

func (*RunnableFunc[I, O]) WithCallbacks

func (f *RunnableFunc[I, O]) WithCallbacks(callbacks ...Callback) Runnable[I, O]

WithCallbacks 添加回调(重写以返回正确的类型)

func (*RunnableFunc[I, O]) WithConfig

func (f *RunnableFunc[I, O]) WithConfig(config RunnableConfig) Runnable[I, O]

WithConfig 配置 Runnable(重写以返回正确的类型)

type RunnablePipe

type RunnablePipe[I, M, O any] struct {
	// contains filtered or unexported fields
}

RunnablePipe 管道 Runnable,连接两个 Runnable

func NewRunnablePipe

func NewRunnablePipe[I, M, O any](first Runnable[I, M], second Runnable[M, O]) *RunnablePipe[I, M, O]

NewRunnablePipe 创建管道 Runnable

func (*RunnablePipe[I, M, O]) Batch

func (p *RunnablePipe[I, M, O]) Batch(ctx context.Context, inputs []I) ([]O, error)

Batch 批量执行管道

func (*RunnablePipe[I, M, O]) Invoke

func (p *RunnablePipe[I, M, O]) Invoke(ctx context.Context, input I) (O, error)

Invoke 执行管道

func (*RunnablePipe[I, M, O]) Pipe

func (p *RunnablePipe[I, M, O]) Pipe(next Runnable[O, any]) Runnable[I, any]

Pipe 连接到另一个 Runnable

func (*RunnablePipe[I, M, O]) Stream

func (p *RunnablePipe[I, M, O]) Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)

Stream 流式执行管道

func (*RunnablePipe[I, M, O]) WithCallbacks

func (p *RunnablePipe[I, M, O]) WithCallbacks(callbacks ...Callback) Runnable[I, O]

WithCallbacks 添加回调

func (*RunnablePipe[I, M, O]) WithConfig

func (p *RunnablePipe[I, M, O]) WithConfig(config RunnableConfig) Runnable[I, O]

WithConfig 配置管道

type RunnableSequence

type RunnableSequence struct {
	// contains filtered or unexported fields
}

RunnableSequence 顺序执行多个 Runnable

类似于 Unix 管道: r1 | r2 | r3

func NewRunnableSequence

func NewRunnableSequence(runnables ...Runnable[any, any]) *RunnableSequence

NewRunnableSequence 创建顺序执行的 Runnable 序列

func (*RunnableSequence) Batch

func (s *RunnableSequence) Batch(ctx context.Context, inputs []any) ([]any, error)

Batch 批量执行序列

func (*RunnableSequence) Invoke

func (s *RunnableSequence) Invoke(ctx context.Context, input any) (any, error)

Invoke 顺序执行所有 Runnable

func (*RunnableSequence) Pipe

func (s *RunnableSequence) Pipe(next Runnable[any, any]) Runnable[any, any]

Pipe 连接到另一个 Runnable

func (*RunnableSequence) Stream

func (s *RunnableSequence) Stream(ctx context.Context, input any) (<-chan StreamChunk[any], error)

Stream 流式执行序列

func (*RunnableSequence) WithCallbacks

func (s *RunnableSequence) WithCallbacks(callbacks ...Callback) Runnable[any, any]

WithCallbacks 添加回调

func (*RunnableSequence) WithConfig

func (s *RunnableSequence) WithConfig(config RunnableConfig) Runnable[any, any]

WithConfig 配置序列

type SimpleAgent

type SimpleAgent = interfaces.Agent

SimpleAgent is a type alias for the canonical Agent interface.

For new code that doesn't need generic typing, use interfaces.Agent directly. This alias provides a migration path for existing code.

type Span

type Span interface {
	End()
	SetAttribute(key string, value interface{})
	SetStatus(code StatusCode, description string)
	RecordError(err error)
}

Span 追踪 span 接口

type State

type State = state.State

Type aliases for internal use within core package

type StatusCode

type StatusCode int

StatusCode 状态码

const (
	StatusCodeOK StatusCode = iota
	StatusCodeError
)

type StdoutCallback

type StdoutCallback struct {
	*BaseCallback
	// contains filtered or unexported fields
}

StdoutCallback 标准输出回调

将事件输出到标准输出,用于调试

func NewStdoutCallback

func NewStdoutCallback(color bool) *StdoutCallback

NewStdoutCallback 创建标准输出回调

func (*StdoutCallback) OnError

func (s *StdoutCallback) OnError(ctx context.Context, err error) error

func (*StdoutCallback) OnLLMEnd

func (s *StdoutCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error

func (*StdoutCallback) OnLLMStart

func (s *StdoutCallback) OnLLMStart(ctx context.Context, prompts []string, model string) error

func (*StdoutCallback) OnToolEnd

func (s *StdoutCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error

func (*StdoutCallback) OnToolStart

func (s *StdoutCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error

type Step

type Step interface {
	// Execute 执行步骤
	Execute(ctx context.Context, input interface{}) (interface{}, error)

	// Name 返回步骤名称
	Name() string

	// Description 返回步骤描述
	Description() string
}

Step 定义 Chain 中的单个步骤接口

type StepExecution

type StepExecution struct {
	StepNumber  int           `json:"step_number"` // 步骤编号
	StepName    string        `json:"step_name"`   // 步骤名称
	Description string        `json:"description"` // 步骤描述
	Input       interface{}   `json:"input"`       // 输入
	Output      interface{}   `json:"output"`      // 输出
	Duration    time.Duration `json:"duration"`    // 耗时
	Success     bool          `json:"success"`     // 是否成功
	Error       string        `json:"error"`       // 错误信息
	Skipped     bool          `json:"skipped"`     // 是否跳过
}

StepExecution 步骤执行记录

type StreamChunk

type StreamChunk[T any] struct {
	Data  T     // 数据
	Error error // 错误(如果有)
	Done  bool  // 是否完成
}

StreamChunk 流式输出的数据块

type StreamConsumer

type StreamConsumer = execution.StreamConsumer

Type aliases for internal use within core package

type StreamOptions

type StreamOptions = execution.StreamOptions

Type aliases for internal use within core package

type StreamOutput

type StreamOutput = execution.StreamOutput

Type aliases for internal use within core package

type StreamState

type StreamState = execution.StreamState

Type aliases for internal use within core package

type StreamStatus

type StreamStatus = execution.StreamStatus

Type aliases for internal use within core package

type Tool

type Tool interface {
	// Execute 执行工具
	Execute(ctx context.Context, input *ToolInput) (*ToolOutput, error)

	// Name 返回工具名称
	Name() string

	// Description 返回工具描述
	Description() string

	// Parameters 返回工具参数定义
	Parameters() []ToolParameter
}

Tool 定义工具接口

type ToolCall

type ToolCall struct {
	ToolName string                 `json:"tool_name"` // 工具名称
	Input    map[string]interface{} `json:"input"`     // 输入参数
	Output   interface{}            `json:"output"`    // 输出结果
	Duration time.Duration          `json:"duration"`  // 耗时
	Success  bool                   `json:"success"`   // 是否成功
	Error    string                 `json:"error"`     // 错误信息
}

ToolCall 工具调用记录

type ToolInput

type ToolInput struct {
	Action     string                 `json:"action"`     // 操作类型
	Parameters map[string]interface{} `json:"parameters"` // 参数
	Context    map[string]interface{} `json:"context"`    // 上下文
}

ToolInput 工具输入

type ToolOutput

type ToolOutput struct {
	Success bool        `json:"success"` // 是否成功
	Data    interface{} `json:"data"`    // 输出数据
	Message string      `json:"message"` // 消息
	Error   string      `json:"error"`   // 错误信息
}

ToolOutput 工具输出

type ToolParameter

type ToolParameter struct {
	Name        string      `json:"name"`        // 参数名称
	Type        string      `json:"type"`        // 参数类型
	Description string      `json:"description"` // 参数描述
	Required    bool        `json:"required"`    // 是否必需
	Default     interface{} `json:"default"`     // 默认值
}

ToolParameter 工具参数定义

type Tracer

type Tracer interface {
	StartSpan(ctx context.Context, name string, attrs map[string]interface{}) (context.Context, Span)
}

Tracer 追踪器接口

type TracingCallback

type TracingCallback struct {
	*BaseCallback
	// contains filtered or unexported fields
}

TracingCallback 分布式追踪回调

集成 OpenTelemetry 或其他追踪系统

func NewTracingCallback

func NewTracingCallback(tracer Tracer) *TracingCallback

NewTracingCallback 创建追踪回调

func (*TracingCallback) OnLLMEnd

func (t *TracingCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error

func (*TracingCallback) OnLLMError

func (t *TracingCallback) OnLLMError(ctx context.Context, err error) error

func (*TracingCallback) OnLLMStart

func (t *TracingCallback) OnLLMStart(ctx context.Context, prompts []string, model string) error

type TypeInfo

type TypeInfo struct {
	// InputType 输入类型的反射信息
	InputType reflect.Type `json:"-"`

	// OutputType 输出类型的反射信息
	OutputType reflect.Type `json:"-"`

	// InputSchema JSON Schema 描述(可选,用于验证)
	InputSchema string `json:"input_schema,omitempty"`

	// OutputSchema JSON Schema 描述(可选,用于验证)
	OutputSchema string `json:"output_schema,omitempty"`

	// Name 组件名称
	Name string `json:"name"`

	// Description 组件描述
	Description string `json:"description,omitempty"`
}

TypeInfo 提供运行时类型信息

用于:

  • 插件注册时的类型验证
  • 运行时类型检查和转换
  • 文档生成和 API 描述

type TypedToDynamicAdapter

type TypedToDynamicAdapter[I, O any] struct {
	// contains filtered or unexported fields
}

TypedToDynamicAdapter 将泛型 Runnable 转换为 DynamicRunnable

这是桥接编译时类型安全和运行时灵活性的关键组件

func NewTypedToDynamicAdapter

func NewTypedToDynamicAdapter[I, O any](typed Runnable[I, O], name string) *TypedToDynamicAdapter[I, O]

NewTypedToDynamicAdapter 创建类型安全到动态的适配器

func (*TypedToDynamicAdapter[I, O]) BatchDynamic

func (a *TypedToDynamicAdapter[I, O]) BatchDynamic(ctx context.Context, inputs []any) ([]any, error)

BatchDynamic 实现批量执行

func (*TypedToDynamicAdapter[I, O]) InvokeDynamic

func (a *TypedToDynamicAdapter[I, O]) InvokeDynamic(ctx context.Context, input any) (any, error)

InvokeDynamic 实现 DynamicRunnable 接口

func (*TypedToDynamicAdapter[I, O]) StreamDynamic

func (a *TypedToDynamicAdapter[I, O]) StreamDynamic(ctx context.Context, input any) (<-chan DynamicStreamChunk, error)

StreamDynamic 实现流式执行

func (*TypedToDynamicAdapter[I, O]) TypeInfo

func (a *TypedToDynamicAdapter[I, O]) TypeInfo() TypeInfo

TypeInfo 返回类型信息

Directories

Path Synopsis
Package state provides type-safe state management with schema validation.
Package state provides type-safe state management with schema validation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL