stream

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var WebSocketUpgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

WebSocketUpgrader WebSocket 升级器。注意:其 CheckOrigin 始终返回 true,即接受来自任意来源(Origin)的升级请求。

Functions

func FilterStream

func FilterStream(input <-chan StreamEvent, filter *StreamFilter) <-chan StreamEvent

FilterStream creates a filtered stream

func MergeStreams

func MergeStreams(streams ...<-chan StreamEvent) <-chan StreamEvent

MergeStreams merges multiple stream channels

func SSEHandler

func SSEHandler(handler func(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)) http.HandlerFunc

SSEHandler SSE HTTP 处理器

func StreamToChunkedTransfer

func StreamToChunkedTransfer(ctx context.Context, w http.ResponseWriter, source core.StreamOutput) error

StreamToChunkedTransfer 将 StreamOutput 转换为 Chunked Transfer

func StreamToSSE

func StreamToSSE(ctx context.Context, w http.ResponseWriter, source core.StreamOutput) error

StreamToSSE 将 StreamOutput 转换为 SSE 流

func StreamToWebSocket

func StreamToWebSocket(ctx context.Context, conn *websocket.Conn, source core.StreamOutput) error

StreamToWebSocket 将 StreamOutput 转换为 WebSocket 流

func TransformStream

func TransformStream(input <-chan StreamEvent, transform func(StreamEvent) StreamEvent) <-chan StreamEvent

TransformStream transforms stream events

func WebSocketStreamHandler

func WebSocketStreamHandler(handler func(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)) http.HandlerFunc

WebSocketStreamHandler WebSocket 流处理器

Types

type BatchMiddleware

type BatchMiddleware struct {
	// contains filtered or unexported fields
}

BatchMiddleware 批处理中间件

BatchMiddleware 将多个块聚合成批次:
- 减少下游处理次数
- 提高吞吐量

func NewBatchMiddleware

func NewBatchMiddleware(batchSize int, timeout time.Duration) *BatchMiddleware

NewBatchMiddleware 创建批处理中间件

func (*BatchMiddleware) Apply

Apply 应用中间件

type BufferMiddleware

type BufferMiddleware struct {
	// contains filtered or unexported fields
}

BufferMiddleware 缓冲中间件

BufferMiddleware 控制流的缓冲行为:
- 动态调整缓冲区大小
- 防止内存溢出
- 提供背压控制

func NewBufferMiddleware

func NewBufferMiddleware(minSize, maxSize int, threshold float64) *BufferMiddleware

NewBufferMiddleware 创建缓冲中间件

func (*BufferMiddleware) Apply

Apply 应用中间件

type ChunkedTransferStreamer

type ChunkedTransferStreamer struct {
	// contains filtered or unexported fields
}

ChunkedTransferStreamer HTTP Chunked Transfer Encoding 流支持

func NewChunkedTransferStreamer

func NewChunkedTransferStreamer(w http.ResponseWriter) (*ChunkedTransferStreamer, error)

NewChunkedTransferStreamer 创建 Chunked Transfer 流

func (*ChunkedTransferStreamer) Close

func (c *ChunkedTransferStreamer) Close() error

Close 关闭流

func (*ChunkedTransferStreamer) WriteChunk

func (c *ChunkedTransferStreamer) WriteChunk(chunk *core.LegacyStreamChunk) error

WriteChunk 写入数据块

type DataPipelineAgent

type DataPipelineAgent struct {
	*core.BaseAgent
	// contains filtered or unexported fields
}

DataPipelineAgent 数据流处理 Agent

DataPipelineAgent 用于处理大数据集的渐进式处理:
- 逐批次处理数据
- 避免一次性加载所有数据到内存
- 实时返回处理结果

func NewDataPipelineAgent

func NewDataPipelineAgent(config *DataPipelineConfig) *DataPipelineAgent

NewDataPipelineAgent 创建数据管道 Agent

func (*DataPipelineAgent) Execute

func (a *DataPipelineAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)

Execute 同步执行

func (*DataPipelineAgent) ExecuteStream

func (a *DataPipelineAgent) ExecuteStream(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)

ExecuteStream 流式执行

func (*DataPipelineAgent) ProcessWithTransform

func (a *DataPipelineAgent) ProcessWithTransform(
	ctx context.Context,
	dataSource []interface{},
	transform func(interface{}) (interface{}, error),
) (core.StreamOutput, error)

ProcessWithTransform 使用转换函数处理数据

func (*DataPipelineAgent) StreamFilter

func (a *DataPipelineAgent) StreamFilter(
	ctx context.Context,
	source core.StreamOutput,
	filter func(*core.LegacyStreamChunk) bool,
) (core.StreamOutput, error)

StreamFilter 流式过滤器

func (*DataPipelineAgent) StreamMap

StreamMap 流式映射

func (*DataPipelineAgent) StreamReduce

func (a *DataPipelineAgent) StreamReduce(
	ctx context.Context,
	source core.StreamOutput,
	initial interface{},
	reducer func(accumulator, current interface{}) (interface{}, error),
) (interface{}, error)

StreamReduce 流式归约

type DataPipelineConfig

type DataPipelineConfig struct {
	BatchSize        int           // 批次大小
	ProcessDelay     time.Duration // 处理延迟
	EnableProgress   bool          // 启用进度报告
	ProgressInterval time.Duration // 进度更新间隔
	MaxWorkers       int           // 最大工作协程数
}

DataPipelineConfig 数据管道配置

func DefaultDataPipelineConfig

func DefaultDataPipelineConfig() *DataPipelineConfig

DefaultDataPipelineConfig 返回默认配置

type FilterMiddleware

type FilterMiddleware struct {
	// contains filtered or unexported fields
}

FilterMiddleware 过滤中间件

FilterMiddleware 过滤流中的数据块:
- 只传递符合条件的块
- 跳过不需要的数据

func NewFilterMiddleware

func NewFilterMiddleware(predicate func(*execution.LegacyStreamChunk) bool) *FilterMiddleware

NewFilterMiddleware 创建过滤中间件

func (*FilterMiddleware) Apply

Apply 应用中间件

type FuncStreamHandler

type FuncStreamHandler struct {
	// contains filtered or unexported fields
}

FuncStreamHandler 函数式流处理器

func NewFuncStreamHandler

func NewFuncStreamHandler(
	onChunk func(*StreamChunk) error,
	onComplete func() error,
	onError func(error) error,
) *FuncStreamHandler

NewFuncStreamHandler 创建函数式流处理器

func (*FuncStreamHandler) OnChunk

func (h *FuncStreamHandler) OnChunk(chunk *StreamChunk) error

OnChunk 处理数据块

func (*FuncStreamHandler) OnComplete

func (h *FuncStreamHandler) OnComplete() error

OnComplete 完成时调用

func (*FuncStreamHandler) OnError

func (h *FuncStreamHandler) OnError(err error) error

OnError 错误时调用

type MultiModeStream

type MultiModeStream struct {
	// contains filtered or unexported fields
}

MultiModeStream handles different streaming modes

func NewMultiModeStream

func NewMultiModeStream(ctx context.Context, config *StreamConfig) *MultiModeStream

NewMultiModeStream creates a new multi-mode stream

func (*MultiModeStream) Close

func (s *MultiModeStream) Close() error

Close closes all streams

func (*MultiModeStream) GetWriter

func (s *MultiModeStream) GetWriter(mode StreamMode) (*StreamWriter, error)

GetWriter returns a writer for a specific mode

func (*MultiModeStream) Stream

func (s *MultiModeStream) Stream(mode StreamMode, event StreamEvent) error

Stream sends an event to the appropriate mode channel

func (*MultiModeStream) Subscribe

func (s *MultiModeStream) Subscribe(mode StreamMode) (<-chan StreamEvent, error)

Subscribe returns a channel for receiving events of a specific mode

func (*MultiModeStream) SubscribeAll

func (s *MultiModeStream) SubscribeAll() <-chan StreamEvent

SubscribeAll returns a merged channel for all configured modes

type Multiplexer

type Multiplexer struct {
	// contains filtered or unexported fields
}

Multiplexer 流多路复用器实现

Multiplexer 允许多个消费者同时读取同一个流:
- 广播数据到所有消费者
- 独立的错误处理
- 背压管理

func NewMultiplexer

func NewMultiplexer(ctx context.Context, opts *core.StreamOptions) *Multiplexer

NewMultiplexer 创建新的多路复用器

func (*Multiplexer) AddConsumer

func (m *Multiplexer) AddConsumer(consumer core.StreamConsumer) (string, error)

AddConsumer 添加消费者

func (*Multiplexer) Close

func (m *Multiplexer) Close() error

Close 关闭多路复用器

func (*Multiplexer) Consumers

func (m *Multiplexer) Consumers() []string

Consumers 返回所有消费者 ID

func (*Multiplexer) RemoveConsumer

func (m *Multiplexer) RemoveConsumer(id string) error

RemoveConsumer 移除消费者

func (*Multiplexer) Start

func (m *Multiplexer) Start(ctx context.Context, source core.StreamOutput) error

Start 开始多路复用

func (*Multiplexer) Stats

func (m *Multiplexer) Stats() MultiplexerStats

Stats 获取多路复用器统计信息

type MultiplexerStats

type MultiplexerStats struct {
	ConsumerCount   int
	ActiveConsumers int
	TotalErrors     int
	Running         bool
}

MultiplexerStats 多路复用器统计信息

type PollingStreamer

type PollingStreamer struct {
	// contains filtered or unexported fields
}

PollingStreamer 轮询流支持

PollingStreamer 用于不支持持久连接的场景:
- 客户端定期轮询
- 服务器缓存最新数据

func NewPollingStreamer

func NewPollingStreamer(sessionID string, timeout time.Duration) *PollingStreamer

NewPollingStreamer 创建轮询流

func (*PollingStreamer) Poll

func (p *PollingStreamer) Poll(lastIndex int) ([]*core.LegacyStreamChunk, int, error)

Poll 轮询新数据

func (*PollingStreamer) SessionID

func (p *PollingStreamer) SessionID() string

SessionID 返回会话 ID

func (*PollingStreamer) WriteChunk

func (p *PollingStreamer) WriteChunk(chunk *core.LegacyStreamChunk) error

WriteChunk 写入数据块

type ProgressAgent

type ProgressAgent struct {
	*core.BaseAgent
	// contains filtered or unexported fields
}

ProgressAgent 带进度反馈的 Agent

ProgressAgent 用于长时间运行的任务:
- 实时进度更新
- 阶段性状态报告
- ETA 预估

func NewProgressAgent

func NewProgressAgent(config *ProgressConfig) *ProgressAgent

NewProgressAgent 创建进度 Agent

func (*ProgressAgent) Execute

func (a *ProgressAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)

Execute 同步执行

func (*ProgressAgent) ExecuteStream

func (a *ProgressAgent) ExecuteStream(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)

ExecuteStream 流式执行

type ProgressConfig

type ProgressConfig struct {
	EnableProgress   bool          // 启用进度报告
	ProgressInterval time.Duration // 进度更新间隔
	EnableETA        bool          // 启用 ETA 计算
	EnablePhases     bool          // 启用阶段报告
}

ProgressConfig 进度配置

func DefaultProgressConfig

func DefaultProgressConfig() *ProgressConfig

DefaultProgressConfig 返回默认配置

type ProgressTracker

type ProgressTracker struct {
	// contains filtered or unexported fields
}

ProgressTracker 进度跟踪器

func NewProgressTracker

func NewProgressTracker(total int64, writer *Writer, config *ProgressConfig) *ProgressTracker

NewProgressTracker 创建进度跟踪器

func (*ProgressTracker) Complete

func (pt *ProgressTracker) Complete() error

Complete 标记完成

func (*ProgressTracker) Current

func (pt *ProgressTracker) Current() int64

Current 返回当前进度

func (*ProgressTracker) ETA

func (pt *ProgressTracker) ETA() time.Duration

ETA 返回预计剩余时间

func (*ProgressTracker) Increment

func (pt *ProgressTracker) Increment(delta int64) error

Increment 增加进度

func (*ProgressTracker) Progress

func (pt *ProgressTracker) Progress() float64

Progress 返回进度百分比

func (*ProgressTracker) Report

func (pt *ProgressTracker) Report(current int64) error

Report 报告当前进度

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader 流读取器实现

func NewReader

func NewReader(ctx context.Context, ch <-chan *core.LegacyStreamChunk, opts *core.StreamOptions) *Reader

NewReader 创建新的流读取器

func (*Reader) Cancel

func (r *Reader) Cancel() error

Cancel 取消流

func (*Reader) Close

func (r *Reader) Close() error

Close 关闭读取器

func (*Reader) Collect

func (r *Reader) Collect() ([]*core.LegacyStreamChunk, error)

Collect 收集所有数据块。注意:此方法会将所有数据加载到内存,并受 MaxCollectSize 限制,以防止 OOM。

func (*Reader) CollectText

func (r *Reader) CollectText() (string, error)

CollectText 收集所有文本数据。使用 strings.Builder 提高性能,并受 MaxCollectSize 限制,以防止 OOM。

func (*Reader) Context

func (r *Reader) Context() context.Context

Context 返回读取器的上下文

func (*Reader) Drain

func (r *Reader) Drain() error

Drain 耗尽所有剩余数据

func (*Reader) IsClosed

func (r *Reader) IsClosed() bool

IsClosed 检查读取器是否已关闭

func (*Reader) IsPaused

func (r *Reader) IsPaused() bool

IsPaused 检查流是否暂停

func (*Reader) IsRunning

func (r *Reader) IsRunning() bool

IsRunning 检查流是否运行中

func (*Reader) Next

func (r *Reader) Next() (*core.LegacyStreamChunk, error)

Next 读取下一个数据块

func (*Reader) Pause

func (r *Reader) Pause() error

Pause 暂停流

func (*Reader) Resume

func (r *Reader) Resume() error

Resume 恢复流

func (*Reader) Stats

func (r *Reader) Stats() ReaderStats

Stats 获取统计信息

func (*Reader) Status

func (r *Reader) Status() *core.StreamStatus

Status 获取流状态

type ReaderStats

type ReaderStats struct {
	ChunksRead   int64
	BytesRead    int64
	ErrorCount   int64
	StartTime    time.Time
	LastReadTime time.Time
	ElapsedTime  time.Duration
}

ReaderStats 读取器统计信息

type RetryMiddleware

type RetryMiddleware struct {
	// contains filtered or unexported fields
}

RetryMiddleware 重试中间件

RetryMiddleware 在错误时重试:
- 自动重试失败的操作
- 指数退避

func NewRetryMiddleware

func NewRetryMiddleware(maxRetries int, backoff time.Duration) *RetryMiddleware

NewRetryMiddleware 创建重试中间件

func (*RetryMiddleware) Apply

Apply 应用中间件

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer 环形缓冲区实现

RingBuffer 提供高效的固定大小缓冲:
- O(1) 读写操作
- 自动覆盖旧数据
- 线程安全
- 零内存分配(预分配)

func NewRingBuffer

func NewRingBuffer(size int) *RingBuffer

NewRingBuffer 创建新的环形缓冲区

func (*RingBuffer) Clear

func (rb *RingBuffer) Clear()

Clear 清空缓冲区

func (*RingBuffer) Count

func (rb *RingBuffer) Count() int

Count 返回当前元素数量

func (*RingBuffer) IsEmpty

func (rb *RingBuffer) IsEmpty() bool

IsEmpty 检查缓冲区是否为空

func (*RingBuffer) IsFull

func (rb *RingBuffer) IsFull() bool

IsFull 检查缓冲区是否已满

func (*RingBuffer) Peek

func (rb *RingBuffer) Peek() *core.LegacyStreamChunk

Peek 查看第一个元素但不移除

func (*RingBuffer) Pop

func (rb *RingBuffer) Pop() *core.LegacyStreamChunk

Pop 从缓冲区弹出元素

func (*RingBuffer) Push

func (rb *RingBuffer) Push(chunk *core.LegacyStreamChunk) bool

Push 添加元素到缓冲区

func (*RingBuffer) Resize

func (rb *RingBuffer) Resize(newSize int)

Resize 调整缓冲区大小

func (*RingBuffer) Size

func (rb *RingBuffer) Size() int

Size 返回缓冲区大小

func (*RingBuffer) ToSlice

func (rb *RingBuffer) ToSlice() []*core.LegacyStreamChunk

ToSlice 转换为切片

func (*RingBuffer) Usage

func (rb *RingBuffer) Usage() float64

Usage 返回缓冲区使用率 (0-1)

type SSEStreamer

type SSEStreamer struct {
	// contains filtered or unexported fields
}

SSEStreamer Server-Sent Events 流支持

SSEStreamer 将流输出转换为 SSE 格式:
- 单向服务器推送
- 自动重连
- 标准 HTTP 协议

func NewSSEStreamer

func NewSSEStreamer(w http.ResponseWriter) (*SSEStreamer, error)

NewSSEStreamer 创建 SSE 流

func (*SSEStreamer) Close

func (s *SSEStreamer) Close() error

Close 关闭流

func (*SSEStreamer) WriteChunk

func (s *SSEStreamer) WriteChunk(chunk *core.LegacyStreamChunk) error

WriteChunk 写入数据块

func (*SSEStreamer) WriteError

func (s *SSEStreamer) WriteError(err error) error

WriteError 写入错误

func (*SSEStreamer) WriteProgress

func (s *SSEStreamer) WriteProgress(progress float64, message string) error

WriteProgress 写入进度

func (*SSEStreamer) WriteText

func (s *SSEStreamer) WriteText(text string) error

WriteText 写入文本

type SimpleStreamConsumer

type SimpleStreamConsumer struct {
	OnChunkFunc    func(*core.LegacyStreamChunk) error
	OnStartFunc    func() error
	OnCompleteFunc func() error
	OnErrorFunc    func(error) error
}

SimpleStreamConsumer 简单的流消费者实现

func (*SimpleStreamConsumer) OnChunk

func (c *SimpleStreamConsumer) OnChunk(chunk *core.LegacyStreamChunk) error

func (*SimpleStreamConsumer) OnComplete

func (c *SimpleStreamConsumer) OnComplete() error

func (*SimpleStreamConsumer) OnError

func (c *SimpleStreamConsumer) OnError(err error) error

func (*SimpleStreamConsumer) OnStart

func (c *SimpleStreamConsumer) OnStart() error

type StreamAggregator

type StreamAggregator struct {
	// contains filtered or unexported fields
}

StreamAggregator aggregates multiple streams

func NewStreamAggregator

func NewStreamAggregator() *StreamAggregator

NewStreamAggregator creates a new aggregator

func (*StreamAggregator) AddStream

func (a *StreamAggregator) AddStream(stream *MultiModeStream)

AddStream adds a stream to the aggregator

func (*StreamAggregator) AggregateMode

func (a *StreamAggregator) AggregateMode(mode StreamMode) <-chan StreamEvent

AggregateMode aggregates a specific mode from all streams

type StreamChunk

type StreamChunk struct {
	Data      interface{}
	Metadata  map[string]interface{}
	Timestamp time.Time
	ChunkID   int
	Error     error
	Done      bool
}

StreamChunk 流式数据块(通用版本)

func NewStreamChunk

func NewStreamChunk(data interface{}) *StreamChunk

NewStreamChunk 创建流式数据块

type StreamConfig

type StreamConfig struct {
	// Modes specifies which stream modes to enable
	Modes []StreamMode

	// BufferSize for each stream channel
	BufferSize int

	// IncludeMetadata adds extra metadata to events
	IncludeMetadata bool

	// Callback for handling stream events (optional)
	Callback func(mode StreamMode, event StreamEvent)
}

StreamConfig configures multi-mode streaming

func DefaultStreamConfig

func DefaultStreamConfig() *StreamConfig

DefaultStreamConfig returns default configuration

type StreamEvent

type StreamEvent struct {
	Mode      StreamMode             `json:"mode"`
	Type      string                 `json:"type"`
	Data      interface{}            `json:"data"`
	Timestamp time.Time              `json:"timestamp"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
}

StreamEvent represents an event in the stream

type StreamFilter

type StreamFilter struct {
	Modes     []StreamMode           // Filter by modes
	Types     []string               // Filter by event types
	Predicate func(StreamEvent) bool // Custom filter function
}

StreamFilter filters stream events

func (*StreamFilter) Apply

func (f *StreamFilter) Apply(event StreamEvent) bool

Apply applies the filter to an event

type StreamHandler

type StreamHandler interface {
	// OnChunk 处理数据块
	OnChunk(chunk *StreamChunk) error

	// OnComplete 完成时调用
	OnComplete() error

	// OnError 错误时调用
	OnError(err error) error
}

StreamHandler 流式处理器接口

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager 流式管理器

提供流式数据处理的高级功能:
- 缓冲管理
- 背压控制
- 错误处理
- 超时控制

func NewStreamManager

func NewStreamManager(config StreamManagerConfig) *StreamManager

NewStreamManager 创建流式管理器

func (*StreamManager) Buffer

func (m *StreamManager) Buffer(ctx context.Context, input <-chan *StreamChunk, size int) <-chan []*StreamChunk

Buffer 缓冲流式数据

func (*StreamManager) Collect

func (m *StreamManager) Collect(ctx context.Context, stream <-chan *StreamChunk) ([]*StreamChunk, error)

Collect 收集所有流式数据

func (*StreamManager) Filter

func (m *StreamManager) Filter(ctx context.Context, input <-chan *StreamChunk, predicate func(*StreamChunk) bool) <-chan *StreamChunk

Filter 过滤流式数据

func (*StreamManager) Merge

func (m *StreamManager) Merge(ctx context.Context, streams ...<-chan *StreamChunk) <-chan *StreamChunk

Merge 合并多个流

func (*StreamManager) Process

func (m *StreamManager) Process(ctx context.Context, stream <-chan *StreamChunk, handler StreamHandler) error

Process 处理流式数据

func (*StreamManager) Transform

func (m *StreamManager) Transform(ctx context.Context, input <-chan *StreamChunk, transformer func(*StreamChunk) (*StreamChunk, error)) <-chan *StreamChunk

Transform 转换流式数据

type StreamManagerConfig

type StreamManagerConfig struct {
	BufferSize int
	Timeout    time.Duration
}

StreamManagerConfig 流式管理器配置

type StreamMode

type StreamMode string

StreamMode defines different streaming modes

const (
	// StreamModeMessages streams LLM tokens as they are generated
	StreamModeMessages StreamMode = "messages"

	// StreamModeUpdates streams state updates after each step
	StreamModeUpdates StreamMode = "updates"

	// StreamModeCustom streams custom data from tools
	StreamModeCustom StreamMode = "custom"

	// StreamModeValues streams full state snapshots
	StreamModeValues StreamMode = "values"

	// StreamModeDebug streams detailed debug information
	StreamModeDebug StreamMode = "debug"
)

type StreamModeSelector

type StreamModeSelector struct {
	RequiredModes []StreamMode
	OptionalModes []StreamMode
	FallbackMode  StreamMode
}

StreamModeSelector helps select appropriate modes

func (*StreamModeSelector) SelectModes

func (s *StreamModeSelector) SelectModes(available []StreamMode) []StreamMode

SelectModes returns the modes to use based on availability

type StreamMultiplexer

type StreamMultiplexer struct {
	// contains filtered or unexported fields
}

StreamMultiplexer 流式多路复用器

将一个流广播到多个消费者

func NewStreamMultiplexer

func NewStreamMultiplexer(input <-chan *StreamChunk) *StreamMultiplexer

NewStreamMultiplexer 创建流式多路复用器

func (*StreamMultiplexer) AddConsumer

func (m *StreamMultiplexer) AddConsumer(bufferSize int) <-chan *StreamChunk

AddConsumer 添加消费者

func (*StreamMultiplexer) Start

func (m *StreamMultiplexer) Start(ctx context.Context) error

Start 开始多路复用

type StreamRateLimiter

type StreamRateLimiter struct {
	// contains filtered or unexported fields
}

StreamRateLimiter 流式速率限制器

func NewStreamRateLimiter

func NewStreamRateLimiter(rate int) *StreamRateLimiter

NewStreamRateLimiter 创建流式速率限制器

func (*StreamRateLimiter) Limit

func (l *StreamRateLimiter) Limit(ctx context.Context, input <-chan *StreamChunk) <-chan *StreamChunk

Limit 限制流速

type StreamStats

type StreamStats struct {
	ChunksProcessed int64
	BytesProcessed  int64
	ErrorsCount     int64
	StartTime       time.Time
	EndTime         time.Time
	// contains filtered or unexported fields
}

StreamStats 流式统计信息

func NewStreamStats

func NewStreamStats() *StreamStats

NewStreamStats 创建流式统计

func (*StreamStats) Complete

func (s *StreamStats) Complete()

Complete 标记完成

func (*StreamStats) Duration

func (s *StreamStats) Duration() time.Duration

Duration 计算持续时间

func (*StreamStats) RecordChunk

func (s *StreamStats) RecordChunk(size int64)

RecordChunk 记录数据块

func (*StreamStats) RecordError

func (s *StreamStats) RecordError()

RecordError 记录错误

func (*StreamStats) String

func (s *StreamStats) String() string

String 返回统计信息字符串

func (*StreamStats) Throughput

func (s *StreamStats) Throughput() float64

Throughput 计算吞吐量(块/秒)

type StreamWriter

type StreamWriter struct {
	// contains filtered or unexported fields
}

StreamWriter provides a writer interface for a specific mode

func (*StreamWriter) Write

func (w *StreamWriter) Write(eventType string, data interface{}) error

Write sends data to the stream

func (*StreamWriter) WriteWithMetadata

func (w *StreamWriter) WriteWithMetadata(eventType string, data interface{}, metadata map[string]interface{}) error

WriteWithMetadata sends data with metadata

type StreamingLLMAgent

type StreamingLLMAgent struct {
	*core.BaseAgent
	// contains filtered or unexported fields
}

StreamingLLMAgent LLM 流式响应 Agent

StreamingLLMAgent 支持 LLM 的流式输出:
- 逐字返回生成的文本
- 实时显示思考过程
- 降低首字符延迟

func NewStreamingLLMAgent

func NewStreamingLLMAgent(llmClient llm.Client, config *StreamingLLMConfig) *StreamingLLMAgent

NewStreamingLLMAgent 创建 LLM 流式 Agent

func (*StreamingLLMAgent) Execute

func (a *StreamingLLMAgent) Execute(ctx context.Context, input *core.AgentInput) (*core.AgentOutput, error)

Execute 同步执行(兼容 Agent 接口)

func (*StreamingLLMAgent) ExecuteStream

func (a *StreamingLLMAgent) ExecuteStream(ctx context.Context, input *core.AgentInput) (core.StreamOutput, error)

ExecuteStream 流式执行

type StreamingLLMAgentWithRealStreaming

type StreamingLLMAgentWithRealStreaming struct {
	*StreamingLLMAgent
}

StreamingLLMAgentWithRealStreaming 支持真实 LLM 流式 API 的 Agent

注意:这需要 LLM 客户端支持真实的流式 API(如 OpenAI streaming API)

func NewStreamingLLMAgentWithRealStreaming

func NewStreamingLLMAgentWithRealStreaming(llmClient llm.Client, config *StreamingLLMConfig) *StreamingLLMAgentWithRealStreaming

NewStreamingLLMAgentWithRealStreaming 创建支持真实流式的 Agent

func (*StreamingLLMAgentWithRealStreaming) ExecuteStream

ExecuteStream 使用真实的流式 API

type StreamingLLMConfig

type StreamingLLMConfig struct {
	// LLM 配置
	Model       string
	Temperature float64
	MaxTokens   int

	// 流式配置
	ChunkSize        int           // 每次发送的字符数
	ChunkDelay       time.Duration // 块之间的延迟(模拟打字效果)
	EnableProgress   bool          // 是否发送进度更新
	ProgressInterval time.Duration // 进度更新间隔

	// 错误处理
	RetryOnError bool
	MaxRetries   int
}

StreamingLLMConfig LLM 流式配置

func DefaultStreamingLLMConfig

func DefaultStreamingLLMConfig() *StreamingLLMConfig

DefaultStreamingLLMConfig 返回默认配置

type TeeMiddleware

type TeeMiddleware struct {
	// contains filtered or unexported fields
}

TeeMiddleware 分支中间件

TeeMiddleware 将流复制到多个目标:
- 同时输出到多个消费者
- 不影响原始流
- 支持不同的处理速度

func NewTeeMiddleware

func NewTeeMiddleware(outputs ...execution.StreamConsumer) *TeeMiddleware

NewTeeMiddleware 创建分支中间件

func (*TeeMiddleware) Apply

Apply 应用中间件

type TextAccumulatorConsumer

type TextAccumulatorConsumer struct {
	// contains filtered or unexported fields
}

TextAccumulatorConsumer 累积文本的消费者

func NewTextAccumulatorConsumer

func NewTextAccumulatorConsumer() *TextAccumulatorConsumer

NewTextAccumulatorConsumer 创建文本累积器,默认限制 100MB

func NewTextAccumulatorConsumerWithLimit

func NewTextAccumulatorConsumerWithLimit(maxSize int64) *TextAccumulatorConsumer

NewTextAccumulatorConsumerWithLimit 创建带自定义大小限制的文本累积器

func (*TextAccumulatorConsumer) OnChunk

func (*TextAccumulatorConsumer) OnComplete

func (c *TextAccumulatorConsumer) OnComplete() error

func (*TextAccumulatorConsumer) OnError

func (c *TextAccumulatorConsumer) OnError(err error) error

func (*TextAccumulatorConsumer) OnStart

func (c *TextAccumulatorConsumer) OnStart() error

func (*TextAccumulatorConsumer) Text

func (c *TextAccumulatorConsumer) Text() string

type ThrottleMiddleware

type ThrottleMiddleware struct {
	// contains filtered or unexported fields
}

ThrottleMiddleware 限流中间件

ThrottleMiddleware 控制流的速率:
- 限制每秒的块数
- 平滑流量
- 防止下游过载

func NewThrottleMiddleware

func NewThrottleMiddleware(maxChunksPerSec float64) *ThrottleMiddleware

NewThrottleMiddleware 创建限流中间件

func (*ThrottleMiddleware) Apply

Apply 应用中间件

type TransformMiddleware

type TransformMiddleware struct {
	// contains filtered or unexported fields
}

TransformMiddleware 转换中间件

TransformMiddleware 转换流中的数据:
- 修改数据格式
- 过滤数据
- 聚合数据

func NewTransformMiddleware

func NewTransformMiddleware(fn core.ChunkTransformFunc) *TransformMiddleware

NewTransformMiddleware 创建转换中间件

func (*TransformMiddleware) Apply

Apply 应用中间件

type WebSocketBidirectionalStream

type WebSocketBidirectionalStream struct {
	// contains filtered or unexported fields
}

WebSocketBidirectionalStream 双向 WebSocket 流

func NewWebSocketBidirectionalStreamWithContext

func NewWebSocketBidirectionalStreamWithContext(parentCtx context.Context, conn *websocket.Conn) *WebSocketBidirectionalStream

NewWebSocketBidirectionalStreamWithContext 创建双向流,使用父上下文

func (*WebSocketBidirectionalStream) Close

Close 关闭流

func (*WebSocketBidirectionalStream) Input

Input 返回输入通道

func (*WebSocketBidirectionalStream) Output

Output 返回输出通道

type WebSocketStreamer

type WebSocketStreamer struct {
	// contains filtered or unexported fields
}

WebSocketStreamer WebSocket 流支持

WebSocketStreamer 提供双向流式通信:
- 实时双向通信
- 低延迟
- 支持二进制数据

func NewWebSocketStreamer

func NewWebSocketStreamer(conn *websocket.Conn) *WebSocketStreamer

NewWebSocketStreamer 创建 WebSocket 流

func (*WebSocketStreamer) Close

func (w *WebSocketStreamer) Close() error

Close 关闭连接

func (*WebSocketStreamer) IsClosed

func (w *WebSocketStreamer) IsClosed() bool

IsClosed 检查是否已关闭

func (*WebSocketStreamer) ReadChunk

func (w *WebSocketStreamer) ReadChunk() (*core.LegacyStreamChunk, error)

ReadChunk 读取数据块

func (*WebSocketStreamer) WriteBinary

func (w *WebSocketStreamer) WriteBinary(data []byte) error

WriteBinary 写入二进制数据

func (*WebSocketStreamer) WriteChunk

func (w *WebSocketStreamer) WriteChunk(chunk *core.LegacyStreamChunk) error

WriteChunk 写入数据块

func (*WebSocketStreamer) WriteError

func (w *WebSocketStreamer) WriteError(err error) error

WriteError 写入错误

func (*WebSocketStreamer) WriteProgress

func (w *WebSocketStreamer) WriteProgress(progress float64, message string) error

WriteProgress 写入进度

func (*WebSocketStreamer) WriteText

func (w *WebSocketStreamer) WriteText(text string) error

WriteText 写入文本

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer 流写入器实现

func NewWriter

func NewWriter(ctx context.Context, opts *core.StreamOptions) *Writer

NewWriter 创建新的流写入器

func (*Writer) Channel

func (w *Writer) Channel() <-chan *core.LegacyStreamChunk

Channel 返回写入通道(供 Reader 使用)

func (*Writer) Close

func (w *Writer) Close() error

Close 关闭写入器

func (*Writer) IsClosed

func (w *Writer) IsClosed() bool

IsClosed 检查写入器是否已关闭

func (*Writer) Stats

func (w *Writer) Stats() WriterStats

Stats 获取统计信息

func (*Writer) Write

func (w *Writer) Write(p []byte) (n int, err error)

Write 实现 io.Writer 接口

func (*Writer) WriteBatch

func (w *Writer) WriteBatch(chunks []*core.LegacyStreamChunk) error

WriteBatch 批量写入数据块

func (*Writer) WriteChunk

func (w *Writer) WriteChunk(chunk *core.LegacyStreamChunk) error

WriteChunk 写入数据块

func (*Writer) WriteError

func (w *Writer) WriteError(err error) error

WriteError 写入错误信息

func (*Writer) WriteProgress

func (w *Writer) WriteProgress(progress float64, message string) error

WriteProgress 写入进度更新

func (*Writer) WriteStatus

func (w *Writer) WriteStatus(status string) error

WriteStatus 写入状态更新

func (*Writer) WriteText

func (w *Writer) WriteText(text string) error

WriteText 写入文本数据

type WriterStats

type WriterStats struct {
	ChunksWritten int64
	BytesWritten  int64
	ErrorCount    int64
	StartTime     time.Time
	LastWriteTime time.Time
}

WriterStats 写入器统计信息

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL