discovery

package module
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package discovery provides pluggable service discovery for FARP.

It supports three discovery modes:

  • Registry-based (pull): Services register in Consul/etcd/K8s/Redis/mDNS, gateways watch the registry for changes.
  • Push-based (reverse): Services push their manifest directly to the gateway via HTTP. No external registry needed.
  • Hybrid: Registry for discovery + direct push for fast propagation.

Service Side

Use ServiceNode to auto-register, serve FARP HTTP endpoints, and manage the full lifecycle:

node, _ := discovery.NewServiceNode(discovery.ServiceNodeConfig{
    ServiceName: "user-service",
    Address:     "10.0.0.5:8080",
    Discovery:   consulBackend,
})
node.Start(ctx)
defer node.Stop(ctx)
http.Handle("/_farp/", node.HTTPHandler())

Gateway Side

Use GatewayNode to auto-discover services, fetch manifests, and manage routes:

gw, _ := discovery.NewGatewayNode(discovery.GatewayNodeConfig{
    Discovery:       consulBackend,
    OnRoutesChanged: updateRoutes,
})
gw.Start(ctx)
defer gw.Stop(ctx)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DiscoveryEvent

type DiscoveryEvent struct {
	Type      farp.EventType  `json:"type"`
	Instance  ServiceInstance `json:"instance"`
	Timestamp time.Time       `json:"timestamp"`

	// Manifest is populated when available (e.g., push mode includes it directly).
	// For registry-based discovery, this may be nil and the gateway fetches it separately.
	Manifest *farp.SchemaManifest `json:"manifest,omitempty"`
}

DiscoveryEvent represents a change in service discovery.

type DiscoveryEventHandler

type DiscoveryEventHandler func(event DiscoveryEvent)

DiscoveryEventHandler is called when a service discovery event occurs.

type FARPHandler

type FARPHandler struct {
	// contains filtered or unexported fields
}

FARPHandler serves FARP protocol HTTP endpoints for a service. Mount this on your HTTP router to expose manifest, health, and schema endpoints.

Endpoints served (relative to mount point):

GET /_farp/manifest         — returns SchemaManifest JSON
GET /_farp/health           — returns health status (200 or 503)
GET /_farp/schemas/{type}   — returns schema by type (openapi, asyncapi, graphql, etc.)

func NewFARPHandler

func NewFARPHandler(manifest *farp.SchemaManifest, schemas map[farp.SchemaType]any) *FARPHandler

NewFARPHandler creates a new FARP HTTP handler.

func (*FARPHandler) ServeHTTP

func (h *FARPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

func (*FARPHandler) SetHealth

func (h *FARPHandler) SetHealth(status farp.InstanceStatus)

SetHealth sets the health status returned by /_farp/health.

func (*FARPHandler) UpdateManifest

func (h *FARPHandler) UpdateManifest(manifest *farp.SchemaManifest)

UpdateManifest updates the manifest served by /_farp/manifest.

func (*FARPHandler) UpdateSchema

func (h *FARPHandler) UpdateSchema(schemaType farp.SchemaType, schema any)

UpdateSchema updates a schema served by /_farp/schemas/{type}.

type GatewayNode

type GatewayNode struct {
	// contains filtered or unexported fields
}

GatewayNode manages the full FARP lifecycle for a gateway: service discovery, manifest fetching, schema registration, route management, and health tracking — all automatic.

func NewGatewayNode

func NewGatewayNode(config GatewayNodeConfig) (*GatewayNode, error)

NewGatewayNode creates a new GatewayNode.

func (*GatewayNode) GatewayClient

func (n *GatewayNode) GatewayClient() *gateway.Client

GatewayClient returns the underlying gateway.Client.

func (*GatewayNode) ListServiceNames

func (n *GatewayNode) ListServiceNames() []string

ListServiceNames returns distinct service names from all known manifests.

func (*GatewayNode) PushHandler

func (n *GatewayNode) PushHandler() http.Handler

PushHandler returns the HTTP handler for push-based registration. Returns nil if EnablePush is false.

func (*GatewayNode) Registry

func (n *GatewayNode) Registry() farp.SchemaRegistry

Registry returns the underlying SchemaRegistry.

func (*GatewayNode) Routes

func (n *GatewayNode) Routes() []gateway.ServiceRoute

Routes returns the current computed route table.

func (*GatewayNode) Services

func (n *GatewayNode) Services() map[string]*farp.SchemaManifest

Services returns all currently known services and their manifests.

func (*GatewayNode) Start

func (n *GatewayNode) Start(ctx context.Context) error

Start begins watching for services and auto-managing routes.

func (*GatewayNode) Stop

func (n *GatewayNode) Stop(_ context.Context) error

Stop stops watching and cleans up. Before shutting down, it sends a fire-and-forget gateway.shutdown notification to all services that have a webhook endpoint configured.

type GatewayNodeConfig

type GatewayNodeConfig struct {
	// Discovery mode (use one or both):
	// Option A: Registry-based — watch external backend for services
	Discovery ServiceDiscovery
	// Option B: Push-based — accept manifest pushes from services
	EnablePush bool

	// Optional: SchemaRegistry for storing discovered manifests.
	// If nil, uses an in-memory registry internally.
	Registry farp.SchemaRegistry

	// Route change callback — called when routes change.
	// Use this for simple integrations.
	OnRoutesChanged func(routes []gateway.ServiceRoute)

	// OnServiceEvent is called when a service discovery event occurs
	// (add, update, remove). Called before manifest processing, so the
	// gateway can track service lifecycle independently of routes.
	OnServiceEvent func(event DiscoveryEvent)

	// Or use atomic swap handler for zero-downtime route updates.
	RouteHandler farp.RouteUpdateHandler

	// Which services to watch (empty = all services)
	ServiceNames []string

	// HTTP client for fetching manifests from services (registry mode)
	HTTPClient *http.Client

	// ManifestFetcher for custom manifest fetching logic.
	// If nil, uses HTTPManifestFetcher (GET /_farp/manifest).
	Fetcher ManifestFetcher

	// How often to poll for health (for backends that don't push health)
	HealthPollInterval time.Duration // default: 15s

	// Push mode: how long before an instance with no heartbeat is removed
	HeartbeatTimeout time.Duration // default: 30s
}

GatewayNodeConfig configures a GatewayNode.

type HTTPManifestFetcher

type HTTPManifestFetcher struct {
	// contains filtered or unexported fields
}

HTTPManifestFetcher fetches manifests from service instances via HTTP. It GETs /_farp/manifest from the service address.

func NewHTTPManifestFetcher

func NewHTTPManifestFetcher(client *http.Client) *HTTPManifestFetcher

NewHTTPManifestFetcher creates a new HTTP-based manifest fetcher.

func (*HTTPManifestFetcher) FetchManifest

func (f *HTTPManifestFetcher) FetchManifest(_ context.Context, instance ServiceInstance) (*farp.SchemaManifest, error)

FetchManifest fetches the SchemaManifest from a service instance via HTTP.

type ListableDiscovery

type ListableDiscovery interface {
	// ListServices returns all known service names.
	ListServices(ctx context.Context) ([]string, error)
}

ListableDiscovery is an optional interface for backends that can list all service names.

type ManifestFetcher

type ManifestFetcher interface {
	// FetchManifest fetches the SchemaManifest from a service instance.
	FetchManifest(ctx context.Context, instance ServiceInstance) (*farp.SchemaManifest, error)
}

ManifestFetcher fetches a FARP manifest from a live service instance. Used by GatewayNode when discovery events don't include the manifest directly.

type NamedDiscovery

type NamedDiscovery interface {
	// Name returns the backend name (e.g., "consul", "etcd", "mdns").
	Name() string

	// Initialize performs any deferred initialization.
	Initialize(ctx context.Context) error
}

NamedDiscovery is an optional interface that discovery backends can implement to provide identity and initialization. This is useful for frameworks (like Forge) that need to manage backend lifecycle.

type PushDiscovery

type PushDiscovery struct {
	// contains filtered or unexported fields
}

PushDiscovery implements ServiceDiscovery by pushing manifests directly to a gateway via HTTP. No external service registry needed.

This is the service-side component of push-based discovery. The gateway-side component is PushHandler.

func NewPushDiscovery

func NewPushDiscovery(gatewayURL string, client *http.Client) *PushDiscovery

NewPushDiscovery creates a new push-based discovery client. gatewayURL is the base URL of the gateway's FARP push endpoint.

func (*PushDiscovery) Close

func (p *PushDiscovery) Close() error

Close is a no-op for push discovery.

func (*PushDiscovery) Deregister

func (p *PushDiscovery) Deregister(ctx context.Context, instanceID string) error

Deregister removes the service from the gateway.

func (*PushDiscovery) Discover

func (p *PushDiscovery) Discover(ctx context.Context, serviceName string) ([]ServiceInstance, error)

Discover queries the gateway for instances of a service.

func (*PushDiscovery) Health

func (p *PushDiscovery) Health(ctx context.Context) error

Health checks if the gateway is reachable.

func (*PushDiscovery) Register

func (p *PushDiscovery) Register(ctx context.Context, instance ServiceInstance) error

Register pushes the service instance to the gateway.

func (*PushDiscovery) RegisterWithManifest

func (p *PushDiscovery) RegisterWithManifest(ctx context.Context, instance ServiceInstance, manifest *farp.SchemaManifest) (*PushRegistrationResponse, error)

RegisterWithManifest pushes both the instance and manifest to the gateway. Use this when the gateway needs the manifest directly (e.g. after a heartbeat reconciliation mismatch — see §17.4.1).

func (*PushDiscovery) ReportHealth

func (p *PushDiscovery) ReportHealth(ctx context.Context, instanceID string, status farp.InstanceStatus) error

ReportHealth sends a heartbeat to the gateway (without checksum). Satisfies the ServiceDiscovery interface.

func (*PushDiscovery) ReportHealthWithChecksum

func (p *PushDiscovery) ReportHealthWithChecksum(ctx context.Context, instanceID string, status farp.InstanceStatus, routesChecksum string) (*PushHeartbeatResponse, error)

ReportHealthWithChecksum sends a heartbeat with the service's expected routes_checksum for reconciliation (FARP spec §17.4.1). The response includes the gateway's applied checksum and schema count.

func (*PushDiscovery) Watch

Watch is not natively supported in push mode from the service side. Services push to the gateway; they don't watch it. This is a no-op that blocks until context is cancelled.

type PushHandler

type PushHandler struct {
	// contains filtered or unexported fields
}

PushHandler is the gateway-side HTTP handler for push-based service registration. Services POST their manifests directly to this handler.

It also implements ServiceDiscovery so GatewayNode can use it like any other backend.

Endpoints:

POST   /register              — service pushes instance + manifest
PUT    /heartbeat/{id}        — service sends heartbeat
DELETE /deregister/{id}       — service deregisters
GET    /services              — list all registered services
GET    /services/{name}       — get instances of a service

func NewPushHandler

func NewPushHandler(heartbeatTimeout time.Duration) *PushHandler

NewPushHandler creates a new push registration handler. heartbeatTimeout is how long before an instance with no heartbeat is evicted.

func (*PushHandler) Close

func (h *PushHandler) Close() error

Close stops the reaper goroutine.

func (*PushHandler) Deregister

func (h *PushHandler) Deregister(_ context.Context, instanceID string) error

Deregister removes an instance.

func (*PushHandler) Discover

func (h *PushHandler) Discover(_ context.Context, serviceName string) ([]ServiceInstance, error)

Discover returns all known instances, optionally filtered by service name.

func (*PushHandler) Health

func (h *PushHandler) Health(_ context.Context) error

Health always returns nil (push handler is always "healthy").

func (*PushHandler) Register

func (h *PushHandler) Register(_ context.Context, instance ServiceInstance) error

Register adds an instance (same as receiving a push via HTTP).

func (*PushHandler) ReportHealth

func (h *PushHandler) ReportHealth(_ context.Context, instanceID string, status farp.InstanceStatus) error

ReportHealth updates the health status of an instance.

func (*PushHandler) ServeHTTP

func (h *PushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP routes push API requests.

func (*PushHandler) Watch

func (h *PushHandler) Watch(ctx context.Context, _ string, handler DiscoveryEventHandler) error

Watch registers a watcher for discovery events.

type PushHeartbeat

type PushHeartbeat struct {
	Status         farp.InstanceStatus `json:"status"`
	RoutesChecksum string              `json:"routes_checksum,omitempty"`
}

PushHeartbeat is the body sent for heartbeat updates. RoutesChecksum is optional (§17.4.1 reconciliation).

type PushHeartbeatResponse

type PushHeartbeatResponse struct {
	Status         string `json:"status"`
	RoutesChecksum string `json:"routes_checksum"`
	SchemasApplied int    `json:"schemas_applied"`
}

PushHeartbeatResponse is the gateway's response to a heartbeat.

type PushRegistration

type PushRegistration struct {
	Instance ServiceInstance      `json:"instance"`
	Manifest *farp.SchemaManifest `json:"manifest"`
}

PushRegistration is the body sent when a service pushes its manifest to the gateway.

type PushRegistrationResponse

type PushRegistrationResponse struct {
	Status         string `json:"status"`
	RoutesChecksum string `json:"routes_checksum"`
	SchemasApplied int    `json:"schemas_applied"`
}

PushRegistrationResponse is the gateway's response to a registration.

type ServiceDiscovery

type ServiceDiscovery interface {
	// Discover returns all known instances of a service.
	// If serviceName is empty, returns all instances across all services.
	Discover(ctx context.Context, serviceName string) ([]ServiceInstance, error)

	// Watch watches for changes to instances of a service.
	// The handler is called when instances are added, removed, or change health status.
	// If serviceName is empty, watches all services.
	// The watch runs until the context is cancelled.
	Watch(ctx context.Context, serviceName string, handler DiscoveryEventHandler) error

	// Register registers a service instance in the discovery backend.
	Register(ctx context.Context, instance ServiceInstance) error

	// Deregister removes a service instance from the discovery backend.
	Deregister(ctx context.Context, instanceID string) error

	// ReportHealth reports the health status of a registered instance.
	ReportHealth(ctx context.Context, instanceID string, status farp.InstanceStatus) error

	// Close closes the discovery backend connection and releases resources.
	Close() error

	// Health returns nil if the discovery backend itself is reachable.
	Health(ctx context.Context) error
}

ServiceDiscovery provides pluggable service discovery operations. Implementations wrap infrastructure-specific discovery mechanisms (Consul, etcd, Kubernetes, Redis, mDNS, or push-based).

type ServiceInstance

type ServiceInstance struct {
	// Unique ID for this instance (maps to SchemaManifest.InstanceID)
	ID string `json:"id"`

	// Service name (maps to SchemaManifest.ServiceName)
	ServiceName string `json:"service_name"`

	// Service version (maps to SchemaManifest.ServiceVersion)
	Version string `json:"version,omitempty"`

	// Network address (host or host:port)
	Address string `json:"address"`

	// Port number
	Port int `json:"port"`

	// Health status
	Status farp.InstanceStatus `json:"status"`

	// Tags for filtering and labeling
	Tags []string `json:"tags,omitempty"`

	// Metadata tags (key-value pairs for filtering, labeling)
	Metadata map[string]string `json:"metadata,omitempty"`

	// When this instance was registered
	RegisteredAt time.Time `json:"registered_at"`

	// When health was last reported
	LastHealthCheck time.Time `json:"last_health_check"`
}

ServiceInstance represents a discovered service instance in infrastructure. This is the network-level presence of a service, distinct from SchemaManifest which describes the API contracts.

type ServiceNode

type ServiceNode struct {
	// contains filtered or unexported fields
}

ServiceNode manages the full FARP lifecycle for a service: schema generation, manifest building, HTTP endpoint serving, discovery registration, health reporting, and graceful shutdown.

func NewServiceNode

func NewServiceNode(config ServiceNodeConfig) (*ServiceNode, error)

NewServiceNode creates a new ServiceNode. Either config.Discovery or config.GatewayURL must be set.

func (*ServiceNode) HTTPHandler

func (n *ServiceNode) HTTPHandler() http.Handler

HTTPHandler returns an http.Handler that serves FARP endpoints.

func (*ServiceNode) Manifest

func (n *ServiceNode) Manifest() *farp.SchemaManifest

Manifest returns the current SchemaManifest.

func (*ServiceNode) Start

func (n *ServiceNode) Start(ctx context.Context) error

Start registers the service and begins the health reporting loop. It generates schemas from providers (if configured), builds the manifest, registers in the discovery backend (or pushes to gateway), and starts a background goroutine for health reporting and TTL renewal.

func (*ServiceNode) Stop

func (n *ServiceNode) Stop(ctx context.Context) error

Stop gracefully deregisters the service and stops the health loop.

func (*ServiceNode) UpdateSchema

func (n *ServiceNode) UpdateSchema(ctx context.Context) error

UpdateSchema triggers re-generation of schemas from providers and pushes the updated manifest to the discovery/registry backend.

type ServiceNodeConfig

type ServiceNodeConfig struct {
	// Required: Service identity
	ServiceName    string
	ServiceVersion string
	InstanceID     string // auto-generated UUID if empty
	Address        string // host:port this service listens on

	// Discovery mode (use one):
	// Option A: Registry-based — register in external backend
	Discovery ServiceDiscovery
	// Option B: Push-based — push manifest directly to gateway
	GatewayURL string // e.g., "http://gateway:9090/_farp/v1"

	// Optional: SchemaRegistry for publishing manifest to KV store
	Registry farp.SchemaRegistry

	// Optional: Schema providers for auto-generating schemas
	Providers []farp.SchemaProvider

	// Health/TTL configuration
	HealthInterval time.Duration // default: 10s
	TTL            time.Duration // default: 30s
	MaxRetries     int           // default: 5
	RetryBackoff   time.Duration // default: 2s

	// Tags for service instance filtering/labeling
	Tags []string

	// Metadata is additional key-value metadata to include on the registered
	// ServiceInstance. These are merged with the auto-generated FARP metadata
	// keys (farp.enabled, farp.openapi, etc.) — user-provided keys take precedence.
	Metadata map[string]string

	// Endpoints configures the service's introspection endpoints.
	// These flow into the manifest and are also advertised as farp.* metadata
	// keys on the registered ServiceInstance (e.g., farp.openapi, farp.health).
	Endpoints farp.SchemaEndpoints

	// Routing configuration (flows into manifest)
	MountStrategy farp.MountStrategy
	BasePath      string
	PathRules     []farp.PathRule

	// Routes provides route information for OpenAPI schema generation.
	// Can be []farp.RouteDescriptor, map[string]any (OpenAPI paths), or any
	// type that the configured schema provider understands.
	// If nil, the provider's Generate() may fail or produce empty paths.
	Routes any

	// Service hints (flows into manifest)
	Hints *farp.ServiceHints

	// HTTP client for push mode (optional, uses default if nil)
	HTTPClient *http.Client
}

ServiceNodeConfig configures a ServiceNode.

type TagDiscovery

type TagDiscovery interface {
	// DiscoverWithTags returns instances matching all given tags.
	DiscoverWithTags(ctx context.Context, serviceName string, tags []string) ([]ServiceInstance, error)
}

TagDiscovery is an optional interface for backends that support tag-based filtering.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL