streaming

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 6, 2025 License: Apache-2.0 Imports: 7 Imported by: 3

Documentation

Overview

Package streaming provides functionality for accumulating streaming chunks and other chunk-related workflows

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccumulatedData

type AccumulatedData struct {
	RequestID           string
	Model               string
	Status              string
	Stream              bool
	Latency             float64
	StartTimestamp      time.Time
	EndTimestamp        time.Time
	OutputMessage       *schemas.ChatMessage
	ToolCalls           []schemas.ChatAssistantMessageToolCall
	ErrorDetails        *schemas.BifrostError
	TokenUsage          *schemas.LLMUsage
	CacheDebug          *schemas.BifrostCacheDebug
	Cost                *float64
	Object              string
	AudioOutput         *schemas.BifrostSpeech
	TranscriptionOutput *schemas.BifrostTranscribe
	FinishReason        *string
}

AccumulatedData contains the accumulated data for a stream

type Accumulator

type Accumulator struct {
	// contains filtered or unexported fields
}

Accumulator manages accumulation of streaming chunks

func NewAccumulator

func NewAccumulator(pricingManager *pricing.PricingManager, logger schemas.Logger) *Accumulator

NewAccumulator creates a new accumulator

func (*Accumulator) Cleanup

func (a *Accumulator) Cleanup()

Cleanup cleans up the accumulator

func (*Accumulator) CreateStreamAccumulator

func (a *Accumulator) CreateStreamAccumulator(requestID string, startTimestamp time.Time) *StreamAccumulator

CreateStreamAccumulator creates a new stream accumulator for a request

func (*Accumulator) ProcessStreamingResponse

func (a *Accumulator) ProcessStreamingResponse(ctx *context.Context, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*ProcessedStreamResponse, error)

ProcessStreamingResponse processes a streaming response It handles both audio and chat streaming responses

type AudioStreamChunk

type AudioStreamChunk struct {
	Timestamp          time.Time                  // When chunk was received
	Delta              *schemas.BifrostSpeech     // The actual delta content
	FinishReason       *string                    // If this is the final chunk
	TokenUsage         *schemas.AudioLLMUsage     // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug // Semantic cache debug if available
	Cost               *float64                   // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError      // Error if any
}

AudioStreamChunk represents a single streaming chunk

type ChatStreamChunk

type ChatStreamChunk struct {
	Timestamp          time.Time                   // When chunk was received
	Delta              *schemas.BifrostStreamDelta // The actual delta content
	FinishReason       *string                     // If this is the final chunk
	TokenUsage         *schemas.LLMUsage           // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug  // Semantic cache debug if available
	Cost               *float64                    // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError       // Error if any
}

ChatStreamChunk represents a single streaming chunk

type ProcessedStreamResponse

type ProcessedStreamResponse struct {
	Type       StreamResponseType
	RequestID  string
	StreamType StreamType
	Provider   schemas.ModelProvider
	Model      string
	Data       *AccumulatedData
}

ProcessedStreamResponse represents a processed streaming response

func (*ProcessedStreamResponse) ToBifrostResponse

func (p *ProcessedStreamResponse) ToBifrostResponse() *schemas.BifrostResponse

ToBifrostResponse converts a ProcessedStreamResponse to a BifrostResponse

type StreamAccumulator

type StreamAccumulator struct {
	RequestID                 string
	StartTimestamp            time.Time
	ChatStreamChunks          []*ChatStreamChunk
	TranscriptionStreamChunks []*TranscriptionStreamChunk
	AudioStreamChunks         []*AudioStreamChunk
	IsComplete                bool
	FinalTimestamp            time.Time
	Object                    string // Store object type once for the entire stream

	Timestamp time.Time
	// contains filtered or unexported fields
}

StreamAccumulator manages accumulation of streaming chunks

type StreamResponseType

type StreamResponseType string
const (
	StreamResponseTypeDelta StreamResponseType = "delta"
	StreamResponseTypeFinal StreamResponseType = "final"
)

type StreamType

type StreamType string
const (
	StreamTypeChat          StreamType = "chat.completion"
	StreamTypeAudio         StreamType = "audio.speech"
	StreamTypeTranscription StreamType = "audio.transcription"
)

type TranscriptionStreamChunk

type TranscriptionStreamChunk struct {
	Timestamp          time.Time                                // When chunk was received
	Delta              *schemas.BifrostTranscribeStreamResponse // The actual delta content
	FinishReason       *string                                  // If this is the final chunk
	TokenUsage         *schemas.LLMUsage                        // Token usage if available
	TranscriptionUsage *schemas.TranscriptionUsage              // Token usage if available
	SemanticCacheDebug *schemas.BifrostCacheDebug               // Semantic cache debug if available
	Cost               *float64                                 // Cost in dollars from pricing plugin
	ErrorDetails       *schemas.BifrostError                    // Error if any
}

TranscriptionStreamChunk represents a single transcription streaming chunk

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL