Documentation ¶
Index ¶
- type ChunkMetadata
- type ChunkTransformFunc
- type ChunkType
- type LegacyStreamChunk
- type Runtime
- type RuntimeConfig
- type RuntimeManager
- func (m *RuntimeManager[C, S]) CleanupExpired(maxAge time.Duration) int
- func (m *RuntimeManager[C, S]) GetOrCreateRuntime(sessionID string, ctx C, st S, store store.Store, ...) *Runtime[C, S]
- func (m *RuntimeManager[C, S]) GetRuntime(sessionID string) (*Runtime[C, S], bool)
- func (m *RuntimeManager[C, S]) Metrics() *RuntimeMetrics
- func (m *RuntimeManager[C, S]) RemoveRuntime(sessionID string)
- func (m *RuntimeManager[C, S]) SetRuntime(sessionID string, runtime *Runtime[C, S])
- func (m *RuntimeManager[C, S]) Size() int
- type RuntimeMetrics
- type StreamConsumer
- type StreamController
- type StreamMultiplexer
- type StreamOptions
- type StreamOutput
- type StreamState
- type StreamStatus
- type StreamWriter
- type StreamingAgent
- type ToolFunc
- type ToolWithRuntime
- func (t *ToolWithRuntime[I, O, C, S]) Description() string
- func (t *ToolWithRuntime[I, O, C, S]) Execute(ctx context.Context, input I) (O, error)
- func (t *ToolWithRuntime[I, O, C, S]) Name() string
- func (t *ToolWithRuntime[I, O, C, S]) WithRuntime(runtime *Runtime[C, S]) *ToolWithRuntime[I, O, C, S]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChunkMetadata ¶
type ChunkMetadata struct {
    // Sequence information
    Sequence int64 `json:"sequence"` // sequence number
    Timestamp time.Time `json:"timestamp"` // timestamp
    Source string `json:"source"` // data source
    // Progress information
    Progress float64 `json:"progress,omitempty"` // progress percentage (0-100)
    Current int64 `json:"current,omitempty"` // item currently being processed
    Total int64 `json:"total,omitempty"` // total number of items
    ETA time.Duration `json:"eta,omitempty"` // estimated time remaining
    // Status information
    Status string `json:"status,omitempty"` // status description
    Phase string `json:"phase,omitempty"` // processing phase
    // Performance information
    Latency time.Duration `json:"latency,omitempty"` // latency
    Throughput float64 `json:"throughput,omitempty"` // throughput
    // Extension information
    Extra map[string]interface{} `json:"extra,omitempty"` // additional metadata
}
ChunkMetadata holds the metadata attached to a stream chunk.
type ChunkTransformFunc ¶
type ChunkTransformFunc func(*LegacyStreamChunk) (*LegacyStreamChunk, error)
ChunkTransformFunc is a function that transforms a stream chunk.
type ChunkType ¶
type ChunkType string
ChunkType identifies the kind of data a chunk carries.
const (
    ChunkTypeText ChunkType = "text" // text data
    ChunkTypeJSON ChunkType = "json" // JSON data
    ChunkTypeBinary ChunkType = "binary" // binary data
    ChunkTypeProgress ChunkType = "progress" // progress update
    ChunkTypeStatus ChunkType = "status" // status update
    ChunkTypeError ChunkType = "error" // error information
    ChunkTypeMetadata ChunkType = "metadata" // metadata
    ChunkTypeControl ChunkType = "control" // control command
)
type LegacyStreamChunk ¶
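type LegacyStreamChunk struct {
    // Data type
    Type ChunkType `json:"type"` // chunk type
    // Data content
    Data interface{} `json:"data"` // actual payload
    Text string `json:"text,omitempty"` // text data (only when Type=ChunkTypeText)
    // Metadata
    Metadata ChunkMetadata `json:"metadata"` // chunk metadata
    // Control information
    IsLast bool `json:"is_last"` // whether this is the last chunk
    Error error `json:"error,omitempty"` // error, if any
}
LegacyStreamChunk is a chunk of stream data.
LegacyStreamChunk represents a single unit of data in a stream:
- It can be a text fragment, a JSON object, binary data, and so on
- It carries type information and metadata
- It supports progress and status information
Note: this is the legacy streaming implementation. New code should use Runnable[I,O].Stream() and the generic StreamChunk[T].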
func NewProgressChunk ¶
func NewProgressChunk(progress float64, message string) *LegacyStreamChunk
NewProgressChunk creates a progress chunk.
func NewStreamChunk ¶
func NewStreamChunk(typ ChunkType, data interface{}) *LegacyStreamChunk
NewStreamChunk creates a new stream chunk.
type Runtime ¶
type Runtime[C any, S state.State] struct {
    // Context holds user-defined application context
    Context C
    // State holds the agent's current state
    State S
    // Store provides long-term persistent storage
    Store store.Store
    // Checkpointer handles session state persistence
    Checkpointer checkpoint.Checkpointer
    // ToolCallID is the unique identifier for the current tool call
    ToolCallID string
    // SessionID is the unique identifier for the current session/thread
    SessionID string
    // Timestamp is when this runtime was created
    Timestamp time.Time
    // Metadata holds additional runtime metadata
    Metadata map[string]interface{}
}
Runtime provides the execution environment for tools and middleware.
Inspired by LangChain's ToolRuntime, it provides access to:
- User-defined context (generic type C)
- Agent state (generic type S extending State)
- Long-term storage via Store
- Session persistence via Checkpointer
- Execution metadata (tool call ID, session ID, etc.)
Generic type parameters:
- C: Custom context type for application-specific data
- S: State type (must implement State interface)
func NewRuntime ¶
func NewRuntime[C any, S state.State](
    ctx C,
    st S,
    store store.Store,
    checkpointer checkpoint.Checkpointer,
    sessionID string,
) *Runtime[C, S]
NewRuntime creates a new Runtime with the given components.
func (*Runtime[C, S]) SaveState ¶
SaveState persists the current state using the checkpointer if available.
func (*Runtime[C, S]) WithMetadata ¶
WithMetadata returns a copy of the runtime with additional metadata.
Optimization: Uses copy-on-write pattern. The metadata map is only copied when modifications are made, reducing allocations for read-heavy workloads. Pre-allocates the new map with capacity for existing entries plus the new one.
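The copy behaviour described above can be sketched with a simplified stand-in type. `miniRuntime` and `withMetadata` are hypothetical names used only for illustration; the package's actual implementation is not shown in this reference and may differ.

```go
package main

import "fmt"

// miniRuntime is a hypothetical, trimmed-down stand-in for Runtime[C, S].
// It keeps only the fields needed to illustrate the copy.
type miniRuntime struct {
	SessionID string
	Metadata  map[string]interface{}
}

// withMetadata returns a shallow copy of r whose metadata map also contains
// key. The receiver's own map is never mutated.
func (r *miniRuntime) withMetadata(key string, value interface{}) *miniRuntime {
	// Pre-allocate room for the existing entries plus the new one.
	md := make(map[string]interface{}, len(r.Metadata)+1)
	for k, v := range r.Metadata {
		md[k] = v
	}
	md[key] = value

	cp := *r // shallow copy of the struct
	cp.Metadata = md
	return &cp
}

func main() {
	base := &miniRuntime{SessionID: "s1", Metadata: map[string]interface{}{"user": "alice"}}
	derived := base.withMetadata("trace", "abc")
	fmt.Println(len(base.Metadata), len(derived.Metadata)) // prints "1 2"
}
```

Because the original map is left untouched, readers holding the base runtime never observe keys added for a derived one.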
func (*Runtime[C, S]) WithToolCallID ¶
WithToolCallID returns a copy of the runtime with the specified tool call ID.
type RuntimeConfig ¶
type RuntimeConfig struct {
    // EnableAutoSave automatically saves state after tool execution
    EnableAutoSave bool
    // SaveInterval specifies how often to auto-save state (if enabled)
    SaveInterval time.Duration
    // MaxStateSize limits the maximum state size in bytes
    MaxStateSize int64
    // EnableMetrics enables runtime metrics collection
    EnableMetrics bool
}
RuntimeConfig configures runtime behavior.
func DefaultRuntimeConfig ¶
func DefaultRuntimeConfig() *RuntimeConfig
DefaultRuntimeConfig returns the default runtime configuration.
type RuntimeManager ¶
RuntimeManager manages multiple runtimes and their lifecycle.
func NewRuntimeManager ¶
func NewRuntimeManager[C any, S state.State](config *RuntimeConfig) *RuntimeManager[C, S]
NewRuntimeManager creates a new RuntimeManager.
func (*RuntimeManager[C, S]) CleanupExpired ¶
func (m *RuntimeManager[C, S]) CleanupExpired(maxAge time.Duration) int
CleanupExpired removes expired runtimes based on max age.
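An age-based cleanup of this kind might look like the sketch below. It assumes that age is measured from the runtime's creation `Timestamp`, which this reference does not confirm. `session`, `registry`, `cleanupExpired`, and `runCleanupDemo` are hypothetical stand-ins, not part of the package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// session is a hypothetical stand-in for a runtime; only the creation
// timestamp matters for expiry in this sketch.
type session struct {
	Timestamp time.Time
}

// registry is a hypothetical stand-in for RuntimeManager.
type registry struct {
	mu       sync.Mutex
	sessions map[string]*session
}

// cleanupExpired deletes every session older than maxAge, measured against
// now, and reports how many entries were removed.
func (r *registry) cleanupExpired(now time.Time, maxAge time.Duration) int {
	r.mu.Lock()
	defer r.mu.Unlock()

	removed := 0
	for id, s := range r.sessions {
		if now.Sub(s.Timestamp) > maxAge {
			delete(r.sessions, id) // deleting while ranging over a map is safe in Go
			removed++
		}
	}
	return removed
}

// runCleanupDemo builds a registry with one stale and one fresh session and
// returns the number removed and the number remaining.
func runCleanupDemo() (removed, remaining int) {
	now := time.Now()
	r := &registry{sessions: map[string]*session{
		"old":   {Timestamp: now.Add(-2 * time.Hour)},
		"fresh": {Timestamp: now.Add(-time.Minute)},
	}}
	removed = r.cleanupExpired(now, time.Hour)
	return removed, len(r.sessions)
}

func main() {
	fmt.Println(runCleanupDemo()) // prints "1 1"
}
```

Passing `now` in explicitly keeps the sketch deterministic; a real manager would more likely call `time.Now()` internally.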
func (*RuntimeManager[C, S]) GetOrCreateRuntime ¶
func (m *RuntimeManager[C, S]) GetOrCreateRuntime(
    sessionID string,
    ctx C,
    st S,
    store store.Store,
    checkpointer checkpoint.Checkpointer,
) *Runtime[C, S]
GetOrCreateRuntime gets an existing runtime or creates a new one.
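A common way to implement get-or-create semantics safely under concurrency is a read-locked lookup followed by a re-check under the write lock. The sketch below illustrates that general pattern only; `session`, `registry`, and `getOrCreate` are hypothetical names, and whether RuntimeManager locks this way is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical stand-in for a per-session runtime.
type session struct{ ID string }

// registry is a hypothetical stand-in for RuntimeManager.
type registry struct {
	mu       sync.RWMutex
	sessions map[string]*session
}

func newRegistry() *registry {
	return &registry{sessions: make(map[string]*session)}
}

// getOrCreate returns the session stored under id, creating it on first use.
func (r *registry) getOrCreate(id string) *session {
	// Fast path: most calls find an existing entry under the read lock.
	r.mu.RLock()
	s, ok := r.sessions[id]
	r.mu.RUnlock()
	if ok {
		return s
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	// Re-check: another goroutine may have created the entry between locks.
	if existing, ok := r.sessions[id]; ok {
		return existing
	}
	s = &session{ID: id}
	r.sessions[id] = s
	return s
}

func main() {
	r := newRegistry()
	a := r.getOrCreate("s1")
	b := r.getOrCreate("s1")
	fmt.Println(a == b, len(r.sessions)) // prints "true 1"
}
```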
func (*RuntimeManager[C, S]) GetRuntime ¶
func (m *RuntimeManager[C, S]) GetRuntime(sessionID string) (*Runtime[C, S], bool)
GetRuntime retrieves a runtime by session ID.
func (*RuntimeManager[C, S]) Metrics ¶
func (m *RuntimeManager[C, S]) Metrics() *RuntimeMetrics
Metrics returns the runtime metrics.
func (*RuntimeManager[C, S]) RemoveRuntime ¶
func (m *RuntimeManager[C, S]) RemoveRuntime(sessionID string)
RemoveRuntime removes a runtime by session ID.
func (*RuntimeManager[C, S]) SetRuntime ¶
func (m *RuntimeManager[C, S]) SetRuntime(sessionID string, runtime *Runtime[C, S])
SetRuntime stores a runtime for a session ID.
func (*RuntimeManager[C, S]) Size ¶
func (m *RuntimeManager[C, S]) Size() int
Size returns the number of active runtimes.
type RuntimeMetrics ¶
type RuntimeMetrics struct {
    // TotalToolCalls is the total number of tool calls executed
    TotalToolCalls int64
    // TotalStateUpdates is the total number of state updates
    TotalStateUpdates int64
    // TotalStorageOperations is the total number of storage operations
    TotalStorageOperations int64
    // TotalCheckpoints is the total number of checkpoints saved
    TotalCheckpoints int64
    // LastToolCall is the timestamp of the last tool call
    LastToolCall time.Time
    // LastStateUpdate is the timestamp of the last state update
    LastStateUpdate time.Time
    // AverageToolLatency is the average tool execution latency
    AverageToolLatency time.Duration
}
RuntimeMetrics tracks runtime execution metrics.
func NewRuntimeMetrics ¶
func NewRuntimeMetrics() *RuntimeMetrics
NewRuntimeMetrics creates a new RuntimeMetrics instance.
type StreamConsumer ¶
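type StreamConsumer interface {
    // OnChunk handles a data chunk
    OnChunk(chunk *LegacyStreamChunk) error
    // OnStart is called when the stream starts
    OnStart() error
    // OnComplete is called when the stream completes
    OnComplete() error
    // OnError is called when an error occurs
    OnError(err error) error
}
StreamConsumer is the stream consumer interface.
StreamConsumer defines a consumer of stream data:
- Receives the data chunks in a stream
- Handles stream events (start, completion, error)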
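A consumer with this method set could be implemented roughly as follows. To stay self-contained, the sketch uses a simplified `chunk` type and a local `consumer` interface that mirror LegacyStreamChunk and StreamConsumer. `textCollector` and the `feed` driver are hypothetical, and the call order (OnStart, then OnChunk for each chunk, then OnComplete) is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// chunk is a simplified stand-in for LegacyStreamChunk.
type chunk struct {
	Text   string
	IsLast bool
}

// consumer mirrors the StreamConsumer method set, using the simplified chunk.
type consumer interface {
	OnStart() error
	OnChunk(c *chunk) error
	OnComplete() error
	OnError(err error) error
}

// textCollector concatenates the text of every chunk it receives.
type textCollector struct {
	buf  strings.Builder
	done bool
}

func (t *textCollector) OnStart() error          { t.buf.Reset(); t.done = false; return nil }
func (t *textCollector) OnChunk(c *chunk) error  { t.buf.WriteString(c.Text); return nil }
func (t *textCollector) OnComplete() error       { t.done = true; return nil }
func (t *textCollector) OnError(err error) error { return err }

// feed is a hypothetical driver that pushes chunks into a consumer and
// routes any OnChunk failure through OnError.
func feed(c consumer, chunks []*chunk) error {
	if err := c.OnStart(); err != nil {
		return err
	}
	for _, ch := range chunks {
		if err := c.OnChunk(ch); err != nil {
			return c.OnError(err)
		}
	}
	return c.OnComplete()
}

func main() {
	tc := &textCollector{}
	err := feed(tc, []*chunk{{Text: "Hel"}, {Text: "lo", IsLast: true}})
	fmt.Println(tc.buf.String(), tc.done, err) // prints "Hello true <nil>"
}
```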
type StreamController ¶
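type StreamController interface {
    // Pause pauses the stream
    Pause() error
    // Resume resumes the stream
    Resume() error
    // Cancel cancels the stream
    Cancel() error
    // Status returns the stream status
    Status() *StreamStatus
    // IsRunning reports whether the stream is running
    IsRunning() bool
    // IsPaused reports whether the stream is paused
    IsPaused() bool
}
StreamController is the stream controller interface.
StreamController provides control over a stream:
- Pause, resume, and cancel the stream
- Query the stream status
- Monitor and debug the stream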
type StreamMultiplexer ¶
type StreamMultiplexer interface {
    // AddConsumer adds a consumer
    AddConsumer(consumer StreamConsumer) (id string, err error)
    // RemoveConsumer removes a consumer
    RemoveConsumer(id string) error
    // Consumers returns all consumers
    Consumers() []string
    // Start starts multiplexing
    Start(ctx context.Context, source StreamOutput) error
    // Close closes the multiplexer
    Close() error
}
StreamMultiplexer is the stream multiplexer interface.
StreamMultiplexer lets multiple consumers read the same stream at once:
- Broadcasts data to multiple consumers
- Manages the lifecycle of consumers
type StreamOptions ¶
type StreamOptions struct {
    // Buffering
    BufferSize int `json:"buffer_size,omitempty"` // buffer size (in chunks)
    EnableBuffer bool `json:"enable_buffer,omitempty"` // whether buffering is enabled
    // Timeouts
    ChunkTimeout time.Duration `json:"chunk_timeout,omitempty"` // timeout for a single chunk
    StreamTimeout time.Duration `json:"stream_timeout,omitempty"` // timeout for the whole stream
    // Retries
    MaxRetries int `json:"max_retries,omitempty"` // maximum number of retries
    RetryDelay time.Duration `json:"retry_delay,omitempty"` // delay between retries
    RetryOnError bool `json:"retry_on_error,omitempty"` // whether to retry on error
    // Backpressure
    EnableBackpressure bool `json:"enable_backpressure,omitempty"` // whether backpressure is enabled
    BackpressureWindow int `json:"backpressure_window,omitempty"` // backpressure window size
    MaxPendingChunks int `json:"max_pending_chunks,omitempty"` // maximum number of pending chunks
    // Flow control
    EnableThrottle bool `json:"enable_throttle,omitempty"` // whether throttling is enabled
    MaxChunksPerSec float64 `json:"max_chunks_per_sec,omitempty"` // maximum chunk rate
    MinChunkDelay time.Duration `json:"min_chunk_delay,omitempty"` // minimum interval between chunks
    // Monitoring
    EnableProgress bool `json:"enable_progress,omitempty"` // whether to send progress updates
    ProgressInterval time.Duration `json:"progress_interval,omitempty"` // interval between progress updates
    // Transformation
    EnableTransform bool `json:"enable_transform,omitempty"` // whether data transformation is enabled
    TransformFunc ChunkTransformFunc `json:"-"` // data transformation function
    // Multiplexing
    EnableMultiplex bool `json:"enable_multiplex,omitempty"` // whether multiple consumers are supported
    MaxConsumers int `json:"max_consumers,omitempty"` // maximum number of consumers
    // Memory protection
    MaxCollectSize int64 `json:"max_collect_size,omitempty"` // maximum number of bytes for Collect/CollectText (default 100MB), to prevent OOM
}
StreamOptions holds the stream configuration options.
StreamOptions controls how a stream behaves:
- Buffering and performance settings
- Timeout and retry configuration
- Backpressure and flow control
type StreamOutput ¶
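type StreamOutput interface {
    // Next reads the next data chunk.
    // It returns io.EOF when the stream has ended.
    Next() (*LegacyStreamChunk, error)
    // Close closes the stream and releases its resources
    Close() error
    // IsClosed reports whether the stream has been closed
    IsClosed() bool
    // Context returns the stream's context
    Context() context.Context
}
StreamOutput is the streaming output interface.
StreamOutput provides streaming reads of data:
- Reads data chunk by chunk through Next()
- Supports pause, resume, and cancel
- Provides progress and status updates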
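Reading a stream to completion typically means calling Next() in a loop until it reports io.EOF. The sketch below shows that loop against a hypothetical in-memory source. `chunk`, `sliceStream`, and `drain` are illustrative stand-ins that follow the Next/Close contract described above; they are not types from this package.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// chunk is a simplified stand-in for LegacyStreamChunk.
type chunk struct{ Text string }

// sliceStream is a hypothetical in-memory source whose Next method signals
// the end of the stream with io.EOF.
type sliceStream struct {
	chunks []*chunk
	pos    int
	closed bool
}

// Next returns the next chunk, or io.EOF once the stream is exhausted or closed.
func (s *sliceStream) Next() (*chunk, error) {
	if s.closed || s.pos >= len(s.chunks) {
		return nil, io.EOF
	}
	c := s.chunks[s.pos]
	s.pos++
	return c, nil
}

// Close marks the stream as closed.
func (s *sliceStream) Close() error {
	s.closed = true
	return nil
}

// drain reads chunks until io.EOF, concatenates their text, and always
// closes the stream before returning.
func drain(s *sliceStream) (string, error) {
	defer s.Close()
	var out string
	for {
		c, err := s.Next()
		if errors.Is(err, io.EOF) {
			return out, nil // normal end of stream
		}
		if err != nil {
			return out, err // any other error aborts the read
		}
		out += c.Text
	}
}

func main() {
	s := &sliceStream{chunks: []*chunk{{Text: "foo"}, {Text: "bar"}}}
	text, err := drain(s)
	fmt.Println(text, err, s.closed) // prints "foobar <nil> true"
}
```

Treating io.EOF as a normal termination rather than a failure matches the convention used by Go's io.Reader.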
type StreamState ¶
type StreamState string
StreamState enumerates the possible stream states.
const (
    StreamStateIdle StreamState = "idle" // idle
    StreamStateRunning StreamState = "running" // running
    StreamStatePaused StreamState = "paused" // paused
    StreamStateError StreamState = "error" // error
    StreamStateComplete StreamState = "complete" // complete
    StreamStateClosed StreamState = "closed" // closed
)
type StreamStatus ¶
type StreamStatus struct {
    State StreamState `json:"state"` // stream state
    ChunksRead int64 `json:"chunks_read"` // number of chunks read
    BytesRead int64 `json:"bytes_read"` // number of bytes read
    StartTime time.Time `json:"start_time"` // start time
    ElapsedTime time.Duration `json:"elapsed_time"` // elapsed time
    Progress float64 `json:"progress"` // progress percentage
    ErrorCount int `json:"error_count"` // error count
    LastError error `json:"last_error,omitempty"` // most recent error
}
StreamStatus describes the status of a stream.
type StreamWriter ¶
type StreamWriter interface {
    io.Writer
    // WriteChunk writes a data chunk
    WriteChunk(chunk *LegacyStreamChunk) error
    // WriteBatch writes a batch of data chunks
    WriteBatch(chunks []*LegacyStreamChunk) error
    // WriteText writes text data
    WriteText(text string) error
    // WriteProgress writes a progress update
    WriteProgress(progress float64, message string) error
    // WriteStatus writes a status update
    WriteStatus(status string) error
    // WriteError writes error information
    WriteError(err error) error
    // Close closes the writer and marks the end of the stream
    Close() error
    // IsClosed reports whether the writer has been closed
    IsClosed() bool
}
StreamWriter is the stream writer interface.
StreamWriter is used to produce stream data:
- An Agent writes data into the stream through a StreamWriter
- Supports batch writes and control signals
type StreamingAgent ¶
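type StreamingAgent interface {
    // ExecuteStream runs the Agent logic in streaming mode.
    // It returns a StreamOutput from which output chunks can be read continuously.
    ExecuteStream(ctx context.Context, input interface{}) (StreamOutput, error)
}
StreamingAgent is the interface for Agents that support streaming output.
StreamingAgent extends the Agent interface with streaming output:
- Returns partial results in real time (such as token-by-token LLM output)
- Processes large data sets incrementally
- Provides real-time progress feedback
- Supports intermediate results for long-running tasks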
Note: This is a legacy interface. New code should use the Runnable[I,O].Stream() pattern from the core package with generic StreamChunk[T] types.
type ToolFunc ¶
type ToolFunc[I, O, C any, S state.State] func(ctx context.Context, input I, runtime *Runtime[C, S]) (O, error)
ToolFunc defines the signature for tool functions with runtime access.
Generic type parameters:
- I: Input type for the tool
- O: Output type from the tool
- C: Custom context type
- S: State type (must implement State interface)
type ToolWithRuntime ¶
type ToolWithRuntime[I, O, C any, S state.State] struct {
    // contains filtered or unexported fields
}
ToolWithRuntime wraps a ToolFunc into a Tool interface.
func NewToolWithRuntime ¶
func NewToolWithRuntime[I, O, C any, S state.State](
    name, description string,
    fn ToolFunc[I, O, C, S],
    runtime *Runtime[C, S],
) *ToolWithRuntime[I, O, C, S]
NewToolWithRuntime creates a new tool that has access to runtime.
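The idea of binding a runtime to a tool function, so that callers supply only a context and an input, can be sketched with simplified generic stand-ins. `rt`, `toolFunc`, `boundTool`, `appConfig`, `counterState`, `greet`, and `runGreetDemo` are hypothetical names. The real Runtime constrains S to state.State and carries more fields than shown here.

```go
package main

import (
	"context"
	"fmt"
)

// rt is a hypothetical, trimmed-down stand-in for Runtime[C, S].
type rt[C any, S any] struct {
	Context C
	State   S
}

// toolFunc mirrors the shape of ToolFunc: context, input, runtime pointer.
type toolFunc[I, O, C, S any] func(ctx context.Context, input I, r *rt[C, S]) (O, error)

// boundTool holds a function together with the runtime it should receive.
type boundTool[I, O, C, S any] struct {
	name string
	fn   toolFunc[I, O, C, S]
	r    *rt[C, S]
}

// Execute forwards the call to the wrapped function, injecting the runtime.
func (t *boundTool[I, O, C, S]) Execute(ctx context.Context, input I) (O, error) {
	return t.fn(ctx, input, t.r)
}

// appConfig and counterState are illustrative context and state types.
type appConfig struct{ Greeting string }
type counterState struct{ Calls int }

// greet reads from the runtime context and records the call in the state.
func greet(ctx context.Context, name string, r *rt[appConfig, *counterState]) (string, error) {
	r.State.Calls++
	return r.Context.Greeting + ", " + name, nil
}

// runGreetDemo wires greet to a runtime and executes it once.
func runGreetDemo() (string, int, error) {
	r := &rt[appConfig, *counterState]{
		Context: appConfig{Greeting: "Hello"},
		State:   &counterState{},
	}
	tool := &boundTool[string, string, appConfig, *counterState]{name: "greet", fn: greet, r: r}
	out, err := tool.Execute(context.Background(), "Ada")
	return out, r.State.Calls, err
}

func main() {
	fmt.Println(runGreetDemo()) // prints "Hello, Ada 1 <nil>"
}
```

Keeping the runtime out of the Execute signature lets the tool satisfy a plain input/output interface while still reaching application context and state.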
func (*ToolWithRuntime[I, O, C, S]) Description ¶
func (t *ToolWithRuntime[I, O, C, S]) Description() string
Description returns the tool description.
func (*ToolWithRuntime[I, O, C, S]) Execute ¶
func (t *ToolWithRuntime[I, O, C, S]) Execute(ctx context.Context, input I) (O, error)
Execute runs the tool function with runtime access.
func (*ToolWithRuntime[I, O, C, S]) Name ¶
func (t *ToolWithRuntime[I, O, C, S]) Name() string
Name returns the tool name.
func (*ToolWithRuntime[I, O, C, S]) WithRuntime ¶
func (t *ToolWithRuntime[I, O, C, S]) WithRuntime(runtime *Runtime[C, S]) *ToolWithRuntime[I, O, C, S]
WithRuntime returns a new tool with updated runtime.