Documentation
¶
Overview ¶
Package streaming provides functionality for accumulating streaming chunks and other chunk-related workflows.
Index ¶
- type AccumulatedData
- type Accumulator
- func (a *Accumulator) Cleanup()
- func (a *Accumulator) CleanupStreamAccumulator(requestID string) error
- func (a *Accumulator) CreateStreamAccumulator(requestID string, startTimestamp time.Time) *StreamAccumulator
- func (a *Accumulator) ProcessStreamingResponse(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, ...) (*ProcessedStreamResponse, error)
- type AudioStreamChunk
- type ChatStreamChunk
- type ImageStreamChunk
- type ProcessedStreamResponse
- type ResponsesStreamChunk
- type StreamAccumulator
- type StreamType
- type TranscriptionStreamChunk
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccumulatedData ¶
type AccumulatedData struct {
RequestID string
Model string
Status string
Stream bool
Latency int64 // in milliseconds
TimeToFirstToken int64 // Time to first token in milliseconds (streaming only)
StartTimestamp time.Time
EndTimestamp time.Time
OutputMessage *schemas.ChatMessage
OutputMessages []schemas.ResponsesMessage // For responses API
ToolCalls []schemas.ChatAssistantMessageToolCall
ErrorDetails *schemas.BifrostError
TokenUsage *schemas.BifrostLLMUsage
CacheDebug *schemas.BifrostCacheDebug
Cost *float64
AudioOutput *schemas.BifrostSpeechResponse
TranscriptionOutput *schemas.BifrostTranscriptionResponse
ImageGenerationOutput *schemas.BifrostImageGenerationResponse
FinishReason *string
RawResponse *string
}
AccumulatedData contains the accumulated data for a stream
type Accumulator ¶
type Accumulator struct {
// contains filtered or unexported fields
}
Accumulator manages accumulation of streaming chunks
func NewAccumulator ¶
func NewAccumulator(pricingManager *modelcatalog.ModelCatalog, logger schemas.Logger) *Accumulator
NewAccumulator creates a new accumulator
func (*Accumulator) CleanupStreamAccumulator ¶ added in v1.1.2
func (a *Accumulator) CleanupStreamAccumulator(requestID string) error
CleanupStreamAccumulator decrements the reference counter for a stream accumulator. The accumulator is only cleaned up when the reference counter reaches 0. This function is idempotent - calling it after cleanup has already happened is safe.
func (*Accumulator) CreateStreamAccumulator ¶
func (a *Accumulator) CreateStreamAccumulator(requestID string, startTimestamp time.Time) *StreamAccumulator
CreateStreamAccumulator creates a new stream accumulator for a request. It increments the reference counter atomically for concurrent access tracking.
func (*Accumulator) ProcessStreamingResponse ¶
func (a *Accumulator) ProcessStreamingResponse(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*ProcessedStreamResponse, error)
ProcessStreamingResponse processes a streaming response. It handles chat, audio, and responses streaming responses.
type AudioStreamChunk ¶
type AudioStreamChunk struct {
Timestamp time.Time // When chunk was received
Delta *schemas.BifrostSpeechStreamResponse // The actual delta content
FinishReason *string // If this is the final chunk
TokenUsage *schemas.SpeechUsage // Token usage if available
SemanticCacheDebug *schemas.BifrostCacheDebug // Semantic cache debug if available
Cost *float64 // Cost in dollars from pricing plugin
ErrorDetails *schemas.BifrostError // Error if any
ChunkIndex int // Index of the chunk in the stream
RawResponse *string
}
AudioStreamChunk represents a single audio streaming chunk
type ChatStreamChunk ¶
type ChatStreamChunk struct {
Timestamp time.Time // When chunk was received
Delta *schemas.ChatStreamResponseChoiceDelta // The actual delta content
FinishReason *string // If this is the final chunk
TokenUsage *schemas.BifrostLLMUsage // Token usage if available
SemanticCacheDebug *schemas.BifrostCacheDebug // Semantic cache debug if available
Cost *float64 // Cost in dollars from pricing plugin
ErrorDetails *schemas.BifrostError // Error if any
ChunkIndex int // Index of the chunk in the stream
RawResponse *string // Raw response if available
}
ChatStreamChunk represents a single chat streaming chunk
type ImageStreamChunk ¶ added in v1.2.9
type ImageStreamChunk struct {
Timestamp time.Time // When chunk was received
Delta *schemas.BifrostImageGenerationStreamResponse // The actual stream response
FinishReason *string // If this is the final chunk
ChunkIndex int // Index of the chunk in the stream
ImageIndex int // Index of the image in the stream
ErrorDetails *schemas.BifrostError // Error if any
Cost *float64 // Cost in dollars from pricing plugin
SemanticCacheDebug *schemas.BifrostCacheDebug // Semantic cache debug if available
TokenUsage *schemas.ImageUsage // Token usage if available
RawResponse *string // Raw response if available
}
ImageStreamChunk represents a single image streaming chunk
type ProcessedStreamResponse ¶
type ProcessedStreamResponse struct {
RequestID string
StreamType StreamType
Provider schemas.ModelProvider
Model string
Data *AccumulatedData
RawRequest *interface{}
}
ProcessedStreamResponse represents a processed streaming response
func (*ProcessedStreamResponse) ToBifrostResponse ¶
func (p *ProcessedStreamResponse) ToBifrostResponse() *schemas.BifrostResponse
ToBifrostResponse converts a ProcessedStreamResponse to a BifrostResponse
type ResponsesStreamChunk ¶ added in v1.1.6
type ResponsesStreamChunk struct {
Timestamp time.Time // When chunk was received
StreamResponse *schemas.BifrostResponsesStreamResponse // The actual stream response
FinishReason *string // If this is the final chunk
TokenUsage *schemas.BifrostLLMUsage // Token usage if available
SemanticCacheDebug *schemas.BifrostCacheDebug // Semantic cache debug if available
Cost *float64 // Cost in dollars from pricing plugin
ErrorDetails *schemas.BifrostError // Error if any
ChunkIndex int // Index of the chunk in the stream
RawResponse *string
}
ResponsesStreamChunk represents a single responses streaming chunk
type StreamAccumulator ¶
type StreamAccumulator struct {
RequestID string
StartTimestamp time.Time
FirstChunkTimestamp time.Time // Timestamp when the first chunk was received (for TTFT calculation)
ChatStreamChunks []*ChatStreamChunk
ResponsesStreamChunks []*ResponsesStreamChunk
TranscriptionStreamChunks []*TranscriptionStreamChunk
AudioStreamChunks []*AudioStreamChunk
ImageStreamChunks []*ImageStreamChunk
// De-dup maps to prevent chunk loss on out-of-order arrival
ChatChunksSeen map[int]struct{}
ResponsesChunksSeen map[int]struct{}
TranscriptionChunksSeen map[int]struct{}
AudioChunksSeen map[int]struct{}
ImageChunksSeen map[string]struct{} // Composite key: "imageIndex:chunkIndex" to scope de-dup per image
// Track highest ChunkIndex for metadata extraction (TokenUsage, Cost, FinishReason)
MaxChatChunkIndex int
MaxResponsesChunkIndex int
MaxTranscriptionChunkIndex int
MaxAudioChunkIndex int
IsComplete bool
FinalTimestamp time.Time
Timestamp time.Time
// contains filtered or unexported fields
}
StreamAccumulator manages accumulation of streaming chunks for a single request
type StreamType ¶
type StreamType string
const (
StreamTypeText StreamType = "text.completion"
StreamTypeChat StreamType = "chat.completion"
StreamTypeAudio StreamType = "audio.speech"
StreamTypeImage StreamType = "image.generation"
StreamTypeTranscription StreamType = "audio.transcription"
StreamTypeResponses StreamType = "responses"
)
type TranscriptionStreamChunk ¶
type TranscriptionStreamChunk struct {
Timestamp time.Time // When chunk was received
Delta *schemas.BifrostTranscriptionStreamResponse // The actual delta content
FinishReason *string // If this is the final chunk
TokenUsage *schemas.TranscriptionUsage // Token usage if available
SemanticCacheDebug *schemas.BifrostCacheDebug // Semantic cache debug if available
Cost *float64 // Cost in dollars from pricing plugin
ErrorDetails *schemas.BifrostError // Error if any
ChunkIndex int // Index of the chunk in the stream
RawResponse *string
}
TranscriptionStreamChunk represents a single transcription streaming chunk