Documentation ¶
Overview ¶
Package agenticmodel provides the model integration processor for the SemStreams agentic system: an OpenAI-compatible processor that routes agent requests to configured LLM endpoints, with retry logic and tool-calling support.
The agentic-model processor routes agent requests to OpenAI-compatible LLM endpoints. It receives AgentRequest messages from the loop orchestrator, calls the appropriate model endpoint, and publishes AgentResponse messages back. The processor supports tool calling, retry with backoff, and token tracking. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).
This processor acts as the bridge between the agentic orchestration layer and external LLM services (OpenAI, Ollama, LiteLLM, vLLM, or any OpenAI-compatible API).
Architecture ¶
The model processor sits between the loop orchestrator and external LLM services:
┌───────────────┐      ┌────────────────┐      ┌──────────────────┐
│ agentic-loop  │────▶│ agentic-model  │────▶│   LLM Endpoint   │
│               │      │  (this pkg)    │      │ (OpenAI, Ollama) │
│               │◀────│                │◀────│                  │
└───────────────┘      └────────────────┘      └──────────────────┘
  agent.request.*          HTTP/HTTPS           OpenAI-compatible
  agent.response.*                              /v1/chat/completions
Quick Start ¶
Configure the model registry in the top-level config and start the processor:
config := agenticmodel.Config{
StreamName: "AGENT",
Timeout: "120s",
}
// Model endpoints are resolved from deps.ModelRegistry (set in config.model_registry)
rawConfig, _ := json.Marshal(config)
comp, err := agenticmodel.NewComponent(rawConfig, deps)
lc := comp.(component.LifecycleComponent)
lc.Initialize()
lc.Start(ctx)
Endpoint Resolution ¶
When processing an AgentRequest, the processor resolves the endpoint from the unified model registry by looking up the request's Model field. Clients are created dynamically and cached for reuse.
If the resolved endpoint has SupportsTools=false, any tools in the request are stripped and a warning is logged.
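The resolution and tool-stripping behaviour can be sketched as follows. This is a minimal illustration, not the package's actual implementation: EndpointConfig, AgentRequest, and resolveAndPrepare are simplified stand-ins for the real registry and request types.

```go
package main

import "fmt"

// EndpointConfig is a simplified stand-in for the registry's endpoint entry.
type EndpointConfig struct {
	URL           string
	Model         string
	SupportsTools bool
}

// AgentRequest is a simplified stand-in; Tools holds tool names only here.
type AgentRequest struct {
	Model string
	Tools []string
}

// resolveAndPrepare looks up the endpoint by the request's Model field and
// strips tools when the endpoint cannot handle them, mirroring the
// documented behaviour (lookup failure becomes an "endpoint not found" error).
func resolveAndPrepare(registry map[string]*EndpointConfig, req *AgentRequest) (*EndpointConfig, error) {
	ep, ok := registry[req.Model]
	if !ok {
		return nil, fmt.Errorf("endpoint not found: %s", req.Model)
	}
	if !ep.SupportsTools && len(req.Tools) > 0 {
		// The real processor logs a warning here.
		fmt.Printf("warning: endpoint %s does not support tools; stripping %d tool(s)\n", ep.URL, len(req.Tools))
		req.Tools = nil
	}
	return ep, nil
}

func main() {
	registry := map[string]*EndpointConfig{
		"local-model": {URL: "http://localhost:11434", Model: "local-model", SupportsTools: false},
	}
	req := &AgentRequest{Model: "local-model", Tools: []string{"read_file"}}
	if _, err := resolveAndPrepare(registry, req); err != nil {
		panic(err)
	}
	fmt.Println("tools after prepare:", len(req.Tools))
}
```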
OpenAI Compatibility ¶
The processor uses the sashabaranov/go-openai SDK and is compatible with any API that implements the OpenAI chat completions interface:
- OpenAI API (api.openai.com)
- Azure OpenAI Service
- Ollama (with OpenAI compatibility layer)
- LiteLLM proxy
- vLLM with OpenAI server
- LocalAI
- Any OpenAI-compatible proxy
Tool Support ¶
The processor fully supports tool calling (function calling):
Incoming request with tools:
request := agentic.AgentRequest{
Model: "gpt-4",
Messages: []agentic.ChatMessage{
{Role: "user", Content: "Read the config file"},
},
Tools: []agentic.ToolDefinition{
{
Name: "read_file",
Description: "Read file contents",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"path": map[string]any{"type": "string"},
},
},
},
},
}
Response with tool calls:
response := agentic.AgentResponse{
Status: "tool_call",
Message: agentic.ChatMessage{
Role: "assistant",
ToolCalls: []agentic.ToolCall{
{ID: "call_001", Name: "read_file", Arguments: map[string]any{"path": "config.yaml"}},
},
},
}
The processor converts between agentic.ToolDefinition and OpenAI's function schema format automatically.
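A minimal sketch of that conversion, rendering a tool definition in OpenAI's "tools" wire format. The local ToolDefinition type here is a simplified stand-in for agentic.ToolDefinition, and toFunctionSchema is an illustrative helper, not the package's actual converter:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ToolDefinition is a simplified stand-in for agentic.ToolDefinition.
type ToolDefinition struct {
	Name        string
	Description string
	Parameters  map[string]any
}

// toFunctionSchema renders a tool definition as the OpenAI function-schema
// shape ({"type": "function", "function": {...}}).
func toFunctionSchema(t ToolDefinition) map[string]any {
	return map[string]any{
		"type": "function",
		"function": map[string]any{
			"name":        t.Name,
			"description": t.Description,
			"parameters":  t.Parameters,
		},
	}
}

func main() {
	tool := ToolDefinition{
		Name:        "read_file",
		Description: "Read file contents",
		Parameters: map[string]any{
			"type": "object",
			"properties": map[string]any{
				"path": map[string]any{"type": "string"},
			},
		},
	}
	out, _ := json.MarshalIndent(toFunctionSchema(tool), "", "  ")
	fmt.Println(string(out))
}
```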
Response Status Mapping ¶
The processor maps OpenAI finish reasons to agentic status:
- "stop" → "complete" (normal completion)
- "length" → "complete" (max tokens reached)
- "tool_calls" → "tool_call" (model wants to use tools)
- Any error → "error" with error message
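The mapping above can be sketched as a small switch. This is an illustrative helper (mapFinishReason is a hypothetical name); errors are handled out of band via the returned error, not a finish reason:

```go
package main

import "fmt"

// mapFinishReason translates an OpenAI finish_reason into the documented
// agentic status values: "stop" and "length" both become "complete",
// "tool_calls" becomes "tool_call".
func mapFinishReason(reason string) string {
	switch reason {
	case "tool_calls":
		return "tool_call"
	case "stop", "length":
		return "complete"
	default:
		// Unknown reasons are treated as normal completion.
		return "complete"
	}
}

func main() {
	for _, r := range []string{"stop", "length", "tool_calls"} {
		fmt.Printf("%s -> %s\n", r, mapFinishReason(r))
	}
}
```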
Retry Logic ¶
The processor implements retry using pkg/retry with exponential backoff and jitter.
- Default: 3 attempts, 1s initial delay, 60s max delay
- Tests use 100ms initial delay for fast feedback
- HTTP 429: Detected via openai.APIError.HTTPStatusCode and openai.RequestError.HTTPStatusCode. An extra rate_limit_delay (default 5s) is prepended before normal backoff begins.
- Retryable: 429, 500, 502, 503, 504, and network errors
- Non-retryable: 400, 401, 403, 404, context cancellation
Configuration:
"retry": {
"max_attempts": 3,
"initial_delay": "1s",
"max_delay": "60s",
"rate_limit_delay": "5s"
}
Token Tracking ¶
Every response includes token usage for cost monitoring and rate limiting:
response.TokenUsage.PromptTokens     // Input tokens
response.TokenUsage.CompletionTokens // Output tokens
response.TokenUsage.Total()          // Sum of both
Token counts come directly from the LLM provider's response.
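Total() is simply the sum of the two provider-reported counts. A minimal sketch, assuming a TokenUsage shape matching the documented fields:

```go
package main

import "fmt"

// TokenUsage mirrors the documented fields; values come straight from the
// LLM provider's usage block.
type TokenUsage struct {
	PromptTokens     int
	CompletionTokens int
}

// Total returns the combined input and output token count.
func (t TokenUsage) Total() int { return t.PromptTokens + t.CompletionTokens }

func main() {
	u := TokenUsage{PromptTokens: 120, CompletionTokens: 35}
	fmt.Println(u.Total()) // 155
}
```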
Configuration Reference ¶
Full configuration schema (endpoints are in the top-level model_registry):
{
"timeout": "string (default: 120s)",
"stream_name": "string (default: AGENT)",
"consumer_name_suffix": "string (optional)",
"retry": {
"max_attempts": "int (default: 3)",
"initial_delay": "string (default: 1s)",
"max_delay": "string (default: 60s)",
"rate_limit_delay": "string (default: 5s)"
},
"ports": {
"inputs": [...],
"outputs": [...]
}
}
Endpoint-level fields in model_registry:
{
"url": "string",
"model": "string",
"api_key_env": "string (optional)",
"requests_per_minute": "int (0 = unlimited)",
"max_concurrent": "int (0 = unlimited)"
}
Ports ¶
Input ports (JetStream consumers):
- agent.request: Agent requests from agentic-loop (subject: agent.request.>)
Output ports (JetStream publishers):
- agent.response: Model responses to agentic-loop (subject: agent.response.*)
Message Flow ¶
The processor handles each request through:
- Receive AgentRequest from agent.request.>
- Resolve endpoint by model name
- Acquire throttle slot (rate limiter token + concurrency semaphore)
- Convert AgentRequest to OpenAI format
- Call LLM endpoint with retry logic
- Release throttle slot
- Convert OpenAI response to AgentResponse
- Publish to agent.response.{request_id}
- Acknowledge JetStream message
Client Architecture ¶
The processor dynamically creates and caches Client instances per endpoint:
client, err := NewClient(endpointConfig)
response, err := client.ChatCompletion(ctx, request)
Clients are cached by URL|Model key with mutex protection for concurrent access. Clients wrap the go-openai SDK and handle:
- API key injection from environment variables
- Request/response type conversion
- Retry with exponential backoff
- Context cancellation propagation
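The caching scheme (URL|Model key under a mutex) can be sketched as follows; clientCache and its get method are illustrative names, not the package's exported API:

```go
package main

import (
	"fmt"
	"sync"
)

// Client is a stand-in for the real SDK wrapper.
type Client struct{ key string }

// clientCache caches one Client per "URL|Model" key, with a mutex so
// concurrent request handlers can share the map safely.
type clientCache struct {
	mu      sync.Mutex
	clients map[string]*Client
}

func (c *clientCache) get(url, model string) *Client {
	key := url + "|" + model
	c.mu.Lock()
	defer c.mu.Unlock()
	if cl, ok := c.clients[key]; ok {
		return cl // reuse the cached client
	}
	cl := &Client{key: key}
	c.clients[key] = cl
	return cl
}

func main() {
	cache := &clientCache{clients: map[string]*Client{}}
	a := cache.get("https://api.openai.com", "gpt-4")
	b := cache.get("https://api.openai.com", "gpt-4")
	fmt.Println(a == b) // true: same cached instance
}
```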
Error Handling ¶
Errors are returned as AgentResponse with status="error":
response := agentic.AgentResponse{
RequestID: "req_123",
Status: "error",
Error: "endpoint not found: unknown-model",
}
Error categories:
- Endpoint resolution errors: Model not found in registry
- Request validation errors: Invalid request format
- Network errors: Connection failures (may retry)
- API errors: 4xx/5xx from LLM provider
- Timeout errors: Request exceeded timeout
Environment Variables ¶
API keys are read from environment variables specified in endpoint config:
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="..."
Endpoint config:
{
"url": "https://api.openai.com/v1/chat/completions",
"model": "gpt-4",
"api_key_env": "OPENAI_API_KEY"
}
If api_key_env is not specified, requests are made without authentication (suitable for local models like Ollama).
Thread Safety ¶
The Component is safe for concurrent use after Start() is called. Multiple goroutines can process requests concurrently; Client instances are cached per endpoint (keyed by URL and model) and guarded by a mutex, so concurrent requests can share clients safely.
Testing ¶
For testing, use the ConsumerNameSuffix config option and provide a model registry:
config := agenticmodel.Config{
StreamName: "AGENT",
ConsumerNameSuffix: "test-" + t.Name(),
}
// Provide endpoints via model registry in deps.ModelRegistry
deps.ModelRegistry = &model.Registry{
Endpoints: map[string]*model.EndpointConfig{
"test-model": {URL: mockServer.URL, Model: "test-model", MaxTokens: 128000},
},
Defaults: model.DefaultsConfig{Model: "test-model"},
}
Use httptest.Server to mock the LLM endpoint in tests.
Limitations ¶
Current limitations:
- The final AgentResponse is always a complete document; incremental streaming to callers is not supported (StreamChunk deltas are published for monitoring only)
- Retry configuration (max_attempts, delays) is global, not per-endpoint
See Also ¶
Related packages:
- agentic: Shared types (AgentRequest, AgentResponse, etc.)
- processor/agentic-loop: Loop orchestration
- processor/agentic-tools: Tool execution
- github.com/sashabaranov/go-openai: OpenAI SDK
Package agenticmodel also provides Prometheus metrics for the agentic-model component.
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- type ChunkHandler
- type Client
- func (c *Client) ChatCompletion(ctx context.Context, req agentic.AgentRequest) (agentic.AgentResponse, error)
- func (c *Client) Close() error
- func (c *Client) SetChunkHandler(handler ChunkHandler)
- func (c *Client) SetLogger(l *slog.Logger)
- func (c *Client) SetMetrics(m *modelMetrics)
- func (c *Client) SetRetryConfig(cfg RetryConfig)
- func (c *Client) SetThrottle(t *EndpointThrottle)
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- type EndpointThrottle
- type RegistryInterface
- type RetryConfig
- type StreamChunk
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new agentic-model processor component
func Register ¶
func Register(registry RegistryInterface) error
Register registers the agentic-model processor component with the given registry
Types ¶
type ChunkHandler ¶
type ChunkHandler func(chunk StreamChunk)
ChunkHandler is a callback for receiving streaming deltas. Implementations must be safe for concurrent use if the handler is shared.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps OpenAI SDK for agentic model requests
func NewClient ¶
func NewClient(endpoint *model.EndpointConfig) (*Client, error)
NewClient creates a new client for the given endpoint configuration. The default retry config is suitable for unit tests (3 attempts, 100ms initial delay). Call SetRetryConfig before ChatCompletion to apply production settings.
func (*Client) ChatCompletion ¶
func (c *Client) ChatCompletion(ctx context.Context, req agentic.AgentRequest) (agentic.AgentResponse, error)
ChatCompletion sends a chat completion request with retry and throttling.
func (*Client) SetChunkHandler ¶
func (c *Client) SetChunkHandler(handler ChunkHandler)
SetChunkHandler sets the callback for receiving streaming deltas.
func (*Client) SetMetrics ¶
func (c *Client) SetMetrics(m *modelMetrics)
SetMetrics sets the metrics instance for recording streaming metrics.
func (*Client) SetRetryConfig ¶
func (c *Client) SetRetryConfig(cfg RetryConfig)
SetRetryConfig replaces the default retry configuration. Call this after NewClient to apply production settings.
func (*Client) SetThrottle ¶
func (c *Client) SetThrottle(t *EndpointThrottle)
SetThrottle attaches a rate/concurrency limiter to this client.
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the agentic-model processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status
func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component (no-op for this component)
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns configured input port definitions
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns configured output port definitions
type Config ¶
type Config struct {
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
StreamName string `` /* 132-byte string literal not displayed */
ConsumerNameSuffix string `` /* 127-byte string literal not displayed */
DeleteConsumerOnStop bool `` /* 157-byte string literal not displayed */
Timeout string `json:"timeout" schema:"type:string,description:Request timeout,category:advanced,default:120s"`
Retry RetryConfig `json:"retry" schema:"type:object,description:Retry configuration,category:advanced"`
}
Config holds configuration for agentic-model processor component. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns default configuration for agentic-model processor
type EndpointThrottle ¶
type EndpointThrottle struct {
// contains filtered or unexported fields
}
EndpointThrottle controls request rate to a single API endpoint.
It combines two mechanisms:
- A token bucket that limits requests per minute (if RequestsPerMinute > 0).
- A semaphore that caps concurrent in-flight requests (if MaxConcurrent > 0).
Both limits are applied on every Acquire call. Either limit may be disabled independently by setting its value to 0.
Throttle instances are shared across all clients that target the same endpoint URL + model pair, so the limits are enforced across the whole agent team.
func NewEndpointThrottle ¶
func NewEndpointThrottle(requestsPerMinute, maxConcurrent int) *EndpointThrottle
NewEndpointThrottle creates a throttle for the given limits. requestsPerMinute == 0 disables rate limiting. maxConcurrent == 0 disables concurrency limiting.
func (*EndpointThrottle) Acquire ¶
func (t *EndpointThrottle) Acquire(ctx context.Context) error
Acquire blocks until a request slot is available or the context is cancelled. The caller must call Release when the request completes.
func (*EndpointThrottle) Release ¶
func (t *EndpointThrottle) Release()
Release returns a concurrency slot to the semaphore. It is a no-op when MaxConcurrent == 0.
type RegistryInterface ¶
type RegistryInterface interface {
RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration
type RetryConfig ¶
type RetryConfig struct {
MaxAttempts int `json:"max_attempts" schema:"type:int,description:Maximum retry attempts,category:advanced,default:3"`
Backoff string `` /* 133-byte string literal not displayed */
InitialDelay string `json:"initial_delay" schema:"type:string,description:Initial retry delay,category:advanced,default:1s"`
MaxDelay string `json:"max_delay" schema:"type:string,description:Maximum retry delay,category:advanced,default:60s"`
RateLimitDelay string `json:"rate_limit_delay" schema:"type:string,description:Initial delay when rate limited (429),category:advanced,default:5s"`
}
RetryConfig holds retry configuration
func (*RetryConfig) Validate ¶
func (r *RetryConfig) Validate() error
Validate checks the retry configuration for errors
type StreamChunk ¶
type StreamChunk struct {
RequestID string `json:"request_id"`
ContentDelta string `json:"content_delta,omitempty"`
ReasoningDelta string `json:"reasoning_delta,omitempty"`
Done bool `json:"done,omitempty"`
}
StreamChunk represents a single streaming delta for real-time monitoring. Chunks are ephemeral — published via core NATS (fire-and-forget), not JetStream.
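A monitoring consumer might reassemble content deltas like this. The assemble helper is illustrative only; the real processor publishes chunks over core NATS rather than collecting them locally:

```go
package main

import (
	"fmt"
	"strings"
)

// StreamChunk mirrors the documented fields.
type StreamChunk struct {
	RequestID      string
	ContentDelta   string
	ReasoningDelta string
	Done           bool
}

// assemble concatenates content deltas in arrival order; the Done chunk
// carries no content and just marks the end of the stream.
func assemble(chunks []StreamChunk) string {
	var b strings.Builder
	for _, c := range chunks {
		b.WriteString(c.ContentDelta)
	}
	return b.String()
}

func main() {
	chunks := []StreamChunk{
		{RequestID: "req_1", ContentDelta: "Hel"},
		{RequestID: "req_1", ContentDelta: "lo"},
		{RequestID: "req_1", Done: true},
	}
	fmt.Println(assemble(chunks)) // Hello
}
```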