Documentation
¶
Index ¶
- Variables
- func BytesToString(b []byte) string
- func StringToBytes(s string) []byte
- type BackpressureConfig
- type BackpressureStream
- func (s *BackpressureStream) BufferLevel() float64
- func (s *BackpressureStream) Close() error
- func (s *BackpressureStream) IsPaused() bool
- func (s *BackpressureStream) Read(ctx context.Context) (Token, error)
- func (s *BackpressureStream) ReadChan() <-chan Token
- func (s *BackpressureStream) Stats() StreamStats
- func (s *BackpressureStream) Write(ctx context.Context, token Token) error
- type ChunkReader
- type DropPolicy
- type RateLimiter
- type RingBuffer
- type StreamMultiplexer
- type StreamStats
- type StringView
- type Token
- type ZeroCopyBuffer
Constants ¶
This section is empty.
Variables ¶
Functions ¶
Types ¶
type BackpressureConfig ¶
type BackpressureConfig struct {
BufferSize int `json:"buffer_size"`
HighWaterMark float64 `json:"high_water_mark"` // 0.0-1.0
LowWaterMark float64 `json:"low_water_mark"` // 0.0-1.0
SlowConsumerTTL time.Duration `json:"slow_consumer_ttl"`
DropPolicy DropPolicy `json:"drop_policy"`
}
BackpressureConfig 配置背压行为.
func DefaultBackpressureConfig ¶
func DefaultBackpressureConfig() BackpressureConfig
DefaultBackpressureConfig 返回优化的默认值.
type BackpressureStream ¶
type BackpressureStream struct {
// contains filtered or unexported fields
}
BackpressureStream 实现支持背压的流.
func NewBackpressureStream ¶
func NewBackpressureStream(config BackpressureConfig) *BackpressureStream
NewBackpressureStream 创建新的支持背压的流.
func (*BackpressureStream) BufferLevel ¶
func (s *BackpressureStream) BufferLevel() float64
BufferLevel 返回当前缓冲区利用率 (0.0-1.0).
func (*BackpressureStream) Close ¶
func (s *BackpressureStream) Close() error
Close 关闭流. 使用写锁(Lock)确保与 Write() 的 RLock 互斥, 防止在 Write 发送到 buffer channel 的同时关闭 channel 导致 panic。
func (*BackpressureStream) IsPaused ¶
func (s *BackpressureStream) IsPaused() bool
IsPaused 返回流是否因背压而暂停.
func (*BackpressureStream) Read ¶
func (s *BackpressureStream) Read(ctx context.Context) (Token, error)
Read 从流中读取 token.
func (*BackpressureStream) ReadChan ¶
func (s *BackpressureStream) ReadChan() <-chan Token
ReadChan 返回用于读取 token 的通道.
type ChunkReader ¶
type ChunkReader struct {
// contains filtered or unexported fields
}
ChunkReader 提供零拷贝块读取.
func NewChunkReader ¶
func NewChunkReader(data []byte, chunkSize int) *ChunkReader
NewChunkReader 创建新的块读取器.
type DropPolicy ¶
type DropPolicy int
DropPolicy 定义缓冲区满后的处理策略.
const (
	DropPolicyBlock  DropPolicy = iota // Block producer
	DropPolicyOldest                   // Drop oldest tokens
	DropPolicyNewest                   // Drop newest tokens
	DropPolicyError                    // Return error
)
func (DropPolicy) String ¶
func (d DropPolicy) String() string
String returns the string representation of DropPolicy.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter 为流提供基于 token 的速率限制.
func NewRateLimiter ¶
func NewRateLimiter(tokensPerSec float64, burst int) *RateLimiter
NewRateLimiter 创建新的速率限制器.
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
RingBuffer 提供用于流式传输的无锁环形缓冲区. readIdx 和 writeIdx 使用 atomic 操作保证并发安全。
type StreamMultiplexer ¶
type StreamMultiplexer struct {
// contains filtered or unexported fields
}
StreamMultiplexer 将一个流扇出给多个消费者.
func NewStreamMultiplexer ¶
func NewStreamMultiplexer(source *BackpressureStream) *StreamMultiplexer
NewStreamMultiplexer 创建新的多路复用器.
func (*StreamMultiplexer) AddConsumer ¶
func (m *StreamMultiplexer) AddConsumer(config BackpressureConfig) *BackpressureStream
AddConsumer 添加一个消费流.
func (*StreamMultiplexer) Start ¶
func (m *StreamMultiplexer) Start(ctx context.Context)
Start 启动多路复用.
type StreamStats ¶
type StreamStats struct {
Produced int64 `json:"produced"`
Consumed int64 `json:"consumed"`
Dropped int64 `json:"dropped"`
Blocked int64 `json:"blocked"`
BufferSize int `json:"buffer_size"`
BufferCap int `json:"buffer_cap"`
IsPaused bool `json:"is_paused"`
LastWrite time.Time `json:"last_write"`
LastRead time.Time `json:"last_read"`
}
StreamStats 包含流统计数据.
type StringView ¶
type StringView struct {
// contains filtered or unexported fields
}
StringView 提供字节的零拷贝字符串视图.
type Token ¶
type Token struct {
Content string `json:"content"`
Index int `json:"index"`
Timestamp time.Time `json:"timestamp"`
Final bool `json:"final"`
}
Token 表示流式传输的 token.
type ZeroCopyBuffer ¶
type ZeroCopyBuffer struct {
// contains filtered or unexported fields
}
ZeroCopyBuffer 提供零拷贝缓冲操作.
func NewZeroCopyBuffer ¶
func NewZeroCopyBuffer(size int) *ZeroCopyBuffer
NewZeroCopyBuffer 创建新的零拷贝缓冲区.
func (*ZeroCopyBuffer) BytesUnsafe ¶
func (b *ZeroCopyBuffer) BytesUnsafe() []byte
BytesUnsafe 在不加锁的情况下返回字节(调用方必须确保安全).