Documentation
¶
Overview ¶
Package registry owns the toolset catalog used by the gateway. The catalog is persisted directly in the registry replicated-map keyspace so all registry nodes share one canonical view of registration state.
Package registry provides the internal tool registry service implementation.
This file owns distributed provider liveness. Catalog membership is the authoritative source of which toolsets participate in health tracking, and shared health records are scoped to the current registration epoch so a same-name re-registration cannot inherit stale health from a prior provider.
Package registry provides the internal tool registry gateway service.
This package contains the server-side implementation of the registry, which runs as a standalone service. It includes:
- Service implementation (service.go) — gRPC service handlers
- Toolset catalog (catalog.go) — Pulse-backed metadata persistence
- Health tracking (health_tracker.go) — provider liveness detection
- Stream management (stream_manager.go) — Pulse stream handling
- Generated code (gen/) — Goa-generated types and gRPC transport
- Design (design/) — Goa DSL service definition
Multi-Node Clustering ¶
Multiple registry nodes can participate in the same logical registry by using the same Name in their Config and connecting to the same Redis instance. Nodes with the same name automatically:
- Share toolset registrations via replicated maps
- Coordinate health check pings via distributed tickers (only one node pings at a time)
- Share provider health state across all nodes
This enables horizontal scaling and high availability. Clients can connect to any node and see the same registry state.
For client-side code that agents use to connect to and consume the registry, see the runtime/registry package.
Package registry centralizes schema admission and payload validation. Registration compiles every admitted tool schema up front, and tool calls reuse the same compiled-schema cache so live traffic does not rediscover invalid schemas or repeatedly pay compilation cost for the same contract.
Package registry provides the internal tool registry service implementation.
This file owns the transport-facing registry contract: it admits toolsets into the shared catalog, validates routed tool calls against admitted schemas, gates execution on provider health, and publishes tool-call traffic onto the canonical registry streams.
Package registry provides the internal tool registry service implementation.
Index ¶
- Constants
- type Config
- type HealthTracker
- type HealthTrackerOption
- type Registry
- type Service
- func (s *Service) CallTool(ctx context.Context, p *genregistry.CallToolPayload) (*genregistry.CallToolResult, error)
- func (s *Service) GetToolset(ctx context.Context, p *genregistry.GetToolsetPayload) (*genregistry.Toolset, error)
- func (s *Service) ListToolsets(ctx context.Context, p *genregistry.ListToolsetsPayload) (*genregistry.ListToolsetsResult, error)
- func (s *Service) Pong(ctx context.Context, p *genregistry.PongPayload) error
- func (s *Service) Register(ctx context.Context, p *genregistry.RegisterPayload) (*genregistry.RegisterResult, error)
- func (s *Service) Search(ctx context.Context, p *genregistry.SearchPayload) (*genregistry.SearchResult, error)
- func (s *Service) Unregister(ctx context.Context, p *genregistry.UnregisterPayload) error
- type StreamManager
- type ToolsetHealth
Constants ¶
const ( // DefaultPingInterval is the default interval between health check pings. DefaultPingInterval = 10 * time.Second // DefaultMissedPingThreshold is the default number of consecutive missed pings // before marking a toolset as unhealthy. DefaultMissedPingThreshold = 3 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Redis is the Redis client for Pulse operations. Required.
Redis *redis.Client
// Name is the registry name used to derive Pulse resource names.
// Multiple nodes with the same Name and Redis connection form a cluster,
// sharing state and coordinating health checks automatically.
//
// The pool, health map, and registry map names are derived as:
// - Pool: "<name>"
// - Health map: "<name>:health"
// - Registry map: "<name>:toolsets"
//
// Defaults to "registry" if not provided.
Name string
// Logger receives health tracker logs (pings, transitions, failures).
// When nil, health tracking logs are suppressed.
Logger telemetry.Logger
// PingInterval is the interval between health check pings.
// Defaults to 10 seconds if not provided.
PingInterval time.Duration
// MissedPingThreshold is the number of consecutive missed pings
// before marking a toolset as unhealthy.
// Defaults to 3 if not provided.
MissedPingThreshold int
// ResultStreamMappingTTL is the TTL for tool_use_id to stream_id mappings.
// ResultStreamTTL is the TTL for per-call result streams in Redis.
// Defaults to 15 minutes if not provided.
ResultStreamTTL time.Duration
// PoolNodeOptions are additional options for the Pulse pool node.
PoolNodeOptions []pool.NodeOption
}
Config configures the registry service.
type HealthTracker ¶
type HealthTracker interface {
// Health returns the current health state for a toolset.
//
// Contract:
// - Health is derived from the last recorded Pong timestamp and the
// configured staleness threshold.
// - If the toolset has never ponged (or no entry exists), Health reports
// Healthy=false with LastPong unset.
Health(toolset string) (ToolsetHealth, error)
// RecordPong records a pong response for a toolset when the pong matches
// the current catalog registration epoch.
RecordPong(ctx context.Context, toolset string, pingID string) error
// IsHealthy returns whether a toolset has healthy providers.
// A toolset is healthy if a pong was received within the staleness threshold.
IsHealthy(toolset string) bool
// StartPingLoop ensures this node participates in health tracking for a
// catalog-registered toolset. Cross-node participation is derived from the
// shared catalog, not from a second membership index.
StartPingLoop(ctx context.Context, toolset string) error
// StopPingLoop stops local health tracking participation for an
// unregistered toolset and clears its shared health state. Other nodes stop
// via catalog change propagation.
StopPingLoop(ctx context.Context, toolset string)
// Close stops all ping loops and releases resources.
Close() error
}
HealthTracker tracks provider health status for toolsets. It manages ping/pong health checks to detect when providers become unavailable, enabling fast failure instead of timeouts when all providers are unhealthy.
The tracker uses two Pulse replicated maps: 1. A catalog map that stores registered toolsets (for cross-node coordination) 2. A health map that stores registration-scoped pong records for each toolset
All nodes subscribe to the catalog map. When a toolset is registered/unregistered, all nodes see the change and start/stop their distributed ticker participation. The distributed ticker ensures only one node sends pings at a time, with automatic failover if that node crashes.
func NewHealthTracker ¶
func NewHealthTracker(streamManager StreamManager, healthMap, catalogMap *rmap.Map, node *pool.Node, opts ...HealthTrackerOption) (HealthTracker, error)
NewHealthTracker creates a new distributed health tracker.
The tracker derives toolset participation from the shared catalog map, stores registration-scoped health in the shared health map, and uses a Pulse pool ticker so only one node in the cluster publishes pings at a time.
type HealthTrackerOption ¶
type HealthTrackerOption func(*healthTrackerOptions)
HealthTrackerOption configures optional settings for the health tracker.
func WithHealthLogger ¶
func WithHealthLogger(l telemetry.Logger) HealthTrackerOption
WithHealthLogger sets the logger used for health-transition and ping errors.
func WithMissedPingThreshold ¶
func WithMissedPingThreshold(n int) HealthTrackerOption
WithMissedPingThreshold sets the number of consecutive missed pings before marking a toolset as unhealthy.
func WithPingInterval ¶
func WithPingInterval(d time.Duration) HealthTrackerOption
WithPingInterval sets the interval between health check pings.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is the main entry point for the internal tool registry. It manages all components required for multi-node operation including Pulse streams, replicated maps, and distributed tickers.
func New ¶
New creates a new Registry with all components wired together. The registry connects to Redis for Pulse stream operations and creates replicated maps for cross-node state synchronization.
The caller is responsible for calling Close() when done to release resources.
func (*Registry) Close ¶
Close releases all resources held by the registry. It stops all ping loops, cleans up result streams, closes Pulse components, and closes Redis connections.
The caller is responsible for closing the Redis client if they own it. This method does not close the Redis client passed in Config.
func (*Registry) Run ¶
Run starts the gRPC server and blocks until the context is canceled or a termination signal is received. It handles graceful shutdown automatically.
The addr parameter specifies the network address to listen on (e.g., ":9090"). Optional gRPC server options can be passed to customize the server.
Example:
reg, _ := registry.New(ctx, registry.Config{Redis: rdb})
if err := reg.Run(ctx, ":9090"); err != nil {
log.Fatal(err)
}
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service implements the registry service interface. It provides toolset registration, discovery, and tool invocation capabilities.
func (*Service) CallTool ¶
func (s *Service) CallTool(ctx context.Context, p *genregistry.CallToolPayload) (*genregistry.CallToolResult, error)
CallTool invokes a tool through the registry gateway. It validates the payload against the tool's payload schema, checks provider health, creates the per-call result stream, and publishes the request to the toolset stream.
func (*Service) GetToolset ¶
func (s *Service) GetToolset(ctx context.Context, p *genregistry.GetToolsetPayload) (*genregistry.Toolset, error)
GetToolset returns a specific toolset by name including all tool schemas. Returns the complete toolset with tool schemas, or not-found error if the toolset doesn't exist. **Validates: Requirements 7.1, 7.2**
func (*Service) ListToolsets ¶
func (s *Service) ListToolsets(ctx context.Context, p *genregistry.ListToolsetsPayload) (*genregistry.ListToolsetsResult, error)
ListToolsets returns all registered toolsets with optional tag filtering. Returns all toolsets with metadata, supports tag filtering, and returns an empty list when the catalog is empty. **Validates: Requirements 6.1, 6.2, 6.3**
func (*Service) Pong ¶
func (s *Service) Pong(ctx context.Context, p *genregistry.PongPayload) error
Pong records a pong response for a health check ping. This restores healthy status if the provider was previously marked unhealthy.
func (*Service) Register ¶
func (s *Service) Register(ctx context.Context, p *genregistry.RegisterPayload) (*genregistry.RegisterResult, error)
Register registers a toolset with the registry. It validates the toolset schema, ensures the toolset request stream exists, stores the metadata, and starts the health ping loop.
func (*Service) Search ¶
func (s *Service) Search(ctx context.Context, p *genregistry.SearchPayload) (*genregistry.SearchResult, error)
Search searches toolsets by keyword matching name, description, or tags. Returns matching toolsets or an empty list when no matches are found. **Validates: Requirements 8.1, 8.2**
func (*Service) Unregister ¶
func (s *Service) Unregister(ctx context.Context, p *genregistry.UnregisterPayload) error
Unregister removes a toolset from the registry. It stops the health ping loop and removes the toolset from the catalog. Returns not-found error if the toolset doesn't exist. **Validates: Requirements 5.1, 5.2, 5.3**
type StreamManager ¶
type StreamManager interface {
// GetOrCreateStream returns the stream for a toolset, creating it if needed.
// The stream ID is deterministic based on the toolset name.
GetOrCreateStream(ctx context.Context, toolset string) (clientspulse.Stream, string, error)
// GetStream returns the stream for a toolset if it exists.
// Returns nil if the toolset has no associated stream.
GetStream(toolset string) clientspulse.Stream
// RemoveStream removes the stream tracking for a toolset.
// This does not destroy the underlying Pulse stream.
RemoveStream(toolset string)
// PublishToolCall publishes a tool call message to the toolset's stream.
PublishToolCall(ctx context.Context, toolset string, msg toolregistry.ToolCallMessage) error
}
StreamManager manages Pulse streams for toolset communication. It creates and tracks streams for each registered toolset, enabling tool request routing and result delivery.
func NewStreamManager ¶
func NewStreamManager(client clientspulse.Client) StreamManager
NewStreamManager creates a new StreamManager backed by the given Pulse client.
type ToolsetHealth ¶
type ToolsetHealth struct {
// Healthy reports whether a provider pong was received within the configured threshold.
Healthy bool
// LastPong is the timestamp of the last recorded pong when available.
LastPong time.Time
// Age is the duration since LastPong when available.
Age time.Duration
// StalenessThreshold is the configured maximum acceptable pong age.
StalenessThreshold time.Duration
}
ToolsetHealth reports derived provider health for a toolset.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
registry
command
Command registry runs the internal tool registry gRPC server.
|
Command registry runs the internal tool registry gRPC server. |
|
Package design defines the internal tool registry service using Goa DSL.
|
Package design defines the internal tool registry service using Goa DSL. |
|
gen
|
|