Documentation
¶
Overview ¶
Example (CombinedOptimizations) ¶
Example_combinedOptimizations demonstrates using all optimizations together
ctx := context.Background()
fmt.Println("=== Combined Performance Optimizations Demo ===")
// 1. Create agent pool
factory := func() (core.Agent, error) {
agent := NewMockAgent("combined-agent", 10*time.Millisecond)
// Wrap with cache
config := CacheConfig{
MaxSize: 1000,
TTL: 10 * time.Minute,
EnableStats: true,
}
return NewCachedAgent(agent, config), nil
}
poolConfig := PoolConfig{
InitialSize: 5,
MaxSize: 20,
IdleTimeout: 5 * time.Minute,
}
pool, err := NewAgentPool(factory, poolConfig)
if err != nil {
panic(err)
}
defer pool.Close()
// 2. Create batch executor with pooled agents
batchConfig := BatchConfig{
MaxConcurrency: 10,
Timeout: 5 * time.Minute,
ErrorPolicy: ErrorPolicyContinue,
EnableStats: true,
}
// Custom agent wrapper that uses pool
pooledAgent := &poolAgent{pool: pool}
executor := NewBatchExecutor(pooledAgent, batchConfig)
// 3. Prepare batch with some repeated tasks (to benefit from cache)
inputs := make([]*core.AgentInput, 50)
for i := 0; i < 50; i++ {
// Every 5th task is repeated
taskID := i / 5
inputs[i] = &core.AgentInput{
Task: fmt.Sprintf("Analyze resource #%d", taskID),
Instruction: "Check for anomalies",
Context: map[string]interface{}{
"resourceID": taskID,
},
Timestamp: time.Now(),
}
}
// 4. Execute batch
fmt.Println("Executing 50 tasks (with repeats) using pool + cache + batch...")
start := time.Now()
result := executor.Execute(ctx, inputs)
duration := time.Since(start)
// 5. Print comprehensive results
fmt.Printf("\n--- Results ---\n")
fmt.Printf("Execution time: %v\n", duration)
fmt.Printf("Success rate: %d/%d (%.2f%%)\n",
result.Stats.SuccessCount,
result.Stats.TotalCount,
float64(result.Stats.SuccessCount)/float64(result.Stats.TotalCount)*100)
// Pool statistics
poolStats := pool.Stats()
fmt.Printf("\n--- Pool Statistics ---\n")
fmt.Printf("Agents: %d/%d (%.2f%% utilization)\n",
poolStats.ActiveCount, poolStats.MaxSize, poolStats.UtilizationPct)
fmt.Printf("Total acquisitions: %d\n", poolStats.AcquiredTotal)
fmt.Printf("Avg wait time: %v\n", poolStats.AvgWaitTime)
// Batch executor statistics
execStats := executor.Stats()
fmt.Printf("\n--- Batch Executor Statistics ---\n")
fmt.Printf("Total executions: %d\n", execStats.TotalExecutions)
fmt.Printf("Total tasks: %d\n", execStats.TotalTasks)
fmt.Printf("Success rate: %.2f%%\n", execStats.SuccessRate)
fmt.Printf("Avg execution time: %v\n", execStats.AvgDuration)
fmt.Println("\n=== Demo Complete ===")
Example (CustomCacheKey) ¶
Example_customCacheKey demonstrates custom cache key generation
ctx := context.Background()
agent := NewMockAgent("custom-cache-agent", 1*time.Millisecond)
// Custom key generator that ignores timestamp
customKeyGen := func(input *core.AgentInput) string {
return fmt.Sprintf("%s:%s", input.Task, input.Instruction)
}
config := CacheConfig{
MaxSize: 1000,
TTL: 10 * time.Minute,
EnableStats: true,
KeyGenerator: customKeyGen,
}
cachedAgent := NewCachedAgent(agent, config)
defer cachedAgent.Close()
// Execute with different timestamps but same task
for i := 0; i < 5; i++ {
input := &core.AgentInput{
Task: "Same task",
Instruction: "Same instruction",
Timestamp: time.Now(), // Different timestamp
}
start := time.Now()
_, _ = cachedAgent.Invoke(ctx, input)
duration := time.Since(start)
if i == 0 {
fmt.Printf("Execution %d: %v (cache miss)\n", i+1, duration)
} else {
fmt.Printf("Execution %d: %v (cache hit)\n", i+1, duration)
}
}
stats := cachedAgent.Stats()
fmt.Printf("\nCache hit rate: %.2f%%\n", stats.HitRate)
Example (StreamingBatch) ¶
Example_streamingBatch demonstrates streaming batch execution
ctx := context.Background()
agent := NewMockAgent("streaming-agent", 50*time.Millisecond)
config := BatchConfig{
MaxConcurrency: 5,
Timeout: 5 * time.Minute,
ErrorPolicy: ErrorPolicyContinue,
}
executor := NewBatchExecutor(agent, config)
// Prepare inputs
inputs := make([]*core.AgentInput, 20)
for i := 0; i < 20; i++ {
inputs[i] = &core.AgentInput{
Task: fmt.Sprintf("Stream task #%d", i),
Instruction: "Process",
Timestamp: time.Now(),
}
}
// Execute with streaming results
fmt.Println("Executing tasks with streaming results...")
resultChan, errorChan := executor.ExecuteStream(ctx, inputs)
successCount := 0
errorCount := 0
// Process results as they arrive
done := false
for !done {
select {
case output, ok := <-resultChan:
if !ok {
done = true
break
}
if output != nil {
successCount++
fmt.Printf("✓ Task completed: %s\n", output.Status)
}
case err, ok := <-errorChan:
if !ok {
break
}
errorCount++
fmt.Printf("✗ Task failed (index %d): %v\n", err.Index, err.Error)
}
}
fmt.Printf("\nStreaming complete: %d successes, %d errors\n", successCount, errorCount)
Index ¶
- Variables
- func CloneAgentInput(src *core.AgentInput, pool *DataPools) *core.AgentInput
- func CloneAgentOutput(src *core.AgentOutput, pool *DataPools) *core.AgentOutput
- func GetAgentInput() *core.AgentInput
- func GetAgentOutput() *core.AgentOutput
- func InitDefaultPoolManager(config *PoolManagerConfig)
- func PutAgentInput(input *core.AgentInput)
- func PutAgentOutput(output *core.AgentOutput)
- type AdaptivePoolStrategy
- type AgentFactory
- type AgentPool
- type AllPoolStats
- type BatchConfig
- type BatchError
- type BatchExecutor
- func (b *BatchExecutor) Execute(ctx context.Context, inputs []*core.AgentInput) *BatchResult
- func (b *BatchExecutor) ExecuteStream(ctx context.Context, inputs []*core.AgentInput) (<-chan *core.AgentOutput, <-chan BatchError)
- func (b *BatchExecutor) ExecuteWithCallback(ctx context.Context, inputs []*core.AgentInput, ...) *BatchResult
- func (b *BatchExecutor) Stats() ExecutorStats
- type BatchInput
- type BatchResult
- type BatchStats
- type CacheConfig
- type CacheEntry
- type CacheStats
- type CachedAgent
- func (c *CachedAgent) Batch(ctx context.Context, inputs []*core.AgentInput) ([]*core.AgentOutput, error)
- func (c *CachedAgent) Capabilities() []string
- func (c *CachedAgent) Close() error
- func (c *CachedAgent) Description() string
- func (c *CachedAgent) Invalidate(input *core.AgentInput)
- func (c *CachedAgent) InvalidateAll()
- func (c *CachedAgent) Invoke(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
- func (c *CachedAgent) Name() string
- func (c *CachedAgent) Pipe(next core.Runnable[*core.AgentOutput, any]) core.Runnable[*core.AgentInput, any]
- func (c *CachedAgent) Stats() CacheStats
- func (c *CachedAgent) Stream(ctx context.Context, input *core.AgentInput) (<-chan core.StreamChunk[*core.AgentOutput], error)
- func (c *CachedAgent) WithCallbacks(callbacks ...core.Callback) core.Runnable[*core.AgentInput, *core.AgentOutput]
- func (c *CachedAgent) WithConfig(config core.RunnableConfig) core.Runnable[*core.AgentInput, *core.AgentOutput]
- type DataPoolStats
- type DataPools
- func (p *DataPools) GetAgentInput() *core.AgentInput
- func (p *DataPools) GetAgentOutput() *core.AgentOutput
- func (p *DataPools) GetContextMap() map[string]interface{}
- func (p *DataPools) GetMetadataMap() map[string]interface{}
- func (p *DataPools) GetReasoningSlice() []core.ReasoningStep
- func (p *DataPools) GetReasoningStep() *core.ReasoningStep
- func (p *DataPools) GetStats() DataPoolStats
- func (p *DataPools) GetStringSlice() []string
- func (p *DataPools) GetToolCall() *core.ToolCall
- func (p *DataPools) GetToolCallSlice() []core.ToolCall
- func (p *DataPools) PutAgentInput(input *core.AgentInput)
- func (p *DataPools) PutAgentOutput(output *core.AgentOutput)
- func (p *DataPools) PutContextMap(m map[string]interface{})
- func (p *DataPools) PutMetadataMap(m map[string]interface{})
- func (p *DataPools) PutReasoningSlice(s []core.ReasoningStep)
- func (p *DataPools) PutReasoningStep(step *core.ReasoningStep)
- func (p *DataPools) PutStringSlice(s []string)
- func (p *DataPools) PutToolCall(tc *core.ToolCall)
- func (p *DataPools) PutToolCallSlice(s []core.ToolCall)
- type ErrorPolicy
- type ExecutorStats
- type MetricsCollector
- type MetricsPoolStrategy
- type ObjectPoolStats
- type ObjectPoolStatsSnapshot
- type OptimizedAgentPool
- func (p *OptimizedAgentPool) Acquire(ctx context.Context) (core.Agent, error)
- func (p *OptimizedAgentPool) Close() error
- func (p *OptimizedAgentPool) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
- func (p *OptimizedAgentPool) Release(agent core.Agent) error
- func (p *OptimizedAgentPool) Stats() PoolStats
- type PoolAgent
- func (a *PoolAgent) Close() error
- func (a *PoolAgent) Configure(config *PoolManagerConfig) error
- func (a *PoolAgent) DisablePool(poolType PoolType)
- func (a *PoolAgent) EnablePool(poolType PoolType)
- func (a *PoolAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
- func (a *PoolAgent) GetAgentInput() *core.AgentInput
- func (a *PoolAgent) GetAgentOutput() *core.AgentOutput
- func (a *PoolAgent) GetAllStats() map[PoolType]*ObjectPoolStatsSnapshot
- func (a *PoolAgent) GetBuffer() *bytes.Buffer
- func (a *PoolAgent) GetConfig() *PoolManagerConfig
- func (a *PoolAgent) GetMessage() *interfaces.Message
- func (a *PoolAgent) GetStats(poolType PoolType) *ObjectPoolStatsSnapshot
- func (a *PoolAgent) GetToolInput() *interfaces.ToolInput
- func (a *PoolAgent) GetToolOutput() *interfaces.ToolOutput
- func (a *PoolAgent) IsPoolEnabled(poolType PoolType) bool
- func (a *PoolAgent) PutAgentInput(input *core.AgentInput)
- func (a *PoolAgent) PutAgentOutput(output *core.AgentOutput)
- func (a *PoolAgent) PutBuffer(buf *bytes.Buffer)
- func (a *PoolAgent) PutMessage(msg *interfaces.Message)
- func (a *PoolAgent) PutToolInput(input *interfaces.ToolInput)
- func (a *PoolAgent) PutToolOutput(output *interfaces.ToolOutput)
- func (a *PoolAgent) ResetStats()
- type PoolConfig
- type PoolManager
- type PoolManagerAgent
- type PoolManagerConfig
- type PoolStats
- type PoolStrategy
- type PoolType
- type PooledAgentInput
- type PooledAgentOutput
- type PriorityPoolStrategy
- type ScenarioBasedStrategy
- type ScenarioType
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrPoolClosed 池已关闭错误 ErrPoolClosed = errors.New("agent pool is closed") // ErrPoolTimeout 获取 Agent 超时错误 ErrPoolTimeout = errors.New("timeout acquiring agent from pool") )
var AgentInputPool = &sync.Pool{ New: func() interface{} { return &core.AgentInput{ Context: make(map[string]interface{}), } }, }
AgentInputPool core.AgentInput 对象池
用于 Agent 输入对象的复用
var AgentOutputPool = &sync.Pool{ New: func() interface{} { return &core.AgentOutput{ ReasoningSteps: make([]core.ReasoningStep, 0, 4), ToolCalls: make([]core.ToolCall, 0, 4), Metadata: make(map[string]interface{}), } }, }
AgentOutputPool core.AgentOutput 对象池
用于 Agent 输出对象的复用
var ByteBufferPool = &sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, }
ByteBufferPool bytes.Buffer 对象池
用于高频场景的 Buffer 复用,减少内存分配和 GC 压力 常见使用场景:JSON 序列化、HTTP 请求体构建、字符串拼接
var DefaultDataPools = NewDataPools()
DefaultDataPools 全局默认数据池实例
可以直接使用,也可以创建自定义实例
var MessagePool = &sync.Pool{ New: func() interface{} { return &interfaces.Message{} }, }
MessagePool interfaces.Message 对象池
用于 LLM 通信中的消息对象复用
var ToolInputPool = &sync.Pool{ New: func() interface{} { return &interfaces.ToolInput{ Args: make(map[string]interface{}), } }, }
ToolInputPool interfaces.ToolInput 对象池
用于工具调用输入对象的复用
var ToolOutputPool = &sync.Pool{ New: func() interface{} { return &interfaces.ToolOutput{ Metadata: make(map[string]interface{}), } }, }
ToolOutputPool interfaces.ToolOutput 对象池
用于工具调用输出对象的复用
Functions ¶
func CloneAgentInput ¶
func CloneAgentInput(src *core.AgentInput, pool *DataPools) *core.AgentInput
CloneAgentInput 克隆 AgentInput (使用池化对象)
用于需要保留输入数据的场景
func CloneAgentOutput ¶
func CloneAgentOutput(src *core.AgentOutput, pool *DataPools) *core.AgentOutput
CloneAgentOutput 克隆 AgentOutput (使用池化对象)
用于需要保留输出数据的场景
func InitDefaultPoolManager ¶
func InitDefaultPoolManager(config *PoolManagerConfig)
InitDefaultPoolManager 初始化默认池管理器
func PutAgentOutput ¶
func PutAgentOutput(output *core.AgentOutput)
PutAgentOutput 归还 AgentOutput 到默认池
Types ¶
type AdaptivePoolStrategy ¶
type AdaptivePoolStrategy struct {
// contains filtered or unexported fields
}
AdaptivePoolStrategy 自适应池策略 根据使用频率和内存压力动态调整池行为
func NewAdaptivePoolStrategy ¶
func NewAdaptivePoolStrategy(config *PoolManagerConfig) *AdaptivePoolStrategy
NewAdaptivePoolStrategy 创建自适应池策略
func (*AdaptivePoolStrategy) PostPut ¶
func (s *AdaptivePoolStrategy) PostPut(poolType PoolType)
PostPut 归还后钩子
func (*AdaptivePoolStrategy) PreGet ¶
func (s *AdaptivePoolStrategy) PreGet(poolType PoolType)
PreGet 获取前钩子
func (*AdaptivePoolStrategy) ShouldPool ¶
func (s *AdaptivePoolStrategy) ShouldPool(poolType PoolType, size int) bool
ShouldPool 决定是否池化
type AgentPool ¶
type AgentPool struct {
// contains filtered or unexported fields
}
AgentPool Agent 池
Example ¶
ExampleAgentPool demonstrates Agent pooling usage
ctx := context.Background()
// Define agent factory
factory := func() (core.Agent, error) {
return NewMockAgent("example-agent", 10*time.Millisecond), nil
}
// Configure pool
config := PoolConfig{
InitialSize: 5, // Pre-create 5 agents
MaxSize: 20, // Max 20 agents
IdleTimeout: 5 * time.Minute,
MaxLifetime: 30 * time.Minute,
AcquireTimeout: 5 * time.Second,
CleanupInterval: 1 * time.Minute,
}
// Create pool
pool, err := NewAgentPool(factory, config)
if err != nil {
panic(err)
}
defer pool.Close()
// Execute tasks
for i := 0; i < 10; i++ {
input := &core.AgentInput{
Task: fmt.Sprintf("Task #%d", i),
Instruction: "Process this task",
Timestamp: time.Now(),
}
output, err := pool.Execute(ctx, input)
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
fmt.Printf("Task %d: %s\n", i, output.Status)
}
// Check pool statistics
stats := pool.Stats()
fmt.Printf("\nPool Statistics:\n")
fmt.Printf(" Total agents: %d\n", stats.TotalCount)
fmt.Printf(" Active: %d, Idle: %d\n", stats.ActiveCount, stats.IdleCount)
fmt.Printf(" Utilization: %.2f%%\n", stats.UtilizationPct)
fmt.Printf(" Total acquisitions: %d\n", stats.AcquiredTotal)
func NewAgentPool ¶
func NewAgentPool(factory AgentFactory, config PoolConfig) (*AgentPool, error)
NewAgentPool 创建新的 Agent 池
func (*AgentPool) Execute ¶
func (p *AgentPool) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
Execute 执行 Agent(自动借用和归还)
type AllPoolStats ¶
type AllPoolStats struct {
ByteBuffer ObjectPoolStats
Message ObjectPoolStats
ToolInput ObjectPoolStats
ToolOutput ObjectPoolStats
AgentInput ObjectPoolStats
AgentOutput ObjectPoolStats
}
AllPoolStats 所有对象池的统计信息
type BatchConfig ¶
type BatchConfig struct {
// MaxConcurrency 最大并发数
MaxConcurrency int
// Timeout 批量执行超时时间
Timeout time.Duration
// ErrorPolicy 错误处理策略
ErrorPolicy ErrorPolicy
// EnableStats 是否启用统计
EnableStats bool
}
BatchConfig 批量执行配置
type BatchError ¶
type BatchError struct {
Index int // 输入索引
Input *core.AgentInput // 输入
Error error // 错误
}
BatchError 批量执行错误
type BatchExecutor ¶
type BatchExecutor struct {
// contains filtered or unexported fields
}
BatchExecutor 批量执行器
Example ¶
ExampleBatchExecutor demonstrates batch execution
ctx := context.Background()
// Create agent
agent := NewMockAgent("example-agent", 100*time.Microsecond)
// Configure batch executor
config := BatchConfig{
MaxConcurrency: 10,
Timeout: 5 * time.Minute,
ErrorPolicy: ErrorPolicyContinue,
EnableStats: true,
}
executor := NewBatchExecutor(agent, config)
// Prepare batch inputs
inputs := make([]*core.AgentInput, 100)
for i := 0; i < 100; i++ {
inputs[i] = &core.AgentInput{
Task: fmt.Sprintf("Process item #%d", i),
Instruction: "Perform analysis",
Timestamp: time.Now(),
}
}
// Execute batch
fmt.Println("Executing 100 tasks with 10 concurrent workers...")
start := time.Now()
result := executor.Execute(ctx, inputs)
duration := time.Since(start)
// Print results
fmt.Printf("\nBatch Execution Results:\n")
fmt.Printf(" Total tasks: %d\n", result.Stats.TotalCount)
fmt.Printf(" Successful: %d\n", result.Stats.SuccessCount)
fmt.Printf(" Failed: %d\n", result.Stats.FailureCount)
fmt.Printf(" Total duration: %v\n", duration)
fmt.Printf(" Avg duration per task: %v\n", result.Stats.AvgDuration)
fmt.Printf(" Min duration: %v\n", result.Stats.MinDuration)
fmt.Printf(" Max duration: %v\n", result.Stats.MaxDuration)
// Calculate theoretical serial time
serialTime := time.Duration(100) * 100 * time.Microsecond
speedup := float64(serialTime) / float64(duration)
fmt.Printf(" Speedup vs serial: %.2fx\n", speedup)
func NewBatchExecutor ¶
func NewBatchExecutor(agent core.Agent, config BatchConfig) *BatchExecutor
NewBatchExecutor 创建新的批量执行器
func (*BatchExecutor) Execute ¶
func (b *BatchExecutor) Execute(ctx context.Context, inputs []*core.AgentInput) *BatchResult
Execute 执行批量任务 Uses a worker pool pattern: launches only MaxConcurrency workers that pull work from a shared channel, instead of spawning len(inputs) goroutines.
func (*BatchExecutor) ExecuteStream ¶
func (b *BatchExecutor) ExecuteStream( ctx context.Context, inputs []*core.AgentInput, ) (<-chan *core.AgentOutput, <-chan BatchError)
ExecuteStream 流式执行批量任务(返回结果通道)
func (*BatchExecutor) ExecuteWithCallback ¶
func (b *BatchExecutor) ExecuteWithCallback( ctx context.Context, inputs []*core.AgentInput, callback func(index int, output *core.AgentOutput, err error), ) *BatchResult
ExecuteWithCallback 执行批量任务(带回调)
type BatchInput ¶
type BatchInput struct {
Inputs []*core.AgentInput
Config BatchConfig
}
BatchInput 批量输入
type BatchResult ¶
type BatchResult struct {
Results []*core.AgentOutput // 成功的结果
Errors []BatchError // 错误列表
Stats BatchStats // 统计信息
}
BatchResult 批量执行结果
type BatchStats ¶
type BatchStats struct {
TotalCount int // 总任务数
SuccessCount int // 成功数
FailureCount int // 失败数
Duration time.Duration // 总耗时
AvgDuration time.Duration // 平均耗时
MinDuration time.Duration // 最小耗时
MaxDuration time.Duration // 最大耗时
}
BatchStats 批量执行统计
type CacheConfig ¶
type CacheConfig struct {
// MaxSize 最大缓存条目数
MaxSize int
// TTL 缓存过期时间
TTL time.Duration
// CleanupInterval 清理间隔
CleanupInterval time.Duration
// EnableStats 是否启用统计
EnableStats bool
// KeyGenerator 自定义缓存键生成器
KeyGenerator func(*core.AgentInput) string
}
CacheConfig 缓存配置
type CacheEntry ¶
type CacheEntry struct {
Output *core.AgentOutput
CreatedAt time.Time
ExpiresAt time.Time
HitCount atomic.Int64
}
CacheEntry 缓存条目
type CacheStats ¶
type CacheStats struct {
Size int // 当前缓存大小
MaxSize int // 最大缓存大小
Hits int64 // 缓存命中次数
Misses int64 // 缓存未命中次数
HitRate float64 // 命中率百分比
Evictions int64 // 驱逐次数
Expirations int64 // 过期次数
AvgHitTime time.Duration // 平均命中响应时间
AvgMissTime time.Duration // 平均未命中响应时间
}
CacheStats 缓存统计信息
type CachedAgent ¶
type CachedAgent struct {
// contains filtered or unexported fields
}
CachedAgent 缓存包装器
Example ¶
ExampleCachedAgent demonstrates caching usage
ctx := context.Background()
// Create base agent
agent := NewMockAgent("example-agent", 1*time.Millisecond)
// Configure cache
config := CacheConfig{
MaxSize: 1000,
TTL: 10 * time.Minute,
CleanupInterval: 1 * time.Minute,
EnableStats: true,
}
// Wrap with cache
cachedAgent := NewCachedAgent(agent, config)
defer cachedAgent.Close()
// Same input (will be cached)
input := &core.AgentInput{
Task: "Analyze logs",
Instruction: "Find errors in application logs",
Context: map[string]interface{}{
"timeRange": "last 1 hour",
},
Timestamp: time.Now(),
}
// First execution (cache miss)
start := time.Now()
output1, _ := cachedAgent.Invoke(ctx, input)
duration1 := time.Since(start)
fmt.Printf("First execution: %v (cache miss)\n", duration1)
// Second execution (cache hit)
start = time.Now()
output2, _ := cachedAgent.Invoke(ctx, input)
duration2 := time.Since(start)
fmt.Printf("Second execution: %v (cache hit)\n", duration2)
fmt.Printf("Speedup: %.2fx\n", float64(duration1)/float64(duration2))
// Check cache statistics
stats := cachedAgent.Stats()
fmt.Printf("\nCache Statistics:\n")
fmt.Printf(" Size: %d/%d\n", stats.Size, stats.MaxSize)
fmt.Printf(" Hits: %d, Misses: %d\n", stats.Hits, stats.Misses)
fmt.Printf(" Hit rate: %.2f%%\n", stats.HitRate)
fmt.Printf(" Avg hit time: %v\n", stats.AvgHitTime)
fmt.Printf(" Avg miss time: %v\n", stats.AvgMissTime)
// Verify results are identical
fmt.Printf("\nResults match: %v\n", output1.Result == output2.Result)
func NewCachedAgent ¶
func NewCachedAgent(agent core.Agent, config CacheConfig) *CachedAgent
NewCachedAgent 创建新的缓存 Agent
func (*CachedAgent) Batch ¶
func (c *CachedAgent) Batch(ctx context.Context, inputs []*core.AgentInput) ([]*core.AgentOutput, error)
Batch 批量执行 Agent(委托给内部 agent)
func (*CachedAgent) Capabilities ¶
func (c *CachedAgent) Capabilities() []string
Capabilities 返回 Agent 能力列表
func (*CachedAgent) Description ¶
func (c *CachedAgent) Description() string
Description 返回 Agent 描述
func (*CachedAgent) Invalidate ¶
func (c *CachedAgent) Invalidate(input *core.AgentInput)
Invalidate 失效指定缓存键
func (*CachedAgent) Invoke ¶
func (c *CachedAgent) Invoke(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
Invoke 执行 Agent(带缓存)
func (*CachedAgent) Pipe ¶
func (c *CachedAgent) Pipe(next core.Runnable[*core.AgentOutput, any]) core.Runnable[*core.AgentInput, any]
Pipe 连接到另一个 Runnable(委托给内部 agent)
func (*CachedAgent) Stream ¶
func (c *CachedAgent) Stream(ctx context.Context, input *core.AgentInput) (<-chan core.StreamChunk[*core.AgentOutput], error)
Stream 流式执行 Agent(委托给内部 agent)
func (*CachedAgent) WithCallbacks ¶
func (c *CachedAgent) WithCallbacks(callbacks ...core.Callback) core.Runnable[*core.AgentInput, *core.AgentOutput]
WithCallbacks 添加回调处理器(委托给内部 agent)
func (*CachedAgent) WithConfig ¶
func (c *CachedAgent) WithConfig(config core.RunnableConfig) core.Runnable[*core.AgentInput, *core.AgentOutput]
WithConfig 配置 Agent(委托给内部 agent)
type DataPoolStats ¶
type DataPoolStats struct {
InputGetCount int64 // AgentInput Get 次数
InputPutCount int64 // AgentInput Put 次数
OutputGetCount int64 // AgentOutput Get 次数
OutputPutCount int64 // AgentOutput Put 次数
ReasoningGetCount int64 // ReasoningStep Get 次数
ReasoningPutCount int64 // ReasoningStep Put 次数
ToolCallGetCount int64 // ToolCall Get 次数
ToolCallPutCount int64 // ToolCall Put 次数
MapGetCount int64 // Map Get 次数
MapPutCount int64 // Map Put 次数
SliceGetCount int64 // Slice Get 次数
SlicePutCount int64 // Slice Put 次数
PoolHitRate float64 // 池命中率 (%)
}
DataPoolStats 数据池统计信息
type DataPools ¶
type DataPools struct {
// contains filtered or unexported fields
}
DataPools 数据对象池集合
提供 Agent 执行过程中频繁分配的数据对象的复用,显著减轻 GC 压力。 实现零分配(Zero Allocation)目标,特别是在关键热路径上。
性能目标:
- Memory per agent: < 50MB
- Zero allocation in critical paths
- GC pressure reduction: > 80%
func (*DataPools) GetAgentInput ¶
func (p *DataPools) GetAgentInput() *core.AgentInput
GetAgentInput 从池中获取 AgentInput
使用完毕后必须调用 PutAgentInput 归还到池中
func (*DataPools) GetAgentOutput ¶
func (p *DataPools) GetAgentOutput() *core.AgentOutput
GetAgentOutput 从池中获取 AgentOutput
使用完毕后必须调用 PutAgentOutput 归还到池中
func (*DataPools) GetContextMap ¶
GetContextMap 从池中获取 Context map
func (*DataPools) GetMetadataMap ¶
GetMetadataMap 从池中获取 Metadata map
func (*DataPools) GetReasoningSlice ¶
func (p *DataPools) GetReasoningSlice() []core.ReasoningStep
GetReasoningSlice 从池中获取 ReasoningStep slice
func (*DataPools) GetReasoningStep ¶
func (p *DataPools) GetReasoningStep() *core.ReasoningStep
GetReasoningStep 从池中获取 ReasoningStep
func (*DataPools) GetStringSlice ¶
GetStringSlice 从池中获取 string slice
func (*DataPools) GetToolCall ¶
GetToolCall 从池中获取 ToolCall
func (*DataPools) GetToolCallSlice ¶
GetToolCallSlice 从池中获取 ToolCall slice
func (*DataPools) PutAgentInput ¶
func (p *DataPools) PutAgentInput(input *core.AgentInput)
PutAgentInput 将 AgentInput 归还到池中
会重置所有字段,准备下次复用
func (*DataPools) PutAgentOutput ¶
func (p *DataPools) PutAgentOutput(output *core.AgentOutput)
PutAgentOutput 将 AgentOutput 归还到池中
会重置所有字段,并使用切片零分配技巧 (slice[:0])
func (*DataPools) PutContextMap ¶
PutContextMap 将 Context map 归还到池中
func (*DataPools) PutMetadataMap ¶
PutMetadataMap 将 Metadata map 归还到池中
func (*DataPools) PutReasoningSlice ¶
func (p *DataPools) PutReasoningSlice(s []core.ReasoningStep)
PutReasoningSlice 将 ReasoningStep slice 归还到池中
func (*DataPools) PutReasoningStep ¶
func (p *DataPools) PutReasoningStep(step *core.ReasoningStep)
PutReasoningStep 将 ReasoningStep 归还到池中
func (*DataPools) PutStringSlice ¶
PutStringSlice 将 string slice 归还到池中
func (*DataPools) PutToolCall ¶
PutToolCall 将 ToolCall 归还到池中
func (*DataPools) PutToolCallSlice ¶
PutToolCallSlice 将 ToolCall slice 归还到池中
type ErrorPolicy ¶
type ErrorPolicy string
ErrorPolicy 错误处理策略
const ( // ErrorPolicyFailFast 快速失败(遇到第一个错误就停止) ErrorPolicyFailFast ErrorPolicy = "fail_fast" // ErrorPolicyContinue 继续执行(收集所有错误) ErrorPolicyContinue ErrorPolicy = "continue" )
type ExecutorStats ¶
type ExecutorStats struct {
TotalExecutions int64 // 总执行次数
TotalTasks int64 // 总任务数
SuccessTasks int64 // 成功任务数
FailedTasks int64 // 失败任务数
AvgTasksPerExec float64 // 平均每次执行的任务数
SuccessRate float64 // 成功率百分比
AvgDuration time.Duration // 平均执行时间
}
ExecutorStats 执行器统计信息
type MetricsCollector ¶
type MetricsCollector interface {
RecordPoolGet(poolType PoolType, latency time.Duration)
RecordPoolPut(poolType PoolType, latency time.Duration)
RecordPoolHit(poolType PoolType)
RecordPoolMiss(poolType PoolType)
}
MetricsCollector 指标收集器接口
type MetricsPoolStrategy ¶
type MetricsPoolStrategy struct {
// contains filtered or unexported fields
}
MetricsPoolStrategy 带指标收集的池策略
func NewMetricsPoolStrategy ¶
func NewMetricsPoolStrategy(base PoolStrategy, collector MetricsCollector) *MetricsPoolStrategy
NewMetricsPoolStrategy 创建带指标的池策略
func (*MetricsPoolStrategy) PostPut ¶
func (s *MetricsPoolStrategy) PostPut(poolType PoolType)
PostPut 归还后钩子(记录指标)
func (*MetricsPoolStrategy) PreGet ¶
func (s *MetricsPoolStrategy) PreGet(poolType PoolType)
PreGet 获取前钩子(记录指标)
func (*MetricsPoolStrategy) ShouldPool ¶
func (s *MetricsPoolStrategy) ShouldPool(poolType PoolType, size int) bool
ShouldPool 决定是否池化(委托给基础策略)
type ObjectPoolStats ¶
type ObjectPoolStats struct {
Gets atomic.Int64 // 获取次数
Puts atomic.Int64 // 归还次数
News atomic.Int64 // 新建次数
Current atomic.Int64 // 当前池中对象数(估算)
}
ObjectPoolStats 对象池统计信息(内部使用,包含原子操作)
type ObjectPoolStatsSnapshot ¶
type ObjectPoolStatsSnapshot struct {
Gets int64 // 获取次数
Puts int64 // 归还次数
News int64 // 新建次数
Current int64 // 当前池中对象数(估算)
}
ObjectPoolStatsSnapshot 对象池统计信息快照(用于返回和显示)
type OptimizedAgentPool ¶
type OptimizedAgentPool struct {
// contains filtered or unexported fields
}
OptimizedAgentPool 优化的 Agent 池
性能优化: - O(1) Acquire/Release(基于 channel 空闲队列) - 无锁空闲队列(channel 本身是并发安全的) - Map 实现 O(1) Agent 查找 - 细粒度锁减少竞争
适用场景: - 高并发场景(1000+ concurrent agents) - 频繁的 Acquire/Release 操作 - 对延迟敏感的应用
func NewOptimizedAgentPool ¶
func NewOptimizedAgentPool(factory AgentFactory, config PoolConfig) (*OptimizedAgentPool, error)
NewOptimizedAgentPool 创建优化的 Agent 池
func (*OptimizedAgentPool) Execute ¶
func (p *OptimizedAgentPool) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
Execute 执行 Agent(自动借用和归还)
type PoolAgent ¶
type PoolAgent struct {
// contains filtered or unexported fields
}
PoolAgent 池代理 - Agent 模式实现
func (*PoolAgent) Configure ¶
func (a *PoolAgent) Configure(config *PoolManagerConfig) error
Configure 配置管理器
func (*PoolAgent) DisablePool ¶
DisablePool 禁用指定池
func (*PoolAgent) Execute ¶
func (a *PoolAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
Execute Agent 模式执行接口
func (*PoolAgent) GetAgentInput ¶
func (a *PoolAgent) GetAgentInput() *core.AgentInput
GetAgentInput 获取 AgentInput
func (*PoolAgent) GetAgentOutput ¶
func (a *PoolAgent) GetAgentOutput() *core.AgentOutput
GetAgentOutput 获取 AgentOutput
func (*PoolAgent) GetAllStats ¶
func (a *PoolAgent) GetAllStats() map[PoolType]*ObjectPoolStatsSnapshot
GetAllStats 获取所有池的统计信息
func (*PoolAgent) GetMessage ¶
func (a *PoolAgent) GetMessage() *interfaces.Message
GetMessage 获取 Message
func (*PoolAgent) GetStats ¶
func (a *PoolAgent) GetStats(poolType PoolType) *ObjectPoolStatsSnapshot
GetStats 获取指定池的统计信息
func (*PoolAgent) GetToolInput ¶
func (a *PoolAgent) GetToolInput() *interfaces.ToolInput
GetToolInput 获取 ToolInput
func (*PoolAgent) GetToolOutput ¶
func (a *PoolAgent) GetToolOutput() *interfaces.ToolOutput
GetToolOutput 获取 ToolOutput
func (*PoolAgent) IsPoolEnabled ¶
IsPoolEnabled 检查池是否启用
func (*PoolAgent) PutAgentInput ¶
func (a *PoolAgent) PutAgentInput(input *core.AgentInput)
PutAgentInput 归还 AgentInput
func (*PoolAgent) PutAgentOutput ¶
func (a *PoolAgent) PutAgentOutput(output *core.AgentOutput)
PutAgentOutput 归还 AgentOutput
func (*PoolAgent) PutMessage ¶
func (a *PoolAgent) PutMessage(msg *interfaces.Message)
PutMessage 归还 Message
func (*PoolAgent) PutToolInput ¶
func (a *PoolAgent) PutToolInput(input *interfaces.ToolInput)
PutToolInput 归还 ToolInput
func (*PoolAgent) PutToolOutput ¶
func (a *PoolAgent) PutToolOutput(output *interfaces.ToolOutput)
PutToolOutput 归还 ToolOutput
type PoolConfig ¶
type PoolConfig struct {
// InitialSize 初始池大小
InitialSize int
// MaxSize 最大池大小
MaxSize int
// IdleTimeout 空闲超时时间(超过此时间的空闲 Agent 将被回收)
IdleTimeout time.Duration
// MaxLifetime Agent 最大生命周期(超过此时间的 Agent 将被销毁)
MaxLifetime time.Duration
// AcquireTimeout 获取 Agent 超时时间
AcquireTimeout time.Duration
// CleanupInterval 清理间隔
CleanupInterval time.Duration
}
PoolConfig 池配置
type PoolManager ¶
type PoolManager interface {
// GetBuffer 获取 ByteBuffer
GetBuffer() *bytes.Buffer
PutBuffer(buf *bytes.Buffer)
// GetMessage 获取 Message
GetMessage() *interfaces.Message
PutMessage(msg *interfaces.Message)
// GetToolInput 获取 ToolInput
GetToolInput() *interfaces.ToolInput
PutToolInput(input *interfaces.ToolInput)
// GetToolOutput 获取 ToolOutput
GetToolOutput() *interfaces.ToolOutput
PutToolOutput(output *interfaces.ToolOutput)
// GetAgentInput 获取 AgentInput
GetAgentInput() *core.AgentInput
PutAgentInput(input *core.AgentInput)
// GetAgentOutput 获取 AgentOutput
GetAgentOutput() *core.AgentOutput
PutAgentOutput(output *core.AgentOutput)
// Configuration 配置管理
Configure(config *PoolManagerConfig) error
GetConfig() *PoolManagerConfig
EnablePool(poolType PoolType)
DisablePool(poolType PoolType)
IsPoolEnabled(poolType PoolType) bool
// Statistics 统计信息
GetStats(poolType PoolType) *ObjectPoolStatsSnapshot
GetAllStats() map[PoolType]*ObjectPoolStatsSnapshot
ResetStats()
// Lifecycle
Close() error
}
PoolManager 对象池管理器接口
func CreateIsolatedPoolManager ¶
func CreateIsolatedPoolManager(config *PoolManagerConfig) PoolManager
CreateIsolatedPoolManager 创建隔离的池管理器 用于测试或特定场景,不影响全局状态
func GetDefaultPoolManager ¶
func GetDefaultPoolManager() PoolManager
GetDefaultPoolManager 获取默认池管理器
type PoolManagerAgent ¶
type PoolManagerAgent struct {
// contains filtered or unexported fields
}
PoolManagerAgent 将 PoolAgent 包装为真正的 Agent
func NewPoolManagerAgent ¶
func NewPoolManagerAgent(name string, config *PoolManagerConfig) *PoolManagerAgent
NewPoolManagerAgent 创建池管理器 Agent
func (*PoolManagerAgent) Capabilities ¶
func (a *PoolManagerAgent) Capabilities() []string
Capabilities 返回能力列表
func (*PoolManagerAgent) Description ¶
func (a *PoolManagerAgent) Description() string
Description 返回描述
func (*PoolManagerAgent) Execute ¶
func (a *PoolManagerAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
Execute 执行 Agent
type PoolManagerConfig ¶
type PoolManagerConfig struct {
// 启用配置
EnabledPools map[PoolType]bool
// 池大小限制
MaxBufferSize int // ByteBuffer 最大大小 (默认 64KB)
MaxMapSize int // Map 最大键数 (默认 100)
MaxSliceSize int // Slice 最大容量 (默认 100)
// 策略配置
UseStrategy PoolStrategy // 可选:自定义策略
}
PoolManagerConfig 池管理器配置
func DefaultPoolManagerConfig ¶
func DefaultPoolManagerConfig() *PoolManagerConfig
DefaultPoolManagerConfig 默认配置
type PoolStats ¶
type PoolStats struct {
TotalCount int // 当前池中 Agent 总数
ActiveCount int // 正在使用的 Agent 数
IdleCount int // 空闲 Agent 数
MaxSize int // 最大池大小
CreatedTotal int64 // 创建的 Agent 总数
AcquiredTotal int64 // 获取的 Agent 总数
ReleasedTotal int64 // 释放的 Agent 总数
RecycledTotal int64 // 回收的 Agent 总数
WaitCount int64 // 等待次数
AvgWaitTime time.Duration // 平均等待时间
UtilizationPct float64 // 利用率百分比
}
PoolStats 池统计信息
type PoolStrategy ¶
type PoolStrategy interface {
// ShouldPool 决定是否应该池化对象
ShouldPool(poolType PoolType, size int) bool
// PreGet 获取对象前的钩子
PreGet(poolType PoolType)
// PostPut 归还对象后的钩子
PostPut(poolType PoolType)
}
PoolStrategy 池策略接口
type PooledAgentInput ¶
type PooledAgentInput struct {
Input *core.AgentInput
// contains filtered or unexported fields
}
PooledAgentInput 池化的 AgentInput 辅助结构
自动管理对象的生命周期,使用 defer 自动归还
func NewPooledAgentInput ¶
func NewPooledAgentInput(pool *DataPools) *PooledAgentInput
NewPooledAgentInput 创建池化的 AgentInput
使用示例:
input := NewPooledAgentInput(pool) defer input.Release() // 使用 input.Input
type PooledAgentOutput ¶
type PooledAgentOutput struct {
Output *core.AgentOutput
// contains filtered or unexported fields
}
PooledAgentOutput 池化的 AgentOutput 辅助结构
自动管理对象的生命周期,使用 defer 自动归还
func NewPooledAgentOutput ¶
func NewPooledAgentOutput(pool *DataPools) *PooledAgentOutput
NewPooledAgentOutput 创建池化的 AgentOutput
使用示例:
output := NewPooledAgentOutput(pool) defer output.Release() // 使用 output.Output
func (*PooledAgentOutput) Release ¶
func (p *PooledAgentOutput) Release()
Release 释放 AgentOutput 回池中
type PriorityPoolStrategy ¶
type PriorityPoolStrategy struct {
// contains filtered or unexported fields
}
PriorityPoolStrategy 优先级池策略 根据池类型的优先级决定池化行为
func NewPriorityPoolStrategy ¶
func NewPriorityPoolStrategy(config *PoolManagerConfig, maxPools int) *PriorityPoolStrategy
NewPriorityPoolStrategy 创建优先级池策略
func (*PriorityPoolStrategy) PostPut ¶
func (s *PriorityPoolStrategy) PostPut(poolType PoolType)
func (*PriorityPoolStrategy) PreGet ¶
func (s *PriorityPoolStrategy) PreGet(poolType PoolType)
func (*PriorityPoolStrategy) ShouldPool ¶
func (s *PriorityPoolStrategy) ShouldPool(poolType PoolType, size int) bool
ShouldPool 根据优先级决定是否池化
type ScenarioBasedStrategy ¶
type ScenarioBasedStrategy struct {
// contains filtered or unexported fields
}
ScenarioBasedStrategy 场景驱动的池策略
func NewScenarioBasedStrategy ¶
func NewScenarioBasedStrategy(config *PoolManagerConfig) *ScenarioBasedStrategy
NewScenarioBasedStrategy 创建场景驱动策略
func (*ScenarioBasedStrategy) PostPut ¶
func (s *ScenarioBasedStrategy) PostPut(poolType PoolType)
func (*ScenarioBasedStrategy) PreGet ¶
func (s *ScenarioBasedStrategy) PreGet(poolType PoolType)
func (*ScenarioBasedStrategy) SetScenario ¶
func (s *ScenarioBasedStrategy) SetScenario(scenario ScenarioType)
SetScenario 设置当前场景
func (*ScenarioBasedStrategy) ShouldPool ¶
func (s *ScenarioBasedStrategy) ShouldPool(poolType PoolType, size int) bool
ShouldPool 根据场景决定是否池化
type ScenarioType ¶
type ScenarioType string
ScenarioType 场景类型
const ( ScenarioLLMCalls ScenarioType = "llm_calls" ScenarioToolExec ScenarioType = "tool_execution" ScenarioJSONProcess ScenarioType = "json_processing" ScenarioStreaming ScenarioType = "streaming" ScenarioGeneral ScenarioType = "general" )