streaming

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

包 streaming 提供实时双向流式交互能力。

概述

streaming 实现了基于 channel 的双向流通信框架,支持文本、音频、 视频及混合类型数据的实时传输。内置心跳检测、自动重连(指数退避) 和连接状态机管理,适用于对延迟敏感的 Agent 交互场景。

核心接口

  • StreamConnection:底层流式连接抽象(WebSocket、gRPC 等),定义 ReadChunk / WriteChunk / Close / IsAlive 四个方法
  • StreamHandler:流数据处理回调,分别处理入站、出站数据和状态变更
  • AudioEncoder / AudioDecoder:音频编解码器接口,用于音频流适配

主要能力

  • BidirectionalStream:核心双向流,管理入站/出站 channel、序列号、 心跳和自动重连,通过 connFactory 支持断线重建连接
  • StreamManager:多流管理器,统一创建、检索和关闭流实例
  • StreamSession:流会话统计,跟踪发送/接收的字节数和 chunk 数
  • WebSocketStreamConnection:将 nhooyr.io/websocket 适配为 StreamConnection,写操作通过 mutex 保护并发安全
  • AudioStreamAdapter / TextStreamAdapter:类型化适配器,简化 音频 PCM 和文本数据的收发
  • StreamReader / StreamWriter:将流包装为标准 io.Reader / io.Writer

与其他包协同

  • agent/voice:语音 Agent 通过 AudioStreamAdapter 进行实时音频传输
  • agent/hitl:流式交互中可注入人工中断节点

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WebSocketStreamFactory

func WebSocketStreamFactory(url string, logger *zap.Logger) func() (StreamConnection, error)

WebSocketStreamFactory 创建一个 connFactory 函数,用于 BidirectionalStream 的重连。 url 是 WebSocket 服务端地址(如 "ws://localhost:8080/stream")。

Types

type AudioDecoder

type AudioDecoder interface {
	Decode(data []byte) ([]byte, error)
}

AudioDecoder解码音频数据.

type AudioEncoder

type AudioEncoder interface {
	Encode(pcm []byte) ([]byte, error)
}

音频编码器编码音频数据.

type AudioStreamAdapter

type AudioStreamAdapter struct {
	// contains filtered or unexported fields
}

AudioStreamAdapter 调整音频流用于双向通信.

func NewAudioStreamAdapter

func NewAudioStreamAdapter(stream *BidirectionalStream, sampleRate, channels int) *AudioStreamAdapter

新AudioStreamAdapter创建了一个新的音频流适配器.

func (*AudioStreamAdapter) ReceiveAudio

func (a *AudioStreamAdapter) ReceiveAudio() <-chan []byte

DuiceAudio返回已解码的音频块 。

func (*AudioStreamAdapter) SendAudio

func (a *AudioStreamAdapter) SendAudio(pcm []byte) error

SendAudio发送音频数据.

type BidirectionalStream

type BidirectionalStream struct {
	ID     string
	Config StreamConfig
	State  StreamState
	// contains filtered or unexported fields
}

双向结构管理实时双向通信.

func NewBidirectionalStream

func NewBidirectionalStream(
	config StreamConfig,
	handler StreamHandler,
	conn StreamConnection,
	connFactory func() (StreamConnection, error),
	logger *zap.Logger,
) *BidirectionalStream

NewBiFireStream 创建了新的双向流.

func (*BidirectionalStream) Close

func (s *BidirectionalStream) Close() error

关上溪口.

func (*BidirectionalStream) GetState

func (s *BidirectionalStream) GetState() StreamState

GetState 返回当前流状态 。

func (*BidirectionalStream) Receive

func (s *BidirectionalStream) Receive() <-chan StreamChunk

接收输入通道以接收数据 。

func (*BidirectionalStream) Send

func (s *BidirectionalStream) Send(chunk StreamChunk) error

发送数据到外出流.

func (*BidirectionalStream) Start

func (s *BidirectionalStream) Start(ctx context.Context) error

开始双向流 。

type StreamChunk

type StreamChunk struct {
	ID        string         `json:"id"`
	Type      StreamType     `json:"type"`
	Data      []byte         `json:"data"`
	Text      string         `json:"text,omitempty"`
	Timestamp time.Time      `json:"timestamp"`
	Sequence  int64          `json:"sequence"`
	IsFinal   bool           `json:"is_final"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

StreamChunk代表了一整批流数据.

type StreamConfig

type StreamConfig struct {
	BufferSize     int           `json:"buffer_size"`
	MaxLatencyMS   int           `json:"max_latency_ms"`
	SampleRate     int           `json:"sample_rate"`
	Channels       int           `json:"channels"`
	EnableVAD      bool          `json:"enable_vad"`
	ChunkDuration  time.Duration `json:"chunk_duration"`
	ReconnectDelay time.Duration `json:"reconnect_delay"`
	// 新增字段
	HeartbeatInterval time.Duration `json:"heartbeat_interval"` // 心跳间隔,默认 30s
	HeartbeatTimeout  time.Duration `json:"heartbeat_timeout"`  // 心跳超时,默认 10s
	MaxReconnects     int           `json:"max_reconnects"`     // 最大重连次数,默认 5
	EnableHeartbeat   bool          `json:"enable_heartbeat"`   // 是否启用心跳
}

StreamConfig 配置双向流.

func DefaultStreamConfig

func DefaultStreamConfig() StreamConfig

默认 StreamConfig 返回默认流化配置 。

type StreamConnection

type StreamConnection interface {
	// ReadChunk 从连接读取一个数据块(阻塞直到有数据或出错)
	ReadChunk(ctx context.Context) (*StreamChunk, error)
	// WriteChunk 向连接写入一个数据块
	WriteChunk(ctx context.Context, chunk StreamChunk) error
	// Close 关闭连接
	Close() error
	// IsAlive 检查连接是否存活
	IsAlive() bool
}

StreamConnection 底层流式连接接口(WebSocket、gRPC stream 等)

type StreamHandler

type StreamHandler interface {
	OnInbound(ctx context.Context, chunk StreamChunk) (*StreamChunk, error)
	OnOutbound(ctx context.Context, chunk StreamChunk) error
	OnStateChange(state StreamState)
}

StreamHandler处理流数据.

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager管理多条流.

func NewStreamManager

func NewStreamManager(logger *zap.Logger) *StreamManager

NewStreamManager创建了新流管理器.

func (*StreamManager) CloseStream

func (m *StreamManager) CloseStream(id string) error

关闭 Stream 关闭并去除一串流.

func (*StreamManager) CreateStream

func (m *StreamManager) CreateStream(
	config StreamConfig,
	handler StreamHandler,
	conn StreamConnection,
	connFactory func() (StreamConnection, error),
) *BidirectionalStream

创建 Stream 创建一个新流 。

func (*StreamManager) GetStream

func (m *StreamManager) GetStream(id string) (*BidirectionalStream, bool)

Get Stream通过ID检索出一条流.

type StreamReader

type StreamReader struct {
	// contains filtered or unexported fields
}

StreamReader将一流包裹为io. 读者.

func NewStreamReader

func NewStreamReader(stream *BidirectionalStream) *StreamReader

NewStream Reader创建了新流读取器.

func (*StreamReader) Read

func (r *StreamReader) Read(p []byte) (n int, err error)

type StreamSession

type StreamSession struct {
	ID         string
	Stream     *BidirectionalStream
	StartTime  time.Time
	EndTime    time.Time
	BytesSent  int64
	BytesRecv  int64
	ChunksSent int64
	ChunksRecv int64
	// contains filtered or unexported fields
}

串流会管理完整的串流会话。

func NewStreamSession

func NewStreamSession(stream *BidirectionalStream) *StreamSession

NewStream Session 创建了新流会话.

func (*StreamSession) RecordReceived

func (s *StreamSession) RecordReceived(bytes int64)

记录收到数据。

func (*StreamSession) RecordSent

func (s *StreamSession) RecordSent(bytes int64)

记录发送数据。

type StreamState

type StreamState string

流州代表流州.

const (
	StateDisconnected StreamState = "disconnected"
	StateConnecting   StreamState = "connecting"
	StateConnected    StreamState = "connected"
	StateStreaming    StreamState = "streaming"
	StatePaused       StreamState = "paused"
	StateError        StreamState = "error"
)

type StreamType

type StreamType string

StreamType 定义了流内容的类型.

const (
	StreamTypeText  StreamType = "text"
	StreamTypeAudio StreamType = "audio"
	StreamTypeVideo StreamType = "video"
	StreamTypeMixed StreamType = "mixed"
)

type StreamWriter

type StreamWriter struct {
	// contains filtered or unexported fields
}

StreamWriter)将一流包裹为io. 编剧.

func NewStreamWriter

func NewStreamWriter(stream *BidirectionalStream) *StreamWriter

NewStreamWriter创建了新流作家.

func (*StreamWriter) Write

func (w *StreamWriter) Write(p []byte) (n int, err error)

type TextStreamAdapter

type TextStreamAdapter struct {
	// contains filtered or unexported fields
}

TextStreamAdapter 适应文本流.

func NewTextStreamAdapter

func NewTextStreamAdapter(stream *BidirectionalStream) *TextStreamAdapter

新TextStreamAdapter创建了新的文本流适配器.

func (*TextStreamAdapter) ReceiveText

func (t *TextStreamAdapter) ReceiveText() <-chan string

接收文本返回文本块 。

func (*TextStreamAdapter) SendText

func (t *TextStreamAdapter) SendText(text string, isFinal bool) error

发送文本数据 。

type WebSocketStreamConnection

type WebSocketStreamConnection struct {
	// contains filtered or unexported fields
}

WebSocketStreamConnection 将 nhooyr.io/websocket 连接适配为 StreamConnection 接口。 写操作通过 mutex 保护,因为 WebSocket 不支持并发写。

func NewWebSocketStreamConnection

func NewWebSocketStreamConnection(conn *websocket.Conn, logger *zap.Logger) *WebSocketStreamConnection

NewWebSocketStreamConnection 从已建立的 WebSocket 连接创建适配器。

func (*WebSocketStreamConnection) Close

func (w *WebSocketStreamConnection) Close() error

Close 关闭 WebSocket 连接。

func (*WebSocketStreamConnection) IsAlive

func (w *WebSocketStreamConnection) IsAlive() bool

IsAlive 检查连接是否存活。

func (*WebSocketStreamConnection) ReadChunk

ReadChunk 从 WebSocket 读取一个 JSON 编码的 StreamChunk。

func (*WebSocketStreamConnection) WriteChunk

func (w *WebSocketStreamConnection) WriteChunk(ctx context.Context, chunk StreamChunk) error

WriteChunk 将 StreamChunk 序列化为 JSON 并通过 WebSocket 发送。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL