streaming

package
v1.0.66 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package streaming provides a shared streaming request executor for AI providers. This consolidates common streaming patterns to reduce boilerplate across providers.

Package streaming provides streaming utilities for AI provider implementations. This file contains tool conversion functions for OpenAI-compatible providers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertOpenAICompatibleToolCallsToUniversal

func ConvertOpenAICompatibleToolCallsToUniversal(toolCalls []OpenAICompatibleToolCall) []types.ToolCall

ConvertOpenAICompatibleToolCallsToUniversal converts OpenAI-compatible tool calls to universal format This function can be used by any provider that uses OpenAI's tool call format (OpenAI, Qwen, etc.)

func CreateAnthropicStream

func CreateAnthropicStream(response *http.Response) types.ChatCompletionStream

CreateAnthropicStream creates a stream for Anthropic responses

func CreateCustomStream

func CreateCustomStream(response *http.Response, parser StreamParser) types.ChatCompletionStream

CreateCustomStream creates a stream with a custom parser

func CreateErrorStream

func CreateErrorStream(err error) types.ChatCompletionStream

CreateErrorStream creates a stream that immediately returns an error

func CreateOpenAIStream

func CreateOpenAIStream(response *http.Response) types.ChatCompletionStream

CreateOpenAIStream creates a stream for OpenAI-compatible responses

func JSONLineProcessor

func JSONLineProcessor(data string, target interface{}, chunkExtractor func(interface{}) types.ChatCompletionChunk) (types.ChatCompletionChunk, error, bool)

JSONLineProcessor provides standard JSON line processing with error handling

func SSELineProcessor

func SSELineProcessor(data string, responseParser func(string) (types.ChatCompletionChunk, bool)) (types.ChatCompletionChunk, error, bool)

SSELineProcessor provides standard SSE (Server-Sent Events) line processing

func StreamFromContext

func StreamFromContext(ctx context.Context, baseStream types.ChatCompletionStream) types.ChatCompletionStream

StreamFromContext creates a stream from context with cancellation support

Types

type AnthropicStreamParser

type AnthropicStreamParser struct {
	// contains filtered or unexported fields
}

AnthropicStreamParser provides a parser for Anthropic's streaming format

func NewAnthropicStreamParser

func NewAnthropicStreamParser() *AnthropicStreamParser

NewAnthropicStreamParser creates a new Anthropic stream parser

func (*AnthropicStreamParser) ParseLine

ParseLine parses a line from an Anthropic stream

type BaseStream

type BaseStream struct {
	// contains filtered or unexported fields
}

BaseStream provides a base implementation of ChatCompletionStream

func NewBaseStream

func NewBaseStream(processor *StreamProcessor, parser StreamParser) *BaseStream

NewBaseStream creates a new base stream

func (*BaseStream) Close

func (bs *BaseStream) Close() error

Close closes the stream

func (*BaseStream) Next

func (bs *BaseStream) Next() (types.ChatCompletionChunk, error)

Next returns the next chunk from the stream

type ContextAwareStream

type ContextAwareStream struct {
	// contains filtered or unexported fields
}

ContextAwareStream wraps a stream with context awareness

func (*ContextAwareStream) Close

func (cas *ContextAwareStream) Close() error

Close closes the underlying stream

func (*ContextAwareStream) GetStreamError

func (cas *ContextAwareStream) GetStreamError() streaming.StreamError

GetStreamError returns a StreamError based on the context state

func (*ContextAwareStream) Next

Next returns the next chunk, respecting context cancellation Detects context cancellation and timeout errors for proper SSE error forwarding

type ErrorStream

type ErrorStream struct {
	// contains filtered or unexported fields
}

ErrorStream is a stream that always returns an error

func (*ErrorStream) Close

func (es *ErrorStream) Close() error

func (*ErrorStream) Next

type GenericSSEStream

type GenericSSEStream struct {
	// contains filtered or unexported fields
}

GenericSSEStream wraps SSE parsing for any provider that implements SSELineParser. It handles the low-level SSE protocol details (reading lines, handling "data:" prefix) while delegating provider-specific parsing to the SSELineParser implementation.

func NewGenericSSEStream

func NewGenericSSEStream(resp *http.Response, parser SSELineParser) *GenericSSEStream

NewGenericSSEStream creates a new generic SSE stream with the given response and parser. The parser parameter defines how individual SSE lines are parsed into ChatCompletionChunk objects.

func (*GenericSSEStream) Close

func (s *GenericSSEStream) Close() error

Close closes the underlying HTTP response body and cleans up resources.

func (*GenericSSEStream) Next

Next returns the next chunk from the SSE stream. It reads lines from the stream, extracts SSE data, and uses the parser to convert them to chunks. Returns io.EOF when the stream is complete or an error if parsing fails.

type MockStream

type MockStream struct {
	// contains filtered or unexported fields
}

MockStream provides a mock implementation of ChatCompletionStream for testing

func NewMockStream

func NewMockStream(chunks []types.ChatCompletionChunk) *MockStream

NewMockStream creates a new mock stream with the given chunks

func (*MockStream) Close

func (ms *MockStream) Close() error

Close resets the mock stream

func (*MockStream) Next

func (ms *MockStream) Next() (types.ChatCompletionChunk, error)

Next returns the next chunk from the mock stream

type OpenAICompatibleFunctionDef

type OpenAICompatibleFunctionDef struct {
	Name        string                 `json:"name"`
	Description string                 `json:"description"`
	Parameters  map[string]interface{} `json:"parameters"` // JSON Schema
}

OpenAICompatibleFunctionDef represents a function definition in OpenAI-compatible format

type OpenAICompatibleParser

type OpenAICompatibleParser struct {
	// SkipEmptyContent determines whether to skip chunks with empty content
	SkipEmptyContent bool
}

OpenAICompatibleParser handles OpenAI-style SSE responses. This parser works with the standard OpenAI streaming format and can be used by any provider that follows OpenAI's streaming API conventions.

func NewOpenAICompatibleParser

func NewOpenAICompatibleParser() *OpenAICompatibleParser

NewOpenAICompatibleParser creates a new OpenAI-compatible SSE parser with default settings.

func (*OpenAICompatibleParser) IsDone

func (p *OpenAICompatibleParser) IsDone(line string) bool

IsDone checks if the given line indicates stream completion. OpenAI sends "[DONE]" as the final message in the stream.

func (*OpenAICompatibleParser) ParseLine

ParseLine parses a single SSE data line in OpenAI format into a ChatCompletionChunk. The OpenAI format is JSON with the structure:

{
  "id": "chatcmpl-...",
  "object": "chat.completion.chunk",
  "created": 1234567890,
  "model": "gpt-4",
  "choices": [{
    "index": 0,
    "delta": {
      "role": "assistant",
      "content": "Hello"
    },
    "finish_reason": null
  }]
}

type OpenAICompatibleTool

type OpenAICompatibleTool struct {
	Type     string                      `json:"type"` // Always "function"
	Function OpenAICompatibleFunctionDef `json:"function"`
}

OpenAICompatibleTool represents a tool in OpenAI-compatible API format Used by OpenAI, Qwen, and other OpenAI-compatible providers

func ConvertToOpenAICompatibleTools

func ConvertToOpenAICompatibleTools(tools []types.Tool) []OpenAICompatibleTool

ConvertToOpenAICompatibleTools converts universal tools to OpenAI-compatible format This function can be used by any provider that uses OpenAI's tool format (OpenAI, Qwen, etc.)

type OpenAICompatibleToolCall

type OpenAICompatibleToolCall struct {
	ID       string                           `json:"id"`
	Type     string                           `json:"type"` // "function"
	Function OpenAICompatibleToolCallFunction `json:"function"`
}

OpenAICompatibleToolCall represents a tool call in OpenAI-compatible format

func ConvertToOpenAICompatibleToolCalls

func ConvertToOpenAICompatibleToolCalls(toolCalls []types.ToolCall) []OpenAICompatibleToolCall

ConvertToOpenAICompatibleToolCalls converts universal tool calls to OpenAI-compatible format This function can be used by any provider that uses OpenAI's tool call format (OpenAI, Qwen, etc.)

type OpenAICompatibleToolCallFunction

type OpenAICompatibleToolCallFunction struct {
	Name      string `json:"name"`
	Arguments string `json:"arguments"` // JSON string
}

OpenAICompatibleToolCallFunction represents a function call in a tool call

type ProcessLineFunc

type ProcessLineFunc func(line string) (types.ChatCompletionChunk, error, bool)

ProcessLineFunc processes a single line from a streaming response

type RequestBuilder

type RequestBuilder struct {
	// contains filtered or unexported fields
}

RequestBuilder helps build streaming requests with a fluent API

func NewRequestBuilder

func NewRequestBuilder(providerType types.ProviderType, client *http.Client) *RequestBuilder

NewRequestBuilder creates a new request builder

func (*RequestBuilder) Build

func (b *RequestBuilder) Build() *StreamingExecutor

Build creates the streaming executor

func (*RequestBuilder) WithBaseURL

func (b *RequestBuilder) WithBaseURL(baseURL string) *RequestBuilder

WithBaseURL sets the base URL

func (*RequestBuilder) WithEndpoint

func (b *RequestBuilder) WithEndpoint(endpoint string) *RequestBuilder

WithEndpoint sets the endpoint path

func (*RequestBuilder) WithExtraHeaders

func (b *RequestBuilder) WithExtraHeaders(headers map[string]string) *RequestBuilder

WithExtraHeaders sets extra headers

func (*RequestBuilder) WithRateLimitHelper

func (b *RequestBuilder) WithRateLimitHelper(helper *common.RateLimitHelper) *RequestBuilder

WithRateLimitHelper sets the rate limit helper

func (*RequestBuilder) WithRequestPreparer

func (b *RequestBuilder) WithRequestPreparer(preparer RequestPreparer) *RequestBuilder

WithRequestPreparer sets the request preparer

func (*RequestBuilder) WithResponseHandler

func (b *RequestBuilder) WithResponseHandler(handler ResponseHandler) *RequestBuilder

WithResponseHandler sets the response handler

func (*RequestBuilder) WithStreamCreator

func (b *RequestBuilder) WithStreamCreator(creator StreamCreator) *RequestBuilder

WithStreamCreator sets the stream creator

type RequestPreparer

type RequestPreparer func(*http.Request) error

RequestPreparer allows providers to customize request before sending

type ResponseHandler

type ResponseHandler func(*http.Response) error

ResponseHandler allows providers to handle response before creating stream

type SSELineParser

type SSELineParser interface {
	// ParseLine parses a single SSE data line into a chunk.
	// Returns the parsed chunk and any error that occurred during parsing.
	ParseLine(line string) (types.ChatCompletionChunk, error)

	// IsDone checks if this line indicates stream completion.
	// Common completion signals include "[DONE]" or specific event types.
	IsDone(line string) bool
}

SSELineParser defines provider-specific parsing behavior for SSE streams. This interface allows different providers to implement their own parsing logic while using the common SSE stream infrastructure.

type StandardStreamParser

type StandardStreamParser struct {
	// Custom field mappings for provider-specific responses
	ContentField          string
	ReasoningField        string // For GLM-4.6, OpenCode/Zen style
	ReasoningContentField string // For vLLM/Synthetic style
	DoneField             string
	UsageField            string
	ToolCallsField        string
	FinishReason          string
}

StandardStreamParser provides a standard parser for OpenAI-compatible streaming responses

func NewStandardStreamParser

func NewStandardStreamParser() *StandardStreamParser

NewStandardStreamParser creates a new standard stream parser with default OpenAI mappings

func (*StandardStreamParser) ParseLine

ParseLine parses a line from the stream using standard OpenAI format

type StreamCreator

type StreamCreator func(*http.Response) types.ChatCompletionStream

StreamCreator creates a ChatCompletionStream from an HTTP response

type StreamParser

type StreamParser interface {
	ParseLine(data string) (types.ChatCompletionChunk, bool, error)
}

StreamParser defines the interface for parsing streaming responses

type StreamProcessor

type StreamProcessor struct {
	// contains filtered or unexported fields
}

StreamProcessor provides common streaming functionality for all providers

func NewStreamProcessor

func NewStreamProcessor(response *http.Response) *StreamProcessor

NewStreamProcessor creates a new stream processor

func (*StreamProcessor) Close

func (sp *StreamProcessor) Close() error

Close closes the stream and cleans up resources

func (*StreamProcessor) GetStreamError

func (sp *StreamProcessor) GetStreamError(err error) streaming.StreamError

GetStreamError creates a StreamError from a streaming error This categorizes errors for proper SSE error event forwarding

func (*StreamProcessor) IsDone

func (sp *StreamProcessor) IsDone() bool

IsDone returns whether the stream is finished

func (*StreamProcessor) MarkDone

func (sp *StreamProcessor) MarkDone()

MarkDone marks the stream as done

func (*StreamProcessor) NextChunk

func (sp *StreamProcessor) NextChunk(processLine ProcessLineFunc) (types.ChatCompletionChunk, error)

NextChunk reads and processes the next chunk from the stream

type StreamingExecutor

type StreamingExecutor struct {
	// contains filtered or unexported fields
}

StreamingExecutor handles common streaming request execution patterns

func NewStreamingExecutor

func NewStreamingExecutor(config StreamingExecutorConfig) *StreamingExecutor

NewStreamingExecutor creates a new streaming executor

func (*StreamingExecutor) BuildURL

func (e *StreamingExecutor) BuildURL(endpoint string) string

BuildURL builds a full URL from base URL and endpoint

func (*StreamingExecutor) ExecuteStreamRequest

func (e *StreamingExecutor) ExecuteStreamRequest(
	ctx context.Context,
	method, url string,
	requestBody interface{},
	authToken string,
	authType string,
) (types.ChatCompletionStream, error)

ExecuteStreamRequest executes a streaming request with standard error handling and rate limit tracking. This consolidates the common pattern found across Anthropic, OpenAI, Qwen, Gemini, and other providers.

func (*StreamingExecutor) ExecuteWithJSONBody

func (e *StreamingExecutor) ExecuteWithJSONBody(
	ctx context.Context,
	url string,
	requestBody interface{},
	authToken string,
	authType string,
) (types.ChatCompletionStream, error)

ExecuteWithJSONBody is a convenience method that marshals the request body to JSON

func (*StreamingExecutor) GetBaseURL

func (e *StreamingExecutor) GetBaseURL() string

GetBaseURL returns the base URL for the executor

type StreamingExecutorConfig

type StreamingExecutorConfig struct {
	// Provider type for error reporting
	ProviderType types.ProviderType

	// HTTP client for making requests
	Client *http.Client

	// Rate limit helper for tracking rate limits
	RateLimitHelper *common.RateLimitHelper

	// StreamCreator converts the HTTP response to a ChatCompletionStream
	StreamCreator StreamCreator

	// Optional: PrepareRequest allows modifying the request before sending
	PrepareRequest RequestPreparer

	// Optional: HandleResponse allows processing the response before creating stream
	HandleResponse ResponseHandler

	// Optional: ExtraHeaders to add to all requests
	ExtraHeaders map[string]string

	// Optional: BaseURL for the provider
	BaseURL string

	// Optional: Endpoint path for streaming (default: /chat/completions or provider-specific)
	Endpoint string
}

StreamingExecutorConfig holds configuration for the streaming executor

Directories

Path Synopsis
Package decoders provides pluggable stream decoders for various streaming formats including Server-Sent Events (SSE), NDJSON, and EventStream.
Package decoders provides pluggable stream decoders for various streaming formats including Server-Sent Events (SSE), NDJSON, and EventStream.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL