streaming

package
v1.1.48 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2025 License: Apache-2.0 Imports: 7 Imported by: 3

Documentation

Overview

Package streaming provides functionality for accumulating streaming chunks and other chunk-related workflows

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccumulatedData

// AccumulatedData contains the accumulated data for a stream.
//
// Most result fields are pointers or slices.
// NOTE(review): presumably they are left nil/empty when they do not apply to
// the kind of stream being accumulated — confirm against the implementation.
type AccumulatedData struct {
	RequestID           string
	Model               string
	Status              string
	Stream              bool
	Latency             int64 // in milliseconds
	StartTimestamp      time.Time
	EndTimestamp        time.Time
	OutputMessage       *schemas.ChatMessage
	OutputMessages      []schemas.ResponsesMessage // For responses API
	ToolCalls           []schemas.ChatAssistantMessageToolCall
	ErrorDetails        *schemas.BifrostError
	TokenUsage          *schemas.BifrostLLMUsage
	CacheDebug          *schemas.BifrostCacheDebug
	// NOTE(review): presumably in dollars, like the per-chunk Cost fields — confirm.
	Cost                *float64
	AudioOutput         *schemas.BifrostSpeechResponse
	TranscriptionOutput *schemas.BifrostTranscriptionResponse
	FinishReason        *string
	RawResponse         *string
}

AccumulatedData contains the accumulated data for a stream

type Accumulator

// Accumulator manages accumulation of streaming chunks.
// It exposes no exported fields; create one with NewAccumulator.
type Accumulator struct {
	// contains filtered or unexported fields
}

Accumulator manages accumulation of streaming chunks

func NewAccumulator

func NewAccumulator(pricingManager *modelcatalog.ModelCatalog, logger schemas.Logger) *Accumulator

NewAccumulator creates a new accumulator

func (*Accumulator) Cleanup

func (a *Accumulator) Cleanup()

Cleanup cleans up the accumulator

func (*Accumulator) CleanupStreamAccumulator added in v1.1.2

func (a *Accumulator) CleanupStreamAccumulator(requestID string) error

CleanupStreamAccumulator cleans up the stream accumulator for a request

func (*Accumulator) CreateStreamAccumulator

func (a *Accumulator) CreateStreamAccumulator(requestID string, startTimestamp time.Time) *StreamAccumulator

CreateStreamAccumulator creates a new stream accumulator for a request

func (*Accumulator) ProcessStreamingResponse

func (a *Accumulator) ProcessStreamingResponse(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*ProcessedStreamResponse, error)

ProcessStreamingResponse processes a streaming response. It handles chat, audio, and responses streaming responses.

type AudioStreamChunk

// AudioStreamChunk represents a single streaming chunk whose delta is a
// speech stream response.
type AudioStreamChunk struct {
	Timestamp          time.Time                            // When chunk was received
	Delta              *schemas.BifrostSpeechStreamResponse // The actual delta content
	FinishReason       *string                              // If this is the final chunk
	TokenUsage         *schemas.SpeechUsage                 // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug           // Semantic cache debug if available
	Cost               *float64                             // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError                // Error if any
	ChunkIndex         int                                  // Index of the chunk in the stream
	RawResponse        *string                              // Presumably the raw response if available, as on ChatStreamChunk — confirm
}

AudioStreamChunk represents a single audio (speech) streaming chunk

type ChatStreamChunk

// ChatStreamChunk represents a single streaming chunk whose delta is a chat
// stream response choice delta.
type ChatStreamChunk struct {
	Timestamp          time.Time                              // When chunk was received
	Delta              *schemas.ChatStreamResponseChoiceDelta // The actual delta content
	FinishReason       *string                                // If this is the final chunk
	TokenUsage         *schemas.BifrostLLMUsage               // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug             // Semantic cache debug if available
	Cost               *float64                               // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError                  // Error if any
	ChunkIndex         int                                    // Index of the chunk in the stream
	RawResponse        *string                                // Raw response if available
}

ChatStreamChunk represents a single chat streaming chunk

type ProcessedStreamResponse

// ProcessedStreamResponse represents a processed streaming response.
// It is the result type of Accumulator.ProcessStreamingResponse and can be
// converted with ToBifrostResponse.
type ProcessedStreamResponse struct {
	Type       StreamResponseType
	RequestID  string
	StreamType StreamType
	Provider   schemas.ModelProvider
	Model      string
	Data       *AccumulatedData
	// NOTE(review): pointer to an interface value; an interface can already be
	// nil on its own, so confirm the extra indirection is intentional.
	RawRequest *interface{}
}

ProcessedStreamResponse represents a processed streaming response

func (*ProcessedStreamResponse) ToBifrostResponse

func (p *ProcessedStreamResponse) ToBifrostResponse() *schemas.BifrostResponse

ToBifrostResponse converts a ProcessedStreamResponse to a BifrostResponse

type ResponsesStreamChunk added in v1.1.6

// ResponsesStreamChunk represents a single responses streaming chunk.
type ResponsesStreamChunk struct {
	Timestamp          time.Time                               // When chunk was received
	StreamResponse     *schemas.BifrostResponsesStreamResponse // The actual stream response
	FinishReason       *string                                 // If this is the final chunk
	TokenUsage         *schemas.BifrostLLMUsage                // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug              // Semantic cache debug if available
	Cost               *float64                                // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError                   // Error if any
	ChunkIndex         int                                     // Index of the chunk in the stream
	RawResponse        *string                                 // Presumably the raw response if available, as on ChatStreamChunk — confirm
}

ResponsesStreamChunk represents a single responses streaming chunk

type StreamAccumulator

// StreamAccumulator manages accumulation of streaming chunks.
// It carries a RequestID and is what Accumulator.CreateStreamAccumulator
// returns for a request.
type StreamAccumulator struct {
	RequestID                 string
	StartTimestamp            time.Time
	// One chunk slice per chunk type.
	// NOTE(review): presumably only the slice matching the stream's type is
	// populated for a given request — confirm.
	ChatStreamChunks          []*ChatStreamChunk
	ResponsesStreamChunks     []*ResponsesStreamChunk
	TranscriptionStreamChunks []*TranscriptionStreamChunk
	AudioStreamChunks         []*AudioStreamChunk
	IsComplete                bool
	FinalTimestamp            time.Time

	// NOTE(review): how Timestamp differs from StartTimestamp and
	// FinalTimestamp is not evident from this declaration — confirm.
	Timestamp time.Time
	// contains filtered or unexported fields
}

StreamAccumulator manages accumulation of streaming chunks

type StreamResponseType

// StreamResponseType is the string type of the ProcessedStreamResponse.Type field.
type StreamResponseType string
// Declared StreamResponseType values.
// NOTE(review): presumably "delta" tags an intermediate chunk and "final" the
// completed stream — confirm against ProcessStreamingResponse.
const (
	StreamResponseTypeDelta StreamResponseType = "delta"
	StreamResponseTypeFinal StreamResponseType = "final"
)

type StreamType

// StreamType is the string type of the ProcessedStreamResponse.StreamType field.
type StreamType string
// Declared StreamType values, one per kind of stream.
const (
	StreamTypeText          StreamType = "text.completion"
	StreamTypeChat          StreamType = "chat.completion"
	StreamTypeAudio         StreamType = "audio.speech"
	StreamTypeTranscription StreamType = "audio.transcription"
	StreamTypeResponses     StreamType = "responses"
)

type TranscriptionStreamChunk

// TranscriptionStreamChunk represents a single transcription streaming chunk.
type TranscriptionStreamChunk struct {
	Timestamp          time.Time                                   // When chunk was received
	Delta              *schemas.BifrostTranscriptionStreamResponse // The actual delta content
	FinishReason       *string                                     // If this is the final chunk
	TokenUsage         *schemas.TranscriptionUsage                 // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug                  // Semantic cache debug if available
	Cost               *float64                                    // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError                       // Error if any
	ChunkIndex         int                                         // Index of the chunk in the stream
	RawResponse        *string                                     // Presumably the raw response if available, as on ChatStreamChunk — confirm
}

TranscriptionStreamChunk represents a single transcription streaming chunk

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL