Documentation
¶
Overview ¶
Package utils provides common utility functions used across different provider implementations. This file contains audio-related utility functions for format conversion.
Package utils — list_models.go Centralised pipeline for filtering and backfilling models in ListModels responses.
Every provider's ToBifrostListModelsResponse follows the same logical steps:
- Resolve each API model's name (alias lookup → alias key; else raw model ID)
- Filter (allowlist + blacklist check on the resolved name)
- Backfill entries that were not returned by the API but should appear in output
Providers plug in custom MatchFns to extend the default matching behaviour. Example: Bedrock adds region-prefix-aware matching on top of DefaultMatchFns.
Package providers implements various LLM providers and their utility functions. This file contains common utility functions used across different provider implementations.
Index ¶
- Constants
- Variables
- func AcquireBrotliReader(r io.Reader) *brotli.Reader
- func AcquireFlateReader(r io.Reader) (io.ReadCloser, error)
- func AcquireGzipReader(r io.Reader) (*gzip.Reader, error)
- func AcquireZstdDecoder(r io.Reader) (*zstd.Decoder, error)
- func AddVideoIDProviderSuffix(videoID string, provider schemas.ModelProvider) string
- func ApplyLargePayloadRequestBody(ctx context.Context, req *fasthttp.Request) bool
- func ApplyLargePayloadRequestBodyWithModelNormalization(ctx context.Context, req *fasthttp.Request, ...) bool
- func AudioFilenameFromBytes(audioData []byte) string
- func BuildClientStreamChunk(ctx context.Context, processedResponse *schemas.BifrostResponse, ...) *schemas.BifrostStreamChunk
- func BuildLargeResponseClient(base *fasthttp.Client, responseThreshold int64) *fasthttp.Client
- func BuildStreamingClient(base *fasthttp.Client) *fasthttp.Client
- func BuildStreamingHTTPClient(base *http.Client) *http.Client
- func BulkSetModelParams(entries map[string]ModelParams)
- func CheckAndDecodeBody(resp *fasthttp.Response) ([]byte, error)
- func CheckAndGetRawRequestBody(ctx context.Context, request RequestBodyGetter) ([]byte, bool)
- func CheckAndSetDefaultProvider(ctx *schemas.BifrostContext, defaultProvider schemas.ModelProvider) schemas.ModelProvider
- func CheckContextAndGetRequestBody(ctx context.Context, request RequestBodyGetter, ...) ([]byte, *schemas.BifrostError)
- func CheckFirstStreamChunkForError(ctx context.Context, stream chan *schemas.BifrostStreamChunk) (chan *schemas.BifrostStreamChunk, <-chan struct{}, *schemas.BifrostError)
- func CheckOperationAllowed(defaultProvider schemas.ModelProvider, config *schemas.CustomProviderConfig, ...) *schemas.BifrostError
- func CloneFastHTTPClientConfig(base *fasthttp.Client) *fasthttp.Client
- func ConfigureDialer(client *fasthttp.Client) *fasthttp.Client
- func ConfigureProxy(client *fasthttp.Client, proxyConfig *schemas.ProxyConfig, ...) *fasthttp.Client
- func ConfigureRetry(client *fasthttp.Client) *fasthttp.Client (deprecated)
- func ConfigureTLS(client *fasthttp.Client, networkConfig schemas.NetworkConfig, ...) *fasthttp.Client
- func ConvertPCMToWAV(pcmData []byte, config PCMConfig) ([]byte, error)
- func ConvertSizeToAspectRatioAndResolution(size string) (aspectRatio, imageSize string)
- func CreateBifrostChatCompletionChunkResponse(id string, usage *schemas.BifrostLLMUsage, finishReason *string, ...) *schemas.BifrostChatResponse
- func CreateBifrostTextCompletionChunkResponse(id string, usage *schemas.BifrostLLMUsage, finishReason *string, ...) *schemas.BifrostTextCompletionResponse
- func DecompressStreamBody(resp *fasthttp.Response) (io.Reader, func())
- func DeleteJSONField(data []byte, path string) ([]byte, error)
- func DeleteModelParams(model string)
- func DetectAudioMimeType(audioData []byte) string
- func DrainLargePayloadRemainder(ctx context.Context)
- func DrainNonSSEStreamResponse(resp *fasthttp.Response) bool
- func EnrichError(ctx *schemas.BifrostContext, bifrostErr *schemas.BifrostError, ...) *schemas.BifrostError
- func EnsureStreamFinalizerCalled(ctx context.Context, finalizer func(context.Context))
- func ExtractHTMLErrorMessage(body []byte) string
- func ExtractProviderResponseHeaders(resp *fasthttp.Response) map[string]string
- func ExtractProviderResponseHeadersFromHTTP(resp *http.Response) map[string]string
- func FileBytesToBase64DataURL(fileBytes []byte) string
- func FinalizeResponseWithLargeDetection(ctx *schemas.BifrostContext, resp *fasthttp.Response, logger schemas.Logger) ([]byte, bool, *schemas.BifrostError)
- func GetBifrostResponseForStreamResponse(textCompletionResponse *schemas.BifrostTextCompletionResponse, ...) *schemas.BifrostResponse
- func GetBudgetTokensFromReasoningEffort(effort string, minBudgetTokens int, maxTokens int) (int, error)
- func GetJSONField(data []byte, path string) gjson.Result
- func GetMaxOutputTokens(model string) (int, bool)
- func GetMaxOutputTokensOrDefault(model string, defaultValue int) int
- func GetPathFromContext(ctx context.Context, defaultPath string) string
- func GetProviderName(defaultProvider schemas.ModelProvider, ...) schemas.ModelProvider
- func GetRandomString(length int) string
- func GetReasoningEffortFromBudgetTokens(budgetTokens int, minBudgetTokens int, maxTokens int) string
- func GetRequestPath(ctx context.Context, defaultPath string, ...) (string, bool)
- func GetStreamIdleTimeout(ctx *schemas.BifrostContext) time.Duration
- func HandleKeylessListModelsRequest(provider schemas.ModelProvider, ...) (*schemas.BifrostListModelsResponse, *schemas.BifrostError)
- func HandleMultipleListModelsRequests(ctx *schemas.BifrostContext, keys []schemas.Key, ...) (*schemas.BifrostListModelsResponse, *schemas.BifrostError)
- func HandleProviderAPIError(resp *fasthttp.Response, errorResp any) *schemas.BifrostError
- func HandleProviderResponse[T any](responseBody []byte, response *T, requestBody []byte, sendBackRawRequest bool, ...) (rawRequest interface{}, rawResponse interface{}, ...)
- func HandleStreamCancellation(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, ...)
- func HandleStreamControlSkip(bifrostErr *schemas.BifrostError) bool
- func HandleStreamTimeout(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, ...)
- func IsHTMLResponse(resp *fasthttp.Response, body []byte) bool
- func IsLargePayloadPassthroughEnabled(ctx context.Context) bool
- func IsVertexMultiRegionOnlyModel(model string) bool
- func JSONFieldExists(data []byte, path string) bool
- func MakeRequestWithContext(ctx context.Context, client *fasthttp.Client, req *fasthttp.Request, ...) (time.Duration, *schemas.BifrostError, func())
- func MarshalSorted(v interface{}) ([]byte, error)
- func MarshalSortedIndent(v interface{}, prefix, indent string) ([]byte, error)
- func MaterializeStreamErrorBody(ctx *schemas.BifrostContext, resp *fasthttp.Response)
- func MergeExtraParams(jsonMap map[string]interface{}, extraParams map[string]interface{})
- func MergeExtraParamsIntoJSON(jsonBody []byte, extraParams map[string]interface{}) ([]byte, error)
- func ModelMatchesDenylist(denylist []string, candidates ...string) bool
- func NewBifrostOperationError(message string, err error) *schemas.BifrostError
- func NewBifrostTimeoutError(message string, err error) *schemas.BifrostError
- func NewConfigurationError(message string) *schemas.BifrostError
- func NewIdleTimeoutReader(reader io.Reader, bodyStream io.Reader, timeout time.Duration) (io.Reader, func())
- func NewProviderAPIError(message string, err error, statusCode int, errorType *string, eventID *string) *schemas.BifrostError
- func NewUnsupportedOperationError(requestType schemas.RequestType, providerName schemas.ModelProvider) *schemas.BifrostError
- func ParseAndSetRawRequest(extraFields *schemas.BifrostResponseExtraFields, jsonBody []byte)
- func ParseAndSetRawRequestIfJSON(fasthttpReq *fasthttp.Request, extraFields *schemas.BifrostResponseExtraFields)
- func ParseOpenAIUsageFromBytes(data []byte) *schemas.BifrostLLMUsage
- func PrepareResponseStreaming(ctx *schemas.BifrostContext, client *fasthttp.Client, resp *fasthttp.Response) *fasthttp.Client
- func ProcessAndSendBifrostError(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, ...)
- func ProcessAndSendError(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, err error, ...)
- func ProcessAndSendResponse(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, ...)
- func ProviderIsResponsesAPINative(providerName schemas.ModelProvider) bool
- func ProviderSendsDoneMarker(providerName schemas.ModelProvider) bool
- func ReleaseBrotliReader(br *brotli.Reader)
- func ReleaseFlateReader(fr io.ReadCloser)
- func ReleaseGzipReader(gz *gzip.Reader)
- func ReleaseStreamingResponse(resp *fasthttp.Response)
- func ReleaseZstdDecoder(dec *zstd.Decoder)
- func RewriteLargePayloadModelInJSONPrefix(reader io.Reader, fromModel, toModel string) (io.Reader, int)
- func RewriteLargePayloadModelInMultipartPrefix(reader io.Reader, fromModel, toModel string) (io.Reader, int)
- func SendCreatedEventResponsesChunk(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, ...)
- func SendInProgressEventResponsesChunk(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, ...)
- func SetCacheMissHandler(fn func(model string) *ModelParams)
- func SetExtraHeaders(ctx context.Context, req *fasthttp.Request, extraHeaders map[string]string, ...)
- func SetExtraHeadersHTTP(ctx context.Context, req *http.Request, extraHeaders map[string]string, ...)
- func SetJSONField(data []byte, path string, value interface{}) ([]byte, error)
- func SetLogger(l schemas.Logger)
- func SetModelParams(model string, params ModelParams)
- func SetStreamIdleTimeoutIfEmpty(ctx *schemas.BifrostContext, configSeconds int)
- func SetupStreamCancellation(ctx context.Context, bodyStream io.Reader, logger schemas.Logger) (cleanup func())
- func SetupStreamingPassthrough(ctx *schemas.BifrostContext, resp *fasthttp.Response) bool
- func ShouldSendBackRawRequest(ctx context.Context, defaultSendBackRawRequest bool) bool
- func ShouldSendBackRawResponse(ctx context.Context, defaultSendBackRawResponse bool) bool
- func StripVideoIDProviderSuffix(videoID string, provider schemas.ModelProvider) string
- func ToDisplayName(id string) string
- type FilterResult
- type JSONLParseResult
- type LargeResponseReader
- type ListModelsPipeline
- type MatchFn
- type ModelParams
- type PCMConfig
- type RequestBodyConverter
- type RequestBodyGetter
- type RequestBodyWithExtraParams
- type SSEDataReader
- type SSEEventReader
- type SSEReaderFactory
- type SerialListHelper
- type StreamTerminalDetector
Constants ¶
const DefaultModelParamsCacheSize = 2048
const DefaultStreamIdleTimeout = 60 * time.Second
DefaultStreamIdleTimeout is how long a stream read can block with zero data before bifrost considers the connection stalled and closes it. This protects against providers that stop sending data but keep the TCP connection open (e.g., Azure TPM throttling).
Variables ¶
var UnsupportedSpeechStreamModels = []string{"tts-1", "tts-1-hd"}
Functions ¶
func AcquireBrotliReader ¶
AcquireBrotliReader gets a brotli.Reader from the pool and resets it to read from r, or creates a new one if the pool is empty or reset panics.
func AcquireFlateReader ¶
func AcquireFlateReader(r io.Reader) (io.ReadCloser, error)
AcquireFlateReader gets a zlib (HTTP "deflate") reader from the pool and resets it to read from r, or creates a new one if the pool is empty or reset fails.
func AcquireGzipReader ¶
AcquireGzipReader gets a gzip.Reader from the pool and resets it to read from r, or creates a new one if the pool is empty or reset fails.
func AcquireZstdDecoder ¶
AcquireZstdDecoder gets a zstd.Decoder from the pool and resets it to read from r, or creates a new one if the pool is empty or reset fails. Decoders are created with concurrency=1 to minimise goroutine overhead.
func AddVideoIDProviderSuffix ¶
func AddVideoIDProviderSuffix(videoID string, provider schemas.ModelProvider) string
AddVideoIDProviderSuffix ensures a video ID is scoped as "<id>:<provider>".
func ApplyLargePayloadRequestBody ¶
ApplyLargePayloadRequestBody applies the request body reader from context to the outgoing provider request. Returns true when a streaming body was applied.
func ApplyLargePayloadRequestBodyWithModelNormalization ¶
func ApplyLargePayloadRequestBodyWithModelNormalization( ctx context.Context, req *fasthttp.Request, defaultProvider schemas.ModelProvider, ) bool
ApplyLargePayloadRequestBodyWithModelNormalization applies the streaming body reader from context and optionally rewrites prefixed model values for JSON passthrough requests (for example "openai/gpt-5" -> "gpt-5"). This preserves low-memory streaming while keeping large-payload behavior aligned with the normal parsed path that strips provider prefixes.
func AudioFilenameFromBytes ¶
AudioFilenameFromBytes returns a filename with the correct extension for the given audio data. Falls back to "audio.mp3" if the format cannot be detected.
func BuildClientStreamChunk ¶
func BuildClientStreamChunk(ctx context.Context, processedResponse *schemas.BifrostResponse, processedError *schemas.BifrostError) *schemas.BifrostStreamChunk
BuildClientStreamChunk constructs a BifrostStreamChunk from post-hook results. It never mutates the shared processedResponse or processedError objects — when raw fields need to be stripped (captured for storage but not for send-back), it shallow-copies each inner response struct and nils only the appropriate per-side field on those copies. This is safe for concurrent PostLLMHook goroutines that still hold references to the originals.
func BuildLargeResponseClient ¶
BuildLargeResponseClient creates a streaming-enabled fasthttp client for large response detection. The client caps buffering at the threshold and enables response body streaming.
ReadTimeout/WriteTimeout/MaxConnDuration are zeroed: large-response bodies may take arbitrarily long to download, and fasthttp's ReadTimeout bounds *full* body read — not idle. Idle detection on stalled streams is handled separately (see NewIdleTimeoutReader / SetupStreamingPassthrough).
func BuildStreamingClient ¶ added in v1.5.5
BuildStreamingClient returns a fasthttp.Client suitable for long-lived SSE or EventStream responses. It clones base's dialer/proxy/TLS/pool settings, then clears Read/Write timeouts and MaxConnDuration so fasthttp does not pre-empt a healthy stream. StreamResponseBody is forced on.
Per-chunk idle detection is enforced at the application layer via NewIdleTimeoutReader (see GetStreamIdleTimeout / StreamIdleTimeoutInSeconds). The initial TCP/TLS dial still honors the base client's ReadTimeout because the Dial closure installed by ConfigureDialer reads client.ReadTimeout from the base client pointer captured at ConfigureDialer call time — cloning copies that closure verbatim, so zeroing the clone's ReadTimeout does not affect dial.
func BuildStreamingHTTPClient ¶ added in v1.5.5
BuildStreamingHTTPClient returns an *http.Client for long-lived streaming responses over net/http (e.g. Bedrock EventStream). It reuses the base's Transport (safe for concurrent use by multiple clients) and sets Timeout=0 so Client.Timeout does not cap the entire request lifecycle including body reads. The transport's ResponseHeaderTimeout still bounds the initial response-headers wait; per-chunk idle is enforced by NewIdleTimeoutReader.
func BulkSetModelParams ¶
func BulkSetModelParams(entries map[string]ModelParams)
BulkSetModelParams sets parameters for multiple models at once.
func CheckAndDecodeBody ¶
CheckAndDecodeBody checks the content encoding and decodes the body accordingly. It returns a copy of the body to avoid race conditions when the response is released back to fasthttp's buffer pool. Uses pooled gzip readers to reduce GC pressure.
func CheckAndGetRawRequestBody ¶
func CheckAndGetRawRequestBody(ctx context.Context, request RequestBodyGetter) ([]byte, bool)
CheckAndGetRawRequestBody checks if the raw request body should be used, and returns it if it exists.
func CheckAndSetDefaultProvider ¶
func CheckAndSetDefaultProvider(ctx *schemas.BifrostContext, defaultProvider schemas.ModelProvider) schemas.ModelProvider
CheckAndSetDefaultProvider checks if the default provider should be used based on the context. It returns the default provider if it should be used, otherwise it returns an empty string. Checks if the direct key is set in the context, or if key selection is skipped. Or if the available providers are set in the context and the default provider is in the list.
func CheckContextAndGetRequestBody ¶
func CheckContextAndGetRequestBody(ctx context.Context, request RequestBodyGetter, requestConverter RequestBodyConverter) ([]byte, *schemas.BifrostError)
CheckContextAndGetRequestBody checks if the raw request body should be used, and returns it if it exists.
func CheckFirstStreamChunkForError ¶
func CheckFirstStreamChunkForError( ctx context.Context, stream chan *schemas.BifrostStreamChunk, ) (chan *schemas.BifrostStreamChunk, <-chan struct{}, *schemas.BifrostError)
CheckFirstStreamChunkForError reads the first chunk from a streaming channel to detect errors returned inside HTTP 200 SSE streams (e.g., providers that send rate limit errors as SSE events instead of HTTP 429).
If the first chunk is an error, it drains the source channel in the background (so the provider goroutine can exit cleanly) and returns the error for synchronous handling, enabling retries and fallbacks. The returned drainDone channel is closed once the drain completes — callers must wait on it before releasing any resources (e.g., plugin pipelines) that the provider goroutine's postHookRunner may still reference.
If the first chunk is valid data, it returns a wrapped channel that re-emits the first chunk followed by all remaining chunks from the source. drainDone is closed when the wrapper goroutine finishes forwarding the source stream.
If the source channel is closed immediately (empty stream), it returns a nil channel with nil error. drainDone is already closed.
The ctx argument cancels the background forwarding goroutine if the consumer abandons the returned wrapped channel. On ctx.Done the goroutine drains the source stream so the upstream provider's blocked send can exit cleanly.
func CheckOperationAllowed ¶
func CheckOperationAllowed(defaultProvider schemas.ModelProvider, config *schemas.CustomProviderConfig, operation schemas.RequestType) *schemas.BifrostError
CheckOperationAllowed enforces per-op gating using schemas.Operation. Behavior:
- If no gating is configured (config == nil or AllowedRequests == nil), the operation is allowed.
- If gating is configured, returns an error when the operation is not explicitly allowed.
func CloneFastHTTPClientConfig ¶
CloneFastHTTPClientConfig creates a fresh fasthttp.Client by copying only config fields from base. Never copy fasthttp.Client by value: it contains internal pools and locks. Example failure this prevents: parallel load regressions with unexpected buffering behavior after `cloned := *base` copies of active clients.
func ConfigureDialer ¶
ConfigureDialer configures the client's connection behavior:
- Sets up the stale-connection retry policy (see network.StaleConnectionRetryIfErr).
- Wraps the Dial function to enable TCP keepalive on all connections, proactively detecting dead connections before fasthttp tries to reuse them.
Must be called AFTER ConfigureProxy (which may set client.Dial to a proxy dialer), so the keepalive wrapper composes on top of the proxy connection.
Keepalive parameters:
- Idle 10s: first probe after 10s of inactivity (well under the 30s MaxIdleConnDuration)
- Interval 5s: subsequent probes every 5s
- Count 3: close after 3 failed probes
Dead connections are detected within ~25s (10 + 5*3), before the 30s MaxIdleConnDuration expires and the connection is reused.
func ConfigureProxy ¶
func ConfigureProxy(client *fasthttp.Client, proxyConfig *schemas.ProxyConfig, logger schemas.Logger) *fasthttp.Client
ConfigureProxy sets up a proxy for the fasthttp client based on the provided configuration. It supports HTTP, SOCKS5, and environment-based proxy configurations. Returns the configured client or the original client if proxy configuration is invalid.
func ConfigureRetry ¶
Deprecated.
func ConfigureTLS ¶
func ConfigureTLS(client *fasthttp.Client, networkConfig schemas.NetworkConfig, logger schemas.Logger) *fasthttp.Client
ConfigureTLS applies TLS settings from NetworkConfig to the fasthttp client. It merges with any existing TLSConfig (e.g., from ConfigureProxy).
func ConvertPCMToWAV ¶
ConvertPCMToWAV converts raw PCM audio data to WAV format. The PCM data is expected to be in signed little-endian format (s16le for 16-bit).
func ConvertSizeToAspectRatioAndResolution ¶ added in v1.5.5
ConvertSizeToAspectRatioAndResolution converts a standard size string (e.g., "1024x1024") to an aspect ratio and image size tier. aspectRatio is one of "1:1", "3:4", "4:3", "9:16", "16:9" (empty if unrecognised). imageSize is one of "1K", "2K", "4K" (empty if out of range).
func CreateBifrostChatCompletionChunkResponse ¶
func CreateBifrostChatCompletionChunkResponse( id string, usage *schemas.BifrostLLMUsage, finishReason *string, currentChunkIndex int, model string, created int, ) *schemas.BifrostChatResponse
CreateBifrostChatCompletionChunkResponse creates a bifrost chat completion chunk response.
func CreateBifrostTextCompletionChunkResponse ¶
func CreateBifrostTextCompletionChunkResponse( id string, usage *schemas.BifrostLLMUsage, finishReason *string, currentChunkIndex int, requestType schemas.RequestType, ) *schemas.BifrostTextCompletionResponse
CreateBifrostTextCompletionChunkResponse creates a bifrost text completion chunk response.
func DecompressStreamBody ¶
DecompressStreamBody returns a reader for consuming the response body, with on-the-fly gzip decompression when Content-Encoding indicates gzip. The response object is NOT modified (no SetBodyStream call), so the original requestStream remains live for proper cleanup by ReleaseStreamingResponse. Clears the Content-Encoding header to prevent double-decompression.
Returns:
- io.Reader: the reader to use for scanning (gzip reader if gzip-encoded, original body stream otherwise).
- func(): cleanup function that releases the gzip reader back to the pool. Must be called (typically via defer) after streaming is complete.
func DeleteJSONField ¶
DeleteJSONField deletes a field from JSON bytes without disturbing other fields' ordering. Uses in-place byte manipulation for minimal allocations and preserves nested structure.
func DeleteModelParams ¶ added in v1.5.5
func DeleteModelParams(model string)
DeleteModelParams removes a model from the cache.
func DetectAudioMimeType ¶
DetectAudioMimeType attempts to detect the MIME type from audio file headers. Supports detection of: WAV, MP3, AIFF, AAC, OGG Vorbis, and FLAC formats.
func DrainLargePayloadRemainder ¶
DrainLargePayloadRemainder drains any unread bytes from the large payload reader. This is useful for request types that may receive an upstream response before the incoming client upload is fully consumed (for example, lightweight preflight APIs). Example failure this prevents: fronting proxy returns 502/broken-pipe when backend responds early while client is still uploading a large body.
func DrainNonSSEStreamResponse ¶
DrainNonSSEStreamResponse checks if the upstream response is a Server-Sent Events stream. If not SSE, drains the body to io.Discard to prevent bufio.Scanner buffer bloat on non-line-delimited data. Returns true if body was drained (caller should skip scanner). We intentionally do not touch valid SSE bodies here: callers must continue reading from the reader returned by DecompressStreamBody, and draining SSE in this helper would consume the stream before the scanner/manual event loop starts.
func EnrichError ¶
func EnrichError( ctx *schemas.BifrostContext, bifrostErr *schemas.BifrostError, requestBody []byte, responseBody []byte, sendBackRawRequest bool, sendBackRawResponse bool, ) *schemas.BifrostError
EnrichError attaches the raw request and response to a BifrostError. Returns the request and response from provider embedded in BifrostError.ExtraFields.
func EnsureStreamFinalizerCalled ¶ added in v1.5.5
EnsureStreamFinalizerCalled invokes the post-hook span finalizer registered on ctx, if any. Designed to be deferred as the last line of defence in a provider's streaming goroutine (next to SetupStreamCancellation's cleanup):
defer providerUtils.EnsureStreamFinalizerCalled(ctx)
On a normal stream end the finalizer is already invoked when the final chunk is processed (via completeDeferredSpan). The registration wraps the closure in sync.Once, so this safety-net call is a noop in that case. It only does real work when the streaming goroutine exits without reaching the final-chunk path — e.g. a panic mid-stream — which would otherwise leak the plugin pipeline back-reference held by the finalizer closure.
Panics inside the finalizer are recovered and logged so they never mask an in-flight panic that triggered the defer.
func ExtractHTMLErrorMessage ¶
ExtractHTMLErrorMessage extracts meaningful error information from an HTML response. It attempts to find error messages from title tags, headers, and visible text. Currently unused, but may be useful in the future.
func ExtractProviderResponseHeaders ¶
ExtractProviderResponseHeaders extracts and filters response headers from a fasthttp response. Transport-level headers are excluded.
func ExtractProviderResponseHeadersFromHTTP ¶
ExtractProviderResponseHeadersFromHTTP extracts and filters response headers from a standard net/http response. Transport-level headers are excluded. Used by providers like Bedrock that use net/http instead of fasthttp.
func FileBytesToBase64DataURL ¶
FileBytesToBase64DataURL converts raw file bytes to base64 data URL format.
func FinalizeResponseWithLargeDetection ¶
func FinalizeResponseWithLargeDetection( ctx *schemas.BifrostContext, resp *fasthttp.Response, logger schemas.Logger, ) ([]byte, bool, *schemas.BifrostError)
FinalizeResponseWithLargeDetection processes the response body with optional large response detection. Takes ownership semantics: when isLargeResponse is true, the caller must NOT release resp (it's wrapped in a reader stored in context). When false, resp is unchanged and the caller should release as normal.
Returns:
- (body, false, nil) — normal path; body ready for parsing; resp NOT released.
- (nil, true, nil) — large response detected; context keys set for streaming; caller must set respOwned = false.
- (nil, false, err) — error; resp NOT released.
func GetBifrostResponseForStreamResponse ¶
func GetBifrostResponseForStreamResponse( textCompletionResponse *schemas.BifrostTextCompletionResponse, chatResponse *schemas.BifrostChatResponse, responsesStreamResponse *schemas.BifrostResponsesStreamResponse, speechStreamResponse *schemas.BifrostSpeechStreamResponse, transcriptionStreamResponse *schemas.BifrostTranscriptionStreamResponse, imageGenerationStreamResponse *schemas.BifrostImageGenerationStreamResponse, ) *schemas.BifrostResponse
GetBifrostResponseForStreamResponse converts the provided responses to a bifrost response.
func GetBudgetTokensFromReasoningEffort ¶
func GetBudgetTokensFromReasoningEffort( effort string, minBudgetTokens int, maxTokens int, ) (int, error)
GetBudgetTokensFromReasoningEffort converts reasoning effort into a reasoning token budget. effort ∈ {"none", "minimal", "low", "medium", "high", "xhigh", "max"}
func GetJSONField ¶
GetJSONField retrieves a field value from JSON bytes without parsing the entire document.
func GetMaxOutputTokens ¶
GetMaxOutputTokens returns the cached max_output_tokens for a model. Returns 0, false on cache miss or if max_output_tokens is not set.
func GetMaxOutputTokensOrDefault ¶
GetMaxOutputTokensOrDefault returns the cached max_output_tokens for a model, or the provided default value on cache miss. For Claude models, falls back to known static defaults before using the caller's default.
func GetPathFromContext ¶
GetPathFromContext gets the path from the context, if it exists, otherwise returns the default path.
func GetProviderName ¶
func GetProviderName(defaultProvider schemas.ModelProvider, customConfig *schemas.CustomProviderConfig) schemas.ModelProvider
GetProviderName extracts the provider name from custom provider configuration. If a custom provider key is specified, it returns that; otherwise, it returns the default provider. Note: CustomProviderKey is internally set by Bifrost and should always match the provider name.
func GetRandomString ¶
GetRandomString generates a random alphanumeric string of the given length.
func GetReasoningEffortFromBudgetTokens ¶
func GetReasoningEffortFromBudgetTokens( budgetTokens int, minBudgetTokens int, maxTokens int, ) string
GetReasoningEffortFromBudgetTokens maps a reasoning token budget to OpenAI reasoning effort. Valid values: none, low, medium, high
func GetRequestPath ¶
func GetRequestPath(ctx context.Context, defaultPath string, customProviderConfig *schemas.CustomProviderConfig, requestType schemas.RequestType) (string, bool)
GetRequestPath gets the request path from the context, if it exists, checking for path overrides in the custom provider config. It returns the resolved value and a boolean indicating whether the value is a full absolute URL. If the boolean is false, the returned string is a path (leading slash ensured).
func GetStreamIdleTimeout ¶
func GetStreamIdleTimeout(ctx *schemas.BifrostContext) time.Duration
GetStreamIdleTimeout reads the per-chunk idle timeout from context, falling back to DefaultStreamIdleTimeout if not set.
func HandleKeylessListModelsRequest ¶
func HandleKeylessListModelsRequest( provider schemas.ModelProvider, listFunc func() (*schemas.BifrostListModelsResponse, *schemas.BifrostError), ) (*schemas.BifrostListModelsResponse, *schemas.BifrostError)
HandleKeylessListModelsRequest wraps a list models request for keyless providers and automatically populates the KeyStatuses field with provider-level status tracking. This centralizes the status tracking logic for keyless providers.
func HandleMultipleListModelsRequests ¶
func HandleMultipleListModelsRequests( ctx *schemas.BifrostContext, keys []schemas.Key, request *schemas.BifrostListModelsRequest, listModelsByKey func(ctx *schemas.BifrostContext, key schemas.Key, request *schemas.BifrostListModelsRequest) (*schemas.BifrostListModelsResponse, *schemas.BifrostError), ) (*schemas.BifrostListModelsResponse, *schemas.BifrostError)
HandleMultipleListModelsRequests handles multiple list models requests concurrently for different keys. It launches concurrent requests for all keys and waits for all goroutines to complete. It returns the aggregated response with per-key status information or an error if the request fails.
func HandleProviderAPIError ¶
func HandleProviderAPIError(resp *fasthttp.Response, errorResp any) *schemas.BifrostError
HandleProviderAPIError processes error responses from provider APIs. It attempts to unmarshal the error response and returns a BifrostError with the appropriate status code and error information. HTML detection only runs if JSON parsing fails to avoid expensive regex operations on responses that are almost certainly valid JSON. errorResp must be a pointer to the target struct for unmarshaling.
func HandleProviderResponse ¶
func HandleProviderResponse[T any](responseBody []byte, response *T, requestBody []byte, sendBackRawRequest bool, sendBackRawResponse bool) (rawRequest interface{}, rawResponse interface{}, bifrostErr *schemas.BifrostError)
HandleProviderResponse handles common response parsing logic for provider responses. It attempts to parse the response body into the provided response type and returns either the parsed response or a BifrostError if parsing fails. If sendBackRawResponse is true, it returns the raw response interface, otherwise nil. HTML detection only runs if JSON parsing fails to avoid expensive regex operations on responses that are almost certainly valid JSON.
func HandleStreamCancellation ¶
func HandleStreamCancellation( ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, responseChan chan *schemas.BifrostStreamChunk, logger schemas.Logger, postHookSpanFinalizer func(context.Context), )
HandleStreamCancellation should be called when a streaming goroutine exits due to context cancellation. It ensures proper cleanup by: 1. Checking if StreamEndIndicator was already set (to avoid duplicate handling) 2. Setting StreamEndIndicator to true 3. Sending a cancellation error through PostHook chain
This is critical for the logging plugin to update log status from "processing" to "error" when a client disconnects mid-stream.
func HandleStreamControlSkip ¶
func HandleStreamControlSkip(bifrostErr *schemas.BifrostError) bool
HandleStreamControlSkip checks if the stream control should be skipped.
func HandleStreamTimeout ¶
func HandleStreamTimeout( ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, responseChan chan *schemas.BifrostStreamChunk, logger schemas.Logger, postHookSpanFinalizer func(context.Context), )
HandleStreamTimeout should be called when a streaming goroutine exits due to context deadline exceeded. It ensures proper cleanup by: 1. Checking if StreamEndIndicator was already set (to avoid duplicate handling) 2. Setting StreamEndIndicator to true 3. Sending a timeout error through PostHook chain
This is critical for the logging plugin to update log status from "processing" to "error" when a request times out mid-stream.
func IsHTMLResponse ¶
IsHTMLResponse checks if the response is HTML by examining the Content-Type header and/or the response body for HTML indicators.
func IsLargePayloadPassthroughEnabled ¶
IsLargePayloadPassthroughEnabled returns true when large payload mode has already prepared an upstream body reader in context.
func IsVertexMultiRegionOnlyModel ¶ added in v1.5.5
IsVertexMultiRegionOnlyModel reports whether the given model is flagged in the datasheet as only available on Google Vertex multi-region pool endpoints (aiplatform.{region}.rep.googleapis.com). Returns false on cache miss or if the flag is not set. Looks up using "vertex_ai/" prefix since model-parameters are stored with provider-prefixed keys.
func JSONFieldExists ¶
JSONFieldExists checks if a field exists in JSON bytes.
func MakeRequestWithContext ¶
func MakeRequestWithContext(ctx context.Context, client *fasthttp.Client, req *fasthttp.Request, resp *fasthttp.Response) (time.Duration, *schemas.BifrostError, func())
MakeRequestWithContext makes a request with a context and returns the latency, error, and a wait function. The wait function MUST be called (typically via defer) before releasing the request or response objects. On the normal path it is a no-op. On the context-cancellation path it blocks until the background client.Do goroutine finishes, preventing a data race between the still-running goroutine and the caller's release of req/resp.
IMPORTANT: This function does NOT truly cancel the underlying fasthttp network request if the context is done. The fasthttp client call will continue in its goroutine until it completes or times out based on its own settings. This function merely stops *waiting* for the fasthttp call and returns an error related to the context.
func MarshalSorted ¶
MarshalSorted marshals v to JSON with map keys sorted alphabetically.
func MarshalSortedIndent ¶
MarshalSortedIndent marshals v to indented JSON with map keys sorted alphabetically.
func MaterializeStreamErrorBody ¶
func MaterializeStreamErrorBody(ctx *schemas.BifrostContext, resp *fasthttp.Response)
MaterializeStreamErrorBody reads a streamed error body into resp so that resp.Body() returns the error payload for parsing. No-op when response streaming is not active.
func MergeExtraParams ¶
MergeExtraParams merges extraParams into jsonMap, handling nested maps recursively.
func MergeExtraParamsIntoJSON ¶
MergeExtraParamsIntoJSON merges extra params into serialized JSON while preserving the original key ordering. This avoids the order-destroying roundtrip through map[string]interface{} that would lose key ordering in tool schemas and other order-sensitive JSON structures.
func ModelMatchesDenylist ¶
ModelMatchesDenylist reports whether any of the candidate model IDs matches an entry in denylist, using both exact and base-model (SameBaseModel) matching. Empty candidates are skipped. Returns false immediately if denylist is empty.
func NewBifrostOperationError ¶
func NewBifrostOperationError(message string, err error) *schemas.BifrostError
NewBifrostOperationError creates a standardized error for bifrost operation errors. This helper reduces code duplication across providers that have bifrost operation errors.
func NewBifrostTimeoutError ¶
func NewBifrostTimeoutError(message string, err error) *schemas.BifrostError
NewBifrostTimeoutError creates a standardized error for provider request timeout errors. Sets StatusCode to 504 (Gateway Timeout) and Error.Type to RequestTimedOut, consistent with HandleStreamTimeout for streaming requests.
func NewConfigurationError ¶
func NewConfigurationError(message string) *schemas.BifrostError
NewConfigurationError creates a standardized error for configuration errors. This helper reduces code duplication across providers that have configuration errors.
func NewIdleTimeoutReader ¶
func NewIdleTimeoutReader(reader io.Reader, bodyStream io.Reader, timeout time.Duration) (io.Reader, func())
NewIdleTimeoutReader wraps reader with idle detection. If reader.Read() returns no data for the given timeout duration, bodyStream is closed to unblock the read. bodyStream must implement io.Closer for the timeout to take effect; if it does not, the wrapper still functions but cannot force-close the stream. Returns the wrapped reader and a cleanup function that MUST be called (via defer) when streaming is complete, to stop the timer and prevent premature closure.
func NewProviderAPIError ¶
func NewProviderAPIError(message string, err error, statusCode int, errorType *string, eventID *string) *schemas.BifrostError
NewProviderAPIError creates a standardized error for provider API errors. This helper reduces code duplication across providers that have provider API errors.
func NewUnsupportedOperationError ¶
func NewUnsupportedOperationError(requestType schemas.RequestType, providerName schemas.ModelProvider) *schemas.BifrostError
NewUnsupportedOperationError creates a standardized error for unsupported operations. This helper reduces code duplication across providers that don't support certain operations.
func ParseAndSetRawRequest ¶
func ParseAndSetRawRequest(extraFields *schemas.BifrostResponseExtraFields, jsonBody []byte)
ParseAndSetRawRequest stores the raw request body in the extra fields. Uses json.RawMessage to preserve the exact JSON bytes (including key ordering). The body is compacted to remove insignificant whitespace, which prevents literal newlines from breaking SSE data-line framing during streaming.
func ParseAndSetRawRequestIfJSON ¶
func ParseAndSetRawRequestIfJSON(fasthttpReq *fasthttp.Request, extraFields *schemas.BifrostResponseExtraFields)
ParseAndSetRawRequestIfJSON parses the request body if it's JSON and sets the raw request in the extra fields.
func ParseOpenAIUsageFromBytes ¶
func ParseOpenAIUsageFromBytes(data []byte) *schemas.BifrostLLMUsage
ParseOpenAIUsageFromBytes parses OpenAI-format usage from raw JSON bytes into BifrostLLMUsage. Handles both Chat Completions (prompt_tokens/completion_tokens) and Responses API (input_tokens/output_tokens) field names. Expects the "usage" object bytes directly, not the full response body.
func PrepareResponseStreaming ¶
func PrepareResponseStreaming(ctx *schemas.BifrostContext, client *fasthttp.Client, resp *fasthttp.Response) *fasthttp.Client
PrepareResponseStreaming configures response body streaming when a large response threshold is set in context. Returns the client to use for MakeRequestWithContext. When threshold > 0: sets resp.StreamBody = true and returns a streaming-enabled client. When threshold <= 0: returns the original client unchanged (no-op for feature-off path).
func ProcessAndSendBifrostError ¶
func ProcessAndSendBifrostError( ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, bifrostErr *schemas.BifrostError, responseChan chan *schemas.BifrostStreamChunk, logger schemas.Logger, postHookSpanFinalizer func(context.Context), )
ProcessAndSendBifrostError handles post-hook processing and sends the bifrost error to the channel. This utility reduces code duplication across streaming implementations by encapsulating the common pattern of running post hooks, handling errors, and sending responses with proper context cancellation handling. It also completes the deferred LLM span when the final chunk is sent (StreamEndIndicator is true).
func ProcessAndSendError ¶
func ProcessAndSendError( ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, err error, responseChan chan *schemas.BifrostStreamChunk, logger schemas.Logger, postHookSpanFinalizer func(context.Context), )
ProcessAndSendError handles post-hook processing and sends the error to the channel. This utility reduces code duplication across streaming implementations by encapsulating the common pattern of running post hooks, handling errors, and sending responses with proper context cancellation handling.
func ProcessAndSendResponse ¶
func ProcessAndSendResponse( ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, response *schemas.BifrostResponse, responseChan chan *schemas.BifrostStreamChunk, postHookSpanFinalizer func(context.Context), )
ProcessAndSendResponse handles post-hook processing and sends the response to the channel. This utility reduces code duplication across streaming implementations by encapsulating the common pattern of running post hooks, handling errors, and sending responses with proper context cancellation handling. It also completes the deferred LLM span when the final chunk is sent (StreamEndIndicator is true).
func ProviderIsResponsesAPINative ¶
func ProviderIsResponsesAPINative(providerName schemas.ModelProvider) bool
func ProviderSendsDoneMarker ¶
func ProviderSendsDoneMarker(providerName schemas.ModelProvider) bool
ProviderSendsDoneMarker returns true if the provider sends the [DONE] marker in streaming responses. Some OpenAI-compatible providers (like Cerebras) don't send [DONE] and instead end the stream after sending the finish_reason. This function helps determine the correct stream termination logic.
func ReleaseBrotliReader ¶
ReleaseBrotliReader returns a brotli.Reader to the pool. Brotli readers have no Close method; Reset(nil) is sufficient to drop the reference to the underlying reader.
func ReleaseFlateReader ¶
func ReleaseFlateReader(fr io.ReadCloser)
ReleaseFlateReader closes and returns a deflate reader to the pool.
func ReleaseGzipReader ¶
ReleaseGzipReader closes and returns a gzip.Reader to the pool.
func ReleaseStreamingResponse ¶
ReleaseStreamingResponse releases a streaming response by draining the body stream and releasing the response.
func ReleaseZstdDecoder ¶
ReleaseZstdDecoder returns a zstd.Decoder to the pool. Unlike other decoders, zstd.Close() is terminal (stops background goroutines permanently). We only call Reset(nil) to release the source reference, then re-pool. Close is never called on pooled decoders.
func RewriteLargePayloadModelInJSONPrefix ¶
func RewriteLargePayloadModelInJSONPrefix(reader io.Reader, fromModel, toModel string) (io.Reader, int)
RewriteLargePayloadModelInJSONPrefix reads the first 256KB of a streaming body, rewrites the "model" JSON value from fromModel to toModel, and returns a combined reader (rewritten prefix + remaining stream) with the size delta.
func RewriteLargePayloadModelInMultipartPrefix ¶
func RewriteLargePayloadModelInMultipartPrefix(reader io.Reader, fromModel, toModel string) (io.Reader, int)
RewriteLargePayloadModelInMultipartPrefix reads the first 256KB of a streaming multipart body, finds the model form field value, and rewrites it from fromModel to toModel. The model field appears early in multipart bodies (typically the first form field), so scanning the prefix is sufficient.
func SendCreatedEventResponsesChunk ¶
func SendCreatedEventResponsesChunk(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, startTime time.Time, responseChan chan *schemas.BifrostStreamChunk, postHookSpanFinalizer func(context.Context))
SendCreatedEventResponsesChunk sends a ResponsesStreamResponseTypeCreated event.
func SendInProgressEventResponsesChunk ¶
func SendInProgressEventResponsesChunk(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, startTime time.Time, responseChan chan *schemas.BifrostStreamChunk, postHookSpanFinalizer func(context.Context))
SendInProgressEventResponsesChunk sends a ResponsesStreamResponseTypeInProgress event.
func SetCacheMissHandler ¶
func SetCacheMissHandler(fn func(model string) *ModelParams)
SetCacheMissHandler registers a callback invoked on cache miss. The handler should query the DB for the model's parameters and return them, or nil if not found. The result is automatically cached.
func SetExtraHeaders ¶
func SetExtraHeaders(ctx context.Context, req *fasthttp.Request, extraHeaders map[string]string, skipHeaders []string)
SetExtraHeaders sets additional headers from NetworkConfig to the fasthttp request. This allows users to configure custom headers for their provider requests. Header keys are canonicalized using textproto.CanonicalMIMEHeaderKey to avoid duplicates. It accepts a list of headers (all canonicalized) to skip for security reasons. Headers are only set if they don't already exist on the request to avoid overwriting important headers.
func SetExtraHeadersHTTP ¶
func SetExtraHeadersHTTP(ctx context.Context, req *http.Request, extraHeaders map[string]string, skipHeaders []string)
SetExtraHeadersHTTP sets additional headers from NetworkConfig to the standard HTTP request. This allows users to configure custom headers for their provider requests. Header keys are canonicalized using textproto.CanonicalMIMEHeaderKey to avoid duplicates. It accepts a list of headers (all canonicalized) to skip for security reasons. Headers are only set if they don't already exist on the request to avoid overwriting important headers.
func SetJSONField ¶
SetJSONField sets a field in JSON bytes without disturbing other fields' ordering. Uses in-place byte manipulation for minimal allocations and preserves nested structure.
func SetModelParams ¶
func SetModelParams(model string, params ModelParams)
SetModelParams sets the parameters for a model in the cache.
func SetStreamIdleTimeoutIfEmpty ¶
func SetStreamIdleTimeoutIfEmpty(ctx *schemas.BifrostContext, configSeconds int)
SetStreamIdleTimeoutIfEmpty sets the stream idle timeout on the context from the provider's network config, but only if no valid timeout is already present. This allows upstream layers (transport, headers) to set the timeout first, with the provider config acting as a fallback.
func SetupStreamCancellation ¶
func SetupStreamCancellation(ctx context.Context, bodyStream io.Reader, logger schemas.Logger) (cleanup func())
SetupStreamCancellation spawns a goroutine that closes the body stream when the context is cancelled or deadline exceeded, unblocking any blocked Read/Scan operations. Returns a cleanup function that MUST be called when streaming is done to prevent the goroutine from closing the stream during normal operation. Works with both fasthttp's BodyStream() (io.Reader) and net/http's resp.Body (io.ReadCloser).
func SetupStreamingPassthrough ¶
func SetupStreamingPassthrough(ctx *schemas.BifrostContext, resp *fasthttp.Response) bool
SetupStreamingPassthrough configures large response passthrough for streaming responses when large payload mode is active. Wraps the response body stream in a LargeResponseReader and sets context keys for the transport layer. Returns true if passthrough was set up. When true, the caller should return a closed channel and must NOT release resp — it's owned by the reader in context.
func ShouldSendBackRawRequest ¶
ShouldSendBackRawRequest checks if raw request bytes should be captured. bifrost.go always writes BifrostContextKeyCaptureRawRequest before provider dispatch, combining provider config, per-request overrides, and store_raw_request_response. The default parameter is a fallback for callers outside the normal bifrost dispatch path.
func ShouldSendBackRawResponse ¶
ShouldSendBackRawResponse checks if raw response bytes should be captured. bifrost.go always writes BifrostContextKeyCaptureRawResponse before provider dispatch, combining provider config, per-request overrides, and store_raw_request_response. The default parameter is a fallback for callers outside the normal bifrost dispatch path.
func StripVideoIDProviderSuffix ¶
func StripVideoIDProviderSuffix(videoID string, provider schemas.ModelProvider) string
StripVideoIDProviderSuffix removes ":<provider>" from a video ID if present.
func ToDisplayName ¶ added in v1.5.5
ToDisplayName converts a raw model ID or alias key into a human-readable display name. Splits on "-" or "_", title-cases each word, and joins with spaces.
"gemini-pro" → "Gemini Pro"; "claude_3_opus" → "Claude 3 Opus"; "gpt-4-turbo" → "Gpt 4 Turbo"
Types ¶
type FilterResult ¶ added in v1.5.5
type FilterResult struct {
// ResolvedID is the user-facing model name to use as the ID suffix.
// If the model matched an alias VALUE, this is the alias KEY.
// Otherwise this is the original model ID from the API response.
//
// Example: API returns "gpt-4-turbo", aliases={"my-gpt4":"gpt-4-turbo"}
// → ResolvedID = "my-gpt4"
// Example: API returns "gpt-3.5-turbo", no alias match
// → ResolvedID = "gpt-3.5-turbo"
ResolvedID string
// AliasValue is the provider-specific model ID when the model was matched
// via an alias. Set as the model.Alias field so callers know the underlying ID.
// Empty when the model was matched directly (no alias involved).
//
// Example: API returns "gpt-4-turbo", alias key "my-gpt4" matched
// → AliasValue = "gpt-4-turbo"
AliasValue string
}
FilterResult is the outcome of running ListModelsPipeline.FilterModel for a single model from the provider's API response. Each returned result represents one alias entry (or the raw model ID when no alias matched) that passed all filters.
type JSONLParseResult ¶
type JSONLParseResult struct {
Errors []schemas.BatchError
}
JSONLParseResult holds parsed items and any line-level errors encountered during parsing.
func ParseJSONL ¶
func ParseJSONL(data []byte, parseLine func(line []byte) error) JSONLParseResult
ParseJSONL parses JSONL data line by line, calling the provided callback for each line. It collects parse errors with line numbers rather than silently skipping failed lines. The callback receives the line bytes and returns an error if parsing fails. This function operates directly on byte slices to avoid unnecessary string conversions.
type LargeResponseReader ¶
type LargeResponseReader struct {
io.Reader
Resp *fasthttp.Response
// contains filtered or unexported fields
}
LargeResponseReader wraps an io.Reader and releases the fasthttp response on Close. Used by providers to keep the response alive while the transport streams it to the client.
func (*LargeResponseReader) Close ¶
func (r *LargeResponseReader) Close() error
Close drains any unconsumed body stream and releases the underlying fasthttp response back to the pool. Draining prevents "whitespace in header" errors on connection reuse when the client disconnects before the full response is consumed (see: fasthttp#1743).
When the body was already fully consumed through the Reader chain (consumed == true), the drain is skipped. For identity-encoded responses (no Content-Length), the body stream is a fasthttp closeReader that blocks until the TCP connection closes — which can take minutes if the upstream server keeps the connection alive.
type ListModelsPipeline ¶ added in v1.5.5
type ListModelsPipeline struct {
AllowedModels schemas.WhiteList
BlacklistedModels schemas.BlackList
// Aliases maps user-facing alias keys to provider-specific model IDs.
// e.g. {"my-gpt4": "gpt-4-turbo-2024-04-09"}
Aliases map[string]string
Unfiltered bool
ProviderKey schemas.ModelProvider
// MatchFns is the ordered list of equivalence functions used for every
// model ID comparison. Use DefaultMatchFns() for standard behaviour;
// providers may append additional fns (e.g. Bedrock's region-prefix remover).
MatchFns []MatchFn
}
ListModelsPipeline holds all the context needed to filter and backfill models in a single ListModels response. Construct one per ToBifrostListModelsResponse call and use its methods instead of passing params + matchFns to every function.
pipeline := &providerUtils.ListModelsPipeline{
AllowedModels: key.Models,
BlacklistedModels: key.BlacklistedModels,
Aliases: key.Aliases,
Unfiltered: request.Unfiltered,
ProviderKey: schemas.OpenAI,
MatchFns: providerUtils.DefaultMatchFns(),
}
if pipeline.ShouldEarlyExit() { return empty }
result := pipeline.FilterModel(model.ID)
pipeline.BackfillModels(included)
func (*ListModelsPipeline) BackfillModels ¶ added in v1.5.5
func (p *ListModelsPipeline) BackfillModels(included map[string]bool) []schemas.Model
BackfillModels adds model entries that were configured by the caller but not returned by the provider's API response (or not matched during filtering).
The `included` map tracks model IDs (lowercased) already added during the filter pass, used to avoid duplicates.
Two cases depending on whether the allowlist is restricted:
Case A — allowlist restricted (caller specified explicit model names):
Add each allowlist entry that is not yet in `included`, skip if blacklisted.
If the entry has an alias mapping (aliases[entry] exists), set Alias to the
provider-specific ID so callers can route to the right model.
Example: allowedModels=["my-gpt4","gpt-3.5"], aliases={"my-gpt4":"gpt-4-turbo"}
"my-gpt4" not in included → add {ID:"openai/my-gpt4", Alias:"gpt-4-turbo"}
"gpt-3.5" not in included → add {ID:"openai/gpt-3.5"}
Case B — allowlist wildcard (*) only:
We don't know all model names (no explicit list), so we only backfill entries
that were explicitly configured via aliases and not yet matched from the API.
Note: an empty allowlist is deny-all (IsRestricted()==true), not wildcard.
Example: aliases={"my-gpt4":"gpt-4-turbo"}, "my-gpt4" not in included
→ add {ID:"openai/my-gpt4", Alias:"gpt-4-turbo"}
Blacklist always wins — nothing blacklisted is added in either case.
func (*ListModelsPipeline) FilterModel ¶ added in v1.5.5
func (p *ListModelsPipeline) FilterModel(modelID string) []FilterResult
FilterModel applies the full filter pipeline for a single model from the API response.
Steps:
- Resolve name — check alias VALUES for a match (uses MatchFns). If matched: resolvedName = alias KEY, aliasValue = provider ID. If not matched: resolvedName = original modelID, aliasValue = "".
- Allowlist check (only when allowlist is restricted, i.e. not wildcard): Skip if resolvedName is not in AllowedModels.
- Blacklist check (always): Skip if resolvedName is blacklisted. Blacklist takes precedence over everything.
- Return one FilterResult per passing candidate.
An empty slice means the model should be skipped entirely. When multiple aliases map to the same provider model ID, each alias that passes the filters produces its own FilterResult entry.
Examples:
allowedModels=["my-gpt4"], aliases={"my-gpt4":"gpt-4-turbo"}, blacklist=[]
FilterModel("gpt-4-turbo") → [{ResolvedID:"my-gpt4", AliasValue:"gpt-4-turbo"}]
FilterModel("gpt-3.5") → [] (not in allowlist)
allowedModels=*, aliases={"my-gpt4":"gpt-4-turbo","gpt4-alias":"gpt-4-turbo"}, blacklist=[]
FilterModel("gpt-4-turbo") → [{ResolvedID:"gpt-4-turbo", AliasValue:""},
{ResolvedID:"gpt4-alias", AliasValue:"gpt-4-turbo"},
{ResolvedID:"my-gpt4", AliasValue:"gpt-4-turbo"}]
allowedModels=["gpt-3.5"], aliases={}, blacklist=[]
FilterModel("gpt-3.5") → [{ResolvedID:"gpt-3.5", AliasValue:""}]
FilterModel("gpt-4") → []
func (*ListModelsPipeline) ShouldEarlyExit ¶ added in v1.5.5
func (p *ListModelsPipeline) ShouldEarlyExit() bool
ShouldEarlyExit reports whether ToBifrostListModelsResponse should immediately return an empty response without processing any models.
Returns true when:
- not unfiltered AND allowlist is empty AND no aliases configured (there is nothing to match against — all models would be filtered out anyway)
- not unfiltered AND blacklist blocks everything
Note: allowlist empty + aliases present → do NOT early exit. The aliases drive backfill in the wildcard-allowlist case (Case B of BackfillModels).
type MatchFn ¶ added in v1.5.5
MatchFn reports whether two model ID strings should be treated as equivalent. Functions are applied in order during every comparison — the first one that returns true short-circuits the rest.
Example built-in fns (see DefaultMatchFns):
exactMatch("gpt-4", "gpt-4") → true
sameBaseModel("claude-3-5-sonnet-20241022", "claude-3-5") → true
func DefaultMatchFns ¶ added in v1.5.5
func DefaultMatchFns() []MatchFn
DefaultMatchFns returns the standard matching functions used by most providers. Currently only performs case-insensitive exact matching.
SameBaseModel (strips version suffixes, e.g. "claude-3-5-sonnet-20241022" ≈ "claude-3-5-sonnet") is intentionally excluded — users should use aliases for explicit version-to-base-name mapping. It can be appended here if fuzzy base-model matching is ever needed globally.
type ModelParams ¶
type ModelParams struct {
MaxOutputTokens *int
IsVertexMultiRegionOnly *bool // true when model is only available on Vertex multi-region pool endpoints (rep.googleapis.com)
}
ModelParams holds cached parameters for a model. Add new fields here as more model-level parameters need caching.
func GetModelParams ¶
func GetModelParams(model string) (ModelParams, bool)
GetModelParams returns the cached parameters for a model. On cache miss, calls the registered miss handler (if any) to load from DB.
type PCMConfig ¶
type PCMConfig struct {
SampleRate int // Sample rate in Hz (e.g., 24000)
NumChannels int // Number of audio channels (1 = mono, 2 = stereo)
BitsPerSample int // Bits per sample (e.g., 16)
}
PCMConfig holds the configuration for PCM audio data
func DefaultGeminiPCMConfig ¶
func DefaultGeminiPCMConfig() PCMConfig
DefaultGeminiPCMConfig returns the default PCM configuration for Gemini TTS. Gemini TTS returns audio in PCM format with the following specs: - Format: signed 16-bit little-endian (s16le) - Sample rate: 24000 Hz - Channels: 1 (mono)
type RequestBodyConverter ¶
type RequestBodyConverter func() (RequestBodyWithExtraParams, error)
type RequestBodyGetter ¶
type RequestBodyGetter interface {
GetRawRequestBody() []byte
}
type RequestBodyWithExtraParams ¶
type RequestBodyWithExtraParams interface {
GetExtraParams() map[string]interface{}
}
type SSEDataReader ¶
SSEDataReader reads SSE data-only events (Format A: OpenAI, Gemini, Cohere, etc.). ReadDataLine returns the next SSE data payload, stripping the "data:" prefix. Returns (nil, io.EOF) at end of stream or on "data: [DONE]".
func GetSSEDataReader ¶
func GetSSEDataReader(ctx *schemas.BifrostContext, reader io.Reader) SSEDataReader
GetSSEDataReader returns an SSEDataReader for the given reader. If enterprise has injected an SSEReaderFactory via context, uses that. Otherwise returns a default implementation wrapping bufio.NewScanner.
type SSEEventReader ¶
SSEEventReader reads SSE events with type and data (Format B: Anthropic, Replicate, etc.). ReadEvent returns the complete event once an empty-line delimiter is encountered. Multiple "data:" lines within one event are concatenated with newlines. Returns ("", nil, io.EOF) at end of stream.
func GetSSEEventReader ¶
func GetSSEEventReader(ctx *schemas.BifrostContext, reader io.Reader) SSEEventReader
GetSSEEventReader returns an SSEEventReader for the given reader. If enterprise has injected an SSEReaderFactory via context, uses that. Otherwise returns a default implementation wrapping bufio.NewScanner.
type SSEReaderFactory ¶
type SSEReaderFactory struct {
NewDataReader func(reader io.Reader) SSEDataReader
NewEventReader func(reader io.Reader) SSEEventReader
}
SSEReaderFactory creates SSE readers for streaming response processing. Enterprise injects this via BifrostContextKeySSEReaderFactory to replace the default bufio.Scanner-based implementations with streaming readers.
type SerialListHelper ¶
type SerialListHelper struct {
Keys []schemas.Key
Cursor *schemas.SerialCursor
Logger schemas.Logger
}
SerialListHelper manages serial key pagination for list operations. It ensures that all pages from one key are exhausted before moving to the next, guaranteeing only one API call per pagination request regardless of key count.
func NewSerialListHelper ¶
func NewSerialListHelper(keys []schemas.Key, encodedCursor *string, logger schemas.Logger) (*SerialListHelper, error)
NewSerialListHelper creates a new SerialListHelper from the provided keys and encoded cursor. If the cursor is empty or nil, pagination starts from the first key. If the cursor is invalid, an error is returned.
func (*SerialListHelper) BuildNextCursor ¶
func (h *SerialListHelper) BuildNextCursor(hasMore bool, nativeCursor string) (string, bool)
BuildNextCursor creates the cursor for the next pagination request. Parameters:
- hasMore: whether the current key has more pages
- nativeCursor: the native cursor returned by the current key's API
Returns:
- encodedCursor: the encoded cursor for the next request (empty if all keys exhausted)
- moreAvailable: true if there are more results available (either from current key or remaining keys)
func (*SerialListHelper) GetCurrentKey ¶
func (h *SerialListHelper) GetCurrentKey() (schemas.Key, string, bool)
GetCurrentKey returns the key to query and its native cursor. Returns (key, nativeCursor, true) if there's a key to query. Returns (Key{}, "", false) if all keys are exhausted.
func (*SerialListHelper) GetCurrentKeyIndex ¶
func (h *SerialListHelper) GetCurrentKeyIndex() int
GetCurrentKeyIndex returns the current key index being processed.
func (*SerialListHelper) HasMoreKeys ¶
func (h *SerialListHelper) HasMoreKeys() bool
HasMoreKeys returns true if there are more keys after the current one.
type StreamTerminalDetector ¶ added in v1.5.5
type StreamTerminalDetector struct {
// contains filtered or unexported fields
}
StreamTerminalDetector incrementally parses stream frames and detects semantic completion markers such as finishReason or [DONE].
func (*StreamTerminalDetector) ObserveChunk ¶ added in v1.5.5
func (d *StreamTerminalDetector) ObserveChunk(chunk []byte) bool
ObserveChunk ingests a new raw stream chunk and returns true if a terminal marker was detected in a parsed frame payload.