agenticmodel

package v1.0.0-alpha.61
Published: Mar 19, 2026 License: MIT Imports: 24 Imported by: 0

README
agentic-model

Model integration component for the agentic processing system.

Overview

The agentic-model component routes agent requests to OpenAI-compatible LLM endpoints. It receives AgentRequest messages from the loop orchestrator, calls the appropriate model endpoint, and publishes AgentResponse messages back. It supports multiple named endpoints, tool calling, retry with backoff, and token tracking.

Architecture

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────►│ agentic-model  │────►│ LLM Endpoint     │
│               │     │                │     │ (OpenAI, Ollama) │
│               │◄────│                │◄────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  agent.request.*       HTTP/HTTPS          OpenAI-compatible
  agent.response.*                          /v1/chat/completions

Features

  • Multiple Endpoints: Configure different models/providers by name
  • OpenAI Compatible: Works with OpenAI, Ollama, LiteLLM, vLLM, etc.
  • Tool Support: Full tool calling (function calling) support
  • Retry Logic: Exponential backoff with jitter via pkg/retry
  • Rate Limiting: Per-endpoint token bucket rate limiting and concurrency control
  • Endpoint Throttling: Semaphore-based concurrency cap shared across all agents
  • Token Tracking: Tracks prompt and completion tokens

Configuration

{
  "type": "processor",
  "name": "agentic-model",
  "enabled": true,
  "config": {
    "stream_name": "AGENT",
    "timeout": "120s",
    "retry": {
      "max_attempts": 3,
      "initial_delay": "1s",
      "max_delay": "60s",
      "rate_limit_delay": "5s"
    },
    "ports": {
      "inputs": [
        {"name": "requests", "type": "jetstream", "subject": "agent.request.>", "stream_name": "AGENT"}
      ],
      "outputs": [
        {"name": "responses", "type": "jetstream", "subject": "agent.response.*", "stream_name": "AGENT"}
      ]
    }
  }
}

Endpoint configurations including rate limits are defined in the top-level model_registry config block, not inline in this component. See the Model Registry section for endpoint configuration.

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| timeout | string | "120s" | Request timeout |
| stream_name | string | "AGENT" | JetStream stream name |
| consumer_name_suffix | string | "" | Suffix for consumer names (for testing) |
| retry.max_attempts | int | 3 | Maximum retry attempts |
| retry.initial_delay | string | "1s" | Initial delay before first retry |
| retry.max_delay | string | "60s" | Maximum delay between retries |
| retry.rate_limit_delay | string | "5s" | Extra wait added before backoff on HTTP 429 |
| ports | object | (defaults) | Port configuration |
Endpoint Configuration

Endpoints are configured in the top-level model_registry block. In addition to the base fields, each endpoint supports rate limiting controls:

| Field | Type | Required | Description |
|---|---|---|---|
| url | string | yes | Base URL for OpenAI-compatible API |
| model | string | yes | Model name for API requests |
| api_key_env | string | no | Environment variable for API key |
| requests_per_minute | int | no | Token bucket rate limit (0 = unlimited) |
| max_concurrent | int | no | Maximum simultaneous in-flight requests (0 = unlimited) |
Model Aliases

Model aliases provide semantic names for endpoints, allowing other components to reference models by purpose rather than specific endpoint:

{
  "model_aliases": {
    "reasoning": "gpt-4",
    "coding": "gpt-4-turbo",
    "fast": "gpt-3.5-turbo",
    "summarization": "gpt-3.5-turbo"
  }
}

Alias rules:

  • Target must exist in endpoints
  • No alias chaining (alias cannot point to another alias)
  • Empty target is not allowed
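The three rules above can be checked with a short validation pass. The following is an illustrative sketch in plain Go, not the package's actual implementation; validateAliases and its map-based inputs are assumed names for this example.

```go
package main

import "fmt"

// validateAliases enforces the alias rules: non-empty target, no
// chaining, and the target must be a configured endpoint.
// Sketch only — the real package validates inside the model registry.
func validateAliases(endpoints map[string]bool, aliases map[string]string) error {
	for alias, target := range aliases {
		if target == "" {
			return fmt.Errorf("alias %q has an empty target", alias)
		}
		if _, isAlias := aliases[target]; isAlias {
			return fmt.Errorf("alias %q points to another alias %q", alias, target)
		}
		if !endpoints[target] {
			return fmt.Errorf("alias %q targets unknown endpoint %q", alias, target)
		}
	}
	return nil
}

func main() {
	endpoints := map[string]bool{"gpt-4": true, "gpt-3.5-turbo": true}
	aliases := map[string]string{"fast": "gpt-3.5-turbo", "reasoning": "gpt-4"}
	fmt.Println(validateAliases(endpoints, aliases)) // <nil>
}
```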

Requests can use either endpoint names or aliases:

{
  "model": "fast"
}

Resolves to the gpt-3.5-turbo endpoint.

Ports

Inputs

| Name | Type | Subject | Description |
|---|---|---|---|
| requests | jetstream | agent.request.> | Agent requests from agentic-loop |

Outputs

| Name | Type | Subject | Description |
|---|---|---|---|
| responses | jetstream | agent.response.* | Model responses to agentic-loop |

Endpoint Resolution

Requests are routed to endpoints by model name:

  1. Exact match: Request model: "gpt-4" routes to endpoints.gpt-4
  2. Alias match: Request model: "fast" routes to model_aliases.fast target
  3. Default fallback: If no match, routes to endpoints.default (if configured)
  4. Error: If no match and no default, returns error response
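The four-step order can be sketched as a standalone function (resolveEndpoint and its map arguments are hypothetical names for illustration, not the package API):

```go
package main

import "fmt"

// resolveEndpoint mirrors the documented resolution order:
// exact match, then alias, then "default" fallback, then error.
func resolveEndpoint(model string, endpoints map[string]string, aliases map[string]string) (string, error) {
	if _, ok := endpoints[model]; ok { // 1. exact match
		return model, nil
	}
	if target, ok := aliases[model]; ok { // 2. alias match
		return target, nil
	}
	if _, ok := endpoints["default"]; ok { // 3. default fallback
		return "default", nil
	}
	return "", fmt.Errorf("endpoint not found: %s", model) // 4. error
}

func main() {
	endpoints := map[string]string{"gpt-4": "https://api.openai.com/v1/chat/completions"}
	aliases := map[string]string{"fast": "gpt-4"}
	name, _ := resolveEndpoint("fast", endpoints, aliases)
	fmt.Println(name) // gpt-4
}
```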

Compatible Providers

| Provider | URL Format | Notes |
|---|---|---|
| OpenAI | https://api.openai.com/v1/chat/completions | Requires API key |
| Azure OpenAI | https://{resource}.openai.azure.com/... | Requires API key |
| Ollama | http://localhost:11434/v1/chat/completions | No auth required |
| LiteLLM | http://localhost:8000/v1/chat/completions | Proxy for multiple providers |
| vLLM | http://localhost:8000/v1/chat/completions | Self-hosted models |
| LocalAI | http://localhost:8080/v1/chat/completions | Local models |

Response Status Mapping

| OpenAI Finish Reason | Agentic Status | Description |
|---|---|---|
| stop | complete | Normal completion |
| length | complete | Max tokens reached |
| tool_calls | tool_call | Model wants to use tools |
| (error) | error | API or network error |

Environment Variables

API keys are read from environment variables:

export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="..."

Reference in config:

{
  "api_key_env": "OPENAI_API_KEY"
}

Retry Behavior

Retries are implemented using pkg/retry with exponential backoff and jitter.

  • Max Attempts: Default 3, configurable via retry.max_attempts
  • Initial Delay: Default 1s (retry.initial_delay); tests use 100ms for speed
  • Max Delay: Default 60s (retry.max_delay); each interval doubles with jitter
  • HTTP 429 Handling: Detected via SDK error types (openai.APIError.HTTPStatusCode, openai.RequestError.HTTPStatusCode). An extra rate_limit_delay (default 5s) is added before the normal backoff begins, giving the provider time to recover
  • Retryable: HTTP 429, 500, 502, 503, 504, and network errors
  • Non-retryable: HTTP 400, 401, 403, 404, and context cancellation

Rate Limiting

When running agent teams or quests, multiple agents concurrently target the same endpoint. Without coordination, they can saturate the provider's rate limit within seconds, causing cascading 429 errors that waste time in retry loops. Per-endpoint throttling solves this by enforcing limits before requests leave the process.

Each endpoint in the model registry can be configured with two complementary controls:

  • requests_per_minute: A token bucket that caps the request rate. Agents block until a token is available rather than racing to the endpoint.
  • max_concurrent: A semaphore that caps simultaneous in-flight requests. Useful for providers that enforce concurrency limits independently of rate limits.

Both controls are shared across all agents targeting the same endpoint — the throttle is instantiated once per cached client, not per agent.

Configuration Example
{
  "model_registry": {
    "endpoints": {
      "gpt-4": {
        "url": "https://api.openai.com/v1/chat/completions",
        "model": "gpt-4-turbo-preview",
        "api_key_env": "OPENAI_API_KEY",
        "requests_per_minute": 60,
        "max_concurrent": 5
      },
      "ollama": {
        "url": "http://localhost:11434/v1/chat/completions",
        "model": "qwen2.5-coder:14b"
      }
    }
  }
}

The ollama endpoint has no limits configured — local models typically do not need them.

When to Use
| Scenario | Recommended Setting |
|---|---|
| Single agent, low traffic | No limits needed |
| Agent team (quest), shared OpenAI key | requests_per_minute matching your tier |
| Shared API key across multiple services | Both requests_per_minute and max_concurrent |
| Local model (Ollama, vLLM) | No limits needed |
Observability

Rate limit events are tracked via:

semstreams_agentic_model_rate_limit_hits_total{model="gpt-4"}

This counter increments each time a request encounters an HTTP 429. A sustained high rate suggests the configured requests_per_minute is too close to the actual provider limit and should be reduced.

Troubleshooting

Endpoint not found
  • Verify model name in request matches an endpoint key
  • Add a "default" endpoint as fallback
  • Check for typos in model names
Authentication errors
  • Verify api_key_env points to correct environment variable
  • Check that environment variable is set
  • Ensure API key is valid and has quota
Timeout errors
  • Increase timeout for complex requests
  • Check network connectivity to endpoint
  • Verify endpoint URL is correct
Tool calls not working
  • Ensure tools are provided in AgentRequest
  • Check tool parameter schemas are valid JSON Schema
  • Verify model supports tool calling

Documentation

Package agenticmodel provides the model integration processor for the SemStreams agentic system.

Overview

The agentic-model processor routes agent requests to OpenAI-compatible LLM endpoints. It receives AgentRequest messages from the loop orchestrator, calls the appropriate model endpoint, and publishes AgentResponse messages back. The processor supports tool calling, retry with backoff, and token tracking. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).

This processor acts as the bridge between the agentic orchestration layer and external LLM services (OpenAI, Ollama, LiteLLM, vLLM, or any OpenAI-compatible API).

Architecture

The model processor sits between the loop orchestrator and external LLM services:

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────▶│ agentic-model  │────▶│ LLM Endpoint     │
│               │     │ (this pkg)     │     │ (OpenAI, Ollama) │
│               │◀────│                │◀────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  agent.request.*       HTTP/HTTPS          OpenAI-compatible
  agent.response.*                          /v1/chat/completions

Quick Start

Configure the model registry in the top-level config and start the processor:

config := agenticmodel.Config{
    StreamName: "AGENT",
    Timeout:    "120s",
}

// Model endpoints are resolved from deps.ModelRegistry (set in config.model_registry)
rawConfig, _ := json.Marshal(config)
comp, err := agenticmodel.NewComponent(rawConfig, deps)

lc := comp.(component.LifecycleComponent)
lc.Initialize()
lc.Start(ctx)

Endpoint Resolution

When processing an AgentRequest, the processor resolves the endpoint from the unified model registry by looking up the request's Model field. Clients are created dynamically and cached for reuse.

If the resolved endpoint has SupportsTools=false, any tools in the request are stripped and a warning is logged.

OpenAI Compatibility

The processor uses the sashabaranov/go-openai SDK and is compatible with any API that implements the OpenAI chat completions interface:

  • OpenAI API (api.openai.com)
  • Azure OpenAI Service
  • Ollama (with OpenAI compatibility layer)
  • LiteLLM proxy
  • vLLM with OpenAI server
  • LocalAI
  • Any OpenAI-compatible proxy

Tool Support

The processor fully supports tool calling (function calling):

Incoming request with tools:

request := agentic.AgentRequest{
    Model: "gpt-4",
    Messages: []agentic.ChatMessage{
        {Role: "user", Content: "Read the config file"},
    },
    Tools: []agentic.ToolDefinition{
        {
            Name:        "read_file",
            Description: "Read file contents",
            Parameters: map[string]any{
                "type": "object",
                "properties": map[string]any{
                    "path": map[string]any{"type": "string"},
                },
            },
        },
    },
}

Response with tool calls:

response := agentic.AgentResponse{
    Status: "tool_call",
    Message: agentic.ChatMessage{
        Role: "assistant",
        ToolCalls: []agentic.ToolCall{
            {ID: "call_001", Name: "read_file", Arguments: map[string]any{"path": "config.yaml"}},
        },
    },
}

The processor converts between agentic.ToolDefinition and OpenAI's function schema format automatically.

Response Status Mapping

The processor maps OpenAI finish reasons to agentic status:

  • "stop" → "complete" (normal completion)
  • "length" → "complete" (max tokens reached)
  • "tool_calls" → "tool_call" (model wants to use tools)
  • Any error → "error" with error message

Retry Logic

The processor implements retry using pkg/retry with exponential backoff and jitter.

  • Default: 3 attempts, 1s initial delay, 60s max delay
  • Tests use 100ms initial delay for fast feedback
  • HTTP 429: Detected via openai.APIError.HTTPStatusCode and openai.RequestError.HTTPStatusCode. An extra rate_limit_delay (default 5s) is prepended before normal backoff begins.
  • Retryable: 429, 500, 502, 503, 504, and network errors
  • Non-retryable: 400, 401, 403, 404, context cancellation

Configuration:

"retry": {
    "max_attempts": 3,
    "initial_delay": "1s",
    "max_delay": "60s",
    "rate_limit_delay": "5s"
}

Token Tracking

Every response includes token usage for cost monitoring and rate limiting:

response.TokenUsage.PromptTokens     // Input tokens
response.TokenUsage.CompletionTokens // Output tokens
response.TokenUsage.Total()          // Sum of both

Token counts come directly from the LLM provider's response.

Configuration Reference

Full configuration schema (endpoints are in the top-level model_registry):

{
    "timeout": "string (default: 120s)",
    "stream_name": "string (default: AGENT)",
    "consumer_name_suffix": "string (optional)",
    "retry": {
        "max_attempts": "int (default: 3)",
        "initial_delay": "string (default: 1s)",
        "max_delay": "string (default: 60s)",
        "rate_limit_delay": "string (default: 5s)"
    },
    "ports": {
        "inputs": [...],
        "outputs": [...]
    }
}

Endpoint-level fields in model_registry:

{
    "url": "string",
    "model": "string",
    "api_key_env": "string (optional)",
    "requests_per_minute": "int (0 = unlimited)",
    "max_concurrent": "int (0 = unlimited)"
}

Ports

Input ports (JetStream consumers):

  • agent.request: Agent requests from agentic-loop (subject: agent.request.>)

Output ports (JetStream publishers):

  • agent.response: Model responses to agentic-loop (subject: agent.response.*)

Message Flow

The processor handles each request through:

  1. Receive AgentRequest from agent.request.>
  2. Resolve endpoint by model name
  3. Acquire throttle slot (rate limiter token + concurrency semaphore)
  4. Convert AgentRequest to OpenAI format
  5. Call LLM endpoint with retry logic
  6. Release throttle slot
  7. Convert OpenAI response to AgentResponse
  8. Publish to agent.response.{request_id}
  9. Acknowledge JetStream message

Client Architecture

The processor dynamically creates and caches Client instances per endpoint:

client, err := NewClient(endpointConfig)
response, err := client.ChatCompletion(ctx, request)

Clients are cached by URL|Model key with mutex protection for concurrent access. Clients wrap the go-openai SDK and handle:

  • API key injection from environment variables
  • Request/response type conversion
  • Retry with exponential backoff
  • Context cancellation propagation
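The cache-by-key idea can be shown with a stdlib-only sketch. Here client is a stand-in struct; the real cache holds *Client values (with their shared throttles) keyed the same way.

```go
package main

import (
	"fmt"
	"sync"
)

type client struct{ url, model string }

// clientCache stores one client per URL|Model pair, guarded by a mutex
// so concurrent requests share a single instance per endpoint.
type clientCache struct {
	mu      sync.Mutex
	clients map[string]*client
}

func (c *clientCache) get(url, model string) *client {
	c.mu.Lock()
	defer c.mu.Unlock()
	key := url + "|" + model
	if cl, ok := c.clients[key]; ok {
		return cl // reuse the cached client
	}
	if c.clients == nil {
		c.clients = make(map[string]*client)
	}
	cl := &client{url: url, model: model}
	c.clients[key] = cl
	return cl
}

func main() {
	cache := &clientCache{}
	a := cache.get("http://localhost:11434", "qwen2.5-coder:14b")
	b := cache.get("http://localhost:11434", "qwen2.5-coder:14b")
	fmt.Println(a == b) // true
}
```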

Error Handling

Errors are returned as AgentResponse with status="error":

response := agentic.AgentResponse{
    RequestID: "req_123",
    Status:    "error",
    Error:     "endpoint not found: unknown-model",
}

Error categories:

  • Endpoint resolution errors: Model not found in registry
  • Request validation errors: Invalid request format
  • Network errors: Connection failures (may retry)
  • API errors: 4xx/5xx from LLM provider
  • Timeout errors: Request exceeded timeout

Environment Variables

API keys are read from environment variables specified in endpoint config:

export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="..."

Endpoint config:

{
    "url": "https://api.openai.com/v1/chat/completions",
    "model": "gpt-4",
    "api_key_env": "OPENAI_API_KEY"
}

If api_key_env is not specified, requests are made without authentication (suitable for local models like Ollama).

Thread Safety

The Component is safe for concurrent use after Start() is called. Multiple goroutines can process requests concurrently. Clients are cached per URL|Model key and guarded by a mutex, so concurrent requests share each endpoint's client (and its throttle) safely.

Testing

For testing, use the ConsumerNameSuffix config option and provide a model registry:

config := agenticmodel.Config{
    StreamName:         "AGENT",
    ConsumerNameSuffix: "test-" + t.Name(),
}

// Provide endpoints via model registry in deps.ModelRegistry
deps.ModelRegistry = &model.Registry{
    Endpoints: map[string]*model.EndpointConfig{
        "test-model": {URL: mockServer.URL, Model: "test-model", MaxTokens: 128000},
    },
    Defaults: model.DefaultsConfig{Model: "test-model"},
}

Use httptest.Server to mock the LLM endpoint in tests.

Limitations

Current limitations:

  • Responses are complete documents; streaming is not supported
  • Retry configuration (max_attempts, delays) is global, not per-endpoint

See Also

Related packages:

  • agentic: Shared types (AgentRequest, AgentResponse, etc.)
  • processor/agentic-loop: Loop orchestration
  • processor/agentic-tools: Tool execution
  • github.com/sashabaranov/go-openai: OpenAI SDK

Package agenticmodel also provides Prometheus metrics for the agentic-model component.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new agentic-model processor component

func Register

func Register(registry RegistryInterface) error

Register registers the agentic-model processor component with the given registry

Types

type ChunkHandler

type ChunkHandler func(chunk StreamChunk)

ChunkHandler is a callback for receiving streaming deltas. Implementations must be safe for concurrent use if the handler is shared.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps OpenAI SDK for agentic model requests

func NewClient

func NewClient(endpoint *model.EndpointConfig) (*Client, error)

NewClient creates a new client for the given endpoint configuration. The default retry config is suitable for unit tests (3 attempts, 100ms initial delay). Call SetRetryConfig before ChatCompletion to apply production settings.

func (*Client) ChatCompletion

func (c *Client) ChatCompletion(ctx context.Context, req agentic.AgentRequest) (agentic.AgentResponse, error)

ChatCompletion sends a chat completion request with retry and throttling.

Retry strategy uses two independent backoff curves:

  • Transient errors (5xx, network): exponential from InitialDelay, up to MaxAttempts
  • Rate limits (429): exponential from RateLimitDelay, up to MaxRateLimitRetries

Both curves cap at MaxDelay and respect ctx cancellation at every wait point.

func (*Client) Close

func (c *Client) Close() error

Close releases resources held by the client

func (*Client) SetAdapter

func (c *Client) SetAdapter(a ProviderAdapter)

SetAdapter sets the provider-specific adapter for normalizing requests and responses. When not set, buildChatRequest falls back to GenericAdapter.

func (*Client) SetChunkHandler

func (c *Client) SetChunkHandler(handler ChunkHandler)

SetChunkHandler sets the callback for receiving streaming deltas.

func (*Client) SetLogger

func (c *Client) SetLogger(l *slog.Logger)

SetLogger sets the logger for debug-level request/response logging.

func (*Client) SetMetrics

func (c *Client) SetMetrics(m *modelMetrics)

SetMetrics sets the metrics instance for recording streaming metrics.

func (*Client) SetRetryConfig

func (c *Client) SetRetryConfig(cfg RetryConfig)

SetRetryConfig replaces the default retry configuration. Call this after NewClient to apply production settings.

func (*Client) SetThrottle

func (c *Client) SetThrottle(t *EndpointThrottle)

SetThrottle attaches a rate/concurrency limiter to this client.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the agentic-model processor

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component (no-op for this component)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing agent requests

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully stops the component within the given timeout

type Config

type Config struct {
	Ports                *component.PortConfig `json:"ports"                schema:"type:ports,description:Port configuration,category:basic"`
	StreamName           string                `` /* 132-byte string literal not displayed */
	ConsumerNameSuffix   string                `` /* 127-byte string literal not displayed */
	DeleteConsumerOnStop bool                  `` /* 157-byte string literal not displayed */
	Timeout              string                `json:"timeout"              schema:"type:string,description:Request timeout,category:advanced,default:120s"`
	Retry                RetryConfig           `json:"retry"                schema:"type:object,description:Retry configuration,category:advanced"`
}

Config holds configuration for agentic-model processor component. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration for agentic-model processor

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for errors

type EndpointThrottle

type EndpointThrottle struct {
	// contains filtered or unexported fields
}

EndpointThrottle controls request rate to a single API endpoint.

It combines two mechanisms:

  • A token bucket that limits requests per minute (if RequestsPerMinute > 0).
  • A semaphore that caps concurrent in-flight requests (if MaxConcurrent > 0).

Both limits are applied on every Acquire call. Either limit may be disabled independently by setting its value to 0.

Throttle instances are shared across all clients that target the same endpoint URL + model pair, so the limits are enforced across the whole agent team.

func NewEndpointThrottle

func NewEndpointThrottle(requestsPerMinute, maxConcurrent int) *EndpointThrottle

NewEndpointThrottle creates a throttle for the given limits. requestsPerMinute == 0 disables rate limiting. maxConcurrent == 0 disables concurrency limiting.

func (*EndpointThrottle) Acquire

func (t *EndpointThrottle) Acquire(ctx context.Context) error

Acquire blocks until a request slot is available or the context is cancelled. The caller must call Release when the request completes.

func (*EndpointThrottle) Release

func (t *EndpointThrottle) Release()

Release returns a concurrency slot to the semaphore. It is a no-op when MaxConcurrent == 0.

type GeminiAdapter

type GeminiAdapter struct{}

GeminiAdapter normalizes payloads for Google's Gemini OpenAI-compatible endpoint. Gemini's endpoint is broadly compatible but has several quirks that cause 400 errors.

func (*GeminiAdapter) Name

func (a *GeminiAdapter) Name() string

Name returns "gemini".

func (*GeminiAdapter) NormalizeMessages

func (a *GeminiAdapter) NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage

NormalizeMessages fixes two Gemini-specific message constraints:

  1. Tool result messages require a non-empty name field. Without it: 400 INVALID_ARGUMENT: function_response.name cannot be empty.

  2. Assistant messages with tool_calls require a non-empty content field. Gemini rejects a completely absent content — single space is the conventional workaround (used by LiteLLM, OpenAI proxy, etc.).

func (*GeminiAdapter) NormalizeRequest

func (a *GeminiAdapter) NormalizeRequest(_ *openai.ChatCompletionRequest)

NormalizeRequest is a no-op for Gemini; all quirks are message-level.

func (*GeminiAdapter) NormalizeResponse

func (a *GeminiAdapter) NormalizeResponse(_ *openai.ChatCompletionResponse)

NormalizeResponse is a no-op for Gemini; all quirks are on the request side.

func (*GeminiAdapter) NormalizeStreamDelta

func (a *GeminiAdapter) NormalizeStreamDelta(delta openai.ToolCall, lastIndex int) int

NormalizeStreamDelta infers the tool call index when Gemini omits it. Gemini streaming deltas never include an index field. Instead:

  • A non-empty ID signals the start of a new tool call → return -1 (sentinel: caller must allocate the next available index via nextToolIndex).
  • An empty ID is an argument continuation → reuse lastIndex.

type GenericAdapter

type GenericAdapter struct{}

GenericAdapter applies cross-provider safe normalizations that are either required by multiple providers or harmless for all known providers. It is the fallback when no provider-specific adapter is registered.

func (*GenericAdapter) Name

func (a *GenericAdapter) Name() string

Name returns "generic".

func (*GenericAdapter) NormalizeMessages

func (a *GenericAdapter) NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage

NormalizeMessages applies normalizations that are safe across all providers:

  1. Tool result messages get a non-empty name field. The name field is optional in the OpenAI spec but required by Gemini. Setting it universally is harmless.

  2. Assistant messages with tool_calls get a non-empty content field. Gemini rejects absent content; setting it to a single space is a widely-used convention (LiteLLM, OpenAI proxy, etc.) and accepted by all known providers.

reasoning_content omission is handled structurally during message conversion (the field is never copied into the outgoing openai.ChatCompletionMessage).

func (*GenericAdapter) NormalizeRequest

func (a *GenericAdapter) NormalizeRequest(_ *openai.ChatCompletionRequest)

NormalizeRequest is a no-op for the generic adapter.

func (*GenericAdapter) NormalizeResponse

func (a *GenericAdapter) NormalizeResponse(_ *openai.ChatCompletionResponse)

NormalizeResponse is a no-op for the generic adapter.

func (*GenericAdapter) NormalizeStreamDelta

func (a *GenericAdapter) NormalizeStreamDelta(delta openai.ToolCall, lastIndex int) int

NormalizeStreamDelta infers the tool call index when the provider omits it. When an explicit index is provided, it is used directly. When absent, a non-empty ID signals a new tool call (return -1 sentinel so the accumulator allocates the next index), and an empty ID is an argument continuation (reuse lastIndex). This matches the behavior required by Gemini and is harmless for providers that always supply an explicit index.
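The index-inference rule is small enough to state as a standalone function. This sketch uses a plain *int for the optional index (mirroring openai.ToolCall's pointer Index field); inferToolIndex is a hypothetical name, not this package's method.

```go
package main

import "fmt"

// inferToolIndex applies the documented rule: an explicit index wins;
// otherwise a non-empty ID starts a new tool call (-1 sentinel, caller
// allocates the next index) and an empty ID continues the previous one.
func inferToolIndex(explicitIndex *int, id string, lastIndex int) int {
	if explicitIndex != nil {
		return *explicitIndex
	}
	if id != "" {
		return -1 // sentinel: allocate the next available index
	}
	return lastIndex
}

func main() {
	two := 2
	fmt.Println(inferToolIndex(&two, "", 0))        // 2
	fmt.Println(inferToolIndex(nil, "call_001", 0)) // -1
	fmt.Println(inferToolIndex(nil, "", 1))         // 1
}
```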

type OpenAIAdapter

type OpenAIAdapter struct{}

OpenAIAdapter handles OpenAI-specific features. OpenAI's endpoint is the reference implementation — most fields behave as documented, so this adapter is mostly a no-op extension point.

func (*OpenAIAdapter) Name

func (a *OpenAIAdapter) Name() string

Name returns "openai".

func (*OpenAIAdapter) NormalizeMessages

func (a *OpenAIAdapter) NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage

NormalizeMessages returns the messages unchanged; OpenAI requires no quirk fixes.

func (*OpenAIAdapter) NormalizeRequest

func (a *OpenAIAdapter) NormalizeRequest(_ *openai.ChatCompletionRequest)

NormalizeRequest is a no-op for OpenAI; reasoning_effort is set directly from endpoint config in buildChatRequest and handled natively by OpenAI.

func (*OpenAIAdapter) NormalizeResponse

func (a *OpenAIAdapter) NormalizeResponse(_ *openai.ChatCompletionResponse)

NormalizeResponse is a no-op for OpenAI.

func (*OpenAIAdapter) NormalizeStreamDelta

func (a *OpenAIAdapter) NormalizeStreamDelta(delta openai.ToolCall, _ int) int

NormalizeStreamDelta uses the explicit index OpenAI always provides.

type ProviderAdapter

type ProviderAdapter interface {
	// Name returns the provider identifier (e.g., "gemini", "openai").
	Name() string

	// NormalizeRequest adjusts the ChatCompletionRequest before sending.
	// Called after the generic request is built, before the HTTP call.
	NormalizeRequest(req *openai.ChatCompletionRequest)

	// NormalizeMessages adjusts the message array before sending.
	// Called during request building for message-level fixes.
	NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage

	// NormalizeStreamDelta adjusts a streaming tool call delta.
	// Returns the corrected tool call index, or -1 as a sentinel meaning
	// "allocate the next available index" (used when the provider omits it).
	NormalizeStreamDelta(delta openai.ToolCall, lastIndex int) int

	// NormalizeResponse adjusts the ChatCompletionResponse after receiving.
	// Called before the response is converted to AgentResponse.
	NormalizeResponse(resp *openai.ChatCompletionResponse)
}

ProviderAdapter normalizes request/response payloads for a specific LLM provider's OpenAI-compatible endpoint. Adapters handle quirks that would otherwise cause 400 errors or silent data corruption.

func AdapterFor

func AdapterFor(provider string) ProviderAdapter

AdapterFor returns the appropriate adapter for the given provider name. Falls back to GenericAdapter for unknown providers.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface defines the minimal interface needed for registration

type RetryConfig

type RetryConfig struct {
	MaxAttempts         int    `` /* 145-byte string literal not displayed */
	MaxRateLimitRetries int    `` /* 171-byte string literal not displayed */
	Backoff             string `` /* 139-byte string literal not displayed */
	InitialDelay        string `` /* 132-byte string literal not displayed */
	MaxDelay            string `` /* 155-byte string literal not displayed */
	RateLimitDelay      string `` /* 155-byte string literal not displayed */
}

RetryConfig holds retry configuration

func (*RetryConfig) Validate

func (r *RetryConfig) Validate() error

Validate checks the retry configuration for errors

type StreamChunk

type StreamChunk struct {
	RequestID      string `json:"request_id"`
	ContentDelta   string `json:"content_delta,omitempty"`
	ReasoningDelta string `json:"reasoning_delta,omitempty"`
	Done           bool   `json:"done,omitempty"`
}

StreamChunk represents a single streaming delta for real-time monitoring. Chunks are ephemeral — published via core NATS (fire-and-forget), not JetStream.
