streaming

package
v1.0.61
Warning: This package is not in the latest version of its module.
Published: Dec 23, 2025 | License: MIT | Imports: 5 | Imported by: 0

Documentation

Overview

Package streaming provides utilities for SSE (Server-Sent Events) and streaming operations.

This package provides public streaming APIs for use by external gateways and consumers. The primary use case is context-aware stream cancellation for proper SSE lifecycle management.

Key Features:

  • Context-aware stream wrapping with cancellation support
  • SSE error event generation for client-side error handling
  • Proper categorization of streaming errors (timeout, cancellation, network, etc.)

Example Usage:

// Wrap a provider stream with context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

baseStream := provider.CreateStream(response)
stream := streaming.StreamFromContext(ctx, baseStream)

for {
    chunk, err := stream.Next()
    if err != nil {
        // Handle SSE error forwarding
        if chunk.Error != "" {
            // Forward SSE error event to client
            fmt.Fprintf(w, "event: error\ndata: %s\n\n", chunk.Error)
        }
        break
    }
    // Process chunk...
    if chunk.Done {
        break
    }
}

Constants

This section is empty.

Variables

This section is empty.

Functions

func StreamFromContext

func StreamFromContext(ctx context.Context, baseStream types.ChatCompletionStream) types.ChatCompletionStream

StreamFromContext creates a context-aware stream from a base stream with cancellation support.

This function wraps a ChatCompletionStream with context awareness, so that an SSE (Server-Sent Events) stream ends cleanly when the context is canceled or times out. It is intended for external gateways that need to manage the stream lifecycle and forward error events to clients.

The wrapped stream monitors the context and returns properly formatted error chunks that can be forwarded as SSE error events. The error categorization includes:

  • context.DeadlineExceeded -> timeout errors
  • context.Canceled -> client cancellation errors
  • Other errors -> categorized by type (network, API, rate limit, etc.)

Parameters:

  • ctx: The context to monitor for cancellation. Use context.WithTimeout or context.WithCancel to enable cancellation.
  • baseStream: The underlying ChatCompletionStream to wrap.

Returns:

  • A ChatCompletionStream that respects context cancellation and returns error chunks formatted for SSE forwarding.

Example:

// Create a stream with 30 second timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Get stream from provider
baseStream := provider.ChatCompletion(ctx, request)

// Wrap with context for SSE cancellation support
stream := streaming.StreamFromContext(ctx, baseStream)

// Stream responses with context awareness
for {
    chunk, err := stream.Next()
    if err != nil {
        // Error chunk contains SSE-formatted error for client forwarding
        if chunk.Error != "" {
            // Forward to client as SSE error event
            sseError := fmt.Sprintf("event: error\ndata: %s\n\n", chunk.Error)
            fmt.Fprint(w, sseError)
        }
        break
    }

    // Forward normal chunk to client
    forwardChunkToClient(w, chunk)

    if chunk.Done {
        break
    }
}

Types

type ContextAwareStream

type ContextAwareStream struct {
	// contains filtered or unexported fields
}

ContextAwareStream wraps a stream with context awareness.

func (*ContextAwareStream) Close

func (cas *ContextAwareStream) Close() error

Close closes the underlying stream.

func (*ContextAwareStream) GetStreamError

func (cas *ContextAwareStream) GetStreamError() StreamError

GetStreamError returns a StreamError based on the context state.

func (*ContextAwareStream) Next

Next returns the next chunk, respecting context cancellation. It detects context cancellation and timeout errors so they can be forwarded as SSE error events.

type ErrorType

type ErrorType string

ErrorType represents the type of streaming error.

const (
	ErrorTypeStreamInterrupted ErrorType = "stream_interrupted"
	ErrorTypeAPIError          ErrorType = "api_error"
	ErrorTypeRateLimit         ErrorType = "rate_limit"
	ErrorTypeNetwork           ErrorType = "network"
	ErrorTypeTimeout           ErrorType = "timeout"
	ErrorTypeContextCanceled   ErrorType = "context_canceled"
)

type StreamError

type StreamError struct {
	Type    ErrorType `json:"type"`
	Message string    `json:"message"`
}

StreamError represents an error during streaming.

func MakeStreamError

func MakeStreamError(err error) StreamError

MakeStreamError creates a StreamError from an error.

func (StreamError) ToSSEEvent

func (se StreamError) ToSSEEvent() string

ToSSEEvent converts the StreamError to an SSE event string.

Format: event: error\ndata: {"type":"...","message":"..."}\n\n
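The documented format can be reproduced with a short sketch. The StreamError struct below repeats the fields and JSON tags shown above, while toSSEEvent is a hypothetical free function written for illustration. The package's own method may build the string differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ErrorType and StreamError repeat the declarations documented above.
type ErrorType string

type StreamError struct {
	Type    ErrorType `json:"type"`
	Message string    `json:"message"`
}

// toSSEEvent is a hypothetical re-creation of the documented output format:
// an "error" event whose data line carries the JSON-encoded StreamError.
func toSSEEvent(se StreamError) string {
	payload, err := json.Marshal(se)
	if err != nil {
		// Marshaling two string fields should not fail; this fallback only
		// keeps the sketch total.
		payload = []byte(`{"type":"stream_interrupted","message":"failed to encode error"}`)
	}
	return fmt.Sprintf("event: error\ndata: %s\n\n", payload)
}

func main() {
	event := toSSEEvent(StreamError{Type: "timeout", Message: "deadline exceeded"})
	fmt.Printf("%q\n", event)
}
```

Encoding the payload as JSON has a practical benefit for SSE: newlines inside Message are escaped, so the data field stays on a single line and the event framing is not broken.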
