streaming

package
v1.2.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: Apache-2.0 Imports: 9 Imported by: 3

Documentation

Overview

Package streaming provides functionality for accumulating streaming chunks and other chunk-related workflows.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccumulatedData

// AccumulatedData contains the accumulated data for a stream.
type AccumulatedData struct {
	RequestID             string
	Model                 string
	Status                string
	Stream                bool
	Latency               int64 // in milliseconds
	TimeToFirstToken      int64 // Time to first token in milliseconds (streaming only)
	StartTimestamp        time.Time
	EndTimestamp          time.Time
	OutputMessage         *schemas.ChatMessage
	OutputMessages        []schemas.ResponsesMessage // For responses API
	ToolCalls             []schemas.ChatAssistantMessageToolCall
	ErrorDetails          *schemas.BifrostError      // Error if any
	TokenUsage            *schemas.BifrostLLMUsage   // Token usage if available
	CacheDebug            *schemas.BifrostCacheDebug // Cache debug info if available
	Cost                  *float64                   // Cost if available
	AudioOutput           *schemas.BifrostSpeechResponse
	TranscriptionOutput   *schemas.BifrostTranscriptionResponse
	ImageGenerationOutput *schemas.BifrostImageGenerationResponse
	FinishReason          *string
	RawResponse           *string // Raw response if available
}

AccumulatedData contains the accumulated data for a stream

type Accumulator

// Accumulator manages accumulation of streaming chunks.
type Accumulator struct {
	// contains filtered or unexported fields
}

Accumulator manages accumulation of streaming chunks

func NewAccumulator

func NewAccumulator(pricingManager *modelcatalog.ModelCatalog, logger schemas.Logger) *Accumulator

NewAccumulator creates a new accumulator

func (*Accumulator) Cleanup

func (a *Accumulator) Cleanup()

Cleanup cleans up the accumulator

func (*Accumulator) CleanupStreamAccumulator added in v1.1.2

func (a *Accumulator) CleanupStreamAccumulator(requestID string) error

CleanupStreamAccumulator decrements the reference counter for a stream accumulator. The accumulator is only cleaned up when the reference counter reaches 0. This function is idempotent - calling it after cleanup has already happened is safe.

func (*Accumulator) CreateStreamAccumulator

func (a *Accumulator) CreateStreamAccumulator(requestID string, startTimestamp time.Time) *StreamAccumulator

CreateStreamAccumulator creates a new stream accumulator for a request. It increments the reference counter atomically for concurrent access tracking.

func (*Accumulator) ProcessStreamingResponse

func (a *Accumulator) ProcessStreamingResponse(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*ProcessedStreamResponse, error)

ProcessStreamingResponse processes a streaming response. It handles chat, audio, and responses streaming responses.

type AudioStreamChunk

// AudioStreamChunk represents a single audio streaming chunk.
type AudioStreamChunk struct {
	Timestamp          time.Time                            // When chunk was received
	Delta              *schemas.BifrostSpeechStreamResponse // The actual delta content
	FinishReason       *string                              // If this is the final chunk
	TokenUsage         *schemas.SpeechUsage                 // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug           // Semantic cache debug if available
	Cost               *float64                             // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError                // Error if any
	ChunkIndex         int                                  // Index of the chunk in the stream
	RawResponse        *string                              // Raw response if available
}

AudioStreamChunk represents a single audio streaming chunk

type ChatStreamChunk

// ChatStreamChunk represents a single chat streaming chunk.
type ChatStreamChunk struct {
	Timestamp          time.Time                              // When chunk was received
	Delta              *schemas.ChatStreamResponseChoiceDelta // The actual delta content
	FinishReason       *string                                // If this is the final chunk
	TokenUsage         *schemas.BifrostLLMUsage               // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug             // Semantic cache debug if available
	Cost               *float64                               // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError                  // Error if any
	ChunkIndex         int                                    // Index of the chunk in the stream
	RawResponse        *string                                // Raw response if available
}

ChatStreamChunk represents a single chat streaming chunk

type ImageStreamChunk added in v1.2.9

// ImageStreamChunk represents a single image streaming chunk.
type ImageStreamChunk struct {
	Timestamp          time.Time                                     // When chunk was received
	Delta              *schemas.BifrostImageGenerationStreamResponse // The actual stream response
	FinishReason       *string                                       // If this is the final chunk
	ChunkIndex         int                                           // Index of the chunk in the stream
	ImageIndex         int                                           // Index of the image in the stream
	ErrorDetails       *schemas.BifrostError                         // Error if any
	Cost               *float64                                      // Cost in dollars from pricing plugin
	SemanticCacheDebug *schemas.BifrostCacheDebug                    // Semantic cache debug if available
	TokenUsage         *schemas.ImageUsage                           // Token usage if available
	RawResponse        *string                                       // Raw response if available
}

ImageStreamChunk represents a single image streaming chunk

type ProcessedStreamResponse

// ProcessedStreamResponse represents a processed streaming response.
type ProcessedStreamResponse struct {
	RequestID  string
	StreamType StreamType
	Provider   schemas.ModelProvider
	Model      string
	Data       *AccumulatedData
	RawRequest *interface{} // NOTE(review): pointer to an interface value; presumably nil when no raw request was captured — confirm
}

ProcessedStreamResponse represents a processed streaming response

func (*ProcessedStreamResponse) ToBifrostResponse

func (p *ProcessedStreamResponse) ToBifrostResponse() *schemas.BifrostResponse

ToBifrostResponse converts a ProcessedStreamResponse to a BifrostResponse

type ResponsesStreamChunk added in v1.1.6

// ResponsesStreamChunk represents a single responses streaming chunk.
type ResponsesStreamChunk struct {
	Timestamp          time.Time                               // When chunk was received
	StreamResponse     *schemas.BifrostResponsesStreamResponse // The actual stream response
	FinishReason       *string                                 // If this is the final chunk
	TokenUsage         *schemas.BifrostLLMUsage                // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug              // Semantic cache debug if available
	Cost               *float64                                // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError                   // Error if any
	ChunkIndex         int                                     // Index of the chunk in the stream
	RawResponse        *string                                 // Raw response if available
}

ResponsesStreamChunk represents a single responses streaming chunk

type StreamAccumulator

// StreamAccumulator manages accumulation of streaming chunks for the request
// identified by RequestID.
type StreamAccumulator struct {
	RequestID                 string
	StartTimestamp            time.Time
	FirstChunkTimestamp       time.Time // Timestamp when the first chunk was received (for TTFT calculation)
	ChatStreamChunks          []*ChatStreamChunk
	ResponsesStreamChunks     []*ResponsesStreamChunk
	TranscriptionStreamChunks []*TranscriptionStreamChunk
	AudioStreamChunks         []*AudioStreamChunk
	ImageStreamChunks         []*ImageStreamChunk

	// De-dup maps to prevent chunk loss on out-of-order arrival
	ChatChunksSeen          map[int]struct{}
	ResponsesChunksSeen     map[int]struct{}
	TranscriptionChunksSeen map[int]struct{}
	AudioChunksSeen         map[int]struct{}
	ImageChunksSeen         map[string]struct{} // Composite key: "imageIndex:chunkIndex" to scope de-dup per image

	// Track highest ChunkIndex for metadata extraction (TokenUsage, Cost, FinishReason)
	MaxChatChunkIndex          int
	MaxResponsesChunkIndex     int
	MaxTranscriptionChunkIndex int
	MaxAudioChunkIndex         int

	IsComplete     bool
	FinalTimestamp time.Time

	Timestamp time.Time // NOTE(review): purpose not documented here; how it differs from StartTimestamp/FinalTimestamp is unclear — confirm
	// contains filtered or unexported fields
}

StreamAccumulator manages accumulation of streaming chunks for a single request

type StreamType

// StreamType is a string-typed identifier for the kind of stream
// (see ProcessedStreamResponse.StreamType).
type StreamType string
// StreamType values declared by this package.
const (
	StreamTypeText          StreamType = "text.completion"
	StreamTypeChat          StreamType = "chat.completion"
	StreamTypeAudio         StreamType = "audio.speech"
	StreamTypeImage         StreamType = "image.generation"
	StreamTypeTranscription StreamType = "audio.transcription"
	StreamTypeResponses     StreamType = "responses"
)

type TranscriptionStreamChunk

// TranscriptionStreamChunk represents a single transcription streaming chunk.
type TranscriptionStreamChunk struct {
	Timestamp          time.Time                                   // When chunk was received
	Delta              *schemas.BifrostTranscriptionStreamResponse // The actual delta content
	FinishReason       *string                                     // If this is the final chunk
	TokenUsage         *schemas.TranscriptionUsage                 // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug                  // Semantic cache debug if available
	Cost               *float64                                    // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError                       // Error if any
	ChunkIndex         int                                         // Index of the chunk in the stream
	RawResponse        *string                                     // Raw response if available
}

TranscriptionStreamChunk represents a single transcription streaming chunk

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL