Documentation ¶
Overview ¶
Package agenticmodel provides the model integration processor for the SemStreams agentic system: an OpenAI-compatible processor component that routes agent requests to configured LLM endpoints with retry logic and tool-calling support.
The agentic-model processor routes agent requests to OpenAI-compatible LLM endpoints. It receives AgentRequest messages from the loop orchestrator, calls the appropriate model endpoint, and publishes AgentResponse messages back. The processor supports tool calling, retry with backoff, and token tracking. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).
This processor acts as the bridge between the agentic orchestration layer and external LLM services (OpenAI, Ollama, LiteLLM, vLLM, or any OpenAI-compatible API).
Architecture ¶
The model processor sits between the loop orchestrator and external LLM services:
┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────▶│ agentic-model  │────▶│ LLM Endpoint     │
│               │     │   (this pkg)   │     │ (OpenAI, Ollama) │
│               │◀────│                │◀────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  agent.request.*        HTTP/HTTPS           OpenAI-compatible
  agent.response.*                            /v1/chat/completions
Quick Start ¶
Configure the model registry in the top-level config and start the processor:
config := agenticmodel.Config{
    StreamName: "AGENT",
    Timeout:    "120s",
}
// Model endpoints are resolved from deps.ModelRegistry (set in config.model_registry)
rawConfig, _ := json.Marshal(config)
comp, err := agenticmodel.NewComponent(rawConfig, deps)
if err != nil {
    log.Fatal(err)
}
lc := comp.(component.LifecycleComponent)
if err := lc.Initialize(); err != nil {
    log.Fatal(err)
}
if err := lc.Start(ctx); err != nil {
    log.Fatal(err)
}
Endpoint Resolution ¶
When processing an AgentRequest, the processor resolves the endpoint from the unified model registry by looking up the request's Model field. Clients are created dynamically and cached for reuse.
If the resolved endpoint has SupportsTools=false, any tools in the request are stripped and a warning is logged.
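The lookup and tool-stripping behavior can be sketched as follows. The types and the resolve function here are illustrative stand-ins, not the package's actual unexported implementation:

```go
package main

import "fmt"

// Endpoint and Registry are hypothetical mirrors of the registry shapes
// described in this document; field names are assumptions.
type Endpoint struct {
	URL           string
	Model         string
	SupportsTools bool
}

type Registry struct {
	Endpoints map[string]*Endpoint
}

// resolve looks up the request's model and strips tools when the
// endpoint does not support them, logging a warning as described above.
func resolve(r *Registry, model string, tools []string) (*Endpoint, []string, error) {
	ep, ok := r.Endpoints[model]
	if !ok {
		return nil, nil, fmt.Errorf("endpoint not found: %s", model)
	}
	if !ep.SupportsTools && len(tools) > 0 {
		fmt.Printf("warning: endpoint %s does not support tools; stripping %d tool(s)\n", model, len(tools))
		tools = nil
	}
	return ep, tools, nil
}

func main() {
	reg := &Registry{Endpoints: map[string]*Endpoint{
		"gpt-4": {URL: "https://api.openai.com/v1", Model: "gpt-4", SupportsTools: true},
		"tiny":  {URL: "http://localhost:11434/v1", Model: "tiny", SupportsTools: false},
	}}
	_, tools, _ := resolve(reg, "tiny", []string{"read_file"})
	fmt.Println("tools after resolve:", len(tools))
}
```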
OpenAI Compatibility ¶
The processor uses the sashabaranov/go-openai SDK and is compatible with any API that implements the OpenAI chat completions interface:
- OpenAI API (api.openai.com)
- Azure OpenAI Service
- Ollama (with OpenAI compatibility layer)
- LiteLLM proxy
- vLLM with OpenAI server
- LocalAI
- Any OpenAI-compatible proxy
Tool Support ¶
The processor fully supports tool calling (function calling):
Incoming request with tools:
request := agentic.AgentRequest{
Model: "gpt-4",
Messages: []agentic.ChatMessage{
{Role: "user", Content: "Read the config file"},
},
Tools: []agentic.ToolDefinition{
{
Name: "read_file",
Description: "Read file contents",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"path": map[string]any{"type": "string"},
},
},
},
},
}
Response with tool calls:
response := agentic.AgentResponse{
Status: "tool_call",
Message: agentic.ChatMessage{
Role: "assistant",
ToolCalls: []agentic.ToolCall{
{ID: "call_001", Name: "read_file", Arguments: map[string]any{"path": "config.yaml"}},
},
},
}
The processor converts between agentic.ToolDefinition and OpenAI's function schema format automatically.
Response Status Mapping ¶
The processor maps OpenAI finish reasons to agentic status:
- "stop" → "complete" (normal completion)
- "length" → "complete" (max tokens reached)
- "tool_calls" → "tool_call" (model wants to use tools)
- Any error → "error" with error message
Retry Logic ¶
The processor implements retry using pkg/retry with exponential backoff and jitter.
- Default: 3 attempts, 1s initial delay, 60s max delay
- Tests use 100ms initial delay for fast feedback
- HTTP 429: Detected via openai.APIError.HTTPStatusCode and openai.RequestError.HTTPStatusCode. An extra rate_limit_delay (default 5s) is prepended before normal backoff begins.
- Retryable: 429, 500, 502, 503, 504, and network errors
- Non-retryable: 400, 401, 403, 404, context cancellation
Configuration:
"retry": {
"max_attempts": 3,
"initial_delay": "1s",
"max_delay": "60s",
"rate_limit_delay": "5s"
}
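The retryable/non-retryable split can be sketched as a status-code classifier. isRetryable is illustrative; the real code also inspects openai.APIError, openai.RequestError, and network errors:

```go
package main

import "fmt"

// isRetryable classifies HTTP status codes per the documented policy:
// 429 and the listed 5xx codes retry, 4xx client errors do not.
func isRetryable(status int) bool {
	switch status {
	case 429, 500, 502, 503, 504:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []int{400, 429, 503} {
		fmt.Printf("%d retryable=%v\n", s, isRetryable(s))
	}
}
```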
Token Tracking ¶
Every response includes token usage for cost monitoring and rate limiting:
response.TokenUsage.PromptTokens     // Input tokens
response.TokenUsage.CompletionTokens // Output tokens
response.TokenUsage.Total()          // Sum of both
Token counts come directly from the LLM provider's response.
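A minimal mirror of the usage shape, assuming Total() is simply the sum of both fields (the real type lives in the agentic package):

```go
package main

import "fmt"

// TokenUsage is a hypothetical stand-in for the agentic token usage type.
type TokenUsage struct {
	PromptTokens     int
	CompletionTokens int
}

// Total sums input and output tokens.
func (u TokenUsage) Total() int { return u.PromptTokens + u.CompletionTokens }

func main() {
	u := TokenUsage{PromptTokens: 120, CompletionTokens: 35}
	fmt.Println(u.Total()) // 155
}
```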
Configuration Reference ¶
Full configuration schema (endpoints are in the top-level model_registry):
{
"timeout": "string (default: 120s)",
"stream_name": "string (default: AGENT)",
"consumer_name_suffix": "string (optional)",
"retry": {
"max_attempts": "int (default: 3)",
"initial_delay": "string (default: 1s)",
"max_delay": "string (default: 60s)",
"rate_limit_delay": "string (default: 5s)"
},
"ports": {
"inputs": [...],
"outputs": [...]
}
}
Endpoint-level fields in model_registry:
{
"url": "string",
"model": "string",
"api_key_env": "string (optional)",
"requests_per_minute": "int (0 = unlimited)",
"max_concurrent": "int (0 = unlimited)"
}
Ports ¶
Input ports (JetStream consumers):
- agent.request: Agent requests from agentic-loop (subject: agent.request.>)
Output ports (JetStream publishers):
- agent.response: Model responses to agentic-loop (subject: agent.response.*)
Message Flow ¶
The processor handles each request through:
- Receive AgentRequest from agent.request.>
- Resolve endpoint by model name
- Acquire throttle slot (rate limiter token + concurrency semaphore)
- Convert AgentRequest to OpenAI format
- Call LLM endpoint with retry logic
- Release throttle slot
- Convert OpenAI response to AgentResponse
- Publish to agent.response.{request_id}
- Acknowledge JetStream message
Client Architecture ¶
The processor dynamically creates and caches Client instances per endpoint:
client, err := NewClient(endpointConfig)
response, err := client.ChatCompletion(ctx, request)
Clients are cached by URL|Model key with mutex protection for concurrent access. Clients wrap the go-openai SDK and handle:
- API key injection from environment variables
- Request/response type conversion
- Retry with exponential backoff
- Context cancellation propagation
Error Handling ¶
Errors are returned as AgentResponse with status="error":
response := agentic.AgentResponse{
RequestID: "req_123",
Status: "error",
Error: "endpoint not found: unknown-model",
}
Error categories:
- Endpoint resolution errors: Model not found in registry
- Request validation errors: Invalid request format
- Network errors: Connection failures (may retry)
- API errors: 4xx/5xx from LLM provider
- Timeout errors: Request exceeded timeout
Environment Variables ¶
API keys are read from environment variables specified in endpoint config:
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="..."
Endpoint config:
{
"url": "https://api.openai.com/v1/chat/completions",
"model": "gpt-4",
"api_key_env": "OPENAI_API_KEY"
}
If api_key_env is not specified, requests are made without authentication (suitable for local models like Ollama).
Thread Safety ¶
The Component is safe for concurrent use after Start() is called. Multiple goroutines can process requests concurrently. Clients are cached per endpoint (keyed by URL and model) behind a mutex, so concurrent requests to the same endpoint safely share one Client instance.
Testing ¶
For testing, use the ConsumerNameSuffix config option and provide a model registry:
config := agenticmodel.Config{
StreamName: "AGENT",
ConsumerNameSuffix: "test-" + t.Name(),
}
// Provide endpoints via model registry in deps.ModelRegistry
deps.ModelRegistry = &model.Registry{
Endpoints: map[string]*model.EndpointConfig{
"test-model": {URL: mockServer.URL, Model: "test-model", MaxTokens: 128000},
},
Defaults: model.DefaultsConfig{Model: "test-model"},
}
Use httptest.Server to mock the LLM endpoint in tests.
Limitations ¶
Current limitations:
- Final responses are published as complete documents; streaming deltas (StreamChunk), when enabled, are fire-and-forget over core NATS and are not persisted
- Retry configuration (max_attempts, delays) is global, not per-endpoint
See Also ¶
Related packages:
- agentic: Shared types (AgentRequest, AgentResponse, etc.)
- processor/agentic-loop: Loop orchestration
- processor/agentic-tools: Tool execution
- github.com/sashabaranov/go-openai: OpenAI SDK
Package agenticmodel provides Prometheus metrics for the agentic-model component.
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- type ChunkHandler
- type Client
- func (c *Client) ChatCompletion(ctx context.Context, req agentic.AgentRequest) (agentic.AgentResponse, error)
- func (c *Client) Close() error
- func (c *Client) SetAdapter(a ProviderAdapter)
- func (c *Client) SetChunkHandler(handler ChunkHandler)
- func (c *Client) SetLogger(l *slog.Logger)
- func (c *Client) SetMetrics(m *modelMetrics)
- func (c *Client) SetRetryConfig(cfg RetryConfig)
- func (c *Client) SetThrottle(t *EndpointThrottle)
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- type EndpointThrottle
- type GeminiAdapter
- func (a *GeminiAdapter) Name() string
- func (a *GeminiAdapter) NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage
- func (a *GeminiAdapter) NormalizeRequest(_ *openai.ChatCompletionRequest)
- func (a *GeminiAdapter) NormalizeResponse(_ *openai.ChatCompletionResponse)
- func (a *GeminiAdapter) NormalizeStreamDelta(delta openai.ToolCall, lastIndex int) int
- type GenericAdapter
- func (a *GenericAdapter) Name() string
- func (a *GenericAdapter) NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage
- func (a *GenericAdapter) NormalizeRequest(_ *openai.ChatCompletionRequest)
- func (a *GenericAdapter) NormalizeResponse(_ *openai.ChatCompletionResponse)
- func (a *GenericAdapter) NormalizeStreamDelta(delta openai.ToolCall, lastIndex int) int
- type OpenAIAdapter
- func (a *OpenAIAdapter) Name() string
- func (a *OpenAIAdapter) NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage
- func (a *OpenAIAdapter) NormalizeRequest(_ *openai.ChatCompletionRequest)
- func (a *OpenAIAdapter) NormalizeResponse(_ *openai.ChatCompletionResponse)
- func (a *OpenAIAdapter) NormalizeStreamDelta(delta openai.ToolCall, _ int) int
- type ProviderAdapter
- type RegistryInterface
- type RetryConfig
- type StreamChunk
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new agentic-model processor component
func Register ¶
func Register(registry RegistryInterface) error
Register registers the agentic-model processor component with the given registry
Types ¶
type ChunkHandler ¶
type ChunkHandler func(chunk StreamChunk)
ChunkHandler is a callback for receiving streaming deltas. Implementations must be safe for concurrent use if the handler is shared.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps OpenAI SDK for agentic model requests
func NewClient ¶
func NewClient(endpoint *model.EndpointConfig) (*Client, error)
NewClient creates a new client for the given endpoint configuration. The default retry config is suitable for unit tests (3 attempts, 100ms initial delay). Call SetRetryConfig before ChatCompletion to apply production settings.
func (*Client) ChatCompletion ¶
func (c *Client) ChatCompletion(ctx context.Context, req agentic.AgentRequest) (agentic.AgentResponse, error)
ChatCompletion sends a chat completion request with retry and throttling.
Retry strategy uses two independent backoff curves:
- Transient errors (5xx, network): exponential from InitialDelay, up to MaxAttempts
- Rate limits (429): exponential from RateLimitDelay, up to MaxRateLimitRetries
Both curves cap at MaxDelay and respect ctx cancellation at every wait point.
func (*Client) SetAdapter ¶
func (c *Client) SetAdapter(a ProviderAdapter)
SetAdapter sets the provider-specific adapter for normalizing requests and responses. When not set, buildChatRequest falls back to GenericAdapter.
func (*Client) SetChunkHandler ¶
func (c *Client) SetChunkHandler(handler ChunkHandler)
SetChunkHandler sets the callback for receiving streaming deltas.
func (*Client) SetMetrics ¶
func (c *Client) SetMetrics(m *modelMetrics)
SetMetrics sets the metrics instance for recording streaming metrics.
func (*Client) SetRetryConfig ¶
func (c *Client) SetRetryConfig(cfg RetryConfig)
SetRetryConfig replaces the default retry configuration. Call this after NewClient to apply production settings.
func (*Client) SetThrottle ¶
func (c *Client) SetThrottle(t *EndpointThrottle)
SetThrottle attaches a rate/concurrency limiter to this client.
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the agentic-model processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status
func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component (no-op for this component)
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns configured input port definitions
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns configured output port definitions
type Config ¶
type Config struct {
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
StreamName string `` /* 132-byte string literal not displayed */
ConsumerNameSuffix string `` /* 127-byte string literal not displayed */
DeleteConsumerOnStop bool `` /* 157-byte string literal not displayed */
Timeout string `json:"timeout" schema:"type:string,description:Request timeout,category:advanced,default:120s"`
Retry RetryConfig `json:"retry" schema:"type:object,description:Retry configuration,category:advanced"`
}
Config holds configuration for agentic-model processor component. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns default configuration for agentic-model processor
type EndpointThrottle ¶
type EndpointThrottle struct {
// contains filtered or unexported fields
}
EndpointThrottle controls request rate to a single API endpoint.
It combines two mechanisms:
- A token bucket that limits requests per minute (if RequestsPerMinute > 0).
- A semaphore that caps concurrent in-flight requests (if MaxConcurrent > 0).
Both limits are applied on every Acquire call. Either limit may be disabled independently by setting its value to 0.
Throttle instances are shared across all clients that target the same endpoint URL + model pair, so the limits are enforced across the whole agent team.
func NewEndpointThrottle ¶
func NewEndpointThrottle(requestsPerMinute, maxConcurrent int) *EndpointThrottle
NewEndpointThrottle creates a throttle for the given limits. requestsPerMinute == 0 disables rate limiting. maxConcurrent == 0 disables concurrency limiting.
func (*EndpointThrottle) Acquire ¶
func (t *EndpointThrottle) Acquire(ctx context.Context) error
Acquire blocks until a request slot is available or the context is cancelled. The caller must call Release when the request completes.
func (*EndpointThrottle) Release ¶
func (t *EndpointThrottle) Release()
Release returns a concurrency slot to the semaphore. It is a no-op when MaxConcurrent == 0.
type GeminiAdapter ¶
type GeminiAdapter struct{}
GeminiAdapter normalizes payloads for Google's Gemini OpenAI-compatible endpoint. Gemini's endpoint is broadly compatible but has several quirks that cause 400 errors.
func (*GeminiAdapter) NormalizeMessages ¶
func (a *GeminiAdapter) NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage
NormalizeMessages fixes two Gemini-specific message constraints:
Tool result messages require a non-empty name field. Without it: 400 INVALID_ARGUMENT: function_response.name cannot be empty.
Assistant messages with tool_calls require a non-empty content field. Gemini rejects a completely absent content — single space is the conventional workaround (used by LiteLLM, OpenAI proxy, etc.).
func (*GeminiAdapter) NormalizeRequest ¶
func (a *GeminiAdapter) NormalizeRequest(_ *openai.ChatCompletionRequest)
NormalizeRequest is a no-op for Gemini; all quirks are message-level.
func (*GeminiAdapter) NormalizeResponse ¶
func (a *GeminiAdapter) NormalizeResponse(_ *openai.ChatCompletionResponse)
NormalizeResponse is a no-op for Gemini; all quirks are on the request side.
func (*GeminiAdapter) NormalizeStreamDelta ¶
func (a *GeminiAdapter) NormalizeStreamDelta(delta openai.ToolCall, lastIndex int) int
NormalizeStreamDelta infers the tool call index when Gemini omits it. Gemini streaming deltas never include an index field. Instead:
- A non-empty ID signals the start of a new tool call → return -1 (sentinel: caller must allocate the next available index via nextToolIndex).
- An empty ID is an argument continuation → reuse lastIndex.
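The inference rule above reduces to a few lines (inferIndex is an illustrative distillation, not the adapter's actual method):

```go
package main

import "fmt"

// inferIndex applies the documented rule: a non-empty ID starts a new
// tool call (-1 sentinel, caller allocates the next index), and an
// empty ID continues the previous call at lastIndex.
func inferIndex(id string, lastIndex int) int {
	if id != "" {
		return -1
	}
	return lastIndex
}

func main() {
	fmt.Println(inferIndex("call_001", 0)) // -1 (new tool call)
	fmt.Println(inferIndex("", 2))         // 2 (continuation)
}
```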
type GenericAdapter ¶
type GenericAdapter struct{}
GenericAdapter applies cross-provider safe normalizations that are either required by multiple providers or harmless for all known providers. It is the fallback when no provider-specific adapter is registered.
func (*GenericAdapter) NormalizeMessages ¶
func (a *GenericAdapter) NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage
NormalizeMessages applies normalizations that are safe across all providers:
Tool result messages get a non-empty name field. The name field is optional in the OpenAI spec but required by Gemini. Setting it universally is harmless.
Assistant messages with tool_calls get a non-empty content field. Gemini rejects absent content; setting it to a single space is a widely-used convention (LiteLLM, OpenAI proxy, etc.) and accepted by all known providers.
reasoning_content omission is handled structurally during message conversion (the field is never copied into the outgoing openai.ChatCompletionMessage).
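The two message fixes can be sketched over a minimal message shape (Message is a stand-in for openai.ChatCompletionMessage with only the touched fields; the placeholder tool name is an assumption):

```go
package main

import "fmt"

// Message carries only the fields the normalizations touch.
type Message struct {
	Role      string
	Name      string
	Content   string
	ToolCalls []string
}

// normalize applies the documented fixes: tool messages get a non-empty
// name, and assistant messages with tool_calls get a single space when
// content is empty.
func normalize(msgs []Message) []Message {
	for i := range msgs {
		if msgs[i].Role == "tool" && msgs[i].Name == "" {
			msgs[i].Name = "tool" // placeholder; real code may use the call's function name
		}
		if msgs[i].Role == "assistant" && len(msgs[i].ToolCalls) > 0 && msgs[i].Content == "" {
			msgs[i].Content = " "
		}
	}
	return msgs
}

func main() {
	out := normalize([]Message{
		{Role: "assistant", ToolCalls: []string{"call_1"}},
		{Role: "tool"},
	})
	fmt.Printf("%q %q\n", out[0].Content, out[1].Name)
}
```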
func (*GenericAdapter) NormalizeRequest ¶
func (a *GenericAdapter) NormalizeRequest(_ *openai.ChatCompletionRequest)
NormalizeRequest is a no-op for the generic adapter.
func (*GenericAdapter) NormalizeResponse ¶
func (a *GenericAdapter) NormalizeResponse(_ *openai.ChatCompletionResponse)
NormalizeResponse is a no-op for the generic adapter.
func (*GenericAdapter) NormalizeStreamDelta ¶
func (a *GenericAdapter) NormalizeStreamDelta(delta openai.ToolCall, lastIndex int) int
NormalizeStreamDelta infers the tool call index when the provider omits it. When an explicit index is provided, it is used directly. When absent, a non-empty ID signals a new tool call (return -1 sentinel so the accumulator allocates the next index), and an empty ID is an argument continuation (reuse lastIndex). This matches the behavior required by Gemini and is harmless for providers that always supply an explicit index.
type OpenAIAdapter ¶
type OpenAIAdapter struct{}
OpenAIAdapter handles OpenAI-specific features. OpenAI's endpoint is the reference implementation — most fields behave as documented, so this adapter is mostly a no-op extension point.
func (*OpenAIAdapter) NormalizeMessages ¶
func (a *OpenAIAdapter) NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage
NormalizeMessages returns the messages unchanged; OpenAI requires no quirk fixes.
func (*OpenAIAdapter) NormalizeRequest ¶
func (a *OpenAIAdapter) NormalizeRequest(_ *openai.ChatCompletionRequest)
NormalizeRequest is a no-op for OpenAI; reasoning_effort is set directly from endpoint config in buildChatRequest and handled natively by OpenAI.
func (*OpenAIAdapter) NormalizeResponse ¶
func (a *OpenAIAdapter) NormalizeResponse(_ *openai.ChatCompletionResponse)
NormalizeResponse is a no-op for OpenAI.
func (*OpenAIAdapter) NormalizeStreamDelta ¶
func (a *OpenAIAdapter) NormalizeStreamDelta(delta openai.ToolCall, _ int) int
NormalizeStreamDelta uses the explicit index OpenAI always provides.
type ProviderAdapter ¶
type ProviderAdapter interface {
// Name returns the provider identifier (e.g., "gemini", "openai").
Name() string
// NormalizeRequest adjusts the ChatCompletionRequest before sending.
// Called after the generic request is built, before the HTTP call.
NormalizeRequest(req *openai.ChatCompletionRequest)
// NormalizeMessages adjusts the message array before sending.
// Called during request building for message-level fixes.
NormalizeMessages(messages []openai.ChatCompletionMessage) []openai.ChatCompletionMessage
// NormalizeStreamDelta adjusts a streaming tool call delta.
// Returns the corrected tool call index, or -1 as a sentinel meaning
// "allocate the next available index" (used when the provider omits it).
NormalizeStreamDelta(delta openai.ToolCall, lastIndex int) int
// NormalizeResponse adjusts the ChatCompletionResponse after receiving.
// Called before the response is converted to AgentResponse.
NormalizeResponse(resp *openai.ChatCompletionResponse)
}
ProviderAdapter normalizes request/response payloads for a specific LLM provider's OpenAI-compatible endpoint. Adapters handle quirks that would otherwise cause 400 errors or silent data corruption.
func AdapterFor ¶
func AdapterFor(provider string) ProviderAdapter
AdapterFor returns the appropriate adapter for the given provider name. Falls back to GenericAdapter for unknown providers.
type RegistryInterface ¶
type RegistryInterface interface {
RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration
type RetryConfig ¶
type RetryConfig struct {
MaxAttempts int `` /* 145-byte string literal not displayed */
MaxRateLimitRetries int `` /* 171-byte string literal not displayed */
Backoff string `` /* 139-byte string literal not displayed */
InitialDelay string `` /* 132-byte string literal not displayed */
MaxDelay string `` /* 155-byte string literal not displayed */
RateLimitDelay string `` /* 155-byte string literal not displayed */
}
RetryConfig holds retry configuration
func (*RetryConfig) Validate ¶
func (r *RetryConfig) Validate() error
Validate checks the retry configuration for errors
type StreamChunk ¶
type StreamChunk struct {
RequestID string `json:"request_id"`
ContentDelta string `json:"content_delta,omitempty"`
ReasoningDelta string `json:"reasoning_delta,omitempty"`
Done bool `json:"done,omitempty"`
}
StreamChunk represents a single streaming delta for real-time monitoring. Chunks are ephemeral — published via core NATS (fire-and-forget), not JetStream.