Documentation
¶
Overview ¶
Package streaming provides a shared streaming request executor for AI providers. This consolidates common streaming patterns to reduce boilerplate across providers.
Package streaming provides streaming utilities for AI provider implementations. This file contains tool conversion functions for OpenAI-compatible providers.
Index ¶
- func ConvertOpenAICompatibleToolCallsToUniversal(toolCalls []OpenAICompatibleToolCall) []types.ToolCall
- func CreateAnthropicStream(response *http.Response) types.ChatCompletionStream
- func CreateCustomStream(response *http.Response, parser StreamParser) types.ChatCompletionStream
- func CreateErrorStream(err error) types.ChatCompletionStream
- func CreateOpenAIStream(response *http.Response) types.ChatCompletionStream
- func JSONLineProcessor(data string, target interface{}, ...) (types.ChatCompletionChunk, error, bool)
- func SSELineProcessor(data string, responseParser func(string) (types.ChatCompletionChunk, bool)) (types.ChatCompletionChunk, error, bool)
- func StreamFromContext(ctx context.Context, baseStream types.ChatCompletionStream) types.ChatCompletionStream
- type AnthropicStreamParser
- type BaseStream
- type ContextAwareStream
- type ErrorStream
- type GenericSSEStream
- type MockStream
- type OpenAICompatibleFunctionDef
- type OpenAICompatibleParser
- type OpenAICompatibleTool
- type OpenAICompatibleToolCall
- type OpenAICompatibleToolCallFunction
- type ProcessLineFunc
- type RequestBuilder
- func (b *RequestBuilder) Build() *StreamingExecutor
- func (b *RequestBuilder) WithBaseURL(baseURL string) *RequestBuilder
- func (b *RequestBuilder) WithEndpoint(endpoint string) *RequestBuilder
- func (b *RequestBuilder) WithExtraHeaders(headers map[string]string) *RequestBuilder
- func (b *RequestBuilder) WithRateLimitHelper(helper *common.RateLimitHelper) *RequestBuilder
- func (b *RequestBuilder) WithRequestPreparer(preparer RequestPreparer) *RequestBuilder
- func (b *RequestBuilder) WithResponseHandler(handler ResponseHandler) *RequestBuilder
- func (b *RequestBuilder) WithStreamCreator(creator StreamCreator) *RequestBuilder
- type RequestPreparer
- type ResponseHandler
- type SSELineParser
- type StandardStreamParser
- type StreamCreator
- type StreamParser
- type StreamProcessor
- type StreamingExecutor
- func (e *StreamingExecutor) BuildURL(endpoint string) string
- func (e *StreamingExecutor) ExecuteStreamRequest(ctx context.Context, method, url string, requestBody interface{}, ...) (types.ChatCompletionStream, error)
- func (e *StreamingExecutor) ExecuteWithJSONBody(ctx context.Context, url string, requestBody interface{}, authToken string, ...) (types.ChatCompletionStream, error)
- func (e *StreamingExecutor) GetBaseURL() string
- type StreamingExecutorConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConvertOpenAICompatibleToolCallsToUniversal ¶
func ConvertOpenAICompatibleToolCallsToUniversal(toolCalls []OpenAICompatibleToolCall) []types.ToolCall
ConvertOpenAICompatibleToolCallsToUniversal converts OpenAI-compatible tool calls to universal format This function can be used by any provider that uses OpenAI's tool call format (OpenAI, Qwen, etc.)
func CreateAnthropicStream ¶
func CreateAnthropicStream(response *http.Response) types.ChatCompletionStream
CreateAnthropicStream creates a stream for Anthropic responses
func CreateCustomStream ¶
func CreateCustomStream(response *http.Response, parser StreamParser) types.ChatCompletionStream
CreateCustomStream creates a stream with a custom parser
func CreateErrorStream ¶
func CreateErrorStream(err error) types.ChatCompletionStream
CreateErrorStream creates a stream that immediately returns an error
func CreateOpenAIStream ¶
func CreateOpenAIStream(response *http.Response) types.ChatCompletionStream
CreateOpenAIStream creates a stream for OpenAI-compatible responses
func JSONLineProcessor ¶
func JSONLineProcessor(data string, target interface{}, chunkExtractor func(interface{}) types.ChatCompletionChunk) (types.ChatCompletionChunk, error, bool)
JSONLineProcessor provides standard JSON line processing with error handling
func SSELineProcessor ¶
func SSELineProcessor(data string, responseParser func(string) (types.ChatCompletionChunk, bool)) (types.ChatCompletionChunk, error, bool)
SSELineProcessor provides standard SSE (Server-Sent Events) line processing
func StreamFromContext ¶
func StreamFromContext(ctx context.Context, baseStream types.ChatCompletionStream) types.ChatCompletionStream
StreamFromContext creates a stream from context with cancellation support
Types ¶
type AnthropicStreamParser ¶
type AnthropicStreamParser struct {
// contains filtered or unexported fields
}
AnthropicStreamParser provides a parser for Anthropic's streaming format
func NewAnthropicStreamParser ¶
func NewAnthropicStreamParser() *AnthropicStreamParser
NewAnthropicStreamParser creates a new Anthropic stream parser
func (*AnthropicStreamParser) ParseLine ¶
func (p *AnthropicStreamParser) ParseLine(data string) (types.ChatCompletionChunk, bool, error)
ParseLine parses a line from an Anthropic stream
type BaseStream ¶
type BaseStream struct {
// contains filtered or unexported fields
}
BaseStream provides a base implementation of ChatCompletionStream
func NewBaseStream ¶
func NewBaseStream(processor *StreamProcessor, parser StreamParser) *BaseStream
NewBaseStream creates a new base stream
func (*BaseStream) Next ¶
func (bs *BaseStream) Next() (types.ChatCompletionChunk, error)
Next returns the next chunk from the stream
type ContextAwareStream ¶
type ContextAwareStream struct {
// contains filtered or unexported fields
}
ContextAwareStream wraps a stream with context awareness
func (*ContextAwareStream) Close ¶
func (cas *ContextAwareStream) Close() error
Close closes the underlying stream
func (*ContextAwareStream) GetStreamError ¶
func (cas *ContextAwareStream) GetStreamError() streaming.StreamError
GetStreamError returns a StreamError based on the context state
func (*ContextAwareStream) Next ¶
func (cas *ContextAwareStream) Next() (types.ChatCompletionChunk, error)
Next returns the next chunk, respecting context cancellation Detects context cancellation and timeout errors for proper SSE error forwarding
type ErrorStream ¶
type ErrorStream struct {
// contains filtered or unexported fields
}
ErrorStream is a stream that always returns an error
func (*ErrorStream) Close ¶
func (es *ErrorStream) Close() error
func (*ErrorStream) Next ¶
func (es *ErrorStream) Next() (types.ChatCompletionChunk, error)
type GenericSSEStream ¶
type GenericSSEStream struct {
// contains filtered or unexported fields
}
GenericSSEStream wraps SSE parsing for any provider that implements SSELineParser. It handles the low-level SSE protocol details (reading lines, handling "data:" prefix) while delegating provider-specific parsing to the SSELineParser implementation.
func NewGenericSSEStream ¶
func NewGenericSSEStream(resp *http.Response, parser SSELineParser) *GenericSSEStream
NewGenericSSEStream creates a new generic SSE stream with the given response and parser. The parser parameter defines how individual SSE lines are parsed into ChatCompletionChunk objects.
func (*GenericSSEStream) Close ¶
func (s *GenericSSEStream) Close() error
Close closes the underlying HTTP response body and cleans up resources.
func (*GenericSSEStream) Next ¶
func (s *GenericSSEStream) Next() (types.ChatCompletionChunk, error)
Next returns the next chunk from the SSE stream. It reads lines from the stream, extracts SSE data, and uses the parser to convert them to chunks. Returns io.EOF when the stream is complete or an error if parsing fails.
type MockStream ¶
type MockStream struct {
// contains filtered or unexported fields
}
MockStream provides a mock implementation of ChatCompletionStream for testing
func NewMockStream ¶
func NewMockStream(chunks []types.ChatCompletionChunk) *MockStream
NewMockStream creates a new mock stream with the given chunks
func (*MockStream) Next ¶
func (ms *MockStream) Next() (types.ChatCompletionChunk, error)
Next returns the next chunk from the mock stream
type OpenAICompatibleFunctionDef ¶
type OpenAICompatibleFunctionDef struct {
Name string `json:"name"`
Description string `json:"description"`
Parameters map[string]interface{} `json:"parameters"` // JSON Schema
}
OpenAICompatibleFunctionDef represents a function definition in OpenAI-compatible format
type OpenAICompatibleParser ¶
type OpenAICompatibleParser struct {
// SkipEmptyContent determines whether to skip chunks with empty content
SkipEmptyContent bool
}
OpenAICompatibleParser handles OpenAI-style SSE responses. This parser works with the standard OpenAI streaming format and can be used by any provider that follows OpenAI's streaming API conventions.
func NewOpenAICompatibleParser ¶
func NewOpenAICompatibleParser() *OpenAICompatibleParser
NewOpenAICompatibleParser creates a new OpenAI-compatible SSE parser with default settings.
func (*OpenAICompatibleParser) IsDone ¶
func (p *OpenAICompatibleParser) IsDone(line string) bool
IsDone checks if the given line indicates stream completion. OpenAI sends "[DONE]" as the final message in the stream.
func (*OpenAICompatibleParser) ParseLine ¶
func (p *OpenAICompatibleParser) ParseLine(line string) (types.ChatCompletionChunk, error)
ParseLine parses a single SSE data line in OpenAI format into a ChatCompletionChunk. The OpenAI format is JSON with the structure:
{
"id": "chatcmpl-...",
"object": "chat.completion.chunk",
"created": 1234567890,
"model": "gpt-4",
"choices": [{
"index": 0,
"delta": {
"role": "assistant",
"content": "Hello"
},
"finish_reason": null
}]
}
type OpenAICompatibleTool ¶
type OpenAICompatibleTool struct {
Type string `json:"type"` // Always "function"
Function OpenAICompatibleFunctionDef `json:"function"`
}
OpenAICompatibleTool represents a tool in OpenAI-compatible API format Used by OpenAI, Qwen, and other OpenAI-compatible providers
func ConvertToOpenAICompatibleTools ¶
func ConvertToOpenAICompatibleTools(tools []types.Tool) []OpenAICompatibleTool
ConvertToOpenAICompatibleTools converts universal tools to OpenAI-compatible format This function can be used by any provider that uses OpenAI's tool format (OpenAI, Qwen, etc.)
type OpenAICompatibleToolCall ¶
type OpenAICompatibleToolCall struct {
ID string `json:"id"`
Type string `json:"type"` // "function"
Function OpenAICompatibleToolCallFunction `json:"function"`
}
OpenAICompatibleToolCall represents a tool call in OpenAI-compatible format
func ConvertToOpenAICompatibleToolCalls ¶
func ConvertToOpenAICompatibleToolCalls(toolCalls []types.ToolCall) []OpenAICompatibleToolCall
ConvertToOpenAICompatibleToolCalls converts universal tool calls to OpenAI-compatible format This function can be used by any provider that uses OpenAI's tool call format (OpenAI, Qwen, etc.)
type OpenAICompatibleToolCallFunction ¶
type OpenAICompatibleToolCallFunction struct {
Name string `json:"name"`
Arguments string `json:"arguments"` // JSON string
}
OpenAICompatibleToolCallFunction represents a function call in a tool call
type ProcessLineFunc ¶
type ProcessLineFunc func(line string) (types.ChatCompletionChunk, error, bool)
ProcessLineFunc processes a single line from a streaming response
type RequestBuilder ¶
type RequestBuilder struct {
// contains filtered or unexported fields
}
RequestBuilder helps build streaming requests with a fluent API
func NewRequestBuilder ¶
func NewRequestBuilder(providerType types.ProviderType, client *http.Client) *RequestBuilder
NewRequestBuilder creates a new request builder
func (*RequestBuilder) Build ¶
func (b *RequestBuilder) Build() *StreamingExecutor
Build creates the streaming executor
func (*RequestBuilder) WithBaseURL ¶
func (b *RequestBuilder) WithBaseURL(baseURL string) *RequestBuilder
WithBaseURL sets the base URL
func (*RequestBuilder) WithEndpoint ¶
func (b *RequestBuilder) WithEndpoint(endpoint string) *RequestBuilder
WithEndpoint sets the endpoint path
func (*RequestBuilder) WithExtraHeaders ¶
func (b *RequestBuilder) WithExtraHeaders(headers map[string]string) *RequestBuilder
WithExtraHeaders sets extra headers
func (*RequestBuilder) WithRateLimitHelper ¶
func (b *RequestBuilder) WithRateLimitHelper(helper *common.RateLimitHelper) *RequestBuilder
WithRateLimitHelper sets the rate limit helper
func (*RequestBuilder) WithRequestPreparer ¶
func (b *RequestBuilder) WithRequestPreparer(preparer RequestPreparer) *RequestBuilder
WithRequestPreparer sets the request preparer
func (*RequestBuilder) WithResponseHandler ¶
func (b *RequestBuilder) WithResponseHandler(handler ResponseHandler) *RequestBuilder
WithResponseHandler sets the response handler
func (*RequestBuilder) WithStreamCreator ¶
func (b *RequestBuilder) WithStreamCreator(creator StreamCreator) *RequestBuilder
WithStreamCreator sets the stream creator
type RequestPreparer ¶
RequestPreparer allows providers to customize request before sending
type ResponseHandler ¶
ResponseHandler allows providers to handle response before creating stream
type SSELineParser ¶
type SSELineParser interface {
// ParseLine parses a single SSE data line into a chunk.
// Returns the parsed chunk and any error that occurred during parsing.
ParseLine(line string) (types.ChatCompletionChunk, error)
// IsDone checks if this line indicates stream completion.
// Common completion signals include "[DONE]" or specific event types.
IsDone(line string) bool
}
SSELineParser defines provider-specific parsing behavior for SSE streams. This interface allows different providers to implement their own parsing logic while using the common SSE stream infrastructure.
type StandardStreamParser ¶
type StandardStreamParser struct {
// Custom field mappings for provider-specific responses
ContentField string
ReasoningField string // For GLM-4.6, OpenCode/Zen style
ReasoningContentField string // For vLLM/Synthetic style
DoneField string
UsageField string
ToolCallsField string
FinishReason string
}
StandardStreamParser provides a standard parser for OpenAI-compatible streaming responses
func NewStandardStreamParser ¶
func NewStandardStreamParser() *StandardStreamParser
NewStandardStreamParser creates a new standard stream parser with default OpenAI mappings
func (*StandardStreamParser) ParseLine ¶
func (p *StandardStreamParser) ParseLine(data string) (types.ChatCompletionChunk, bool, error)
ParseLine parses a line from the stream using standard OpenAI format
type StreamCreator ¶
type StreamCreator func(*http.Response) types.ChatCompletionStream
StreamCreator creates a ChatCompletionStream from an HTTP response
type StreamParser ¶
type StreamParser interface {
ParseLine(data string) (types.ChatCompletionChunk, bool, error)
}
StreamParser defines the interface for parsing streaming responses
type StreamProcessor ¶
type StreamProcessor struct {
// contains filtered or unexported fields
}
StreamProcessor provides common streaming functionality for all providers
func NewStreamProcessor ¶
func NewStreamProcessor(response *http.Response) *StreamProcessor
NewStreamProcessor creates a new stream processor
func (*StreamProcessor) Close ¶
func (sp *StreamProcessor) Close() error
Close closes the stream and cleans up resources
func (*StreamProcessor) GetStreamError ¶
func (sp *StreamProcessor) GetStreamError(err error) streaming.StreamError
GetStreamError creates a StreamError from a streaming error This categorizes errors for proper SSE error event forwarding
func (*StreamProcessor) IsDone ¶
func (sp *StreamProcessor) IsDone() bool
IsDone returns whether the stream is finished
func (*StreamProcessor) MarkDone ¶
func (sp *StreamProcessor) MarkDone()
MarkDone marks the stream as done
func (*StreamProcessor) NextChunk ¶
func (sp *StreamProcessor) NextChunk(processLine ProcessLineFunc) (types.ChatCompletionChunk, error)
NextChunk reads and processes the next chunk from the stream
type StreamingExecutor ¶
type StreamingExecutor struct {
// contains filtered or unexported fields
}
StreamingExecutor handles common streaming request execution patterns
func NewStreamingExecutor ¶
func NewStreamingExecutor(config StreamingExecutorConfig) *StreamingExecutor
NewStreamingExecutor creates a new streaming executor
func (*StreamingExecutor) BuildURL ¶
func (e *StreamingExecutor) BuildURL(endpoint string) string
BuildURL builds a full URL from base URL and endpoint
func (*StreamingExecutor) ExecuteStreamRequest ¶
func (e *StreamingExecutor) ExecuteStreamRequest( ctx context.Context, method, url string, requestBody interface{}, authToken string, authType string, ) (types.ChatCompletionStream, error)
ExecuteStreamRequest executes a streaming request with standard error handling and rate limit tracking. This consolidates the common pattern found across Anthropic, OpenAI, Qwen, Gemini, and other providers.
func (*StreamingExecutor) ExecuteWithJSONBody ¶
func (e *StreamingExecutor) ExecuteWithJSONBody( ctx context.Context, url string, requestBody interface{}, authToken string, authType string, ) (types.ChatCompletionStream, error)
ExecuteWithJSONBody is a convenience method that marshals the request body to JSON
func (*StreamingExecutor) GetBaseURL ¶
func (e *StreamingExecutor) GetBaseURL() string
GetBaseURL returns the base URL for the executor
type StreamingExecutorConfig ¶
type StreamingExecutorConfig struct {
// Provider type for error reporting
ProviderType types.ProviderType
// HTTP client for making requests
Client *http.Client
// Rate limit helper for tracking rate limits
RateLimitHelper *common.RateLimitHelper
// StreamCreator converts the HTTP response to a ChatCompletionStream
StreamCreator StreamCreator
// Optional: PrepareRequest allows modifying the request before sending
PrepareRequest RequestPreparer
// Optional: HandleResponse allows processing the response before creating stream
HandleResponse ResponseHandler
// Optional: ExtraHeaders to add to all requests
ExtraHeaders map[string]string
// Optional: BaseURL for the provider
BaseURL string
// Optional: Endpoint path for streaming (default: /chat/completions or provider-specific)
Endpoint string
}
StreamingExecutorConfig holds configuration for the streaming executor
Directories
¶
| Path | Synopsis |
|---|---|
|
Package decoders provides pluggable stream decoders for various streaming formats including Server-Sent Events (SSE), NDJSON, and EventStream.
|
Package decoders provides pluggable stream decoders for various streaming formats including Server-Sent Events (SSE), NDJSON, and EventStream. |