agenticmodel

package
v1.0.0-alpha.21
Published: Mar 9, 2026 License: MIT Imports: 21 Imported by: 0

README

agentic-model

Model integration component for the agentic processing system.

Overview

The agentic-model component routes agent requests to OpenAI-compatible LLM endpoints. It receives AgentRequest messages from the loop orchestrator, calls the appropriate model endpoint, and publishes AgentResponse messages back. It supports multiple named endpoints, tool calling, retry with backoff, and token tracking.

Architecture

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────►│ agentic-model  │────►│ LLM Endpoint     │
│               │     │                │     │ (OpenAI, Ollama) │
│               │◄────│                │◄────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  agent.request.*       HTTP/HTTPS          OpenAI-compatible
  agent.response.*                          /v1/chat/completions

Features

  • Multiple Endpoints: Configure different models/providers by name
  • OpenAI Compatible: Works with OpenAI, Ollama, LiteLLM, vLLM, etc.
  • Tool Support: Full tool calling (function calling) support
  • Retry Logic: Configurable retry with exponential backoff
  • Token Tracking: Tracks prompt and completion tokens

Configuration

{
  "type": "processor",
  "name": "agentic-model",
  "enabled": true,
  "config": {
    "stream_name": "AGENT",
    "timeout": "120s",
    "endpoints": {
      "gpt-4": {
        "url": "https://api.openai.com/v1/chat/completions",
        "model": "gpt-4",
        "api_key_env": "OPENAI_API_KEY"
      },
      "ollama": {
        "url": "http://localhost:11434/v1/chat/completions",
        "model": "llama2"
      }
    },
    "retry": {
      "max_attempts": 3,
      "backoff": "exponential"
    },
    "ports": {
      "inputs": [
        {"name": "requests", "type": "jetstream", "subject": "agent.request.>", "stream_name": "AGENT"}
      ],
      "outputs": [
        {"name": "responses", "type": "jetstream", "subject": "agent.response.*", "stream_name": "AGENT"}
      ]
    }
  }
}

Configuration Options

Option                Type    Default        Description
endpoints             object  required       Named endpoint configurations
timeout               string  "120s"         Request timeout
stream_name           string  "AGENT"        JetStream stream name
consumer_name_suffix  string  ""             Suffix for consumer names (for testing)
retry.max_attempts    int     3              Maximum retry attempts
retry.backoff         string  "exponential"  Backoff strategy
ports                 object  (defaults)     Port configuration
Endpoint Configuration

Field        Type    Required  Description
url          string  yes       Base URL for OpenAI-compatible API
model        string  yes       Model name for API requests
api_key_env  string  no        Environment variable for API key

Model Aliases

Model aliases provide semantic names for endpoints, allowing other components to reference models by purpose rather than specific endpoint:

{
  "model_aliases": {
    "reasoning": "gpt-4",
    "coding": "gpt-4-turbo",
    "fast": "gpt-3.5-turbo",
    "summarization": "gpt-3.5-turbo"
  }
}

Alias rules:

  • Target must exist in endpoints
  • No alias chaining (alias cannot point to another alias)
  • Empty target is not allowed

Requests can use either endpoint names or aliases:

{
  "model": "fast"
}

This resolves to the gpt-3.5-turbo endpoint.
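The three alias rules above can be sketched as a validation pass over the alias map. This is an illustrative helper, not the package's actual internals; `validateAliases` and its map-based signature are assumptions for the example.

```go
package main

import "fmt"

// validateAliases is a hypothetical sketch of the alias rules above:
// a target must exist in endpoints, must not be another alias, and
// must not be empty.
func validateAliases(aliases map[string]string, endpoints map[string]bool) error {
	for alias, target := range aliases {
		if target == "" {
			return fmt.Errorf("alias %q: empty target is not allowed", alias)
		}
		if _, isAlias := aliases[target]; isAlias {
			return fmt.Errorf("alias %q: chaining to alias %q is not allowed", alias, target)
		}
		if !endpoints[target] {
			return fmt.Errorf("alias %q: target %q not found in endpoints", alias, target)
		}
	}
	return nil
}

func main() {
	endpoints := map[string]bool{"gpt-4": true, "gpt-3.5-turbo": true}
	ok := map[string]string{"fast": "gpt-3.5-turbo", "reasoning": "gpt-4"}
	bad := map[string]string{"quick": "fast", "fast": "gpt-3.5-turbo"} // chained alias
	fmt.Println(validateAliases(ok, endpoints))         // <nil>
	fmt.Println(validateAliases(bad, endpoints) != nil) // true
}
```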

Ports

Inputs

Name      Type       Subject          Description
requests  jetstream  agent.request.>  Agent requests from agentic-loop

Outputs

Name       Type       Subject           Description
responses  jetstream  agent.response.*  Model responses to agentic-loop

Endpoint Resolution

Requests are routed to endpoints by model name:

  1. Exact match: Request model: "gpt-4" routes to endpoints.gpt-4
  2. Alias match: Request model: "fast" routes to model_aliases.fast target
  3. Default fallback: If no match, routes to endpoints.default (if configured)
  4. Error: If no match and no default, returns error response
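The four-step resolution order above can be sketched as a small lookup function. The name `resolveEndpoint` and the map-based representation are illustrative assumptions, not the package's real API.

```go
package main

import "fmt"

// resolveEndpoint is a hypothetical sketch of the resolution order:
// exact match, then alias, then the "default" endpoint, then error.
func resolveEndpoint(model string, endpoints map[string]bool, aliases map[string]string) (string, error) {
	if endpoints[model] { // 1. exact match
		return model, nil
	}
	if target, ok := aliases[model]; ok && endpoints[target] { // 2. alias match
		return target, nil
	}
	if endpoints["default"] { // 3. default fallback
		return "default", nil
	}
	return "", fmt.Errorf("endpoint not found: %s", model) // 4. error
}

func main() {
	endpoints := map[string]bool{"gpt-4": true, "gpt-3.5-turbo": true}
	aliases := map[string]string{"fast": "gpt-3.5-turbo"}
	e1, _ := resolveEndpoint("gpt-4", endpoints, aliases)
	e2, _ := resolveEndpoint("fast", endpoints, aliases)
	_, err := resolveEndpoint("unknown", endpoints, aliases)
	fmt.Println(e1, e2, err != nil) // gpt-4 gpt-3.5-turbo true
}
```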

Compatible Providers

Provider      URL Format                                  Notes
OpenAI        https://api.openai.com/v1/chat/completions  Requires API key
Azure OpenAI  https://{resource}.openai.azure.com/...     Requires API key
Ollama        http://localhost:11434/v1/chat/completions  No auth required
LiteLLM       http://localhost:8000/v1/chat/completions   Proxy for multiple providers
vLLM          http://localhost:8000/v1/chat/completions   Self-hosted models
LocalAI       http://localhost:8080/v1/chat/completions   Local models

Response Status Mapping

OpenAI Finish Reason  Agentic Status  Description
stop                  complete        Normal completion
length                complete        Max tokens reached
tool_calls            tool_call       Model wants to use tools
(error)               error           API or network error

Environment Variables

API keys are read from environment variables:

export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="..."

Reference in config:

{
  "api_key_env": "OPENAI_API_KEY"
}

Retry Behavior

  • Max Attempts: Default 3, configurable
  • Backoff: Exponential (100ms, 200ms, 400ms)
  • Retryable: Network errors, 5xx responses
  • Non-retryable: Context cancellation, 4xx responses

Troubleshooting

Endpoint not found
  • Verify model name in request matches an endpoint key
  • Add a "default" endpoint as fallback
  • Check for typos in model names
Authentication errors
  • Verify api_key_env points to correct environment variable
  • Check that environment variable is set
  • Ensure API key is valid and has quota
Timeout errors
  • Increase timeout for complex requests
  • Check network connectivity to endpoint
  • Verify endpoint URL is correct
Tool calls not working
  • Ensure tools are provided in AgentRequest
  • Check tool parameter schemas are valid JSON Schema
  • Verify model supports tool calling

Documentation

Overview

Package agenticmodel provides the model integration processor for the SemStreams agentic system.

The agentic-model processor routes agent requests to OpenAI-compatible LLM endpoints. It receives AgentRequest messages from the loop orchestrator, calls the appropriate model endpoint, and publishes AgentResponse messages back. The processor supports tool calling, retry with backoff, and token tracking. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).

This processor acts as the bridge between the agentic orchestration layer and external LLM services (OpenAI, Ollama, LiteLLM, vLLM, or any OpenAI-compatible API).

Architecture

The model processor sits between the loop orchestrator and external LLM services:

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────▶│ agentic-model  │────▶│ LLM Endpoint     │
│               │     │ (this pkg)     │     │ (OpenAI, Ollama) │
│               │◀────│                │◀────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  agent.request.*       HTTP/HTTPS          OpenAI-compatible
  agent.response.*                          /v1/chat/completions

Quick Start

Configure the model registry in the top-level config and start the processor:

config := agenticmodel.Config{
    StreamName: "AGENT",
    Timeout:    "120s",
}

// Model endpoints are resolved from deps.ModelRegistry (set in config.model_registry)
rawConfig, _ := json.Marshal(config)
comp, err := agenticmodel.NewComponent(rawConfig, deps)

lc := comp.(component.LifecycleComponent)
lc.Initialize()
lc.Start(ctx)

Endpoint Resolution

When processing an AgentRequest, the processor resolves the endpoint from the unified model registry by looking up the request's Model field. Clients are created dynamically and cached for reuse.

If the resolved endpoint has SupportsTools=false, any tools in the request are stripped and a warning is logged.

OpenAI Compatibility

The processor uses the sashabaranov/go-openai SDK and is compatible with any API that implements the OpenAI chat completions interface:

  • OpenAI API (api.openai.com)
  • Azure OpenAI Service
  • Ollama (with OpenAI compatibility layer)
  • LiteLLM proxy
  • vLLM with OpenAI server
  • LocalAI
  • Any OpenAI-compatible proxy

Tool Support

The processor fully supports tool calling (function calling):

Incoming request with tools:

request := agentic.AgentRequest{
    Model: "gpt-4",
    Messages: []agentic.ChatMessage{
        {Role: "user", Content: "Read the config file"},
    },
    Tools: []agentic.ToolDefinition{
        {
            Name:        "read_file",
            Description: "Read file contents",
            Parameters: map[string]any{
                "type": "object",
                "properties": map[string]any{
                    "path": map[string]any{"type": "string"},
                },
            },
        },
    },
}

Response with tool calls:

response := agentic.AgentResponse{
    Status: "tool_call",
    Message: agentic.ChatMessage{
        Role: "assistant",
        ToolCalls: []agentic.ToolCall{
            {ID: "call_001", Name: "read_file", Arguments: map[string]any{"path": "config.yaml"}},
        },
    },
}

The processor converts between agentic.ToolDefinition and OpenAI's function schema format automatically.

Response Status Mapping

The processor maps OpenAI finish reasons to agentic status:

  • "stop" → "complete" (normal completion)
  • "length" → "complete" (max tokens reached)
  • "tool_calls" → "tool_call" (model wants to use tools)
  • Any error → "error" with error message
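The mapping above amounts to a small switch. `mapFinishReason` is an illustrative helper name, not necessarily the package's internal function.

```go
package main

import "fmt"

// mapFinishReason sketches the finish-reason mapping described above:
// stop and length both map to "complete", tool_calls to "tool_call",
// and any call error (or unknown reason) to "error".
func mapFinishReason(reason string, callErr error) string {
	if callErr != nil {
		return "error"
	}
	switch reason {
	case "stop", "length":
		return "complete"
	case "tool_calls":
		return "tool_call"
	default:
		return "error"
	}
}

func main() {
	fmt.Println(mapFinishReason("stop", nil))       // complete
	fmt.Println(mapFinishReason("length", nil))     // complete
	fmt.Println(mapFinishReason("tool_calls", nil)) // tool_call
}
```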

Retry Logic

The processor implements retry with exponential backoff:

  • Default: 3 attempts
  • Backoff: 100ms, 200ms, 400ms (exponential)
  • Retryable: Network errors, 5xx responses
  • Non-retryable: Context cancellation, 4xx responses

Configuration:

"retry": {
    "max_attempts": 3,
    "backoff": "exponential"
}

Token Tracking

Every response includes token usage for cost monitoring and rate limiting:

response.TokenUsage.PromptTokens     // Input tokens
response.TokenUsage.CompletionTokens // Output tokens
response.TokenUsage.Total()          // Sum of both

Token counts come directly from the LLM provider's response.
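The Total() accessor shown above presumably just sums the two counts; a stand-in sketch (the struct here is illustrative, mirroring the fields named above):

```go
package main

import "fmt"

// TokenUsage is an illustrative stand-in for the usage shape above.
type TokenUsage struct {
	PromptTokens     int // input tokens, as reported by the provider
	CompletionTokens int // output tokens, as reported by the provider
}

// Total returns the sum of prompt and completion tokens.
func (u TokenUsage) Total() int { return u.PromptTokens + u.CompletionTokens }

func main() {
	u := TokenUsage{PromptTokens: 1200, CompletionTokens: 350}
	fmt.Println(u.Total()) // 1550
}
```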

Configuration Reference

Full configuration schema (endpoints are in the top-level model_registry):

{
    "timeout": "string (default: 120s)",
    "stream_name": "string (default: AGENT)",
    "consumer_name_suffix": "string (optional)",
    "retry": {
        "max_attempts": "int (default: 3)",
        "backoff": "string (default: exponential)"
    },
    "ports": {
        "inputs": [...],
        "outputs": [...]
    }
}

Ports

Input ports (JetStream consumers):

  • agent.request: Agent requests from agentic-loop (subject: agent.request.>)

Output ports (JetStream publishers):

  • agent.response: Model responses to agentic-loop (subject: agent.response.*)

Message Flow

The processor handles each request through:

  1. Receive AgentRequest from agent.request.>
  2. Resolve endpoint by model name
  3. Convert AgentRequest to OpenAI format
  4. Call LLM endpoint with retry logic
  5. Convert OpenAI response to AgentResponse
  6. Publish to agent.response.{request_id}
  7. Acknowledge JetStream message

Client Architecture

The processor dynamically creates and caches Client instances per endpoint:

client, err := NewClient(endpointConfig)
response, err := client.ChatCompletion(ctx, request)

Clients are cached by URL|Model key with mutex protection for concurrent access. Clients wrap the go-openai SDK and handle:

  • API key injection from environment variables
  • Request/response type conversion
  • Retry with exponential backoff
  • Context cancellation propagation
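The "cached by URL|Model key with mutex protection" pattern can be sketched as follows. The cache type and constructor are assumptions for illustration; the real package's client carries the go-openai SDK state.

```go
package main

import (
	"fmt"
	"sync"
)

// Client is an illustrative stand-in for the package's Client type.
type Client struct{ URL, Model string }

// clientCache sketches the caching described above: one Client per
// "URL|Model" key, with a mutex guarding concurrent lookups.
type clientCache struct {
	mu      sync.Mutex
	clients map[string]*Client
}

func (cc *clientCache) get(url, model string) *Client {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	key := url + "|" + model
	if c, ok := cc.clients[key]; ok {
		return c // reuse the cached client
	}
	c := &Client{URL: url, Model: model}
	cc.clients[key] = c
	return c
}

func main() {
	cache := &clientCache{clients: map[string]*Client{}}
	a := cache.get("http://localhost:11434/v1/chat/completions", "llama2")
	b := cache.get("http://localhost:11434/v1/chat/completions", "llama2")
	fmt.Println(a == b) // true
}
```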

Error Handling

Errors are returned as AgentResponse with status="error":

response := agentic.AgentResponse{
    RequestID: "req_123",
    Status:    "error",
    Error:     "endpoint not found: unknown-model",
}

Error categories:

  • Endpoint resolution errors: Model not found in registry
  • Request validation errors: Invalid request format
  • Network errors: Connection failures (may retry)
  • API errors: 4xx/5xx from LLM provider
  • Timeout errors: Request exceeded timeout

Environment Variables

API keys are read from environment variables specified in endpoint config:

export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="..."

Endpoint config:

{
    "url": "https://api.openai.com/v1/chat/completions",
    "model": "gpt-4",
    "api_key_env": "OPENAI_API_KEY"
}

If api_key_env is not specified, requests are made without authentication (suitable for local models like Ollama).

Thread Safety

The Component is safe for concurrent use after Start() is called. Multiple goroutines can process requests concurrently; Client instances are cached per endpoint (keyed by URL|Model) and guarded by a mutex, so requests share no unsynchronized state.

Testing

For testing, use the ConsumerNameSuffix config option and provide a model registry:

config := agenticmodel.Config{
    StreamName:         "AGENT",
    ConsumerNameSuffix: "test-" + t.Name(),
}

// Provide endpoints via model registry in deps.ModelRegistry
deps.ModelRegistry = &model.Registry{
    Endpoints: map[string]*model.EndpointConfig{
        "test-model": {URL: mockServer.URL, Model: "test-model", MaxTokens: 128000},
    },
    Defaults: model.DefaultsConfig{Model: "test-model"},
}

Use httptest.Server to mock the LLM endpoint in tests.

Limitations

Current limitations:

  • No streaming support (responses are complete documents)
  • Retry configuration is global, not per-endpoint
  • No request queuing or rate limiting

See Also

Related packages:

  • agentic: Shared types (AgentRequest, AgentResponse, etc.)
  • processor/agentic-loop: Loop orchestration
  • processor/agentic-tools: Tool execution
  • github.com/sashabaranov/go-openai: OpenAI SDK

Package agenticmodel also provides Prometheus metrics for the agentic-model component.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new agentic-model processor component

func Register

func Register(registry RegistryInterface) error

Register registers the agentic-model processor component with the given registry

Types

type ChunkHandler

type ChunkHandler func(chunk StreamChunk)

ChunkHandler is a callback for receiving streaming deltas. Implementations must be safe for concurrent use if the handler is shared.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps OpenAI SDK for agentic model requests

func NewClient

func NewClient(endpoint *model.EndpointConfig) (*Client, error)

NewClient creates a new client for the given endpoint configuration

func (*Client) ChatCompletion

func (c *Client) ChatCompletion(ctx context.Context, req agentic.AgentRequest) (agentic.AgentResponse, error)

ChatCompletion sends a chat completion request

func (*Client) Close

func (c *Client) Close() error

Close releases resources held by the client

func (*Client) SetChunkHandler

func (c *Client) SetChunkHandler(handler ChunkHandler)

SetChunkHandler sets the callback for receiving streaming deltas.

func (*Client) SetLogger

func (c *Client) SetLogger(l *slog.Logger)

SetLogger sets the logger for debug-level request/response logging.

func (*Client) SetMetrics

func (c *Client) SetMetrics(m *modelMetrics)

SetMetrics sets the metrics instance for recording streaming metrics.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the agentic-model processor

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component (no-op for this component)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing agent requests

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully stops the component within the given timeout

type Config

type Config struct {
	Ports                *component.PortConfig `json:"ports"                schema:"type:ports,description:Port configuration,category:basic"`
	StreamName           string                `` /* 132-byte string literal not displayed */
	ConsumerNameSuffix   string                `` /* 127-byte string literal not displayed */
	DeleteConsumerOnStop bool                  `` /* 157-byte string literal not displayed */
	Timeout              string                `json:"timeout"              schema:"type:string,description:Request timeout,category:advanced,default:120s"`
	Retry                RetryConfig           `json:"retry"                schema:"type:object,description:Retry configuration,category:advanced"`
}

Config holds configuration for agentic-model processor component. Model endpoints are resolved from the unified model registry (component.Dependencies.ModelRegistry).

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration for agentic-model processor

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for errors

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface defines the minimal interface needed for registration

type RetryConfig

type RetryConfig struct {
	MaxAttempts int    `json:"max_attempts" schema:"type:int,description:Maximum retry attempts,category:advanced,default:3"`
	Backoff     string `` /* 129-byte string literal not displayed */
}

RetryConfig holds retry configuration

func (*RetryConfig) Validate

func (r *RetryConfig) Validate() error

Validate checks the retry configuration for errors

type StreamChunk

type StreamChunk struct {
	RequestID      string `json:"request_id"`
	ContentDelta   string `json:"content_delta,omitempty"`
	ReasoningDelta string `json:"reasoning_delta,omitempty"`
	Done           bool   `json:"done,omitempty"`
}

StreamChunk represents a single streaming delta for real-time monitoring. Chunks are ephemeral — published via core NATS (fire-and-forget), not JetStream.
