registry

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: MIT Imports: 15 Imported by: 0

README

registry - Genesis 服务注册发现组件

Go Reference

registry 是 Genesis 治理层的核心组件,提供基于 Etcd 的服务注册与发现能力,深度集成 gRPC Resolver 实现客户端负载均衡。

特性

  • 所属层级:L3 (Governance) — 流量治理,提供服务注册发现能力
  • 核心职责:在 Etcd 连接器的基础上提供统一的服务注册与发现语义
  • 设计原则
    • 借用模型:借用 Etcd 连接器的连接,不负责连接的生命周期
    • gRPC 原生支持:实现 gRPC resolver.Builder 接口,支持 etcd://<service_name> 解析
  • 实时监听:通过 Etcd Watch 机制实时感知服务变化
  • 自动续约:Lease 机制确保服务可用性,自动处理续租
  • 优雅下线:Close() 方法自动撤销租约,停止监听器(Close 后实例不可再使用)
  • 单实例约束:进程内仅允许一个 active registry(用于 gRPC resolver)
  • 可观测性:集成 clog 和 metrics,提供完整的日志和指标能力

目录结构(完全扁平化设计)

registry/                  # 公开 API + 实现(完全扁平化)
├── README.md              # 本文档
├── registry.go            # Registry 接口和 Etcd 实现,New 构造函数
├── interface.go           # Registry 接口定义
├── config.go              # 配置结构:Config
├── service.go             # 服务模型:ServiceInstance、ServiceEvent
├── options.go             # 函数式选项:Option、WithLogger
├── errors.go              # 错误定义
├── resolver.go            # gRPC Resolver 实现
└── *_test.go              # 测试文件

设计原则:完全扁平化设计,所有公开 API 和实现都在根目录

快速开始

import "github.com/ceyewan/genesis/registry"
基础使用
// 1. 创建连接器
etcdConn, _ := connector.NewEtcd(&cfg.Etcd, connector.WithLogger(logger))
defer etcdConn.Close()
etcdConn.Connect(ctx)

// 2. 创建注册组件
reg, _ := registry.New(etcdConn, &registry.Config{
    Namespace:  "/genesis/services",
    DefaultTTL: 30 * time.Second,
}, registry.WithLogger(logger))
defer reg.Close()

// 3. 注册服务
service := &registry.ServiceInstance{
    ID:        "user-service-001",
    Name:      "user-service",
    Version:   "1.0.0",
    Endpoints: []string{"grpc://127.0.0.1:9001"},
}
err := reg.Register(ctx, service, 30*time.Second)

// 4. 服务发现
instances, err := reg.GetService(ctx, "user-service")

核心接口

Registry 接口
type Registry interface {
    // --- 服务注册 ---
    Register(ctx context.Context, service *ServiceInstance, ttl time.Duration) error
    Deregister(ctx context.Context, serviceID string) error

    // --- 服务发现 ---
    GetService(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
    Watch(ctx context.Context, serviceName string) (<-chan ServiceEvent, error)

    // --- gRPC 集成 ---
    GetConnection(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error)

    // --- 资源管理 ---
    Close() error
}
服务模型
// ServiceInstance 代表一个服务实例
type ServiceInstance struct {
    ID        string            `json:"id"`        // 唯一实例 ID
    Name      string            `json:"name"`      // 服务名称
    Version   string            `json:"version"`   // 版本号
    Metadata  map[string]string `json:"metadata"`  // 元数据
    Endpoints []string          `json:"endpoints"` // 服务地址列表
}

// ServiceEvent 服务变化事件
type ServiceEvent struct {
    Type    EventType        // 事件类型 (PUT/DELETE)
    Service *ServiceInstance // 服务实例信息
}

配置设计

Config 结构
type Config struct {
    // Namespace: Etcd Key 前缀,默认 "/genesis/services"
    Namespace string `json:"namespace" yaml:"namespace"`

    // DefaultTTL: 默认服务注册租约时长,默认 30s
    DefaultTTL time.Duration `json:"default_ttl" yaml:"default_ttl"`

    // RetryInterval: 重连/重试间隔,默认 1s
    RetryInterval time.Duration `json:"retry_interval" yaml:"retry_interval"`
}

说明:DefaultTTL 需为 >= 1s(或为 0 使用默认值);gRPC resolver 的 scheme 固定为 etcd,无需额外配置;进程内仅允许一个 active registry,如需切换请先 Close。

使用模式

1. 服务注册
// 定义服务实例
service := &registry.ServiceInstance{
    ID:        "user-service-001",
    Name:      "user-service",
    Version:   "1.0.0",
    Endpoints: []string{"grpc://192.168.1.100:8080"},
    Metadata: map[string]string{
        "region": "us-west-1",
        "zone":   "zone-a",
        "weight": "100",
    },
}

// 注册服务,指定 30s TTL
err := reg.Register(ctx, service, 30*time.Second)
if err != nil {
    logger.Error("failed to register service", clog.Error(err))
    return
}

// 优雅下线时注销
defer reg.Deregister(ctx, service.ID)
2. 服务发现
// 获取服务实例列表
instances, err := reg.GetService(ctx, "user-service")
if err != nil {
    logger.Error("failed to get service", clog.Error(err))
    return
}

logger.Info("found service instances", clog.Int("count", len(instances)))
for _, instance := range instances {
    logger.Info("service instance",
        clog.String("id", instance.ID),
        clog.String("version", instance.Version),
        clog.Any("endpoints", instance.Endpoints))
}
3. gRPC 集成(方式一:GetConnection)
import "google.golang.org/grpc/credentials/insecure"

// 必须传入 grpc.WithTransportCredentials() 或其他凭证选项
conn, err := reg.GetConnection(ctx, "user-service",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
    logger.Error("failed to get connection", clog.Error(err))
    return
}
defer conn.Close()

// 使用连接调用 gRPC 服务
client := pb.NewUserServiceClient(conn)
resp, err := client.GetUser(ctx, &pb.GetUserRequest{ID: "123"})
4. gRPC 集成(方式二:原生 gRPC Dial)
import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

// Registry 初始化时已自动注册 gRPC Resolver Builder
// 使用标准 gRPC Dial
conn, err := grpc.NewClient(
    "etcd:///user-service",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
    logger.Error("failed to dial", clog.Error(err))
    return
}
defer conn.Close()

// 使用连接
client := pb.NewUserServiceClient(conn)
5. 监听服务变化
// 监听服务变化
eventCh, err := reg.Watch(ctx, "user-service")
if err != nil {
    logger.Error("failed to watch service", clog.Error(err))
    return
}

// 处理事件
go func() {
    for event := range eventCh {
        switch event.Type {
        case registry.EventTypePut:
            logger.Info("service registered/updated",
                clog.String("service_id", event.Service.ID),
                clog.Any("endpoints", event.Service.Endpoints))
        case registry.EventTypeDelete:
            logger.Info("service deregistered",
                clog.String("service_id", event.Service.ID))
        }
    }
}()

函数式选项

// WithLogger 注入日志记录器
reg, err := registry.New(etcdConn, cfg, registry.WithLogger(logger))

Etcd 存储结构

服务实例在 Etcd 中的存储采用层级结构:

<namespace>/<service_name>/<instance_id> -> JSON(ServiceInstance)

例如:

  • /genesis/services/user-service/uuid-1234-5678
  • /genesis/services/order-service/uuid-abcd-efgh

这种设计便于:

  • 使用前缀 Watch 监听特定服务的变化
  • 层次化的命名空间管理
  • 清晰的服务组织结构

负载均衡

gRPC 集成原理
  1. Resolver 注册:Registry 初始化时自动注册 etcd:// scheme 的 resolver
  2. 服务发现:Resolver 通过 Watch 机制获取服务实例列表
  3. 连接更新:当服务实例发生变化时,Resolver 自动更新 gRPC 连接池
  4. 负载均衡:gRPC Balancer 在更新的连接池中进行负载分发(默认 round_robin)
配置负载均衡
// 在 grpc.NewClient 中指定负载均衡策略
conn, err := grpc.NewClient(
    "etcd:///user-service",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
)

支持的负载均衡策略:

  • round_robin:轮询(默认)
  • pick_first:选择第一个
  • 自定义策略

资源所有权模型

Registry 组件采用借用模型 (Borrowing Model)

  1. 连接器 (Owner):拥有底层 Etcd 连接,负责连接池管理
  2. Registry 组件 (Borrower):借用连接器中的客户端,不拥有其生命周期
  3. 生命周期控制:使用 defer 确保关闭顺序与创建顺序相反(LIFO)
// ✅ 正确示例
etcdConn, _ := connector.NewEtcd(&cfg.Etcd, connector.WithLogger(logger))
defer etcdConn.Close() // 应用结束时关闭底层连接
etcdConn.Connect(ctx)

reg, _ := registry.New(etcdConn, &registry.Config{}, registry.WithLogger(logger))
defer reg.Close()     // 撤销租约、停止监听器

与其他组件配合

func main() {
    ctx := context.Background()
    logger := clog.Must(&clog.Config{Level: "info"})

    // 1. 创建连接器
    etcdConn, _ := connector.NewEtcd(&cfg.Etcd, connector.WithLogger(logger))
    defer etcdConn.Close()
    etcdConn.Connect(ctx)

    // 2. 创建注册组件
    reg, _ := registry.New(etcdConn, &registry.Config{}, registry.WithLogger(logger))
    defer reg.Close()

    // 3. 注册当前服务
    service := &registry.ServiceInstance{
        ID:        "my-service-001",
        Name:      "my-service",
        Endpoints: []string{"grpc://127.0.0.1:8080"},
    }
    err := reg.Register(ctx, service, 30*time.Second)
    if err != nil {
        logger.Error("failed to register", clog.Error(err))
        return
    }
    defer reg.Deregister(ctx, service.ID)

    // 4. 调用其他服务
    userConn, _ := reg.GetConnection(ctx, "user-service",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    defer userConn.Close()

    userClient := pb.NewUserServiceClient(userConn)
    user, err := userClient.GetUser(ctx, &pb.GetUserRequest{ID: "123"})
}

语义说明

  • Watch 基于 Etcd Revision 增量监听,进程内自动重连。
  • 当发生 compaction(历史 revision 被清理)时,会触发一次全量拉取并从最新 revision 继续监听。
  • compaction 期间可能出现事件丢失,需保证业务侧具备幂等或容错处理能力。

最佳实践

  1. 服务命名:使用有意义的服务名,如 user-serviceorder-service
  2. 实例 ID:使用 UUID 或包含主机信息的唯一标识符
  3. TTL 设置:根据服务特点设置合理的 TTL,建议 30s-60s
  4. 元数据:在 Metadata 中存储 region、zone、version 等有用信息
  5. 错误处理:使用 xerrors.Wrapf() 包装错误,保留错误链
  6. 优雅下线:确保在应用退出时调用 Deregister 或依赖 Close() 自动处理
  7. 监控:通过 WithLogger 注入日志组件

完整示例

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/ceyewan/genesis/registry"
    "github.com/ceyewan/genesis/clog"
    "github.com/ceyewan/genesis/connector"
    "google.golang.org/grpc"
    pb "github.com/your-org/your-proto"
)

func main() {
    ctx := context.Background()
    logger := clog.Must(&clog.Config{Level: "info"})

    // 1. 创建 Etcd 连接器
    etcdConn, err := connector.NewEtcd(&connector.EtcdConfig{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    }, connector.WithLogger(logger))
    if err != nil {
        panic(err)
    }
    defer etcdConn.Close()

    // 2. 连接到 Etcd
    if err := etcdConn.Connect(ctx); err != nil {
        panic(err)
    }

    // 3. 创建 Registry 实例
reg, err := registry.New(etcdConn, &registry.Config{
        Namespace:  "/genesis/services",
        DefaultTTL: 30 * time.Second,
    }, registry.WithLogger(logger))
    if err != nil {
        panic(err)
    }
    defer reg.Close()

    // 4. 注册当前服务
    service := &registry.ServiceInstance{
        ID:      fmt.Sprintf("order-service-%s", getPodID()),
        Name:    "order-service",
        Version: "1.0.0",
        Endpoints: []string{
            "grpc://127.0.0.1:8080",
        },
        Metadata: map[string]string{
            "region":    "us-west-1",
            "zone":      "zone-a",
            "weight":    "100",
            "commit":    getGitCommit(),
        },
    }

    err = reg.Register(ctx, service, 30*time.Second)
    if err != nil {
        panic(err)
    }
    logger.Info("service registered successfully",
        clog.String("service_id", service.ID),
        clog.Any("endpoints", service.Endpoints))

    // 5. 监听其他服务变化
    go watchUserService(reg, logger)

    // 6. 调用其他服务
    callUserService(reg, logger)

    // 7. 保持服务运行
    logger.Info("service is running...")
    select {}
}

func watchUserService(reg registry.Registry, logger clog.Logger) {
    ctx := context.Background()
    eventCh, err := reg.Watch(ctx, "user-service")
    if err != nil {
        logger.Error("failed to watch user service", clog.Error(err))
        return
    }

    for event := range eventCh {
        switch event.Type {
        case registry.EventTypePut:
            logger.Info("user service registered/updated",
                clog.String("service_id", event.Service.ID),
                clog.String("version", event.Service.Version),
                clog.Any("endpoints", event.Service.Endpoints))
        case registry.EventTypeDelete:
            logger.Info("user service deregistered",
                clog.String("service_id", event.Service.ID))
        }
    }
}

func callUserService(reg registry.Registry, logger clog.Logger) {
    ctx := context.Background()

    // 方式一:使用 GetConnection
    conn, err := reg.GetConnection(ctx, "user-service",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        logger.Error("failed to get user service connection", clog.Error(err))
        return
    }
    defer conn.Close()

    client := pb.NewUserServiceClient(conn)
    resp, err := client.GetUser(ctx, &pb.GetUserRequest{ID: "123"})
    if err != nil {
        logger.Error("failed to call user service", clog.Error(err))
        return
    }

    logger.Info("user service call successful",
        clog.String("user_id", resp.User.ID),
        clog.String("user_name", resp.User.Name))
}

Documentation

Overview

Package registry 提供了基于 Etcd 的服务注册发现组件,支持 gRPC 集成和客户端负载均衡。

registry 组件是 Genesis 治理层的核心组件,它在 Etcd 连接器的基础上提供了: - 服务注册与发现能力 - 实时服务变化监听 - gRPC Resolver 集成,支持 `etcd://<service_name>` 解析 - 自动租约续约和优雅下线 - 与 L0 基础组件(日志、指标、错误)的深度集成

## 基本使用

etcdConn, _ := connector.NewEtcd(&cfg.Etcd, connector.WithLogger(logger))
defer etcdConn.Close()
etcdConn.Connect(ctx)

reg, _ := registry.New(etcdConn, &registry.Config{
	Namespace:  "/genesis/services",
	DefaultTTL: 30 * time.Second,
}, registry.WithLogger(logger))
defer reg.Close()

// 注册服务
service := &registry.ServiceInstance{
	ID:        "user-service-001",
	Name:      "user-service",
	Endpoints: []string{"grpc://127.0.0.1:8080"},
}
err := reg.Register(ctx, service, 30*time.Second)

// 服务发现
instances, err := reg.GetService(ctx, "user-service")

// gRPC 集成
conn, err := reg.GetConnection(ctx, "user-service")
defer conn.Close()
client := pb.NewUserServiceClient(conn)

## Etcd 存储结构

服务实例在 Etcd 中的存储采用层级结构:

<namespace>/<service_name>/<instance_id> -> JSON(ServiceInstance)

例如: - `/genesis/services/user-service/uuid-1234-5678` - `/genesis/services/order-service/uuid-abcd-efgh`

## gRPC 集成

Registry 组件实现了 gRPC resolver.Builder 接口,支持原生 gRPC 服务发现:

// 方式一:使用 GetConnection(推荐)
conn, err := reg.GetConnection(ctx, "user-service")

// 方式二:使用原生 gRPC Dial
conn, err := grpc.NewClient(
	"etcd:///user-service",
	grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	grpc.WithTransportCredentials(insecure.NewCredentials()),
)

## 设计原则

- **借用模型**:registry 组件借用 Etcd 连接器的连接,不负责连接的生命周期 - **显式依赖**:通过构造函数显式注入连接器和选项 - **gRPC 原生支持**:深度集成 gRPC 生态,提供开箱即用的服务发现 - **可观测性**:集成 clog 和 metrics,提供完整的日志和指标能力

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrServiceNotFound 服务未找到
	ErrServiceNotFound = xerrors.New("service not found")

	// ErrServiceAlreadyRegistered 服务已注册
	ErrServiceAlreadyRegistered = xerrors.New("service already registered")

	// ErrInvalidServiceInstance 无效的服务实例
	ErrInvalidServiceInstance = xerrors.New("invalid service instance")

	// ErrRegistryAlreadyInitialized registry 已初始化
	ErrRegistryAlreadyInitialized = xerrors.New("registry already initialized")

	// ErrRegistryClosed registry 已关闭
	ErrRegistryClosed = xerrors.New("registry is closed")

	// ErrInvalidTTL 无效的 TTL
	ErrInvalidTTL = xerrors.New("invalid ttl")

	// ErrLeaseExpired 租约已过期
	ErrLeaseExpired = xerrors.New("lease expired")

	// ErrWatchClosed Watch 已关闭
	ErrWatchClosed = xerrors.New("watch closed")

	// ErrConnectionFailed 连接失败
	ErrConnectionFailed = xerrors.New("connection failed")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// Namespace Etcd Key 前缀,默认 "/genesis/services"
	Namespace string `yaml:"namespace" json:"namespace"`

	// DefaultTTL 默认服务注册租约时长,默认 30s
	DefaultTTL time.Duration `yaml:"default_ttl" json:"default_ttl"`

	// RetryInterval 重连/重试间隔,默认 1s
	RetryInterval time.Duration `yaml:"retry_interval" json:"retry_interval"`
}

Config Registry 组件配置

type EventType

type EventType string

EventType 事件类型

const (
	EventTypePut    EventType = "PUT"    // 服务注册或更新
	EventTypeDelete EventType = "DELETE" // 服务注销
)

type Option

type Option func(*options)

Option 组件初始化选项函数

func WithLogger

func WithLogger(l clog.Logger) Option

WithLogger 注入日志记录器 组件内部会自动追加 "registry" namespace

type Registry

type Registry interface {

	// Register 注册服务实例
	// ctx: 上下文
	// service: 服务实例信息
	// ttl: 租约有效期 (例如 10s),超时后若无续约服务将自动下线
	Register(ctx context.Context, service *ServiceInstance, ttl time.Duration) error

	// Deregister 注销服务实例
	// serviceID: 服务实例 ID
	Deregister(ctx context.Context, serviceID string) error

	// GetService 获取服务实例列表
	// 每次从注册中心获取服务实例列表
	GetService(ctx context.Context, serviceName string) ([]*ServiceInstance, error)

	// Watch 监听服务实例变化
	// 返回一个事件通道,接收服务变化事件 (PUT/DELETE)
	Watch(ctx context.Context, serviceName string) (<-chan ServiceEvent, error)

	// GetConnection 获取到指定服务的 gRPC 连接
	// 内部封装了 Resolver 和 Balancer 的配置,提供开箱即用的连接对象
	// 支持自动服务发现和客户端负载均衡
	GetConnection(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error)

	// Close 停止后台任务并清理资源(撤销租约、停止监听)
	Close() error
}

Registry 服务注册与发现接口

func New

func New(conn connector.EtcdConnector, cfg *Config, opts ...Option) (Registry, error)

New 创建 Registry 实例(基于 Etcd) 这是标准的工厂函数,支持在不依赖 Container 的情况下独立实例化

参数:

  • conn: Etcd 连接器
  • cfg: Registry 配置
  • opts: 可选参数 (Logger, Meter, Tracer)

使用示例:

etcdConn, _ := connector.NewEtcd(etcdConfig)
registry, _ := registry.New(etcdConn, &registry.Config{
    Namespace: "/genesis/services",
}, registry.WithLogger(logger))

type ServiceEvent

type ServiceEvent struct {
	Type    EventType        // 事件类型 (PUT/DELETE)
	Service *ServiceInstance // 服务实例信息
}

ServiceEvent 服务变化事件

type ServiceInstance

type ServiceInstance struct {
	ID        string            `json:"id"`        // 唯一实例 ID (通常是 UUID)
	Name      string            `json:"name"`      // 服务名称 (如 user-service)
	Version   string            `json:"version"`   // 版本号
	Metadata  map[string]string `json:"metadata"`  // 元数据 (Region, Zone, Weight, Group 等)
	Endpoints []string          `json:"endpoints"` // 服务地址列表 (如 grpc://192.168.1.10:9090)
}

ServiceInstance 代表一个服务实例

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL