Documentation
¶
Overview ¶
Package discovery provides pluggable service discovery for FARP.
It supports three discovery modes:
- Registry-based (pull): Services register in Consul/etcd/K8s/Redis/mDNS, gateways watch the registry for changes.
- Push-based (reverse): Services push their manifest directly to the gateway via HTTP. No external registry needed.
- Hybrid: Registry for discovery + direct push for fast propagation.
Service Side ¶
Use ServiceNode to auto-register, serve FARP HTTP endpoints, and manage the full lifecycle:
node, _ := discovery.NewServiceNode(discovery.ServiceNodeConfig{
ServiceName: "user-service",
Address: "10.0.0.5:8080",
Discovery: consulBackend,
})
node.Start(ctx)
defer node.Stop(ctx)
http.Handle("/_farp/", node.HTTPHandler())
Gateway Side ¶
Use GatewayNode to auto-discover services, fetch manifests, and manage routes:
gw, _ := discovery.NewGatewayNode(discovery.GatewayNodeConfig{
Discovery: consulBackend,
OnRoutesChanged: updateRoutes,
})
gw.Start(ctx)
defer gw.Stop(ctx)
Index ¶
- type DiscoveryEvent
- type DiscoveryEventHandler
- type FARPHandler
- type GatewayNode
- func (n *GatewayNode) GatewayClient() *gateway.Client
- func (n *GatewayNode) ListServiceNames() []string
- func (n *GatewayNode) PushHandler() http.Handler
- func (n *GatewayNode) Registry() farp.SchemaRegistry
- func (n *GatewayNode) Routes() []gateway.ServiceRoute
- func (n *GatewayNode) Services() map[string]*farp.SchemaManifest
- func (n *GatewayNode) Start(ctx context.Context) error
- func (n *GatewayNode) Stop(_ context.Context) error
- type GatewayNodeConfig
- type HTTPManifestFetcher
- type ListableDiscovery
- type ManifestFetcher
- type NamedDiscovery
- type PushDiscovery
- func (p *PushDiscovery) Close() error
- func (p *PushDiscovery) Deregister(ctx context.Context, instanceID string) error
- func (p *PushDiscovery) Discover(ctx context.Context, serviceName string) ([]ServiceInstance, error)
- func (p *PushDiscovery) Health(ctx context.Context) error
- func (p *PushDiscovery) Register(ctx context.Context, instance ServiceInstance) error
- func (p *PushDiscovery) RegisterWithManifest(ctx context.Context, instance ServiceInstance, manifest *farp.SchemaManifest) (*PushRegistrationResponse, error)
- func (p *PushDiscovery) ReportHealth(ctx context.Context, instanceID string, status farp.InstanceStatus) error
- func (p *PushDiscovery) ReportHealthWithChecksum(ctx context.Context, instanceID string, status farp.InstanceStatus, ...) (*PushHeartbeatResponse, error)
- func (p *PushDiscovery) Watch(ctx context.Context, _ string, _ DiscoveryEventHandler) error
- type PushHandler
- func (h *PushHandler) Close() error
- func (h *PushHandler) Deregister(_ context.Context, instanceID string) error
- func (h *PushHandler) Discover(_ context.Context, serviceName string) ([]ServiceInstance, error)
- func (h *PushHandler) Health(_ context.Context) error
- func (h *PushHandler) Register(_ context.Context, instance ServiceInstance) error
- func (h *PushHandler) ReportHealth(_ context.Context, instanceID string, status farp.InstanceStatus) error
- func (h *PushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (h *PushHandler) Watch(ctx context.Context, _ string, handler DiscoveryEventHandler) error
- type PushHeartbeat
- type PushHeartbeatResponse
- type PushRegistration
- type PushRegistrationResponse
- type ServiceDiscovery
- type ServiceInstance
- type ServiceNode
- type ServiceNodeConfig
- type TagDiscovery
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DiscoveryEvent ¶
type DiscoveryEvent struct {
Type farp.EventType `json:"type"`
Instance ServiceInstance `json:"instance"`
Timestamp time.Time `json:"timestamp"`
// Manifest is populated when available (e.g., push mode includes it directly).
// For registry-based discovery, this may be nil and the gateway fetches it separately.
Manifest *farp.SchemaManifest `json:"manifest,omitempty"`
}
DiscoveryEvent represents a change in service discovery.
type DiscoveryEventHandler ¶
type DiscoveryEventHandler func(event DiscoveryEvent)
DiscoveryEventHandler is called when a service discovery event occurs.
type FARPHandler ¶
type FARPHandler struct {
// contains filtered or unexported fields
}
FARPHandler serves FARP protocol HTTP endpoints for a service. Mount this on your HTTP router to expose manifest, health, and schema endpoints.
Endpoints served (relative to mount point):
GET /_farp/manifest — returns SchemaManifest JSON
GET /_farp/health — returns health status (200 or 503)
GET /_farp/schemas/{type} — returns schema by type (openapi, asyncapi, graphql, etc.)
func NewFARPHandler ¶
func NewFARPHandler(manifest *farp.SchemaManifest, schemas map[farp.SchemaType]any) *FARPHandler
NewFARPHandler creates a new FARP HTTP handler.
func (*FARPHandler) ServeHTTP ¶
func (h *FARPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler.
func (*FARPHandler) SetHealth ¶
func (h *FARPHandler) SetHealth(status farp.InstanceStatus)
SetHealth sets the health status returned by /_farp/health.
func (*FARPHandler) UpdateManifest ¶
func (h *FARPHandler) UpdateManifest(manifest *farp.SchemaManifest)
UpdateManifest updates the manifest served by /_farp/manifest.
func (*FARPHandler) UpdateSchema ¶
func (h *FARPHandler) UpdateSchema(schemaType farp.SchemaType, schema any)
UpdateSchema updates a schema served by /_farp/schemas/{type}.
type GatewayNode ¶
type GatewayNode struct {
// contains filtered or unexported fields
}
GatewayNode manages the full FARP lifecycle for a gateway: service discovery, manifest fetching, schema registration, route management, and health tracking — all automatic.
func NewGatewayNode ¶
func NewGatewayNode(config GatewayNodeConfig) (*GatewayNode, error)
NewGatewayNode creates a new GatewayNode.
func (*GatewayNode) GatewayClient ¶
func (n *GatewayNode) GatewayClient() *gateway.Client
GatewayClient returns the underlying gateway.Client.
func (*GatewayNode) ListServiceNames ¶
func (n *GatewayNode) ListServiceNames() []string
ListServiceNames returns distinct service names from all known manifests.
func (*GatewayNode) PushHandler ¶
func (n *GatewayNode) PushHandler() http.Handler
PushHandler returns the HTTP handler for push-based registration. Returns nil if EnablePush is false.
func (*GatewayNode) Registry ¶
func (n *GatewayNode) Registry() farp.SchemaRegistry
Registry returns the underlying SchemaRegistry.
func (*GatewayNode) Routes ¶
func (n *GatewayNode) Routes() []gateway.ServiceRoute
Routes returns the current computed route table.
func (*GatewayNode) Services ¶
func (n *GatewayNode) Services() map[string]*farp.SchemaManifest
Services returns all currently known services and their manifests.
type GatewayNodeConfig ¶
type GatewayNodeConfig struct {
// Discovery mode (use one or both):
// Option A: Registry-based — watch external backend for services
Discovery ServiceDiscovery
// Option B: Push-based — accept manifest pushes from services
EnablePush bool
// Optional: SchemaRegistry for storing discovered manifests.
// If nil, uses an in-memory registry internally.
Registry farp.SchemaRegistry
// Route change callback — called when routes change.
// Use this for simple integrations.
OnRoutesChanged func(routes []gateway.ServiceRoute)
// OnServiceEvent is called when a service discovery event occurs
// (add, update, remove). Called before manifest processing, so the
// gateway can track service lifecycle independently of routes.
OnServiceEvent func(event DiscoveryEvent)
// Or use atomic swap handler for zero-downtime route updates.
RouteHandler farp.RouteUpdateHandler
// Which services to watch (empty = all services)
ServiceNames []string
// HTTP client for fetching manifests from services (registry mode)
HTTPClient *http.Client
// ManifestFetcher for custom manifest fetching logic.
// If nil, uses HTTPManifestFetcher (GET /_farp/manifest).
Fetcher ManifestFetcher
// How often to poll for health (for backends that don't push health)
HealthPollInterval time.Duration // default: 15s
// Push mode: how long before an instance with no heartbeat is removed
HeartbeatTimeout time.Duration // default: 30s
}
GatewayNodeConfig configures a GatewayNode.
type HTTPManifestFetcher ¶
type HTTPManifestFetcher struct {
// contains filtered or unexported fields
}
HTTPManifestFetcher fetches manifests from service instances via HTTP. It GETs /_farp/manifest from the service address.
func NewHTTPManifestFetcher ¶
func NewHTTPManifestFetcher(client *http.Client) *HTTPManifestFetcher
NewHTTPManifestFetcher creates a new HTTP-based manifest fetcher.
func (*HTTPManifestFetcher) FetchManifest ¶
func (f *HTTPManifestFetcher) FetchManifest(_ context.Context, instance ServiceInstance) (*farp.SchemaManifest, error)
FetchManifest fetches the SchemaManifest from a service instance via HTTP.
type ListableDiscovery ¶
type ListableDiscovery interface {
// ListServices returns all known service names.
ListServices(ctx context.Context) ([]string, error)
}
ListableDiscovery is an optional interface for backends that can list all service names.
type ManifestFetcher ¶
type ManifestFetcher interface {
// FetchManifest fetches the SchemaManifest from a service instance.
FetchManifest(ctx context.Context, instance ServiceInstance) (*farp.SchemaManifest, error)
}
ManifestFetcher fetches a FARP manifest from a live service instance. Used by GatewayNode when discovery events don't include the manifest directly.
type NamedDiscovery ¶
type NamedDiscovery interface {
// Name returns the backend name (e.g., "consul", "etcd", "mdns").
Name() string
// Initialize performs any deferred initialization.
Initialize(ctx context.Context) error
}
NamedDiscovery is an optional interface that discovery backends can implement to provide identity and initialization. This is useful for frameworks (like Forge) that need to manage backend lifecycle.
type PushDiscovery ¶
type PushDiscovery struct {
// contains filtered or unexported fields
}
PushDiscovery implements ServiceDiscovery by pushing manifests directly to a gateway via HTTP. No external service registry needed.
This is the service-side component of push-based discovery. The gateway-side component is PushHandler.
func NewPushDiscovery ¶
func NewPushDiscovery(gatewayURL string, client *http.Client) *PushDiscovery
NewPushDiscovery creates a new push-based discovery client. gatewayURL is the base URL of the gateway's FARP push endpoint.
func (*PushDiscovery) Close ¶
func (p *PushDiscovery) Close() error
Close is a no-op for push discovery.
func (*PushDiscovery) Deregister ¶
func (p *PushDiscovery) Deregister(ctx context.Context, instanceID string) error
Deregister removes the service from the gateway.
func (*PushDiscovery) Discover ¶
func (p *PushDiscovery) Discover(ctx context.Context, serviceName string) ([]ServiceInstance, error)
Discover queries the gateway for instances of a service.
func (*PushDiscovery) Health ¶
func (p *PushDiscovery) Health(ctx context.Context) error
Health checks if the gateway is reachable.
func (*PushDiscovery) Register ¶
func (p *PushDiscovery) Register(ctx context.Context, instance ServiceInstance) error
Register pushes the service instance to the gateway.
func (*PushDiscovery) RegisterWithManifest ¶
func (p *PushDiscovery) RegisterWithManifest(ctx context.Context, instance ServiceInstance, manifest *farp.SchemaManifest) (*PushRegistrationResponse, error)
RegisterWithManifest pushes both the instance and manifest to the gateway. Use this when the gateway needs the manifest directly (e.g. after a heartbeat reconciliation mismatch — see §17.4.1).
func (*PushDiscovery) ReportHealth ¶
func (p *PushDiscovery) ReportHealth(ctx context.Context, instanceID string, status farp.InstanceStatus) error
ReportHealth sends a heartbeat to the gateway (without checksum). Satisfies the ServiceDiscovery interface.
func (*PushDiscovery) ReportHealthWithChecksum ¶
func (p *PushDiscovery) ReportHealthWithChecksum(ctx context.Context, instanceID string, status farp.InstanceStatus, routesChecksum string) (*PushHeartbeatResponse, error)
ReportHealthWithChecksum sends a heartbeat with the service's expected routes_checksum for reconciliation (FARP spec §17.4.1). The response includes the gateway's applied checksum and schema count.
func (*PushDiscovery) Watch ¶
func (p *PushDiscovery) Watch(ctx context.Context, _ string, _ DiscoveryEventHandler) error
Watch is not natively supported in push mode from the service side. Services push to the gateway; they don't watch it. This is a no-op that blocks until context is cancelled.
type PushHandler ¶
type PushHandler struct {
// contains filtered or unexported fields
}
PushHandler is the gateway-side HTTP handler for push-based service registration. Services POST their manifests directly to this handler.
It also implements ServiceDiscovery so GatewayNode can use it like any other backend.
Endpoints:
POST /register — service pushes instance + manifest
PUT /heartbeat/{id} — service sends heartbeat
DELETE /deregister/{id} — service deregisters
GET /services — list all registered services
GET /services/{name} — get instances of a service
func NewPushHandler ¶
func NewPushHandler(heartbeatTimeout time.Duration) *PushHandler
NewPushHandler creates a new push registration handler. heartbeatTimeout is how long before an instance with no heartbeat is evicted.
func (*PushHandler) Deregister ¶
func (h *PushHandler) Deregister(_ context.Context, instanceID string) error
Deregister removes an instance.
func (*PushHandler) Discover ¶
func (h *PushHandler) Discover(_ context.Context, serviceName string) ([]ServiceInstance, error)
Discover returns all known instances, optionally filtered by service name.
func (*PushHandler) Health ¶
func (h *PushHandler) Health(_ context.Context) error
Health always returns nil (push handler is always "healthy").
func (*PushHandler) Register ¶
func (h *PushHandler) Register(_ context.Context, instance ServiceInstance) error
Register adds an instance (same as receiving a push via HTTP).
func (*PushHandler) ReportHealth ¶
func (h *PushHandler) ReportHealth(_ context.Context, instanceID string, status farp.InstanceStatus) error
ReportHealth updates the health status of an instance.
func (*PushHandler) ServeHTTP ¶
func (h *PushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP routes push API requests.
func (*PushHandler) Watch ¶
func (h *PushHandler) Watch(ctx context.Context, _ string, handler DiscoveryEventHandler) error
Watch registers a watcher for discovery events.
type PushHeartbeat ¶
type PushHeartbeat struct {
Status farp.InstanceStatus `json:"status"`
RoutesChecksum string `json:"routes_checksum,omitempty"`
}
PushHeartbeat is the body sent for heartbeat updates. RoutesChecksum is optional (§17.4.1 reconciliation).
type PushHeartbeatResponse ¶
type PushHeartbeatResponse struct {
Status string `json:"status"`
RoutesChecksum string `json:"routes_checksum"`
SchemasApplied int `json:"schemas_applied"`
}
PushHeartbeatResponse is the gateway's response to a heartbeat.
type PushRegistration ¶
type PushRegistration struct {
Instance ServiceInstance `json:"instance"`
Manifest *farp.SchemaManifest `json:"manifest"`
}
PushRegistration is the body sent when a service pushes its manifest to the gateway.
type PushRegistrationResponse ¶
type PushRegistrationResponse struct {
Status string `json:"status"`
RoutesChecksum string `json:"routes_checksum"`
SchemasApplied int `json:"schemas_applied"`
}
PushRegistrationResponse is the gateway's response to a registration.
type ServiceDiscovery ¶
type ServiceDiscovery interface {
// Discover returns all known instances of a service.
// If serviceName is empty, returns all instances across all services.
Discover(ctx context.Context, serviceName string) ([]ServiceInstance, error)
// Watch watches for changes to instances of a service.
// The handler is called when instances are added, removed, or change health status.
// If serviceName is empty, watches all services.
// The watch runs until the context is cancelled.
Watch(ctx context.Context, serviceName string, handler DiscoveryEventHandler) error
// Register registers a service instance in the discovery backend.
Register(ctx context.Context, instance ServiceInstance) error
// Deregister removes a service instance from the discovery backend.
Deregister(ctx context.Context, instanceID string) error
// ReportHealth reports the health status of a registered instance.
ReportHealth(ctx context.Context, instanceID string, status farp.InstanceStatus) error
// Close closes the discovery backend connection and releases resources.
Close() error
// Health returns nil if the discovery backend itself is reachable.
Health(ctx context.Context) error
}
ServiceDiscovery provides pluggable service discovery operations. Implementations wrap infrastructure-specific discovery mechanisms (Consul, etcd, Kubernetes, Redis, mDNS, or push-based).
type ServiceInstance ¶
type ServiceInstance struct {
// Unique ID for this instance (maps to SchemaManifest.InstanceID)
ID string `json:"id"`
// Service name (maps to SchemaManifest.ServiceName)
ServiceName string `json:"service_name"`
// Service version (maps to SchemaManifest.ServiceVersion)
Version string `json:"version,omitempty"`
// Network address (host or host:port)
Address string `json:"address"`
// Port number
Port int `json:"port"`
// Health status
Status farp.InstanceStatus `json:"status"`
// Tags for filtering and labeling
Tags []string `json:"tags,omitempty"`
// Metadata tags (key-value pairs for filtering, labeling)
Metadata map[string]string `json:"metadata,omitempty"`
// When this instance was registered
RegisteredAt time.Time `json:"registered_at"`
// When health was last reported
LastHealthCheck time.Time `json:"last_health_check"`
}
ServiceInstance represents a discovered service instance in infrastructure. This is the network-level presence of a service, distinct from SchemaManifest which describes the API contracts.
type ServiceNode ¶
type ServiceNode struct {
// contains filtered or unexported fields
}
ServiceNode manages the full FARP lifecycle for a service: schema generation, manifest building, HTTP endpoint serving, discovery registration, health reporting, and graceful shutdown.
func NewServiceNode ¶
func NewServiceNode(config ServiceNodeConfig) (*ServiceNode, error)
NewServiceNode creates a new ServiceNode. Either config.Discovery or config.GatewayURL must be set.
func (*ServiceNode) HTTPHandler ¶
func (n *ServiceNode) HTTPHandler() http.Handler
HTTPHandler returns an http.Handler that serves FARP endpoints.
func (*ServiceNode) Manifest ¶
func (n *ServiceNode) Manifest() *farp.SchemaManifest
Manifest returns the current SchemaManifest.
func (*ServiceNode) Start ¶
func (n *ServiceNode) Start(ctx context.Context) error
Start registers the service and begins the health reporting loop. It generates schemas from providers (if configured), builds the manifest, registers in the discovery backend (or pushes to gateway), and starts a background goroutine for health reporting and TTL renewal.
func (*ServiceNode) Stop ¶
func (n *ServiceNode) Stop(ctx context.Context) error
Stop gracefully deregisters the service and stops the health loop.
func (*ServiceNode) UpdateSchema ¶
func (n *ServiceNode) UpdateSchema(ctx context.Context) error
UpdateSchema triggers re-generation of schemas from providers and pushes the updated manifest to the discovery/registry backend.
type ServiceNodeConfig ¶
type ServiceNodeConfig struct {
// Required: Service identity
ServiceName string
ServiceVersion string
InstanceID string // auto-generated UUID if empty
Address string // host:port this service listens on
// Discovery mode (use one):
// Option A: Registry-based — register in external backend
Discovery ServiceDiscovery
// Option B: Push-based — push manifest directly to gateway
GatewayURL string // e.g., "http://gateway:9090/_farp/v1"
// Optional: SchemaRegistry for publishing manifest to KV store
Registry farp.SchemaRegistry
// Optional: Schema providers for auto-generating schemas
Providers []farp.SchemaProvider
// Health/TTL configuration
HealthInterval time.Duration // default: 10s
TTL time.Duration // default: 30s
MaxRetries int // default: 5
RetryBackoff time.Duration // default: 2s
// Tags for service instance filtering/labeling
Tags []string
// Metadata is additional key-value metadata to include on the registered
// ServiceInstance. These are merged with the auto-generated FARP metadata
// keys (farp.enabled, farp.openapi, etc.) — user-provided keys take precedence.
Metadata map[string]string
// Endpoints configures the service's introspection endpoints.
// These flow into the manifest and are also advertised as farp.* metadata
// keys on the registered ServiceInstance (e.g., farp.openapi, farp.health).
Endpoints farp.SchemaEndpoints
// Routing configuration (flows into manifest)
MountStrategy farp.MountStrategy
BasePath string
PathRules []farp.PathRule
// Routes provides route information for OpenAPI schema generation.
// Can be []farp.RouteDescriptor, map[string]any (OpenAPI paths), or any
// type that the configured schema provider understands.
// If nil, the provider's Generate() may fail or produce empty paths.
Routes any
// Service hints (flows into manifest)
Hints *farp.ServiceHints
// HTTP client for push mode (optional, uses default if nil)
HTTPClient *http.Client
}
ServiceNodeConfig configures a ServiceNode.
type TagDiscovery ¶
type TagDiscovery interface {
// DiscoverWithTags returns instances matching all given tags.
DiscoverWithTags(ctx context.Context, serviceName string, tags []string) ([]ServiceInstance, error)
}
TagDiscovery is an optional interface for backends that support tag-based filtering.