Documentation
¶
Overview ¶
包 streaming 提供面向 LLM 流式输出场景的高性能数据传输原语, 包括零拷贝缓冲、背压流控、速率限制与流多路复用。
概述 ¶
在大语言模型的流式响应中,token 以高频增量方式到达,对缓冲效率和 流量控制提出了较高要求。本包围绕这两个核心问题提供一组可组合的构建块:
- 零拷贝缓冲:减少内存分配与数据复制开销。
- 背压流控:在生产者速度超过消费者时自动施加反压。
- 速率限制:基于令牌桶算法控制 token 消费速率。
- 流多路复用:将单一源流扇出到多个消费者。
核心接口 ¶
- ZeroCopyBuffer — 可增长的零拷贝读写缓冲,支持并发安全访问。
- RingBuffer — 无锁环形缓冲,适用于单生产者/单消费者场景。
- ChunkReader — 对连续字节切片进行零拷贝分块读取。
- StringView — 基于 unsafe 的零拷贝 []byte→string 视图。
- BackpressureStream — 带高/低水位线的背压流,支持 Block、DropOldest、 DropNewest、Error 四种丢弃策略。
- StreamMultiplexer — 将一个 BackpressureStream 扇出给多个消费者。
- RateLimiter — 令牌桶速率限制器,支持阻塞等待。
主要能力 ¶
- 零拷贝:BytesToString / StringToBytes 利用 unsafe 实现零分配转换。
- 背压控制:通过 HighWaterMark / LowWaterMark 自动暂停与恢复生产者。
- 可观测:BackpressureStream.Stats() 暴露 produced/consumed/dropped 等指标。
- 扇出:StreamMultiplexer 支持运行时动态添加消费者。
Index ¶
- Variables
- func BytesToString(b []byte) string
- func StringToBytes(s string) []byte
- type BackpressureConfig
- type BackpressureStream
- func (s *BackpressureStream) BufferLevel() float64
- func (s *BackpressureStream) Close() error
- func (s *BackpressureStream) IsPaused() bool
- func (s *BackpressureStream) Read(ctx context.Context) (Token, error)
- func (s *BackpressureStream) ReadChan() <-chan Token
- func (s *BackpressureStream) Stats() StreamStats
- func (s *BackpressureStream) Write(ctx context.Context, token Token) error
- type ChunkReader
- type DropPolicy
- type RateLimiter
- type RingBuffer
- type StreamMultiplexer
- type StreamStats
- type StringView
- type Token
- type ZeroCopyBuffer
Constants ¶
This section is empty.
Variables ¶
Functions ¶
Types ¶
type BackpressureConfig ¶
type BackpressureConfig struct {
BufferSize int `json:"buffer_size"`
HighWaterMark float64 `json:"high_water_mark"` // 0.0-1.0
LowWaterMark float64 `json:"low_water_mark"` // 0.0-1.0
SlowConsumerTTL time.Duration `json:"slow_consumer_ttl"`
DropPolicy DropPolicy `json:"drop_policy"`
}
BackpressureConfig 配置背压行为.
func DefaultBackpressureConfig ¶
func DefaultBackpressureConfig() BackpressureConfig
DefaultBackpressureConfig 返回优化的默认值.
type BackpressureStream ¶
type BackpressureStream struct {
// contains filtered or unexported fields
}
BackpressureStream 实现支持背压的流.
func NewBackpressureStream ¶
func NewBackpressureStream(config BackpressureConfig) *BackpressureStream
NewBackpressureStream 创建新的支持背压的流.
func (*BackpressureStream) BufferLevel ¶
func (s *BackpressureStream) BufferLevel() float64
BufferLevel 返回当前缓冲区利用率 (0.0-1.0).
func (*BackpressureStream) Close ¶
func (s *BackpressureStream) Close() error
Close 关闭流. 使用写锁(Lock)确保与 Write() 的 RLock 互斥, 防止在 Write 发送到 buffer channel 的同时关闭 channel 导致 panic。
func (*BackpressureStream) IsPaused ¶
func (s *BackpressureStream) IsPaused() bool
IsPaused 返回流是否因背压而暂停.
func (*BackpressureStream) Read ¶
func (s *BackpressureStream) Read(ctx context.Context) (Token, error)
Read 从流中读取 token.
func (*BackpressureStream) ReadChan ¶
func (s *BackpressureStream) ReadChan() <-chan Token
ReadChan 返回用于读取 token 的通道.
type ChunkReader ¶
type ChunkReader struct {
// contains filtered or unexported fields
}
ChunkReader 提供零拷贝块读取.
func NewChunkReader ¶
func NewChunkReader(data []byte, chunkSize int) *ChunkReader
NewChunkReader 创建了新的块读取器 。
type DropPolicy ¶
type DropPolicy int
DropPolicy 定义缓冲区满后的处理策略.
const ( DropPolicyBlock DropPolicy = iota // Block producer DropPolicyOldest // Drop oldest tokens DropPolicyNewest // Drop newest tokens DropPolicyError // Return error )
func (DropPolicy) String ¶
func (d DropPolicy) String() string
String returns the string representation of DropPolicy.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter 为流提供基于 token 的速率限制.
func NewRateLimiter ¶
func NewRateLimiter(tokensPerSec float64, burst int) *RateLimiter
NewRateLimiter 创建新的速率限制器.
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
RingBuffer提供无锁环缓冲来进行流. readIdx 和 writeIdx 使用 atomic 操作保证并发安全。
type StreamMultiplexer ¶
type StreamMultiplexer struct {
// contains filtered or unexported fields
}
StreamMultiplexer 将一个流扇出给多个消费者.
func NewStreamMultiplexer ¶
func NewStreamMultiplexer(source *BackpressureStream) *StreamMultiplexer
NewStreamMultiplexer 创建新的多路复用器.
func (*StreamMultiplexer) AddConsumer ¶
func (m *StreamMultiplexer) AddConsumer(config BackpressureConfig) *BackpressureStream
AddConsumer 添加一个消费流.
func (*StreamMultiplexer) Start ¶
func (m *StreamMultiplexer) Start(ctx context.Context)
Start 启动多路复用.
type StreamStats ¶
type StreamStats struct {
Produced int64 `json:"produced"`
Consumed int64 `json:"consumed"`
Dropped int64 `json:"dropped"`
Blocked int64 `json:"blocked"`
BufferSize int `json:"buffer_size"`
BufferCap int `json:"buffer_cap"`
IsPaused bool `json:"is_paused"`
LastWrite time.Time `json:"last_write"`
LastRead time.Time `json:"last_read"`
}
StreamStats 包含流统计数据.
type StringView ¶
type StringView struct {
// contains filtered or unexported fields
}
StringView提供字节的零复制字符串视图.
type Token ¶
type Token struct {
Content string `json:"content"`
Index int `json:"index"`
Timestamp time.Time `json:"timestamp"`
Final bool `json:"final"`
}
Token 表示流式传输的 token.
type ZeroCopyBuffer ¶
type ZeroCopyBuffer struct {
// contains filtered or unexported fields
}
ZeroCopyBuffer提供零拷贝缓冲操作.
func NewZeroCopyBuffer ¶
func NewZeroCopyBuffer(size int) *ZeroCopyBuffer
NewZero CopyBuffer创建了新的零拷贝缓冲器.
func (*ZeroCopyBuffer) BytesUnsafe ¶
func (b *ZeroCopyBuffer) BytesUnsafe() []byte
字节不安全返回字节没有锁(调用器必须确保安全).