registry

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2026 License: MIT Imports: 30 Imported by: 0

Documentation

Overview

Package registry owns the toolset catalog used by the gateway. The catalog is persisted directly in the registry replicated-map keyspace so all registry nodes share one canonical view of registration state.

Package registry provides the internal tool registry service implementation.

This file owns distributed provider liveness. Catalog membership is the authoritative source of which toolsets participate in health tracking, and shared health records are scoped to the current registration epoch so a same-name re-registration cannot inherit stale health from a prior provider.

Package registry provides the internal tool registry gateway service.

This package contains the server-side implementation of the registry, which runs as a standalone service. It includes:

  • Service implementation (service.go) — gRPC service handlers
  • Toolset catalog (catalog.go) — Pulse-backed metadata persistence
  • Health tracking (health_tracker.go) — provider liveness detection
  • Stream management (stream_manager.go) — Pulse stream handling
  • Generated code (gen/) — Goa-generated types and gRPC transport
  • Design (design/) — Goa DSL service definition

Multi-Node Clustering

Multiple registry nodes can participate in the same logical registry by using the same Name in their Config and connecting to the same Redis instance. Nodes with the same name automatically:

  • Share toolset registrations via replicated maps
  • Coordinate health check pings via distributed tickers (only one node pings at a time)
  • Share provider health state across all nodes

This enables horizontal scaling and high availability. Clients can connect to any node and see the same registry state.

For client-side code that agents use to connect to and consume the registry, see the runtime/registry package.

Package registry centralizes schema admission and payload validation. Registration compiles every admitted tool schema up front, and tool calls reuse the same compiled-schema cache so live traffic does not rediscover invalid schemas or repeatedly pay compilation cost for the same contract.

Package registry provides the internal tool registry service implementation.

This file owns the transport-facing registry contract: it admits toolsets into the shared catalog, validates routed tool calls against admitted schemas, gates execution on provider health, and publishes tool-call traffic onto the canonical registry streams.

Package registry provides the internal tool registry service implementation.

Index

Constants

View Source
const (
	// DefaultPingInterval is the default interval between health check pings.
	DefaultPingInterval = 10 * time.Second
	// DefaultMissedPingThreshold is the default number of consecutive missed pings
	// before marking a toolset as unhealthy.
	DefaultMissedPingThreshold = 3
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Redis is the Redis client for Pulse operations. Required.
	Redis *redis.Client
	// Name is the registry name used to derive Pulse resource names.
	// Multiple nodes with the same Name and Redis connection form a cluster,
	// sharing state and coordinating health checks automatically.
	//
	// The pool, health map, and registry map names are derived as:
	//   - Pool: "<name>"
	//   - Health map: "<name>:health"
	//   - Registry map: "<name>:toolsets"
	//
	// Defaults to "registry" if not provided.
	Name string
	// Logger receives health tracker logs (pings, transitions, failures).
	// When nil, health tracking logs are suppressed.
	Logger telemetry.Logger
	// PingInterval is the interval between health check pings.
	// Defaults to 10 seconds if not provided.
	PingInterval time.Duration
	// MissedPingThreshold is the number of consecutive missed pings
	// before marking a toolset as unhealthy.
	// Defaults to 3 if not provided.
	MissedPingThreshold int
	// ResultStreamMappingTTL is the TTL for tool_use_id to stream_id mappings.
	// ResultStreamTTL is the TTL for per-call result streams in Redis.
	// Defaults to 15 minutes if not provided.
	ResultStreamTTL time.Duration
	// PoolNodeOptions are additional options for the Pulse pool node.
	PoolNodeOptions []pool.NodeOption
}

Config configures the registry service.

type HealthTracker

type HealthTracker interface {
	// Health returns the current health state for a toolset.
	//
	// Contract:
	//   - Health is derived from the last recorded Pong timestamp and the
	//     configured staleness threshold.
	//   - If the toolset has never ponged (or no entry exists), Health reports
	//     Healthy=false with LastPong unset.
	Health(toolset string) (ToolsetHealth, error)

	// RecordPong records a pong response for a toolset when the pong matches
	// the current catalog registration epoch.
	RecordPong(ctx context.Context, toolset string, pingID string) error

	// IsHealthy returns whether a toolset has healthy providers.
	// A toolset is healthy if a pong was received within the staleness threshold.
	IsHealthy(toolset string) bool

	// StartPingLoop ensures this node participates in health tracking for a
	// catalog-registered toolset. Cross-node participation is derived from the
	// shared catalog, not from a second membership index.
	StartPingLoop(ctx context.Context, toolset string) error

	// StopPingLoop stops local health tracking participation for an
	// unregistered toolset and clears its shared health state. Other nodes stop
	// via catalog change propagation.
	StopPingLoop(ctx context.Context, toolset string)

	// Close stops all ping loops and releases resources.
	Close() error
}

HealthTracker tracks provider health status for toolsets. It manages ping/pong health checks to detect when providers become unavailable, enabling fast failure instead of timeouts when all providers are unhealthy.

The tracker uses two Pulse replicated maps: 1. A catalog map that stores registered toolsets (for cross-node coordination) 2. A health map that stores registration-scoped pong records for each toolset

All nodes subscribe to the catalog map. When a toolset is registered/unregistered, all nodes see the change and start/stop their distributed ticker participation. The distributed ticker ensures only one node sends pings at a time, with automatic failover if that node crashes.

func NewHealthTracker

func NewHealthTracker(streamManager StreamManager, healthMap, catalogMap *rmap.Map, node *pool.Node, opts ...HealthTrackerOption) (HealthTracker, error)

NewHealthTracker creates a new distributed health tracker.

The tracker derives toolset participation from the shared catalog map, stores registration-scoped health in the shared health map, and uses a Pulse pool ticker so only one node in the cluster publishes pings at a time.

type HealthTrackerOption

type HealthTrackerOption func(*healthTrackerOptions)

HealthTrackerOption configures optional settings for the health tracker.

func WithHealthLogger

func WithHealthLogger(l telemetry.Logger) HealthTrackerOption

WithHealthLogger sets the logger used for health-transition and ping errors.

func WithMissedPingThreshold

func WithMissedPingThreshold(n int) HealthTrackerOption

WithMissedPingThreshold sets the number of consecutive missed pings before marking a toolset as unhealthy.

func WithPingInterval

func WithPingInterval(d time.Duration) HealthTrackerOption

WithPingInterval sets the interval between health check pings.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is the main entry point for the internal tool registry. It manages all components required for multi-node operation including Pulse streams, replicated maps, and distributed tickers.

func New

func New(ctx context.Context, cfg Config) (*Registry, error)

New creates a new Registry with all components wired together. The registry connects to Redis for Pulse stream operations and creates replicated maps for cross-node state synchronization.

The caller is responsible for calling Close() when done to release resources.

func (*Registry) Close

func (r *Registry) Close(ctx context.Context) error

Close releases all resources held by the registry. It stops all ping loops, cleans up result streams, closes Pulse components, and closes Redis connections.

The caller is responsible for closing the Redis client if they own it. This method does not close the Redis client passed in Config.

func (*Registry) Run

func (r *Registry) Run(ctx context.Context, addr string, opts ...grpc.ServerOption) error

Run starts the gRPC server and blocks until the context is canceled or a termination signal is received. It handles graceful shutdown automatically.

The addr parameter specifies the network address to listen on (e.g., ":9090"). Optional gRPC server options can be passed to customize the server.

Example:

reg, _ := registry.New(ctx, registry.Config{Redis: rdb})
if err := reg.Run(ctx, ":9090"); err != nil {
    log.Fatal(err)
}

func (*Registry) Service

func (r *Registry) Service() *Service

Service returns the registry service implementation. This implements the genregistry.Service interface.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service implements the registry service interface. It provides toolset registration, discovery, and tool invocation capabilities.

func (*Service) CallTool

CallTool invokes a tool through the registry gateway. It validates the payload against the tool's payload schema, checks provider health, creates the per-call result stream, and publishes the request to the toolset stream.

func (*Service) GetToolset

GetToolset returns a specific toolset by name including all tool schemas. Returns the complete toolset with tool schemas, or not-found error if the toolset doesn't exist. **Validates: Requirements 7.1, 7.2**

func (*Service) ListToolsets

ListToolsets returns all registered toolsets with optional tag filtering. Returns all toolsets with metadata, supports tag filtering, and returns an empty list when the catalog is empty. **Validates: Requirements 6.1, 6.2, 6.3**

func (*Service) Pong

Pong records a pong response for a health check ping. This restores healthy status if the provider was previously marked unhealthy.

func (*Service) Register

Register registers a toolset with the registry. It validates the toolset schema, ensures the toolset request stream exists, stores the metadata, and starts the health ping loop.

func (*Service) Search

Search searches toolsets by keyword matching name, description, or tags. Returns matching toolsets or an empty list when no matches are found. **Validates: Requirements 8.1, 8.2**

func (*Service) Unregister

func (s *Service) Unregister(ctx context.Context, p *genregistry.UnregisterPayload) error

Unregister removes a toolset from the registry. It stops the health ping loop and removes the toolset from the catalog. Returns not-found error if the toolset doesn't exist. **Validates: Requirements 5.1, 5.2, 5.3**

type StreamManager

type StreamManager interface {
	// GetOrCreateStream returns the stream for a toolset, creating it if needed.
	// The stream ID is deterministic based on the toolset name.
	GetOrCreateStream(ctx context.Context, toolset string) (clientspulse.Stream, string, error)

	// GetStream returns the stream for a toolset if it exists.
	// Returns nil if the toolset has no associated stream.
	GetStream(toolset string) clientspulse.Stream

	// RemoveStream removes the stream tracking for a toolset.
	// This does not destroy the underlying Pulse stream.
	RemoveStream(toolset string)

	// PublishToolCall publishes a tool call message to the toolset's stream.
	PublishToolCall(ctx context.Context, toolset string, msg toolregistry.ToolCallMessage) error
}

StreamManager manages Pulse streams for toolset communication. It creates and tracks streams for each registered toolset, enabling tool request routing and result delivery.

func NewStreamManager

func NewStreamManager(client clientspulse.Client) StreamManager

NewStreamManager creates a new StreamManager backed by the given Pulse client.

type ToolsetHealth

type ToolsetHealth struct {
	// Healthy reports whether a provider pong was received within the configured threshold.
	Healthy bool
	// LastPong is the timestamp of the last recorded pong when available.
	LastPong time.Time
	// Age is the duration since LastPong when available.
	Age time.Duration
	// StalenessThreshold is the configured maximum acceptable pong age.
	StalenessThreshold time.Duration
}

ToolsetHealth reports derived provider health for a toolset.

Directories

Path Synopsis
cmd
registry command
Command registry runs the internal tool registry gRPC server.
Command registry runs the internal tool registry gRPC server.
Package design defines the internal tool registry service using Goa DSL.
Package design defines the internal tool registry service using Goa DSL.
gen

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL