Documentation ¶
Overview ¶
Package model provides a unified model registry for centralized endpoint configuration, capability-based routing, and tool capability metadata.
Index ¶
- Constants
- func Watch(ctx context.Context, watcher Watcher, apply func(*Registry))
- type BreakerConfig
- type CapabilityConfig
- type DefaultsConfig
- type EndpointConfig
- type EndpointStatus
- type ErrorKind
- type HealthAwareRegistry
- type HealthPolicy
- type HealthStats
- type Registry
- func (r *Registry) GetCapability(name string) *CapabilityConfig
- func (r *Registry) GetDefault() string
- func (r *Registry) GetEndpoint(name string) *EndpointConfig
- func (r *Registry) GetFallbackChain(capability string) []string
- func (r *Registry) GetMaxTokens(name string) int
- func (r *Registry) ListCapabilities() []string
- func (r *Registry) ListEndpoints() []string
- func (r *Registry) Resolve(capability string) string
- func (r *Registry) ResolveSummarization() string
- func (r *Registry) Validate() error
- type RegistryReader
- type ResolvedEndpoint
- type Result
- type RollingWindowBreaker
- type Watcher
Constants ¶
const (
	// CapabilitySummarization is used by the agentic loop for context compaction.
	CapabilitySummarization = "summarization"
	// CapabilityCommunitySummary is used by graph-clustering for LLM community summaries.
	CapabilityCommunitySummary = "community_summary"
	// CapabilityEmbedding is used by graph-embedding for HTTP embedding services.
	CapabilityEmbedding = "embedding"
	// CapabilityQueryClassification is used by graph-query for LLM-based query classification.
	CapabilityQueryClassification = "query_classification"
	// CapabilityAnswerSynthesis is used by graph-query to synthesize natural language
	// answers from community summaries and entity digests in globalSearch responses.
	CapabilityAnswerSynthesis = "answer_synthesis"
)
Capability name constants for registry-based endpoint resolution. Custom capabilities beyond these can be used with Resolve() directly.
Variables ¶
This section is empty.
Functions ¶
func Watch ¶
Watch keeps a caller-owned *Registry pointer in sync with KV-driven model_registry changes. It blocks the calling goroutine, calling apply for every new *Registry until ctx is cancelled or the watcher channel closes. Run it in a goroutine spawned by the caller:
var holder atomic.Pointer[model.Registry]
holder.Store(initialRegistry)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go model.Watch(ctx, cfgMgr, holder.Store)
This exists for external library consumers that maintain their own *Registry alongside the semstreams runtime. semstreams components (components in flow configs) don't need it — ComponentManager restarts them automatically when their factory declares component.DepModelRegistry. See docs/operations for the full guide.
Watch never panics; nil watcher or nil apply returns immediately.
Types ¶
type BreakerConfig ¶
type BreakerConfig struct {
// WindowSize is the number of recent results to keep per endpoint.
// Older results slide out as new ones arrive. Must be >= 1.
// Default: 20.
WindowSize int
// MinRequests is the minimum number of observations before the
// error rate is consulted. Below this, the endpoint stays Closed
// regardless of failure ratio. Prevents one-or-two-error flukes
// on cold endpoints from opening the breaker. Must be <= WindowSize.
// Default: 5.
MinRequests int
// ErrorRateThreshold is the failure ratio above which the breaker
// opens (assuming MinRequests has been met). 0.5 means "open if
// more than half the recent window failed." Must be in (0, 1].
// Default: 0.5.
ErrorRateThreshold float64
// Cooldown is the wait between Open and HalfOpen — how long the
// endpoint stays out of rotation before we send a probe. Must be > 0.
// Default: 30s.
Cooldown time.Duration
// Now is the time source. Tests inject a clock; production leaves
// this nil and time.Now is used.
Now func() time.Time
}
BreakerConfig tunes the rolling-window circuit breaker. Zero-value defaults are sensible for typical LLM endpoint workloads (tens of requests per minute, latency dominated by upstream).
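For example, a test might tighten the window and inject a clock so cooldown transitions are deterministic (the values below are illustrative, not recommendations):

```go
// Hypothetical test setup: a manually advanced fake clock. Reassigning
// fake before the next call advances the breaker's notion of time.
fake := time.Unix(0, 0)
cfg := model.BreakerConfig{
	WindowSize:         10,
	MinRequests:        3,
	ErrorRateThreshold: 0.5,
	Cooldown:           5 * time.Second,
	Now:                func() time.Time { return fake },
}
breaker := model.NewRollingWindowBreaker(cfg)
```

Production code typically passes a zero BreakerConfig and takes the defaults.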
type CapabilityConfig ¶
type CapabilityConfig struct {
// Description explains what this capability is for.
Description string `json:"description,omitempty"`
// Preferred lists endpoint names in order of preference.
Preferred []string `json:"preferred"`
// Fallback lists backup endpoint names if all preferred are unavailable.
Fallback []string `json:"fallback,omitempty"`
// RequiresTools filters the chain to only tool-capable endpoints.
RequiresTools bool `json:"requires_tools,omitempty"`
// Timeout caps a single LLM request routed via this capability.
// Go duration string (e.g. "45s", "5m"). Empty means no per-capability
// cap; the component-level timeout applies. Endpoint and task timeouts
// take precedence over this value. See agentic-model Timeout Resolution
// for the full precedence chain.
Timeout string `json:"timeout,omitempty"`
}
CapabilityConfig defines model preferences for a capability.
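As a sketch, a summarization capability preferring a remote model with a local fallback might look like this (the endpoint names are placeholders, not defaults):

```go
// Illustrative capability entry; endpoint names must match keys in
// Registry.Endpoints.
summarize := &model.CapabilityConfig{
	Description:   "Context compaction for the agentic loop",
	Preferred:     []string{"claude-sonnet", "local-qwen"},
	Fallback:      []string{"openrouter-free"},
	RequiresTools: false, // summarization does not need tool calling
	Timeout:       "2m",
}
```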
type DefaultsConfig ¶
type DefaultsConfig struct {
// Model is the default endpoint name when no capability matches.
Model string `json:"model"`
// Capability is the default capability when none specified.
Capability string `json:"capability,omitempty"`
}
DefaultsConfig holds default model settings.
type EndpointConfig ¶
type EndpointConfig struct {
// Provider identifies the API type: "anthropic", "ollama", "openai", "openrouter".
Provider string `json:"provider"`
// URL is the API endpoint. Required for ollama/openai/openrouter, optional for anthropic.
URL string `json:"url,omitempty"`
// Model is the model identifier sent to the provider.
Model string `json:"model"`
// MaxTokens is the context window size in tokens.
//
// For provider="ollama", this value is NOT forwarded to the server —
// Ollama's /v1/chat/completions layer rejects per-request num_ctx by
// design. Pre-build a Modelfile with PARAMETER num_ctx <N> and point
// endpoint.model at the derived name. See
// docs/operations/04-ollama-setup.md. On first request to an Ollama
// endpoint, agentic-model probes /api/show and WARNs once if the
// model's num_ctx is below this value.
//
// For all other providers this value is used only for summarization
// routing (picking the largest-context endpoint) and context-budget
// math in agentic-loop — the provider determines the actual window.
MaxTokens int `json:"max_tokens"`
// SupportsTools indicates whether this endpoint supports function/tool calling.
SupportsTools bool `json:"supports_tools,omitempty"`
// ToolFormat specifies the tool calling format: "anthropic" or "openai".
// Empty means auto-detect from provider.
ToolFormat string `json:"tool_format,omitempty"`
// APIKeyEnv is the environment variable containing the API key.
// Required for anthropic/openai/openrouter, ignored for ollama.
APIKeyEnv string `json:"api_key_env,omitempty"`
// Options holds provider-specific template parameters passed to the API
// as chat_template_kwargs. For vLLM/SGLang with thinking models, set
// "enable_thinking" and "thinking_budget" here.
//
// Note: Ollama's OpenAI-compatible endpoint ignores chat_template_kwargs.
// Ollama thinking models (Qwen3, DeepSeek-R1) always return reasoning_content
// but the thinking toggle and budget are only controllable via Ollama's
// native /api/chat endpoint, not the OpenAI-compatible /v1/ endpoint.
//
// Do not use for inference parameters (temperature, top_k, etc.) which
// have dedicated fields in AgentRequest.
Options map[string]any `json:"options,omitempty"`
// Stream enables SSE streaming for this endpoint. The client uses
// CreateChatCompletionStream internally, reducing time-to-first-token.
// The inter-component protocol remains complete AgentResponse messages.
Stream bool `json:"stream,omitempty"`
// ReasoningEffort controls how much effort reasoning models spend thinking.
// Accepted values: "none" (Gemini only), "low", "medium", "high".
// Empty means the provider default is used. Forwarded as reasoning_effort
// on the OpenAI-compatible chat completions request.
ReasoningEffort string `json:"reasoning_effort,omitempty"`
// InputPricePer1MTokens is the cost per 1M input tokens in USD.
// Consumers join this with token usage data to calculate costs.
InputPricePer1MTokens float64 `json:"input_price_per_1m_tokens,omitempty"`
// OutputPricePer1MTokens is the cost per 1M output tokens in USD.
// Consumers join this with token usage data to calculate costs.
OutputPricePer1MTokens float64 `json:"output_price_per_1m_tokens,omitempty"`
// RequestsPerMinute limits the rate of requests to this endpoint.
// 0 means no rate limiting. Applied per-endpoint across all consumers.
RequestsPerMinute int `json:"requests_per_minute,omitempty"`
// MaxConcurrent limits concurrent in-flight requests to this endpoint.
// 0 means no concurrency limit.
MaxConcurrent int `json:"max_concurrent,omitempty"`
// RequestTimeout caps a single LLM request to this endpoint.
// Go duration string (e.g. "45s", "5m"). Empty means no per-endpoint cap;
// the capability or component-level timeout applies. See agentic-model
// Timeout Resolution for the full precedence chain.
RequestTimeout string `json:"request_timeout,omitempty"`
}
EndpointConfig defines an available model endpoint.
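As an illustration, an Ollama endpoint following the Modelfile guidance above might be declared like this (the URL, model name, and limits are placeholders, not defaults):

```go
// Hypothetical Ollama endpoint. "qwen3-32k" stands in for a
// Modelfile-derived name that carries PARAMETER num_ctx 32768.
qwen := &model.EndpointConfig{
	Provider:      "ollama",
	URL:           "http://localhost:11434",
	Model:         "qwen3-32k",
	MaxTokens:     32768,
	SupportsTools: true,
	MaxConcurrent: 2,
}
```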
type EndpointStatus ¶
type EndpointStatus int
EndpointStatus is the circuit-breaker state for a single endpoint.
Transitions follow the standard pattern:
Closed   --(error rate > threshold)--> Open
Open     --(cooldown elapsed)--------> HalfOpen
HalfOpen --(probe succeeds)----------> Closed
HalfOpen --(probe fails)-------------> Open (cooldown restarts)
const (
	// StatusClosed is the healthy state; requests flow normally.
	StatusClosed EndpointStatus = iota
	// StatusOpen rejects requests until the cooldown elapses. Callers
	// (typically agentic-model's fallback chain) skip Open endpoints.
	StatusOpen
	// StatusHalfOpen lets a single probe through to test recovery.
	// While in this state, additional concurrent calls are still
	// treated as if Open.
	StatusHalfOpen
)
func (EndpointStatus) String ¶
func (s EndpointStatus) String() string
String returns the lowercase status name suitable for log fields and Prometheus label values.
type ErrorKind ¶
type ErrorKind string
ErrorKind classifies a failed request for breaker accounting and metrics. Mirrors the categories agentic-model already records via recordRequestError so health policies and metrics agree on shape.
const (
	// ErrorKindNone marks a successful result; required field on
	// successful Results so callers don't have to remember to clear it.
	ErrorKindNone ErrorKind = ""
	// ErrorKindTimeout — context deadline exceeded or transport timeout.
	ErrorKindTimeout ErrorKind = "timeout"
	// ErrorKindRateLimit — provider returned 429 (after retry budget
	// exhausted; transient retries don't reach the policy).
	ErrorKindRateLimit ErrorKind = "rate_limit"
	// ErrorKindServerError — provider returned 5xx (after retry budget
	// exhausted).
	ErrorKindServerError ErrorKind = "server_error"
	// ErrorKindNetwork — connection refused, DNS failure, TLS handshake
	// failure, etc. Distinguished from server errors because retry
	// strategy differs.
	ErrorKindNetwork ErrorKind = "network"
	// ErrorKindUnknown — fallback for errors we couldn't classify.
	// Treated like ErrorKindServerError for breaker math.
	ErrorKindUnknown ErrorKind = "unknown"
)
type HealthAwareRegistry ¶
type HealthAwareRegistry interface {
RegistryReader
HealthPolicy
}
HealthAwareRegistry embeds RegistryReader with a HealthPolicy view. External consumers (e.g., a sidecar dispatcher wanting to consult circuit-breaker state before routing) can take this richer interface instead of plumbing the policy and registry separately.
agentic-model wraps its (registry, policy) pair into one of these for callers that want the combined shape; raw *Registry instances don't satisfy it, but ComposeHealth(r, p) returns one.
func ComposeHealth ¶
func ComposeHealth(r RegistryReader, p HealthPolicy) HealthAwareRegistry
ComposeHealth wraps a RegistryReader and HealthPolicy into a single HealthAwareRegistry. Useful for callers who pass one value through dependency injection but want both shapes available.
Panics if r is nil. If p is nil, an always-healthy fallback is substituted so callers don't have to nil-guard the health methods.
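Assuming reg is an existing *Registry, wiring the combined shape with the default breaker might look like this sketch:

```go
// Sketch: combine a registry and a breaker into one injectable value.
// reg is assumed to be a previously constructed *model.Registry.
policy := model.NewRollingWindowBreaker(model.BreakerConfig{})
har := model.ComposeHealth(reg, policy)

// Callers can now consult routing and health through one dependency.
name := har.Resolve("summarization")
if name != "" && har.IsHealthy(name) {
	// dispatch the request to name
}
```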
type HealthPolicy ¶
type HealthPolicy interface {
// IsHealthy returns true iff the endpoint is eligible to receive
// new requests. Equivalent to EndpointStatus(name) != StatusOpen
// (the half-open state allows one probe through).
IsHealthy(endpoint string) bool
// EndpointStatus returns the current circuit-breaker state for
// the named endpoint. Unknown endpoints (no observations yet)
// must return StatusClosed — new endpoints are healthy by default.
EndpointStatus(endpoint string) EndpointStatus
// EndpointStats returns a snapshot of the endpoint's recent
// observation window. Unknown endpoints return a zero HealthStats
// with Status=StatusClosed.
EndpointStats(endpoint string) HealthStats
// RecordResult tells the policy that endpoint just produced
// the given result. Drives state transitions on the next call.
// Safe to call concurrently from multiple request goroutines
// targeting the same endpoint.
RecordResult(endpoint string, result Result)
}
HealthPolicy tracks per-endpoint failure state and decides which endpoints are eligible to receive traffic. agentic-model consults IsHealthy when iterating its fallback chain and reports each request outcome via RecordResult.
Implementations must be safe for concurrent use. The default in-process implementation is RollingWindowBreaker (model/breaker.go); callers can swap in their own (e.g., NATS-KV-backed for shared state across processes) by implementing the same surface.
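The way a caller combines a fallback chain with IsHealthy can be shown standalone. The loop below is a conceptual sketch of that pattern, not the package's actual code:

```go
package main

import "fmt"

// pickHealthy walks a fallback chain in preference order and returns the
// first endpoint the health predicate accepts. This mirrors the loop
// agentic-model runs over GetFallbackChain + IsHealthy, reduced to a
// self-contained illustration.
func pickHealthy(chain []string, healthy func(string) bool) (string, bool) {
	for _, ep := range chain {
		if healthy(ep) {
			return ep, true
		}
	}
	return "", false
}

func main() {
	open := map[string]bool{"claude": true} // breaker currently Open for "claude"
	chain := []string{"claude", "local-ollama", "openrouter"}
	ep, ok := pickHealthy(chain, func(name string) bool { return !open[name] })
	fmt.Println(ep, ok) // local-ollama true
}
```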
func NewAlwaysHealthyPolicy ¶
func NewAlwaysHealthyPolicy() HealthPolicy
NewAlwaysHealthyPolicy returns a HealthPolicy that always reports endpoints as healthy. Use this in tests or in deployments that explicitly want to disable circuit breaking.
type HealthStats ¶
type HealthStats struct {
// Status is the current circuit-breaker state.
Status EndpointStatus
// Successes is the count of successful results in the window.
Successes int
// Failures is the count of failed results in the window.
Failures int
// ErrorRate is Failures / (Successes + Failures); 0 if no
// observations yet.
ErrorRate float64
// LastFailure is the time of the most recent failure (zero if no
// failure has been observed in the window).
LastFailure time.Time
// LastTransition is the time of the most recent state change.
// Useful for "how long has this endpoint been open?" dashboards.
LastTransition time.Time
}
HealthStats is a snapshot of an endpoint's recent observable state. Returned by HealthPolicy.EndpointStats for callers that want to surface a richer view than the binary IsHealthy.
All counters are over the policy's most recent observation window; they reset implicitly as the window slides.
type Registry ¶
type Registry struct {
Capabilities map[string]*CapabilityConfig `json:"capabilities,omitempty"`
Endpoints map[string]*EndpointConfig `json:"endpoints"`
Defaults DefaultsConfig `json:"defaults"`
}
Registry holds all model endpoint definitions and capability routing. It is JSON-serializable for config loading and implements RegistryReader.
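A minimal registry might be assembled like this (endpoint names, model identifiers, and limits are illustrative placeholders):

```go
// Sketch of a two-endpoint registry with one capability route.
reg := &model.Registry{
	Endpoints: map[string]*model.EndpointConfig{
		"local-qwen": {
			Provider: "ollama", URL: "http://localhost:11434",
			Model: "qwen3-32k", MaxTokens: 32768, SupportsTools: true,
		},
		"claude": {
			Provider: "anthropic", Model: "claude-sonnet-4",
			MaxTokens: 200000, SupportsTools: true,
			APIKeyEnv: "ANTHROPIC_API_KEY",
		},
	},
	Capabilities: map[string]*model.CapabilityConfig{
		"summarization": {
			Preferred: []string{"claude"},
			Fallback:  []string{"local-qwen"},
		},
	},
	Defaults: model.DefaultsConfig{Model: "local-qwen"},
}
if err := reg.Validate(); err != nil {
	panic(err) // registry configuration is inconsistent
}
```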
func (*Registry) GetCapability ¶
func (r *Registry) GetCapability(name string) *CapabilityConfig
GetCapability returns the capability configuration for a name, or nil if not a configured capability.
func (*Registry) GetDefault ¶
func (r *Registry) GetDefault() string
GetDefault returns the default endpoint name.
func (*Registry) GetEndpoint ¶
func (r *Registry) GetEndpoint(name string) *EndpointConfig
GetEndpoint returns the endpoint configuration for a name, or nil if not found.
func (*Registry) GetFallbackChain ¶
func (r *Registry) GetFallbackChain(capability string) []string
GetFallbackChain returns all endpoint names for a capability in preference order.
func (*Registry) GetMaxTokens ¶
func (r *Registry) GetMaxTokens(name string) int
GetMaxTokens returns the context window size for an endpoint name.
func (*Registry) ListCapabilities ¶
func (r *Registry) ListCapabilities() []string
ListCapabilities returns all configured capability names sorted alphabetically.
func (*Registry) ListEndpoints ¶
func (r *Registry) ListEndpoints() []string
ListEndpoints returns all configured endpoint names sorted alphabetically.
func (*Registry) ResolveSummarization ¶
func (r *Registry) ResolveSummarization() string
ResolveSummarization returns the endpoint name best suited for summarization.
type RegistryReader ¶
type RegistryReader interface {
// Resolve returns the preferred endpoint name for a capability.
// Returns the first endpoint in the preferred list.
// If RequiresTools is set, filters to tool-capable endpoints.
Resolve(capability string) string
// GetFallbackChain returns all endpoint names for a capability in preference order.
// Includes both preferred and fallback endpoints.
GetFallbackChain(capability string) []string
// GetEndpoint returns the full endpoint configuration for an endpoint name.
// Returns nil if the endpoint is not configured.
GetEndpoint(name string) *EndpointConfig
// GetCapability returns the full capability configuration for a capability
// name. Returns nil if the name is not a configured capability (e.g. when
// the caller passed an endpoint name instead).
GetCapability(name string) *CapabilityConfig
// GetMaxTokens returns the context window size for an endpoint name.
// Returns 0 if the endpoint is not configured.
GetMaxTokens(name string) int
// GetDefault returns the default endpoint name.
GetDefault() string
// ListCapabilities returns all configured capability names sorted alphabetically.
ListCapabilities() []string
// ListEndpoints returns all configured endpoint names sorted alphabetically.
ListEndpoints() []string
// ResolveSummarization returns the endpoint name to use for context summarization.
// Resolution order:
// 1. Explicit "summarization" capability if configured
// 2. Endpoint with the largest MaxTokens (best suited for long context summarization)
// 3. The default endpoint as final fallback
ResolveSummarization() string
}
RegistryReader provides read-only access to the model registry. Components receive this interface via Dependencies.
type ResolvedEndpoint ¶
ResolvedEndpoint holds the resolved connection details for a capability endpoint.
func ResolveEndpoint ¶
func ResolveEndpoint(reg RegistryReader, capability string) (*ResolvedEndpoint, error)
ResolveEndpoint resolves a capability to its endpoint connection details. Returns an error if the registry is nil, or if no endpoint is configured for the capability. The API key is read from the environment variable specified by EndpointConfig.APIKeyEnv (empty if unset or not configured).
type Result ¶
type Result struct {
// Success is true iff the request returned a usable response after
// any internal retry. Transient retries that ultimately succeeded
// should be reported as Success=true with ErrorKindNone.
Success bool
// Kind classifies the failure for metrics and breaker tuning;
// ErrorKindNone on success.
Kind ErrorKind
// Latency is wall time spent on the request from caller dispatch
// to result. Includes retry time. Zero is acceptable when the
// caller doesn't measure (e.g., synthetic results in tests).
Latency time.Duration
}
Result is a single endpoint observation reported to a HealthPolicy after a request completes (whether or not it succeeded).
type RollingWindowBreaker ¶
type RollingWindowBreaker struct {
// contains filtered or unexported fields
}
RollingWindowBreaker is the default in-process HealthPolicy. It keeps a sliding window of the most recent results per endpoint and transitions the breaker based on the failure ratio in that window.
State machine:
Closed --error rate > threshold (and >= MinRequests)--> Open
Open --cooldown elapsed----------------------------> HalfOpen
HalfOpen --probe succeeds-----------------------------> Closed
HalfOpen --probe fails--------------------------------> Open
(cooldown timer restarts on each Open transition)
HalfOpen lets exactly one probe through; concurrent IsHealthy calls while a probe is in flight return false, preventing thundering-herd reopening of a sick endpoint.
Safe for concurrent use across many goroutines targeting the same or different endpoints. Per-endpoint state is locked independently.
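The window math behind the Closed-to-Open transition can be illustrated standalone. The functions below are a conceptual sketch of the ratio check and MinRequests guard, not the package's implementation:

```go
package main

import "fmt"

// errorRate computes the failure ratio over a window of observations,
// where false = failure and true = success. Conceptually the same ratio
// the breaker consults; written here as a self-contained sketch.
func errorRate(window []bool) float64 {
	if len(window) == 0 {
		return 0
	}
	failures := 0
	for _, ok := range window {
		if !ok {
			failures++
		}
	}
	return float64(failures) / float64(len(window))
}

// shouldOpen applies the MinRequests guard: below the minimum number of
// observations the breaker stays Closed regardless of the ratio.
func shouldOpen(window []bool, minRequests int, threshold float64) bool {
	return len(window) >= minRequests && errorRate(window) > threshold
}

func main() {
	window := []bool{false, false, false, true} // 3 failures, 1 success
	fmt.Println(errorRate(window))              // 0.75
	fmt.Println(shouldOpen(window, 5, 0.5))     // false: under MinRequests
	fmt.Println(shouldOpen(window, 4, 0.5))     // true: 0.75 > 0.5
}
```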
func NewRollingWindowBreaker ¶
func NewRollingWindowBreaker(cfg BreakerConfig) *RollingWindowBreaker
NewRollingWindowBreaker builds a breaker with the given config. Zero-valued config fields fall back to sensible defaults (WindowSize=20, MinRequests=5, ErrorRateThreshold=0.5, Cooldown=30s).
func (*RollingWindowBreaker) EndpointStats ¶
func (b *RollingWindowBreaker) EndpointStats(endpoint string) HealthStats
EndpointStats implements HealthPolicy.
func (*RollingWindowBreaker) EndpointStatus ¶
func (b *RollingWindowBreaker) EndpointStatus(endpoint string) EndpointStatus
EndpointStatus implements HealthPolicy. Drives the cooldown→half-open transition lazily on read so we don't need a background goroutine. Returns StatusClosed for endpoints with no observations yet.
func (*RollingWindowBreaker) IsHealthy ¶
func (b *RollingWindowBreaker) IsHealthy(endpoint string) bool
IsHealthy implements HealthPolicy.
func (*RollingWindowBreaker) RecordResult ¶
func (b *RollingWindowBreaker) RecordResult(endpoint string, result Result)
RecordResult implements HealthPolicy. Updates the rolling window and drives any resulting state transition. Safe to call concurrently.
type Watcher ¶
type Watcher interface {
WatchModelRegistry() <-chan *Registry
}
Watcher is the minimal surface model.Watch needs to subscribe to KV-driven registry updates. semstreams's config.Manager satisfies it via WatchModelRegistry(). External consumers can adapt other config sources by implementing the same one-method interface.
The returned channel may emit nil — callers should guard their apply callback if they expect a non-nil registry at all times.