streaming

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

包 streaming 提供面向 LLM 流式输出场景的高性能数据传输原语, 包括零拷贝缓冲、背压流控、速率限制与流多路复用。

概述

在大语言模型的流式响应中,token 以高频增量方式到达,对缓冲效率和 流量控制提出了较高要求。本包围绕这两个核心问题提供一组可组合的构建块:

  • 零拷贝缓冲:减少内存分配与数据复制开销。
  • 背压流控:在生产者速度超过消费者时自动施加反压。
  • 速率限制:基于令牌桶算法控制 token 消费速率。
  • 流多路复用:将单一源流扇出到多个消费者。

核心接口

  • ZeroCopyBuffer — 可增长的零拷贝读写缓冲,支持并发安全访问。
  • RingBuffer — 无锁环形缓冲,适用于单生产者/单消费者场景。
  • ChunkReader — 对连续字节切片进行零拷贝分块读取。
  • StringView — 基于 unsafe 的零拷贝 []byte→string 视图。
  • BackpressureStream — 带高/低水位线的背压流,支持 Block、DropOldest、 DropNewest、Error 四种丢弃策略。
  • StreamMultiplexer — 将一个 BackpressureStream 扇出给多个消费者。
  • RateLimiter — 令牌桶速率限制器,支持阻塞等待。

主要能力

  • 零拷贝:BytesToString / StringToBytes 利用 unsafe 实现零分配转换。
  • 背压控制:通过 HighWaterMark / LowWaterMark 自动暂停与恢复生产者。
  • 可观测:BackpressureStream.Stats() 暴露 produced/consumed/dropped 等指标。
  • 扇出:StreamMultiplexer 支持运行时动态添加消费者。

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrBufferFull   = errors.New("buffer full, backpressure applied")
	ErrStreamClosed = errors.New("stream closed")
	ErrSlowConsumer = errors.New("consumer too slow")
)

Functions

func BytesToString

func BytesToString(b []byte) string

BytesToString 不复制便将字节转换为字符串.

func StringToBytes

func StringToBytes(s string) []byte

StringToBytes 不复制就将字符串转换为字节.

Types

type BackpressureConfig

type BackpressureConfig struct {
	BufferSize      int           `json:"buffer_size"`
	HighWaterMark   float64       `json:"high_water_mark"` // 0.0-1.0
	LowWaterMark    float64       `json:"low_water_mark"`  // 0.0-1.0
	SlowConsumerTTL time.Duration `json:"slow_consumer_ttl"`
	DropPolicy      DropPolicy    `json:"drop_policy"`
}

BackpressureConfig 配置背压行为.

func DefaultBackpressureConfig

func DefaultBackpressureConfig() BackpressureConfig

DefaultBackpressureConfig 返回优化的默认值.

type BackpressureStream

type BackpressureStream struct {
	// contains filtered or unexported fields
}

BackpressureStream 实现支持背压的流.

func NewBackpressureStream

func NewBackpressureStream(config BackpressureConfig) *BackpressureStream

NewBackpressureStream 创建新的支持背压的流.

func (*BackpressureStream) BufferLevel

func (s *BackpressureStream) BufferLevel() float64

BufferLevel 返回当前缓冲区利用率 (0.0-1.0).

func (*BackpressureStream) Close

func (s *BackpressureStream) Close() error

Close 关闭流. 使用写锁(Lock)确保与 Write() 的 RLock 互斥, 防止在 Write 发送到 buffer channel 的同时关闭 channel 导致 panic。

func (*BackpressureStream) IsPaused

func (s *BackpressureStream) IsPaused() bool

IsPaused 返回流是否因背压而暂停.

func (*BackpressureStream) Read

func (s *BackpressureStream) Read(ctx context.Context) (Token, error)

Read 从流中读取 token.

func (*BackpressureStream) ReadChan

func (s *BackpressureStream) ReadChan() <-chan Token

ReadChan 返回用于读取 token 的通道.

func (*BackpressureStream) Stats

func (s *BackpressureStream) Stats() StreamStats

Stats 返回流统计信息.

func (*BackpressureStream) Write

func (s *BackpressureStream) Write(ctx context.Context, token Token) error

Write 向流发送一个带背压处理的 token. 使用 RLock 防止与 Close() 并发执行时向已关闭 channel 发送导致 panic。

type ChunkReader

type ChunkReader struct {
	// contains filtered or unexported fields
}

ChunkReader 提供零拷贝块读取.

func NewChunkReader

func NewChunkReader(data []byte, chunkSize int) *ChunkReader

NewChunkReader 创建了新的块读取器 。

func (*ChunkReader) Next

func (r *ChunkReader) Next() ([]byte, bool)

下一个不复制则返回下一个块 。

func (*ChunkReader) Reset

func (r *ChunkReader) Reset()

重置读者.

type DropPolicy

type DropPolicy int

DropPolicy 定义缓冲区满后的处理策略.

const (
	DropPolicyBlock  DropPolicy = iota // Block producer
	DropPolicyOldest                   // Drop oldest tokens
	DropPolicyNewest                   // Drop newest tokens
	DropPolicyError                    // Return error
)

func (DropPolicy) String

func (d DropPolicy) String() string

String returns the string representation of DropPolicy.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter 为流提供基于 token 的速率限制.

func NewRateLimiter

func NewRateLimiter(tokensPerSec float64, burst int) *RateLimiter

NewRateLimiter 创建新的速率限制器.

func (*RateLimiter) Allow

func (r *RateLimiter) Allow() bool

Allow 检查是否可以消耗一个 token.

func (*RateLimiter) Wait

func (r *RateLimiter) Wait(ctx context.Context) error

Wait 阻塞直到一个 token 可用.

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer提供无锁环缓冲来进行流. readIdx 和 writeIdx 使用 atomic 操作保证并发安全。

func NewRingBuffer

func NewRingBuffer(size int) *RingBuffer

NewRingBuffer创建了新的环缓冲(尺寸必须是2的功率).

func (*RingBuffer) Available

func (r *RingBuffer) Available() int

可用返回可用的字节数(使用 atomic 读取保证并发安全)。

func (*RingBuffer) Free

func (r *RingBuffer) Free() int

自由返回自由写入的字节数(使用 atomic 读取保证并发安全)。

func (*RingBuffer) Get

func (r *RingBuffer) Get() (byte, bool)

读取一个字节(使用 atomic 操作保证并发安全)。

func (*RingBuffer) Put

func (r *RingBuffer) Put(b byte) bool

写一个字节(使用 atomic 操作保证并发安全)

type StreamMultiplexer

type StreamMultiplexer struct {
	// contains filtered or unexported fields
}

StreamMultiplexer 将一个流扇出给多个消费者.

func NewStreamMultiplexer

func NewStreamMultiplexer(source *BackpressureStream) *StreamMultiplexer

NewStreamMultiplexer 创建新的多路复用器.

func (*StreamMultiplexer) AddConsumer

AddConsumer 添加一个消费流.

func (*StreamMultiplexer) Start

func (m *StreamMultiplexer) Start(ctx context.Context)

Start 启动多路复用.

type StreamStats

type StreamStats struct {
	Produced   int64     `json:"produced"`
	Consumed   int64     `json:"consumed"`
	Dropped    int64     `json:"dropped"`
	Blocked    int64     `json:"blocked"`
	BufferSize int       `json:"buffer_size"`
	BufferCap  int       `json:"buffer_cap"`
	IsPaused   bool      `json:"is_paused"`
	LastWrite  time.Time `json:"last_write"`
	LastRead   time.Time `json:"last_read"`
}

StreamStats 包含流统计数据.

type StringView

type StringView struct {
	// contains filtered or unexported fields
}

StringView提供字节的零复制字符串视图.

func NewStringView

func NewStringView(data []byte) StringView

NewStringView 不复制就从字节创建了字符串视图.

func (StringView) Bytes

func (s StringView) Bytes() []byte

字节返回基本的字节 。

func (StringView) Len

func (s StringView) Len() int

Len返回长度。

func (StringView) String

func (s StringView) String() string

字符串不复制返回字符串( 如果隐藏字节更改, 则不安全) 。

type Token

type Token struct {
	Content   string    `json:"content"`
	Index     int       `json:"index"`
	Timestamp time.Time `json:"timestamp"`
	Final     bool      `json:"final"`
}

Token 表示流式传输的 token.

type ZeroCopyBuffer

type ZeroCopyBuffer struct {
	// contains filtered or unexported fields
}

ZeroCopyBuffer提供零拷贝缓冲操作.

func NewZeroCopyBuffer

func NewZeroCopyBuffer(size int) *ZeroCopyBuffer

NewZero CopyBuffer创建了新的零拷贝缓冲器.

func (*ZeroCopyBuffer) Bytes

func (b *ZeroCopyBuffer) Bytes() []byte

字节返回未读部分而不复制 。

func (*ZeroCopyBuffer) BytesUnsafe

func (b *ZeroCopyBuffer) BytesUnsafe() []byte

字节不安全返回字节没有锁(调用器必须确保安全).

func (*ZeroCopyBuffer) Len

func (b *ZeroCopyBuffer) Len() int

Len 返回未读字节数 。

func (*ZeroCopyBuffer) Read

func (b *ZeroCopyBuffer) Read(p []byte) (int, error)

在不复制的情况下读取数据(返回部分为内部缓冲). 注意: 使用写锁(Lock)而非读锁(RLock),因为此方法会修改 readPos。 在 RLock 下写 readPos 违反读写锁语义,会导致并发数据竞争。

func (*ZeroCopyBuffer) Reset

func (b *ZeroCopyBuffer) Reset()

重置缓冲器用于再利用 。

func (*ZeroCopyBuffer) Write

func (b *ZeroCopyBuffer) Write(p []byte) (int, error)

写入数据而不复制.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL