Documentation
¶
Index ¶
- func WebSocketStreamFactory(url string, logger *zap.Logger) func() (StreamConnection, error)
- type AudioDecoder
- type AudioEncoder
- type AudioStreamAdapter
- type BidirectionalStream
- type StreamChunk
- type StreamConfig
- type StreamConnection
- type StreamHandler
- type StreamManager
- type StreamReader
- type StreamSession
- type StreamState
- type StreamType
- type StreamWriter
- type TextStreamAdapter
- type WebSocketStreamConnection
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WebSocketStreamFactory ¶
func WebSocketStreamFactory(url string, logger *zap.Logger) func() (StreamConnection, error)
WebSocketStreamFactory 创建一个 connFactory 函数,用于 BidirectionalStream 的重连。 url 是 WebSocket 服务端地址(如 "ws://localhost:8080/stream")。
Types ¶
type AudioDecoder ¶
AudioDecoder解码音频数据.
type AudioStreamAdapter ¶
type AudioStreamAdapter struct {
// contains filtered or unexported fields
}
AudioStreamAdapter 调整音频流用于双向通信.
func NewAudioStreamAdapter ¶
func NewAudioStreamAdapter(stream *BidirectionalStream, sampleRate, channels int) *AudioStreamAdapter
新AudioStreamAdapter创建了一个新的音频流适配器.
func (*AudioStreamAdapter) ReceiveAudio ¶
func (a *AudioStreamAdapter) ReceiveAudio() <-chan []byte
DuiceAudio返回已解码的音频块 。
func (*AudioStreamAdapter) SendAudio ¶
func (a *AudioStreamAdapter) SendAudio(pcm []byte) error
SendAudio发送音频数据.
type BidirectionalStream ¶
type BidirectionalStream struct {
ID string
Config StreamConfig
State StreamState
// contains filtered or unexported fields
}
双向结构管理实时双向通信.
func NewBidirectionalStream ¶
func NewBidirectionalStream( config StreamConfig, handler StreamHandler, conn StreamConnection, connFactory func() (StreamConnection, error), logger *zap.Logger, ) *BidirectionalStream
NewBiFireStream 创建了新的双向流.
func (*BidirectionalStream) GetState ¶
func (s *BidirectionalStream) GetState() StreamState
GetState 返回当前流状态 。
func (*BidirectionalStream) Receive ¶
func (s *BidirectionalStream) Receive() <-chan StreamChunk
接收输入通道以接收数据 。
func (*BidirectionalStream) Send ¶
func (s *BidirectionalStream) Send(chunk StreamChunk) error
发送数据到外出流.
type StreamChunk ¶
type StreamChunk struct {
ID string `json:"id"`
Type StreamType `json:"type"`
Data []byte `json:"data"`
Text string `json:"text,omitempty"`
Timestamp time.Time `json:"timestamp"`
Sequence int64 `json:"sequence"`
IsFinal bool `json:"is_final"`
Metadata map[string]any `json:"metadata,omitempty"`
}
StreamChunk代表了一整批流数据.
type StreamConfig ¶
type StreamConfig struct {
BufferSize int `json:"buffer_size"`
MaxLatencyMS int `json:"max_latency_ms"`
SampleRate int `json:"sample_rate"`
Channels int `json:"channels"`
EnableVAD bool `json:"enable_vad"`
ChunkDuration time.Duration `json:"chunk_duration"`
ReconnectDelay time.Duration `json:"reconnect_delay"`
// 新增字段
HeartbeatInterval time.Duration `json:"heartbeat_interval"` // 心跳间隔,默认 30s
HeartbeatTimeout time.Duration `json:"heartbeat_timeout"` // 心跳超时,默认 10s
MaxReconnects int `json:"max_reconnects"` // 最大重连次数,默认 5
EnableHeartbeat bool `json:"enable_heartbeat"` // 是否启用心跳
}
StreamConfig 配置双向流.
type StreamConnection ¶
type StreamConnection interface {
// ReadChunk 从连接读取一个数据块(阻塞直到有数据或出错)
ReadChunk(ctx context.Context) (*StreamChunk, error)
// WriteChunk 向连接写入一个数据块
WriteChunk(ctx context.Context, chunk StreamChunk) error
// Close 关闭连接
Close() error
// IsAlive 检查连接是否存活
IsAlive() bool
}
StreamConnection 底层流式连接接口(WebSocket、gRPC stream 等)
type StreamHandler ¶
type StreamHandler interface {
OnInbound(ctx context.Context, chunk StreamChunk) (*StreamChunk, error)
OnOutbound(ctx context.Context, chunk StreamChunk) error
OnStateChange(state StreamState)
}
StreamHandler处理流数据.
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager管理多条流.
func NewStreamManager ¶
func NewStreamManager(logger *zap.Logger) *StreamManager
NewStreamManager创建了新流管理器.
func (*StreamManager) CloseStream ¶
func (m *StreamManager) CloseStream(id string) error
关闭 Stream 关闭并去除一串流.
func (*StreamManager) CreateStream ¶
func (m *StreamManager) CreateStream( config StreamConfig, handler StreamHandler, conn StreamConnection, connFactory func() (StreamConnection, error), ) *BidirectionalStream
创建 Stream 创建一个新流 。
func (*StreamManager) GetStream ¶
func (m *StreamManager) GetStream(id string) (*BidirectionalStream, bool)
Get Stream通过ID检索出一条流.
type StreamReader ¶
type StreamReader struct {
// contains filtered or unexported fields
}
StreamReader将一流包裹为io. 读者.
func NewStreamReader ¶
func NewStreamReader(stream *BidirectionalStream) *StreamReader
NewStream Reader创建了新流读取器.
type StreamSession ¶
type StreamSession struct {
ID string
Stream *BidirectionalStream
StartTime time.Time
EndTime time.Time
BytesSent int64
BytesRecv int64
ChunksSent int64
ChunksRecv int64
// contains filtered or unexported fields
}
串流会管理完整的串流会话。
func NewStreamSession ¶
func NewStreamSession(stream *BidirectionalStream) *StreamSession
NewStream Session 创建了新流会话.
type StreamState ¶
type StreamState string
流州代表流州.
const ( StateDisconnected StreamState = "disconnected" StateConnecting StreamState = "connecting" StateConnected StreamState = "connected" StateStreaming StreamState = "streaming" StatePaused StreamState = "paused" StateError StreamState = "error" )
type StreamType ¶
type StreamType string
StreamType 定义了流内容的类型.
const ( StreamTypeText StreamType = "text" StreamTypeAudio StreamType = "audio" StreamTypeVideo StreamType = "video" StreamTypeMixed StreamType = "mixed" )
type StreamWriter ¶
type StreamWriter struct {
// contains filtered or unexported fields
}
StreamWriter)将一流包裹为io. 编剧.
func NewStreamWriter ¶
func NewStreamWriter(stream *BidirectionalStream) *StreamWriter
NewStreamWriter创建了新流作家.
type TextStreamAdapter ¶
type TextStreamAdapter struct {
// contains filtered or unexported fields
}
TextStreamAdapter 适应文本流.
func NewTextStreamAdapter ¶
func NewTextStreamAdapter(stream *BidirectionalStream) *TextStreamAdapter
新TextStreamAdapter创建了新的文本流适配器.
func (*TextStreamAdapter) ReceiveText ¶
func (t *TextStreamAdapter) ReceiveText() <-chan string
接收文本返回文本块 。
type WebSocketStreamConnection ¶
type WebSocketStreamConnection struct {
// contains filtered or unexported fields
}
WebSocketStreamConnection 将 nhooyr.io/websocket 连接适配为 StreamConnection 接口。 写操作通过 mutex 保护,因为 WebSocket 不支持并发写。
func NewWebSocketStreamConnection ¶
func NewWebSocketStreamConnection(conn *websocket.Conn, logger *zap.Logger) *WebSocketStreamConnection
NewWebSocketStreamConnection 从已建立的 WebSocket 连接创建适配器。
func (*WebSocketStreamConnection) Close ¶
func (w *WebSocketStreamConnection) Close() error
Close 关闭 WebSocket 连接。
func (*WebSocketStreamConnection) IsAlive ¶
func (w *WebSocketStreamConnection) IsAlive() bool
IsAlive 检查连接是否存活。
func (*WebSocketStreamConnection) ReadChunk ¶
func (w *WebSocketStreamConnection) ReadChunk(ctx context.Context) (*StreamChunk, error)
ReadChunk 从 WebSocket 读取一个 JSON 编码的 StreamChunk。
func (*WebSocketStreamConnection) WriteChunk ¶
func (w *WebSocketStreamConnection) WriteChunk(ctx context.Context, chunk StreamChunk) error
WriteChunk 将 StreamChunk 序列化为 JSON 并通过 WebSocket 发送。