Documentation
¶
Overview ¶
Package streaming provides a common WebSocket streaming session abstraction used by provider implementations (OpenAI Realtime, Gemini Live, etc.).
The package separates transport-level concerns (connect, send, receive, heartbeat, reconnect) from provider-specific protocol details (message encoding/decoding).
Index ¶
- Constants
- type Conn
- func (c *Conn) Close() error
- func (c *Conn) Connect(ctx context.Context) error
- func (c *Conn) ConnectWithRetry(ctx context.Context) error
- func (c *Conn) IsClosed() bool
- func (c *Conn) IsConnected() bool
- func (c *Conn) Receive(ctx context.Context) ([]byte, error)
- func (c *Conn) ReceiveLoop(ctx context.Context, msgCh chan<- []byte) error
- func (c *Conn) Reset()
- func (c *Conn) Send(msg interface{}) error
- func (c *Conn) SendRaw(data []byte) error
- func (c *Conn) StartHeartbeat(ctx context.Context, interval time.Duration)
- type ConnConfig
- type ErrorClassifier
- type Logger
- type MessageHandler
- type ReconnectHook
- type Session
- func (s *Session) Close() error
- func (s *Session) Closed() bool
- func (s *Session) Conn() *Conn
- func (s *Session) Done() <-chan struct{}
- func (s *Session) Error() error
- func (s *Session) Response() <-chan providers.StreamChunk
- func (s *Session) Send(msg interface{}) error
- func (s *Session) SendRaw(data []byte) error
- type SessionConfig
Constants ¶
const ( DefaultDialTimeout = 10 * time.Second DefaultWriteWait = 10 * time.Second DefaultMaxMessageSize = 16 * 1024 * 1024 // 16MB DefaultMaxRetries = 3 DefaultRetryBackoffBase = 1 * time.Second DefaultRetryBackoffMax = 30 * time.Second DefaultCloseGracePeriod = 5 * time.Second )
Default connection constants.
const (
DefaultResponseChannelSize = 10
)
Default session constants.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn manages a WebSocket connection with retry, heartbeat, and graceful shutdown. It handles the transport layer while leaving message encoding to the caller.
func NewConn ¶
func NewConn(cfg *ConnConfig) *Conn
NewConn creates a new Conn. Call Connect or ConnectWithRetry to establish the connection.
func (*Conn) ConnectWithRetry ¶
ConnectWithRetry attempts to connect with exponential backoff and jitter.
func (*Conn) IsConnected ¶
IsConnected returns true if the connection has been established and has not been closed.
func (*Conn) Receive ¶
Receive reads a single message from the WebSocket. The call blocks until a message arrives or the context is canceled.
func (*Conn) ReceiveLoop ¶
ReceiveLoop continuously reads messages and sends them to msgCh. It returns when the connection is closed, an error occurs, or the context is canceled.
func (*Conn) Reset ¶
func (c *Conn) Reset()
Reset closes the current connection and prepares for a new one. This is useful for reconnection flows where the caller needs to re-establish the connection with a fresh state.
type ConnConfig ¶
type ConnConfig struct {
// URL is the WebSocket endpoint URL.
URL string
// Headers are sent during the WebSocket handshake.
Headers http.Header
// DialTimeout is the handshake timeout. Defaults to DefaultDialTimeout.
DialTimeout time.Duration
// WriteWait is the write deadline for each message. Defaults to DefaultWriteWait.
WriteWait time.Duration
// MaxMessageSize is the read limit. Defaults to DefaultMaxMessageSize.
MaxMessageSize int64
// MaxRetries is the number of connection attempts for ConnectWithRetry.
// Defaults to DefaultMaxRetries.
MaxRetries int
// RetryBackoffBase is the initial backoff delay. Defaults to DefaultRetryBackoffBase.
RetryBackoffBase time.Duration
// RetryBackoffMax caps the backoff delay. Defaults to DefaultRetryBackoffMax.
RetryBackoffMax time.Duration
// CloseGracePeriod is the deadline for writing the close frame.
// Defaults to DefaultCloseGracePeriod.
CloseGracePeriod time.Duration
// Logger receives debug/warn/error log messages. Optional.
Logger Logger
}
ConnConfig configures the WebSocket connection behavior.
type ErrorClassifier ¶
ErrorClassifier inspects a receive-loop error and decides whether the session should attempt reconnection or give up.
type Logger ¶
type Logger interface {
Debug(msg string, keysAndValues ...interface{})
Info(msg string, keysAndValues ...interface{})
Warn(msg string, keysAndValues ...interface{})
Error(msg string, keysAndValues ...interface{})
}
Logger is an optional interface for structured logging.
type MessageHandler ¶
type MessageHandler func(data []byte) ([]providers.StreamChunk, error)
MessageHandler processes a raw WebSocket message and converts it into zero or more StreamChunk values. Returning a non-nil error signals a fatal session error.
type ReconnectHook ¶
ReconnectHook is called when the session wants to reconnect. The implementation should re-establish provider-specific setup (e.g. resend a Gemini setup message or wait for an OpenAI session.created event) on the provided Conn. Return nil on success, or an error to abandon the reconnection attempt.
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session manages a bidirectional streaming session over a WebSocket connection. It runs a receive loop that decodes messages via the caller-provided MessageHandler and emits StreamChunk values on the Response channel. The session supports optional automatic reconnection on transient errors.
func NewSession ¶
func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error)
NewSession creates and starts a streaming session. The receive loop is started automatically in a background goroutine.
func (*Session) Close ¶
Close terminates the session and closes the underlying connection. Safe to call multiple times.
func (*Session) Conn ¶
Conn returns the underlying connection, allowing callers to perform provider-specific operations (e.g., direct Receive for setup handshakes).
func (*Session) Done ¶
func (s *Session) Done() <-chan struct{}
Done returns a channel that is closed when the session context is canceled.
func (*Session) Response ¶
func (s *Session) Response() <-chan providers.StreamChunk
Response returns the channel for receiving streaming response chunks. The channel is closed when the session ends.
type SessionConfig ¶
type SessionConfig struct {
// Conn is the underlying WebSocket connection. Required.
Conn *Conn
// OnMessage processes raw WebSocket messages into StreamChunks. Required.
OnMessage MessageHandler
// OnError classifies receive errors. Optional — when nil, all errors are fatal.
OnError ErrorClassifier
// OnReconnect is called to re-establish provider state after a new connection.
// Optional — when nil, reconnection is disabled.
OnReconnect ReconnectHook
// MaxReconnectAttempts limits how many times the session will try to reconnect.
// Defaults to 0 (no reconnection) when OnReconnect is nil.
MaxReconnectAttempts int
// ResponseChannelSize sets the buffer size of the response channel.
// Defaults to DefaultResponseChannelSize.
ResponseChannelSize int
// Logger for session-level messages. Optional.
Logger Logger
}
SessionConfig configures a streaming Session.