Package performance (v0.1.7)
Published: Nov 26, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

README

Agent Performance Optimization

This package provides performance optimization features for the Agent framework, including pooling, caching, and batch execution.

Features

1. Agent Pooling (pool.go)

Agent pooling reduces the overhead of creating new Agent instances by maintaining a pool of pre-created agents.

Key Features:

  • Pre-creation of Agent instances
  • Configurable pool size (initial and max)
  • Automatic idle Agent recycling
  • Agent lifecycle management (max lifetime)
  • Comprehensive statistics

Performance Benefits:

  • ~10% faster than creating new agents for each request
  • Significantly reduces memory allocations
  • Better performance under high concurrency

Usage:

import "github.com/kart-io/goagent/performance"

// Define factory
factory := func() (core.Agent, error) {
    return MyAgent{}, nil
}

// Configure pool
config := performance.PoolConfig{
    InitialSize:     5,
    MaxSize:         50,
    IdleTimeout:     5 * time.Minute,
    MaxLifetime:     30 * time.Minute,
    AcquireTimeout:  5 * time.Second,
    CleanupInterval: 1 * time.Minute,
}

// Create pool
pool, err := performance.NewAgentPool(factory, config)
defer pool.Close()

// Execute with automatic acquire/release
output, err := pool.Execute(ctx, input)

// Or manually manage
agent, err := pool.Acquire(ctx)
defer pool.Release(agent)
output, err := agent.Execute(ctx, input)

Statistics:

stats := pool.Stats()
fmt.Printf("Active: %d/%d (%.2f%% utilization)\n",
    stats.ActiveCount, stats.MaxSize, stats.UtilizationPct)

2. Result Caching (cache.go)

Caching stores execution results based on input, dramatically reducing response time for repeated requests.

Key Features:

  • SHA256-based cache key generation
  • Configurable TTL (Time To Live)
  • Automatic cache eviction (LRU)
  • Custom key generator support
  • Hit rate tracking

Performance Benefits:

  • 98.82% faster for cache hits (1ms → <1µs)
  • Cache hit rate typically >90% for repeated operations
  • Near-zero latency for cached results

Usage:

// Create cached agent
agent := MyAgent{}
config := performance.CacheConfig{
    MaxSize:         1000,
    TTL:             10 * time.Minute,
    CleanupInterval: 1 * time.Minute,
    EnableStats:     true,
}

cachedAgent := performance.NewCachedAgent(agent, config)
defer cachedAgent.Close()

// Execute (automatically cached)
output, err := cachedAgent.Execute(ctx, input)

// Custom cache key generator
config.KeyGenerator = func(input *core.AgentInput) string {
    return fmt.Sprintf("%s:%s", input.Task, input.Instruction)
}

Cache Management:

// Invalidate specific entry
cachedAgent.Invalidate(input)

// Clear all cache
cachedAgent.InvalidateAll()

// Check statistics
stats := cachedAgent.Stats()
fmt.Printf("Hit rate: %.2f%% (Hits: %d, Misses: %d)\n",
    stats.HitRate, stats.Hits, stats.Misses)

3. Batch Execution (batch.go)

Batch execution processes multiple inputs concurrently with controlled parallelism.

Key Features:

  • Configurable concurrency limit
  • Error handling strategies (fail-fast / continue)
  • Result aggregation
  • Streaming results support
  • Comprehensive statistics

Performance Benefits:

  • 12x speedup for 1000 tasks with 10 concurrent workers
  • Near-linear scaling with concurrency
  • Efficient resource utilization

Usage:

// Create batch executor
agent := MyAgent{}
config := performance.BatchConfig{
    MaxConcurrency: 10,
    Timeout:        5 * time.Minute,
    ErrorPolicy:    performance.ErrorPolicyContinue,
    EnableStats:    true,
}

executor := performance.NewBatchExecutor(agent, config)

// Prepare inputs
inputs := []*core.AgentInput{...}

// Execute batch
result := executor.Execute(ctx, inputs)

fmt.Printf("Success: %d/%d\n",
    result.Stats.SuccessCount, result.Stats.TotalCount)

Error Policies:

  • ErrorPolicyFailFast: Stop on first error
  • ErrorPolicyContinue: Continue and collect all errors

Streaming Execution:

// Process results as they arrive
resultChan, errorChan := executor.ExecuteStream(ctx, inputs)

for resultChan != nil || errorChan != nil {
    select {
    case output, ok := <-resultChan:
        if !ok {
            resultChan = nil // closed: disable this case
            continue
        }
        // Handle successful result
        _ = output
    case err, ok := <-errorChan:
        if !ok {
            errorChan = nil // closed: disable this case
            continue
        }
        // Handle error
        _ = err
    }
}

4. Object Pooling (pool_manager.go, pool_strategies.go)

Object pooling reduces memory allocations and GC pressure by reusing frequently created objects.

Architecture:

  • PoolManager Interface - Clean abstraction for pool operations
  • PoolAgent Implementation - Concrete implementation with dependency injection
  • Strategy Pattern - Flexible pool behavior control
  • Agent Pattern - Pool manager as an Agent

Key Features:

  • Pools for 6 high-frequency object types
  • Zero allocations for pooled objects
  • Automatic object cleanup and reset
  • Large object protection (prevents memory bloat)
  • Statistics tracking for each pool
  • Dependency injection - No global state
  • Multiple strategies - Adaptive, Scenario-based, Metrics, Priority

Usage:

// Create pool manager with configuration
config := &performance.PoolManagerConfig{
    EnabledPools: map[performance.PoolType]bool{
        performance.PoolTypeByteBuffer: true,
        performance.PoolTypeMessage:    true,
    },
    MaxBufferSize: 64 * 1024,
    MaxMapSize:    100,
}

manager := performance.NewPoolAgent(config)

// Use the pool
buf := manager.GetBuffer()
defer manager.PutBuffer(buf)

buf.WriteString("Hello, World!")

Strategies:

// Adaptive strategy - auto-adjusts based on usage frequency
adaptiveStrategy := performance.NewAdaptivePoolStrategy(config)
config.UseStrategy = adaptiveStrategy

// Scenario-based strategy - predefined configurations
scenarioStrategy := performance.NewScenarioBasedStrategy(config)
scenarioStrategy.SetScenario(performance.ScenarioLLMCalls)

// Metrics strategy - collect performance metrics
metricsStrategy := performance.NewMetricsPoolStrategy(baseStrategy, collector)

Agent Pattern:

// Create pool manager as an Agent
agent := performance.NewPoolManagerAgent("pool_optimizer", config)

input := &core.AgentInput{
    Task: "configure_pools",
    Context: map[string]interface{}{
        "scenario": "llm_calls",
    },
}

output, _ := agent.Execute(ctx, input)

Performance Benefits:

Object Type   With Pool   Without Pool   Improvement   Allocations
ByteBuffer    ~13 ns/op   ~20 ns/op      35% faster    0 vs 1
Message       ~12 ns/op   ~25 ns/op      52% faster    0 vs 1
ToolInput     ~28 ns/op   ~50 ns/op      44% faster    0 vs 1
ToolOutput    ~25 ns/op   ~45 ns/op      44% faster    0 vs 1
AgentInput    ~30 ns/op   ~55 ns/op      45% faster    0 vs 1
AgentOutput   ~35 ns/op   ~65 ns/op      46% faster    0 vs 1

Pool Statistics:

// Get individual pool stats
stats := manager.GetStats(performance.PoolTypeByteBuffer)
fmt.Printf("Gets: %d, Puts: %d, News: %d\n",
    stats.Gets.Load(), stats.Puts.Load(), stats.News.Load())

// Get all pool stats
allStats := manager.GetAllStats()
for poolType, stats := range allStats {
    fmt.Printf("%s - Gets: %d, Puts: %d\n",
        poolType, stats.Gets.Load(), stats.Puts.Load())
}

// Reset statistics
manager.ResetStats()

Best Practices:

  • Always use defer to ensure objects are returned to pool
  • Buffers >64KB are automatically discarded (not pooled)
  • Maps >100 keys are reallocated to prevent memory leaks
  • Never store pooled objects in global variables
  • Objects are automatically cleaned when returned to pool
  • Use dependency injection for better testability

When to Use:

  • High-frequency LLM API calls (Message, AgentInput/Output)
  • Concurrent tool execution (ToolInput/Output)
  • JSON serialization/deserialization (ByteBuffer)
  • Stream response processing

When NOT to Use:

  • Low-frequency operations (<100/sec)
  • Long-lived objects (minute-scale lifetimes)
  • Objects with highly variable sizes

Combined Usage

All optimizations can be combined for maximum performance:

// 1. Create agent pool with cached agents
factory := func() (core.Agent, error) {
    agent := MyAgent{}
    cacheConfig := performance.DefaultCacheConfig()
    return performance.NewCachedAgent(agent, cacheConfig), nil
}

pool, _ := performance.NewAgentPool(factory, performance.DefaultPoolConfig())
defer pool.Close()

// 2. Create batch executor using the pool
batchConfig := performance.DefaultBatchConfig()
// PoolAgent is a user-defined wrapper that implements core.Agent by
// delegating execution to pool.Execute (not shown here)
poolAgent := &PoolAgent{pool: pool}
executor := performance.NewBatchExecutor(poolAgent, batchConfig)

// 3. Execute batch with pooling + caching + parallelism
result := executor.Execute(ctx, inputs)

Performance Benchmarks

Pool Performance

Non-pooled: 861,164 ns/op (659 B/op, 9 allocs/op)
Pooled:     776,556 ns/op (1226 B/op, 17 allocs/op)
Improvement: ~10% faster

Cache Performance

Uncached:   1,064,088 ns/op (560 B/op, 7 allocs/op)
Cached:          942 ns/op (818 B/op, 11 allocs/op)
Improvement: 98.82% faster (1130x speedup)

Batch Performance

Serial (1000 tasks):     1,049 ms
Batch (10 concurrent):      87 ms
Speedup: 12.01x

Concurrent Pool Access

1 Goroutine:     998,113 ns/op
10 Goroutines:    19,900 ns/op (50x better throughput)

Object Pool Performance

ByteBuffer (with pool):    13 ns/op (0 allocs/op)
ByteBuffer (without pool): 20 ns/op (1 alloc/op)
Improvement: 35% faster, 0 allocations

Message (with pool):       12 ns/op (0 allocs/op)
Message (without pool):    25 ns/op (1 alloc/op)
Improvement: 52% faster, 0 allocations

ToolInput (with pool):     28 ns/op (0 allocs/op)
ToolInput (without pool):  50 ns/op (1 alloc/op)
Improvement: 44% faster, 0 allocations

Configuration Guidelines

Pool Size
  • Initial Size: 10-20% of max expected concurrency
  • Max Size: Based on available resources (CPU cores × 2-4)
  • Idle Timeout: 5-10 minutes for typical workloads
  • Max Lifetime: 30-60 minutes to prevent memory leaks
Cache Size
  • Max Size: Based on unique request patterns (1000-10000)
  • TTL: Based on data freshness requirements (1-60 minutes)
  • Cleanup Interval: 1-5 minutes
Batch Concurrency
  • Max Concurrency: Number of CPU cores × 2
  • Timeout: Based on longest expected task duration × 1.5
  • Error Policy:
    • Use FailFast for critical operations
    • Use Continue for best-effort batch processing

Best Practices

  1. Monitor Statistics: Always check pool/cache/batch statistics in production
  2. Tune Based on Load: Adjust configurations based on actual usage patterns
  3. Resource Limits: Set appropriate max sizes to prevent resource exhaustion
  4. Error Handling: Always handle errors from batch execution
  5. Graceful Shutdown: Always call Close() on pools and cached agents

Testing

Run performance tests:

# Unit tests
go test ./pkg/agent/performance/...

# Performance report
go test -v ./pkg/agent/performance/... -run TestPerformanceReport

# Benchmarks
go test -bench=. -benchmem ./pkg/agent/performance/...

# Specific benchmark
go test -bench=BenchmarkCachedVsUncached -benchtime=10s ./pkg/agent/performance/...

Examples

See example_test.go for detailed usage examples:

  • Example_agentPool: Basic pool usage
  • Example_cachedAgent: Caching with hit rate tracking
  • Example_batchExecutor: Batch processing
  • Example_combinedOptimizations: Using all three together
  • Example_streamingBatch: Streaming results
  • Example_customCacheKey: Custom cache key generation

Decoupled Architecture Example

See ../examples/advanced/pool-decoupled-architecture/ for comprehensive decoupled architecture demonstration:

  • Dependency Injection: Creating isolated pool manager instances
  • Strategy Pattern: Adaptive, Scenario-based, Metrics, and Priority strategies
  • Agent Pattern: Pool manager as an Agent
  • Scenario-driven Configuration: Auto-configuration for different use cases
  • Metrics Collection: Integration with monitoring systems
  • Isolated Testing: Test-friendly design

Run the example:

cd ../examples/advanced/pool-decoupled-architecture
make run

Features demonstrated:

  • 6 different usage patterns
  • Multiple pool strategies
  • Performance monitoring
  • Best practices

Performance Metrics Summary

Optimization   Metric                 Value
Pooling        Overhead Reduction     ~10%
Pooling        Allocation Reduction   ~31% fewer bytes
Caching        Hit Latency            <1µs
Caching        Speedup (cached)       1130x
Caching        Typical Hit Rate       90-99%
Batch          Speedup (10 workers)   12x
Batch          Speedup (20 workers)   23x
Batch          Max Throughput         >11,000 tasks/sec
Object Pool    ByteBuffer Speedup     35% faster
Object Pool    Message Speedup        52% faster
Object Pool    Allocations            0 allocs/op
Object Pool    GC Pressure            Near-zero

License

Copyright (c) 2025 kart-io

Documentation

Overview

Example (CombinedOptimizations)

Example_combinedOptimizations demonstrates using all optimizations together

ctx := context.Background()

fmt.Println("=== Combined Performance Optimizations Demo ===")

// 1. Create agent pool
factory := func() (core.Agent, error) {
	agent := NewMockAgent("combined-agent", 10*time.Millisecond)
	// Wrap with cache
	config := CacheConfig{
		MaxSize:     1000,
		TTL:         10 * time.Minute,
		EnableStats: true,
	}
	return NewCachedAgent(agent, config), nil
}

poolConfig := PoolConfig{
	InitialSize: 5,
	MaxSize:     20,
	IdleTimeout: 5 * time.Minute,
}

pool, err := NewAgentPool(factory, poolConfig)
if err != nil {
	panic(err)
}
defer pool.Close()

// 2. Create batch executor with pooled agents
batchConfig := BatchConfig{
	MaxConcurrency: 10,
	Timeout:        5 * time.Minute,
	ErrorPolicy:    ErrorPolicyContinue,
	EnableStats:    true,
}

// Custom agent wrapper that uses pool
pooledAgent := &poolAgent{pool: pool}
executor := NewBatchExecutor(pooledAgent, batchConfig)

// 3. Prepare batch with some repeated tasks (to benefit from cache)
inputs := make([]*core.AgentInput, 50)
for i := 0; i < 50; i++ {
	// Every 5th task is repeated
	taskID := i / 5
	inputs[i] = &core.AgentInput{
		Task:        fmt.Sprintf("Analyze resource #%d", taskID),
		Instruction: "Check for anomalies",
		Context: map[string]interface{}{
			"resourceID": taskID,
		},
		Timestamp: time.Now(),
	}
}

// 4. Execute batch
fmt.Println("Executing 50 tasks (with repeats) using pool + cache + batch...")
start := time.Now()
result := executor.Execute(ctx, inputs)
duration := time.Since(start)

// 5. Print comprehensive results
fmt.Printf("\n--- Results ---\n")
fmt.Printf("Execution time: %v\n", duration)
fmt.Printf("Success rate: %d/%d (%.2f%%)\n",
	result.Stats.SuccessCount,
	result.Stats.TotalCount,
	float64(result.Stats.SuccessCount)/float64(result.Stats.TotalCount)*100)

// Pool statistics
poolStats := pool.Stats()
fmt.Printf("\n--- Pool Statistics ---\n")
fmt.Printf("Agents: %d/%d (%.2f%% utilization)\n",
	poolStats.ActiveCount, poolStats.MaxSize, poolStats.UtilizationPct)
fmt.Printf("Total acquisitions: %d\n", poolStats.AcquiredTotal)
fmt.Printf("Avg wait time: %v\n", poolStats.AvgWaitTime)

// Batch executor statistics
execStats := executor.Stats()
fmt.Printf("\n--- Batch Executor Statistics ---\n")
fmt.Printf("Total executions: %d\n", execStats.TotalExecutions)
fmt.Printf("Total tasks: %d\n", execStats.TotalTasks)
fmt.Printf("Success rate: %.2f%%\n", execStats.SuccessRate)
fmt.Printf("Avg execution time: %v\n", execStats.AvgDuration)

fmt.Println("\n=== Demo Complete ===")

Example (CustomCacheKey)

Example_customCacheKey demonstrates custom cache key generation

ctx := context.Background()

agent := NewMockAgent("custom-cache-agent", 1*time.Millisecond)

// Custom key generator that ignores timestamp
customKeyGen := func(input *core.AgentInput) string {
	return fmt.Sprintf("%s:%s", input.Task, input.Instruction)
}

config := CacheConfig{
	MaxSize:      1000,
	TTL:          10 * time.Minute,
	EnableStats:  true,
	KeyGenerator: customKeyGen,
}

cachedAgent := NewCachedAgent(agent, config)
defer cachedAgent.Close()

// Execute with different timestamps but same task
for i := 0; i < 5; i++ {
	input := &core.AgentInput{
		Task:        "Same task",
		Instruction: "Same instruction",
		Timestamp:   time.Now(), // Different timestamp
	}

	start := time.Now()
	_, _ = cachedAgent.Invoke(ctx, input)
	duration := time.Since(start)

	if i == 0 {
		fmt.Printf("Execution %d: %v (cache miss)\n", i+1, duration)
	} else {
		fmt.Printf("Execution %d: %v (cache hit)\n", i+1, duration)
	}
}

stats := cachedAgent.Stats()
fmt.Printf("\nCache hit rate: %.2f%%\n", stats.HitRate)

Example (StreamingBatch)

Example_streamingBatch demonstrates streaming batch execution

ctx := context.Background()

agent := NewMockAgent("streaming-agent", 50*time.Millisecond)
config := BatchConfig{
	MaxConcurrency: 5,
	Timeout:        5 * time.Minute,
	ErrorPolicy:    ErrorPolicyContinue,
}
executor := NewBatchExecutor(agent, config)

// Prepare inputs
inputs := make([]*core.AgentInput, 20)
for i := 0; i < 20; i++ {
	inputs[i] = &core.AgentInput{
		Task:        fmt.Sprintf("Stream task #%d", i),
		Instruction: "Process",
		Timestamp:   time.Now(),
	}
}

// Execute with streaming results
fmt.Println("Executing tasks with streaming results...")
resultChan, errorChan := executor.ExecuteStream(ctx, inputs)

successCount := 0
errorCount := 0

// Process results as they arrive
for resultChan != nil || errorChan != nil {
	select {
	case output, ok := <-resultChan:
		if !ok {
			resultChan = nil // closed: disable this case
			continue
		}
		if output != nil {
			successCount++
			fmt.Printf("✓ Task completed: %s\n", output.Status)
		}
	case err, ok := <-errorChan:
		if !ok {
			errorChan = nil // closed: disable this case
			continue
		}
		errorCount++
		fmt.Printf("✗ Task failed (index %d): %v\n", err.Index, err.Error)
	}
}

fmt.Printf("\nStreaming complete: %d successes, %d errors\n", successCount, errorCount)

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrPoolClosed indicates the pool has been closed
	ErrPoolClosed = errors.New("agent pool is closed")
	// ErrPoolTimeout indicates a timeout while acquiring an Agent from the pool
	ErrPoolTimeout = errors.New("timeout acquiring agent from pool")
)
View Source
var AgentInputPool = &sync.Pool{
	New: func() interface{} {
		return &core.AgentInput{
			Context: make(map[string]interface{}),
		}
	},
}

AgentInputPool is an object pool for core.AgentInput.

Used to reuse Agent input objects.

View Source
var AgentOutputPool = &sync.Pool{
	New: func() interface{} {
		return &core.AgentOutput{
			ReasoningSteps: make([]core.ReasoningStep, 0, 4),
			ToolCalls:      make([]core.ToolCall, 0, 4),
			Metadata:       make(map[string]interface{}),
		}
	},
}

AgentOutputPool is an object pool for core.AgentOutput.

Used to reuse Agent output objects.

View Source
var ByteBufferPool = &sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}

ByteBufferPool is an object pool for bytes.Buffer.

Reuses buffers in high-frequency scenarios to reduce memory allocations and GC pressure. Common use cases: JSON serialization, HTTP request body construction, and string concatenation.

View Source
var DefaultDataPools = NewDataPools()

DefaultDataPools is the global default DataPools instance.

It can be used directly, or a custom instance can be created instead.

View Source
var MessagePool = &sync.Pool{
	New: func() interface{} {
		return &interfaces.Message{}
	},
}

MessagePool is an object pool for interfaces.Message.

Used to reuse message objects in LLM communication.

View Source
var ToolInputPool = &sync.Pool{
	New: func() interface{} {
		return &interfaces.ToolInput{
			Args: make(map[string]interface{}),
		}
	},
}

ToolInputPool is an object pool for interfaces.ToolInput.

Used to reuse tool-call input objects.

View Source
var ToolOutputPool = &sync.Pool{
	New: func() interface{} {
		return &interfaces.ToolOutput{
			Metadata: make(map[string]interface{}),
		}
	},
}

ToolOutputPool is an object pool for interfaces.ToolOutput.

Used to reuse tool-call output objects.

Functions

func CloneAgentInput

func CloneAgentInput(src *core.AgentInput, pool *DataPools) *core.AgentInput

CloneAgentInput clones an AgentInput using pooled objects.

Used when the input data must be retained.

func CloneAgentOutput

func CloneAgentOutput(src *core.AgentOutput, pool *DataPools) *core.AgentOutput

CloneAgentOutput clones an AgentOutput using pooled objects.

Used when the output data must be retained.

func GetAgentInput

func GetAgentInput() *core.AgentInput

GetAgentInput acquires an AgentInput from the default pool.

func GetAgentOutput

func GetAgentOutput() *core.AgentOutput

GetAgentOutput acquires an AgentOutput from the default pool.

func InitDefaultPoolManager

func InitDefaultPoolManager(config *PoolManagerConfig)

InitDefaultPoolManager initializes the default pool manager.

func PutAgentInput

func PutAgentInput(input *core.AgentInput)

PutAgentInput returns an AgentInput to the default pool.

func PutAgentOutput

func PutAgentOutput(output *core.AgentOutput)

PutAgentOutput returns an AgentOutput to the default pool.

Types

type AdaptivePoolStrategy

type AdaptivePoolStrategy struct {
	// contains filtered or unexported fields
}

AdaptivePoolStrategy is an adaptive pool strategy that dynamically adjusts pool behavior based on usage frequency and memory pressure.

func NewAdaptivePoolStrategy

func NewAdaptivePoolStrategy(config *PoolManagerConfig) *AdaptivePoolStrategy

NewAdaptivePoolStrategy creates an adaptive pool strategy.

func (*AdaptivePoolStrategy) PostPut

func (s *AdaptivePoolStrategy) PostPut(poolType PoolType)

PostPut is the hook invoked after an object is returned.

func (*AdaptivePoolStrategy) PreGet

func (s *AdaptivePoolStrategy) PreGet(poolType PoolType)

PreGet is the hook invoked before an object is acquired.

func (*AdaptivePoolStrategy) ShouldPool

func (s *AdaptivePoolStrategy) ShouldPool(poolType PoolType, size int) bool

ShouldPool decides whether an object should be pooled.

type AgentFactory

type AgentFactory func() (core.Agent, error)

AgentFactory is a factory function that creates Agents.

type AgentPool

type AgentPool struct {
	// contains filtered or unexported fields
}

AgentPool is a pool of Agents.

Example

ExampleAgentPool demonstrates Agent pooling usage

ctx := context.Background()

// Define agent factory
factory := func() (core.Agent, error) {
	return NewMockAgent("example-agent", 10*time.Millisecond), nil
}

// Configure pool
config := PoolConfig{
	InitialSize:     5,  // Pre-create 5 agents
	MaxSize:         20, // Max 20 agents
	IdleTimeout:     5 * time.Minute,
	MaxLifetime:     30 * time.Minute,
	AcquireTimeout:  5 * time.Second,
	CleanupInterval: 1 * time.Minute,
}

// Create pool
pool, err := NewAgentPool(factory, config)
if err != nil {
	panic(err)
}
defer pool.Close()

// Execute tasks
for i := 0; i < 10; i++ {
	input := &core.AgentInput{
		Task:        fmt.Sprintf("Task #%d", i),
		Instruction: "Process this task",
		Timestamp:   time.Now(),
	}

	output, err := pool.Execute(ctx, input)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		continue
	}

	fmt.Printf("Task %d: %s\n", i, output.Status)
}

// Check pool statistics
stats := pool.Stats()
fmt.Printf("\nPool Statistics:\n")
fmt.Printf("  Total agents: %d\n", stats.TotalCount)
fmt.Printf("  Active: %d, Idle: %d\n", stats.ActiveCount, stats.IdleCount)
fmt.Printf("  Utilization: %.2f%%\n", stats.UtilizationPct)
fmt.Printf("  Total acquisitions: %d\n", stats.AcquiredTotal)

func NewAgentPool

func NewAgentPool(factory AgentFactory, config PoolConfig) (*AgentPool, error)

NewAgentPool creates a new Agent pool.

func (*AgentPool) Acquire

func (p *AgentPool) Acquire(ctx context.Context) (core.Agent, error)

Acquire obtains an Agent from the pool.

func (*AgentPool) Close

func (p *AgentPool) Close() error

Close closes the pool.

func (*AgentPool) Execute

func (p *AgentPool) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)

Execute runs an Agent with automatic acquire and release.

func (*AgentPool) Release

func (p *AgentPool) Release(agent core.Agent) error

Release returns an Agent to the pool.

func (*AgentPool) Stats

func (p *AgentPool) Stats() PoolStats

Stats returns pool statistics.

type AllPoolStats

type AllPoolStats struct {
	ByteBuffer  ObjectPoolStats
	Message     ObjectPoolStats
	ToolInput   ObjectPoolStats
	ToolOutput  ObjectPoolStats
	AgentInput  ObjectPoolStats
	AgentOutput ObjectPoolStats
}

AllPoolStats holds the statistics for all object pools.

type BatchConfig

type BatchConfig struct {
	// MaxConcurrency is the maximum number of concurrent workers
	MaxConcurrency int
	// Timeout is the overall batch execution timeout
	Timeout time.Duration
	// ErrorPolicy selects the error handling strategy
	ErrorPolicy ErrorPolicy
	// EnableStats enables statistics collection
	EnableStats bool
}

BatchConfig configures batch execution.

func DefaultBatchConfig

func DefaultBatchConfig() BatchConfig

DefaultBatchConfig returns the default batch execution configuration.

type BatchError

type BatchError struct {
	Index int              // index of the failing input
	Input *core.AgentInput // the input
	Error error            // the error
}

BatchError represents a batch execution error.

type BatchExecutor

type BatchExecutor struct {
	// contains filtered or unexported fields
}

BatchExecutor executes batches of tasks.

Example

ExampleBatchExecutor demonstrates batch execution

ctx := context.Background()

// Create agent
agent := NewMockAgent("example-agent", 100*time.Microsecond)

// Configure batch executor
config := BatchConfig{
	MaxConcurrency: 10,
	Timeout:        5 * time.Minute,
	ErrorPolicy:    ErrorPolicyContinue,
	EnableStats:    true,
}

executor := NewBatchExecutor(agent, config)

// Prepare batch inputs
inputs := make([]*core.AgentInput, 100)
for i := 0; i < 100; i++ {
	inputs[i] = &core.AgentInput{
		Task:        fmt.Sprintf("Process item #%d", i),
		Instruction: "Perform analysis",
		Timestamp:   time.Now(),
	}
}

// Execute batch
fmt.Println("Executing 100 tasks with 10 concurrent workers...")
start := time.Now()
result := executor.Execute(ctx, inputs)
duration := time.Since(start)

// Print results
fmt.Printf("\nBatch Execution Results:\n")
fmt.Printf("  Total tasks: %d\n", result.Stats.TotalCount)
fmt.Printf("  Successful: %d\n", result.Stats.SuccessCount)
fmt.Printf("  Failed: %d\n", result.Stats.FailureCount)
fmt.Printf("  Total duration: %v\n", duration)
fmt.Printf("  Avg duration per task: %v\n", result.Stats.AvgDuration)
fmt.Printf("  Min duration: %v\n", result.Stats.MinDuration)
fmt.Printf("  Max duration: %v\n", result.Stats.MaxDuration)

// Calculate theoretical serial time
serialTime := time.Duration(100) * 100 * time.Microsecond
speedup := float64(serialTime) / float64(duration)
fmt.Printf("  Speedup vs serial: %.2fx\n", speedup)

func NewBatchExecutor

func NewBatchExecutor(agent core.Agent, config BatchConfig) *BatchExecutor

NewBatchExecutor creates a new batch executor.

func (*BatchExecutor) Execute

func (b *BatchExecutor) Execute(ctx context.Context, inputs []*core.AgentInput) *BatchResult

Execute runs the batch of tasks. It uses a worker pool pattern: it launches only MaxConcurrency workers that pull work from a shared channel, instead of spawning len(inputs) goroutines.

func (*BatchExecutor) ExecuteStream

func (b *BatchExecutor) ExecuteStream(
	ctx context.Context,
	inputs []*core.AgentInput,
) (<-chan *core.AgentOutput, <-chan BatchError)

ExecuteStream executes the batch in streaming mode, returning result channels.

func (*BatchExecutor) ExecuteWithCallback

func (b *BatchExecutor) ExecuteWithCallback(
	ctx context.Context,
	inputs []*core.AgentInput,
	callback func(index int, output *core.AgentOutput, err error),
) *BatchResult

ExecuteWithCallback executes the batch, invoking a callback for each task.

func (*BatchExecutor) Stats

func (b *BatchExecutor) Stats() ExecutorStats

Stats returns batch executor statistics.

type BatchInput

type BatchInput struct {
	Inputs []*core.AgentInput
	Config BatchConfig
}

BatchInput is a batch of inputs with its configuration.

type BatchResult

type BatchResult struct {
	Results []*core.AgentOutput // successful results
	Errors  []BatchError        // collected errors
	Stats   BatchStats          // execution statistics
}

BatchResult is the result of a batch execution.

type BatchStats

type BatchStats struct {
	TotalCount   int           // total number of tasks
	SuccessCount int           // number of successful tasks
	FailureCount int           // number of failed tasks
	Duration     time.Duration // total elapsed time
	AvgDuration  time.Duration // average task duration
	MinDuration  time.Duration // minimum task duration
	MaxDuration  time.Duration // maximum task duration
}

BatchStats holds batch execution statistics.

type CacheConfig

type CacheConfig struct {
	// MaxSize is the maximum number of cache entries
	MaxSize int
	// TTL is the time-to-live for cache entries
	TTL time.Duration
	// CleanupInterval is the interval between expired-entry cleanups
	CleanupInterval time.Duration
	// EnableStats enables statistics collection
	EnableStats bool
	// KeyGenerator is an optional custom cache key generator
	KeyGenerator func(*core.AgentInput) string
}

CacheConfig configures the cache.

func DefaultCacheConfig

func DefaultCacheConfig() CacheConfig

DefaultCacheConfig returns the default cache configuration.

type CacheEntry

type CacheEntry struct {
	Output    *core.AgentOutput
	CreatedAt time.Time
	ExpiresAt time.Time
	HitCount  atomic.Int64
}

CacheEntry is a single cache entry.

type CacheStats

type CacheStats struct {
	Size        int           // current number of entries
	MaxSize     int           // maximum cache size
	Hits        int64         // number of cache hits
	Misses      int64         // number of cache misses
	HitRate     float64       // hit rate percentage
	Evictions   int64         // number of evictions
	Expirations int64         // number of expirations
	AvgHitTime  time.Duration // average response time on a hit
	AvgMissTime time.Duration // average response time on a miss
}

CacheStats holds cache statistics.

type CachedAgent

type CachedAgent struct {
	// contains filtered or unexported fields
}

CachedAgent is a caching wrapper around an Agent.

Example

ExampleCachedAgent demonstrates caching usage

ctx := context.Background()

// Create base agent
agent := NewMockAgent("example-agent", 1*time.Millisecond)

// Configure cache
config := CacheConfig{
	MaxSize:         1000,
	TTL:             10 * time.Minute,
	CleanupInterval: 1 * time.Minute,
	EnableStats:     true,
}

// Wrap with cache
cachedAgent := NewCachedAgent(agent, config)
defer cachedAgent.Close()

// Same input (will be cached)
input := &core.AgentInput{
	Task:        "Analyze logs",
	Instruction: "Find errors in application logs",
	Context: map[string]interface{}{
		"timeRange": "last 1 hour",
	},
	Timestamp: time.Now(),
}

// First execution (cache miss)
start := time.Now()
output1, _ := cachedAgent.Invoke(ctx, input)
duration1 := time.Since(start)
fmt.Printf("First execution: %v (cache miss)\n", duration1)

// Second execution (cache hit)
start = time.Now()
output2, _ := cachedAgent.Invoke(ctx, input)
duration2 := time.Since(start)
fmt.Printf("Second execution: %v (cache hit)\n", duration2)

fmt.Printf("Speedup: %.2fx\n", float64(duration1)/float64(duration2))

// Check cache statistics
stats := cachedAgent.Stats()
fmt.Printf("\nCache Statistics:\n")
fmt.Printf("  Size: %d/%d\n", stats.Size, stats.MaxSize)
fmt.Printf("  Hits: %d, Misses: %d\n", stats.Hits, stats.Misses)
fmt.Printf("  Hit rate: %.2f%%\n", stats.HitRate)
fmt.Printf("  Avg hit time: %v\n", stats.AvgHitTime)
fmt.Printf("  Avg miss time: %v\n", stats.AvgMissTime)

// Verify results are identical
fmt.Printf("\nResults match: %v\n", output1.Result == output2.Result)

func NewCachedAgent

func NewCachedAgent(agent core.Agent, config CacheConfig) *CachedAgent

NewCachedAgent creates a new cached Agent.

func (*CachedAgent) Batch

func (c *CachedAgent) Batch(ctx context.Context, inputs []*core.AgentInput) ([]*core.AgentOutput, error)

Batch executes the Agent in batch mode (delegates to the inner agent).

func (*CachedAgent) Capabilities

func (c *CachedAgent) Capabilities() []string

Capabilities returns the Agent's capability list.

func (*CachedAgent) Close

func (c *CachedAgent) Close() error

Close closes the cache.

func (*CachedAgent) Description

func (c *CachedAgent) Description() string

Description returns the Agent description.

func (*CachedAgent) Invalidate

func (c *CachedAgent) Invalidate(input *core.AgentInput)

Invalidate evicts the cache entry for the given input.

func (*CachedAgent) InvalidateAll

func (c *CachedAgent) InvalidateAll()

InvalidateAll clears the entire cache.

func (*CachedAgent) Invoke

func (c *CachedAgent) Invoke(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)

Invoke executes the Agent with caching.

func (*CachedAgent) Name

func (c *CachedAgent) Name() string

Name returns the Agent name.

func (*CachedAgent) Pipe

Pipe connects to another Runnable (delegates to the inner agent).

func (*CachedAgent) Stats

func (c *CachedAgent) Stats() CacheStats

Stats returns cache statistics.

func (*CachedAgent) Stream

func (c *CachedAgent) Stream(ctx context.Context, input *core.AgentInput) (<-chan core.StreamChunk[*core.AgentOutput], error)

Stream executes the Agent in streaming mode (delegates to the inner agent).

func (*CachedAgent) WithCallbacks

func (c *CachedAgent) WithCallbacks(callbacks ...core.Callback) core.Runnable[*core.AgentInput, *core.AgentOutput]

WithCallbacks adds callback handlers (delegates to the inner agent).

func (*CachedAgent) WithConfig

WithConfig configures the Agent (delegates to the inner agent).

type DataPoolStats

type DataPoolStats struct {
	InputGetCount     int64   // AgentInput Get count
	InputPutCount     int64   // AgentInput Put count
	OutputGetCount    int64   // AgentOutput Get count
	OutputPutCount    int64   // AgentOutput Put count
	ReasoningGetCount int64   // ReasoningStep Get count
	ReasoningPutCount int64   // ReasoningStep Put count
	ToolCallGetCount  int64   // ToolCall Get count
	ToolCallPutCount  int64   // ToolCall Put count
	MapGetCount       int64   // Map Get count
	MapPutCount       int64   // Map Put count
	SliceGetCount     int64   // Slice Get count
	SlicePutCount     int64   // Slice Put count
	PoolHitRate       float64 // pool hit rate (%)
}

DataPoolStats holds data pool statistics.

type DataPools

type DataPools struct {
	// contains filtered or unexported fields
}

DataPools is a collection of pools for data objects.

It enables reuse of the data objects allocated frequently during Agent execution, significantly reducing GC pressure, with a zero-allocation goal on critical hot paths.

Performance targets:

  • Memory per agent: < 50MB
  • Zero allocation in critical paths
  • GC pressure reduction: > 80%

func NewDataPools

func NewDataPools() *DataPools

NewDataPools creates a new collection of data pools.

func (*DataPools) GetAgentInput

func (p *DataPools) GetAgentInput() *core.AgentInput

GetAgentInput 从池中获取 AgentInput

使用完毕后必须调用 PutAgentInput 归还到池中

func (*DataPools) GetAgentOutput

func (p *DataPools) GetAgentOutput() *core.AgentOutput

GetAgentOutput 从池中获取 AgentOutput

使用完毕后必须调用 PutAgentOutput 归还到池中
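The Get/Put pairing above can be sketched as follows; the body between acquire and release is schematic, since the fields of AgentInput/AgentOutput are not shown in this excerpt.

```go
pools := performance.NewDataPools()

// Acquire from the pool; always pair each Get with its matching Put.
in := pools.GetAgentInput()
defer pools.PutAgentInput(in) // fields are reset on return, ready for reuse

out := pools.GetAgentOutput()
defer pools.PutAgentOutput(out)

// ... populate in, run the agent, read out ...
```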

func (*DataPools) GetContextMap

func (p *DataPools) GetContextMap() map[string]interface{}

GetContextMap retrieves a Context map from the pool

func (*DataPools) GetMetadataMap

func (p *DataPools) GetMetadataMap() map[string]interface{}

GetMetadataMap retrieves a Metadata map from the pool

func (*DataPools) GetReasoningSlice

func (p *DataPools) GetReasoningSlice() []core.ReasoningStep

GetReasoningSlice retrieves a ReasoningStep slice from the pool

func (*DataPools) GetReasoningStep

func (p *DataPools) GetReasoningStep() *core.ReasoningStep

GetReasoningStep retrieves a ReasoningStep from the pool

func (*DataPools) GetStats

func (p *DataPools) GetStats() DataPoolStats

GetStats returns pooling statistics

func (*DataPools) GetStringSlice

func (p *DataPools) GetStringSlice() []string

GetStringSlice retrieves a string slice from the pool

func (*DataPools) GetToolCall

func (p *DataPools) GetToolCall() *core.ToolCall

GetToolCall retrieves a ToolCall from the pool

func (*DataPools) GetToolCallSlice

func (p *DataPools) GetToolCallSlice() []core.ToolCall

GetToolCallSlice retrieves a ToolCall slice from the pool

func (*DataPools) PutAgentInput

func (p *DataPools) PutAgentInput(input *core.AgentInput)

PutAgentInput returns an AgentInput to the pool

All fields are reset so the object is ready for reuse

func (*DataPools) PutAgentOutput

func (p *DataPools) PutAgentOutput(output *core.AgentOutput)

PutAgentOutput returns an AgentOutput to the pool

All fields are reset; slice capacity is retained via the zero-allocation slice[:0] idiom

func (*DataPools) PutContextMap

func (p *DataPools) PutContextMap(m map[string]interface{})

PutContextMap returns a Context map to the pool

func (*DataPools) PutMetadataMap

func (p *DataPools) PutMetadataMap(m map[string]interface{})

PutMetadataMap returns a Metadata map to the pool

func (*DataPools) PutReasoningSlice

func (p *DataPools) PutReasoningSlice(s []core.ReasoningStep)

PutReasoningSlice returns a ReasoningStep slice to the pool

func (*DataPools) PutReasoningStep

func (p *DataPools) PutReasoningStep(step *core.ReasoningStep)

PutReasoningStep returns a ReasoningStep to the pool

func (*DataPools) PutStringSlice

func (p *DataPools) PutStringSlice(s []string)

PutStringSlice returns a string slice to the pool

func (*DataPools) PutToolCall

func (p *DataPools) PutToolCall(tc *core.ToolCall)

PutToolCall returns a ToolCall to the pool

func (*DataPools) PutToolCallSlice

func (p *DataPools) PutToolCallSlice(s []core.ToolCall)

PutToolCallSlice returns a ToolCall slice to the pool

type ErrorPolicy

type ErrorPolicy string

ErrorPolicy defines the error-handling strategy

const (
	// ErrorPolicyFailFast stops at the first error
	ErrorPolicyFailFast ErrorPolicy = "fail_fast"
	// ErrorPolicyContinue keeps executing and collects all errors
	ErrorPolicyContinue ErrorPolicy = "continue"
)

type ExecutorStats

type ExecutorStats struct {
	TotalExecutions int64         // Total number of executions
	TotalTasks      int64         // Total number of tasks
	SuccessTasks    int64         // Number of successful tasks
	FailedTasks     int64         // Number of failed tasks
	AvgTasksPerExec float64       // Average tasks per execution
	SuccessRate     float64       // Success rate (%)
	AvgDuration     time.Duration // Average execution duration
}

ExecutorStats holds executor statistics

type MetricsCollector

type MetricsCollector interface {
	RecordPoolGet(poolType PoolType, latency time.Duration)
	RecordPoolPut(poolType PoolType, latency time.Duration)
	RecordPoolHit(poolType PoolType)
	RecordPoolMiss(poolType PoolType)
}

MetricsCollector is the interface for collecting pool metrics

type MetricsPoolStrategy

type MetricsPoolStrategy struct {
	// contains filtered or unexported fields
}

MetricsPoolStrategy is a pool strategy that also collects metrics

func NewMetricsPoolStrategy

func NewMetricsPoolStrategy(base PoolStrategy, collector MetricsCollector) *MetricsPoolStrategy

NewMetricsPoolStrategy creates a metrics-collecting pool strategy

func (*MetricsPoolStrategy) PostPut

func (s *MetricsPoolStrategy) PostPut(poolType PoolType)

PostPut is the post-return hook (records metrics)

func (*MetricsPoolStrategy) PreGet

func (s *MetricsPoolStrategy) PreGet(poolType PoolType)

PreGet is the pre-acquisition hook (records metrics)

func (*MetricsPoolStrategy) ShouldPool

func (s *MetricsPoolStrategy) ShouldPool(poolType PoolType, size int) bool

ShouldPool decides whether to pool the object (delegates to the base strategy)
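A sketch of plugging in a custom collector. The counting collector below is hypothetical; only the MetricsCollector interface itself comes from the package.

```go
// countingCollector is a hypothetical MetricsCollector that just
// tallies hits and misses with atomic counters.
type countingCollector struct {
	hits, misses atomic.Int64
}

func (c *countingCollector) RecordPoolGet(poolType performance.PoolType, latency time.Duration) {}
func (c *countingCollector) RecordPoolPut(poolType performance.PoolType, latency time.Duration) {}
func (c *countingCollector) RecordPoolHit(poolType performance.PoolType)  { c.hits.Add(1) }
func (c *countingCollector) RecordPoolMiss(poolType performance.PoolType) { c.misses.Add(1) }

// Wrap an existing strategy so every Get/Put is also recorded.
base := performance.NewScenarioBasedStrategy(performance.DefaultPoolManagerConfig())
strategy := performance.NewMetricsPoolStrategy(base, &countingCollector{})
```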

type ObjectPoolStats

type ObjectPoolStats struct {
	Gets    atomic.Int64 // Number of Gets
	Puts    atomic.Int64 // Number of Puts
	News    atomic.Int64 // Number of new allocations
	Current atomic.Int64 // Estimated number of objects currently pooled
}

ObjectPoolStats holds object pool statistics (internal use; contains atomic counters)

type ObjectPoolStatsSnapshot

type ObjectPoolStatsSnapshot struct {
	Gets    int64 // Number of Gets
	Puts    int64 // Number of Puts
	News    int64 // Number of new allocations
	Current int64 // Estimated number of objects currently pooled
}

ObjectPoolStatsSnapshot is a point-in-time copy of object pool statistics (for returning and display)

type OptimizedAgentPool

type OptimizedAgentPool struct {
	// contains filtered or unexported fields
}

OptimizedAgentPool is an optimized Agent pool

Performance optimizations:

  • O(1) Acquire/Release (channel-based idle queue)
  • Lock-free idle queue (channels are inherently concurrency-safe)
  • O(1) Agent lookup via a map
  • Fine-grained locking to reduce contention

Suitable for:

  • High-concurrency workloads (1000+ concurrent agents)
  • Frequent Acquire/Release operations
  • Latency-sensitive applications

func NewOptimizedAgentPool

func NewOptimizedAgentPool(factory AgentFactory, config PoolConfig) (*OptimizedAgentPool, error)

NewOptimizedAgentPool creates an optimized Agent pool

func (*OptimizedAgentPool) Acquire

func (p *OptimizedAgentPool) Acquire(ctx context.Context) (core.Agent, error)

Acquire takes an Agent from the pool (an O(1) operation)

func (*OptimizedAgentPool) Close

func (p *OptimizedAgentPool) Close() error

Close shuts down the pool

func (*OptimizedAgentPool) Execute

func (p *OptimizedAgentPool) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)

Execute runs an Agent with automatic acquire and release

func (*OptimizedAgentPool) Release

func (p *OptimizedAgentPool) Release(agent core.Agent) error

Release returns an Agent to the pool (an O(1) operation)

func (*OptimizedAgentPool) Stats

func (p *OptimizedAgentPool) Stats() PoolStats

Stats returns pool statistics
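Usage mirrors AgentPool from the package README; a minimal sketch, where factory, ctx, and input are placeholders supplied by the caller:

```go
pool, err := performance.NewOptimizedAgentPool(factory, performance.DefaultPoolConfig())
if err != nil {
	// handle error
}
defer pool.Close()

// One-shot: acquire, execute, and release in a single call.
out, err := pool.Execute(ctx, input)

// Or manage the agent explicitly; both paths are O(1).
agent, err := pool.Acquire(ctx)
if err == nil {
	defer pool.Release(agent)
	out, err = agent.Execute(ctx, input)
}

fmt.Printf("utilization: %.2f%%\n", pool.Stats().UtilizationPct)
```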

type PoolAgent

type PoolAgent struct {
	// contains filtered or unexported fields
}

PoolAgent is a pool proxy implementing the Agent pattern

func NewPoolAgent

func NewPoolAgent(config *PoolManagerConfig) *PoolAgent

NewPoolAgent creates a new pool proxy

func (*PoolAgent) Close

func (a *PoolAgent) Close() error

Close shuts down the pool manager

func (*PoolAgent) Configure

func (a *PoolAgent) Configure(config *PoolManagerConfig) error

Configure applies a new configuration to the manager

func (*PoolAgent) DisablePool

func (a *PoolAgent) DisablePool(poolType PoolType)

DisablePool disables the specified pool

func (*PoolAgent) EnablePool

func (a *PoolAgent) EnablePool(poolType PoolType)

EnablePool enables the specified pool

func (*PoolAgent) Execute

func (a *PoolAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)

Execute is the Agent-pattern execution entry point

func (*PoolAgent) GetAgentInput

func (a *PoolAgent) GetAgentInput() *core.AgentInput

GetAgentInput retrieves an AgentInput

func (*PoolAgent) GetAgentOutput

func (a *PoolAgent) GetAgentOutput() *core.AgentOutput

GetAgentOutput retrieves an AgentOutput

func (*PoolAgent) GetAllStats

func (a *PoolAgent) GetAllStats() map[PoolType]*ObjectPoolStatsSnapshot

GetAllStats returns statistics for all pools

func (*PoolAgent) GetBuffer

func (a *PoolAgent) GetBuffer() *bytes.Buffer

GetBuffer retrieves a ByteBuffer

func (*PoolAgent) GetConfig

func (a *PoolAgent) GetConfig() *PoolManagerConfig

GetConfig returns the current configuration

func (*PoolAgent) GetMessage

func (a *PoolAgent) GetMessage() *interfaces.Message

GetMessage retrieves a Message

func (*PoolAgent) GetStats

func (a *PoolAgent) GetStats(poolType PoolType) *ObjectPoolStatsSnapshot

GetStats returns statistics for the specified pool

func (*PoolAgent) GetToolInput

func (a *PoolAgent) GetToolInput() *interfaces.ToolInput

GetToolInput retrieves a ToolInput

func (*PoolAgent) GetToolOutput

func (a *PoolAgent) GetToolOutput() *interfaces.ToolOutput

GetToolOutput retrieves a ToolOutput

func (*PoolAgent) IsPoolEnabled

func (a *PoolAgent) IsPoolEnabled(poolType PoolType) bool

IsPoolEnabled reports whether the specified pool is enabled

func (*PoolAgent) PutAgentInput

func (a *PoolAgent) PutAgentInput(input *core.AgentInput)

PutAgentInput returns an AgentInput to the pool

func (*PoolAgent) PutAgentOutput

func (a *PoolAgent) PutAgentOutput(output *core.AgentOutput)

PutAgentOutput returns an AgentOutput to the pool

func (*PoolAgent) PutBuffer

func (a *PoolAgent) PutBuffer(buf *bytes.Buffer)

PutBuffer returns a ByteBuffer to the pool

func (*PoolAgent) PutMessage

func (a *PoolAgent) PutMessage(msg *interfaces.Message)

PutMessage returns a Message to the pool

func (*PoolAgent) PutToolInput

func (a *PoolAgent) PutToolInput(input *interfaces.ToolInput)

PutToolInput returns a ToolInput to the pool

func (*PoolAgent) PutToolOutput

func (a *PoolAgent) PutToolOutput(output *interfaces.ToolOutput)

PutToolOutput returns a ToolOutput to the pool

func (*PoolAgent) ResetStats

func (a *PoolAgent) ResetStats()

ResetStats resets the statistics

type PoolConfig

type PoolConfig struct {
	// InitialSize is the initial pool size
	InitialSize int
	// MaxSize is the maximum pool size
	MaxSize int
	// IdleTimeout is the idle timeout; Agents idle longer than this are recycled
	IdleTimeout time.Duration
	// MaxLifetime is the maximum Agent lifetime; Agents older than this are destroyed
	MaxLifetime time.Duration
	// AcquireTimeout is the timeout for acquiring an Agent
	AcquireTimeout time.Duration
	// CleanupInterval is the interval between cleanup runs
	CleanupInterval time.Duration
}

PoolConfig holds pool configuration

func DefaultPoolConfig

func DefaultPoolConfig() PoolConfig

DefaultPoolConfig returns the default pool configuration

type PoolManager

type PoolManager interface {
	// GetBuffer retrieves a ByteBuffer
	GetBuffer() *bytes.Buffer
	PutBuffer(buf *bytes.Buffer)

	// GetMessage retrieves a Message
	GetMessage() *interfaces.Message
	PutMessage(msg *interfaces.Message)

	// GetToolInput retrieves a ToolInput
	GetToolInput() *interfaces.ToolInput
	PutToolInput(input *interfaces.ToolInput)

	// GetToolOutput retrieves a ToolOutput
	GetToolOutput() *interfaces.ToolOutput
	PutToolOutput(output *interfaces.ToolOutput)

	// GetAgentInput retrieves an AgentInput
	GetAgentInput() *core.AgentInput
	PutAgentInput(input *core.AgentInput)

	// GetAgentOutput retrieves an AgentOutput
	GetAgentOutput() *core.AgentOutput
	PutAgentOutput(output *core.AgentOutput)

	// Configuration management
	Configure(config *PoolManagerConfig) error
	GetConfig() *PoolManagerConfig
	EnablePool(poolType PoolType)
	DisablePool(poolType PoolType)
	IsPoolEnabled(poolType PoolType) bool

	// Statistics
	GetStats(poolType PoolType) *ObjectPoolStatsSnapshot
	GetAllStats() map[PoolType]*ObjectPoolStatsSnapshot
	ResetStats()

	// Lifecycle
	Close() error
}

PoolManager is the object pool manager interface

func CreateIsolatedPoolManager

func CreateIsolatedPoolManager(config *PoolManagerConfig) PoolManager

CreateIsolatedPoolManager creates an isolated pool manager for tests or special scenarios; it does not affect global state

func GetDefaultPoolManager

func GetDefaultPoolManager() PoolManager

GetDefaultPoolManager returns the default pool manager
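A minimal sketch of using the default manager for buffer reuse via the interface above:

```go
pm := performance.GetDefaultPoolManager()

// Acquire a buffer; returned buffers are recycled for later Gets.
buf := pm.GetBuffer()
defer pm.PutBuffer(buf)

buf.WriteString("serialized payload")

// Inspect pooling effectiveness for this pool type.
snap := pm.GetStats(performance.PoolTypeByteBuffer)
fmt.Printf("gets=%d puts=%d news=%d\n", snap.Gets, snap.Puts, snap.News)
```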

type PoolManagerAgent

type PoolManagerAgent struct {
	// contains filtered or unexported fields
}

PoolManagerAgent wraps a PoolAgent as a full Agent

func NewPoolManagerAgent

func NewPoolManagerAgent(name string, config *PoolManagerConfig) *PoolManagerAgent

NewPoolManagerAgent creates a pool manager Agent

func (*PoolManagerAgent) Capabilities

func (a *PoolManagerAgent) Capabilities() []string

Capabilities returns the capability list

func (*PoolManagerAgent) Description

func (a *PoolManagerAgent) Description() string

Description returns the description

func (*PoolManagerAgent) Execute

func (a *PoolManagerAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)

Execute runs the Agent

func (*PoolManagerAgent) Name

func (a *PoolManagerAgent) Name() string

Name returns the Agent name

type PoolManagerConfig

type PoolManagerConfig struct {
	// Enablement per pool type
	EnabledPools map[PoolType]bool

	// Pool size limits
	MaxBufferSize int // Maximum ByteBuffer size (default 64KB)
	MaxMapSize    int // Maximum number of map keys (default 100)
	MaxSliceSize  int // Maximum slice capacity (default 100)

	// Strategy
	UseStrategy PoolStrategy // Optional: custom strategy
}

PoolManagerConfig holds pool manager configuration

func DefaultPoolManagerConfig

func DefaultPoolManagerConfig() *PoolManagerConfig

DefaultPoolManagerConfig returns the default configuration

type PoolStats

type PoolStats struct {
	TotalCount     int           // Total Agents currently in the pool
	ActiveCount    int           // Agents currently in use
	IdleCount      int           // Idle Agents
	MaxSize        int           // Maximum pool size
	CreatedTotal   int64         // Total Agents created
	AcquiredTotal  int64         // Total Agents acquired
	ReleasedTotal  int64         // Total Agents released
	RecycledTotal  int64         // Total Agents recycled
	WaitCount      int64         // Number of waits
	AvgWaitTime    time.Duration // Average wait time
	UtilizationPct float64       // Utilization percentage
}

PoolStats holds pool statistics

type PoolStrategy

type PoolStrategy interface {
	// ShouldPool decides whether the object should be pooled
	ShouldPool(poolType PoolType, size int) bool

	// PreGet is the hook invoked before an object is retrieved
	PreGet(poolType PoolType)

	// PostPut is the hook invoked after an object is returned
	PostPut(poolType PoolType)
}

PoolStrategy is the pool strategy interface
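A custom strategy is a three-method implementation of the interface above; the size-capped sketch below is hypothetical:

```go
// capStrategy pools an object only while its size stays under a cap,
// preventing one oversized object from pinning memory in the pool.
type capStrategy struct {
	maxSize int
}

func (s *capStrategy) ShouldPool(poolType performance.PoolType, size int) bool {
	return size <= s.maxSize
}

// No-op hooks: this strategy carries no per-Get/Put bookkeeping.
func (s *capStrategy) PreGet(poolType performance.PoolType)  {}
func (s *capStrategy) PostPut(poolType performance.PoolType) {}

// Wire it in through the manager config.
config := performance.DefaultPoolManagerConfig()
config.UseStrategy = &capStrategy{maxSize: 64 * 1024}
```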

type PoolType

type PoolType string

PoolType identifies a pool type

const (
	PoolTypeByteBuffer  PoolType = "bytebuffer"
	PoolTypeMessage     PoolType = "message"
	PoolTypeToolInput   PoolType = "toolinput"
	PoolTypeToolOutput  PoolType = "tooloutput"
	PoolTypeAgentInput  PoolType = "agentinput"
	PoolTypeAgentOutput PoolType = "agentoutput"
)

type PooledAgentInput

type PooledAgentInput struct {
	Input *core.AgentInput
	// contains filtered or unexported fields
}

PooledAgentInput is a helper wrapping a pooled AgentInput

It manages the object's lifecycle; use defer to return it automatically

func NewPooledAgentInput

func NewPooledAgentInput(pool *DataPools) *PooledAgentInput

NewPooledAgentInput creates a pooled AgentInput

Usage example:

input := NewPooledAgentInput(pool)
defer input.Release()
// use input.Input

func (*PooledAgentInput) Release

func (p *PooledAgentInput) Release()

Release returns the AgentInput to the pool

type PooledAgentOutput

type PooledAgentOutput struct {
	Output *core.AgentOutput
	// contains filtered or unexported fields
}

PooledAgentOutput is a helper wrapping a pooled AgentOutput

It manages the object's lifecycle; use defer to return it automatically

func NewPooledAgentOutput

func NewPooledAgentOutput(pool *DataPools) *PooledAgentOutput

NewPooledAgentOutput creates a pooled AgentOutput

Usage example:

output := NewPooledAgentOutput(pool)
defer output.Release()
// use output.Output

func (*PooledAgentOutput) Release

func (p *PooledAgentOutput) Release()

Release returns the AgentOutput to the pool

type PriorityPoolStrategy

type PriorityPoolStrategy struct {
	// contains filtered or unexported fields
}

PriorityPoolStrategy is a priority-based pool strategy: each pool type's priority determines whether its objects are pooled

func NewPriorityPoolStrategy

func NewPriorityPoolStrategy(config *PoolManagerConfig, maxPools int) *PriorityPoolStrategy

NewPriorityPoolStrategy creates a priority pool strategy

func (*PriorityPoolStrategy) PostPut

func (s *PriorityPoolStrategy) PostPut(poolType PoolType)

func (*PriorityPoolStrategy) PreGet

func (s *PriorityPoolStrategy) PreGet(poolType PoolType)

func (*PriorityPoolStrategy) ShouldPool

func (s *PriorityPoolStrategy) ShouldPool(poolType PoolType, size int) bool

ShouldPool decides whether to pool based on priority

type ScenarioBasedStrategy

type ScenarioBasedStrategy struct {
	// contains filtered or unexported fields
}

ScenarioBasedStrategy is a scenario-driven pool strategy

func NewScenarioBasedStrategy

func NewScenarioBasedStrategy(config *PoolManagerConfig) *ScenarioBasedStrategy

NewScenarioBasedStrategy creates a scenario-driven strategy

func (*ScenarioBasedStrategy) PostPut

func (s *ScenarioBasedStrategy) PostPut(poolType PoolType)

func (*ScenarioBasedStrategy) PreGet

func (s *ScenarioBasedStrategy) PreGet(poolType PoolType)

func (*ScenarioBasedStrategy) SetScenario

func (s *ScenarioBasedStrategy) SetScenario(scenario ScenarioType)

SetScenario sets the current scenario

func (*ScenarioBasedStrategy) ShouldPool

func (s *ScenarioBasedStrategy) ShouldPool(poolType PoolType, size int) bool

ShouldPool decides whether to pool based on the current scenario

type ScenarioType

type ScenarioType string

ScenarioType identifies a usage scenario

const (
	ScenarioLLMCalls    ScenarioType = "llm_calls"
	ScenarioToolExec    ScenarioType = "tool_execution"
	ScenarioJSONProcess ScenarioType = "json_processing"
	ScenarioStreaming   ScenarioType = "streaming"
	ScenarioGeneral     ScenarioType = "general"
)
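A sketch of switching scenarios at runtime; which pool types each scenario favors is internal to the package and not specified in this excerpt:

```go
strategy := performance.NewScenarioBasedStrategy(performance.DefaultPoolManagerConfig())

// Bias pooling toward the objects a tool-execution workload churns through.
strategy.SetScenario(performance.ScenarioToolExec)

// Later, e.g. when a streaming response begins.
strategy.SetScenario(performance.ScenarioStreaming)
```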
