streaming

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WebSocketStreamFactory

func WebSocketStreamFactory(url string, logger *zap.Logger) func() (StreamConnection, error)

WebSocketStreamFactory 创建一个 connFactory 函数,用于 BidirectionalStream 的重连。 url 是 WebSocket 服务端地址(如 "ws://localhost:8080/stream")。

Types

type AudioDecoder

type AudioDecoder interface {
	Decode(data []byte) ([]byte, error)
}

AudioDecoder解码音频数据.

type AudioEncoder

type AudioEncoder interface {
	Encode(pcm []byte) ([]byte, error)
}

音频编码器编码音频数据.

type AudioStreamAdapter

type AudioStreamAdapter struct {
	// contains filtered or unexported fields
}

AudioStreamAdapter 调整音频流用于双向通信.

func NewAudioStreamAdapter

func NewAudioStreamAdapter(stream *BidirectionalStream, sampleRate, channels int) *AudioStreamAdapter

新AudioStreamAdapter创建了一个新的音频流适配器.

func (*AudioStreamAdapter) ReceiveAudio

func (a *AudioStreamAdapter) ReceiveAudio() <-chan []byte

DuiceAudio返回已解码的音频块 。

func (*AudioStreamAdapter) SendAudio

func (a *AudioStreamAdapter) SendAudio(pcm []byte) error

SendAudio发送音频数据.

type BidirectionalStream

type BidirectionalStream struct {
	ID     string
	Config StreamConfig
	State  StreamState
	// contains filtered or unexported fields
}

双向结构管理实时双向通信.

func NewBidirectionalStream

func NewBidirectionalStream(
	config StreamConfig,
	handler StreamHandler,
	conn StreamConnection,
	connFactory func() (StreamConnection, error),
	logger *zap.Logger,
) *BidirectionalStream

NewBiFireStream 创建了新的双向流.

func (*BidirectionalStream) Close

func (s *BidirectionalStream) Close() error

关上溪口.

func (*BidirectionalStream) GetState

func (s *BidirectionalStream) GetState() StreamState

GetState 返回当前流状态 。

func (*BidirectionalStream) Receive

func (s *BidirectionalStream) Receive() <-chan StreamChunk

接收输入通道以接收数据 。

func (*BidirectionalStream) Send

func (s *BidirectionalStream) Send(chunk StreamChunk) error

发送数据到外出流.

func (*BidirectionalStream) Start

func (s *BidirectionalStream) Start(ctx context.Context) error

开始双向流 。

type StreamChunk

type StreamChunk struct {
	ID        string         `json:"id"`
	Type      StreamType     `json:"type"`
	Data      []byte         `json:"data"`
	Text      string         `json:"text,omitempty"`
	Timestamp time.Time      `json:"timestamp"`
	Sequence  int64          `json:"sequence"`
	IsFinal   bool           `json:"is_final"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

StreamChunk代表了一整批流数据.

type StreamConfig

type StreamConfig struct {
	BufferSize     int           `json:"buffer_size"`
	MaxLatencyMS   int           `json:"max_latency_ms"`
	SampleRate     int           `json:"sample_rate"`
	Channels       int           `json:"channels"`
	EnableVAD      bool          `json:"enable_vad"`
	ChunkDuration  time.Duration `json:"chunk_duration"`
	ReconnectDelay time.Duration `json:"reconnect_delay"`
	// 新增字段
	HeartbeatInterval time.Duration `json:"heartbeat_interval"` // 心跳间隔,默认 30s
	HeartbeatTimeout  time.Duration `json:"heartbeat_timeout"`  // 心跳超时,默认 10s
	MaxReconnects     int           `json:"max_reconnects"`     // 最大重连次数,默认 5
	EnableHeartbeat   bool          `json:"enable_heartbeat"`   // 是否启用心跳
}

StreamConfig 配置双向流.

func DefaultStreamConfig

func DefaultStreamConfig() StreamConfig

默认 StreamConfig 返回默认流化配置 。

type StreamConnection

type StreamConnection interface {
	// ReadChunk 从连接读取一个数据块(阻塞直到有数据或出错)
	ReadChunk(ctx context.Context) (*StreamChunk, error)
	// WriteChunk 向连接写入一个数据块
	WriteChunk(ctx context.Context, chunk StreamChunk) error
	// Close 关闭连接
	Close() error
	// IsAlive 检查连接是否存活
	IsAlive() bool
}

StreamConnection 底层流式连接接口(WebSocket、gRPC stream 等)

type StreamHandler

type StreamHandler interface {
	OnInbound(ctx context.Context, chunk StreamChunk) (*StreamChunk, error)
	OnOutbound(ctx context.Context, chunk StreamChunk) error
	OnStateChange(state StreamState)
}

StreamHandler处理流数据.

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager管理多条流.

func NewStreamManager

func NewStreamManager(logger *zap.Logger) *StreamManager

NewStreamManager创建了新流管理器.

func (*StreamManager) CloseStream

func (m *StreamManager) CloseStream(id string) error

关闭 Stream 关闭并去除一串流.

func (*StreamManager) CreateStream

func (m *StreamManager) CreateStream(
	config StreamConfig,
	handler StreamHandler,
	conn StreamConnection,
	connFactory func() (StreamConnection, error),
) *BidirectionalStream

创建 Stream 创建一个新流 。

func (*StreamManager) GetStream

func (m *StreamManager) GetStream(id string) (*BidirectionalStream, bool)

Get Stream通过ID检索出一条流.

type StreamReader

type StreamReader struct {
	// contains filtered or unexported fields
}

StreamReader将一流包裹为io. 读者.

func NewStreamReader

func NewStreamReader(stream *BidirectionalStream) *StreamReader

NewStream Reader创建了新流读取器.

func (*StreamReader) Read

func (r *StreamReader) Read(p []byte) (n int, err error)

type StreamSession

type StreamSession struct {
	ID         string
	Stream     *BidirectionalStream
	StartTime  time.Time
	EndTime    time.Time
	BytesSent  int64
	BytesRecv  int64
	ChunksSent int64
	ChunksRecv int64
	// contains filtered or unexported fields
}

串流会管理完整的串流会话。

func NewStreamSession

func NewStreamSession(stream *BidirectionalStream) *StreamSession

NewStream Session 创建了新流会话.

func (*StreamSession) RecordReceived

func (s *StreamSession) RecordReceived(bytes int64)

记录收到数据。

func (*StreamSession) RecordSent

func (s *StreamSession) RecordSent(bytes int64)

记录发送数据。

type StreamState

type StreamState string

流州代表流州.

const (
	StateDisconnected StreamState = "disconnected"
	StateConnecting   StreamState = "connecting"
	StateConnected    StreamState = "connected"
	StateStreaming    StreamState = "streaming"
	StatePaused       StreamState = "paused"
	StateError        StreamState = "error"
)

type StreamType

type StreamType string

StreamType 定义了流内容的类型.

const (
	StreamTypeText  StreamType = "text"
	StreamTypeAudio StreamType = "audio"
	StreamTypeVideo StreamType = "video"
	StreamTypeMixed StreamType = "mixed"
)

type StreamWriter

type StreamWriter struct {
	// contains filtered or unexported fields
}

StreamWriter)将一流包裹为io. 编剧.

func NewStreamWriter

func NewStreamWriter(stream *BidirectionalStream) *StreamWriter

NewStreamWriter创建了新流作家.

func (*StreamWriter) Write

func (w *StreamWriter) Write(p []byte) (n int, err error)

type TextStreamAdapter

type TextStreamAdapter struct {
	// contains filtered or unexported fields
}

TextStreamAdapter 适应文本流.

func NewTextStreamAdapter

func NewTextStreamAdapter(stream *BidirectionalStream) *TextStreamAdapter

新TextStreamAdapter创建了新的文本流适配器.

func (*TextStreamAdapter) ReceiveText

func (t *TextStreamAdapter) ReceiveText() <-chan string

接收文本返回文本块 。

func (*TextStreamAdapter) SendText

func (t *TextStreamAdapter) SendText(text string, isFinal bool) error

发送文本数据 。

type WebSocketStreamConnection

type WebSocketStreamConnection struct {
	// contains filtered or unexported fields
}

WebSocketStreamConnection 将 nhooyr.io/websocket 连接适配为 StreamConnection 接口。 写操作通过 mutex 保护,因为 WebSocket 不支持并发写。

func NewWebSocketStreamConnection

func NewWebSocketStreamConnection(conn *websocket.Conn, logger *zap.Logger) *WebSocketStreamConnection

NewWebSocketStreamConnection 从已建立的 WebSocket 连接创建适配器。

func (*WebSocketStreamConnection) Close

func (w *WebSocketStreamConnection) Close() error

Close 关闭 WebSocket 连接。

func (*WebSocketStreamConnection) IsAlive

func (w *WebSocketStreamConnection) IsAlive() bool

IsAlive 检查连接是否存活。

func (*WebSocketStreamConnection) ReadChunk

ReadChunk 从 WebSocket 读取一个 JSON 编码的 StreamChunk。

func (*WebSocketStreamConnection) WriteChunk

func (w *WebSocketStreamConnection) WriteChunk(ctx context.Context, chunk StreamChunk) error

WriteChunk 将 StreamChunk 序列化为 JSON 并通过 WebSocket 发送。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL