providers

package
v1.4.3

This package is not in the latest version of its module.
Published: Apr 12, 2026 License: Apache-2.0 Imports: 33 Imported by: 11

Documentation

Overview

Package providers implements multi-LLM provider support with unified interfaces.

This package provides a common abstraction for predict-based LLM providers including OpenAI, Anthropic Claude, and Google Gemini. It handles:

  • Predict completion requests with streaming support
  • Tool/function calling with provider-specific formats
  • Cost tracking and token usage calculation
  • Rate limiting and error handling

All providers implement the Provider interface for basic predict, and ToolSupport interface for function calling capabilities.
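The capability-upgrade pattern described above — a base Provider interface plus an optional ToolSupport interface probed at runtime — can be sketched with hypothetical, pared-down stand-ins (the real Provider and ToolSupport method sets in this package are richer):

```go
package main

import "fmt"

// Hypothetical minimal stand-ins for the package's Provider and
// ToolSupport interfaces, used only to illustrate the upgrade pattern.
type Provider interface {
	ID() string
}

type ToolSupport interface {
	SupportsTools() bool
}

type toolCapableProvider struct{ id string }

func (p toolCapableProvider) ID() string          { return p.id }
func (p toolCapableProvider) SupportsTools() bool { return true }

// hasToolSupport probes for the optional capability via a type assertion,
// the same way callers check a concrete provider for function calling.
func hasToolSupport(p Provider) bool {
	ts, ok := p.(ToolSupport)
	return ok && ts.SupportsTools()
}

func main() {
	var p Provider = toolCapableProvider{id: "openai"}
	fmt.Println(p.ID(), hasToolSupport(p))
}
```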

Package providers contains provider contract test helpers.

This file contains exported test helpers that can be used by provider implementations in subpackages to validate their contract compliance.

Index

Constants

View Source
const (
	ContentTypeHeader   = "Content-Type"
	AuthorizationHeader = "Authorization"
	ApplicationJSON     = "application/json"
	BearerPrefix        = "Bearer "
)

Common HTTP constants for embedding providers.

const (
	DefaultMaxIdleConns        = 1000
	DefaultMaxIdleConnsPerHost = 100
	// DefaultMaxConnsPerHost is the default cap on total TCP connections
	// per upstream host. Zero means unlimited, matching Go's stdlib
	// http.Transport default. Real LLM providers enforce rate limits
	// server-side; a client-side connection cap adds a hidden throughput
	// ceiling that's hard to debug (see #916).
	DefaultMaxConnsPerHost     = 0
	DefaultIdleConnTimeout     = 90 * time.Second
	DefaultTLSHandshakeTimeout = 10 * time.Second
	DefaultDialTimeout         = 30 * time.Second
	DefaultDialKeepAlive       = 30 * time.Second
)

Connection pooling defaults for HTTP transports shared across providers.

const (

	// MaxAggregateMediaSize is the maximum total size of all media items in a single
	// request (100MB). This prevents excessive memory usage when processing messages
	// with many media attachments.
	MaxAggregateMediaSize int64 = 100 * 1024 * 1024

	// MaxMediaItems is the maximum number of media items allowed per message/request.
	// This prevents abuse and excessive resource consumption.
	MaxMediaItems = 20
)
const (
	DefaultMaxRetries     = 3
	DefaultInitialDelayMs = 500
	DefaultBackoff        = "exponential"
)

Default retry constants.

const (
	DefaultStreamRetryMaxAttempts  = 2
	DefaultStreamRetryInitialDelay = 250 * time.Millisecond
	DefaultStreamRetryMaxDelay     = 2 * time.Second
)

Default values for StreamRetryPolicy. Kept small on purpose: streaming retry targets transient h2 stream resets, not generic 5xx storms, and the wrong default is "retry aggressively". See docs/local-backlog/STREAMING_RETRY_AT_SCALE.md.

const (
	// DefaultGeminiBaseURL is the default base URL for Gemini API (includes version path)
	DefaultGeminiBaseURL = "https://generativelanguage.googleapis.com/v1beta"
)
const (
	// DefaultMaxPayloadSize is the default maximum request payload size (100 MB).
	// This is generous but prevents accidentally sending multi-GB payloads.
	DefaultMaxPayloadSize int64 = 100 * 1024 * 1024
)

Payload size limits.

const DefaultStreamBufferSize = 32

DefaultStreamBufferSize is the default buffer size for streaming channels. A buffer of 32 prevents the streaming goroutine from blocking on every send when the consumer is slow, while keeping memory usage reasonable.

const DefaultStreamIdleTimeout = 30 * time.Second

DefaultStreamIdleTimeout is the maximum time to wait between stream chunks before considering the stream stalled. If no data is received within this duration, the response body is closed to unblock the reader.

const MaxErrorResponseSize int64 = 1 << 20

MaxErrorResponseSize is the maximum size for error response bodies (1 MB). Error responses should be small; this prevents reading huge bodies on failures.

Variables

var ErrPayloadTooLarge = errors.New("request payload too large")

ErrPayloadTooLarge is returned when a request payload exceeds the provider's configured maximum size (MaxRequestPayloadSize).

var ErrStreamIdleTimeout = errors.New("stream idle timeout: no data received")

ErrStreamIdleTimeout is returned when no data is received within the idle timeout.

Functions

func CheckHTTPError added in v1.1.0

func CheckHTTPError(resp *http.Response, url string) error

CheckHTTPError checks whether an HTTP response is an error and, if so, returns a formatted error that includes the response body.

func DefaultRetryPolicy added in v1.3.10

func DefaultRetryPolicy() pipeline.RetryPolicy

DefaultRetryPolicy returns a RetryPolicy with sensible defaults: 3 retries, exponential backoff, 500ms initial delay.

func DoWithRetry added in v1.3.10

func DoWithRetry(
	ctx context.Context,
	policy pipeline.RetryPolicy,
	providerName string,
	doFn DoRequestFunc,
) (*http.Response, error)

DoWithRetry executes doFn with retry logic according to the given policy. It retries on retryable HTTP status codes (429, 502, 503, 504) and transient network errors. The Retry-After header is honored for 429 responses. On retryable HTTP errors the response body is closed before retrying. The caller is responsible for closing the body of the final returned response.

func ExtractAPIKey added in v1.3.2

func ExtractAPIKey(cred Credential) string

ExtractAPIKey extracts an API key string from a Credential, if it is an APIKeyCredential. Returns an empty string if the credential is nil, not an api_key type, or does not implement the APIKey() method.

func ExtractOrderedEmbeddings added in v1.1.6

func ExtractOrderedEmbeddings[T any](
	data []T,
	getIndex func(T) int,
	getEmbedding func(T) []float32,
	expectedCount int,
) ([][]float32, error)

ExtractOrderedEmbeddings extracts embeddings from indexed response data and places them in the correct order. Returns an error if count doesn't match.
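The reordering behavior can be illustrated with a non-generic re-implementation over a hypothetical indexed item type (the package's generic helper has the same shape, parameterized by accessor functions):

```go
package main

import "fmt"

// item is a hypothetical indexed embedding entry, standing in for a
// provider-specific response element.
type item struct {
	Index     int
	Embedding []float32
}

// extractOrderedEmbeddings places each embedding at its reported index
// and fails if the count doesn't match — a sketch of the documented
// behavior of ExtractOrderedEmbeddings.
func extractOrderedEmbeddings(data []item, expected int) ([][]float32, error) {
	if len(data) != expected {
		return nil, fmt.Errorf("expected %d embeddings, got %d", expected, len(data))
	}
	out := make([][]float32, expected)
	for _, d := range data {
		if d.Index < 0 || d.Index >= expected {
			return nil, fmt.Errorf("embedding index %d out of range", d.Index)
		}
		out[d.Index] = d.Embedding
	}
	return out, nil
}

func main() {
	// The API may return items out of order; indices restore request order.
	data := []item{
		{Index: 1, Embedding: []float32{2}},
		{Index: 0, Embedding: []float32{1}},
	}
	out, err := extractOrderedEmbeddings(data, 2)
	fmt.Println(out, err)
}
```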

func HasAudioSupport added in v1.1.0

func HasAudioSupport(p Provider) bool

HasAudioSupport reports whether a provider supports audio inputs.

func HasDocumentSupport added in v1.1.9

func HasDocumentSupport(p Provider) bool

HasDocumentSupport reports whether a provider supports document inputs.

func HasImageSupport added in v1.1.0

func HasImageSupport(p Provider) bool

HasImageSupport reports whether a provider supports image inputs.

func HasVideoSupport added in v1.1.0

func HasVideoSupport(p Provider) bool

HasVideoSupport reports whether a provider supports video inputs.

func HostFromURL added in v1.4.2

func HostFromURL(raw string) string

HostFromURL extracts just the host portion (without scheme or path) from a URL string, intended for use as a Prometheus label on streaming metrics. Returns an empty string on parse error — callers treat empty-host labels as "unknown host" rather than failing.
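The empty-on-error contract can be sketched with the standard net/url package; the exported HostFromURL may additionally strip ports or handle edge cases not shown here:

```go
package main

import (
	"fmt"
	"net/url"
)

// hostFromURL returns just the host portion of a URL, or "" on parse
// failure so metric labels degrade to "unknown host" instead of failing.
func hostFromURL(raw string) string {
	u, err := url.Parse(raw)
	if err != nil {
		return ""
	}
	return u.Host
}

func main() {
	fmt.Println(hostFromURL("https://generativelanguage.googleapis.com/v1beta"))
}
```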

func IsFormatSupported added in v1.1.0

func IsFormatSupported(p Provider, contentType, mimeType string) bool

IsFormatSupported reports whether a provider supports a specific media format (MIME type).

func IsRetryableStreamError added in v1.4.2

func IsRetryableStreamError(err error) bool

IsRetryableStreamError returns true if the error looks like a transient streaming failure that is safe to retry from the pre-first-chunk window.

This deliberately covers a narrower set than isRetryableError in retry.go: we want h2 stream resets, TCP resets, TLS close_notify races, and idle connection reuse failures — but never context cancellation, deadline, or application-layer parse errors.

func IsRetryableStreamStatus added in v1.4.2

func IsRetryableStreamStatus(code int) bool

IsRetryableStreamStatus returns true for HTTP status codes that are worth retrying on a streaming request. Mirrors isRetryableStatusCode but is named distinctly so future divergence (e.g. treating 409 as retryable for Responses API) does not mutate non-streaming semantics.
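The status set named in the DoWithRetry documentation (429, 502, 503, 504) can be sketched as a simple predicate; this is an illustration, not the exported function, which is free to diverge as the doc comment notes:

```go
package main

import (
	"fmt"
	"net/http"
)

// isRetryableStreamStatus reports whether a status code is worth
// retrying on a streaming request: rate limiting plus transient
// gateway-side 5xx codes.
func isRetryableStreamStatus(code int) bool {
	switch code {
	case http.StatusTooManyRequests, // 429
		http.StatusBadGateway,         // 502
		http.StatusServiceUnavailable, // 503
		http.StatusGatewayTimeout:     // 504
		return true
	}
	return false
}

func main() {
	fmt.Println(isRetryableStreamStatus(429), isRetryableStreamStatus(400))
}
```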

func IsStreamIdleTimeout added in v1.3.10

func IsStreamIdleTimeout(err error) bool

IsStreamIdleTimeout checks if an error is (or wraps) a stream idle timeout.

func IsTransient added in v1.4.2

func IsTransient(err error) bool

IsTransient returns true if err represents a transient provider failure (retryable HTTP status or connection-level error). Uses errors.As to traverse wrapped error chains. Context cancellation and deadline errors are never transient — they represent deliberate caller action.

func IsValidationAbort

func IsValidationAbort(err error) bool

IsValidationAbort reports whether an error is a validation abort.

func LoadFileAsBase64 deprecated added in v1.1.0

func LoadFileAsBase64(filePath string) (string, error)

LoadFileAsBase64 reads a file and returns its content as a base64-encoded string.

Deprecated: Use MediaLoader.GetBase64Data instead for better functionality including storage reference support, URL loading, and proper context handling.

This function is kept for backward compatibility but will be removed in a future version. It now delegates to the new MediaLoader implementation.

func LogEmbeddingRequest added in v1.1.6

func LogEmbeddingRequest(provider, model string, textCount int, start time.Time)

LogEmbeddingRequest logs a completed embedding request with common fields.

func LogEmbeddingRequestWithTokens added in v1.1.6

func LogEmbeddingRequestWithTokens(provider, model string, textCount, tokens int, start time.Time)

LogEmbeddingRequestWithTokens logs a completed embedding request with token count.

func MarshalRequest added in v1.1.6

func MarshalRequest(req any) ([]byte, error)

MarshalRequest marshals a request body to JSON with standardized error handling.

func NewInstrumentedTransport added in v1.3.12

func NewInstrumentedTransport(base http.RoundTripper) http.RoundTripper

NewInstrumentedTransport wraps an http.RoundTripper with OpenTelemetry instrumentation. This propagates trace context (W3C traceparent header) on outgoing requests and creates client-side HTTP spans. When no TracerProvider is configured, the wrapper is a near-zero-cost passthrough.

func NewPooledTransport added in v1.3.2

func NewPooledTransport() *http.Transport

NewPooledTransport creates an *http.Transport configured with default connection pooling settings suitable for high-throughput provider communication. Equivalent to NewPooledTransportWithOptions(HTTPTransportOptions{}).

func NewPooledTransportWithOptions added in v1.4.2

func NewPooledTransportWithOptions(opts HTTPTransportOptions) *http.Transport

NewPooledTransportWithOptions creates an *http.Transport with the given pool configuration. Zero-valued fields in opts fall back to the package-level defaults. The resulting transport is otherwise identical to NewPooledTransport (same TLS minimum version, same dial timeouts, same HTTP/2 upgrade policy).

func ParsePlatformHTTPError added in v1.3.1

func ParsePlatformHTTPError(platform string, statusCode int, body []byte) error

ParsePlatformHTTPError extracts a human-readable error from platform-specific HTTP error responses (Bedrock, Vertex, Azure). These platforms return JSON like {"message":"..."} on HTTP 4xx/5xx. Falls back to raw body if parsing fails. When platform is empty, returns a generic error with the raw body.

func ReadErrorBody added in v1.3.10

func ReadErrorBody(body io.Reader) []byte

ReadErrorBody reads and returns an error response body, limiting the size to MaxErrorResponseSize. Error responses should be small; this is a safety net.

func ReadResponseBody added in v1.3.10

func ReadResponseBody(body io.Reader) ([]byte, error)

ReadResponseBody reads and returns the response body, limiting the size to DefaultMaxPayloadSize to prevent unbounded memory consumption.

func RegisterProviderFactory added in v1.1.0

func RegisterProviderFactory(providerType string, factory ProviderFactory)

RegisterProviderFactory registers a factory function for a provider type.

func ResetDefaultStreamMetrics added in v1.4.2

func ResetDefaultStreamMetrics()

ResetDefaultStreamMetrics clears the process-wide instance. Intended for tests only — production code should not need to reset metrics.

func RunProviderContractTests added in v1.1.9

func RunProviderContractTests(t *testing.T, config ProviderContractTests)

RunProviderContractTests executes all contract tests against a provider. This should be called from each provider's test file.

func SetErrorResponse added in v1.1.0

func SetErrorResponse(predictResp *PredictionResponse, respBody []byte, start time.Time)

SetErrorResponse sets latency and the raw body on error responses.

func SkipIfNoCredentials added in v1.1.9

func SkipIfNoCredentials(t *testing.T, provider Provider)

SkipIfNoCredentials skips the test if API credentials are not available. This is a helper for integration tests that need real API access.

func StringPtr added in v1.1.0

func StringPtr(s string) *string

StringPtr is a helper function that returns a pointer to a string. This is commonly used across provider implementations for optional fields.

func SupportsMultimodal added in v1.1.0

func SupportsMultimodal(p Provider) bool

SupportsMultimodal reports whether a provider implements multimodal support.

func UnmarshalJSON added in v1.1.0

func UnmarshalJSON(respBody []byte, v any, predictResp *PredictionResponse, start time.Time) error

UnmarshalJSON unmarshals JSON with error recovery, setting latency and the raw response on failure.

func UnmarshalResponse added in v1.1.6

func UnmarshalResponse(body []byte, resp any) error

UnmarshalResponse unmarshals a response body from JSON with standardized error handling.

func ValidateMultimodalMessage added in v1.1.0

func ValidateMultimodalMessage(p Provider, msg types.Message) error

ValidateMultimodalMessage checks whether a message's multimodal content is supported by the provider.

func ValidateMultimodalRequest added in v1.1.0

func ValidateMultimodalRequest(p Provider, req *PredictionRequest) error

ValidateMultimodalRequest validates all messages in a predict request for multimodal compatibility. This is a helper function to reduce duplication across provider implementations.

func ValidatePredictReturnsLatency added in v1.1.9

func ValidatePredictReturnsLatency(t *testing.T, provider Provider)

ValidatePredictReturnsLatency verifies that Predict() returns a response with non-zero latency. This regression test guards against a production bug where latency was not set. Exported for use in provider-specific regression tests.

func ValidatePredictWithToolsReturnsLatency added in v1.1.9

func ValidatePredictWithToolsReturnsLatency(t *testing.T, provider Provider)

ValidatePredictWithToolsReturnsLatency verifies that PredictWithTools() returns latency. This test is critical: it guards against a production bug where PredictWithTools did not set latency. Exported for use in provider-specific regression tests.

Types

type AudioStreamingCapabilities added in v1.1.0

type AudioStreamingCapabilities struct {
	// SupportedEncodings lists supported audio encodings
	// Common values: "pcm", "opus", "mp3", "aac"
	SupportedEncodings []string `json:"supported_encodings"`

	// SupportedSampleRates lists supported sample rates in Hz
	// Common values: 8000, 16000, 24000, 44100, 48000
	SupportedSampleRates []int `json:"supported_sample_rates"`

	// SupportedChannels lists supported channel counts
	// Common values: 1 (mono), 2 (stereo)
	SupportedChannels []int `json:"supported_channels"`

	// SupportedBitDepths lists supported bit depths
	// Common values: 16, 24, 32
	SupportedBitDepths []int `json:"supported_bit_depths,omitempty"`

	// PreferredEncoding is the recommended encoding for best quality/latency
	PreferredEncoding string `json:"preferred_encoding"`

	// PreferredSampleRate is the recommended sample rate
	PreferredSampleRate int `json:"preferred_sample_rate"`
}

AudioStreamingCapabilities describes audio streaming support.

type BaseEmbeddingProvider added in v1.1.6

type BaseEmbeddingProvider struct {
	ProviderModel string
	BaseURL       string
	APIKey        string
	HTTPClient    *http.Client
	Dimensions    int
	ProviderID    string
	BatchSize     int
}

BaseEmbeddingProvider provides common functionality for embedding providers. Embed this struct in provider-specific implementations to reduce duplication.

func NewBaseEmbeddingProvider added in v1.1.6

func NewBaseEmbeddingProvider(
	providerID, defaultModel, defaultBaseURL string,
	defaultDimensions, defaultBatchSize int,
	defaultTimeout time.Duration,
) *BaseEmbeddingProvider

NewBaseEmbeddingProvider creates a base embedding provider with defaults.

func (*BaseEmbeddingProvider) DoEmbeddingRequest added in v1.1.6

func (b *BaseEmbeddingProvider) DoEmbeddingRequest(
	ctx context.Context,
	cfg HTTPRequestConfig,
) ([]byte, error)

DoEmbeddingRequest performs a common HTTP POST request for embeddings. Returns the response body and any error.

func (*BaseEmbeddingProvider) EmbedWithEmptyCheck added in v1.1.6

func (b *BaseEmbeddingProvider) EmbedWithEmptyCheck(
	ctx context.Context,
	req EmbeddingRequest,
	embedFn EmbedFunc,
) (EmbeddingResponse, error)

EmbedWithEmptyCheck wraps embedding logic with empty request handling.

func (*BaseEmbeddingProvider) EmbeddingDimensions added in v1.1.6

func (b *BaseEmbeddingProvider) EmbeddingDimensions() int

EmbeddingDimensions returns the dimensionality of embedding vectors.

func (*BaseEmbeddingProvider) EmptyResponseForModel added in v1.1.6

func (b *BaseEmbeddingProvider) EmptyResponseForModel(model string) EmbeddingResponse

EmptyResponseForModel returns an empty EmbeddingResponse with the given model. Use this for handling empty input cases.

func (*BaseEmbeddingProvider) HandleEmptyRequest added in v1.1.6

func (b *BaseEmbeddingProvider) HandleEmptyRequest(
	req EmbeddingRequest,
) (EmbeddingResponse, bool)

HandleEmptyRequest checks if the request has no texts and returns early if so. Returns (response, true) if empty, (zero, false) if not empty.

func (*BaseEmbeddingProvider) ID added in v1.1.6

func (b *BaseEmbeddingProvider) ID() string

ID returns the provider identifier.

func (*BaseEmbeddingProvider) MaxBatchSize added in v1.1.6

func (b *BaseEmbeddingProvider) MaxBatchSize() int

MaxBatchSize returns the maximum texts per single API request.

func (*BaseEmbeddingProvider) Model added in v1.1.6

func (b *BaseEmbeddingProvider) Model() string

Model returns the current embedding model.

func (*BaseEmbeddingProvider) ResolveModel added in v1.1.6

func (b *BaseEmbeddingProvider) ResolveModel(reqModel string) string

ResolveModel returns the model to use, preferring the request model over the default.

type BaseProvider added in v1.1.0

type BaseProvider struct {
	// contains filtered or unexported fields
}

BaseProvider provides common functionality shared across all provider implementations. It should be embedded in concrete provider structs to avoid code duplication.

It carries two distinct HTTP clients: `client` for request/response calls (Predict, embeddings, etc.) which honors the configured request timeout, and `streamingClient` for long-lived SSE streams (PredictStream*) which has Timeout=0 so the client does not impose a wall-clock cap on streams. Liveness for streams is bounded separately by the IdleTimeoutReader wrapping the response body (see StreamIdleTimeout) and by context cancellation from the caller's deadline.

func NewBaseProvider added in v1.1.0

func NewBaseProvider(id string, includeRawOutput bool, client *http.Client) BaseProvider

NewBaseProvider creates a new BaseProvider with common fields. A companion streaming client is auto-derived from the given client's transport with Timeout=0 so SSE call sites can use GetStreamingHTTPClient() without any extra wiring.

func NewBaseProviderWithAPIKey added in v1.1.0

func NewBaseProviderWithAPIKey(id string, includeRawOutput bool, primaryKey, fallbackKey string) (provider BaseProvider, apiKey string)

NewBaseProviderWithAPIKey creates a BaseProvider and retrieves the API key from the environment. It tries the primary key first, then falls back to the secondary key if the primary is empty.

func NewBaseProviderWithCredential added in v1.3.2

func NewBaseProviderWithCredential(
	id string, includeRawOutput bool, timeout time.Duration, cred Credential,
) (base BaseProvider, apiKey string)

NewBaseProviderWithCredential creates a BaseProvider with an explicit credential. It creates an HTTP client with the given timeout, builds the BaseProvider, and extracts the API key from the credential (if it is an api_key credential). This eliminates the duplicated credential-setup boilerplate across providers.

func (*BaseProvider) AcquireStreamSlot added in v1.4.2

func (b *BaseProvider) AcquireStreamSlot(ctx context.Context) error

AcquireStreamSlot blocks on the configured concurrent-stream semaphore until a slot is available or the context is done. Returns nil on successful acquire (caller MUST call b.ReleaseStreamSlot exactly once) or the context error on cancellation/deadline. Classifies the rejection reason and records it on the promptkit_stream_concurrency_rejections_total counter so operators can see saturation.

Nil semaphore is a no-op — returns nil without blocking or emitting metrics, so callers can invoke this unconditionally.

func (*BaseProvider) ApplyCustomHeaders added in v1.4.2

func (b *BaseProvider) ApplyCustomHeaders(req *http.Request) error

ApplyCustomHeaders applies stored custom headers to the HTTP request. Must be called AFTER the provider sets its own built-in headers (Authorization, Content-Type, etc.). Returns an error if any custom header collides with a header already set on the request (case-insensitive per HTTP spec).

func (*BaseProvider) Close added in v1.1.0

func (b *BaseProvider) Close() error

Close closes the HTTP client's idle connections.

func (*BaseProvider) DoAndReadResponse added in v1.3.10

func (b *BaseProvider) DoAndReadResponse(
	req *http.Request, predictResp *PredictionResponse, start time.Time, providerName string,
) (body []byte, statusCode int, err error)

DoAndReadResponse executes an HTTP request using the provider's client, reads the response body (with size limiting), and logs the response. On read error it sets predictResp.Latency. Returns the body bytes and HTTP status code.

func (*BaseProvider) GetHTTPClient added in v1.1.0

func (b *BaseProvider) GetHTTPClient() *http.Client

GetHTTPClient returns the underlying HTTP client for request/response calls. This client has a finite Timeout (the request_timeout) and MUST NOT be used for SSE streaming — use GetStreamingHTTPClient for that.

func (*BaseProvider) GetRetryPolicy added in v1.3.10

func (b *BaseProvider) GetRetryPolicy() pipeline.RetryPolicy

GetRetryPolicy returns the current retry policy.

func (*BaseProvider) GetStreamingHTTPClient added in v1.4.2

func (b *BaseProvider) GetStreamingHTTPClient() *http.Client

GetStreamingHTTPClient returns a dedicated HTTP client for SSE streaming calls. It shares the non-streaming client's transport but has Timeout=0 so long-lived streams are not killed by a wall-clock cap. When no dedicated streaming client is configured it falls back to the regular client so callers never receive nil.

func (*BaseProvider) HTTPTimeout added in v1.3.10

func (b *BaseProvider) HTTPTimeout() time.Duration

HTTPTimeout returns the current HTTP client timeout, or 0 if no client is set.

func (*BaseProvider) ID added in v1.1.0

func (b *BaseProvider) ID() string

ID returns the provider ID.

func (*BaseProvider) MakeJSONRequest added in v1.1.9

func (b *BaseProvider) MakeJSONRequest(
	ctx context.Context,
	url string,
	request any,
	headers RequestHeaders,
	providerName string,
) ([]byte, error)

MakeJSONRequest performs a JSON HTTP POST request with common error handling. This reduces duplication across provider implementations. providerName is used for logging purposes.

func (*BaseProvider) MakeRawRequest added in v1.1.9

func (b *BaseProvider) MakeRawRequest(
	ctx context.Context,
	url string,
	body []byte,
	headers RequestHeaders,
	providerName string,
) ([]byte, error)

MakeRawRequest performs an HTTP POST request with pre-marshaled body. It automatically retries on transient errors (429, 502, 503, 504 and network errors) according to the provider's RetryPolicy.

Any provider-level custom headers configured via SetCustomHeaders are merged into the outgoing request headers up front, with a case-insensitive collision check against the built-in headers the caller passed. Collisions are deterministic client-side errors so they fail fast before the retry loop runs — a misconfigured gateway header should not burn retry budget.

func (*BaseProvider) MaxPayloadSize added in v1.3.10

func (b *BaseProvider) MaxPayloadSize() int64

MaxPayloadSize returns the current maximum request payload size in bytes.

func (*BaseProvider) RateLimiter added in v1.3.10

func (b *BaseProvider) RateLimiter() *rate.Limiter

RateLimiter returns the current rate limiter, or nil if rate limiting is not configured. This is useful for inspecting or sharing limiters.

func (*BaseProvider) ReleaseStreamSlot added in v1.4.2

func (b *BaseProvider) ReleaseStreamSlot()

ReleaseStreamSlot returns a slot to the concurrent-stream semaphore. Nil-safe; must be paired with a successful AcquireStreamSlot.

func (*BaseProvider) RunStreamingRequest added in v1.4.2

func (b *BaseProvider) RunStreamingRequest(
	ctx context.Context,
	req *StreamRetryRequest,
	consumer StreamConsumer,
) (<-chan StreamChunk, error)

RunStreamingRequest is the single entry point every streaming-capable provider should delegate through. It composes the three layers of back-pressure (semaphore, budget, retry) with in-flight gauge bookkeeping and the "release on all exit paths" defer pattern into one helper, so individual provider streaming functions don't have to re-implement the same ~60 lines of acquire/release/metric scaffolding.

The caller constructs the retry request (policy/budget/host/request factory/etc.) and provides a consumer that knows how to parse the provider-specific stream framing. On success, this function:

  1. Acquires a concurrent-stream slot (blocks on ctx; nil semaphore is a no-op).
  2. Increments streams_in_flight and provider_calls_in_flight gauges.
  3. Delegates to OpenStreamWithRetryRequest with the given req.
  4. Spawns a goroutine that invokes the consumer on the result body and, on consumer return, decrements the gauges and releases the semaphore slot.

On any error path before the goroutine is spawned, all acquired resources are released correctly via the deferred cleanup flags.

Callers must set req.ProviderName to b.ID() — this is not done automatically to avoid hiding the coupling.

func (*BaseProvider) SetCustomHeaders added in v1.4.2

func (b *BaseProvider) SetCustomHeaders(headers map[string]string)

SetCustomHeaders stores custom HTTP headers that will be applied to every outgoing request via ApplyCustomHeaders. Intended for OpenAI-compatible gateway headers (e.g. OpenRouter's HTTP-Referer, X-Title). Called by CreateProviderFromSpec after factory construction.

func (*BaseProvider) SetHTTPTimeout added in v1.3.10

func (b *BaseProvider) SetHTTPTimeout(timeout time.Duration)

SetHTTPTimeout replaces the request/response HTTP client with a new one that uses the given timeout while preserving the existing transport configuration. Does not affect the streaming client, which remains at Timeout=0 by design.

func (*BaseProvider) SetHTTPTransport added in v1.4.2

func (b *BaseProvider) SetHTTPTransport(rt http.RoundTripper)

SetHTTPTransport replaces the RoundTripper on both the regular and streaming HTTP clients so the provider routes every outbound request through the supplied transport. Both clients share the transport so connection pooling is effective across request/response and streaming traffic to the same upstream.

This is the hook CreateProviderFromSpec uses to apply per-provider connection pool config (see ProviderSpec.HTTPTransport and AltairaLabs/PromptKit#873). Provider factories are not aware of this plumbing — they create their client with the default pooled transport and the spec wiring replaces it after construction when the operator has configured overrides.

A nil transport resets both clients to Go's http.DefaultTransport via the http.Client zero-value behavior. Passing nil is not the typical use case; callers should build a transport via NewPooledTransportWithOptions and wrap it with NewInstrumentedTransport so the OpenTelemetry span wiring is preserved.

func (*BaseProvider) SetMaxPayloadSize added in v1.3.10

func (b *BaseProvider) SetMaxPayloadSize(size int64)

SetMaxPayloadSize configures the maximum allowed request payload size in bytes. A zero or negative value disables payload size checking.

func (*BaseProvider) SetRateLimit added in v1.3.10

func (b *BaseProvider) SetRateLimit(requestsPerSecond float64, burst int)

SetRateLimit configures per-provider rate limiting. requestsPerSecond controls the sustained rate, and burst controls how many requests can be made simultaneously before throttling kicks in. A zero or negative requestsPerSecond disables rate limiting (the default).

func (*BaseProvider) SetRetryPolicy added in v1.3.10

func (b *BaseProvider) SetRetryPolicy(policy pipeline.RetryPolicy)

SetRetryPolicy configures the retry policy for this provider.

func (*BaseProvider) SetStreamIdleTimeout added in v1.4.2

func (b *BaseProvider) SetStreamIdleTimeout(d time.Duration)

SetStreamIdleTimeout configures the SSE body idle timeout. A zero or negative value resets to DefaultStreamIdleTimeout.

func (*BaseProvider) SetStreamRetryBudget added in v1.4.2

func (b *BaseProvider) SetStreamRetryBudget(budget *RetryBudget)

SetStreamRetryBudget installs a token bucket that rate-limits retry attempts across all in-flight requests on this provider. Passing nil restores unbounded-retry behavior.

func (*BaseProvider) SetStreamRetryPolicy added in v1.4.2

func (b *BaseProvider) SetStreamRetryPolicy(policy StreamRetryPolicy)

SetStreamRetryPolicy configures bounded retry behavior for the pre-first-chunk streaming window. See StreamRetryPolicy for details.

func (*BaseProvider) SetStreamSemaphore added in v1.4.2

func (b *BaseProvider) SetStreamSemaphore(sem *StreamSemaphore)

SetStreamSemaphore installs a semaphore that caps concurrent streaming requests. Passing nil (or a zero-limit semaphore) restores unlimited concurrency.

func (*BaseProvider) ShouldIncludeRawOutput added in v1.1.0

func (b *BaseProvider) ShouldIncludeRawOutput() bool

ShouldIncludeRawOutput returns whether to include raw API responses in output.

func (*BaseProvider) StreamIdleTimeout added in v1.4.2

func (b *BaseProvider) StreamIdleTimeout() time.Duration

StreamIdleTimeout returns the configured SSE body idle timeout or the package default (DefaultStreamIdleTimeout) when none is set.

func (*BaseProvider) StreamRetryBudget added in v1.4.2

func (b *BaseProvider) StreamRetryBudget() *RetryBudget

StreamRetryBudget returns the per-provider retry budget. A nil return means retries are unbounded (only MaxAttempts caps them).

func (*BaseProvider) StreamRetryPolicy added in v1.4.2

func (b *BaseProvider) StreamRetryPolicy() StreamRetryPolicy

StreamRetryPolicy returns the configured streaming-retry policy. The zero value (retry disabled) is the default — callers must opt in via config.

func (*BaseProvider) StreamSemaphore added in v1.4.2

func (b *BaseProvider) StreamSemaphore() *StreamSemaphore

StreamSemaphore returns the concurrent-stream semaphore. A nil return means unlimited concurrency (backwards-compatible default).

func (*BaseProvider) SupportsStreaming added in v1.1.0

func (b *BaseProvider) SupportsStreaming() bool

SupportsStreaming returns true by default; providers that don't support streaming can override it.

func (*BaseProvider) WaitForRateLimit added in v1.3.10

func (b *BaseProvider) WaitForRateLimit(ctx context.Context) error

WaitForRateLimit blocks until the rate limiter allows the request to proceed, or until the context is canceled. If no rate limiter is configured, it returns immediately. Providers should call this before making HTTP requests to respect rate limits.

type BedrockEventScanner added in v1.3.1

type BedrockEventScanner struct {
	// contains filtered or unexported fields
}

BedrockEventScanner decodes AWS binary event-stream frames from Bedrock's invoke-with-response-stream endpoint. Each frame's payload is JSON like {"bytes":"<base64>"} where the decoded bytes are a standard Claude JSON event (identical to the SSE data: payloads from the direct API).

func NewBedrockEventScanner added in v1.3.1

func NewBedrockEventScanner(r io.Reader) *BedrockEventScanner

NewBedrockEventScanner creates a scanner that reads AWS binary event-stream frames.

func (*BedrockEventScanner) Data added in v1.3.1

func (s *BedrockEventScanner) Data() string

Data returns the decoded Claude JSON event from the last scanned frame.

func (*BedrockEventScanner) Err added in v1.3.1

func (s *BedrockEventScanner) Err() error

Err returns any error encountered during scanning.

func (*BedrockEventScanner) Scan added in v1.3.1

func (s *BedrockEventScanner) Scan() bool

Scan reads the next event-stream frame. Returns true if a data event was successfully decoded, false on EOF or error.

type BedrockEventStreamFrameDetector added in v1.4.2

type BedrockEventStreamFrameDetector struct{}

BedrockEventStreamFrameDetector reads one complete AWS binary event-stream message. The format is:

[4-byte total_length BE][4-byte headers_length BE][4-byte prelude_crc]
[headers...][payload...][4-byte message_crc]

The detector reads the 4-byte total_length prefix, then reads exactly (total_length - 4) more bytes to complete the message. This gives the retry driver one full binary frame for replay, confirming the stream is "live" before handing it to the consumer.

Used by the Claude provider when running on AWS Bedrock (Content-Type: application/vnd.amazon.eventstream).

func (BedrockEventStreamFrameDetector) Name added in v1.4.2

func (BedrockEventStreamFrameDetector) Name() string

Name implements FrameDetector.

func (BedrockEventStreamFrameDetector) PeekFirstFrame added in v1.4.2

func (BedrockEventStreamFrameDetector) PeekFirstFrame(r io.Reader) ([]byte, error)

PeekFirstFrame reads one complete event-stream message from r and returns the raw bytes. The reader must be positioned at the start of a message boundary.

type ContextWindowProvider added in v1.3.24

type ContextWindowProvider interface {
	MaxContextTokens() int
}

ContextWindowProvider is an optional interface for providers that can report their context window size. Used to auto-configure the compactor budget.

type Credential added in v1.1.9

type Credential interface {
	// Apply adds authentication to the HTTP request.
	Apply(ctx context.Context, req *http.Request) error

	// Type returns the credential type identifier.
	Type() string
}

Credential applies authentication to HTTP requests. This is the interface that providers use to authenticate requests.

type DoRequestFunc added in v1.3.10

type DoRequestFunc func() (*http.Response, error)

DoRequestFunc is a function that performs an HTTP request. It is called by DoWithRetry on each attempt.

type EmbedFunc added in v1.1.6

type EmbedFunc func(ctx context.Context, texts []string, model string) (EmbeddingResponse, error)

EmbedFunc is the signature for provider-specific embedding logic.

type EmbeddingProvider added in v1.1.6

type EmbeddingProvider interface {
	// Embed generates embeddings for the given texts.
	// The response contains one embedding vector per input text, in the same order.
	// Implementations should handle batching internally if the request exceeds MaxBatchSize.
	Embed(ctx context.Context, req EmbeddingRequest) (EmbeddingResponse, error)

	// EmbeddingDimensions returns the dimensionality of embedding vectors.
	// Common values: 1536 (OpenAI ada-002/3-small), 768 (Gemini), 3072 (OpenAI 3-large)
	EmbeddingDimensions() int

	// MaxBatchSize returns the maximum number of texts per single API request.
	// Callers should batch requests appropriately, or rely on the provider
	// to handle splitting internally.
	MaxBatchSize() int

	// ID returns the provider identifier (e.g., "openai-embedding", "gemini-embedding")
	ID() string
}

EmbeddingProvider generates text embeddings for semantic similarity operations. Implementations exist for OpenAI, Gemini, and other embedding APIs.

Embeddings are dense vector representations of text that capture semantic meaning. Similar texts will have embeddings with high cosine similarity scores.

Example usage:

provider, _ := openai.NewEmbeddingProvider()
resp, err := provider.Embed(ctx, providers.EmbeddingRequest{
    Texts: []string{"Hello world", "Hi there"},
})
similarity := CosineSimilarity(resp.Embeddings[0], resp.Embeddings[1])

type EmbeddingRequest added in v1.1.6

type EmbeddingRequest struct {
	// Texts to embed (batched for efficiency)
	Texts []string

	// Model override for embedding model (optional, uses provider default if empty)
	Model string
}

EmbeddingRequest represents a request for text embeddings.

type EmbeddingResponse added in v1.1.6

type EmbeddingResponse struct {
	// Embeddings contains one vector per input text, in the same order
	Embeddings [][]float32

	// Model is the model that was used for embedding
	Model string

	// Usage contains token consumption information (optional)
	Usage *EmbeddingUsage
}

EmbeddingResponse contains the embedding vectors from a provider.

type EmbeddingUsage added in v1.1.6

type EmbeddingUsage struct {
	// TotalTokens is the total number of tokens processed
	TotalTokens int
}

EmbeddingUsage tracks token consumption for embedding requests.

type ExecutionResult

type ExecutionResult interface{}

ExecutionResult is a forward declaration to avoid circular import.

type FrameDetector added in v1.4.2

type FrameDetector interface {
	// Name identifies the detector in logs and errors. Should be a
	// short lowercase token: "sse", "ndjson", "json-array", etc.
	Name() string

	// PeekFirstFrame reads from r until at least one complete frame has
	// been seen, then returns the bytes consumed so far. Returns an
	// error if r fails before a complete frame is found, or if the
	// stream ends cleanly before any frame has been observed.
	//
	// On success, the returned slice is non-empty and r is positioned
	// immediately after the last returned byte.
	PeekFirstFrame(r io.Reader) ([]byte, error)
}

FrameDetector extracts the bytes of the first complete "frame" from a streaming response body. The concept of a frame is protocol-specific — SSE uses `data: ...\n\n` boundaries, NDJSON uses line terminators, Gemini's JSON-array streaming uses balanced-brace JSON objects inside a top-level array.

The returned bytes MUST be exactly a prefix of the underlying stream: the detector reads from r, returns everything it has consumed so far, and leaves r positioned immediately after the returned bytes. This is what lets the retry driver wrap the returned bytes + remaining body in a replayReadCloser so downstream parsers see a contiguous stream.

Detectors MUST drain any internal bufio lookahead into the returned slice before returning — if they use a bufio.Reader for efficiency, bytes buffered past the frame boundary belong in the replay slice, not stuck in a throwaway bufio buffer.

Detectors SHOULD NOT wrap r with an idle-timeout reader themselves; the retry driver applies the caller's StreamIdleTimeout to r before handing it to the detector, so each detector gets uniform idle protection for free.

type HTTPRequestConfig added in v1.1.6

type HTTPRequestConfig struct {
	URL         string
	Body        []byte
	UseAPIKey   bool   // If true, adds Authorization: Bearer <APIKey> header
	ContentType string // Defaults to application/json
}

HTTPRequestConfig configures how to make an HTTP request.

type HTTPTransportOptions added in v1.4.2

type HTTPTransportOptions struct {
	// MaxConnsPerHost caps the total TCP connections the transport may
	// open to any single host (in-use + idle). Zero means unlimited
	// (matching Go's http.Transport default). Negative values fall
	// back to DefaultMaxConnsPerHost.
	MaxConnsPerHost int
	// MaxIdleConnsPerHost caps the number of idle keep-alive
	// connections the transport will retain per host for reuse. Zero
	// uses DefaultMaxIdleConnsPerHost. Usually set to match
	// MaxConnsPerHost so the whole pool stays reusable.
	MaxIdleConnsPerHost int

	// IdleConnTimeout is how long an idle keep-alive connection lingers
	// before being closed. Zero uses DefaultIdleConnTimeout. Longer
	// timeouts favor reuse; shorter ones release memory and FD slots
	// faster.
	IdleConnTimeout time.Duration
}

HTTPTransportOptions configures the connection pool for a pooled HTTP transport. Zero values mean unlimited for connection counts (matching Go's http.Transport) and fall back to DefaultIdleConnTimeout for the timeout. Negative values fall back to package-level defaults.

These are the single-process h2 pool controls that bound how many concurrent streams a provider can multiplex to a single upstream host. See AltairaLabs/PromptKit#873 for background: in combination with the upstream's advertised SETTINGS_MAX_CONCURRENT_STREAMS (RFC 7540 §6.5.2), MaxConnsPerHost is the wall that determines the realistic steady-state ceiling for concurrent streams per process.

type IdleTimeoutReader added in v1.3.10

type IdleTimeoutReader struct {
	// contains filtered or unexported fields
}

IdleTimeoutReader wraps an io.ReadCloser with idle timeout detection. If no data is read within the configured timeout, the underlying reader is closed, causing any blocking Read to return an error.

func NewIdleTimeoutReader added in v1.3.10

func NewIdleTimeoutReader(r io.ReadCloser, timeout time.Duration) *IdleTimeoutReader

NewIdleTimeoutReader wraps the given reader with idle timeout detection. The timeout is reset on every successful Read that returns data. If the timeout fires, the underlying reader is closed to unblock any pending Read calls.

func (*IdleTimeoutReader) Close added in v1.3.10

func (r *IdleTimeoutReader) Close() error

Close stops the idle timer and closes the underlying reader.

func (*IdleTimeoutReader) Read added in v1.3.10

func (r *IdleTimeoutReader) Read(p []byte) (int, error)

Read reads from the underlying reader and resets the idle timer on success.

type ImageDetail added in v1.1.0

type ImageDetail string

ImageDetail specifies the level of detail for image processing

const (
	ImageDetailLow  ImageDetail = "low"  // Faster, less detailed analysis
	ImageDetailHigh ImageDetail = "high" // Slower, more detailed analysis
	ImageDetailAuto ImageDetail = "auto" // Provider chooses automatically
)

Image detail levels for multimodal processing.

type JSONArrayFrameDetector added in v1.4.2

type JSONArrayFrameDetector struct{}

JSONArrayFrameDetector detects the first complete top-level object inside a streaming JSON array. This is the framing used by Gemini's `streamGenerateContent` endpoint, which returns

[
  {"candidates": [...], "usageMetadata": {...}},
  {"candidates": [...], "usageMetadata": {...}},
  ...
]

parsed incrementally by a downstream `json.Decoder`. The detector reads past leading whitespace and the opening `[`, then scans bytes until it finds the end of the first `{...}` at depth 0 (respecting JSON string escapes).

Byte-level parsing is deliberate — a `json.Decoder` would work but it buffers aggressively and makes it harder to track exactly how many bytes have been consumed from the underlying reader.

On success the returned bytes form a prefix of the stream ending at the closing brace of the first object. The downstream `json.Decoder` continues from there and expects either `,` or `]` next, which is exactly what remains in the stream.

func (JSONArrayFrameDetector) Name added in v1.4.2

func (JSONArrayFrameDetector) Name() string

Name implements FrameDetector.

func (JSONArrayFrameDetector) PeekFirstFrame added in v1.4.2

func (JSONArrayFrameDetector) PeekFirstFrame(r io.Reader) ([]byte, error)

PeekFirstFrame reads bytes from r tracking JSON state until the first top-level object inside the array is complete.

type MediaLoader added in v1.1.2

type MediaLoader struct {
	// contains filtered or unexported fields
}

MediaLoader handles loading media content from various sources (inline data, files, URLs, storage). It provides a unified interface for providers to access media regardless of the source. MediaLoader is not safe for concurrent use.

func NewMediaLoader added in v1.1.2

func NewMediaLoader(config MediaLoaderConfig) *MediaLoader

NewMediaLoader creates a new MediaLoader with the given configuration.

func (*MediaLoader) GetBase64Data added in v1.1.2

func (ml *MediaLoader) GetBase64Data(ctx context.Context, media *types.MediaContent) (string, error)

GetBase64Data loads media content and returns it as base64-encoded data. It handles all media sources: inline data, file paths, URLs, and storage references. It enforces per-item size limits, aggregate size limits, and item count limits.

type MediaLoaderConfig added in v1.1.2

type MediaLoaderConfig struct {
	// StorageService is optional - required only for loading from storage references
	StorageService storage.MediaStorageService

	// HTTPTimeout for URL fetching (default: 30s)
	HTTPTimeout time.Duration

	// MaxURLSizeBytes is the maximum size for URL-based media (default: 50MB)
	MaxURLSizeBytes int64
}

MediaLoaderConfig configures the MediaLoader behavior.

type MultimodalCapabilities added in v1.1.0

type MultimodalCapabilities struct {
	SupportsImages    bool     // Provider can process image inputs
	SupportsAudio     bool     // Provider can process audio inputs
	SupportsVideo     bool     // Provider can process video inputs
	SupportsDocuments bool     // Provider can process document inputs (PDF, etc.)
	ImageFormats      []string // Supported image MIME types (e.g., "image/jpeg", "image/png")
	AudioFormats      []string // Supported audio MIME types (e.g., "audio/mpeg", "audio/wav")
	VideoFormats      []string // Supported video MIME types (e.g., "video/mp4")
	DocumentFormats   []string // Supported document MIME types (e.g., "application/pdf")
	MaxImageSizeMB    int      // Maximum image size in megabytes (0 = unlimited/unknown)
	MaxAudioSizeMB    int      // Maximum audio size in megabytes (0 = unlimited/unknown)
	MaxVideoSizeMB    int      // Maximum video size in megabytes (0 = unlimited/unknown)
	MaxDocumentSizeMB int      // Maximum document size in megabytes (0 = unlimited/unknown)
}

MultimodalCapabilities describes what types of multimodal content a provider supports

type MultimodalCapabilityProvider added in v1.3.29

type MultimodalCapabilityProvider interface {
	GetMultimodalCapabilities() MultimodalCapabilities
}

MultimodalCapabilityProvider is implemented by providers that support multimodal content.

func GetMultimodalProvider added in v1.1.0

func GetMultimodalProvider(p Provider) MultimodalCapabilityProvider

GetMultimodalProvider safely casts a provider to MultimodalCapabilityProvider. It returns nil if the provider doesn't support multimodal content.

type NDJSONFrameDetector added in v1.4.2

type NDJSONFrameDetector struct{}

NDJSONFrameDetector detects newline-delimited JSON frames. Each frame is one complete JSON object terminated by a literal `\n`. This is the framing used by Ollama's streaming API (`{"response":"..."}\n`) and several other non-SSE providers that stream raw JSON.

This detector does NOT validate that the line is parseable JSON — it only looks for the line terminator. Upstream producers that emit partial JSON would fail at the downstream parser, not here. This matches the SSE detector's behavior of not interpreting the payload.

func (NDJSONFrameDetector) Name added in v1.4.2

func (NDJSONFrameDetector) Name() string

Name implements FrameDetector.

func (NDJSONFrameDetector) PeekFirstFrame added in v1.4.2

func (NDJSONFrameDetector) PeekFirstFrame(r io.Reader) ([]byte, error)

PeekFirstFrame reads until the first `\n` and returns the line (including the newline) plus any bufio lookahead. Blank lines are skipped so leading whitespace or keepalive newlines from certain producers don't count as a "frame".

type PlatformConfig added in v1.1.9

type PlatformConfig = credentials.PlatformConfig

PlatformConfig is an alias for credentials.PlatformConfig.

type PredictionRequest added in v1.1.0

type PredictionRequest struct {
	System         string          `json:"system"`
	Messages       []types.Message `json:"messages"`
	Temperature    float32         `json:"temperature"`
	TopP           float32         `json:"top_p"`
	MaxTokens      int             `json:"max_tokens"`
	Seed           *int            `json:"seed,omitempty"`
	ResponseFormat *ResponseFormat `json:"response_format,omitempty"` // Optional response format (JSON mode)
	Metadata       map[string]any  `json:"metadata,omitempty"`        // Provider-specific context
}

PredictionRequest represents a request to a predict provider

func (*PredictionRequest) NormalizeMessages added in v1.3.24

func (r *PredictionRequest) NormalizeMessages()

NormalizeMessages extracts system-role messages from Messages, merges their content into the System field, and removes them from Messages. This ensures all providers receive system context through the dedicated System field rather than as role entries that some providers silently drop.

Ordering: existing System content first, then system-role message content in original order, separated by double newlines.

This method is idempotent — calling it on an already-normalized request (no system-role messages in Messages) is a no-op.

type PredictionResponse added in v1.1.0

type PredictionResponse struct {
	Content    string                  `json:"content"`
	Parts      []types.ContentPart     `json:"parts,omitempty"`     // Multimodal content parts (text, image, audio, video)
	CostInfo   *types.CostInfo         `json:"cost_info,omitempty"` // Cost breakdown for this response (includes token counts)
	Latency    time.Duration           `json:"latency"`
	Raw        []byte                  `json:"raw,omitempty"`
	RawRequest any                     `json:"raw_request,omitempty"` // Raw API request (for debugging)
	ToolCalls  []types.MessageToolCall `json:"tool_calls,omitempty"`  // Tools called in this response
}

PredictionResponse represents a response from a predict provider

type Pricing

type Pricing struct {
	InputCostPer1K  float64
	OutputCostPer1K float64
}

Pricing defines cost per 1K tokens for input and output

type Provider

type Provider interface {
	ID() string

	// Model returns the model name/identifier used by this provider
	Model() string

	Predict(ctx context.Context, req PredictionRequest) (PredictionResponse, error)

	// Streaming support
	PredictStream(ctx context.Context, req PredictionRequest) (<-chan StreamChunk, error)

	SupportsStreaming() bool

	ShouldIncludeRawOutput() bool

	Close() error // Close cleans up provider resources (e.g., HTTP connections)

	// CalculateCost calculates cost breakdown for given token counts
	CalculateCost(inputTokens, outputTokens, cachedTokens int) types.CostInfo
}

Provider interface defines the contract for predict providers

func CreateProviderFromSpec

func CreateProviderFromSpec(spec ProviderSpec) (Provider, error)

CreateProviderFromSpec creates a provider implementation from a spec. Returns an error if the provider type is unsupported.

type ProviderContractTests added in v1.1.9

type ProviderContractTests struct {
	// Provider instance to test
	Provider Provider

	// SupportsToolsExpected indicates whether this provider should support tools
	SupportsToolsExpected bool

	// SupportsStreamingExpected indicates whether this provider should support streaming
	SupportsStreamingExpected bool
}

ProviderContractTests defines a comprehensive test suite that validates the Provider interface contract. All provider implementations should pass these tests to ensure consistent behavior across the system.

Usage:

func TestOpenAIProviderContract(t *testing.T) {
    provider := NewProvider(...)
    RunProviderContractTests(t, provider)
}

type ProviderDefaults

type ProviderDefaults struct {
	Temperature float32
	TopP        float32
	MaxTokens   int
	Pricing     Pricing
}

ProviderDefaults holds default parameters for providers

type ProviderFactory added in v1.1.0

type ProviderFactory func(spec ProviderSpec) (Provider, error)

ProviderFactory is a function that creates a provider from a spec

func CredentialFactory added in v1.3.2

func CredentialFactory(withCred, withoutCred ProviderFactory) ProviderFactory

CredentialFactory builds a ProviderFactory that routes between a credential-based constructor and an env-var-based constructor. This eliminates the duplicated init() pattern across provider packages.

Usage in provider init():

providers.RegisterProviderFactory("claude", providers.CredentialFactory(
    func(spec providers.ProviderSpec) (providers.Provider, error) {
        return NewToolProviderWithCredential(spec.ID, spec.Model, ..., spec.Credential, ...), nil
    },
    func(spec providers.ProviderSpec) (providers.Provider, error) {
        return NewToolProvider(spec.ID, spec.Model, ...), nil
    },
))

type ProviderHTTPError added in v1.4.2

type ProviderHTTPError struct {
	StatusCode int
	URL        string
	Body       string
	Provider   string
}

ProviderHTTPError wraps a non-2xx HTTP response from a provider API. Use errors.As to extract the status code for classification.

func (*ProviderHTTPError) Error added in v1.4.2

func (e *ProviderHTTPError) Error() string

type ProviderSpec

type ProviderSpec struct {
	ID      string
	Type    string
	Model   string
	BaseURL string
	// Headers contains custom HTTP headers to include in every request.
	// Applied after built-in provider headers; collisions cause an error.
	Headers          map[string]string
	Defaults         ProviderDefaults
	IncludeRawOutput bool
	AdditionalConfig map[string]interface{} // Flexible key-value pairs for provider-specific configuration

	// Credential holds the resolved credential for this provider.
	// If nil, providers fall back to environment variable lookup.
	Credential Credential

	// Platform identifies the hosting platform (e.g., "bedrock", "vertex", "azure").
	// Empty string means direct API access to the provider.
	Platform string

	// PlatformConfig holds platform-specific configuration.
	// Only set when Platform is non-empty.
	PlatformConfig *PlatformConfig

	// UnsupportedParams lists model parameters not supported by this provider model.
	// For example, o-series OpenAI models don't support "temperature", "top_p", or "max_tokens".
	UnsupportedParams []string

	// RequestTimeout caps the wall-clock duration of request/response HTTP
	// calls (Predict, embeddings). Zero falls back to
	// httputil.DefaultProviderTimeout. Does not apply to SSE streaming
	// calls, which are unbounded by wall-clock and governed by
	// StreamIdleTimeout + context cancellation. Pre-parsed from
	// config.Provider.RequestTimeout by the arena loader.
	RequestTimeout time.Duration

	// StreamIdleTimeout bounds how long an SSE streaming body may remain
	// silent before it is aborted; timer resets on every byte. Zero falls
	// back to DefaultStreamIdleTimeout. Pre-parsed from
	// config.Provider.StreamIdleTimeout by the arena loader.
	StreamIdleTimeout time.Duration

	// StreamRetry configures bounded retry for streaming requests that
	// fail in the pre-first-chunk window. Zero value (disabled) leaves
	// the provider with no streaming retry. Pre-parsed from
	// config.Provider.StreamRetry by the arena loader.
	StreamRetry StreamRetryPolicy

	// StreamRetryBudget is a pre-constructed token bucket that
	// rate-limits retry attempts across all in-flight requests on this
	// provider. Nil means "unbounded retries" (only MaxAttempts caps
	// them). Pre-parsed from config.Provider.StreamRetry.Budget by the
	// arena loader. Each provider instance gets its own budget so one
	// misbehaving model cannot starve retry capacity for others.
	StreamRetryBudget *RetryBudget

	// StreamMaxConcurrent caps the number of concurrent streaming
	// requests the provider will have in flight. Zero means unlimited
	// (backwards-compatible default). Pre-parsed from
	// config.Provider.StreamMaxConcurrent by the arena loader. Applied
	// via SetStreamSemaphore on providers that implement the
	// streamConcurrencyConfigurable interface.
	StreamMaxConcurrent int

	// HTTPTransport configures the per-provider HTTP connection pool.
	// Zero-valued fields fall back to package-level defaults
	// (DefaultMaxConnsPerHost, etc.). Applied via SetHTTPTransport on
	// providers that implement the httpTransportConfigurable interface,
	// replacing the default pooled transport the factory constructed.
	//
	// See AltairaLabs/PromptKit#873 for motivation: at higher concurrency
	// than the repo was originally sized for, the hardcoded
	// MaxConnsPerHost: 100 becomes the wall before PromptKit's own
	// machinery does. Raising this (paired with operator awareness of
	// the upstream's SETTINGS_MAX_CONCURRENT_STREAMS) is the primary
	// lever on realistic single-process concurrent-stream capacity.
	HTTPTransport HTTPTransportOptions
}

ProviderSpec holds the configuration needed to create a provider instance

func (*ProviderSpec) HasCredential added in v1.3.2

func (s *ProviderSpec) HasCredential() bool

HasCredential returns true if the spec has a real (non-empty, non-"none") credential. Use this in factory functions to decide between credential-based and env-var-based constructors.

type ProviderTools added in v1.1.9

type ProviderTools = any

ProviderTools represents provider-specific tool configuration. Each provider returns its own native format:

  • OpenAI: []openAITool
  • Claude: []claudeTool
  • Gemini: geminiToolWrapper
  • Ollama: []ollamaTool
  • vLLM: []vllmTool
  • Mock: []*ToolDescriptor

The value returned by BuildTooling should be passed directly to PredictWithTools.

type ProviderTransportError added in v1.4.2

type ProviderTransportError struct {
	Cause    error
	Provider string
}

ProviderTransportError wraps a connection-level failure (http2 reset, TCP reset, dial timeout, etc.). These are always transient.

func (*ProviderTransportError) Error added in v1.4.2

func (e *ProviderTransportError) Error() string

func (*ProviderTransportError) Unwrap added in v1.4.2

func (e *ProviderTransportError) Unwrap() error

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages available providers

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new provider registry

func (*Registry) Close

func (r *Registry) Close() error

Close closes all registered providers and cleans up their resources. Returns the first error encountered, if any.

func (*Registry) Get

func (r *Registry) Get(id string) (Provider, bool)

Get retrieves a provider by ID, returning the provider and a boolean indicating if it was found.

func (*Registry) List

func (r *Registry) List() []string

List returns all registered provider IDs

func (*Registry) Register

func (r *Registry) Register(provider Provider)

Register adds a provider to the registry using its ID as the key.

type RequestHeaders added in v1.1.9

type RequestHeaders map[string]string

RequestHeaders is a map of HTTP header key-value pairs

type ResponseFormat added in v1.1.8

type ResponseFormat struct {
	// Type specifies the response format type
	Type ResponseFormatType `json:"type"`
	// JSONSchema is the schema to use when Type is ResponseFormatJSONSchema
	// This should be a valid JSON Schema object
	JSONSchema json.RawMessage `json:"json_schema,omitempty"`
	// SchemaName is an optional name for the schema (used by OpenAI)
	SchemaName string `json:"schema_name,omitempty"`
	// Strict enables strict schema validation (OpenAI-specific)
	Strict bool `json:"strict,omitempty"`
}

ResponseFormat specifies the format of the model's response

type ResponseFormatType added in v1.1.8

type ResponseFormatType string

ResponseFormatType defines the type of response format

const (
	// ResponseFormatText is the default text response format
	ResponseFormatText ResponseFormatType = "text"
	// ResponseFormatJSON requests JSON output from the model
	ResponseFormatJSON ResponseFormatType = "json_object"
	// ResponseFormatJSONSchema requests JSON output conforming to a schema
	ResponseFormatJSONSchema ResponseFormatType = "json_schema"
)

type RetryBudget added in v1.4.2

type RetryBudget struct {
	// contains filtered or unexported fields
}

RetryBudget is a token bucket that governs how often streaming retries may actually re-dial the upstream. The initial attempt of each request is NOT gated by the budget — only retries consume tokens.

Rationale: per-call bounded retry (policy.MaxAttempts) is not enough at scale. When a single HTTP/2 connection reset kills ~100 streams at once, naive bounded retry causes 100 simultaneous reconnect attempts, which amplifies the upstream problem instead of recovering from it. The budget caps the *rate* at which retries hit the upstream so one storm cannot saturate the provider's capacity for the entire runtime.

Design: gRPC's "retry throttling" pattern, implemented with a standard golang.org/x/time/rate token bucket. Non-blocking acquire (fail-fast) — exhausted budgets return the original error immediately rather than stacking goroutines on a starved bucket.

All methods are nil-safe: a nil *RetryBudget allows every retry (equivalent to unlimited budget). This lets callers use the budget unconditionally without guarding every call site.

func NewRetryBudget added in v1.4.2

func NewRetryBudget(ratePerSec float64, burst int) *RetryBudget

NewRetryBudget creates a new token bucket sized for streaming retries. ratePerSec is the sustained refill rate; burst is the maximum number of tokens that can accumulate. Returns nil when either parameter is non-positive (unlimited budget).

Typical sizing: start with rate=5/s, burst=10 and tune based on promptkit_stream_retries_total{outcome="budget_exhausted"}. These defaults are deliberately conservative — a healthy workload should almost never hit the budget, so high rejection counts are a signal that either retries are storming (upstream degraded) or the budget is undersized (bump it).

func (*RetryBudget) Available added in v1.4.2

func (b *RetryBudget) Available() float64

Available returns the current number of tokens in the bucket. Intended for the promptkit_stream_retry_budget_available gauge. A nil budget returns 0; callers in the hot path also skip publishing the gauge for nil budgets (see StreamMetrics.ObserveRetryBudgetAvailable), so the return value is only observable in tests.

Note: rate.Limiter.Tokens reflects state at the time of the call; it may drift between TryAcquire and Available under concurrent load. This is fine for observability — the gauge is a trailing indicator.

func (*RetryBudget) Burst added in v1.4.2

func (b *RetryBudget) Burst() int

Burst returns the configured burst size. Used by callers that want to compute saturation ratios (available / burst). Returns 0 for nil.

func (*RetryBudget) RatePerSec added in v1.4.2

func (b *RetryBudget) RatePerSec() float64

RatePerSec returns the configured refill rate. Returns 0 for nil.

func (*RetryBudget) TryAcquire added in v1.4.2

func (b *RetryBudget) TryAcquire() bool

TryAcquire attempts to take one token from the bucket without blocking. Returns true if a token was consumed (retry is permitted), false if the bucket is empty (retry must be rejected). A nil budget always returns true so provider code can call TryAcquire unconditionally.

type RetryableHTTPError added in v1.3.10

type RetryableHTTPError struct {
	StatusCode int
	Status     string
}

RetryableHTTPError is returned when all retries are exhausted for a retryable HTTP status code.

func (*RetryableHTTPError) Error added in v1.3.10

func (e *RetryableHTTPError) Error() string

Error implements the error interface.

type SSEFrameDetector added in v1.4.2

type SSEFrameDetector struct{}

SSEFrameDetector detects server-sent event boundaries. A complete frame is one or more `data: ...` lines terminated by a blank line, optionally preceded by `:` comments or other SSE directive lines that get passed through as part of the frame bytes.

This is the framing used by OpenAI Chat Completions, OpenAI Responses API, Claude Messages, VLLM, and most SSE-based LLM providers.
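Concretely, a single complete frame looks like the fragment below (the payload is illustrative, not a real provider response). The comment and directive lines are passed through as part of the frame; the trailing blank line terminates it:

```
: keep-alive comment
event: content.delta
data: {"delta":"Hi"}

```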

func (SSEFrameDetector) Name added in v1.4.2

func (SSEFrameDetector) Name() string

Name implements FrameDetector.

func (SSEFrameDetector) PeekFirstFrame added in v1.4.2

func (SSEFrameDetector) PeekFirstFrame(r io.Reader) ([]byte, error)

PeekFirstFrame reads until a `data: ...` line has been seen and then a terminating blank line. Comments and other directives before the first data line are included in the returned bytes.

If the stream closes cleanly right after the first event without a trailing blank line, this is still treated as a complete frame so downstream can decide what to do.

type SSEScanner

type SSEScanner struct {
	// contains filtered or unexported fields
}

SSEScanner scans Server-Sent Events (SSE) streams.

func NewSSEScanner

func NewSSEScanner(r io.Reader) *SSEScanner

NewSSEScanner creates a new SSE scanner that reads from r.

func (*SSEScanner) Data

func (s *SSEScanner) Data() string

Data returns the current event data as a string. The string is lazily allocated on first call per Scan to avoid unnecessary heap allocations when only DataBytes is needed.

func (*SSEScanner) DataBytes added in v1.4.2

func (s *SSEScanner) DataBytes() []byte

DataBytes returns the current event data as a byte slice. The returned slice is only valid until the next call to Scan. Use this to avoid the string→[]byte conversion in json.Unmarshal.

func (*SSEScanner) Err

func (s *SSEScanner) Err() error

Err returns any error encountered while scanning.

func (*SSEScanner) Scan

func (s *SSEScanner) Scan() bool

Scan advances to the next SSE event. It returns false when the stream is exhausted or an error occurs; check Err afterwards.
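A typical Scan loop looks like the sketch below. scanSSE is a local, simplified stand-in for the real scanner (which also coalesces multi-line data and surfaces errors via Err):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// scanSSE illustrates the Scan/Data loop: it yields the payload of each
// `data:` line and skips comments, directives, and the blank lines that
// terminate events.
func scanSSE(stream string) []string {
	var events []string
	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		line := sc.Text()
		if strings.HasPrefix(line, "data: ") {
			events = append(events, strings.TrimPrefix(line, "data: "))
		}
	}
	return events
}

func main() {
	raw := ": ping\ndata: {\"delta\":\"Hel\"}\n\ndata: {\"delta\":\"lo\"}\n\ndata: [DONE]\n\n"
	for _, payload := range scanSSE(raw) {
		fmt.Println(payload)
	}
}
```

In real provider code each payload would go straight to json.Unmarshal via DataBytes to avoid the extra string allocation.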

type StreamChunk

type StreamChunk struct {
	// Content is the accumulated content so far
	Content string `json:"content"`

	// Delta is the new content in this chunk
	Delta string `json:"delta"`

	// MediaData contains raw streaming media bytes (audio, video, images).
	// Data is always raw bytes, never base64. Providers decode at source.
	MediaData *StreamMediaData `json:"-"`

	// TokenCount is the total number of tokens so far
	TokenCount int `json:"token_count"`

	// DeltaTokens is the number of tokens in this delta
	DeltaTokens int `json:"delta_tokens"`

	// ToolCalls contains accumulated tool calls (for assistant messages that invoke tools)
	ToolCalls []types.MessageToolCall `json:"tool_calls,omitempty"`

	// FinishReason is nil until stream is complete
	// Values: "stop", "length", "content_filter", "tool_calls", "error", "validation_failed", "cancelled"
	FinishReason *string `json:"finish_reason,omitempty"`

	// Interrupted indicates the response was interrupted (e.g., user started speaking)
	// When true, clients should clear any buffered audio and prepare for a new response
	Interrupted bool `json:"interrupted,omitempty"`

	// Reset signals that the stream is being retried from scratch after a
	// mid-stream failure. Consumers must discard all accumulated state
	// (content, tool calls, cost info) and treat subsequent chunks as a
	// fresh response. Only emitted when the provider's retry_window is
	// set to "always" and a mid-stream error triggered a full retry.
	// The retry costs tokens because the provider generates a new
	// response from scratch — this is why the default is off.
	Reset bool `json:"reset,omitempty"`

	// Error is set if an error occurred during streaming
	Error error `json:"error,omitempty"`

	// Metadata contains provider-specific metadata
	Metadata map[string]interface{} `json:"metadata,omitempty"`

	// FinalResult contains the complete execution result (only set in the final chunk)
	FinalResult ExecutionResult `json:"final_result,omitempty"`

	// CostInfo contains cost breakdown (only present in final chunk when FinishReason != nil)
	CostInfo *types.CostInfo `json:"cost_info,omitempty"`

	// PendingTools contains client-mode tools awaiting caller fulfillment.
	// Only set when FinishReason is "pending_tools".
	PendingTools []tools.PendingToolExecution `json:"pending_tools,omitempty"`
}

StreamChunk represents a batch of tokens with metadata.
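The Reset contract documented above can be illustrated with a minimal consumer. chunk and consume here are local stand-ins for StreamChunk and a caller's receive loop, not this package's types:

```go
package main

import (
	"fmt"
	"strings"
)

// chunk is a pared-down local stand-in for StreamChunk, showing only
// the fields a typical consumer switches on.
type chunk struct {
	Delta        string
	Reset        bool
	Err          error
	FinishReason *string
}

// consume accumulates deltas, honoring the Reset contract: on Reset the
// consumer discards everything accumulated so far and starts fresh.
func consume(ch <-chan chunk) (string, error) {
	var b strings.Builder
	for c := range ch {
		if c.Err != nil {
			return b.String(), c.Err
		}
		if c.Reset {
			b.Reset() // stream is being retried from scratch
			continue
		}
		b.WriteString(c.Delta)
		if c.FinishReason != nil {
			break
		}
	}
	return b.String(), nil
}

func main() {
	stop := "stop"
	ch := make(chan chunk, 4)
	ch <- chunk{Delta: "partial "}
	ch <- chunk{Reset: true} // mid-stream retry: discard "partial "
	ch <- chunk{Delta: "Hello, "}
	ch <- chunk{Delta: "world", FinishReason: &stop}
	close(ch)
	out, _ := consume(ch)
	fmt.Println(out)
}
```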

type StreamConsumer added in v1.4.2

type StreamConsumer func(ctx context.Context, body io.ReadCloser, outChan chan<- StreamChunk)

StreamConsumer is called on the success path of RunStreamingRequest inside a dedicated goroutine. It receives the (possibly retry-replayed) response body and the output channel; it must fully drain the body and close outChan when done. Typical implementations wrap body in an IdleTimeoutReader + SSEScanner (or equivalent) and run the provider's existing stream parser.

type StreamEvent

type StreamEvent struct {
	// Type is the event type: "chunk", "complete", "error"
	Type string `json:"type"`

	// Chunk contains the stream chunk data
	Chunk *StreamChunk `json:"chunk,omitempty"`

	// Error is set for error events
	Error error `json:"error,omitempty"`

	// Timestamp is when the event occurred
	Timestamp time.Time `json:"timestamp"`
}

StreamEvent is sent to observers for monitoring

type StreamInputSession added in v1.1.0

type StreamInputSession interface {
	// SendChunk sends a media chunk to the provider.
	// Returns an error if the chunk cannot be sent or the session is closed.
	// This method is safe to call from multiple goroutines.
	SendChunk(ctx context.Context, chunk *types.MediaChunk) error

	// SendText sends a text message to the provider during the streaming session.
	// This is useful for sending text prompts or instructions during audio streaming.
	// Note: This marks the turn as complete, triggering a response.
	SendText(ctx context.Context, text string) error

	// SendSystemContext sends a text message as context without completing the turn.
	// Use this for system prompts that provide context but shouldn't trigger an immediate response.
	// The audio/text that follows will be processed with this context in mind.
	SendSystemContext(ctx context.Context, text string) error

	// Response returns a receive-only channel for streaming responses.
	// The channel is closed when the session ends or encounters an error.
	// Consumers should read from this channel in a separate goroutine.
	Response() <-chan StreamChunk

	// Close ends the streaming session and releases resources.
	// After calling Close, SendChunk and SendText will return errors.
	// The Response channel will be closed.
	// Close is safe to call multiple times.
	Close() error

	// Error returns any error that occurred during the session.
	// Returns nil if no error has occurred.
	Error() error

	// Done returns a channel that's closed when the session ends.
	// This is useful for select statements to detect session completion.
	Done() <-chan struct{}
}

StreamInputSession manages a bidirectional streaming session with a provider. The session allows sending media chunks (e.g., audio from a microphone) and receiving streaming responses from the LLM.

Example usage:

session, err := provider.CreateStreamSession(ctx, &StreamingInputConfig{
    Config: types.StreamingMediaConfig{
        Type:       types.ContentTypeAudio,
        ChunkSize:  8192,
        SampleRate: 16000,
        Encoding:   "pcm",
        Channels:   1,
    },
})
if err != nil {
    return err
}
defer session.Close()

// Send audio chunks in a goroutine
go func() {
    for chunk := range micInput {
        if err := session.SendChunk(ctx, chunk); err != nil {
            log.Printf("send error: %v", err)
            break
        }
    }
}()

// Receive responses
for chunk := range session.Response() {
    if chunk.Error != nil {
        log.Printf("response error: %v", chunk.Error)
        break
    }
    fmt.Print(chunk.Delta)
}

type StreamInputSupport added in v1.1.0

type StreamInputSupport interface {
	Provider // Extends the base Provider interface

	// CreateStreamSession creates a new bidirectional streaming session.
	// The session remains active until Close() is called or an error occurs.
	// Returns an error if the provider doesn't support the requested media type.
	CreateStreamSession(ctx context.Context, req *StreamingInputConfig) (StreamInputSession, error)

	// SupportsStreamInput returns the media types supported for streaming input.
	// Common values: types.ContentTypeAudio, types.ContentTypeVideo
	SupportsStreamInput() []string

	// GetStreamingCapabilities returns detailed information about streaming support.
	// This includes supported codecs, sample rates, and other constraints.
	GetStreamingCapabilities() StreamingCapabilities
}

StreamInputSupport extends the Provider interface for bidirectional streaming. Providers that implement this interface can handle streaming media input (e.g., real-time audio) and provide streaming responses.

type StreamMediaData added in v1.4.1

type StreamMediaData struct {
	// Common fields
	Data     []byte // Raw media bytes (PCM audio, JPEG frame, H.264 chunk, etc.)
	MIMEType string // e.g., "audio/pcm", "image/jpeg", "video/h264"

	// Audio metadata
	SampleRate int // Sample rate in Hz (e.g., 16000, 24000). 0 if not audio.
	Channels   int // Channel count (1=mono, 2=stereo). 0 if not audio.

	// Visual metadata (image and video)
	Width  int // Pixels. 0 if unknown or not visual.
	Height int // Pixels. 0 if unknown or not visual.

	// Video/image streaming metadata
	FrameRate  float64 // FPS. 0 if not video.
	IsKeyFrame bool    // True if this is a key frame (video only).
	FrameNum   int64   // Sequence number for ordering frames/chunks.
}

StreamMediaData carries raw media bytes for streaming. Data is always raw bytes, never base64. Providers decode at source.

This type maps to pipeline-internal types at the StreamChunk/StreamElement boundary:

  • stage.AudioData: SampleRate, Channels
  • stage.ImageData: Width, Height, FrameNum
  • stage.VideoData: Width, Height, FrameRate, IsKeyFrame, FrameNum

type StreamMetrics added in v1.4.2

type StreamMetrics struct {
	// contains filtered or unexported fields
}

StreamMetrics holds the direct-update Prometheus metrics for streaming provider calls. These are updated inline at the source (not via the event bus) so that burst-load drops on the event bus cannot corrupt autoscaling signals. See docs/local-backlog/STREAMING_RETRY_AT_SCALE.md for the design rationale.

All methods are nil-safe: if StreamMetrics is nil, the call is a no-op. This lets provider code unconditionally call s.StreamsInFlightInc(...) without guarding on whether metrics are configured.

func DefaultStreamMetrics added in v1.4.2

func DefaultStreamMetrics() *StreamMetrics

DefaultStreamMetrics returns the process-wide StreamMetrics instance or nil if none has been registered. All StreamMetrics methods are nil-safe.

func NewStreamMetrics added in v1.4.2

func NewStreamMetrics(
	registerer prometheus.Registerer,
	namespace string,
	constLabels prometheus.Labels,
) *StreamMetrics

NewStreamMetrics creates and registers the Phase 1 streaming metrics into the given registerer under the given namespace. Const labels are applied to every metric.

Returns a non-nil *StreamMetrics. Re-registering the same metric name into the same registry panics (standard Prometheus behavior), so the default registration path uses sync.Once via RegisterDefaultStreamMetrics.

func RegisterDefaultStreamMetrics added in v1.4.2

func RegisterDefaultStreamMetrics(
	registerer prometheus.Registerer,
	namespace string,
	constLabels prometheus.Labels,
) *StreamMetrics

RegisterDefaultStreamMetrics creates and installs a process-wide StreamMetrics instance. Safe to call multiple times with the SAME registerer — subsequent calls are no-ops. Calling with a DIFFERENT registerer is not supported and is treated as a misconfiguration (second call is ignored and the first wins).

Hosts (Arena, SDK, server) call this once during startup. Code that only cares about metrics being present calls DefaultStreamMetrics() and gets a nil on a misconfigured host, which is safe (methods no-op).
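The first-wins, later-calls-are-no-ops contract is essentially a sync.Once install. The sketch below uses local stand-in names, not the real API:

```go
package main

import (
	"fmt"
	"sync"
)

// metrics is a local stand-in for StreamMetrics; registerDefault
// illustrates the "first registration wins" contract via sync.Once.
type metrics struct{ namespace string }

var (
	defaultOnce    sync.Once
	defaultMetrics *metrics
)

func registerDefault(namespace string) *metrics {
	defaultOnce.Do(func() {
		defaultMetrics = &metrics{namespace: namespace}
	})
	return defaultMetrics // subsequent calls return the first instance
}

func main() {
	a := registerDefault("promptkit")
	b := registerDefault("other") // ignored: first registration wins
	fmt.Println(a == b, a.namespace)
}
```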

func (*StreamMetrics) ConcurrencyRejected added in v1.4.2

func (m *StreamMetrics) ConcurrencyRejected(provider, reason string)

ConcurrencyRejected records one streaming request rejected by the per-provider concurrency semaphore. Reason distinguishes between caller-initiated cancellation ("context_canceled") and deadline timeout ("deadline_exceeded"); sustained spikes in either indicate the semaphore limit is undersized or upstream is saturated. Nil-safe.

func (*StreamMetrics) HTTPConnsInUseDec added in v1.4.2

func (m *StreamMetrics) HTTPConnsInUseDec(host string)

HTTPConnsInUseDec decrements the in-use HTTP connection gauge for a host. Called by the conn-tracking transport wrapper when a request's response body is closed (or when the RoundTrip errored before returning a body). Nil-safe.

func (*StreamMetrics) HTTPConnsInUseInc added in v1.4.2

func (m *StreamMetrics) HTTPConnsInUseInc(host string)

HTTPConnsInUseInc increments the in-use HTTP connection gauge for a host. Called by the conn-tracking transport wrapper at the start of each RoundTrip. Nil-safe.

func (*StreamMetrics) ObserveFirstChunkLatency added in v1.4.2

func (m *StreamMetrics) ObserveFirstChunkLatency(provider string, d time.Duration)

ObserveFirstChunkLatency records the time from request dispatch to the first SSE data event being observed for a provider. Nil-safe.

func (*StreamMetrics) ObserveRetryBudgetAvailable added in v1.4.2

func (m *StreamMetrics) ObserveRetryBudgetAvailable(provider, host string, budget *RetryBudget)

ObserveRetryBudgetAvailable samples the current token count of a retry budget and publishes it to the stream_retry_budget_available gauge. Intended to be called whenever a retry attempts to acquire a token — the gauge then reflects the budget state at the moment of highest interest (right before a retry decision).

A nil budget publishes nothing, which is intentional: operators can distinguish "no budget configured" (gauge absent) from "budget fully drained" (gauge at 0) by gauge presence rather than value. Nil-safe on the receiver.

func (*StreamMetrics) ObserveStreamErrorChunksForwarded added in v1.4.2

func (m *StreamMetrics) ObserveStreamErrorChunksForwarded(provider string, chunks int)

ObserveStreamErrorChunksForwarded records how many content chunks were forwarded downstream before a streaming request terminated with an error. Called exactly once per errored stream by the RunStreamingRequest relay goroutine, with the count of non-empty content chunks observed prior to the terminal error chunk. Nil-safe.

func (*StreamMetrics) PipelineStageAudioBytesAdd added in v1.4.2

func (m *StreamMetrics) PipelineStageAudioBytesAdd(stage string, bytes int)

PipelineStageAudioBytesAdd adds to the audio byte counter for a pipeline stage. Called with the raw PCM byte count of each audio element that flows through the stage. Nil-safe.

func (*StreamMetrics) PipelineStageAudioBytesVec added in v1.4.2

func (m *StreamMetrics) PipelineStageAudioBytesVec() *prometheus.CounterVec

PipelineStageAudioBytesVec returns the raw counter vec for testing.

func (*StreamMetrics) PipelineStageElementInc added in v1.4.2

func (m *StreamMetrics) PipelineStageElementInc(stage string)

PipelineStageElementInc increments the element counter for a pipeline stage. Called by the pipeline runner after each element flows through a stage's output channel. Nil-safe.

func (*StreamMetrics) PipelineStageElementsVec added in v1.4.2

func (m *StreamMetrics) PipelineStageElementsVec() *prometheus.CounterVec

PipelineStageElementsVec returns the raw counter vec for testing.

func (*StreamMetrics) ProviderCallsInFlightDec added in v1.4.2

func (m *StreamMetrics) ProviderCallsInFlightDec(provider string)

ProviderCallsInFlightDec decrements the total in-flight provider call gauge. Nil-safe.

func (*StreamMetrics) ProviderCallsInFlightInc added in v1.4.2

func (m *StreamMetrics) ProviderCallsInFlightInc(provider string)

ProviderCallsInFlightInc increments the total in-flight provider call gauge. Nil-safe.

func (*StreamMetrics) RetryAttempt added in v1.4.2

func (m *StreamMetrics) RetryAttempt(provider, outcome string)

RetryAttempt records one streaming retry attempt with an outcome label. Outcome values: "success" (attempt that produced a usable stream), "failed" (retryable transient failure that will be retried), "exhausted" (last attempt failed, no more retries), or "budget_exhausted" (retry was rejected because the per-provider retry budget had no tokens). Nil-safe.

func (*StreamMetrics) StreamsInFlightDec added in v1.4.2

func (m *StreamMetrics) StreamsInFlightDec(provider string)

StreamsInFlightDec decrements the in-flight stream gauge for a provider. Nil-safe.

func (*StreamMetrics) StreamsInFlightInc added in v1.4.2

func (m *StreamMetrics) StreamsInFlightInc(provider string)

StreamsInFlightInc increments the in-flight stream gauge for a provider. Nil-safe.

type StreamObserver

type StreamObserver interface {
	OnChunk(chunk StreamChunk)
	OnComplete(totalTokens int, duration time.Duration)
	OnError(err error)
}

StreamObserver receives stream events for monitoring.

type StreamRetryPolicy added in v1.4.2

type StreamRetryPolicy struct {
	// Enabled turns the retry loop on. Zero value is off.
	Enabled bool
	// MaxAttempts is total attempts including the initial request. Values
	// <1 are normalized to 1 (no retry). Zero falls back to the default.
	MaxAttempts int
	// InitialDelay is the base backoff before the first retry. Zero falls
	// back to the default.
	InitialDelay time.Duration
	// MaxDelay caps per-attempt backoff. Zero falls back to the default.
	MaxDelay time.Duration
	// Window controls which point in the stream lifecycle is eligible for
	// retry. Empty falls back to StreamRetryWindowPreFirstChunk.
	Window StreamRetryWindow
}

StreamRetryPolicy governs bounded retry behavior for streaming requests that fail before any content chunk has been forwarded downstream.

The policy is intentionally separate from pipeline.RetryPolicy (which covers non-streaming requests) because the failure classes and safety constraints are different: streaming retries must respect the idempotency window, use full jitter instead of half jitter to break h2 herd resets, and default to far fewer attempts.

func DisabledStreamRetryPolicy added in v1.4.2

func DisabledStreamRetryPolicy() StreamRetryPolicy

DisabledStreamRetryPolicy returns a zero-value policy (retry off). Used as the BaseProvider default so callers never see nil.

func (StreamRetryPolicy) Attempts added in v1.4.2

func (p StreamRetryPolicy) Attempts() int

Attempts returns the normalized number of attempts (>=1). Returns 1 when retry is disabled so callers can use it unconditionally in a for loop.

func (StreamRetryPolicy) BackoffFor added in v1.4.2

func (p StreamRetryPolicy) BackoffFor(attempt int) time.Duration

BackoffFor computes the delay for the given attempt index (0-based) using full jitter: uniform random in [0, min(maxDelay, initialDelay * 2^attempt)]. Full jitter (as opposed to equal or decorrelated jitter) is deliberate — when a single h2 connection reset kills ~100 streams, equal jitter still synchronizes the retries into narrow buckets; full jitter smears them.

func (StreamRetryPolicy) InitialDelayOrDefault added in v1.4.2

func (p StreamRetryPolicy) InitialDelayOrDefault() time.Duration

InitialDelayOrDefault returns the configured initial delay or the default.

func (StreamRetryPolicy) MaxDelayOrDefault added in v1.4.2

func (p StreamRetryPolicy) MaxDelayOrDefault() time.Duration

MaxDelayOrDefault returns the configured max delay or the default.

type StreamRetryRequest added in v1.4.2

type StreamRetryRequest struct {
	Policy       StreamRetryPolicy
	Budget       *RetryBudget // nil means unbounded retries
	ProviderName string
	Host         string // metric label; may be empty
	IdleTimeout  time.Duration
	RequestFn    func(ctx context.Context) (*http.Request, error)
	Client       *http.Client
	// FrameDetector identifies the first complete protocol frame on
	// the response body so the retry driver knows when the stream is
	// "established" and retry must not fire. Nil defaults to
	// SSEFrameDetector — set to NDJSONFrameDetector for Ollama,
	// JSONArrayFrameDetector for Gemini, or any custom FrameDetector
	// implementation for new protocols.
	FrameDetector FrameDetector
}

StreamRetryRequest bundles the dependencies for a streaming retry attempt. This exists so OpenStreamWithRetryRequest can grow new parameters (budget, host label, etc.) without breaking every call site.

type StreamRetryResult added in v1.4.2

type StreamRetryResult struct {
	// Response is the HTTP response of the successful attempt. Body has
	// already been wrapped; callers must not read directly from
	// Response.Body. Use Body instead.
	Response *http.Response
	// Body is a composite reader that first replays the SSE event bytes
	// consumed by the peek, then streams the remainder of the response.
	// Closing this closes the underlying Response.Body.
	Body io.ReadCloser
	// Attempts is the total number of attempts made (1 on first-try success).
	Attempts int
}

StreamRetryResult holds the successfully opened streaming response after the pre-first-chunk retry loop. The caller takes ownership of Body (which is a composite reader re-prepending the peeked first SSE event), and must close it.

func OpenStreamWithRetry added in v1.4.2

func OpenStreamWithRetry(
	ctx context.Context,
	policy StreamRetryPolicy,
	providerName string,
	idleTimeout time.Duration,
	requestFn func(ctx context.Context) (*http.Request, error),
	client *http.Client,
) (*StreamRetryResult, error)

OpenStreamWithRetry executes requestFn and peeks the first SSE data event on the response body. If Do() returns a retryable error, or the response status is retryable, or the body fails to produce a first SSE event within the idle window, the attempt is discarded and retried up to the policy's MaxAttempts. On success, the buffered bytes are replayed into the returned Body so downstream SSE parsers see a contiguous stream.

This function only retries in the pre-first-chunk window — that is, only while no content bytes have been surfaced to the caller. It never reads past the end of the first SSE event payload.

When policy.Enabled is false this is equivalent to a single Do() + peek: on success the body is still wrapped to replay the peeked bytes, but no retry is performed.

Thin wrapper over OpenStreamWithRetryRequest for callers that don't need the budget or host-label parameters.

func OpenStreamWithRetryRequest added in v1.4.2

func OpenStreamWithRetryRequest(ctx context.Context, req *StreamRetryRequest) (*StreamRetryResult, error)

OpenStreamWithRetryRequest is the full-featured form of OpenStreamWithRetry that accepts a budget and host label. Retries beyond the initial attempt must acquire a token from req.Budget (if non-nil) before re-dialing; an empty budget causes the function to return the last error immediately (fail-fast) rather than waiting for token refill.

type StreamRetryWindow added in v1.4.2

type StreamRetryWindow string

StreamRetryWindow enumerates the points at which a streaming request may still be retried.

const (
	// StreamRetryWindowPreFirstChunk retries only while no content chunk
	// has been forwarded downstream. This is the safe default that avoids
	// any content duplication.
	StreamRetryWindowPreFirstChunk StreamRetryWindow = "pre_first_chunk"
	// StreamRetryWindowAlways retries on any stream failure, including
	// after content has been forwarded. On mid-stream failure the relay
	// emits a StreamChunk with Reset=true so consumers discard
	// accumulated state, then retries the full request from scratch.
	// The retry produces a new response (LLMs are non-deterministic)
	// and costs additional tokens. Off by default.
	StreamRetryWindowAlways StreamRetryWindow = "always"
)

type StreamScanner added in v1.3.1

type StreamScanner interface {
	Scan() bool
	Data() string
	Err() error
}

StreamScanner is the interface for scanning streaming responses. Both SSE (Server-Sent Events) and binary event-stream formats implement this.

type StreamSemaphore added in v1.4.2

type StreamSemaphore struct {
	// contains filtered or unexported fields
}

StreamSemaphore caps the number of concurrent streaming requests a provider will have in flight at any one time. Acquire blocks (subject to context cancellation) when the limit is reached, so the caller's deadline controls fail-fast vs. queueing behavior: a short context means "reject me quickly if you're full", a long one means "queue".

Rationale: per-request bounded retry + budget (Phase 1-2) does not bound the *number* of streams a provider can hold open. At 1000 concurrent streams each provider holds ~1000 goroutines, timers, and channel buffers, even though it only needs a handful of h2 connections to serve them. The semaphore turns unbounded goroutine growth into back-pressure that surfaces cleanly at the caller.

Design: wraps golang.org/x/sync/semaphore.Weighted. Nil-safe — a nil *StreamSemaphore never blocks and has a no-op Release, so callers can use it unconditionally.

func NewStreamSemaphore added in v1.4.2

func NewStreamSemaphore(limit int) *StreamSemaphore

NewStreamSemaphore returns a semaphore with the given concurrent-stream limit. Returns nil when limit is zero or negative, which callers interpret as "unlimited" (no gating).

func (*StreamSemaphore) Acquire added in v1.4.2

func (s *StreamSemaphore) Acquire(ctx context.Context) error

Acquire blocks until the semaphore has capacity or the context is done. Returns nil on successful acquire, or the context error on cancellation/deadline. A nil semaphore always returns nil immediately (unlimited).

Callers MUST call Release exactly once for every successful Acquire, and MUST NOT call Release after an Acquire that returned an error.

func (*StreamSemaphore) Limit added in v1.4.2

func (s *StreamSemaphore) Limit() int

Limit returns the configured concurrent-stream limit. Returns 0 for a nil receiver (interpreted as "unlimited" by observability consumers).

func (*StreamSemaphore) Release added in v1.4.2

func (s *StreamSemaphore) Release()

Release returns one slot to the semaphore. Nil-safe.

Release of a token that was not acquired will cause semaphore.Weighted to panic — callers must pair each successful Acquire with exactly one Release, typically via defer.

type StreamingCapabilities added in v1.1.0

type StreamingCapabilities struct {
	// SupportedMediaTypes lists the media types that can be streamed
	// Values: types.ContentTypeAudio, types.ContentTypeVideo
	SupportedMediaTypes []string `json:"supported_media_types"`

	// Audio capabilities
	Audio *AudioStreamingCapabilities `json:"audio,omitempty"`

	// Video capabilities
	Video *VideoStreamingCapabilities `json:"video,omitempty"`

	// BidirectionalSupport indicates if the provider supports full bidirectional streaming
	BidirectionalSupport bool `json:"bidirectional_support"`

	// MaxSessionDuration is the maximum duration for a streaming session (in seconds)
	// Zero means no limit
	MaxSessionDuration int `json:"max_session_duration,omitempty"`

	// MinChunkSize is the minimum chunk size in bytes
	MinChunkSize int `json:"min_chunk_size,omitempty"`

	// MaxChunkSize is the maximum chunk size in bytes
	MaxChunkSize int `json:"max_chunk_size,omitempty"`
}

StreamingCapabilities describes what streaming features a provider supports.

type StreamingInputConfig added in v1.1.6

type StreamingInputConfig struct {
	// Config specifies the media streaming configuration (codec, sample rate, etc.)
	Config types.StreamingMediaConfig `json:"config"`

	// SystemInstruction is the system prompt to configure the model's behavior.
	// For Gemini Live API, this is included in the setup message.
	SystemInstruction string `json:"system_instruction,omitempty"`

	// Tools defines functions the model can call during the session.
	// When configured, the model returns structured tool calls instead of
	// speaking them as text. Supported by Gemini Live API.
	Tools []StreamingToolDefinition `json:"tools,omitempty"`

	// Metadata contains provider-specific session configuration
	// Example: {"response_modalities": ["TEXT", "AUDIO"]} for Gemini
	Metadata map[string]interface{} `json:"metadata,omitempty"`
}

StreamingInputConfig configures a new streaming input session.

func (*StreamingInputConfig) Validate added in v1.1.6

func (r *StreamingInputConfig) Validate() error

Validate checks that the StreamingInputConfig is valid.

type StreamingToolDefinition added in v1.1.6

type StreamingToolDefinition struct {
	Name        string                 `json:"name"`
	Description string                 `json:"description,omitempty"`
	Parameters  map[string]interface{} `json:"parameters,omitempty"` // JSON Schema
}

StreamingToolDefinition represents a function/tool available in streaming sessions.

type ToolDescriptor

type ToolDescriptor struct {
	Name         string          `json:"name"`
	Description  string          `json:"description"`
	InputSchema  json.RawMessage `json:"input_schema"`
	OutputSchema json.RawMessage `json:"output_schema"`
}

ToolDescriptor represents a tool that can be used by providers.

type ToolResponse added in v1.1.6

type ToolResponse struct {
	ToolCallID string `json:"tool_call_id"`
	Result     string `json:"result"`
	IsError    bool   `json:"is_error,omitempty"` // True if the tool execution failed
}

ToolResponse represents a single tool execution result.

type ToolResponseSupport added in v1.1.6

type ToolResponseSupport interface {
	// SendToolResponse sends the result of a tool execution back to the model.
	// The toolCallID must match the ID from the MessageToolCall.
	// The result is typically JSON-encoded but the format depends on the tool.
	// After receiving the tool response, the model will continue generating.
	SendToolResponse(ctx context.Context, toolCallID string, result string) error

	// SendToolResponses sends multiple tool results at once (for parallel tool calls).
	// This is more efficient than sending individual responses for providers that
	// support batched tool responses.
	SendToolResponses(ctx context.Context, responses []ToolResponse) error
}

ToolResponseSupport is an optional interface for streaming sessions that support tool calling. When the model returns a tool call, the caller can execute the tool and send the result back using this interface. The session will then continue generating a response based on the tool result.

Use type assertion to check if a StreamInputSession supports this interface:

if toolSession, ok := session.(ToolResponseSupport); ok {
    err := toolSession.SendToolResponse(ctx, toolCallID, result)
}

type ToolResult

type ToolResult = types.MessageToolResult

ToolResult represents the result of a tool execution. It is an alias of types.MessageToolResult for provider-specific context.

type ToolSupport

type ToolSupport interface {
	Provider // Extends the base Provider interface

	// BuildTooling converts tool descriptors to provider-native format.
	// Returns a provider-specific type that should be passed to PredictWithTools.
	BuildTooling(descriptors []*ToolDescriptor) (ProviderTools, error)

	// PredictWithTools performs a predict request with tool support.
	// The tools parameter should be the value returned by BuildTooling.
	PredictWithTools(
		ctx context.Context,
		req PredictionRequest,
		tools ProviderTools,
		toolChoice string,
	) (PredictionResponse, []types.MessageToolCall, error)

	// PredictStreamWithTools performs a streaming predict request with tool support.
	// The tools parameter should be the value returned by BuildTooling.
	PredictStreamWithTools(
		ctx context.Context,
		req PredictionRequest,
		tools ProviderTools,
		toolChoice string,
	) (<-chan StreamChunk, error)
}

ToolSupport is the interface implemented by providers that support tool/function calling.

type UnsupportedContentError added in v1.1.0

type UnsupportedContentError struct {
	Provider    string // Provider ID
	ContentType string // "image", "audio", "video", or "multimodal"
	Message     string // Human-readable error message
	PartIndex   int    // Index of the unsupported content part (if applicable)
	MIMEType    string // Specific MIME type that's unsupported (if applicable)
}

UnsupportedContentError is returned when a provider does not support a given content type.

func (*UnsupportedContentError) Error added in v1.1.0

func (e *UnsupportedContentError) Error() string

type UnsupportedProviderError

type UnsupportedProviderError struct {
	ProviderType string
}

UnsupportedProviderError is returned when a provider type is not recognized.

func (*UnsupportedProviderError) Error

func (e *UnsupportedProviderError) Error() string

Error returns the error message for this unsupported provider error.

type ValidationAbortError

type ValidationAbortError struct {
	Reason string
	Chunk  StreamChunk
}

ValidationAbortError is returned when a streaming validator aborts a stream.

func (*ValidationAbortError) Error

func (e *ValidationAbortError) Error() string

Error returns the error message for this validation abort error.

type VideoResolution added in v1.1.0

type VideoResolution struct {
	Width  int `json:"width"`
	Height int `json:"height"`
}

VideoResolution represents a video resolution.

func (VideoResolution) String added in v1.1.0

func (r VideoResolution) String() string

String returns a string representation of the resolution (e.g., "1920x1080").

type VideoStreamingCapabilities added in v1.1.0

type VideoStreamingCapabilities struct {
	// SupportedEncodings lists supported video encodings
	// Common values: "h264", "vp8", "vp9", "av1"
	SupportedEncodings []string `json:"supported_encodings"`

	// SupportedResolutions lists supported resolutions (width x height)
	SupportedResolutions []VideoResolution `json:"supported_resolutions"`

	// SupportedFrameRates lists supported frame rates
	// Common values: 15, 24, 30, 60
	SupportedFrameRates []int `json:"supported_frame_rates"`

	// PreferredEncoding is the recommended encoding
	PreferredEncoding string `json:"preferred_encoding"`

	// PreferredResolution is the recommended resolution
	PreferredResolution VideoResolution `json:"preferred_resolution"`

	// PreferredFrameRate is the recommended frame rate
	PreferredFrameRate int `json:"preferred_frame_rate"`
}

VideoStreamingCapabilities describes video streaming support.

Directories

Path Synopsis
Package all provides a convenient way to register all PromptKit providers with a single import.
Package claude provides Anthropic Claude LLM provider integration.
Package gemini provides Gemini Live API streaming support.
Package imagen provides Google Imagen image generation provider integration.
internal
streaming
Package streaming provides a common WebSocket streaming session abstraction used by provider implementations (OpenAI Realtime, Gemini Live, etc.).
Package mock provides mock provider implementation for testing and development.
Package ollama provides Ollama LLM provider integration for local development.
Package openai provides OpenAI LLM provider integration.
Package replay provides a provider that replays recorded sessions deterministically.
Package vllm provides vLLM LLM provider integration for high-performance inference.
Package voyageai provides embedding generation via the Voyage AI API.
