streaming

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrBufferFull   = errors.New("buffer full, backpressure applied")
	ErrStreamClosed = errors.New("stream closed")
	ErrSlowConsumer = errors.New("consumer too slow")
)

Functions

func BytesToString

func BytesToString(b []byte) string

BytesToString 不复制便将字节转换为字符串.

func StringToBytes

func StringToBytes(s string) []byte

StringToBytes 不复制就将字符串转换为字节.

Types

type BackpressureConfig

type BackpressureConfig struct {
	BufferSize      int           `json:"buffer_size"`
	HighWaterMark   float64       `json:"high_water_mark"` // 0.0-1.0
	LowWaterMark    float64       `json:"low_water_mark"`  // 0.0-1.0
	SlowConsumerTTL time.Duration `json:"slow_consumer_ttl"`
	DropPolicy      DropPolicy    `json:"drop_policy"`
}

BackpressureConfig 配置背压行为.

func DefaultBackpressureConfig

func DefaultBackpressureConfig() BackpressureConfig

DefaultBackpressureConfig 返回优化的默认值.

type BackpressureStream

type BackpressureStream struct {
	// contains filtered or unexported fields
}

BackpressureStream 实现支持背压的流.

func NewBackpressureStream

func NewBackpressureStream(config BackpressureConfig) *BackpressureStream

NewBackpressureStream 创建新的支持背压的流.

func (*BackpressureStream) BufferLevel

func (s *BackpressureStream) BufferLevel() float64

BufferLevel 返回当前缓冲区利用率 (0.0-1.0).

func (*BackpressureStream) Close

func (s *BackpressureStream) Close() error

Close 关闭流. 使用写锁(Lock)确保与 Write() 的 RLock 互斥, 防止在 Write 发送到 buffer channel 的同时关闭 channel 导致 panic。

func (*BackpressureStream) IsPaused

func (s *BackpressureStream) IsPaused() bool

IsPaused 返回流是否因背压而暂停.

func (*BackpressureStream) Read

func (s *BackpressureStream) Read(ctx context.Context) (Token, error)

Read 从流中读取 token.

func (*BackpressureStream) ReadChan

func (s *BackpressureStream) ReadChan() <-chan Token

ReadChan 返回用于读取 token 的通道.

func (*BackpressureStream) Stats

func (s *BackpressureStream) Stats() StreamStats

Stats 返回流统计信息.

func (*BackpressureStream) Write

func (s *BackpressureStream) Write(ctx context.Context, token Token) error

Write 向流发送一个带背压处理的 token. 使用 RLock 防止与 Close() 并发执行时向已关闭 channel 发送导致 panic。

type ChunkReader

type ChunkReader struct {
	// contains filtered or unexported fields
}

ChunkReader 提供零拷贝块读取.

func NewChunkReader

func NewChunkReader(data []byte, chunkSize int) *ChunkReader

NewChunkReader 创建了新的块读取器 。

func (*ChunkReader) Next

func (r *ChunkReader) Next() ([]byte, bool)

下一个不复制则返回下一个块 。

func (*ChunkReader) Reset

func (r *ChunkReader) Reset()

重置读者.

type DropPolicy

type DropPolicy int

DropPolicy 定义缓冲区满后的处理策略.

const (
	DropPolicyBlock  DropPolicy = iota // Block producer
	DropPolicyOldest                   // Drop oldest tokens
	DropPolicyNewest                   // Drop newest tokens
	DropPolicyError                    // Return error
)

func (DropPolicy) String

func (d DropPolicy) String() string

String returns the string representation of DropPolicy.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter 为流提供基于 token 的速率限制.

func NewRateLimiter

func NewRateLimiter(tokensPerSec float64, burst int) *RateLimiter

NewRateLimiter 创建新的速率限制器.

func (*RateLimiter) Allow

func (r *RateLimiter) Allow() bool

Allow 检查是否可以消耗一个 token.

func (*RateLimiter) Wait

func (r *RateLimiter) Wait(ctx context.Context) error

Wait 阻塞直到一个 token 可用.

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer提供无锁环缓冲来进行流. readIdx 和 writeIdx 使用 atomic 操作保证并发安全。

func NewRingBuffer

func NewRingBuffer(size int) *RingBuffer

NewRingBuffer创建了新的环缓冲(尺寸必须是2的功率).

func (*RingBuffer) Available

func (r *RingBuffer) Available() int

可用返回可用的字节数(使用 atomic 读取保证并发安全)。

func (*RingBuffer) Free

func (r *RingBuffer) Free() int

自由返回自由写入的字节数(使用 atomic 读取保证并发安全)。

func (*RingBuffer) Get

func (r *RingBuffer) Get() (byte, bool)

读取一个字节(使用 atomic 操作保证并发安全)。

func (*RingBuffer) Put

func (r *RingBuffer) Put(b byte) bool

写一个字节(使用 atomic 操作保证并发安全)

type StreamMultiplexer

type StreamMultiplexer struct {
	// contains filtered or unexported fields
}

StreamMultiplexer 将一个流扇出给多个消费者.

func NewStreamMultiplexer

func NewStreamMultiplexer(source *BackpressureStream) *StreamMultiplexer

NewStreamMultiplexer 创建新的多路复用器.

func (*StreamMultiplexer) AddConsumer

AddConsumer 添加一个消费流.

func (*StreamMultiplexer) Start

func (m *StreamMultiplexer) Start(ctx context.Context)

Start 启动多路复用.

type StreamStats

type StreamStats struct {
	Produced   int64     `json:"produced"`
	Consumed   int64     `json:"consumed"`
	Dropped    int64     `json:"dropped"`
	Blocked    int64     `json:"blocked"`
	BufferSize int       `json:"buffer_size"`
	BufferCap  int       `json:"buffer_cap"`
	IsPaused   bool      `json:"is_paused"`
	LastWrite  time.Time `json:"last_write"`
	LastRead   time.Time `json:"last_read"`
}

StreamStats 包含流统计数据.

type StringView

type StringView struct {
	// contains filtered or unexported fields
}

StringView提供字节的零复制字符串视图.

func NewStringView

func NewStringView(data []byte) StringView

NewStringView 不复制就从字节创建了字符串视图.

func (StringView) Bytes

func (s StringView) Bytes() []byte

字节返回基本的字节 。

func (StringView) Len

func (s StringView) Len() int

Len返回长度。

func (StringView) String

func (s StringView) String() string

字符串不复制返回字符串( 如果隐藏字节更改, 则不安全) 。

type Token

type Token struct {
	Content   string    `json:"content"`
	Index     int       `json:"index"`
	Timestamp time.Time `json:"timestamp"`
	Final     bool      `json:"final"`
}

Token 表示流式传输的 token.

type ZeroCopyBuffer

type ZeroCopyBuffer struct {
	// contains filtered or unexported fields
}

ZeroCopyBuffer提供零拷贝缓冲操作.

func NewZeroCopyBuffer

func NewZeroCopyBuffer(size int) *ZeroCopyBuffer

NewZero CopyBuffer创建了新的零拷贝缓冲器.

func (*ZeroCopyBuffer) Bytes

func (b *ZeroCopyBuffer) Bytes() []byte

字节返回未读部分而不复制 。

func (*ZeroCopyBuffer) BytesUnsafe

func (b *ZeroCopyBuffer) BytesUnsafe() []byte

字节不安全返回字节没有锁(调用器必须确保安全).

func (*ZeroCopyBuffer) Len

func (b *ZeroCopyBuffer) Len() int

Len 返回未读字节数 。

func (*ZeroCopyBuffer) Read

func (b *ZeroCopyBuffer) Read(p []byte) (int, error)

在不复制的情况下读取数据(返回部分为内部缓冲). 注意: 使用写锁(Lock)而非读锁(RLock),因为此方法会修改 readPos。 在 RLock 下写 readPos 违反读写锁语义,会导致并发数据竞争。

func (*ZeroCopyBuffer) Reset

func (b *ZeroCopyBuffer) Reset()

重置缓冲器用于再利用 。

func (*ZeroCopyBuffer) Write

func (b *ZeroCopyBuffer) Write(p []byte) (int, error)

写入数据而不复制.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL