websocket

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package websocket provides WebSocket transport (server and client) for Voxray.

Package websocket: WebsocketServiceBase provides reconnection, backoff, and send-with-retry for services that hold a long-lived WebSocket (e.g. realtime, Sarvam streaming).

Package websocket provides a WebSocket transport for Voxray (one connection = one session).

Index

Constants

View Source
const DefaultReadLimit = 1 << 20

DefaultReadLimit is the maximum WebSocket message size in bytes (1MB). Prevents memory exhaustion from oversized frames.

View Source
const DefaultSessionTimeout = 5 * time.Minute

DefaultSessionTimeout is the default inactivity duration before the server closes a WebSocket. Zero disables inactivity timeouts.

Variables

This section is empty.

Functions

func Upgrade

func Upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error)

Upgrade upgrades the HTTP connection to WebSocket and returns the connection. Used by server for custom handlers (e.g. telephony) that need to read handshake messages before creating ConnTransport.

Types

type ClientConfig

type ClientConfig struct {
	InBufSize  int
	OutBufSize int
}

ClientConfig configures the WebSocket client. InBufSize and OutBufSize set the Input and Output channel capacities; zero or negative values are replaced by 64.

type ClientTransport

type ClientTransport struct {
	// contains filtered or unexported fields
}

ClientTransport is an outbound WebSocket transport that connects to a Voxray server. It implements transport.Transport: Input receives frames from the server, Output sends frames to the server. Close is idempotent; do not send on Output after Close.

func NewClientTransport

func NewClientTransport(wsURL string, cfg *ClientConfig) (*ClientTransport, error)

NewClientTransport creates a client transport for the given WebSocket URL (e.g. ws://host/ws or wss://host/ws). The connection is established when Start is called. Returns an error if the URL is invalid or the scheme is not ws or wss.

func (*ClientTransport) Close

func (t *ClientTransport) Close() error

Close closes the connection and the Input/Output channels. Idempotent; safe to call from any goroutine.

func (*ClientTransport) Input

func (t *ClientTransport) Input() <-chan frames.Frame

Input returns the channel of frames received from the server. Closed when the transport is closed.

func (*ClientTransport) Output

func (t *ClientTransport) Output() chan<- frames.Frame

Output returns the channel to send frames to the server. Do not send after Close.

func (*ClientTransport) Start

func (t *ClientTransport) Start(ctx context.Context) error

Start dials the WebSocket URL and starts the read and write loops. Returns an error if the context is nil or the dial fails. When ctx is canceled, the transport is closed.

type ConnTransport

type ConnTransport struct {

	// WriteMessageFunc, when non-nil, is used instead of conn.WriteMessage in writeOne (e.g. for tests to count or capture writes). When nil, conn.WriteMessage is used.
	WriteMessageFunc func(messageType int, data []byte) error
	// contains filtered or unexported fields
}

ConnTransport handles a single WebSocket connection as a Voxray transport. It exposes Input (frames from client) and Output (frames to client) and closes when the connection ends or Close is called. THREAD SAFETY: single reader goroutine (readLoop) and single writer goroutine (writeLoop); only they touch conn. Close is idempotent; do not send on Output after Close.

func NewConnTransport

func NewConnTransport(conn *websocket.Conn, inBuf, outBuf int, serializer serialize.Serializer, opts *ConnTransportOptions) *ConnTransport

NewConnTransport builds a transport for an already-upgraded WebSocket connection. If serializer is nil, JSON text messages are used. inBuf and outBuf set channel sizes; zero or negative values default to 64. opts may be nil; when non-nil and WriteCoalesceMs > 0, coalescing is enabled so callers need not set fields after construction. The caller must not use conn for reads or writes after passing it here.

func (*ConnTransport) Close

func (t *ConnTransport) Close() error

Close closes the WebSocket and the Input/Output channels. Idempotent; safe to call from any goroutine. After Close, sending on Output may panic.

func (*ConnTransport) Done

func (t *ConnTransport) Done() <-chan struct{}

Done returns a channel that is closed when the transport has shut down. Safe to select from multiple goroutines.

func (*ConnTransport) Input

func (t *ConnTransport) Input() <-chan frames.Frame

Input returns the channel of frames received from the client. The channel is closed when the transport is closed. Receive-only; safe to read from multiple goroutines.

func (*ConnTransport) LastActivity

func (t *ConnTransport) LastActivity() time.Time

LastActivity returns the last time a frame was successfully read from or written to the client. Returns zero time if no activity has been recorded. Used for session timeouts.

func (*ConnTransport) Output

func (t *ConnTransport) Output() chan<- frames.Frame

Output returns the channel to send frames to the client. Do not send after calling Close; the channel is closed on Close.

func (*ConnTransport) Start

func (t *ConnTransport) Start(ctx context.Context) error

Start starts the read and write loops and returns immediately. The context drives shutdown: when it is canceled, the transport is closed. Returns an error if ctx is nil.

type ConnTransportOptions

type ConnTransportOptions struct {
	// WriteCoalesceMs when > 0 enables write coalescing (drain up to WriteCoalesceMaxFrames within this many ms).
	WriteCoalesceMs int
	// WriteCoalesceMaxFrames is the max frames per coalesced batch; 0 means default 10.
	WriteCoalesceMaxFrames int

	// MaxDurationAfterFirstAudio when > 0 starts a one-shot timer on the
	// first inbound *frames.AudioRawFrame and invokes OnMaxDurationTimeout
	// when the duration elapses.
	MaxDurationAfterFirstAudio time.Duration
	OnMaxDurationTimeout       func()
}

ConnTransportOptions optionally configures write coalescing when creating a ConnTransport. Pass nil to NewConnTransport for default behaviour (no coalescing).

type ReportErrorFunc

type ReportErrorFunc func(*frames.ErrorFrame)

ReportErrorFunc is called to report connection errors (e.g. push ErrorFrame).

type Server

type Server struct {
	Host string
	Port int
	// SessionTimeout controls how long a connection may remain idle before
	// it is closed. If zero or negative, no inactivity timeout is enforced.
	SessionTimeout time.Duration
	// OnConn is called for each new connection; it receives the transport which should be linked to a pipeline.
	OnConn func(ctx context.Context, tr *ConnTransport)
	// RegisterHandlers, if non-nil, is called with the HTTP mux before the server
	// starts to allow registration of additional HTTP handlers (e.g. WebRTC signaling).
	RegisterHandlers func(mux *http.ServeMux)
	// ReadHeaderTimeout, ReadTimeout, WriteTimeout, IdleTimeout, MaxHeaderBytes, ShutdownTimeout set HTTP server timeouts.
	// Zero values use package defaults (10s, 30s, 30s, 60s, 1MiB, 30s).
	ReadHeaderTimeout time.Duration
	ReadTimeout       time.Duration
	WriteTimeout      time.Duration
	IdleTimeout       time.Duration
	MaxHeaderBytes    int
	ShutdownTimeout   time.Duration
	// TLS: when both are non-empty, ListenAndServe uses ListenAndServeTLS(certFile, keyFile).
	TLSCertFile string
	TLSKeyFile  string
	// CheckAuth if non-nil is called before upgrading to WebSocket. If it returns false, the handler returns (caller should have written 401).
	CheckAuth func(http.ResponseWriter, *http.Request) bool
	// GetSerializer if non-nil is called per request to choose the frame serializer (e.g. RTVI when query has rtvi=1).
	GetSerializer func(r *http.Request) serialize.Serializer
	// WriteCoalesceMs when > 0 enables write coalescing on each ConnTransport (drain up to WriteCoalesceMaxFrames within this many ms).
	WriteCoalesceMs        int
	WriteCoalesceMaxFrames int

	// MaxDurationAfterFirstAudio when > 0 starts a one-shot max-duration
	// timer on each connection after the first inbound *frames.AudioRawFrame.
	// When the timer fires, the per-connection context is canceled, which
	// terminates the transport and associated pipeline session.
	MaxDurationAfterFirstAudio time.Duration
}

Server is an HTTP server that upgrades requests to /ws to WebSocket and creates a ConnTransport per connection. SessionTimeout is the idle timeout before closing a connection; zero or negative disables it. OnConn is called in a new goroutine for each connection. RegisterHandlers, if set, is called once with the mux before serving.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context) error

ListenAndServe starts the HTTP server and blocks until ctx is canceled. It registers /ws and, if set, RegisterHandlers. Port 0 is treated as 8080.

func (*Server) ServeWithListener

func (s *Server) ServeWithListener(ctx context.Context, listener net.Listener) error

ServeWithListener runs the same handler logic as ListenAndServe but on the given listener. The listener is not closed when the server shuts down. Used for tests with dynamic ports.

type WebSocketConnector

type WebSocketConnector interface {
	// Conn returns the current WebSocket connection (may be nil).
	Conn() *websocket.Conn
	// SetConn sets the current connection (after Connect or reconnect).
	SetConn(conn *websocket.Conn)
	// Connect establishes a new connection and sets it via SetConn.
	Connect(ctx context.Context) error
	// Disconnect closes the current connection and clears it.
	Disconnect() error
	// ReceiveMessages runs the receive loop until error or connection close.
	// Called repeatedly by ReceiveLoop when reconnection is enabled.
	ReceiveMessages(ctx context.Context) error
}

WebSocketConnector is implemented by services that own a WebSocket connection. WebsocketServiceBase uses it to verify, reconnect, and run the receive loop.

type WebsocketServiceBase

type WebsocketServiceBase struct {
	Connector        WebSocketConnector
	ReconnectOnError bool
	// contains filtered or unexported fields
}

WebsocketServiceBase provides automatic reconnection with exponential backoff, connection verification (ping), and send-with-retry for WebSocket-based services. Embed or compose with a type that implements WebSocketConnector.

func NewWebsocketServiceBase

func NewWebsocketServiceBase(connector WebSocketConnector, reconnectOnError bool) *WebsocketServiceBase

NewWebsocketServiceBase returns a base with the given connector and reconnect policy.

func (*WebsocketServiceBase) Disconnect

func (b *WebsocketServiceBase) Disconnect() error

Disconnect marks disconnecting and closes the connection.

func (*WebsocketServiceBase) MaybeTryReconnect

func (b *WebsocketServiceBase) MaybeTryReconnect(ctx context.Context, message string, reportError ReportErrorFunc, err error) bool

MaybeTryReconnect decides whether to reconnect after an error or graceful close. Returns true if the caller should continue the receive loop (reconnect succeeded).

func (*WebsocketServiceBase) ReceiveLoop

func (b *WebsocketServiceBase) ReceiveLoop(ctx context.Context, reportError ReportErrorFunc)

ReceiveLoop runs ReceiveMessages in a loop, reconnecting on error when ReconnectOnError is true. reportError is called for connection errors when not reconnecting. Stops when context is canceled, Disconnect is called, or reconnection is disabled and an error occurs.

func (*WebsocketServiceBase) Reconnect

func (b *WebsocketServiceBase) Reconnect(ctx context.Context) bool

Reconnect disconnects then connects and verifies. Returns true if connection is healthy after.

func (*WebsocketServiceBase) SendWithRetry

func (b *WebsocketServiceBase) SendWithRetry(ctx context.Context, messageType int, data []byte, reportError ReportErrorFunc) error

SendWithRetry sends the message; on error tries reconnect once and retries send.

func (*WebsocketServiceBase) SetDisconnecting

func (b *WebsocketServiceBase) SetDisconnecting(disconnecting bool)

SetDisconnecting sets whether the service is intentionally disconnecting (disables reconnect).

func (*WebsocketServiceBase) TryReconnect

func (b *WebsocketServiceBase) TryReconnect(ctx context.Context, maxRetries int, reportError ReportErrorFunc) bool

TryReconnect attempts reconnection up to maxRetries with exponential backoff. If reportError is non-nil, it is called with an ErrorFrame on final failure. Returns true if reconnection and verification succeeded.

func (*WebsocketServiceBase) VerifyConnection

func (b *WebsocketServiceBase) VerifyConnection() bool

VerifyConnection pings the current connection. Returns false if conn is nil or closed.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL