streaming

package
v1.3.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package streaming provides a common WebSocket streaming session abstraction used by provider implementations (OpenAI Realtime, Gemini Live, etc.).

The package separates transport-level concerns (connect, send, receive, heartbeat, reconnect) from provider-specific protocol details (message encoding/decoding).

Index

Constants

View Source
const (
	DefaultDialTimeout      = 10 * time.Second
	DefaultWriteWait        = 10 * time.Second
	DefaultMaxMessageSize   = 16 * 1024 * 1024 // 16MB
	DefaultMaxRetries       = 3
	DefaultRetryBackoffBase = 1 * time.Second
	DefaultRetryBackoffMax  = 30 * time.Second
	DefaultCloseGracePeriod = 5 * time.Second
)

Default connection constants.

View Source
const (
	DefaultResponseChannelSize = 10
)

Default session constants.

Variables

This section is empty.

Functions

This section is empty.

Types

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn manages a WebSocket connection with retry, heartbeat, and graceful shutdown. It handles the transport layer while leaving message encoding to the caller.

func NewConn

func NewConn(cfg *ConnConfig) *Conn

NewConn creates a new Conn. Call Connect or ConnectWithRetry to establish the connection.

func (*Conn) Close

func (c *Conn) Close() error

Close gracefully closes the WebSocket connection.

func (*Conn) Connect

func (c *Conn) Connect(ctx context.Context) error

Connect establishes a WebSocket connection.

func (*Conn) ConnectWithRetry

func (c *Conn) ConnectWithRetry(ctx context.Context) error

ConnectWithRetry attempts to connect with exponential backoff and jitter.

func (*Conn) IsClosed

func (c *Conn) IsClosed() bool

IsClosed returns whether the connection has been closed.

func (*Conn) IsConnected

func (c *Conn) IsConnected() bool

IsConnected returns true if the connection has been established and has not been closed.

func (*Conn) Receive

func (c *Conn) Receive(ctx context.Context) ([]byte, error)

Receive reads a single message from the WebSocket. The call blocks until a message arrives or the context is canceled.

func (*Conn) ReceiveLoop

func (c *Conn) ReceiveLoop(ctx context.Context, msgCh chan<- []byte) error

ReceiveLoop continuously reads messages and sends them to msgCh. It returns when the connection is closed, an error occurs, or the context is canceled.

func (*Conn) Reset

func (c *Conn) Reset()

Reset closes the current connection and prepares for a new one. This is useful for reconnection flows where the caller needs to re-establish the connection with a fresh state.

func (*Conn) Send

func (c *Conn) Send(msg interface{}) error

Send JSON-encodes msg and writes it to the WebSocket.

func (*Conn) SendRaw

func (c *Conn) SendRaw(data []byte) error

SendRaw writes pre-encoded data to the WebSocket.

func (*Conn) StartHeartbeat

func (c *Conn) StartHeartbeat(ctx context.Context, interval time.Duration)

StartHeartbeat starts a goroutine that sends WebSocket ping frames at the given interval.

type ConnConfig

type ConnConfig struct {
	// URL is the WebSocket endpoint URL.
	URL string

	// Headers are sent during the WebSocket handshake.
	Headers http.Header

	// DialTimeout is the handshake timeout. Defaults to DefaultDialTimeout.
	DialTimeout time.Duration

	// WriteWait is the write deadline for each message. Defaults to DefaultWriteWait.
	WriteWait time.Duration

	// MaxMessageSize is the read limit. Defaults to DefaultMaxMessageSize.
	MaxMessageSize int64

	// MaxRetries is the number of connection attempts for ConnectWithRetry.
	// Defaults to DefaultMaxRetries.
	MaxRetries int

	// RetryBackoffBase is the initial backoff delay. Defaults to DefaultRetryBackoffBase.
	RetryBackoffBase time.Duration

	// RetryBackoffMax caps the backoff delay. Defaults to DefaultRetryBackoffMax.
	RetryBackoffMax time.Duration

	// CloseGracePeriod is the deadline for writing the close frame.
	// Defaults to DefaultCloseGracePeriod.
	CloseGracePeriod time.Duration

	// Logger receives debug/warn/error log messages. Optional.
	Logger Logger
}

ConnConfig configures the WebSocket connection behavior.

type ErrorClassifier

type ErrorClassifier func(err error) (shouldReconnect bool)

ErrorClassifier inspects a receive-loop error and decides whether the session should attempt reconnection or give up.

type Logger

type Logger interface {
	Debug(msg string, keysAndValues ...interface{})
	Info(msg string, keysAndValues ...interface{})
	Warn(msg string, keysAndValues ...interface{})
	Error(msg string, keysAndValues ...interface{})
}

Logger is an optional interface for structured logging.

type MessageHandler

type MessageHandler func(data []byte) ([]providers.StreamChunk, error)

MessageHandler processes a raw WebSocket message and converts it into zero or more StreamChunk values. Returning a non-nil error signals a fatal session error.

type ReconnectHook

type ReconnectHook func(ctx context.Context, conn *Conn) error

ReconnectHook is called when the session wants to reconnect. The implementation should re-establish provider-specific setup (e.g. resend a Gemini setup message or wait for an OpenAI session.created event) on the provided Conn. Return nil on success, or an error to abandon the reconnection attempt.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session manages a bidirectional streaming session over a WebSocket connection. It runs a receive loop that decodes messages via the caller-provided MessageHandler and emits StreamChunk values on the Response channel. The session supports optional automatic reconnection on transient errors.

func NewSession

func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error)

NewSession creates and starts a streaming session. The receive loop is started automatically in a background goroutine.

func (*Session) Close

func (s *Session) Close() error

Close terminates the session and closes the underlying connection. Safe to call multiple times.

func (*Session) Closed

func (s *Session) Closed() bool

Closed reports whether the session has been closed.

func (*Session) Conn

func (s *Session) Conn() *Conn

Conn returns the underlying connection, allowing callers to perform provider-specific operations (e.g., direct Receive for setup handshakes).

func (*Session) Done

func (s *Session) Done() <-chan struct{}

Done returns a channel that is closed when the session context is canceled.

func (*Session) Error

func (s *Session) Error() error

Error returns the first error that caused the session to end, or nil.

func (*Session) Response

func (s *Session) Response() <-chan providers.StreamChunk

Response returns the channel for receiving streaming response chunks. The channel is closed when the session ends.

func (*Session) Send

func (s *Session) Send(msg interface{}) error

Send JSON-encodes and sends a message through the underlying connection. Returns an error if the session is closed.

func (*Session) SendRaw

func (s *Session) SendRaw(data []byte) error

SendRaw sends pre-encoded data through the underlying connection.

type SessionConfig

type SessionConfig struct {
	// Conn is the underlying WebSocket connection. Required.
	Conn *Conn

	// OnMessage processes raw WebSocket messages into StreamChunks. Required.
	OnMessage MessageHandler

	// OnError classifies receive errors. Optional — when nil, all errors are fatal.
	OnError ErrorClassifier

	// OnReconnect is called to re-establish provider state after a new connection.
	// Optional — when nil, reconnection is disabled.
	OnReconnect ReconnectHook

	// MaxReconnectAttempts limits how many times the session will try to reconnect.
	// Defaults to 0 (no reconnection) when OnReconnect is nil.
	MaxReconnectAttempts int

	// ResponseChannelSize sets the buffer size of the response channel.
	// Defaults to DefaultResponseChannelSize.
	ResponseChannelSize int

	// Logger for session-level messages. Optional.
	Logger Logger
}

SessionConfig configures a streaming Session.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL