Documentation ¶
Overview ¶
Package agenticmodel provides the model integration processor for the SemStreams agentic system: an OpenAI-compatible component that routes agent requests to configured LLM endpoints with retry logic and tool calling support.
The agentic-model processor routes agent requests to OpenAI-compatible LLM endpoints. It receives AgentRequest messages from the loop orchestrator, calls the appropriate model endpoint, and publishes AgentResponse messages back. The processor supports tool calling, retry with backoff, and token tracking. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).
This processor acts as the bridge between the agentic orchestration layer and external LLM services (OpenAI, Ollama, LiteLLM, vLLM, or any OpenAI-compatible API).
Architecture ¶
The model processor sits between the loop orchestrator and external LLM services:
┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────▶│ agentic-model  │────▶│ LLM Endpoint     │
│               │     │  (this pkg)    │     │ (OpenAI, Ollama) │
│               │◀────│                │◀────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  agent.request.*        HTTP/HTTPS           OpenAI-compatible
  agent.response.*                            /v1/chat/completions
Quick Start ¶
Configure the model registry in the top-level config and start the processor:
config := agenticmodel.Config{
	StreamName: "AGENT",
	Timeout:    "120s",
}

// Model endpoints are resolved from deps.ModelRegistry (set in config.model_registry).
rawConfig, _ := json.Marshal(config)
comp, err := agenticmodel.NewComponent(rawConfig, deps)
if err != nil {
	log.Fatal(err)
}
lc := comp.(component.LifecycleComponent)
if err := lc.Initialize(); err != nil {
	log.Fatal(err)
}
if err := lc.Start(ctx); err != nil {
	log.Fatal(err)
}
Endpoint Resolution ¶
When processing an AgentRequest, the processor resolves the endpoint from the unified model registry by looking up the request's Model field. Clients are created dynamically and cached for reuse.
If the resolved endpoint has SupportsTools=false, any tools in the request are stripped and a warning is logged.
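A minimal sketch of the lookup, assuming a registry shaped like the model.Registry shown in the Testing section (resolveEndpoint and the deps field are illustrative names, not exported API):
// resolveEndpoint looks up the request's model in the registry, falling
// back to the registry default when no model is named. Illustrative sketch.
func (c *Component) resolveEndpoint(req agentic.AgentRequest) (*model.EndpointConfig, error) {
	name := req.Model
	if name == "" {
		name = c.deps.ModelRegistry.Defaults.Model // assumed fallback behavior
	}
	endpoint, ok := c.deps.ModelRegistry.Endpoints[name]
	if !ok {
		return nil, fmt.Errorf("endpoint not found: %s", name)
	}
	return endpoint, nil
}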
OpenAI Compatibility ¶
The processor uses the sashabaranov/go-openai SDK and is compatible with any API that implements the OpenAI chat completions interface:
- OpenAI API (api.openai.com)
- Azure OpenAI Service
- Ollama (with OpenAI compatibility layer)
- LiteLLM proxy
- vLLM with OpenAI server
- LocalAI
- Any OpenAI-compatible proxy
Tool Support ¶
The processor fully supports tool calling (function calling):
Incoming request with tools:
request := agentic.AgentRequest{
Model: "gpt-4",
Messages: []agentic.ChatMessage{
{Role: "user", Content: "Read the config file"},
},
Tools: []agentic.ToolDefinition{
{
Name: "read_file",
Description: "Read file contents",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"path": map[string]any{"type": "string"},
},
},
},
},
}
Response with tool calls:
response := agentic.AgentResponse{
Status: "tool_call",
Message: agentic.ChatMessage{
Role: "assistant",
ToolCalls: []agentic.ToolCall{
{ID: "call_001", Name: "read_file", Arguments: map[string]any{"path": "config.yaml"}},
},
},
}
The processor converts between agentic.ToolDefinition and OpenAI's function schema format automatically.
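A hedged sketch of that conversion using the go-openai types (toOpenAITools is an illustrative name; the package's internal conversion may differ):
import "github.com/sashabaranov/go-openai"

// toOpenAITools converts agentic tool definitions to the SDK's tool format.
// Illustrative sketch, not the package's actual internal function.
func toOpenAITools(defs []agentic.ToolDefinition) []openai.Tool {
	tools := make([]openai.Tool, 0, len(defs))
	for _, d := range defs {
		tools = append(tools, openai.Tool{
			Type: openai.ToolTypeFunction,
			Function: &openai.FunctionDefinition{
				Name:        d.Name,
				Description: d.Description,
				Parameters:  d.Parameters,
			},
		})
	}
	return tools
}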
Response Status Mapping ¶
The processor maps OpenAI finish reasons to agentic status:
- "stop" → "complete" (normal completion)
- "length" → "complete" (max tokens reached)
- "tool_calls" → "tool_call" (model wants to use tools)
- Any error → "error" with error message
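As a sketch, the mapping is a switch over the SDK's finish-reason constants (mapFinishReason is an illustrative name):
// mapFinishReason translates an OpenAI finish reason into an agentic status,
// following the table above. Illustrative sketch.
func mapFinishReason(reason openai.FinishReason) string {
	switch reason {
	case openai.FinishReasonStop, openai.FinishReasonLength:
		return "complete"
	case openai.FinishReasonToolCalls:
		return "tool_call"
	default:
		return "error" // unmapped reasons are treated as errors in this sketch
	}
}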
Retry Logic ¶
The processor implements retry with exponential backoff:
- Default: 3 attempts
- Backoff: 100ms, 200ms, 400ms (exponential)
- Retryable: Network errors, 5xx responses
- Non-retryable: Context cancellation, 4xx responses
Configuration:
"retry": {
"max_attempts": 3,
"backoff": "exponential"
}
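A minimal sketch of this policy under those defaults (callWithRetry and retryable are illustrative names; retryable stands in for the package's real error classification):
// callWithRetry retries f with exponential backoff starting at 100ms
// and doubling between attempts (100ms, 200ms, 400ms, ...).
func callWithRetry(ctx context.Context, maxAttempts int, f func() error) error {
	backoff := 100 * time.Millisecond
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = f(); err == nil || !retryable(err) {
			return err
		}
		if attempt == maxAttempts {
			break // no sleep after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // context cancellation is never retried
		case <-time.After(backoff):
			backoff *= 2
		}
	}
	return err
}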
Token Tracking ¶
Every response includes token usage for cost monitoring and rate limiting:
response.TokenUsage.PromptTokens     // Input tokens
response.TokenUsage.CompletionTokens // Output tokens
response.TokenUsage.Total()          // Sum of both
Token counts come directly from the LLM provider's response.
Configuration Reference ¶
Full configuration schema (endpoints are in the top-level model_registry):
{
"timeout": "string (default: 120s)",
"stream_name": "string (default: AGENT)",
"consumer_name_suffix": "string (optional)",
"retry": {
"max_attempts": "int (default: 3)",
"backoff": "string (default: exponential)"
},
"ports": {
"inputs": [...],
"outputs": [...]
}
}
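For example, a minimal concrete config (values illustrative):
{
  "timeout": "120s",
  "stream_name": "AGENT",
  "retry": {
    "max_attempts": 3,
    "backoff": "exponential"
  }
}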
Ports ¶
Input ports (JetStream consumers):
- agent.request: Agent requests from agentic-loop (subject: agent.request.>)
Output ports (JetStream publishers):
- agent.response: Model responses to agentic-loop (subject: agent.response.*)
Message Flow ¶
The processor handles each request through the following steps (sketched in code after the list):
- Receive AgentRequest from agent.request.>
- Resolve endpoint by model name
- Convert AgentRequest to OpenAI format
- Call LLM endpoint with retry logic
- Convert OpenAI response to AgentResponse
- Publish to agent.response.{request_id}
- Acknowledge JetStream message
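Put together, the flow is roughly the following sketch; handleRequest and its helpers are illustrative names, and jetstream.Msg assumes the nats.go JetStream API:
// handleRequest sketches the pipeline above; resolveEndpoint, getClient,
// and publish are illustrative names, not exported API.
func (c *Component) handleRequest(ctx context.Context, msg jetstream.Msg) {
	var req agentic.AgentRequest
	if err := json.Unmarshal(msg.Data(), &req); err != nil { // step 1
		_ = msg.Term() // malformed payloads are not redelivered
		return
	}

	var resp agentic.AgentResponse
	endpoint, err := c.resolveEndpoint(req) // step 2
	if err == nil {
		var client *Client
		if client, err = c.getClient(endpoint); err == nil { // cached per URL|Model
			resp, err = client.ChatCompletion(ctx, req) // steps 3-5, with retry
		}
	}
	if err != nil {
		// RequestID assumed mirrored from the request.
		resp = agentic.AgentResponse{RequestID: req.RequestID, Status: "error", Error: err.Error()}
	}

	c.publish(resp) // step 6: agent.response.{request_id}
	_ = msg.Ack()   // step 7
}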
Client Architecture ¶
The processor dynamically creates and caches Client instances per endpoint:
client, err := NewClient(endpointConfig)
response, err := client.ChatCompletion(ctx, request)
Clients are cached by URL|Model key with mutex protection for concurrent access. Clients wrap the go-openai SDK and handle:
- API key injection from environment variables
- Request/response type conversion
- Retry with exponential backoff
- Context cancellation propagation
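A sketch of the cache lookup implied by the URL|Model key (the clients map and mu fields are illustrative names):
// getClient returns a cached client for the endpoint, creating one on miss.
// Illustrative sketch of the caching described above.
func (c *Component) getClient(endpoint *model.EndpointConfig) (*Client, error) {
	key := endpoint.URL + "|" + endpoint.Model

	c.mu.Lock()
	defer c.mu.Unlock()
	if client, ok := c.clients[key]; ok {
		return client, nil
	}
	client, err := NewClient(endpoint)
	if err != nil {
		return nil, err
	}
	c.clients[key] = client
	return client, nil
}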
Error Handling ¶
Errors are returned as AgentResponse with status="error":
response := agentic.AgentResponse{
RequestID: "req_123",
Status: "error",
Error: "endpoint not found: unknown-model",
}
Error categories:
- Endpoint resolution errors: Model not found in registry
- Request validation errors: Invalid request format
- Network errors: Connection failures (may retry)
- API errors: 4xx/5xx from LLM provider
- Timeout errors: Request exceeded timeout
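A hedged sketch of the retryable/non-retryable split, using the SDK's APIError type (isRetryable is an illustrative name):
// isRetryable reports whether an error is worth retrying: network errors
// and 5xx responses are; context cancellation and 4xx are not.
func isRetryable(err error) bool {
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return false
	}
	var apiErr *openai.APIError
	if errors.As(err, &apiErr) {
		return apiErr.HTTPStatusCode >= 500
	}
	return true // network-level failures fall through here
}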
Environment Variables ¶
API keys are read from environment variables specified in endpoint config:
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="..."
Endpoint config:
{
"url": "https://api.openai.com/v1/chat/completions",
"model": "gpt-4",
"api_key_env": "OPENAI_API_KEY"
}
If api_key_env is not specified, requests are made without authentication (suitable for local models like Ollama).
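The key injection reduces to a lookup like this (the APIKeyEnv field name mirrors api_key_env and is an assumption):
// apiKeyFor reads the endpoint's API key from the configured environment
// variable; an empty result means unauthenticated requests (e.g. Ollama).
// Illustrative sketch.
func apiKeyFor(endpoint *model.EndpointConfig) string {
	if endpoint.APIKeyEnv == "" {
		return ""
	}
	return os.Getenv(endpoint.APIKeyEnv)
}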
Thread Safety ¶
The Component is safe for concurrent use after Start() is called. Multiple goroutines can process requests concurrently. Client instances are cached per endpoint behind a mutex, so concurrent requests safely share them.
Testing ¶
For testing, use the ConsumerNameSuffix config option and provide a model registry:
config := agenticmodel.Config{
StreamName: "AGENT",
ConsumerNameSuffix: "test-" + t.Name(),
}
// Provide endpoints via model registry in deps.ModelRegistry
deps.ModelRegistry = &model.Registry{
Endpoints: map[string]*model.EndpointConfig{
"test-model": {URL: mockServer.URL, Model: "test-model", MaxTokens: 128000},
},
Defaults: model.DefaultsConfig{Model: "test-model"},
}
Use httptest.Server to mock the LLM endpoint in tests.
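For example, a minimal mock that returns an OpenAI-compatible completion:
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	// Minimal OpenAI-compatible chat completion response.
	fmt.Fprint(w, `{
		"id": "chatcmpl-test",
		"object": "chat.completion",
		"choices": [{
			"index": 0,
			"message": {"role": "assistant", "content": "ok"},
			"finish_reason": "stop"
		}],
		"usage": {"prompt_tokens": 5, "completion_tokens": 1, "total_tokens": 6}
	}`)
}))
defer mockServer.Close()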
Limitations ¶
Current limitations:
- No JetStream-backed streaming: final responses are published as complete documents, while streaming deltas (StreamChunk) are ephemeral, fire-and-forget core NATS messages for monitoring only
- Retry configuration is global, not per-endpoint
- No request queuing or rate limiting
See Also ¶
Related packages:
- agentic: Shared types (AgentRequest, AgentResponse, etc.)
- processor/agentic-loop: Loop orchestration
- processor/agentic-tools: Tool execution
- github.com/sashabaranov/go-openai: OpenAI SDK
Package agenticmodel also provides Prometheus metrics for the agentic-model component.
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- type ChunkHandler
- type Client
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- type RegistryInterface
- type RetryConfig
- type StreamChunk
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new agentic-model processor component
func Register ¶
func Register(registry RegistryInterface) error
Register registers the agentic-model processor component with the given registry
Types ¶
type ChunkHandler ¶
type ChunkHandler func(chunk StreamChunk)
ChunkHandler is a callback for receiving streaming deltas. Implementations must be safe for concurrent use if the handler is shared.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps OpenAI SDK for agentic model requests
func NewClient ¶
func NewClient(endpoint *model.EndpointConfig) (*Client, error)
NewClient creates a new client for the given endpoint configuration
func (*Client) ChatCompletion ¶
func (c *Client) ChatCompletion(ctx context.Context, req agentic.AgentRequest) (agentic.AgentResponse, error)
ChatCompletion sends a chat completion request
func (*Client) SetChunkHandler ¶
func (c *Client) SetChunkHandler(handler ChunkHandler)
SetChunkHandler sets the callback for receiving streaming deltas.
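A short usage sketch, reusing endpoint and request from the earlier examples:
client, err := NewClient(endpoint)
if err != nil {
	log.Fatal(err)
}
// Print deltas as they arrive. The handler may run concurrently,
// so anything it shares must be synchronized.
client.SetChunkHandler(func(chunk StreamChunk) {
	fmt.Print(chunk.ContentDelta)
	if chunk.Done {
		fmt.Println()
	}
})
resp, err := client.ChatCompletion(ctx, request) // deltas, if any, stream to the handler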
func (*Client) SetMetrics ¶
func (c *Client) SetMetrics(m *modelMetrics)
SetMetrics sets the metrics instance for recording streaming metrics.
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the agentic-model processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status
func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component (no-op for this component)
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns configured input port definitions
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns configured output port definitions
type Config ¶
type Config struct {
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
StreamName string `` /* 132-byte string literal not displayed */
ConsumerNameSuffix string `` /* 127-byte string literal not displayed */
DeleteConsumerOnStop bool `` /* 157-byte string literal not displayed */
Timeout string `json:"timeout" schema:"type:string,description:Request timeout,category:advanced,default:120s"`
Retry RetryConfig `json:"retry" schema:"type:object,description:Retry configuration,category:advanced"`
}
Config holds configuration for agentic-model processor component. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns default configuration for agentic-model processor
type RegistryInterface ¶
type RegistryInterface interface {
RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration
type RetryConfig ¶
type RetryConfig struct {
MaxAttempts int `json:"max_attempts" schema:"type:int,description:Maximum retry attempts,category:advanced,default:3"`
Backoff string `` /* 129-byte string literal not displayed */
}
RetryConfig holds retry configuration
func (*RetryConfig) Validate ¶
func (r *RetryConfig) Validate() error
Validate checks the retry configuration for errors
type StreamChunk ¶
type StreamChunk struct {
RequestID string `json:"request_id"`
ContentDelta string `json:"content_delta,omitempty"`
ReasoningDelta string `json:"reasoning_delta,omitempty"`
Done bool `json:"done,omitempty"`
}
StreamChunk represents a single streaming delta for real-time monitoring. Chunks are ephemeral — published via core NATS (fire-and-forget), not JetStream.