Documentation ¶
Overview ¶
Package streaming provides utilities for SSE (Server-Sent Events) and streaming operations.
It exposes the public streaming APIs intended for external gateways and other consumers. The primary use case is context-aware stream cancellation, so that an SSE stream ends cleanly when its context is canceled or times out.
Key Features:
- Context-aware stream wrapping with cancellation support
- SSE error event generation for client-side error handling
- Proper categorization of streaming errors (timeout, cancellation, network, etc.)
Example Usage:
    // Wrap a provider stream with context for cancellation
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    baseStream := provider.CreateStream(response)
    stream := streaming.StreamFromContext(ctx, baseStream)

    for {
        chunk, err := stream.Next()
        if err != nil {
            // Handle SSE error forwarding
            if chunk.Error != "" {
                // Forward SSE error event to client
                fmt.Fprintf(w, "event: error\ndata: %s\n\n", chunk.Error)
            }
            break
        }

        // Process chunk...

        if chunk.Done {
            break
        }
    }
Functions ¶
func StreamFromContext ¶
func StreamFromContext(ctx context.Context, baseStream types.ChatCompletionStream) types.ChatCompletionStream
StreamFromContext creates a context-aware stream from a base stream with cancellation support.
This function wraps a ChatCompletionStream with context awareness, enabling proper SSE (Server-Sent Events) cancellation when the context is canceled or times out. It is critical for external gateways that need to manage streaming lifecycle and forward error events to clients.
The wrapped stream monitors the context and returns properly formatted error chunks that can be forwarded as SSE error events. The error categorization includes:
- context.DeadlineExceeded -> timeout errors
- context.Canceled -> client cancellation errors
- Other errors -> categorized by type (network, API, rate limit, etc.)
Parameters:
- ctx: The context to monitor for cancellation. Use context.WithTimeout or context.WithCancel to enable cancellation.
- baseStream: The underlying ChatCompletionStream to wrap.
Returns:
- A ChatCompletionStream that respects context cancellation and returns error chunks formatted for SSE forwarding.
Example:
    // Create a stream with 30 second timeout
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // Get stream from provider
    baseStream := provider.ChatCompletion(ctx, request)

    // Wrap with context for SSE cancellation support
    stream := streaming.StreamFromContext(ctx, baseStream)

    // Stream responses with context awareness
    for {
        chunk, err := stream.Next()
        if err != nil {
            // Error chunk contains SSE-formatted error for client forwarding
            if chunk.Error != "" {
                // Forward to client as SSE error event
                sseError := fmt.Sprintf("event: error\ndata: %s\n\n", chunk.Error)
                fmt.Fprint(w, sseError)
            }
            break
        }

        // Forward normal chunk to client
        forwardChunkToClient(w, chunk)

        if chunk.Done {
            break
        }
    }
Types ¶
type ContextAwareStream ¶
type ContextAwareStream struct {
// contains filtered or unexported fields
}
ContextAwareStream wraps a stream with context awareness.
func (*ContextAwareStream) Close ¶
func (cas *ContextAwareStream) Close() error
Close closes the underlying stream.
func (*ContextAwareStream) GetStreamError ¶
func (cas *ContextAwareStream) GetStreamError() StreamError
GetStreamError returns a StreamError based on the context state.
func (*ContextAwareStream) Next ¶
func (cas *ContextAwareStream) Next() (types.ChatCompletionChunk, error)
Next returns the next chunk, respecting context cancellation. It detects context cancellation and timeout errors so that they can be forwarded as SSE error events.
type StreamError ¶
StreamError represents an error during streaming.
func MakeStreamError ¶
func MakeStreamError(err error) StreamError
MakeStreamError creates a StreamError from an error.
func (StreamError) ToSSEEvent ¶
func (se StreamError) ToSSEEvent() string
ToSSEEvent converts the StreamError to an SSE event string.
Format: event: error\ndata: {"type":"...","message":"..."}\n\n
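The format line suggests a JSON payload with `type` and `message` keys. A minimal sketch of producing that shape follows; the `streamError` struct and its field names are assumptions inferred from the format, since the real StreamError fields are not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// streamError is a hypothetical stand-in for StreamError; the field names
// are inferred from the documented event format.
type streamError struct {
	Type    string `json:"type"`
	Message string `json:"message"`
}

// toSSEEvent renders the error as an SSE "error" event with a JSON payload.
func (e streamError) toSSEEvent() string {
	data, err := json.Marshal(e)
	if err != nil {
		// Marshaling two strings should not fail; fall back defensively.
		data = []byte(`{"type":"internal","message":"failed to encode error"}`)
	}
	return fmt.Sprintf("event: error\ndata: %s\n\n", data)
}

func main() {
	e := streamError{Type: "timeout", Message: "context deadline exceeded"}
	fmt.Print(e.toSSEEvent())
}
```

Encoding the payload as JSON keeps it on a single `data:` line even if the message contains newlines, which matters because a raw newline would end the SSE field early.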