Documentation
¶
Overview ¶
Package websocket provides WebSocket transport (server and client) for Voxray.
Package websocket: WebsocketServiceBase provides reconnection, backoff, and send-with-retry for services that hold a long-lived WebSocket (e.g. realtime, Sarvam streaming).
Package websocket provides a WebSocket transport for Voxray (one connection = one session).
Index ¶
- Constants
- func Upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error)
- type ClientConfig
- type ClientTransport
- type ConnTransport
- type ConnTransportOptions
- type ReportErrorFunc
- type Server
- type WebSocketConnector
- type WebsocketServiceBase
- func (b *WebsocketServiceBase) Disconnect() error
- func (b *WebsocketServiceBase) MaybeTryReconnect(ctx context.Context, message string, reportError ReportErrorFunc, err error) bool
- func (b *WebsocketServiceBase) ReceiveLoop(ctx context.Context, reportError ReportErrorFunc)
- func (b *WebsocketServiceBase) Reconnect(ctx context.Context) bool
- func (b *WebsocketServiceBase) SendWithRetry(ctx context.Context, messageType int, data []byte, reportError ReportErrorFunc) error
- func (b *WebsocketServiceBase) SetDisconnecting(disconnecting bool)
- func (b *WebsocketServiceBase) TryReconnect(ctx context.Context, maxRetries int, reportError ReportErrorFunc) bool
- func (b *WebsocketServiceBase) VerifyConnection() bool
Constants ¶
const DefaultReadLimit = 1 << 20
DefaultReadLimit is the maximum WebSocket message size in bytes (1MB). Prevents memory exhaustion from oversized frames.
const DefaultSessionTimeout = 5 * time.Minute
DefaultSessionTimeout is the default inactivity duration before the server closes a WebSocket. Zero disables inactivity timeouts.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ClientConfig ¶
ClientConfig configures the WebSocket client. InBufSize and OutBufSize set the Input and Output channel capacities; zero or negative values are replaced by 64.
type ClientTransport ¶
type ClientTransport struct {
// contains filtered or unexported fields
}
ClientTransport is an outbound WebSocket transport that connects to a Voxray server. It implements transport.Transport: Input receives frames from the server, Output sends frames to the server. Close is idempotent; do not send on Output after Close.
func NewClientTransport ¶
func NewClientTransport(wsURL string, cfg *ClientConfig) (*ClientTransport, error)
NewClientTransport creates a client transport for the given WebSocket URL (e.g. ws://host/ws or wss://host/ws). The connection is established when Start is called. Returns an error if the URL is invalid or the scheme is not ws or wss.
func (*ClientTransport) Close ¶
func (t *ClientTransport) Close() error
Close closes the connection and the Input/Output channels. Idempotent; safe to call from any goroutine.
func (*ClientTransport) Input ¶
func (t *ClientTransport) Input() <-chan frames.Frame
Input returns the channel of frames received from the server. Closed when the transport is closed.
func (*ClientTransport) Output ¶
func (t *ClientTransport) Output() chan<- frames.Frame
Output returns the channel to send frames to the server. Do not send after Close.
type ConnTransport ¶
type ConnTransport struct {
// WriteMessageFunc, when non-nil, is used instead of conn.WriteMessage in writeOne (e.g. for tests to count or capture writes). When nil, conn.WriteMessage is used.
WriteMessageFunc func(messageType int, data []byte) error
// contains filtered or unexported fields
}
ConnTransport handles a single WebSocket connection as a Voxray transport. It exposes Input (frames from client) and Output (frames to client) and closes when the connection ends or Close is called. THREAD SAFETY: single reader goroutine (readLoop) and single writer goroutine (writeLoop); only they touch conn. Close is idempotent; do not send on Output after Close.
func NewConnTransport ¶
func NewConnTransport(conn *websocket.Conn, inBuf, outBuf int, serializer serialize.Serializer, opts *ConnTransportOptions) *ConnTransport
NewConnTransport builds a transport for an already-upgraded WebSocket connection. If serializer is nil, JSON text messages are used. inBuf and outBuf set channel sizes; zero or negative values default to 64. opts may be nil; when non-nil and WriteCoalesceMs > 0, coalescing is enabled so callers need not set fields after construction. The caller must not use conn for reads or writes after passing it here.
func (*ConnTransport) Close ¶
func (t *ConnTransport) Close() error
Close closes the WebSocket and the Input/Output channels. Idempotent; safe to call from any goroutine. After Close, sending on Output may panic.
func (*ConnTransport) Done ¶
func (t *ConnTransport) Done() <-chan struct{}
Done returns a channel that is closed when the transport has shut down. Safe to select from multiple goroutines.
func (*ConnTransport) Input ¶
func (t *ConnTransport) Input() <-chan frames.Frame
Input returns the channel of frames received from the client. The channel is closed when the transport is closed. Receive-only; safe to read from multiple goroutines.
func (*ConnTransport) LastActivity ¶
func (t *ConnTransport) LastActivity() time.Time
LastActivity returns the last time a frame was successfully read from or written to the client. Returns zero time if no activity has been recorded. Used for session timeouts.
func (*ConnTransport) Output ¶
func (t *ConnTransport) Output() chan<- frames.Frame
Output returns the channel to send frames to the client. Do not send after calling Close; the channel is closed on Close.
type ConnTransportOptions ¶
type ConnTransportOptions struct {
// WriteCoalesceMs when > 0 enables write coalescing (drain up to WriteCoalesceMaxFrames within this many ms).
WriteCoalesceMs int
// WriteCoalesceMaxFrames is the max frames per coalesced batch; 0 means default 10.
WriteCoalesceMaxFrames int
// MaxDurationAfterFirstAudio when > 0 starts a one-shot timer on the
// first inbound *frames.AudioRawFrame and invokes OnMaxDurationTimeout
// when the duration elapses.
MaxDurationAfterFirstAudio time.Duration
OnMaxDurationTimeout func()
}
ConnTransportOptions optionally configures write coalescing when creating a ConnTransport. Pass nil to NewConnTransport for default behaviour (no coalescing).
type ReportErrorFunc ¶
type ReportErrorFunc func(*frames.ErrorFrame)
ReportErrorFunc is called to report connection errors (e.g. push ErrorFrame).
type Server ¶
type Server struct {
Host string
Port int
// SessionTimeout controls how long a connection may remain idle before
// it is closed. If zero or negative, no inactivity timeout is enforced.
SessionTimeout time.Duration
// OnConn is called for each new connection; it receives the transport which should be linked to a pipeline.
OnConn func(ctx context.Context, tr *ConnTransport)
// RegisterHandlers, if non-nil, is called with the HTTP mux before the server
// starts to allow registration of additional HTTP handlers (e.g. WebRTC signaling).
RegisterHandlers func(mux *http.ServeMux)
// ReadHeaderTimeout, ReadTimeout, WriteTimeout, IdleTimeout, MaxHeaderBytes, ShutdownTimeout set HTTP server timeouts.
// Zero values use package defaults (10s, 30s, 30s, 60s, 1MiB, 30s).
ReadHeaderTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
MaxHeaderBytes int
ShutdownTimeout time.Duration
// TLS: when both are non-empty, ListenAndServe uses ListenAndServeTLS(certFile, keyFile).
TLSCertFile string
TLSKeyFile string
// CheckAuth if non-nil is called before upgrading to WebSocket. If it returns false, the handler returns (caller should have written 401).
CheckAuth func(http.ResponseWriter, *http.Request) bool
// GetSerializer if non-nil is called per request to choose the frame serializer (e.g. RTVI when query has rtvi=1).
GetSerializer func(r *http.Request) serialize.Serializer
// WriteCoalesceMs when > 0 enables write coalescing on each ConnTransport (drain up to WriteCoalesceMaxFrames within this many ms).
WriteCoalesceMs int
WriteCoalesceMaxFrames int
// MaxDurationAfterFirstAudio when > 0 starts a one-shot max-duration
// timer on each connection after the first inbound *frames.AudioRawFrame.
// When the timer fires, the per-connection context is canceled, which
// terminates the transport and associated pipeline session.
MaxDurationAfterFirstAudio time.Duration
}
Server is an HTTP server that upgrades requests to /ws to WebSocket and creates a ConnTransport per connection. SessionTimeout is the idle timeout before closing a connection; zero or negative disables it. OnConn is called in a new goroutine for each connection. RegisterHandlers, if set, is called once with the mux before serving.
func (*Server) ListenAndServe ¶
ListenAndServe starts the HTTP server and blocks until ctx is canceled. It registers /ws and, if set, RegisterHandlers. Port 0 is treated as 8080.
func (*Server) ServeWithListener ¶
ServeWithListener runs the same handler logic as ListenAndServe but on the given listener. The listener is not closed when the server shuts down. Used for tests with dynamic ports.
type WebSocketConnector ¶
type WebSocketConnector interface {
// Conn returns the current WebSocket connection (may be nil).
Conn() *websocket.Conn
// SetConn sets the current connection (after Connect or reconnect).
SetConn(conn *websocket.Conn)
// Connect establishes a new connection and sets it via SetConn.
Connect(ctx context.Context) error
// Disconnect closes the current connection and clears it.
Disconnect() error
// ReceiveMessages runs the receive loop until error or connection close.
// Called repeatedly by ReceiveLoop when reconnection is enabled.
ReceiveMessages(ctx context.Context) error
}
WebSocketConnector is implemented by services that own a WebSocket connection. WebsocketServiceBase uses it to verify, reconnect, and run the receive loop.
type WebsocketServiceBase ¶
type WebsocketServiceBase struct {
Connector WebSocketConnector
ReconnectOnError bool
// contains filtered or unexported fields
}
WebsocketServiceBase provides automatic reconnection with exponential backoff, connection verification (ping), and send-with-retry for WebSocket-based services. Embed or compose with a type that implements WebSocketConnector.
func NewWebsocketServiceBase ¶
func NewWebsocketServiceBase(connector WebSocketConnector, reconnectOnError bool) *WebsocketServiceBase
NewWebsocketServiceBase returns a base with the given connector and reconnect policy.
func (*WebsocketServiceBase) Disconnect ¶
func (b *WebsocketServiceBase) Disconnect() error
Disconnect marks disconnecting and closes the connection.
func (*WebsocketServiceBase) MaybeTryReconnect ¶
func (b *WebsocketServiceBase) MaybeTryReconnect(ctx context.Context, message string, reportError ReportErrorFunc, err error) bool
MaybeTryReconnect decides whether to reconnect after an error or graceful close. Returns true if the caller should continue the receive loop (reconnect succeeded).
func (*WebsocketServiceBase) ReceiveLoop ¶
func (b *WebsocketServiceBase) ReceiveLoop(ctx context.Context, reportError ReportErrorFunc)
ReceiveLoop runs ReceiveMessages in a loop, reconnecting on error when ReconnectOnError is true. reportError is called for connection errors when not reconnecting. Stops when context is canceled, Disconnect is called, or reconnection is disabled and an error occurs.
func (*WebsocketServiceBase) Reconnect ¶
func (b *WebsocketServiceBase) Reconnect(ctx context.Context) bool
Reconnect disconnects then connects and verifies. Returns true if connection is healthy after.
func (*WebsocketServiceBase) SendWithRetry ¶
func (b *WebsocketServiceBase) SendWithRetry(ctx context.Context, messageType int, data []byte, reportError ReportErrorFunc) error
SendWithRetry sends the message; on error tries reconnect once and retries send.
func (*WebsocketServiceBase) SetDisconnecting ¶
func (b *WebsocketServiceBase) SetDisconnecting(disconnecting bool)
SetDisconnecting sets whether the service is intentionally disconnecting (disables reconnect).
func (*WebsocketServiceBase) TryReconnect ¶
func (b *WebsocketServiceBase) TryReconnect(ctx context.Context, maxRetries int, reportError ReportErrorFunc) bool
TryReconnect attempts reconnection up to maxRetries with exponential backoff. If reportError is non-nil, it is called with an ErrorFrame on final failure. Returns true if reconnection and verification succeeded.
func (*WebsocketServiceBase) VerifyConnection ¶
func (b *WebsocketServiceBase) VerifyConnection() bool
VerifyConnection pings the current connection. Returns false if conn is nil or closed.