Documentation
¶
Index ¶
- Variables
- func FilterStream(input <-chan StreamEvent, filter *StreamFilter) <-chan StreamEvent
- func GetChunk() *core.LegacyStreamChunk
- func MergeStreams(streams ...<-chan StreamEvent) <-chan StreamEvent
- func PutChunk(chunk *core.LegacyStreamChunk)
- func PutStreamChunk(chunk *StreamChunk)
- func SSEHandler(...) http.HandlerFunc
- func StreamToChunkedTransfer(ctx context.Context, w http.ResponseWriter, source core.StreamOutput) error
- func StreamToSSE(ctx context.Context, w http.ResponseWriter, source core.StreamOutput) error
- func StreamToWebSocket(ctx context.Context, conn *websocket.Conn, source core.StreamOutput) error
- func TransformStream(input <-chan StreamEvent, transform func(StreamEvent) StreamEvent) <-chan StreamEvent
- func WebSocketStreamHandler(...) http.HandlerFunc
- type BatchMiddleware
- type BufferMiddleware
- type ChunkedTransferStreamer
- type DataPipelineAgent
- func (a *DataPipelineAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
- func (a *DataPipelineAgent) ExecuteStream(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)
- func (a *DataPipelineAgent) ProcessWithTransform(ctx context.Context, dataSource []interface{}, ...) (core.StreamOutput, error)
- func (a *DataPipelineAgent) StreamFilter(ctx context.Context, source core.StreamOutput, ...) (core.StreamOutput, error)
- func (a *DataPipelineAgent) StreamMap(ctx context.Context, source core.StreamOutput, ...) (core.StreamOutput, error)
- func (a *DataPipelineAgent) StreamReduce(ctx context.Context, source core.StreamOutput, initial interface{}, ...) (interface{}, error)
- type DataPipelineConfig
- type FilterMiddleware
- type FuncStreamHandler
- type MultiModeStream
- func (s *MultiModeStream) Close() error
- func (s *MultiModeStream) GetWriter(mode StreamMode) (*StreamWriter, error)
- func (s *MultiModeStream) Stream(mode StreamMode, event StreamEvent) error
- func (s *MultiModeStream) Subscribe(mode StreamMode) (<-chan StreamEvent, error)
- func (s *MultiModeStream) SubscribeAll() <-chan StreamEvent
- type Multiplexer
- func (m *Multiplexer) AddConsumer(consumer core.StreamConsumer) (string, error)
- func (m *Multiplexer) Close() error
- func (m *Multiplexer) Consumers() []string
- func (m *Multiplexer) RemoveConsumer(id string) error
- func (m *Multiplexer) Start(ctx context.Context, source core.StreamOutput) error
- func (m *Multiplexer) Stats() MultiplexerStats
- type MultiplexerStats
- type PollingStreamer
- type ProgressAgent
- type ProgressConfig
- type ProgressTracker
- type Reader
- func (r *Reader) Cancel() error
- func (r *Reader) Close() error
- func (r *Reader) Collect() ([]*core.LegacyStreamChunk, error)
- func (r *Reader) CollectText() (string, error)
- func (r *Reader) Context() context.Context
- func (r *Reader) Drain() error
- func (r *Reader) IsClosed() bool
- func (r *Reader) IsPaused() bool
- func (r *Reader) IsRunning() bool
- func (r *Reader) Next() (*core.LegacyStreamChunk, error)
- func (r *Reader) Pause() error
- func (r *Reader) Resume() error
- func (r *Reader) Stats() ReaderStats
- func (r *Reader) Status() *core.StreamStatus
- type ReaderStats
- type RetryMiddleware
- type RingBuffer
- func (rb *RingBuffer) Clear()
- func (rb *RingBuffer) Count() int
- func (rb *RingBuffer) IsEmpty() bool
- func (rb *RingBuffer) IsFull() bool
- func (rb *RingBuffer) Peek() *core.LegacyStreamChunk
- func (rb *RingBuffer) Pop() *core.LegacyStreamChunk
- func (rb *RingBuffer) Push(chunk *core.LegacyStreamChunk) bool
- func (rb *RingBuffer) Resize(newSize int)
- func (rb *RingBuffer) Size() int
- func (rb *RingBuffer) ToSlice() []*core.LegacyStreamChunk
- func (rb *RingBuffer) Usage() float64
- type SSEStreamer
- type SimpleStreamConsumer
- type StreamAggregator
- type StreamChunk
- type StreamConfig
- type StreamEvent
- type StreamFilter
- type StreamHandler
- type StreamManager
- func (m *StreamManager) Buffer(ctx context.Context, input <-chan *StreamChunk, size int) <-chan []*StreamChunk
- func (m *StreamManager) Collect(ctx context.Context, stream <-chan *StreamChunk) ([]*StreamChunk, error)
- func (m *StreamManager) Filter(ctx context.Context, input <-chan *StreamChunk, ...) <-chan *StreamChunk
- func (m *StreamManager) Merge(ctx context.Context, streams ...<-chan *StreamChunk) <-chan *StreamChunk
- func (m *StreamManager) Process(ctx context.Context, stream <-chan *StreamChunk, handler StreamHandler) error
- func (m *StreamManager) Transform(ctx context.Context, input <-chan *StreamChunk, ...) <-chan *StreamChunk
- type StreamManagerConfig
- type StreamMode
- type StreamModeSelector
- type StreamMultiplexer
- type StreamRateLimiter
- type StreamStats
- type StreamWriter
- type StreamingLLMAgent
- type StreamingLLMAgentWithRealStreaming
- type StreamingLLMConfig
- type TeeMiddleware
- type TextAccumulatorConsumer
- type ThrottleMiddleware
- type TransformMiddleware
- type WebSocketBidirectionalStream
- type WebSocketStreamer
- func (w *WebSocketStreamer) Close() error
- func (w *WebSocketStreamer) IsClosed() bool
- func (w *WebSocketStreamer) ReadChunk() (*core.LegacyStreamChunk, error)
- func (w *WebSocketStreamer) WriteBinary(data []byte) error
- func (w *WebSocketStreamer) WriteChunk(chunk *core.LegacyStreamChunk) error
- func (w *WebSocketStreamer) WriteError(err error) error
- func (w *WebSocketStreamer) WriteProgress(progress float64, message string) error
- func (w *WebSocketStreamer) WriteText(text string) error
- type Writer
- func (w *Writer) Channel() <-chan *core.LegacyStreamChunk
- func (w *Writer) Close() error
- func (w *Writer) IsClosed() bool
- func (w *Writer) Stats() WriterStats
- func (w *Writer) Write(p []byte) (n int, err error)
- func (w *Writer) WriteBatch(chunks []*core.LegacyStreamChunk) error
- func (w *Writer) WriteChunk(chunk *core.LegacyStreamChunk) error
- func (w *Writer) WriteError(err error) error
- func (w *Writer) WriteProgress(progress float64, message string) error
- func (w *Writer) WriteStatus(status string) error
- func (w *Writer) WriteText(text string) error
- type WriterStats
Constants ¶
This section is empty.
Variables ¶
var WebSocketUpgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true }, }
WebSocketUpgrader WebSocket 升级器
Functions ¶
func FilterStream ¶
func FilterStream(input <-chan StreamEvent, filter *StreamFilter) <-chan StreamEvent
FilterStream creates a filtered stream
func GetChunk ¶ added in v0.6.0
func GetChunk() *core.LegacyStreamChunk
GetChunk 从对象池中获取一个 chunk。使用完毕后应调用 PutChunk 归还到池中。
func MergeStreams ¶
func MergeStreams(streams ...<-chan StreamEvent) <-chan StreamEvent
MergeStreams merges multiple stream channels
func PutChunk ¶ added in v0.6.0
func PutChunk(chunk *core.LegacyStreamChunk)
PutChunk 将 chunk 归还到对象池。在归还前会重置 chunk 的状态,避免数据泄漏。
func PutStreamChunk ¶ added in v0.6.0
func PutStreamChunk(chunk *StreamChunk)
PutStreamChunk 将 StreamChunk 归还到对象池
func SSEHandler ¶
func SSEHandler(handler func(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)) http.HandlerFunc
SSEHandler SSE HTTP 处理器
func StreamToChunkedTransfer ¶
func StreamToChunkedTransfer(ctx context.Context, w http.ResponseWriter, source core.StreamOutput) error
StreamToChunkedTransfer 将 StreamOutput 转换为 Chunked Transfer
func StreamToSSE ¶
func StreamToSSE(ctx context.Context, w http.ResponseWriter, source core.StreamOutput) error
StreamToSSE 将 StreamOutput 转换为 SSE 流
func StreamToWebSocket ¶
func StreamToWebSocket(ctx context.Context, conn *websocket.Conn, source core.StreamOutput) error
StreamToWebSocket 将 StreamOutput 转换为 WebSocket 流
func TransformStream ¶
func TransformStream(input <-chan StreamEvent, transform func(StreamEvent) StreamEvent) <-chan StreamEvent
TransformStream transforms stream events
func WebSocketStreamHandler ¶
func WebSocketStreamHandler(handler func(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)) http.HandlerFunc
WebSocketStreamHandler WebSocket 流处理器
Types ¶
type BatchMiddleware ¶
type BatchMiddleware struct {
// contains filtered or unexported fields
}
BatchMiddleware 批处理中间件
BatchMiddleware 将多个块聚合成批次: - 减少下游处理次数 - 提高吞吐量
func NewBatchMiddleware ¶
func NewBatchMiddleware(batchSize int, timeout time.Duration) *BatchMiddleware
NewBatchMiddleware 创建批处理中间件
func (*BatchMiddleware) Apply ¶
func (m *BatchMiddleware) Apply(ctx context.Context, source execution.StreamOutput) (execution.StreamOutput, error)
Apply 应用中间件
type BufferMiddleware ¶
type BufferMiddleware struct {
// contains filtered or unexported fields
}
BufferMiddleware 缓冲中间件
BufferMiddleware 控制流的缓冲行为: - 动态调整缓冲区大小 - 防止内存溢出 - 提供背压控制
func NewBufferMiddleware ¶
func NewBufferMiddleware(minSize, maxSize int, threshold float64) *BufferMiddleware
NewBufferMiddleware 创建缓冲中间件
func (*BufferMiddleware) Apply ¶
func (m *BufferMiddleware) Apply(ctx context.Context, source execution.StreamOutput) (execution.StreamOutput, error)
Apply 应用中间件
type ChunkedTransferStreamer ¶
type ChunkedTransferStreamer struct {
// contains filtered or unexported fields
}
ChunkedTransferStreamer HTTP Chunked Transfer Encoding 流支持
func NewChunkedTransferStreamer ¶
func NewChunkedTransferStreamer(w http.ResponseWriter) (*ChunkedTransferStreamer, error)
NewChunkedTransferStreamer 创建 Chunked Transfer 流
func (*ChunkedTransferStreamer) WriteChunk ¶
func (c *ChunkedTransferStreamer) WriteChunk(chunk *core.LegacyStreamChunk) error
WriteChunk 写入数据块
type DataPipelineAgent ¶
DataPipelineAgent 数据流处理 Agent
DataPipelineAgent 用于处理大数据集的渐进式处理: - 逐批次处理数据 - 避免一次性加载所有数据到内存 - 实时返回处理结果
func NewDataPipelineAgent ¶
func NewDataPipelineAgent(config *DataPipelineConfig) *DataPipelineAgent
NewDataPipelineAgent 创建数据管道 Agent
func (*DataPipelineAgent) Execute ¶
func (a *DataPipelineAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
Execute 同步执行
func (*DataPipelineAgent) ExecuteStream ¶
func (a *DataPipelineAgent) ExecuteStream(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)
ExecuteStream 流式执行
func (*DataPipelineAgent) ProcessWithTransform ¶
func (a *DataPipelineAgent) ProcessWithTransform( ctx context.Context, dataSource []interface{}, transform func(interface{}) (interface{}, error), ) (core.StreamOutput, error)
ProcessWithTransform 使用转换函数处理数据
func (*DataPipelineAgent) StreamFilter ¶
func (a *DataPipelineAgent) StreamFilter( ctx context.Context, source core.StreamOutput, filter func(*core.LegacyStreamChunk) bool, ) (core.StreamOutput, error)
StreamFilter 流式过滤器
func (*DataPipelineAgent) StreamMap ¶
func (a *DataPipelineAgent) StreamMap( ctx context.Context, source core.StreamOutput, mapper func(*core.LegacyStreamChunk) (*core.LegacyStreamChunk, error), ) (core.StreamOutput, error)
StreamMap 流式映射
func (*DataPipelineAgent) StreamReduce ¶
func (a *DataPipelineAgent) StreamReduce( ctx context.Context, source core.StreamOutput, initial interface{}, reducer func(accumulator, current interface{}) (interface{}, error), ) (interface{}, error)
StreamReduce 流式归约
type DataPipelineConfig ¶
type DataPipelineConfig struct {
BatchSize int // 批次大小
ProcessDelay time.Duration // 处理延迟
EnableProgress bool // 启用进度报告
ProgressInterval time.Duration // 进度更新间隔
MaxWorkers int // 最大工作协程数
}
DataPipelineConfig 数据管道配置
func DefaultDataPipelineConfig ¶
func DefaultDataPipelineConfig() *DataPipelineConfig
DefaultDataPipelineConfig 返回默认配置
type FilterMiddleware ¶
type FilterMiddleware struct {
// contains filtered or unexported fields
}
FilterMiddleware 过滤中间件
FilterMiddleware 过滤流中的数据块: - 只传递符合条件的块 - 跳过不需要的数据
func NewFilterMiddleware ¶
func NewFilterMiddleware(predicate func(*execution.LegacyStreamChunk) bool) *FilterMiddleware
NewFilterMiddleware 创建过滤中间件
func (*FilterMiddleware) Apply ¶
func (m *FilterMiddleware) Apply(ctx context.Context, source execution.StreamOutput) (execution.StreamOutput, error)
Apply 应用中间件
type FuncStreamHandler ¶
type FuncStreamHandler struct {
// contains filtered or unexported fields
}
FuncStreamHandler 函数式流处理器
func NewFuncStreamHandler ¶
func NewFuncStreamHandler( onChunk func(*StreamChunk) error, onComplete func() error, onError func(error) error, ) *FuncStreamHandler
NewFuncStreamHandler 创建函数式流处理器
func (*FuncStreamHandler) OnChunk ¶
func (h *FuncStreamHandler) OnChunk(chunk *StreamChunk) error
OnChunk 处理数据块
func (*FuncStreamHandler) OnComplete ¶
func (h *FuncStreamHandler) OnComplete() error
OnComplete 完成时调用
func (*FuncStreamHandler) OnError ¶
func (h *FuncStreamHandler) OnError(err error) error
OnError 错误时调用
type MultiModeStream ¶
type MultiModeStream struct {
// contains filtered or unexported fields
}
MultiModeStream handles different streaming modes
func NewMultiModeStream ¶
func NewMultiModeStream(ctx context.Context, config *StreamConfig) *MultiModeStream
NewMultiModeStream creates a new multi-mode stream
func (*MultiModeStream) GetWriter ¶
func (s *MultiModeStream) GetWriter(mode StreamMode) (*StreamWriter, error)
GetWriter returns a writer for a specific mode
func (*MultiModeStream) Stream ¶
func (s *MultiModeStream) Stream(mode StreamMode, event StreamEvent) error
Stream sends an event to the appropriate mode channel
func (*MultiModeStream) Subscribe ¶
func (s *MultiModeStream) Subscribe(mode StreamMode) (<-chan StreamEvent, error)
Subscribe returns a channel for receiving events of a specific mode
func (*MultiModeStream) SubscribeAll ¶
func (s *MultiModeStream) SubscribeAll() <-chan StreamEvent
SubscribeAll returns a merged channel for all configured modes
type Multiplexer ¶
type Multiplexer struct {
// contains filtered or unexported fields
}
Multiplexer 流多路复用器实现
Multiplexer 允许多个消费者同时读取同一个流: - 广播数据到所有消费者 - 独立的错误处理 - 背压管理
func NewMultiplexer ¶
func NewMultiplexer(ctx context.Context, opts *core.StreamOptions) *Multiplexer
NewMultiplexer 创建新的多路复用器
func (*Multiplexer) AddConsumer ¶
func (m *Multiplexer) AddConsumer(consumer core.StreamConsumer) (string, error)
AddConsumer 添加消费者
func (*Multiplexer) RemoveConsumer ¶
func (m *Multiplexer) RemoveConsumer(id string) error
RemoveConsumer 移除消费者
func (*Multiplexer) Start ¶
func (m *Multiplexer) Start(ctx context.Context, source core.StreamOutput) error
Start 开始多路复用
type MultiplexerStats ¶
MultiplexerStats 多路复用器统计信息
type PollingStreamer ¶
type PollingStreamer struct {
// contains filtered or unexported fields
}
PollingStreamer 轮询流支持
PollingStreamer 用于不支持持久连接的场景: - 客户端定期轮询 - 服务器缓存最新数据
func NewPollingStreamer ¶
func NewPollingStreamer(sessionID string, timeout time.Duration) *PollingStreamer
NewPollingStreamer 创建轮询流
func (*PollingStreamer) Poll ¶
func (p *PollingStreamer) Poll(lastIndex int) ([]*core.LegacyStreamChunk, int, error)
Poll 轮询新数据
func (*PollingStreamer) WriteChunk ¶
func (p *PollingStreamer) WriteChunk(chunk *core.LegacyStreamChunk) error
WriteChunk 写入数据块
type ProgressAgent ¶
ProgressAgent 带进度反馈的 Agent
ProgressAgent 用于长时间运行的任务: - 实时进度更新 - 阶段性状态报告 - ETA 预估
func NewProgressAgent ¶
func NewProgressAgent(config *ProgressConfig) *ProgressAgent
NewProgressAgent 创建进度 Agent
func (*ProgressAgent) Execute ¶
func (a *ProgressAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
Execute 同步执行
func (*ProgressAgent) ExecuteStream ¶
func (a *ProgressAgent) ExecuteStream(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)
ExecuteStream 流式执行
type ProgressConfig ¶
type ProgressConfig struct {
EnableProgress bool // 启用进度报告
ProgressInterval time.Duration // 进度更新间隔
EnableETA bool // 启用 ETA 计算
EnablePhases bool // 启用阶段报告
}
ProgressConfig 进度配置
func DefaultProgressConfig ¶
func DefaultProgressConfig() *ProgressConfig
DefaultProgressConfig 返回默认配置
type ProgressTracker ¶
type ProgressTracker struct {
// contains filtered or unexported fields
}
ProgressTracker 进度跟踪器
func NewProgressTracker ¶
func NewProgressTracker(total int64, writer *Writer, config *ProgressConfig) *ProgressTracker
NewProgressTracker 创建进度跟踪器
func (*ProgressTracker) Increment ¶
func (pt *ProgressTracker) Increment(delta int64) error
Increment 增加进度
func (*ProgressTracker) Report ¶
func (pt *ProgressTracker) Report(current int64) error
Report 报告当前进度
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader 流读取器实现
func NewReader ¶
func NewReader(ctx context.Context, ch <-chan *core.LegacyStreamChunk, opts *core.StreamOptions) *Reader
NewReader 创建新的流读取器
func (*Reader) Collect ¶
func (r *Reader) Collect() ([]*core.LegacyStreamChunk, error)
Collect 收集所有数据块。注意:此方法会将所有数据加载到内存,受 MaxCollectSize 限制防止 OOM。性能优化:根据 BufferSize 预分配切片容量,减少扩容和内存复制。
func (*Reader) CollectText ¶
func (r *Reader) CollectText() (string, error)
CollectText 收集所有文本数据。使用 strings.Builder 提高性能,并受 MaxCollectSize 限制防止 OOM。
type ReaderStats ¶
type ReaderStats struct {
ChunksRead int64
BytesRead int64
ErrorCount int64
StartTime time.Time
LastReadTime time.Time
ElapsedTime time.Duration
}
ReaderStats 读取器统计信息
type RetryMiddleware ¶
type RetryMiddleware struct {
// contains filtered or unexported fields
}
RetryMiddleware 重试中间件
RetryMiddleware 在错误时重试: - 自动重试失败的操作 - 指数退避
func NewRetryMiddleware ¶
func NewRetryMiddleware(maxRetries int, backoff time.Duration) *RetryMiddleware
NewRetryMiddleware 创建重试中间件
func (*RetryMiddleware) Apply ¶
func (m *RetryMiddleware) Apply(ctx context.Context, source execution.StreamOutput) (execution.StreamOutput, error)
Apply 应用中间件
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
RingBuffer 环形缓冲区实现
RingBuffer 提供高效的固定大小缓冲: - O(1) 读写操作 - 自动覆盖旧数据 - 线程安全 - 零内存分配(预分配)
func (*RingBuffer) Push ¶
func (rb *RingBuffer) Push(chunk *core.LegacyStreamChunk) bool
Push 添加元素到缓冲区
func (*RingBuffer) ToSlice ¶
func (rb *RingBuffer) ToSlice() []*core.LegacyStreamChunk
ToSlice 转换为切片
type SSEStreamer ¶
type SSEStreamer struct {
// contains filtered or unexported fields
}
SSEStreamer Server-Sent Events 流支持
SSEStreamer 将流输出转换为 SSE 格式: - 单向服务器推送 - 自动重连 - 标准 HTTP 协议
func NewSSEStreamer ¶
func NewSSEStreamer(w http.ResponseWriter) (*SSEStreamer, error)
NewSSEStreamer 创建 SSE 流
func (*SSEStreamer) WriteChunk ¶
func (s *SSEStreamer) WriteChunk(chunk *core.LegacyStreamChunk) error
WriteChunk 写入数据块
func (*SSEStreamer) WriteProgress ¶
func (s *SSEStreamer) WriteProgress(progress float64, message string) error
WriteProgress 写入进度
type SimpleStreamConsumer ¶
type SimpleStreamConsumer struct {
OnChunkFunc func(*core.LegacyStreamChunk) error
OnStartFunc func() error
OnCompleteFunc func() error
OnErrorFunc func(error) error
}
SimpleStreamConsumer 简单的流消费者实现
func (*SimpleStreamConsumer) OnChunk ¶
func (c *SimpleStreamConsumer) OnChunk(chunk *core.LegacyStreamChunk) error
func (*SimpleStreamConsumer) OnComplete ¶
func (c *SimpleStreamConsumer) OnComplete() error
func (*SimpleStreamConsumer) OnError ¶
func (c *SimpleStreamConsumer) OnError(err error) error
func (*SimpleStreamConsumer) OnStart ¶
func (c *SimpleStreamConsumer) OnStart() error
type StreamAggregator ¶
type StreamAggregator struct {
// contains filtered or unexported fields
}
StreamAggregator aggregates multiple streams
func NewStreamAggregator ¶
func NewStreamAggregator() *StreamAggregator
NewStreamAggregator creates a new aggregator
func (*StreamAggregator) AddStream ¶
func (a *StreamAggregator) AddStream(stream *MultiModeStream)
AddStream adds a stream to the aggregator
func (*StreamAggregator) AggregateMode ¶
func (a *StreamAggregator) AggregateMode(mode StreamMode) <-chan StreamEvent
AggregateMode aggregates a specific mode from all streams
type StreamChunk ¶
type StreamChunk struct {
Data interface{}
Metadata map[string]interface{}
Timestamp time.Time
ChunkID int
Error error
Done bool
}
StreamChunk 流式数据块(通用版本)
func GetStreamChunk ¶ added in v0.6.0
func GetStreamChunk() *StreamChunk
GetStreamChunk 从对象池中获取一个 StreamChunk
type StreamConfig ¶
type StreamConfig struct {
// Modes specifies which stream modes to enable
Modes []StreamMode
// BufferSize for each stream channel
BufferSize int
// IncludeMetadata adds extra metadata to events
IncludeMetadata bool
// Callback for handling stream events (optional)
Callback func(mode StreamMode, event StreamEvent)
}
StreamConfig configures multi-mode streaming
func DefaultStreamConfig ¶
func DefaultStreamConfig() *StreamConfig
DefaultStreamConfig returns default configuration
type StreamEvent ¶
type StreamEvent struct {
Mode StreamMode `json:"mode"`
Type string `json:"type"`
Data interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
StreamEvent represents an event in the stream
type StreamFilter ¶
type StreamFilter struct {
Modes []StreamMode // Filter by modes
Types []string // Filter by event types
Predicate func(StreamEvent) bool // Custom filter function
}
StreamFilter filters stream events
func (*StreamFilter) Apply ¶
func (f *StreamFilter) Apply(event StreamEvent) bool
Apply applies the filter to an event
type StreamHandler ¶
type StreamHandler interface {
// OnChunk 处理数据块
OnChunk(chunk *StreamChunk) error
// OnComplete 完成时调用
OnComplete() error
// OnError 错误时调用
OnError(err error) error
}
StreamHandler 流式处理器接口
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager 流式管理器
提供流式数据处理的高级功能: - 缓冲管理 - 背压控制 - 错误处理 - 超时控制
func NewStreamManager ¶
func NewStreamManager(config StreamManagerConfig) *StreamManager
NewStreamManager 创建流式管理器
func (*StreamManager) Buffer ¶
func (m *StreamManager) Buffer(ctx context.Context, input <-chan *StreamChunk, size int) <-chan []*StreamChunk
Buffer 缓冲流式数据
func (*StreamManager) Collect ¶
func (m *StreamManager) Collect(ctx context.Context, stream <-chan *StreamChunk) ([]*StreamChunk, error)
Collect 收集所有流式数据
func (*StreamManager) Filter ¶
func (m *StreamManager) Filter(ctx context.Context, input <-chan *StreamChunk, predicate func(*StreamChunk) bool) <-chan *StreamChunk
Filter 过滤流式数据
func (*StreamManager) Merge ¶
func (m *StreamManager) Merge(ctx context.Context, streams ...<-chan *StreamChunk) <-chan *StreamChunk
Merge 合并多个流
func (*StreamManager) Process ¶
func (m *StreamManager) Process(ctx context.Context, stream <-chan *StreamChunk, handler StreamHandler) error
Process 处理流式数据
func (*StreamManager) Transform ¶
func (m *StreamManager) Transform(ctx context.Context, input <-chan *StreamChunk, transformer func(*StreamChunk) (*StreamChunk, error)) <-chan *StreamChunk
Transform 转换流式数据
type StreamManagerConfig ¶
StreamManagerConfig 流式管理器配置
type StreamMode ¶
type StreamMode string
StreamMode defines different streaming modes
const ( // StreamModeMessages streams LLM tokens as they are generated StreamModeMessages StreamMode = "messages" // StreamModeUpdates streams state updates after each step StreamModeUpdates StreamMode = "updates" // StreamModeCustom streams custom data from tools StreamModeCustom StreamMode = "custom" // StreamModeValues streams full state snapshots StreamModeValues StreamMode = "values" // StreamModeDebug streams detailed debug information StreamModeDebug StreamMode = "debug" )
type StreamModeSelector ¶
type StreamModeSelector struct {
RequiredModes []StreamMode
OptionalModes []StreamMode
FallbackMode StreamMode
}
StreamModeSelector helps select appropriate modes
func (*StreamModeSelector) SelectModes ¶
func (s *StreamModeSelector) SelectModes(available []StreamMode) []StreamMode
SelectModes returns the modes to use based on availability
type StreamMultiplexer ¶
type StreamMultiplexer struct {
// contains filtered or unexported fields
}
StreamMultiplexer 流式多路复用器
将一个流广播到多个消费者
func NewStreamMultiplexer ¶
func NewStreamMultiplexer(input <-chan *StreamChunk) *StreamMultiplexer
NewStreamMultiplexer 创建流式多路复用器
func (*StreamMultiplexer) AddConsumer ¶
func (m *StreamMultiplexer) AddConsumer(bufferSize int) <-chan *StreamChunk
AddConsumer 添加消费者
type StreamRateLimiter ¶
type StreamRateLimiter struct {
// contains filtered or unexported fields
}
StreamRateLimiter 流式速率限制器
func NewStreamRateLimiter ¶
func NewStreamRateLimiter(rate int) *StreamRateLimiter
NewStreamRateLimiter 创建流式速率限制器
func (*StreamRateLimiter) Limit ¶
func (l *StreamRateLimiter) Limit(ctx context.Context, input <-chan *StreamChunk) <-chan *StreamChunk
Limit 限制流速
type StreamStats ¶
type StreamStats struct {
ChunksProcessed int64
BytesProcessed int64
ErrorsCount int64
StartTime time.Time
EndTime time.Time
// contains filtered or unexported fields
}
StreamStats 流式统计信息
type StreamWriter ¶
type StreamWriter struct {
// contains filtered or unexported fields
}
StreamWriter provides a writer interface for a specific mode
func (*StreamWriter) Write ¶
func (w *StreamWriter) Write(eventType string, data interface{}) error
Write sends data to the stream
func (*StreamWriter) WriteWithMetadata ¶
func (w *StreamWriter) WriteWithMetadata(eventType string, data interface{}, metadata map[string]interface{}) error
WriteWithMetadata sends data with metadata
type StreamingLLMAgent ¶
StreamingLLMAgent LLM 流式响应 Agent
StreamingLLMAgent 支持 LLM 的流式输出: - 逐字返回生成的文本 - 实时显示思考过程 - 降低首字符延迟
func NewStreamingLLMAgent ¶
func NewStreamingLLMAgent(llmClient llm.Client, config *StreamingLLMConfig) *StreamingLLMAgent
NewStreamingLLMAgent 创建 LLM 流式 Agent
func (*StreamingLLMAgent) Execute ¶
func (a *StreamingLLMAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)
Execute 同步执行(兼容 Agent 接口)
func (*StreamingLLMAgent) ExecuteStream ¶
func (a *StreamingLLMAgent) ExecuteStream(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)
ExecuteStream 流式执行
type StreamingLLMAgentWithRealStreaming ¶
type StreamingLLMAgentWithRealStreaming struct {
*StreamingLLMAgent
}
StreamingLLMAgentWithRealStreaming 支持真实 LLM 流式 API 的 Agent
注意:这需要 LLM 客户端支持真实的流式 API(如 OpenAI streaming API)
func NewStreamingLLMAgentWithRealStreaming ¶
func NewStreamingLLMAgentWithRealStreaming(llmClient llm.Client, config *StreamingLLMConfig) *StreamingLLMAgentWithRealStreaming
NewStreamingLLMAgentWithRealStreaming 创建支持真实流式的 Agent
func (*StreamingLLMAgentWithRealStreaming) ExecuteStream ¶
func (a *StreamingLLMAgentWithRealStreaming) ExecuteStream(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)
ExecuteStream 使用真实的流式 API
type StreamingLLMConfig ¶
type StreamingLLMConfig struct {
// LLM 配置
Model string
Temperature float64
MaxTokens int
// 流式配置
ChunkSize int // 每次发送的字符数
ChunkDelay time.Duration // 块之间的延迟(模拟打字效果)
EnableProgress bool // 是否发送进度更新
ProgressInterval time.Duration // 进度更新间隔
// 错误处理
RetryOnError bool
MaxRetries int
}
StreamingLLMConfig LLM 流式配置
func DefaultStreamingLLMConfig ¶
func DefaultStreamingLLMConfig() *StreamingLLMConfig
DefaultStreamingLLMConfig 返回默认配置
type TeeMiddleware ¶
type TeeMiddleware struct {
// contains filtered or unexported fields
}
TeeMiddleware 分支中间件
TeeMiddleware 将流复制到多个目标: - 同时输出到多个消费者 - 不影响原始流 - 支持不同的处理速度
func NewTeeMiddleware ¶
func NewTeeMiddleware(outputs ...execution.StreamConsumer) *TeeMiddleware
NewTeeMiddleware 创建分支中间件
func (*TeeMiddleware) Apply ¶
func (m *TeeMiddleware) Apply(ctx context.Context, source execution.StreamOutput) (execution.StreamOutput, error)
Apply 应用中间件
type TextAccumulatorConsumer ¶
type TextAccumulatorConsumer struct {
// contains filtered or unexported fields
}
TextAccumulatorConsumer 累积文本的消费者
func NewTextAccumulatorConsumer ¶
func NewTextAccumulatorConsumer() *TextAccumulatorConsumer
NewTextAccumulatorConsumer 创建文本累积器,默认限制 100MB
func NewTextAccumulatorConsumerWithLimit ¶
func NewTextAccumulatorConsumerWithLimit(maxSize int64) *TextAccumulatorConsumer
NewTextAccumulatorConsumerWithLimit 创建带自定义大小限制的文本累积器
func (*TextAccumulatorConsumer) OnChunk ¶
func (c *TextAccumulatorConsumer) OnChunk(chunk *core.LegacyStreamChunk) error
func (*TextAccumulatorConsumer) OnComplete ¶
func (c *TextAccumulatorConsumer) OnComplete() error
func (*TextAccumulatorConsumer) OnError ¶
func (c *TextAccumulatorConsumer) OnError(err error) error
func (*TextAccumulatorConsumer) OnStart ¶
func (c *TextAccumulatorConsumer) OnStart() error
func (*TextAccumulatorConsumer) Text ¶
func (c *TextAccumulatorConsumer) Text() string
type ThrottleMiddleware ¶
type ThrottleMiddleware struct {
// contains filtered or unexported fields
}
ThrottleMiddleware 限流中间件
ThrottleMiddleware 控制流的速率: - 限制每秒的块数 - 平滑流量 - 防止下游过载
func NewThrottleMiddleware ¶
func NewThrottleMiddleware(maxChunksPerSec float64) *ThrottleMiddleware
NewThrottleMiddleware 创建限流中间件
func (*ThrottleMiddleware) Apply ¶
func (m *ThrottleMiddleware) Apply(ctx context.Context, source execution.StreamOutput) (execution.StreamOutput, error)
Apply 应用中间件
type TransformMiddleware ¶
type TransformMiddleware struct {
// contains filtered or unexported fields
}
TransformMiddleware 转换中间件
TransformMiddleware 转换流中的数据: - 修改数据格式 - 过滤数据 - 聚合数据
func NewTransformMiddleware ¶
func NewTransformMiddleware(fn core.ChunkTransformFunc) *TransformMiddleware
NewTransformMiddleware 创建转换中间件
func (*TransformMiddleware) Apply ¶
func (m *TransformMiddleware) Apply(ctx context.Context, source execution.StreamOutput) (execution.StreamOutput, error)
Apply 应用中间件
type WebSocketBidirectionalStream ¶
type WebSocketBidirectionalStream struct {
// contains filtered or unexported fields
}
WebSocketBidirectionalStream 双向 WebSocket 流
func NewWebSocketBidirectionalStreamWithContext ¶
func NewWebSocketBidirectionalStreamWithContext(parentCtx context.Context, conn *websocket.Conn) *WebSocketBidirectionalStream
NewWebSocketBidirectionalStreamWithContext 创建双向流,使用父上下文
func (*WebSocketBidirectionalStream) Close ¶
func (s *WebSocketBidirectionalStream) Close() error
Close 关闭流
func (*WebSocketBidirectionalStream) Input ¶
func (s *WebSocketBidirectionalStream) Input() <-chan *core.LegacyStreamChunk
Input 返回输入通道
func (*WebSocketBidirectionalStream) Output ¶
func (s *WebSocketBidirectionalStream) Output() chan<- *core.LegacyStreamChunk
Output 返回输出通道
type WebSocketStreamer ¶
type WebSocketStreamer struct {
// contains filtered or unexported fields
}
WebSocketStreamer WebSocket 流支持
WebSocketStreamer 提供双向流式通信: - 实时双向通信 - 低延迟 - 支持二进制数据
func NewWebSocketStreamer ¶
func NewWebSocketStreamer(conn *websocket.Conn) *WebSocketStreamer
NewWebSocketStreamer 创建 WebSocket 流
func (*WebSocketStreamer) ReadChunk ¶
func (w *WebSocketStreamer) ReadChunk() (*core.LegacyStreamChunk, error)
ReadChunk 读取数据块
func (*WebSocketStreamer) WriteBinary ¶
func (w *WebSocketStreamer) WriteBinary(data []byte) error
WriteBinary 写入二进制数据
func (*WebSocketStreamer) WriteChunk ¶
func (w *WebSocketStreamer) WriteChunk(chunk *core.LegacyStreamChunk) error
WriteChunk 写入数据块
func (*WebSocketStreamer) WriteError ¶
func (w *WebSocketStreamer) WriteError(err error) error
WriteError 写入错误
func (*WebSocketStreamer) WriteProgress ¶
func (w *WebSocketStreamer) WriteProgress(progress float64, message string) error
WriteProgress 写入进度
func (*WebSocketStreamer) WriteText ¶
func (w *WebSocketStreamer) WriteText(text string) error
WriteText 写入文本
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer 流写入器实现
func NewWriter ¶
func NewWriter(ctx context.Context, opts *core.StreamOptions) *Writer
NewWriter 创建新的流写入器
func (*Writer) Channel ¶
func (w *Writer) Channel() <-chan *core.LegacyStreamChunk
Channel 返回写入通道(供 Reader 使用)
func (*Writer) WriteBatch ¶
func (w *Writer) WriteBatch(chunks []*core.LegacyStreamChunk) error
WriteBatch 批量写入数据块
func (*Writer) WriteChunk ¶
func (w *Writer) WriteChunk(chunk *core.LegacyStreamChunk) error
WriteChunk 写入数据块
func (*Writer) WriteProgress ¶
func (w *Writer) WriteProgress(progress float64, message string) error
WriteProgress 写入进度更新