streaming

package
v1.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package streaming provides streaming utilities for AI provider implementations. This file contains tool conversion functions for OpenAI-compatible providers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertOpenAICompatibleToolCallsToUniversal

func ConvertOpenAICompatibleToolCallsToUniversal(toolCalls []OpenAICompatibleToolCall) []types.ToolCall

ConvertOpenAICompatibleToolCallsToUniversal converts OpenAI-compatible tool calls to universal format This function can be used by any provider that uses OpenAI's tool call format (OpenAI, Qwen, etc.)

func CreateAnthropicStream

func CreateAnthropicStream(response *http.Response) types.ChatCompletionStream

CreateAnthropicStream creates a stream for Anthropic responses

func CreateCustomStream

func CreateCustomStream(response *http.Response, parser StreamParser) types.ChatCompletionStream

CreateCustomStream creates a stream with a custom parser

func CreateErrorStream

func CreateErrorStream(err error) types.ChatCompletionStream

CreateErrorStream creates a stream that immediately returns an error

func CreateOpenAIStream

func CreateOpenAIStream(response *http.Response) types.ChatCompletionStream

CreateOpenAIStream creates a stream for OpenAI-compatible responses

func JSONLineProcessor

func JSONLineProcessor(data string, target interface{}, chunkExtractor func(interface{}) types.ChatCompletionChunk) (types.ChatCompletionChunk, error, bool)

JSONLineProcessor provides standard JSON line processing with error handling

func SSELineProcessor

func SSELineProcessor(data string, responseParser func(string) (types.ChatCompletionChunk, bool)) (types.ChatCompletionChunk, error, bool)

SSELineProcessor provides standard SSE (Server-Sent Events) line processing

func StreamFromContext

func StreamFromContext(ctx context.Context, baseStream types.ChatCompletionStream) types.ChatCompletionStream

StreamFromContext creates a stream from context with cancellation support

Types

type AnthropicStreamParser

type AnthropicStreamParser struct{}

AnthropicStreamParser provides a parser for Anthropic's streaming format

func NewAnthropicStreamParser

func NewAnthropicStreamParser() *AnthropicStreamParser

NewAnthropicStreamParser creates a new Anthropic stream parser

func (*AnthropicStreamParser) ParseLine

ParseLine parses a line from an Anthropic stream

type BaseStream

type BaseStream struct {
	// contains filtered or unexported fields
}

BaseStream provides a base implementation of ChatCompletionStream

func NewBaseStream

func NewBaseStream(processor *StreamProcessor, parser StreamParser) *BaseStream

NewBaseStream creates a new base stream

func (*BaseStream) Close

func (bs *BaseStream) Close() error

Close closes the stream

func (*BaseStream) Next

func (bs *BaseStream) Next() (types.ChatCompletionChunk, error)

Next returns the next chunk from the stream

type ContextAwareStream

type ContextAwareStream struct {
	// contains filtered or unexported fields
}

ContextAwareStream wraps a stream with context awareness

func (*ContextAwareStream) Close

func (cas *ContextAwareStream) Close() error

Close closes the underlying stream

func (*ContextAwareStream) Next

Next returns the next chunk, respecting context cancellation

type ErrorStream

type ErrorStream struct {
	// contains filtered or unexported fields
}

ErrorStream is a stream that always returns an error

func (*ErrorStream) Close

func (es *ErrorStream) Close() error

func (*ErrorStream) Next

type GenericSSEStream

type GenericSSEStream struct {
	// contains filtered or unexported fields
}

GenericSSEStream wraps SSE parsing for any provider that implements SSELineParser. It handles the low-level SSE protocol details (reading lines, handling "data:" prefix) while delegating provider-specific parsing to the SSELineParser implementation.

func NewGenericSSEStream

func NewGenericSSEStream(resp *http.Response, parser SSELineParser) *GenericSSEStream

NewGenericSSEStream creates a new generic SSE stream with the given response and parser. The parser parameter defines how individual SSE lines are parsed into ChatCompletionChunk objects.

func (*GenericSSEStream) Close

func (s *GenericSSEStream) Close() error

Close closes the underlying HTTP response body and cleans up resources.

func (*GenericSSEStream) Next

Next returns the next chunk from the SSE stream. It reads lines from the stream, extracts SSE data, and uses the parser to convert them to chunks. Returns io.EOF when the stream is complete or an error if parsing fails.

type MockStream

type MockStream struct {
	// contains filtered or unexported fields
}

MockStream provides a mock implementation of ChatCompletionStream for testing

func NewMockStream

func NewMockStream(chunks []types.ChatCompletionChunk) *MockStream

NewMockStream creates a new mock stream with the given chunks

func (*MockStream) Close

func (ms *MockStream) Close() error

Close resets the mock stream

func (*MockStream) Next

func (ms *MockStream) Next() (types.ChatCompletionChunk, error)

Next returns the next chunk from the mock stream

type OpenAICompatibleFunctionDef

type OpenAICompatibleFunctionDef struct {
	Name        string                 `json:"name"`
	Description string                 `json:"description"`
	Parameters  map[string]interface{} `json:"parameters"` // JSON Schema
}

OpenAICompatibleFunctionDef represents a function definition in OpenAI-compatible format

type OpenAICompatibleParser

type OpenAICompatibleParser struct {
	// SkipEmptyContent determines whether to skip chunks with empty content
	SkipEmptyContent bool
}

OpenAICompatibleParser handles OpenAI-style SSE responses. This parser works with the standard OpenAI streaming format and can be used by any provider that follows OpenAI's streaming API conventions.

func NewOpenAICompatibleParser

func NewOpenAICompatibleParser() *OpenAICompatibleParser

NewOpenAICompatibleParser creates a new OpenAI-compatible SSE parser with default settings.

func (*OpenAICompatibleParser) IsDone

func (p *OpenAICompatibleParser) IsDone(line string) bool

IsDone checks if the given line indicates stream completion. OpenAI sends "[DONE]" as the final message in the stream.

func (*OpenAICompatibleParser) ParseLine

ParseLine parses a single SSE data line in OpenAI format into a ChatCompletionChunk. The OpenAI format is JSON with the structure:

{
  "id": "chatcmpl-...",
  "object": "chat.completion.chunk",
  "created": 1234567890,
  "model": "gpt-4",
  "choices": [{
    "index": 0,
    "delta": {
      "role": "assistant",
      "content": "Hello"
    },
    "finish_reason": null
  }]
}

type OpenAICompatibleTool

type OpenAICompatibleTool struct {
	Type     string                      `json:"type"` // Always "function"
	Function OpenAICompatibleFunctionDef `json:"function"`
}

OpenAICompatibleTool represents a tool in OpenAI-compatible API format Used by OpenAI, Qwen, and other OpenAI-compatible providers

func ConvertToOpenAICompatibleTools

func ConvertToOpenAICompatibleTools(tools []types.Tool) []OpenAICompatibleTool

ConvertToOpenAICompatibleTools converts universal tools to OpenAI-compatible format This function can be used by any provider that uses OpenAI's tool format (OpenAI, Qwen, etc.)

type OpenAICompatibleToolCall

type OpenAICompatibleToolCall struct {
	ID       string                           `json:"id"`
	Type     string                           `json:"type"` // "function"
	Function OpenAICompatibleToolCallFunction `json:"function"`
}

OpenAICompatibleToolCall represents a tool call in OpenAI-compatible format

func ConvertToOpenAICompatibleToolCalls

func ConvertToOpenAICompatibleToolCalls(toolCalls []types.ToolCall) []OpenAICompatibleToolCall

ConvertToOpenAICompatibleToolCalls converts universal tool calls to OpenAI-compatible format This function can be used by any provider that uses OpenAI's tool call format (OpenAI, Qwen, etc.)

type OpenAICompatibleToolCallFunction

type OpenAICompatibleToolCallFunction struct {
	Name      string `json:"name"`
	Arguments string `json:"arguments"` // JSON string
}

OpenAICompatibleToolCallFunction represents a function call in a tool call

type ProcessLineFunc

type ProcessLineFunc func(line string) (types.ChatCompletionChunk, error, bool)

ProcessLineFunc processes a single line from a streaming response

type SSELineParser

type SSELineParser interface {
	// ParseLine parses a single SSE data line into a chunk.
	// Returns the parsed chunk and any error that occurred during parsing.
	ParseLine(line string) (types.ChatCompletionChunk, error)

	// IsDone checks if this line indicates stream completion.
	// Common completion signals include "[DONE]" or specific event types.
	IsDone(line string) bool
}

SSELineParser defines provider-specific parsing behavior for SSE streams. This interface allows different providers to implement their own parsing logic while using the common SSE stream infrastructure.

type StandardStreamParser

type StandardStreamParser struct {
	// Custom field mappings for provider-specific responses
	ContentField          string
	ReasoningField        string // For GLM-4.6, OpenCode/Zen style
	ReasoningContentField string // For vLLM/Synthetic style
	DoneField             string
	UsageField            string
	ToolCallsField        string
	FinishReason          string
}

StandardStreamParser provides a standard parser for OpenAI-compatible streaming responses

func NewStandardStreamParser

func NewStandardStreamParser() *StandardStreamParser

NewStandardStreamParser creates a new standard stream parser with default OpenAI mappings

func (*StandardStreamParser) ParseLine

ParseLine parses a line from the stream using standard OpenAI format

type StreamParser

type StreamParser interface {
	ParseLine(data string) (types.ChatCompletionChunk, bool, error)
}

StreamParser defines the interface for parsing streaming responses

type StreamProcessor

type StreamProcessor struct {
	// contains filtered or unexported fields
}

StreamProcessor provides common streaming functionality for all providers

func NewStreamProcessor

func NewStreamProcessor(response *http.Response) *StreamProcessor

NewStreamProcessor creates a new stream processor

func (*StreamProcessor) Close

func (sp *StreamProcessor) Close() error

Close closes the stream and cleans up resources

func (*StreamProcessor) IsDone

func (sp *StreamProcessor) IsDone() bool

IsDone returns whether the stream is finished

func (*StreamProcessor) MarkDone

func (sp *StreamProcessor) MarkDone()

MarkDone marks the stream as done

func (*StreamProcessor) NextChunk

func (sp *StreamProcessor) NextChunk(processLine ProcessLineFunc) (types.ChatCompletionChunk, error)

NextChunk reads and processes the next chunk from the stream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL