Documentation
¶
Overview ¶
Package core provides the lifecycle manager implementation.
This file provides coordinated lifecycle management for GoAgent components:
- Priority-based startup and shutdown ordering
- Dependency resolution
- Health aggregation
- Graceful shutdown with timeout support
Package core provides panic handling infrastructure with hot-pluggable implementations.
This file defines interfaces and registry for panic recovery customization:
- PanicHandler - Custom panic recovery strategies
- PanicMetricsCollector - Statistics and monitoring
- PanicLogger - Specialized logging
- PanicHandlerRegistry - Thread-safe hot-swapping
Package core provides plugin bridge utilities for dynamic component loading.
This file addresses the fundamental tension between:
- Compile-time type safety (Go generics)
- Runtime flexibility (plugin systems, dynamic registration)
The solution introduces a layered architecture:
- DynamicRunnable - Runtime-safe interface using any types
- TypedAdapter - Converts between generic and dynamic versions
- PluginRegistry - Manages dynamically loaded components
Package core provides enhanced plugin registry with dependency injection.
This file extends the basic PluginRegistry with:
- Version management (multiple versions of same plugin)
- Dependency injection container
- Configuration management
- Lifecycle integration
Index ¶
- Constants
- Variables
- func Collect[T any](gen Generator[T]) ([]T, error)
- func IsFastInvoker(agent Agent) bool
- func PutChainInput(input *ChainInput)
- func PutChainOutput(output *ChainOutput)
- func RegisterTyped[I, O any](r *PluginRegistry, name string, runnable Runnable[I, O], ...) error
- func ResolveTyped[T any](c *Container, name string) (T, error)
- func SetGlobalMetricsCollector(collector PanicMetricsCollector)
- func SetGlobalPanicHandler(handler PanicHandler)
- func SetGlobalPanicLogger(logger PanicLogger)
- func ToChannel[T any](ctx context.Context, gen Generator[T], bufferSize int) <-chan StreamChunk[T]
- type Agent
- type AgentAction
- type AgentExecutor
- type AgentInput
- type AgentOptions
- type AgentOutput
- type AgentState
- type BackoffPolicy
- type BaseAgent
- func (a *BaseAgent) Batch(ctx context.Context, inputs []*AgentInput) ([]*AgentOutput, error)
- func (a *BaseAgent) Capabilities() []string
- func (a *BaseAgent) Description() string
- func (a *BaseAgent) Invoke(ctx context.Context, input *AgentInput) (*AgentOutput, error)
- func (a *BaseAgent) InvokeFast(ctx context.Context, input *AgentInput) (*AgentOutput, error)
- func (a *BaseAgent) Name() string
- func (a *BaseAgent) Pipe(next Runnable[*AgentOutput, any]) Runnable[*AgentInput, any]
- func (a *BaseAgent) RunGenerator(ctx context.Context, input *AgentInput) Generator[*AgentOutput]
- func (a *BaseAgent) Stream(ctx context.Context, input *AgentInput) (<-chan StreamChunk[*AgentOutput], error)
- func (a *BaseAgent) WithCallbacks(callbacks ...Callback) Runnable[*AgentInput, *AgentOutput]
- func (a *BaseAgent) WithConfig(config RunnableConfig) Runnable[*AgentInput, *AgentOutput]
- type BaseCallback
- func (b *BaseCallback) OnAgentAction(ctx context.Context, action *AgentAction) error
- func (b *BaseCallback) OnAgentFinish(ctx context.Context, output interface{}) error
- func (b *BaseCallback) OnChainEnd(ctx context.Context, chainName string, output interface{}) error
- func (b *BaseCallback) OnChainError(ctx context.Context, chainName string, err error) error
- func (b *BaseCallback) OnChainStart(ctx context.Context, chainName string, input interface{}) error
- func (b *BaseCallback) OnEnd(ctx context.Context, output interface{}) error
- func (b *BaseCallback) OnError(ctx context.Context, err error) error
- func (b *BaseCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error
- func (b *BaseCallback) OnLLMError(ctx context.Context, err error) error
- func (b *BaseCallback) OnLLMStart(ctx context.Context, prompts []string, model string) error
- func (b *BaseCallback) OnStart(ctx context.Context, input interface{}) error
- func (b *BaseCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error
- func (b *BaseCallback) OnToolError(ctx context.Context, toolName string, err error) error
- func (b *BaseCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error
- type BaseChain
- func (c *BaseChain) Batch(ctx context.Context, inputs []*ChainInput) ([]*ChainOutput, error)
- func (c *BaseChain) Invoke(ctx context.Context, input *ChainInput) (*ChainOutput, error)
- func (c *BaseChain) Name() string
- func (c *BaseChain) Pipe(next Runnable[*ChainOutput, any]) Runnable[*ChainInput, any]
- func (c *BaseChain) Steps() int
- func (c *BaseChain) Stream(ctx context.Context, input *ChainInput) (<-chan StreamChunk[*ChainOutput], error)
- func (c *BaseChain) WithCallbacks(callbacks ...Callback) Runnable[*ChainInput, *ChainOutput]
- func (c *BaseChain) WithConfig(config RunnableConfig) Runnable[*ChainInput, *ChainOutput]
- type BaseLifecycle
- type BaseMiddleware
- type BaseOrchestrator
- func (o *BaseOrchestrator) Execute(ctx context.Context, request *OrchestratorRequest) (*OrchestratorResponse, error)
- func (o *BaseOrchestrator) GetAgent(name string) (Agent, bool)
- func (o *BaseOrchestrator) GetChain(name string) (Chain, bool)
- func (o *BaseOrchestrator) GetTool(name string) (Tool, bool)
- func (o *BaseOrchestrator) Name() string
- func (o *BaseOrchestrator) RegisterAgent(name string, agent Agent) error
- func (o *BaseOrchestrator) RegisterChain(name string, chain Chain) error
- func (o *BaseOrchestrator) RegisterTool(name string, tool Tool) error
- type BasePlugin
- func (p *BasePlugin) AddAgent(agent DynamicRunnable)
- func (p *BasePlugin) AddTool(tool DynamicRunnable)
- func (p *BasePlugin) GetAgents() []DynamicRunnable
- func (p *BasePlugin) GetMiddleware() []interface{}
- func (p *BasePlugin) GetTools() []DynamicRunnable
- func (p *BasePlugin) Name() string
- func (p *BasePlugin) Version() string
- type BaseRunnable
- func (r *BaseRunnable[I, O]) Batch(ctx context.Context, inputs []I, invoker func(context.Context, I) (O, error)) ([]O, error)
- func (r *BaseRunnable[I, O]) GetConfig() RunnableConfig
- func (r *BaseRunnable[I, O]) WithCallbacks(callbacks ...Callback) *BaseRunnable[I, O]
- func (r *BaseRunnable[I, O]) WithConfig(config RunnableConfig) *BaseRunnable[I, O]
- type Callback
- type CallbackManager
- func (m *CallbackManager) AddCallback(cb Callback)
- func (m *CallbackManager) OnEnd(ctx context.Context, output interface{}) error
- func (m *CallbackManager) OnError(ctx context.Context, err error) error
- func (m *CallbackManager) OnStart(ctx context.Context, input interface{}) error
- func (m *CallbackManager) RemoveCallback(cb Callback)
- func (m *CallbackManager) TriggerCallbacks(fn func(Callback) error) error
- type Chain
- type ChainInput
- type ChainOptions
- type ChainOutput
- type ChainableAgent
- type ChunkMetadata
- type ChunkTransformFunc
- type ChunkType
- type Container
- type CostTrackingCallback
- type DefaultLifecycleManager
- func (m *DefaultLifecycleManager) GetState(name string) (interfaces.LifecycleState, error)
- func (m *DefaultLifecycleManager) HealthCheckAll(ctx context.Context) map[string]interfaces.HealthStatus
- func (m *DefaultLifecycleManager) InitAll(ctx context.Context) error
- func (m *DefaultLifecycleManager) Register(name string, component interfaces.Lifecycle, priority int) error
- func (m *DefaultLifecycleManager) RegisterWithConfig(name string, component interfaces.Lifecycle, priority int, config interface{}) error
- func (m *DefaultLifecycleManager) ShutdownSignal() <-chan struct{}
- func (m *DefaultLifecycleManager) SignalShutdown()
- func (m *DefaultLifecycleManager) StartAll(ctx context.Context) error
- func (m *DefaultLifecycleManager) StopAll(ctx context.Context) error
- func (m *DefaultLifecycleManager) Unregister(name string) error
- func (m *DefaultLifecycleManager) WaitForShutdown(ctx context.Context) error
- type DefaultPanicHandler
- type DynamicRunnable
- type DynamicStreamChunk
- type DynamicToTypedAdapter
- func (a *DynamicToTypedAdapter[I, O]) Batch(ctx context.Context, inputs []I) ([]O, error)
- func (a *DynamicToTypedAdapter[I, O]) Invoke(ctx context.Context, input I) (O, error)
- func (a *DynamicToTypedAdapter[I, O]) Pipe(next Runnable[O, any]) Runnable[I, any]
- func (a *DynamicToTypedAdapter[I, O]) Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)
- func (a *DynamicToTypedAdapter[I, O]) WithCallbacks(callbacks ...Callback) Runnable[I, O]
- func (a *DynamicToTypedAdapter[I, O]) WithConfig(config RunnableConfig) Runnable[I, O]
- func (a *DynamicToTypedAdapter[I, O]) WithValidator(validator func(any) error) *DynamicToTypedAdapter[I, O]
- type EnhancedPluginRegistry
- func (r *EnhancedPluginRegistry) Container() *Container
- func (r *EnhancedPluginRegistry) GetAllAgents() map[string][]DynamicRunnable
- func (r *EnhancedPluginRegistry) GetAllTools() map[string][]DynamicRunnable
- func (r *EnhancedPluginRegistry) GetLatestPlugin(name string) (Plugin, string, error)
- func (r *EnhancedPluginRegistry) GetPlugin(name, version string) (Plugin, error)
- func (r *EnhancedPluginRegistry) InitializeAll(ctx context.Context) error
- func (r *EnhancedPluginRegistry) Lifecycle() *DefaultLifecycleManager
- func (r *EnhancedPluginRegistry) ListPlugins() []string
- func (r *EnhancedPluginRegistry) ListVersions(name string) []string
- func (r *EnhancedPluginRegistry) RegisterPlugin(plugin Plugin, config *PluginConfig) error
- func (r *EnhancedPluginRegistry) StartAll(ctx context.Context) error
- func (r *EnhancedPluginRegistry) StopAll(ctx context.Context) error
- type ExecutionStep
- type ExecutorOption
- type FastInvoker
- type FunctionalLifecycle
- type FunctionalLifecycleOption
- func WithHealthFunc(fn func(context.Context) interfaces.HealthStatus) FunctionalLifecycleOption
- func WithInitFunc(fn func(context.Context, interface{}) error) FunctionalLifecycleOption
- func WithStartFunc(fn func(context.Context) error) FunctionalLifecycleOption
- func WithStopFunc(fn func(context.Context) error) FunctionalLifecycleOption
- type Generator
- func Filter[T any](gen Generator[T], predicate func(T) bool) Generator[T]
- func FromChannel[T any](ch <-chan StreamChunk[T]) Generator[T]
- func GeneratorFunc[T any](fn func(yield func(T, error) bool)) Generator[T]
- func Map[T, R any](gen Generator[T], mapper func(T) R) Generator[R]
- func Take[T any](gen Generator[T], n int) Generator[T]
- type Handler
- type Interrupt
- type InterruptConfig
- type InterruptManager
- func (m *InterruptManager) CancelInterrupt(interruptID string) error
- func (m *InterruptManager) CreateInterrupt(ctx context.Context, interrupt *Interrupt) (*Interrupt, *InterruptResponse, error)
- func (m *InterruptManager) GetInterrupt(interruptID string) (*Interrupt, error)
- func (m *InterruptManager) ListPendingInterrupts() []*Interrupt
- func (m *InterruptManager) OnInterruptCreated(fn func(*Interrupt))
- func (m *InterruptManager) OnInterruptResolved(fn func(*Interrupt, *InterruptResponse))
- func (m *InterruptManager) RespondToInterrupt(interruptID string, response *InterruptResponse) error
- type InterruptPriority
- type InterruptResponse
- type InterruptRule
- type InterruptType
- type InterruptableExecutor
- type LegacyStreamChunk
- type LifecycleManagerConfig
- type Logger
- type LoggingCallback
- func (l *LoggingCallback) OnEnd(ctx context.Context, output interface{}) error
- func (l *LoggingCallback) OnError(ctx context.Context, err error) error
- func (l *LoggingCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error
- func (l *LoggingCallback) OnLLMStart(ctx context.Context, prompts []string, model string) error
- func (l *LoggingCallback) OnStart(ctx context.Context, input interface{}) error
- func (l *LoggingCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error
- func (l *LoggingCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error
- type MetricsCallback
- func (m *MetricsCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error
- func (m *MetricsCallback) OnLLMStart(ctx context.Context, prompts []string, model string) error
- func (m *MetricsCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error
- func (m *MetricsCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error
- type MetricsCollector
- type Middleware
- type MiddlewareChain
- type MiddlewareRequest
- type MiddlewareResponse
- type NoOpMetricsCollector
- type NoOpPanicLogger
- type Orchestrator
- type OrchestratorOptions
- type OrchestratorRequest
- type OrchestratorResponse
- type OrchestratorStrategy
- type PanicHandler
- type PanicHandlerRegistry
- func (r *PanicHandlerRegistry) GetHandler() PanicHandler
- func (r *PanicHandlerRegistry) GetLogger() PanicLogger
- func (r *PanicHandlerRegistry) GetMetricsCollector() PanicMetricsCollector
- func (r *PanicHandlerRegistry) HandlePanic(ctx context.Context, component, operation string, panicValue interface{}) error
- func (r *PanicHandlerRegistry) SetHandler(handler PanicHandler)
- func (r *PanicHandlerRegistry) SetLogger(logger PanicLogger)
- func (r *PanicHandlerRegistry) SetMetricsCollector(collector PanicMetricsCollector)
- type PanicLogger
- type PanicMetricsCollector
- type Plugin
- type PluginConfig
- type PluginDependency
- type PluginMetadata
- type PluginRegistry
- func (r *PluginRegistry) Get(name string) (DynamicRunnable, error)
- func (r *PluginRegistry) GetMetadata(name string) (*PluginMetadata, error)
- func (r *PluginRegistry) List() []string
- func (r *PluginRegistry) Register(name string, plugin DynamicRunnable, metadata *PluginMetadata) error
- func (r *PluginRegistry) Unregister(name string) error
- type ReasoningStep
- type RetryPolicy
- type Runnable
- type RunnableConfig
- type RunnableFunc
- func (f *RunnableFunc[I, O]) Batch(ctx context.Context, inputs []I) ([]O, error)
- func (f *RunnableFunc[I, O]) Invoke(ctx context.Context, input I) (O, error)
- func (f *RunnableFunc[I, O]) Pipe(next Runnable[O, any]) Runnable[I, any]
- func (f *RunnableFunc[I, O]) Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)
- func (f *RunnableFunc[I, O]) WithCallbacks(callbacks ...Callback) Runnable[I, O]
- func (f *RunnableFunc[I, O]) WithConfig(config RunnableConfig) Runnable[I, O]
- type RunnablePipe
- func (p *RunnablePipe[I, M, O]) Batch(ctx context.Context, inputs []I) ([]O, error)
- func (p *RunnablePipe[I, M, O]) Invoke(ctx context.Context, input I) (O, error)
- func (p *RunnablePipe[I, M, O]) Pipe(next Runnable[O, any]) Runnable[I, any]
- func (p *RunnablePipe[I, M, O]) Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)
- func (p *RunnablePipe[I, M, O]) WithCallbacks(callbacks ...Callback) Runnable[I, O]
- func (p *RunnablePipe[I, M, O]) WithConfig(config RunnableConfig) Runnable[I, O]
- type RunnableSequence
- func (s *RunnableSequence) Batch(ctx context.Context, inputs []any) ([]any, error)
- func (s *RunnableSequence) Invoke(ctx context.Context, input any) (any, error)
- func (s *RunnableSequence) Pipe(next Runnable[any, any]) Runnable[any, any]
- func (s *RunnableSequence) Stream(ctx context.Context, input any) (<-chan StreamChunk[any], error)
- func (s *RunnableSequence) WithCallbacks(callbacks ...Callback) Runnable[any, any]
- func (s *RunnableSequence) WithConfig(config RunnableConfig) Runnable[any, any]
- type SimpleAgent
- type Span
- type State
- type StatusCode
- type StdoutCallback
- func (s *StdoutCallback) OnError(ctx context.Context, err error) error
- func (s *StdoutCallback) OnLLMEnd(ctx context.Context, output string, tokenUsage int) error
- func (s *StdoutCallback) OnLLMStart(ctx context.Context, prompts []string, model string) error
- func (s *StdoutCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error
- func (s *StdoutCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error
- type Step
- type StepExecution
- type StreamChunk
- type StreamConsumer
- type StreamOptions
- type StreamOutput
- type StreamState
- type StreamStatus
- type Tool
- type ToolCall
- type ToolInput
- type ToolOutput
- type ToolParameter
- type Tracer
- type TracingCallback
- type TypeInfo
- type TypedToDynamicAdapter
- func (a *TypedToDynamicAdapter[I, O]) BatchDynamic(ctx context.Context, inputs []any) ([]any, error)
- func (a *TypedToDynamicAdapter[I, O]) InvokeDynamic(ctx context.Context, input any) (any, error)
- func (a *TypedToDynamicAdapter[I, O]) StreamDynamic(ctx context.Context, input any) (<-chan DynamicStreamChunk, error)
- func (a *TypedToDynamicAdapter[I, O]) TypeInfo() TypeInfo
Constants ¶
const (
ChunkTypeText = execution.ChunkTypeText
ChunkTypeError = execution.ChunkTypeError
ChunkTypeJSON = execution.ChunkTypeJSON
ChunkTypeProgress = execution.ChunkTypeProgress
ChunkTypeBinary = execution.ChunkTypeBinary
ChunkTypeControl = execution.ChunkTypeControl
ChunkTypeStatus = execution.ChunkTypeStatus
StreamStateRunning = execution.StreamStateRunning
StreamStatePaused = execution.StreamStatePaused
StreamStateError = execution.StreamStateError
StreamStateComplete = execution.StreamStateComplete
StreamStateClosed = execution.StreamStateClosed
)
Constants for internal use
Variables ¶
var (
// Agent 相关错误
ErrAgentNotFound = errors.New("agent not found")
ErrAgentAlreadyExists = errors.New("agent already exists")
ErrAgentExecutionFailed = errors.New("agent execution failed")
ErrInvalidAgentInput = errors.New("invalid agent input")
ErrNotImplemented = errors.New("method not implemented")
// Chain 相关错误
ErrChainNotFound = errors.New("chain not found")
ErrChainAlreadyExists = errors.New("chain already exists")
ErrChainProcessFailed = errors.New("chain process failed")
ErrInvalidChainInput = errors.New("invalid chain input")
ErrStepExecutionFailed = errors.New("step execution failed")
// Orchestrator 相关错误
ErrOrchestratorNotReady = errors.New("orchestrator not ready")
ErrInvalidOrchestratorRequest = errors.New("invalid orchestrator request")
ErrExecutionFailed = errors.New("execution failed")
ErrExecutionTimeout = errors.New("execution timeout")
// Tool 相关错误
ErrToolNotFound = errors.New("tool not found")
ErrToolAlreadyExists = errors.New("tool already exists")
ErrToolExecutionFailed = errors.New("tool execution failed")
ErrInvalidToolInput = errors.New("invalid tool input")
// Memory 相关错误
ErrMemoryStoreFailed = errors.New("memory store failed")
ErrMemoryRetrieveFailed = errors.New("memory retrieve failed")
ErrMemoryNotFound = errors.New("memory not found")
// LLM 相关错误
ErrLLMNotAvailable = errors.New("LLM not available")
ErrLLMRequestFailed = errors.New("LLM request failed")
ErrLLMResponseInvalid = errors.New("LLM response invalid")
)
错误定义
var (
NewBaseMiddleware = middleware.NewBaseMiddleware
NewMiddlewareChain = middleware.NewMiddlewareChain
NewLoggingMiddleware = middleware.NewLoggingMiddleware
NewTimingMiddleware = middleware.NewTimingMiddleware
NewCacheMiddleware = middleware.NewCacheMiddleware
NewRetryMiddleware = middleware.NewRetryMiddleware
NewTransformMiddleware = middleware.NewTransformMiddleware
NewCircuitBreakerMiddleware = middleware.NewCircuitBreakerMiddleware
NewRateLimiterMiddleware = middleware.NewRateLimiterMiddleware
NewValidationMiddleware = middleware.NewValidationMiddleware
NewAuthenticationMiddleware = middleware.NewAuthenticationMiddleware
NewDynamicPromptMiddleware = middleware.NewDynamicPromptMiddleware
NewToolSelectorMiddleware = middleware.NewToolSelectorMiddleware
)
Constructor functions for internal use
var (
DefaultStreamOptions = execution.DefaultStreamOptions
NewTextChunk = execution.NewTextChunk
NewProgressChunk = execution.NewProgressChunk
NewErrorChunk = execution.NewErrorChunk
)
Functions for internal use
var (
NewAgentState = state.NewAgentState
)
Constructor functions for internal use
Functions ¶
func Collect ¶ added in v0.2.0
Collect 收集 Generator 的所有输出到切片
参数:
- gen - Generator 实例
返回:
- 所有数据的切片和第一个错误
注意:
- 遇到第一个错误时停止收集
- 适用于小数据集,大数据集建议流式处理
示例:
results, err := Collect(gen)
if err != nil {
return err
}
fmt.Println(results)
func IsFastInvoker ¶
IsFastInvoker 检查 Agent 是否支持快速调用
使用示例:
if core.IsFastInvoker(agent) {
// 使用快速调用路径
output, err := agent.(core.FastInvoker).InvokeFast(ctx, input)
} else {
// 使用标准路径
output, err := agent.Invoke(ctx, input)
}
func PutChainInput ¶
func PutChainInput(input *ChainInput)
PutChainInput returns a ChainInput to the object pool
func PutChainOutput ¶
func PutChainOutput(output *ChainOutput)
PutChainOutput returns a ChainOutput to the object pool
func RegisterTyped ¶
func RegisterTyped[I, O any](r *PluginRegistry, name string, runnable Runnable[I, O], metadata *PluginMetadata) error
RegisterTyped 注册类型安全的组件(自动适配为 DynamicRunnable)
func ResolveTyped ¶
ResolveTyped resolves a service with type assertion.
func SetGlobalMetricsCollector ¶
func SetGlobalMetricsCollector(collector PanicMetricsCollector)
SetGlobalMetricsCollector sets the global metrics collector.
func SetGlobalPanicHandler ¶
func SetGlobalPanicHandler(handler PanicHandler)
SetGlobalPanicHandler sets the global panic handler.
func SetGlobalPanicLogger ¶
func SetGlobalPanicLogger(logger PanicLogger)
SetGlobalPanicLogger sets the global panic logger.
func ToChannel ¶ added in v0.2.0
ToChannel 将 Generator 转换为 Channel(向后兼容)
参数:
- ctx - 上下文(用于取消)
- gen - Generator 实例
- bufferSize - channel 缓冲区大小(0 表示无缓冲)
返回:
- 只读 channel,包含 StreamChunk
注意:
- 此函数会启动 goroutine 消费 Generator
- 当 ctx 取消或 Generator 结束时,channel 自动关闭
示例:
ch := ToChannel(ctx, gen, 10)
for item := range ch {
if item.Error != nil {
return item.Error
}
fmt.Println(item.Data)
}
Types ¶
type Agent ¶
type Agent interface {
// 继承 Runnable 接口,Agent 是一个可执行的组件
Runnable[*AgentInput, *AgentOutput]
// Agent 特有方法
// Name 返回 Agent 的名称
Name() string
// Description 返回 Agent 的描述
Description() string
// Capabilities 返回 Agent 的能力列表
Capabilities() []string
}
Agent 定义通用 AI Agent 接口
Agent 是一个 Runnable[*AgentInput, *AgentOutput],即具有推理能力的智能体,能够:
- 接收输入并进行处理(通过 Runnable.Invoke)
- 调用工具获取额外信息
- 使用 LLM 进行推理
- 返回结构化输出
- 支持流式处理、批量执行、管道连接等 Runnable 特性
Note: This is a generic version of the Agent interface. For cross-package compatibility, consider using interfaces.Agent.
type AgentAction ¶
type AgentAction struct {
Tool string // 工具名称
ToolInput map[string]interface{} // 工具输入
Log string // 日志信息
}
AgentAction Agent 执行的操作
type AgentExecutor ¶
type AgentExecutor struct {
// contains filtered or unexported fields
}
AgentExecutor 执行 Agent 的辅助结构
提供额外的执行逻辑,如重试、超时控制等
func NewAgentExecutor ¶
func NewAgentExecutor(agent Agent, options ...ExecutorOption) *AgentExecutor
NewAgentExecutor 创建 Agent 执行器
func (*AgentExecutor) Execute ¶
func (e *AgentExecutor) Execute(ctx context.Context, input *AgentInput) (*AgentOutput, error)
Execute 执行 Agent,支持重试和超时
type AgentInput ¶
type AgentInput struct {
// 任务描述
Task string `json:"task"` // 任务描述
Instruction string `json:"instruction"` // 具体指令
Context map[string]interface{} `json:"context"` // 上下文信息
// 执行选项
Options AgentOptions `json:"options"` // 执行选项
// 元数据
SessionID string `json:"session_id"` // 会话 ID
Timestamp time.Time `json:"timestamp"` // 时间戳
}
AgentInput Agent 输入
type AgentOptions ¶
type AgentOptions struct {
// LLM 配置
Temperature float64 `json:"temperature,omitempty"` // LLM 温度参数
MaxTokens int `json:"max_tokens,omitempty"` // 最大 token 数
Model string `json:"model,omitempty"` // LLM 模型
// 工具配置
EnableTools bool `json:"enable_tools,omitempty"` // 是否启用工具
AllowedTools []string `json:"allowed_tools,omitempty"` // 允许的工具列表
MaxToolCalls int `json:"max_tool_calls,omitempty"` // 最大工具调用次数
// 记忆配置
EnableMemory bool `json:"enable_memory,omitempty"` // 是否启用记忆
LoadHistory bool `json:"load_history,omitempty"` // 是否加载历史
SaveToMemory bool `json:"save_to_memory,omitempty"` // 是否保存到记忆
MaxHistoryLength int `json:"max_history_length,omitempty"` // 最大历史长度
// 超时配置
Timeout time.Duration `json:"timeout,omitempty"` // 超时时间
}
AgentOptions Agent 执行选项
func DefaultAgentOptions ¶
func DefaultAgentOptions() AgentOptions
DefaultAgentOptions 返回默认的 Agent 选项
type AgentOutput ¶
type AgentOutput struct {
// 执行结果
Result interface{} `json:"result"` // 结果数据
Status string `json:"status"` // 状态: "success", "failed", "partial"
Message string `json:"message"` // 结果消息
// 推理过程
ReasoningSteps []ReasoningStep `json:"reasoning_steps"` // 推理步骤
ToolCalls []ToolCall `json:"tool_calls"` // 工具调用记录
// Token 使用统计
TokenUsage *interfaces.TokenUsage `json:"token_usage,omitempty"` // LLM Token 使用统计
// 元数据
Latency time.Duration `json:"latency"` // 执行延迟
Timestamp time.Time `json:"timestamp"` // 时间戳
Metadata map[string]interface{} `json:"metadata"` // 额外元数据
}
AgentOutput Agent 输出
func TryInvokeFast ¶
func TryInvokeFast(ctx context.Context, agent Agent, input *AgentInput) (*AgentOutput, error)
TryInvokeFast 尝试使用快速调用,如果不支持则回退到标准 Invoke
这是一个便捷函数,用于在不确定 Agent 是否支持 FastInvoker 时使用
使用示例:
output, err := core.TryInvokeFast(ctx, agent, input)
等价于:
var output *core.AgentOutput
var err error
if fastAgent, ok := agent.(core.FastInvoker); ok {
output, err = fastAgent.InvokeFast(ctx, input)
} else {
output, err = agent.Invoke(ctx, input)
}
type AgentState ¶
type AgentState = state.AgentState
Type aliases for internal use within core package
type BackoffPolicy ¶
BackoffPolicy 退避策略
type BaseAgent ¶
type BaseAgent struct {
*BaseRunnable[*AgentInput, *AgentOutput]
// contains filtered or unexported fields
}
BaseAgent 提供 Agent 的基础实现
BaseAgent 实现了 Agent 接口,包括完整的 Runnable 接口支持。具体的执行逻辑需要通过组合(嵌入 BaseAgent)来实现。
func NewBaseAgent ¶
NewBaseAgent 创建基础 Agent
func (*BaseAgent) Batch ¶
func (a *BaseAgent) Batch(ctx context.Context, inputs []*AgentInput) ([]*AgentOutput, error)
Batch 批量执行 Agent,使用 BaseRunnable 的默认批处理实现。
func (*BaseAgent) Capabilities ¶
Capabilities 返回 Agent 能力列表
func (*BaseAgent) Invoke ¶
func (a *BaseAgent) Invoke(ctx context.Context, input *AgentInput) (*AgentOutput, error)
Invoke 执行 Agent。这是 Runnable 接口的核心方法,需要由具体 Agent 实现。
func (*BaseAgent) InvokeFast ¶
func (a *BaseAgent) InvokeFast(ctx context.Context, input *AgentInput) (*AgentOutput, error)
InvokeFast 快速调用(绕过中间件)
用于热路径优化,直接调用 Execute 方法。性能提升:避免接口调用和中间件开销。
注意:此方法不会触发 Runnable 的回调和中间件逻辑,仅在性能关键路径且不需要额外处理时使用。
func (*BaseAgent) Pipe ¶
func (a *BaseAgent) Pipe(next Runnable[*AgentOutput, any]) Runnable[*AgentInput, any]
Pipe 连接到另一个 Runnable,将当前 Agent 的输出连接到下一个 Runnable 的输入。
func (*BaseAgent) RunGenerator ¶ added in v0.2.0
func (a *BaseAgent) RunGenerator(ctx context.Context, input *AgentInput) Generator[*AgentOutput]
RunGenerator 使用 Generator 模式执行 Agent(实验性功能)
相比 Stream 方法的优势:
- 零内存分配(无 channel、goroutine 开销)
- 支持早期终止(通过 for-range break)
- 更简洁的迭代语法
参数:
- ctx - 上下文
- input - 输入数据
返回:
- Generator,产生 *AgentOutput 和 error
示例:
for output, err := range agent.RunGenerator(ctx, input) {
if err != nil {
return err
}
fmt.Println("Step:", output.Status)
}
注意:
- 这是实验性 API,可能在未来版本中调整
- 默认实现调用 Invoke 并产生单个结果
- 具体 Agent 可以重写此方法实现真正的流式处理
- 如需向后兼容,使用 Stream 方法
func (*BaseAgent) Stream ¶
func (a *BaseAgent) Stream(ctx context.Context, input *AgentInput) (<-chan StreamChunk[*AgentOutput], error)
Stream 流式执行 Agent。默认实现将 Invoke 的结果包装成单个流块。
func (*BaseAgent) WithCallbacks ¶
func (a *BaseAgent) WithCallbacks(callbacks ...Callback) Runnable[*AgentInput, *AgentOutput]
WithCallbacks 添加回调处理器,返回一个新的 Agent 实例,包含指定的回调。
func (*BaseAgent) WithConfig ¶
func (a *BaseAgent) WithConfig(config RunnableConfig) Runnable[*AgentInput, *AgentOutput]
WithConfig 配置 Agent,返回一个新的 Agent 实例,使用指定的配置。
type BaseCallback ¶
type BaseCallback struct{}
BaseCallback 提供回调的默认实现(什么都不做)
用户可以嵌入 BaseCallback 并只重写需要的方法
func (*BaseCallback) OnAgentAction ¶
func (b *BaseCallback) OnAgentAction(ctx context.Context, action *AgentAction) error
func (*BaseCallback) OnAgentFinish ¶
func (b *BaseCallback) OnAgentFinish(ctx context.Context, output interface{}) error
func (*BaseCallback) OnChainEnd ¶
func (b *BaseCallback) OnChainEnd(ctx context.Context, chainName string, output interface{}) error
func (*BaseCallback) OnChainError ¶
func (*BaseCallback) OnChainStart ¶
func (b *BaseCallback) OnChainStart(ctx context.Context, chainName string, input interface{}) error
func (*BaseCallback) OnEnd ¶
func (b *BaseCallback) OnEnd(ctx context.Context, output interface{}) error
func (*BaseCallback) OnLLMError ¶
func (b *BaseCallback) OnLLMError(ctx context.Context, err error) error
func (*BaseCallback) OnLLMStart ¶
func (*BaseCallback) OnStart ¶
func (b *BaseCallback) OnStart(ctx context.Context, input interface{}) error
func (*BaseCallback) OnToolEnd ¶
func (b *BaseCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error
func (*BaseCallback) OnToolError ¶
func (*BaseCallback) OnToolStart ¶
func (b *BaseCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error
type BaseChain ¶
type BaseChain struct {
*BaseRunnable[*ChainInput, *ChainOutput]
// contains filtered or unexported fields
}
BaseChain 提供 Chain 的基础实现
实现了完整的 Runnable 接口,包括:
- Invoke: 执行链式处理
- Stream: 流式执行(将步骤结果作为流输出)
- Batch: 批量执行多个输入
- Pipe: 管道连接
- WithCallbacks: 回调支持
func (*BaseChain) Batch ¶
func (c *BaseChain) Batch(ctx context.Context, inputs []*ChainInput) ([]*ChainOutput, error)
Batch 批量执行(实现 Runnable 接口)
func (*BaseChain) Invoke ¶
func (c *BaseChain) Invoke(ctx context.Context, input *ChainInput) (*ChainOutput, error)
Invoke 执行链式处理(实现 Runnable 接口)
func (*BaseChain) Pipe ¶
func (c *BaseChain) Pipe(next Runnable[*ChainOutput, any]) Runnable[*ChainInput, any]
Pipe 连接到另一个 Runnable(实现 Runnable 接口)
func (*BaseChain) Stream ¶
func (c *BaseChain) Stream(ctx context.Context, input *ChainInput) (<-chan StreamChunk[*ChainOutput], error)
Stream 流式执行(实现 Runnable 接口)
每个步骤执行完成后,立即发送一个流块
func (*BaseChain) WithCallbacks ¶
func (c *BaseChain) WithCallbacks(callbacks ...Callback) Runnable[*ChainInput, *ChainOutput]
WithCallbacks 添加回调(重写返回类型)
func (*BaseChain) WithConfig ¶
func (c *BaseChain) WithConfig(config RunnableConfig) Runnable[*ChainInput, *ChainOutput]
WithConfig 配置 Runnable(重写返回类型)
type BaseLifecycle ¶
type BaseLifecycle struct {
// contains filtered or unexported fields
}
BaseLifecycle provides a no-op implementation of Lifecycle interface. Components can embed this to only override methods they need.
func NewBaseLifecycle ¶
func NewBaseLifecycle(name string) *BaseLifecycle
NewBaseLifecycle creates a new BaseLifecycle.
func (*BaseLifecycle) HealthCheck ¶
func (b *BaseLifecycle) HealthCheck(ctx context.Context) interfaces.HealthStatus
HealthCheck returns healthy by default.
func (*BaseLifecycle) Init ¶
func (b *BaseLifecycle) Init(ctx context.Context, config interface{}) error
Init does nothing by default.
type BaseMiddleware ¶
type BaseMiddleware = middleware.BaseMiddleware
Type aliases for internal use within core package
type BaseOrchestrator ¶
type BaseOrchestrator struct {
// contains filtered or unexported fields
}
BaseOrchestrator 提供编排器的基础实现
func NewBaseOrchestrator ¶
func NewBaseOrchestrator(name string) *BaseOrchestrator
NewBaseOrchestrator 创建基础编排器
func (*BaseOrchestrator) Execute ¶
func (o *BaseOrchestrator) Execute(ctx context.Context, request *OrchestratorRequest) (*OrchestratorResponse, error)
Execute returns ErrNotImplemented.
Concrete orchestrator implementations must provide their own Execute. To do so, embed BaseOrchestrator in the concrete type and define an Execute method on that type.
func (*BaseOrchestrator) GetAgent ¶
func (o *BaseOrchestrator) GetAgent(name string) (Agent, bool)
GetAgent 获取 Agent
func (*BaseOrchestrator) GetChain ¶
func (o *BaseOrchestrator) GetChain(name string) (Chain, bool)
GetChain 获取 Chain
func (*BaseOrchestrator) GetTool ¶
func (o *BaseOrchestrator) GetTool(name string) (Tool, bool)
GetTool 获取 Tool
func (*BaseOrchestrator) RegisterAgent ¶
func (o *BaseOrchestrator) RegisterAgent(name string, agent Agent) error
RegisterAgent 注册 Agent
func (*BaseOrchestrator) RegisterChain ¶
func (o *BaseOrchestrator) RegisterChain(name string, chain Chain) error
RegisterChain 注册 Chain
func (*BaseOrchestrator) RegisterTool ¶
func (o *BaseOrchestrator) RegisterTool(name string, tool Tool) error
RegisterTool 注册 Tool
type BasePlugin ¶
type BasePlugin struct {
*BaseLifecycle
// contains filtered or unexported fields
}
BasePlugin provides a default implementation of the Plugin interface.
func NewBasePlugin ¶
func NewBasePlugin(name, version string) *BasePlugin
NewBasePlugin creates a new base plugin.
func (*BasePlugin) AddAgent ¶
func (p *BasePlugin) AddAgent(agent DynamicRunnable)
AddAgent registers an agent with the plugin.
func (*BasePlugin) AddTool ¶
func (p *BasePlugin) AddTool(tool DynamicRunnable)
AddTool registers a tool with the plugin.
func (*BasePlugin) GetAgents ¶
func (p *BasePlugin) GetAgents() []DynamicRunnable
GetAgents returns registered agents.
func (*BasePlugin) GetMiddleware ¶
func (p *BasePlugin) GetMiddleware() []interface{}
GetMiddleware returns registered middleware.
func (*BasePlugin) GetTools ¶
func (p *BasePlugin) GetTools() []DynamicRunnable
GetTools returns registered tools.
func (*BasePlugin) Version ¶
func (p *BasePlugin) Version() string
Version returns the plugin version.
type BaseRunnable ¶
type BaseRunnable[I, O any] struct {
// contains filtered or unexported fields
}
BaseRunnable 提供 Runnable 的基础实现
实现了通用的功能,如批处理、回调等。具体的 Invoke 和 Stream 需要由子类实现。
func NewBaseRunnable ¶
func NewBaseRunnable[I, O any]() *BaseRunnable[I, O]
NewBaseRunnable 创建基础 Runnable
func (*BaseRunnable[I, O]) Batch ¶
func (r *BaseRunnable[I, O]) Batch(ctx context.Context, inputs []I, invoker func(context.Context, I) (O, error)) ([]O, error)
Batch 默认的批处理实现
使用 goroutines 并发执行多个输入
func (*BaseRunnable[I, O]) GetConfig ¶
func (r *BaseRunnable[I, O]) GetConfig() RunnableConfig
GetConfig 获取配置
func (*BaseRunnable[I, O]) WithCallbacks ¶
func (r *BaseRunnable[I, O]) WithCallbacks(callbacks ...Callback) *BaseRunnable[I, O]
WithCallbacks 添加回调
func (*BaseRunnable[I, O]) WithConfig ¶
func (r *BaseRunnable[I, O]) WithConfig(config RunnableConfig) *BaseRunnable[I, O]
WithConfig 配置 Runnable
type Callback ¶
type Callback interface {
// 通用回调
OnStart(ctx context.Context, input interface{}) error
OnEnd(ctx context.Context, output interface{}) error
OnError(ctx context.Context, err error) error
// LLM 回调
OnLLMStart(ctx context.Context, prompts []string, model string) error
OnLLMEnd(ctx context.Context, output string, tokenUsage int) error
OnLLMError(ctx context.Context, err error) error
// Chain 回调
OnChainStart(ctx context.Context, chainName string, input interface{}) error
OnChainEnd(ctx context.Context, chainName string, output interface{}) error
OnChainError(ctx context.Context, chainName string, err error) error
// Tool 回调
OnToolStart(ctx context.Context, toolName string, input interface{}) error
OnToolEnd(ctx context.Context, toolName string, output interface{}) error
OnToolError(ctx context.Context, toolName string, err error) error
// Agent 回调
OnAgentAction(ctx context.Context, action *AgentAction) error
OnAgentFinish(ctx context.Context, output interface{}) error
}
Callback 定义回调处理器接口
借鉴 LangChain 的回调系统,提供灵活的监控和调试能力。所有回调方法都返回 error,如果返回非 nil 错误,执行将中止。
type CallbackManager ¶
type CallbackManager struct {
// contains filtered or unexported fields
}
CallbackManager 管理多个回调处理器
func NewCallbackManager ¶
func NewCallbackManager(callbacks ...Callback) *CallbackManager
NewCallbackManager 创建回调管理器
func (*CallbackManager) AddCallback ¶
func (m *CallbackManager) AddCallback(cb Callback)
AddCallback 添加回调
func (*CallbackManager) OnEnd ¶
func (m *CallbackManager) OnEnd(ctx context.Context, output interface{}) error
OnEnd 触发 OnEnd 回调
func (*CallbackManager) OnError ¶
func (m *CallbackManager) OnError(ctx context.Context, err error) error
OnError 触发 OnError 回调
func (*CallbackManager) OnStart ¶
func (m *CallbackManager) OnStart(ctx context.Context, input interface{}) error
OnStart 触发 OnStart 回调
func (*CallbackManager) RemoveCallback ¶
func (m *CallbackManager) RemoveCallback(cb Callback)
RemoveCallback 移除回调
func (*CallbackManager) TriggerCallbacks ¶
func (m *CallbackManager) TriggerCallbacks(fn func(Callback) error) error
TriggerCallbacks 触发所有回调的指定方法
type Chain ¶
type Chain interface {
Runnable[*ChainInput, *ChainOutput]
// Name 返回 Chain 的名称
Name() string
// Steps 返回包含的步骤数量
Steps() int
}
Chain 定义链式处理接口
Chain 是一种串行执行的处理模式,适用于:
- 多步骤的数据处理流程
- 需要按顺序执行的分析任务
- 每个步骤依赖前一步骤的输出
Chain 现在直接实现 Runnable 接口,享受其所有能力:
- Invoke: 单次执行
- Stream: 流式执行
- Batch: 批量执行
- Pipe: 管道连接
- WithCallbacks: 回调支持
type ChainInput ¶
type ChainInput struct {
// 输入数据
Data interface{} `json:"data"` // 主要输入数据
// 变量和上下文
Vars map[string]interface{} `json:"vars,omitempty"` // 变量集合,用于在步骤间传递上下文
// 执行控制
Options ChainOptions `json:"options,omitempty"` // 执行选项
}
ChainInput Chain 输入
func GetChainInput ¶
func GetChainInput() *ChainInput
GetChainInput retrieves a ChainInput from the object pool
type ChainOptions ¶
type ChainOptions struct {
// 执行控制
StopOnError bool `json:"stop_on_error,omitempty"` // 出错时是否停止
Timeout time.Duration `json:"timeout,omitempty"` // 超时时间
Parallel bool `json:"parallel,omitempty"` // 是否并行执行(如果可能)
// 步骤控制
SkipSteps []int `json:"skip_steps,omitempty"` // 跳过的步骤编号
OnlySteps []int `json:"only_steps,omitempty"` // 仅执行的步骤编号
// 额外选项
Extra map[string]interface{} `json:"extra,omitempty"` // 额外选项
}
ChainOptions Chain 执行选项
func DefaultChainOptions ¶
func DefaultChainOptions() ChainOptions
DefaultChainOptions 返回默认的 Chain 选项
type ChainOutput ¶
type ChainOutput struct {
// 输出数据
Data interface{} `json:"data"` // 最终输出数据
// 执行信息
StepsExecuted []StepExecution `json:"steps_executed"` // 执行的步骤详情
TotalLatency time.Duration `json:"total_latency"` // 总耗时
Status string `json:"status"` // 执行状态: "success", "failed", "partial"
// 额外结果
Metadata map[string]interface{} `json:"metadata,omitempty"` // 额外元数据
}
ChainOutput Chain 输出
func GetChainOutput ¶
func GetChainOutput() *ChainOutput
GetChainOutput retrieves a ChainOutput from the object pool
type ChainableAgent ¶
type ChainableAgent struct {
*BaseAgent
// contains filtered or unexported fields
}
ChainableAgent 可链式调用的 Agent
允许将多个 Agent 串联起来,前一个的输出作为后一个的输入
func NewChainableAgent ¶
func NewChainableAgent(name, description string, agents ...Agent) *ChainableAgent
NewChainableAgent 创建可链式调用的 Agent
func (*ChainableAgent) Invoke ¶
func (c *ChainableAgent) Invoke(ctx context.Context, input *AgentInput) (*AgentOutput, error)
Invoke 顺序调用所有 Agent(使用快速路径优化)
func (*ChainableAgent) InvokeFast ¶
func (c *ChainableAgent) InvokeFast(ctx context.Context, input *AgentInput) (*AgentOutput, error)
InvokeFast 快速调用链(绕过回调)
用于嵌套链场景的性能优化
type ChunkMetadata ¶
type ChunkMetadata = execution.ChunkMetadata
Type aliases for internal use within core package
type ChunkTransformFunc ¶
type ChunkTransformFunc = execution.ChunkTransformFunc
Type aliases for internal use within core package
type Container ¶
type Container struct {
// contains filtered or unexported fields
}
Container provides dependency injection for plugins.
Plugins can request shared resources like Logger, Store, LLM clients, etc. through the container instead of passing them as parameters.
func NewContainer ¶
func NewContainer() *Container
NewContainer creates a new dependency injection container.
func (*Container) MustResolve ¶
MustResolve resolves a service or panics.
func (*Container) RegisterFactory ¶
RegisterFactory registers a factory function for lazy initialization.
type CostTrackingCallback ¶
type CostTrackingCallback struct {
*BaseCallback
// contains filtered or unexported fields
}
CostTrackingCallback 成本追踪回调
追踪 LLM 调用的成本
func NewCostTrackingCallback ¶
func NewCostTrackingCallback(pricing map[string]float64) *CostTrackingCallback
NewCostTrackingCallback 创建成本追踪回调
func (*CostTrackingCallback) GetTotalCost ¶
func (c *CostTrackingCallback) GetTotalCost() float64
GetTotalCost 获取总成本
func (*CostTrackingCallback) GetTotalTokens ¶
func (c *CostTrackingCallback) GetTotalTokens() int
GetTotalTokens 获取总 token 数
type DefaultLifecycleManager ¶
type DefaultLifecycleManager struct {
// contains filtered or unexported fields
}
DefaultLifecycleManager implements interfaces.LifecycleManager.
Features:
- Priority-based startup (components with a lower priority value start first)
- Reverse priority shutdown (components with a higher priority value stop first)
- Dependency resolution for components implementing DependencyAware
- Concurrent health checks
- Graceful shutdown with configurable timeout
func GlobalLifecycleManager ¶
func GlobalLifecycleManager() *DefaultLifecycleManager
GlobalLifecycleManager returns the global lifecycle manager instance.
func NewLifecycleManager ¶
func NewLifecycleManager() *DefaultLifecycleManager
NewLifecycleManager creates a new lifecycle manager with default config.
func NewLifecycleManagerWithConfig ¶
func NewLifecycleManagerWithConfig(config LifecycleManagerConfig) *DefaultLifecycleManager
NewLifecycleManagerWithConfig creates a new lifecycle manager with custom config.
func (*DefaultLifecycleManager) GetState ¶
func (m *DefaultLifecycleManager) GetState(name string) (interfaces.LifecycleState, error)
GetState returns the current state of a component.
func (*DefaultLifecycleManager) HealthCheckAll ¶
func (m *DefaultLifecycleManager) HealthCheckAll(ctx context.Context) map[string]interfaces.HealthStatus
HealthCheckAll returns aggregated health of all components.
func (*DefaultLifecycleManager) InitAll ¶
func (m *DefaultLifecycleManager) InitAll(ctx context.Context) error
InitAll initializes all registered components in priority order.
func (*DefaultLifecycleManager) Register ¶
func (m *DefaultLifecycleManager) Register(name string, component interfaces.Lifecycle, priority int) error
Register adds a component to be managed.
func (*DefaultLifecycleManager) RegisterWithConfig ¶
func (m *DefaultLifecycleManager) RegisterWithConfig(name string, component interfaces.Lifecycle, priority int, config interface{}) error
RegisterWithConfig registers a component with its configuration.
func (*DefaultLifecycleManager) ShutdownSignal ¶
func (m *DefaultLifecycleManager) ShutdownSignal() <-chan struct{}
ShutdownSignal returns a channel that closes when shutdown is signaled.
func (*DefaultLifecycleManager) SignalShutdown ¶
func (m *DefaultLifecycleManager) SignalShutdown()
SignalShutdown signals all components to begin shutdown.
func (*DefaultLifecycleManager) StartAll ¶
func (m *DefaultLifecycleManager) StartAll(ctx context.Context) error
StartAll starts all initialized components in priority order.
func (*DefaultLifecycleManager) StopAll ¶
func (m *DefaultLifecycleManager) StopAll(ctx context.Context) error
StopAll stops all running components in reverse priority order.
func (*DefaultLifecycleManager) Unregister ¶
func (m *DefaultLifecycleManager) Unregister(name string) error
Unregister removes a component from management.
func (*DefaultLifecycleManager) WaitForShutdown ¶
func (m *DefaultLifecycleManager) WaitForShutdown(ctx context.Context) error
WaitForShutdown blocks until all components are stopped.
type DefaultPanicHandler ¶
type DefaultPanicHandler struct{}
DefaultPanicHandler is the default panic handler implementation.
Behavior:
- Converts all panics to AgentError with CodeInternal
- Preserves panic value and stack trace in error context
- Compatible with existing error handling infrastructure
func (*DefaultPanicHandler) HandlePanic ¶
func (h *DefaultPanicHandler) HandlePanic(ctx context.Context, component, operation string, panicValue interface{}, stackTrace string) error
HandlePanic implements PanicHandler interface.
type DynamicRunnable ¶
type DynamicRunnable interface {
// InvokeDynamic 执行动态输入,返回动态输出
InvokeDynamic(ctx context.Context, input any) (any, error)
// StreamDynamic 流式执行
StreamDynamic(ctx context.Context, input any) (<-chan DynamicStreamChunk, error)
// BatchDynamic 批量执行
BatchDynamic(ctx context.Context, inputs []any) ([]any, error)
// TypeInfo 返回类型信息,用于运行时验证
TypeInfo() TypeInfo
}
DynamicRunnable 是插件兼容的动态 Runnable 接口
设计理念:
- 在插件边界使用 any 类型,保证运行时兼容性
- 通过 InputSchema/OutputSchema 提供运行时类型信息
- 支持 JSON 序列化的输入输出,便于跨进程通信
使用场景:
- Go 原生插件 (.so)
- 动态注册的第三方组件
- RPC/HTTP 调用的远程组件
type DynamicStreamChunk ¶
type DynamicStreamChunk struct {
Data any `json:"data,omitempty"`
Error error `json:"-"`
Done bool `json:"done"`
}
DynamicStreamChunk 动态流式输出块
type DynamicToTypedAdapter ¶
type DynamicToTypedAdapter[I, O any] struct {
// contains filtered or unexported fields
}
DynamicToTypedAdapter 将 DynamicRunnable 转换为泛型 Runnable
用于在类型安全的代码中使用动态加载的组件
func NewDynamicToTypedAdapter ¶
func NewDynamicToTypedAdapter[I, O any](dynamic DynamicRunnable) *DynamicToTypedAdapter[I, O]
NewDynamicToTypedAdapter 创建动态到类型安全的适配器
func (*DynamicToTypedAdapter[I, O]) Batch ¶
func (a *DynamicToTypedAdapter[I, O]) Batch(ctx context.Context, inputs []I) ([]O, error)
Batch 实现批量执行
func (*DynamicToTypedAdapter[I, O]) Invoke ¶
func (a *DynamicToTypedAdapter[I, O]) Invoke(ctx context.Context, input I) (O, error)
Invoke 实现 Runnable[I, O] 接口
func (*DynamicToTypedAdapter[I, O]) Pipe ¶
func (a *DynamicToTypedAdapter[I, O]) Pipe(next Runnable[O, any]) Runnable[I, any]
Pipe 连接到另一个 Runnable
func (*DynamicToTypedAdapter[I, O]) Stream ¶
func (a *DynamicToTypedAdapter[I, O]) Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)
Stream 实现流式执行
func (*DynamicToTypedAdapter[I, O]) WithCallbacks ¶
func (a *DynamicToTypedAdapter[I, O]) WithCallbacks(callbacks ...Callback) Runnable[I, O]
WithCallbacks 添加回调
func (*DynamicToTypedAdapter[I, O]) WithConfig ¶
func (a *DynamicToTypedAdapter[I, O]) WithConfig(config RunnableConfig) Runnable[I, O]
WithConfig 配置 Runnable
func (*DynamicToTypedAdapter[I, O]) WithValidator ¶
func (a *DynamicToTypedAdapter[I, O]) WithValidator(validator func(any) error) *DynamicToTypedAdapter[I, O]
WithValidator 添加运行时验证器
type EnhancedPluginRegistry ¶
type EnhancedPluginRegistry struct {
// contains filtered or unexported fields
}
EnhancedPluginRegistry extends PluginRegistry with version management and DI.
func GlobalEnhancedPluginRegistry ¶
func GlobalEnhancedPluginRegistry() *EnhancedPluginRegistry
GlobalEnhancedPluginRegistry returns the global enhanced plugin registry.
func NewEnhancedPluginRegistry ¶
func NewEnhancedPluginRegistry() *EnhancedPluginRegistry
NewEnhancedPluginRegistry creates a new enhanced plugin registry.
func (*EnhancedPluginRegistry) Container ¶
func (r *EnhancedPluginRegistry) Container() *Container
Container returns the dependency injection container.
func (*EnhancedPluginRegistry) GetAllAgents ¶
func (r *EnhancedPluginRegistry) GetAllAgents() map[string][]DynamicRunnable
GetAllAgents returns all agents from all registered plugins.
func (*EnhancedPluginRegistry) GetAllTools ¶
func (r *EnhancedPluginRegistry) GetAllTools() map[string][]DynamicRunnable
GetAllTools returns all tools from all registered plugins.
func (*EnhancedPluginRegistry) GetLatestPlugin ¶
func (r *EnhancedPluginRegistry) GetLatestPlugin(name string) (Plugin, string, error)
GetLatestPlugin retrieves the latest version of a plugin.
func (*EnhancedPluginRegistry) GetPlugin ¶
func (r *EnhancedPluginRegistry) GetPlugin(name, version string) (Plugin, error)
GetPlugin retrieves a specific version of a plugin.
func (*EnhancedPluginRegistry) InitializeAll ¶
func (r *EnhancedPluginRegistry) InitializeAll(ctx context.Context) error
InitializeAll initializes all plugins in dependency order.
func (*EnhancedPluginRegistry) Lifecycle ¶
func (r *EnhancedPluginRegistry) Lifecycle() *DefaultLifecycleManager
Lifecycle returns the lifecycle manager.
func (*EnhancedPluginRegistry) ListPlugins ¶
func (r *EnhancedPluginRegistry) ListPlugins() []string
ListPlugins returns all registered plugin names.
func (*EnhancedPluginRegistry) ListVersions ¶
func (r *EnhancedPluginRegistry) ListVersions(name string) []string
ListVersions returns all versions of a plugin.
func (*EnhancedPluginRegistry) RegisterPlugin ¶
func (r *EnhancedPluginRegistry) RegisterPlugin(plugin Plugin, config *PluginConfig) error
RegisterPlugin registers a plugin with optional version.
type ExecutionStep ¶
type ExecutionStep struct {
// 步骤信息
Step int `json:"step"` // 步骤编号
Name string `json:"name"` // 步骤名称
Type string `json:"type"` // 类型: "agent", "chain", "tool", "decision"
Description string `json:"description"` // 描述
// 执行信息
ComponentName string `json:"component_name"` // 组件名称
Input interface{} `json:"input"` // 输入
Output interface{} `json:"output"` // 输出
Status string `json:"status"` // 状态: "pending", "running", "success", "failed", "skipped"
// 时间信息
StartTime time.Time `json:"start_time"` // 开始时间
EndTime time.Time `json:"end_time"` // 结束时间
Duration time.Duration `json:"duration"` // 耗时
// 错误信息
Error string `json:"error,omitempty"` // 错误信息
RetryCount int `json:"retry_count,omitempty"` // 重试次数
PartialError string `json:"partial_error,omitempty"` // 部分错误
// 元数据
Metadata map[string]interface{} `json:"metadata,omitempty"` // 额外元数据
}
ExecutionStep 执行步骤
type FastInvoker ¶
type FastInvoker interface {
// InvokeFast 快速执行 Agent,跳过回调和中间件
//
// 参数:
// - ctx: 执行上下文(用于取消和超时控制)
// - input: Agent 输入
//
// 返回:
// - output: Agent 输出
// - error: 执行错误
//
// 性能特性:
// - 无回调触发
// - 无中间件执行
// - 最小化内存分配
// - 优化的执行路径
InvokeFast(ctx context.Context, input *AgentInput) (*AgentOutput, error)
}
FastInvoker 定义快速调用接口
FastInvoker 提供绕过回调和中间件的高性能执行路径,适用于:
- Chain 内部调用
- 嵌套 Agent 调用
- 高频循环场景
- 性能关键路径
性能收益:
- 延迟降低 4-6%
- 内存分配减少 5-8%
- 无回调遍历开销
- 无虚拟方法分派开销
注意事项:
- InvokeFast 不触发任何回调(OnStart/OnEnd/OnLLM*/OnTool*)
- 不执行中间件逻辑
- 适用于内部调用,外部入口应使用标准 Invoke 方法
- 调试时建议使用标准 Invoke 以获得完整追踪信息
使用示例:
// 检查 Agent 是否支持快速调用
var output *AgentOutput
var err error
if fastAgent, ok := agent.(FastInvoker); ok {
output, err = fastAgent.InvokeFast(ctx, input)
} else {
output, err = agent.Invoke(ctx, input)
}
参考文档:docs/guides/INVOKE_FAST_OPTIMIZATION.md
type FunctionalLifecycle ¶
type FunctionalLifecycle struct {
// contains filtered or unexported fields
}
FunctionalLifecycle allows creating Lifecycle implementations using functions.
func NewFunctionalLifecycle ¶
func NewFunctionalLifecycle(name string, opts ...FunctionalLifecycleOption) *FunctionalLifecycle
NewFunctionalLifecycle creates a new FunctionalLifecycle.
func (*FunctionalLifecycle) HealthCheck ¶
func (f *FunctionalLifecycle) HealthCheck(ctx context.Context) interfaces.HealthStatus
HealthCheck calls the configured health function.
func (*FunctionalLifecycle) Init ¶
func (f *FunctionalLifecycle) Init(ctx context.Context, config interface{}) error
Init calls the configured init function.
type FunctionalLifecycleOption ¶
type FunctionalLifecycleOption func(*FunctionalLifecycle)
FunctionalLifecycleOption configures a FunctionalLifecycle.
func WithHealthFunc ¶
func WithHealthFunc(fn func(context.Context) interfaces.HealthStatus) FunctionalLifecycleOption
WithHealthFunc sets the HealthCheck function.
func WithInitFunc ¶
func WithInitFunc(fn func(context.Context, interface{}) error) FunctionalLifecycleOption
WithInitFunc sets the Init function.
func WithStartFunc ¶
func WithStartFunc(fn func(context.Context) error) FunctionalLifecycleOption
WithStartFunc sets the Start function.
func WithStopFunc ¶
func WithStopFunc(fn func(context.Context) error) FunctionalLifecycleOption
WithStopFunc sets the Stop function.
type Generator ¶ added in v0.2.0
Generator 定义生成器类型(基于 Go 1.23 引入的 iter.Seq2)
Generator 提供惰性求值的流式处理能力,相比 Channel 有以下优势:
- 零内存分配(无 channel、goroutine 开销)
- 支持早期终止(通过 yield 返回值)
- 统一的迭代接口(for-range 循环)
示例:
gen := agent.RunGenerator(ctx, input)
for output, err := range gen {
if err != nil {
return err
}
fmt.Println(output)
}
func Filter ¶ added in v0.2.0
Filter 过滤 Generator 输出
参数:
- gen - Generator 实例
- predicate - 过滤条件函数
返回:
- 新的 Generator,仅包含满足条件的元素
示例:
filtered := Filter(gen, func(data T) bool {
return data.Score > 0.5
})
func FromChannel ¶ added in v0.2.0
func FromChannel[T any](ch <-chan StreamChunk[T]) Generator[T]
FromChannel 将 Channel 转换为 Generator
参数:
- ch - 输入 channel
返回:
- Generator 实例
示例:
gen := FromChannel(ch)
for data, err := range gen {
// 处理数据
}
func GeneratorFunc ¶ added in v0.2.0
GeneratorFunc 将函数转换为 Generator
参数:
- fn - 生成器函数,接受 yield 函数作为参数
返回:
- Generator 实例
示例:
gen := GeneratorFunc(func(yield func(T, error) bool) {
for i := 0; i < 10; i++ {
if !yield(data, nil) {
return // 早期终止
}
}
})
type Interrupt ¶
type Interrupt struct {
// ID is a unique identifier for this interrupt
ID string
// Type specifies what kind of interrupt this is
Type InterruptType
// Priority indicates how urgently this needs to be handled
Priority InterruptPriority
// Message describes what human action is needed
Message string
// Context provides additional context for the interrupt
Context map[string]interface{}
// State is a snapshot of the agent state at interrupt time
State state.State
// CreatedAt is when the interrupt was created
CreatedAt time.Time
// ExpiresAt is when the interrupt expires (optional)
ExpiresAt *time.Time
// Metadata holds additional interrupt metadata
Metadata map[string]interface{}
}
Interrupt represents a point where human intervention is needed.
type InterruptConfig ¶
type InterruptConfig struct {
// EnableAutoSave saves state before each interrupt
EnableAutoSave bool
// DefaultTimeout is the default timeout for interrupts
DefaultTimeout time.Duration
// RequireReason requires a reason for all responses
RequireReason bool
// EnableHistory keeps history of all interrupts
EnableHistory bool
}
InterruptConfig configures interrupt behavior.
func DefaultInterruptConfig ¶
func DefaultInterruptConfig() *InterruptConfig
DefaultInterruptConfig returns default interrupt configuration.
type InterruptManager ¶
type InterruptManager struct {
// contains filtered or unexported fields
}
InterruptManager manages interrupts and their lifecycle.
func NewInterruptManager ¶
func NewInterruptManager(checkpointer checkpoint.Checkpointer) *InterruptManager
NewInterruptManager creates a new interrupt manager.
func (*InterruptManager) CancelInterrupt ¶
func (m *InterruptManager) CancelInterrupt(interruptID string) error
CancelInterrupt cancels a pending interrupt.
func (*InterruptManager) CreateInterrupt ¶
func (m *InterruptManager) CreateInterrupt(ctx context.Context, interrupt *Interrupt) (*Interrupt, *InterruptResponse, error)
CreateInterrupt creates a new interrupt and waits for human response. Returns the created interrupt (with ID assigned) and the response.
func (*InterruptManager) GetInterrupt ¶
func (m *InterruptManager) GetInterrupt(interruptID string) (*Interrupt, error)
GetInterrupt retrieves an interrupt by ID.
func (*InterruptManager) ListPendingInterrupts ¶
func (m *InterruptManager) ListPendingInterrupts() []*Interrupt
ListPendingInterrupts returns all interrupts awaiting response.
func (*InterruptManager) OnInterruptCreated ¶
func (m *InterruptManager) OnInterruptCreated(fn func(*Interrupt))
OnInterruptCreated sets a hook for when interrupts are created.
func (*InterruptManager) OnInterruptResolved ¶
func (m *InterruptManager) OnInterruptResolved(fn func(*Interrupt, *InterruptResponse))
OnInterruptResolved sets a hook for when interrupts are resolved.
func (*InterruptManager) RespondToInterrupt ¶
func (m *InterruptManager) RespondToInterrupt(interruptID string, response *InterruptResponse) error
RespondToInterrupt provides a response to an existing interrupt.
type InterruptPriority ¶
type InterruptPriority string
InterruptPriority defines the priority/severity of an interrupt.
const (
// InterruptPriorityLow can be handled asynchronously
InterruptPriorityLow InterruptPriority = "low"
// InterruptPriorityMedium should be handled in a reasonable timeframe
InterruptPriorityMedium InterruptPriority = "medium"
// InterruptPriorityHigh requires immediate attention
InterruptPriorityHigh InterruptPriority = "high"
// InterruptPriorityCritical blocks execution until resolved
InterruptPriorityCritical InterruptPriority = "critical"
)
type InterruptResponse ¶
type InterruptResponse struct {
// InterruptID is the ID of the interrupt being responded to
InterruptID string
// Approved indicates if the action was approved
Approved bool
// Input contains any human-provided input/feedback
Input map[string]interface{}
// Reason explains why the decision was made
Reason string
// RespondedAt is when the response was provided
RespondedAt time.Time
// RespondedBy identifies who provided the response
RespondedBy string
}
InterruptResponse represents the human response to an interrupt.
type InterruptRule ¶
type InterruptRule struct {
// Name identifies the rule
Name string
// Condition evaluates if an interrupt should be triggered
Condition func(ctx context.Context, state state.State) bool
// CreateInterrupt creates the interrupt if condition is met
CreateInterrupt func(ctx context.Context, state state.State) *Interrupt
}
InterruptRule defines a condition that triggers an interrupt.
type InterruptType ¶
type InterruptType string
InterruptType defines the type of interrupt.
const (
// InterruptTypeApproval requires human approval to continue
InterruptTypeApproval InterruptType = "approval"
// InterruptTypeInput requires human input/feedback
InterruptTypeInput InterruptType = "input"
// InterruptTypeReview requires human review before proceeding
InterruptTypeReview InterruptType = "review"
// InterruptTypeDecision requires human decision making
InterruptTypeDecision InterruptType = "decision"
)
type InterruptableExecutor ¶
type InterruptableExecutor struct {
// contains filtered or unexported fields
}
InterruptableExecutor wraps execution with interrupt capability.
func NewInterruptableExecutor ¶
func NewInterruptableExecutor(manager *InterruptManager, checkpointer checkpoint.Checkpointer) *InterruptableExecutor
NewInterruptableExecutor creates a new interruptable executor.
func (*InterruptableExecutor) AddInterruptRule ¶
func (e *InterruptableExecutor) AddInterruptRule(rule InterruptRule)
AddInterruptRule adds a rule that triggers interrupts.
func (*InterruptableExecutor) CheckInterrupts ¶
func (e *InterruptableExecutor) CheckInterrupts(ctx context.Context, state state.State) ([]*Interrupt, error)
CheckInterrupts evaluates all rules and creates interrupts if needed.
type LegacyStreamChunk ¶
type LegacyStreamChunk = execution.LegacyStreamChunk
Type aliases for internal use within core package
type LifecycleManagerConfig ¶
type LifecycleManagerConfig struct {
// DefaultInitTimeout is the default timeout for Init operations
DefaultInitTimeout time.Duration
// DefaultStartTimeout is the default timeout for Start operations
DefaultStartTimeout time.Duration
// DefaultStopTimeout is the default timeout for Stop operations
DefaultStopTimeout time.Duration
// HealthCheckInterval is the interval between automatic health checks
HealthCheckInterval time.Duration
// ContinueOnError determines whether to continue if a component fails
ContinueOnError bool
}
LifecycleManagerConfig configures the lifecycle manager.
func DefaultLifecycleManagerConfig ¶
func DefaultLifecycleManagerConfig() LifecycleManagerConfig
DefaultLifecycleManagerConfig returns sensible defaults.
type Logger ¶
type Logger interface {
Info(msg string, fields ...interface{})
Error(msg string, fields ...interface{})
Debug(msg string, fields ...interface{})
}
Logger 日志接口
type LoggingCallback ¶
type LoggingCallback struct {
*BaseCallback
// contains filtered or unexported fields
}
LoggingCallback 日志记录回调
将所有事件记录到日志
func NewLoggingCallback ¶
func NewLoggingCallback(logger Logger, verbose bool) *LoggingCallback
NewLoggingCallback 创建日志回调
func (*LoggingCallback) OnEnd ¶
func (l *LoggingCallback) OnEnd(ctx context.Context, output interface{}) error
func (*LoggingCallback) OnError ¶
func (l *LoggingCallback) OnError(ctx context.Context, err error) error
func (*LoggingCallback) OnLLMStart ¶
func (*LoggingCallback) OnStart ¶
func (l *LoggingCallback) OnStart(ctx context.Context, input interface{}) error
func (*LoggingCallback) OnToolEnd ¶
func (l *LoggingCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error
func (*LoggingCallback) OnToolStart ¶
func (l *LoggingCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error
type MetricsCallback ¶
type MetricsCallback struct {
*BaseCallback
// contains filtered or unexported fields
}
MetricsCallback 指标收集回调
收集执行指标(延迟、调用次数等)
func NewMetricsCallback ¶
func NewMetricsCallback(metrics MetricsCollector) *MetricsCallback
NewMetricsCallback 创建指标回调
func (*MetricsCallback) OnLLMStart ¶
func (*MetricsCallback) OnToolEnd ¶
func (m *MetricsCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error
func (*MetricsCallback) OnToolStart ¶
func (m *MetricsCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error
type MetricsCollector ¶
type MetricsCollector interface {
IncrementCounter(name string, value int64, tags map[string]string)
RecordHistogram(name string, value float64, tags map[string]string)
RecordGauge(name string, value float64, tags map[string]string)
}
MetricsCollector 指标收集器接口
type Middleware ¶
type Middleware = middleware.Middleware
Type aliases for internal use within core package
type MiddlewareChain ¶
type MiddlewareChain = middleware.MiddlewareChain
Type aliases for internal use within core package
type MiddlewareRequest ¶
type MiddlewareRequest = middleware.MiddlewareRequest
Type aliases for internal use within core package
type MiddlewareResponse ¶
type MiddlewareResponse = middleware.MiddlewareResponse
Type aliases for internal use within core package
type NoOpMetricsCollector ¶
type NoOpMetricsCollector struct{}
NoOpMetricsCollector is a no-op metrics collector (default).
This is the default implementation that does nothing. Replace with a real implementation (e.g., PrometheusMetricsCollector) in production.
func (*NoOpMetricsCollector) RecordPanic ¶
func (c *NoOpMetricsCollector) RecordPanic(ctx context.Context, component, operation string, panicValue interface{})
RecordPanic implements PanicMetricsCollector interface (no-op).
type NoOpPanicLogger ¶
type NoOpPanicLogger struct{}
NoOpPanicLogger is a no-op logger (default).
This is the default implementation that does nothing. Replace with a real implementation (e.g., StructuredPanicLogger) in production.
type Orchestrator ¶
type Orchestrator interface {
// Execute 执行编排任务
Execute(ctx context.Context, request *OrchestratorRequest) (*OrchestratorResponse, error)
// RegisterAgent 注册 Agent
RegisterAgent(name string, agent Agent) error
// RegisterChain 注册 Chain
RegisterChain(name string, chain Chain) error
// RegisterTool 注册 Tool
RegisterTool(name string, tool Tool) error
// Name 返回编排器名称
Name() string
}
Orchestrator 定义编排器接口
Orchestrator 负责协调多个 Agent、Chain 和 Tool 的执行,适用于:
- 复杂的多步骤工作流
- 需要多个 Agent 协作的场景
- 动态决策和条件分支
type OrchestratorOptions ¶
type OrchestratorOptions struct {
// 日志选项
EnableLogging bool `json:"enable_logging"` // 是否启用日志
LogLevel string `json:"log_level"` // 日志级别
// 监控选项
EnableMetrics bool `json:"enable_metrics"` // 是否启用指标
EnableTracing bool `json:"enable_tracing"` // 是否启用追踪
// 回调
OnStepStart func(step ExecutionStep) `json:"-"` // 步骤开始回调
OnStepComplete func(step ExecutionStep) `json:"-"` // 步骤完成回调
OnError func(err error) `json:"-"` // 错误回调
// 额外选项
Extra map[string]interface{} `json:"extra,omitempty"` // 额外选项
}
OrchestratorOptions 编排器选项
func DefaultOrchestratorOptions ¶
func DefaultOrchestratorOptions() OrchestratorOptions
DefaultOrchestratorOptions 返回默认编排选项
type OrchestratorRequest ¶
type OrchestratorRequest struct {
// 任务信息
TaskID string `json:"task_id"` // 任务 ID
TaskType string `json:"task_type"` // 任务类型
Description string `json:"description"` // 任务描述
Parameters map[string]interface{} `json:"parameters"` // 任务参数
// 执行策略
Strategy OrchestratorStrategy `json:"strategy"` // 编排策略
// 选项
Options OrchestratorOptions `json:"options"` // 执行选项
// 元数据
SessionID string `json:"session_id"` // 会话 ID
Timestamp time.Time `json:"timestamp"` // 时间戳
}
OrchestratorRequest 编排器请求
type OrchestratorResponse ¶
// OrchestratorResponse is the response returned by an orchestrator.
type OrchestratorResponse struct {
// Execution result
Result interface{} `json:"result"` // final result
Status string `json:"status"` // status: "success", "failed", "partial"
Message string `json:"message"` // result message
// Execution process
ExecutionPlan []ExecutionStep `json:"execution_plan"` // execution plan
ExecutionSteps []ExecutionStep `json:"execution_steps"` // steps that were actually executed
// Performance metrics
TotalLatency time.Duration `json:"total_latency"` // total latency
StartTime time.Time `json:"start_time"` // start time
EndTime time.Time `json:"end_time"` // end time
// Metadata
Metadata map[string]interface{} `json:"metadata"` // additional metadata
}
OrchestratorResponse 编排器响应
type OrchestratorStrategy ¶
// OrchestratorStrategy describes the orchestration strategy.
type OrchestratorStrategy struct {
// Execution mode
Mode string `json:"mode"` // "sequential", "parallel", "hybrid"
// Retry strategy
EnableRetry bool `json:"enable_retry"` // whether retry is enabled
MaxRetries int `json:"max_retries"` // maximum number of retries
RetryBackoff int `json:"retry_backoff"` // retry backoff time (seconds)
// Failure handling
FailurePolicy string `json:"failure_policy"` // "stop", "continue", "rollback"
// Timeout strategy
GlobalTimeout time.Duration `json:"global_timeout"` // global timeout
StepTimeout time.Duration `json:"step_timeout"` // per-step timeout
}
OrchestratorStrategy 编排策略
func DefaultOrchestratorStrategy ¶
func DefaultOrchestratorStrategy() OrchestratorStrategy
DefaultOrchestratorStrategy 返回默认编排策略
type PanicHandler ¶
// PanicHandler defines the interface for handling recovered panics.
type PanicHandler interface {
// HandlePanic processes a recovered panic and returns an appropriate error.
//
// Parameters:
// - ctx: The execution context
// - component: The component where panic occurred (e.g., "runnable", "lifecycle_manager")
// - operation: The operation being performed (e.g., "invoke", "init", "start")
// - panicValue: The value passed to panic()
// - stackTrace: The stack trace at the point of panic
//
// Returns:
// - An error representing the panic, suitable for returning to callers
HandlePanic(ctx context.Context, component, operation string, panicValue interface{}, stackTrace string) error
}
PanicHandler defines the interface for handling recovered panics.
Implementations can provide custom recovery strategies, such as:
- Converting panics to domain-specific errors
- Adding custom context information
- Implementing retry logic
- Graceful degradation
type PanicHandlerRegistry ¶
// PanicHandlerRegistry manages panic handling implementations with thread-safe hot-swapping.
type PanicHandlerRegistry struct {
// contains filtered or unexported fields
}
PanicHandlerRegistry manages panic handling implementations with thread-safe hot-swapping.
Features:
- Thread-safe read/write operations
- Lock-free reads using atomic pointers
- Hot-swappable implementations at runtime
- Global singleton with sensible defaults
func GlobalPanicHandlerRegistry ¶
func GlobalPanicHandlerRegistry() *PanicHandlerRegistry
GlobalPanicHandlerRegistry returns the global panic handler registry instance.
This singleton is used by all panic recovery code in the system. Customize by calling SetHandler, SetMetricsCollector, or SetLogger.
func NewPanicHandlerRegistry ¶
func NewPanicHandlerRegistry() *PanicHandlerRegistry
NewPanicHandlerRegistry creates a new registry with default implementations.
func (*PanicHandlerRegistry) GetHandler ¶
func (r *PanicHandlerRegistry) GetHandler() PanicHandler
GetHandler returns the current panic handler (lock-free read).
func (*PanicHandlerRegistry) GetLogger ¶
func (r *PanicHandlerRegistry) GetLogger() PanicLogger
GetLogger returns the current panic logger (lock-free read).
func (*PanicHandlerRegistry) GetMetricsCollector ¶
func (r *PanicHandlerRegistry) GetMetricsCollector() PanicMetricsCollector
GetMetricsCollector returns the current metrics collector (lock-free read).
func (*PanicHandlerRegistry) HandlePanic ¶
func (r *PanicHandlerRegistry) HandlePanic(ctx context.Context, component, operation string, panicValue interface{}) error
HandlePanic is a convenience method that orchestrates the full panic handling flow:
1. Convert the panic to an error using the handler
2. Record metrics
3. Log the event
This is the recommended way to handle panics with all customizations applied.
func (*PanicHandlerRegistry) SetHandler ¶
func (r *PanicHandlerRegistry) SetHandler(handler PanicHandler)
SetHandler replaces the panic handler (thread-safe).
func (*PanicHandlerRegistry) SetLogger ¶
func (r *PanicHandlerRegistry) SetLogger(logger PanicLogger)
SetLogger replaces the panic logger (thread-safe).
func (*PanicHandlerRegistry) SetMetricsCollector ¶
func (r *PanicHandlerRegistry) SetMetricsCollector(collector PanicMetricsCollector)
SetMetricsCollector replaces the metrics collector (thread-safe).
type PanicLogger ¶
// PanicLogger defines the interface for logging panic events.
type PanicLogger interface {
// LogPanic logs a panic event with all relevant context.
//
// Parameters:
// - ctx: The execution context
// - component: The component where panic occurred
// - operation: The operation being performed
// - panicValue: The value passed to panic()
// - stackTrace: The stack trace at the point of panic
// - recoveredError: The error returned from HandlePanic
LogPanic(ctx context.Context, component, operation string, panicValue interface{}, stackTrace string, recoveredError error)
}
PanicLogger defines the interface for logging panic events.
Implementations can provide specialized logging:
- Structured logging (JSON, logfmt)
- Different log levels based on panic type
- Integration with centralized logging systems
- Alert triggering for critical panics
type PanicMetricsCollector ¶
// PanicMetricsCollector defines the interface for collecting panic statistics.
type PanicMetricsCollector interface {
// RecordPanic records that a panic occurred.
//
// Parameters:
// - ctx: The execution context
// - component: The component where panic occurred
// - operation: The operation being performed
// - panicValue: The value passed to panic()
RecordPanic(ctx context.Context, component, operation string, panicValue interface{})
}
PanicMetricsCollector defines the interface for collecting panic statistics.
Implementations can integrate with monitoring systems:
- Prometheus counters and histograms
- StatsD metrics
- Custom telemetry systems
- Application performance monitoring (APM)
type Plugin ¶
// Plugin represents a loadable plugin that can provide tools, agents, or other components.
type Plugin interface {
// Embeds the lifecycle interface from the interfaces package.
interfaces.Lifecycle
// Name returns the plugin's unique identifier.
Name() string
// Version returns the plugin's version string.
Version() string
// GetTools returns tools provided by this plugin.
GetTools() []DynamicRunnable
// GetAgents returns agents provided by this plugin.
GetAgents() []DynamicRunnable
// GetMiddleware returns middleware provided by this plugin.
GetMiddleware() []interface{}
}
Plugin represents a loadable plugin that can provide tools, agents, or other components.
Plugins implement lifecycle management and can access shared resources through the dependency injection container.
type PluginConfig ¶
// PluginConfig contains configuration for a plugin instance.
type PluginConfig struct {
// Name is the plugin identifier
Name string
// Version is the plugin version
Version string
// Config is plugin-specific configuration
Config map[string]interface{}
// Dependencies lists other plugins this plugin depends on
Dependencies []PluginDependency
// Enabled determines if the plugin should be loaded
Enabled bool
}
PluginConfig contains configuration for a plugin instance.
type PluginDependency ¶
// PluginDependency specifies a dependency on another plugin.
type PluginDependency struct {
// Name of the required plugin
Name string
// VersionConstraint (e.g., ">=1.0.0", "~1.2.0")
VersionConstraint string
// Optional marks this dependency as optional
Optional bool
}
PluginDependency specifies a dependency on another plugin.
type PluginMetadata ¶
// PluginMetadata holds plugin metadata.
// Author and Tags are omitted from JSON output when empty (omitempty).
type PluginMetadata struct {
Name string `json:"name"`
Version string `json:"version"`
Description string `json:"description"`
Author string `json:"author,omitempty"`
Tags []string `json:"tags,omitempty"`
TypeInfo TypeInfo `json:"type_info"`
}
PluginMetadata 插件元数据
type PluginRegistry ¶
// PluginRegistry is the plugin registry; it manages dynamically loaded components.
type PluginRegistry struct {
// contains filtered or unexported fields
}
PluginRegistry 插件注册中心
管理动态加载的组件,提供:
- 组件注册和发现
- 类型信息查询
- 版本管理
func GlobalPluginRegistry ¶
func GlobalPluginRegistry() *PluginRegistry
GlobalPluginRegistry 获取全局插件注册中心
func (*PluginRegistry) Get ¶
func (r *PluginRegistry) Get(name string) (DynamicRunnable, error)
Get 获取插件
func (*PluginRegistry) GetMetadata ¶
func (r *PluginRegistry) GetMetadata(name string) (*PluginMetadata, error)
GetMetadata 获取插件元数据
func (*PluginRegistry) Register ¶
func (r *PluginRegistry) Register(name string, plugin DynamicRunnable, metadata *PluginMetadata) error
Register 注册插件
func (*PluginRegistry) Unregister ¶
func (r *PluginRegistry) Unregister(name string) error
Unregister 注销插件
type ReasoningStep ¶
// ReasoningStep records a single reasoning step.
type ReasoningStep struct {
Step int `json:"step"` // step number
Action string `json:"action"` // action performed
Description string `json:"description"` // description of the action
Result string `json:"result"` // result of the action
Duration time.Duration `json:"duration"` // elapsed time
Success bool `json:"success"` // whether the step succeeded
Error string `json:"error"` // error message
}
ReasoningStep 推理步骤
type RetryPolicy ¶
// RetryPolicy describes the retry policy.
type RetryPolicy struct {
MaxRetries int // maximum number of retries
Backoff BackoffPolicy // backoff policy
}
RetryPolicy 重试策略
type Runnable ¶
type Runnable[I, O any] interface { // Invoke 执行单个输入并返回输出 Invoke(ctx context.Context, input I) (O, error) // Stream 流式执行,返回输出通道 // 调用者负责从通道读取并处理结果 Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error) // Batch 批量执行多个输入 Batch(ctx context.Context, inputs []I) ([]O, error) // Pipe 连接到另一个 Runnable,形成管道 // 当前 Runnable 的输出成为下一个 Runnable 的输入 Pipe(next Runnable[O, any]) Runnable[I, any] // WithCallbacks 添加回调处理器 WithCallbacks(callbacks ...Callback) Runnable[I, O] // WithConfig 配置 Runnable WithConfig(config RunnableConfig) Runnable[I, O] }
Runnable 定义统一的可执行接口
借鉴 LangChain 的 Runnable 设计，提供统一的执行接口，支持：
- 单个输入执行 (Invoke)
- 流式执行 (Stream)
- 批量执行 (Batch)
- 管道连接 (Pipe)
- 回调支持 (WithCallbacks)
泛型参数:
- I: 输入类型
- O: 输出类型
type RunnableConfig ¶
// RunnableConfig is the configuration for a Runnable.
type RunnableConfig struct {
// Callbacks is the list of callback handlers
Callbacks []Callback
// Tags are labels used for identification and categorization
Tags []string
// Metadata holds metadata
Metadata map[string]interface{}
// MaxConcurrency is the maximum concurrency (used by Batch)
MaxConcurrency int
// RetryPolicy is the retry policy
RetryPolicy *RetryPolicy
}
RunnableConfig Runnable 配置
type RunnableFunc ¶
type RunnableFunc[I, O any] struct { *BaseRunnable[I, O] // contains filtered or unexported fields }
RunnableFunc 函数式 Runnable
将普通函数包装成 Runnable
func NewRunnableFunc ¶
func NewRunnableFunc[I, O any](fn func(context.Context, I) (O, error)) *RunnableFunc[I, O]
NewRunnableFunc 创建函数式 Runnable
func (*RunnableFunc[I, O]) Batch ¶
func (f *RunnableFunc[I, O]) Batch(ctx context.Context, inputs []I) ([]O, error)
Batch 批量执行
func (*RunnableFunc[I, O]) Invoke ¶
func (f *RunnableFunc[I, O]) Invoke(ctx context.Context, input I) (O, error)
Invoke 执行函数
func (*RunnableFunc[I, O]) Pipe ¶
func (f *RunnableFunc[I, O]) Pipe(next Runnable[O, any]) Runnable[I, any]
Pipe 连接到另一个 Runnable
func (*RunnableFunc[I, O]) Stream ¶
func (f *RunnableFunc[I, O]) Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)
Stream 流式执行(默认实现:包装成单个块)
func (*RunnableFunc[I, O]) WithCallbacks ¶
func (f *RunnableFunc[I, O]) WithCallbacks(callbacks ...Callback) Runnable[I, O]
WithCallbacks 添加回调(重写以返回正确的类型)
func (*RunnableFunc[I, O]) WithConfig ¶
func (f *RunnableFunc[I, O]) WithConfig(config RunnableConfig) Runnable[I, O]
WithConfig 配置 Runnable(重写以返回正确的类型)
type RunnablePipe ¶
// RunnablePipe is a pipeline Runnable that connects two Runnables.
// I is the input type, M the intermediate type passed between the two stages
// (see NewRunnablePipe's signature), and O the output type.
type RunnablePipe[I, M, O any] struct {
	// contains filtered or unexported fields
}
RunnablePipe 管道 Runnable,连接两个 Runnable
func NewRunnablePipe ¶
func NewRunnablePipe[I, M, O any](first Runnable[I, M], second Runnable[M, O]) *RunnablePipe[I, M, O]
NewRunnablePipe 创建管道 Runnable
func (*RunnablePipe[I, M, O]) Batch ¶
func (p *RunnablePipe[I, M, O]) Batch(ctx context.Context, inputs []I) ([]O, error)
Batch 批量执行管道
func (*RunnablePipe[I, M, O]) Invoke ¶
func (p *RunnablePipe[I, M, O]) Invoke(ctx context.Context, input I) (O, error)
Invoke 执行管道
func (*RunnablePipe[I, M, O]) Pipe ¶
func (p *RunnablePipe[I, M, O]) Pipe(next Runnable[O, any]) Runnable[I, any]
Pipe 连接到另一个 Runnable
func (*RunnablePipe[I, M, O]) Stream ¶
func (p *RunnablePipe[I, M, O]) Stream(ctx context.Context, input I) (<-chan StreamChunk[O], error)
Stream 流式执行管道
func (*RunnablePipe[I, M, O]) WithCallbacks ¶
func (p *RunnablePipe[I, M, O]) WithCallbacks(callbacks ...Callback) Runnable[I, O]
WithCallbacks 添加回调
func (*RunnablePipe[I, M, O]) WithConfig ¶
func (p *RunnablePipe[I, M, O]) WithConfig(config RunnableConfig) Runnable[I, O]
WithConfig 配置管道
type RunnableSequence ¶
// RunnableSequence executes multiple Runnables in order,
// similar to a Unix pipeline: r1 | r2 | r3.
type RunnableSequence struct {
// contains filtered or unexported fields
}
RunnableSequence 顺序执行多个 Runnable
类似于 Unix 管道: r1 | r2 | r3
func NewRunnableSequence ¶
func NewRunnableSequence(runnables ...Runnable[any, any]) *RunnableSequence
NewRunnableSequence 创建顺序执行的 Runnable 序列
func (*RunnableSequence) Stream ¶
func (s *RunnableSequence) Stream(ctx context.Context, input any) (<-chan StreamChunk[any], error)
Stream 流式执行序列
func (*RunnableSequence) WithCallbacks ¶
func (s *RunnableSequence) WithCallbacks(callbacks ...Callback) Runnable[any, any]
WithCallbacks 添加回调
func (*RunnableSequence) WithConfig ¶
func (s *RunnableSequence) WithConfig(config RunnableConfig) Runnable[any, any]
WithConfig 配置序列
type SimpleAgent ¶
// SimpleAgent is a type alias for interfaces.Agent.
type SimpleAgent = interfaces.Agent
SimpleAgent is a type alias for the canonical Agent interface.
For new code that doesn't need generic typing, use interfaces.Agent directly. This alias provides a migration path for existing code.
type Span ¶
// Span is the tracing span interface.
type Span interface {
End()
SetAttribute(key string, value interface{})
SetStatus(code StatusCode, description string)
RecordError(err error)
}
Span 追踪 span 接口
type StatusCode ¶
// StatusCode is a status code (passed to Span.SetStatus).
type StatusCode int
StatusCode 状态码
const ( StatusCodeOK StatusCode = iota StatusCodeError )
type StdoutCallback ¶
// StdoutCallback is a standard-output callback: it writes events to stdout, for debugging.
type StdoutCallback struct {
*BaseCallback
// contains filtered or unexported fields
}
StdoutCallback 标准输出回调
将事件输出到标准输出,用于调试
func NewStdoutCallback ¶
func NewStdoutCallback(color bool) *StdoutCallback
NewStdoutCallback 创建标准输出回调
func (*StdoutCallback) OnError ¶
func (s *StdoutCallback) OnError(ctx context.Context, err error) error
func (*StdoutCallback) OnLLMStart ¶
func (*StdoutCallback) OnToolEnd ¶
func (s *StdoutCallback) OnToolEnd(ctx context.Context, toolName string, output interface{}) error
func (*StdoutCallback) OnToolStart ¶
func (s *StdoutCallback) OnToolStart(ctx context.Context, toolName string, input interface{}) error
type Step ¶
// Step defines the interface of a single step in a Chain.
type Step interface {
// Execute runs the step
Execute(ctx context.Context, input interface{}) (interface{}, error)
// Name returns the step name
Name() string
// Description returns the step description
Description() string
}
Step 定义 Chain 中的单个步骤接口
type StepExecution ¶
// StepExecution is the execution record of a step.
type StepExecution struct {
StepNumber int `json:"step_number"` // step number
StepName string `json:"step_name"` // step name
Description string `json:"description"` // step description
Input interface{} `json:"input"` // input
Output interface{} `json:"output"` // output
Duration time.Duration `json:"duration"` // elapsed time
Success bool `json:"success"` // whether the step succeeded
Error string `json:"error"` // error message
Skipped bool `json:"skipped"` // whether the step was skipped
}
StepExecution 步骤执行记录
type StreamChunk ¶
StreamChunk 流式输出的数据块
type StreamConsumer ¶
// StreamConsumer is an alias of execution.StreamConsumer for use within the core package.
type StreamConsumer = execution.StreamConsumer
Type aliases for internal use within core package
type StreamOptions ¶
// StreamOptions is an alias of execution.StreamOptions for use within the core package.
type StreamOptions = execution.StreamOptions
Type aliases for internal use within core package
type StreamOutput ¶
// StreamOutput is an alias of execution.StreamOutput for use within the core package.
type StreamOutput = execution.StreamOutput
Type aliases for internal use within core package
type StreamState ¶
// StreamState is an alias of execution.StreamState for use within the core package.
type StreamState = execution.StreamState
Type aliases for internal use within core package
type StreamStatus ¶
// StreamStatus is an alias of execution.StreamStatus for use within the core package.
type StreamStatus = execution.StreamStatus
Type aliases for internal use within core package
type Tool ¶
// Tool defines the tool interface.
type Tool interface {
// Execute runs the tool
Execute(ctx context.Context, input *ToolInput) (*ToolOutput, error)
// Name returns the tool name
Name() string
// Description returns the tool description
Description() string
// Parameters returns the tool's parameter definitions
Parameters() []ToolParameter
}
Tool 定义工具接口
type ToolCall ¶
// ToolCall is the record of a tool invocation.
type ToolCall struct {
ToolName string `json:"tool_name"` // tool name
Input map[string]interface{} `json:"input"` // input parameters
Output interface{} `json:"output"` // output result
Duration time.Duration `json:"duration"` // elapsed time
Success bool `json:"success"` // whether the call succeeded
Error string `json:"error"` // error message
}
ToolCall 工具调用记录
type ToolInput ¶
// ToolInput is the input passed to a tool.
type ToolInput struct {
Action string `json:"action"` // action type
Parameters map[string]interface{} `json:"parameters"` // parameters
Context map[string]interface{} `json:"context"` // context
}
ToolInput 工具输入
type ToolOutput ¶
// ToolOutput is the output returned by a tool.
type ToolOutput struct {
Success bool `json:"success"` // whether the call succeeded
Data interface{} `json:"data"` // output data
Message string `json:"message"` // message
Error string `json:"error"` // error message
}
ToolOutput 工具输出
type ToolParameter ¶
// ToolParameter is the definition of a tool parameter.
type ToolParameter struct {
Name string `json:"name"` // parameter name
Type string `json:"type"` // parameter type
Description string `json:"description"` // parameter description
Required bool `json:"required"` // whether the parameter is required
Default interface{} `json:"default"` // default value
}
ToolParameter 工具参数定义
type Tracer ¶
// Tracer is the tracer interface.
type Tracer interface {
StartSpan(ctx context.Context, name string, attrs map[string]interface{}) (context.Context, Span)
}
Tracer 追踪器接口
type TracingCallback ¶
// TracingCallback is a distributed-tracing callback; it integrates with
// OpenTelemetry or other tracing systems.
type TracingCallback struct {
*BaseCallback
// contains filtered or unexported fields
}
TracingCallback 分布式追踪回调
集成 OpenTelemetry 或其他追踪系统
func NewTracingCallback ¶
func NewTracingCallback(tracer Tracer) *TracingCallback
NewTracingCallback 创建追踪回调
func (*TracingCallback) OnLLMError ¶
func (t *TracingCallback) OnLLMError(ctx context.Context, err error) error
func (*TracingCallback) OnLLMStart ¶
type TypeInfo ¶
// TypeInfo provides runtime type information.
type TypeInfo struct {
// InputType is the reflection information of the input type
InputType reflect.Type `json:"-"`
// OutputType is the reflection information of the output type
OutputType reflect.Type `json:"-"`
// InputSchema is a JSON Schema description (optional, used for validation)
InputSchema string `json:"input_schema,omitempty"`
// OutputSchema is a JSON Schema description (optional, used for validation)
OutputSchema string `json:"output_schema,omitempty"`
// Name is the component name
Name string `json:"name"`
// Description is the component description
Description string `json:"description,omitempty"`
}
TypeInfo 提供运行时类型信息
用于:
- 插件注册时的类型验证
- 运行时类型检查和转换
- 文档生成和 API 描述
type TypedToDynamicAdapter ¶
// TypedToDynamicAdapter converts a generic Runnable into a DynamicRunnable.
// I and O are the input and output types of the wrapped Runnable
// (see NewTypedToDynamicAdapter's signature).
type TypedToDynamicAdapter[I, O any] struct {
	// contains filtered or unexported fields
}
TypedToDynamicAdapter 将泛型 Runnable 转换为 DynamicRunnable
这是桥接编译时类型安全和运行时灵活性的关键组件
func NewTypedToDynamicAdapter ¶
func NewTypedToDynamicAdapter[I, O any](typed Runnable[I, O], name string) *TypedToDynamicAdapter[I, O]
NewTypedToDynamicAdapter 创建类型安全到动态的适配器
func (*TypedToDynamicAdapter[I, O]) BatchDynamic ¶
func (a *TypedToDynamicAdapter[I, O]) BatchDynamic(ctx context.Context, inputs []any) ([]any, error)
BatchDynamic 实现批量执行
func (*TypedToDynamicAdapter[I, O]) InvokeDynamic ¶
InvokeDynamic 实现 DynamicRunnable 接口
func (*TypedToDynamicAdapter[I, O]) StreamDynamic ¶
func (a *TypedToDynamicAdapter[I, O]) StreamDynamic(ctx context.Context, input any) (<-chan DynamicStreamChunk, error)
StreamDynamic 实现流式执行
func (*TypedToDynamicAdapter[I, O]) TypeInfo ¶
func (a *TypedToDynamicAdapter[I, O]) TypeInfo() TypeInfo
TypeInfo 返回类型信息
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| state | Package state provides type-safe state management with schema validation. |