rpc

package module
v0.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2026 License: MIT Imports: 24 Imported by: 0

README

RPC

基于 gRPC 的微服务 RPC 框架,集成 etcd 服务注册与发现、负载均衡、熔断器、重试、限流、可观测性等能力。

特性

  • 服务注册与发现 — 基于 etcd v3,支持自动注册、心跳续约、断线重连与服务重注册
  • 负载均衡 — 客户端侧负载均衡,支持 round_robinpick_first 策略
  • 熔断器 — 三态熔断(Closed/Open/HalfOpen),自动区分业务错误与系统错误
  • 重试机制 — 指数退避重试,可配置重试次数、退避时间和可重试错误码
  • 限流 — 令牌桶限流器,服务端拦截器
  • 中间件 — 支持 Unary 和 Streaming 拦截器链,内置 Recovery、Logging、Metrics、错误处理
  • 可观测性 — 基于 OpenTelemetry 的请求计数和延迟指标,结构化日志(zap)
  • 健康检查 — 内置 gRPC Health Check 服务
  • 错误体系 — 区分系统错误码(10000-10999)和业务错误码(11000-19999)

安装

go get github.com/dysodeng/rpc

要求 Go 1.23+

快速开始

服务端
package main

import (
    "log"

    "github.com/dysodeng/rpc"
    "github.com/dysodeng/rpc/config"
    "github.com/dysodeng/rpc/naming/etcd"
)

func main() {
    conf := &config.ServerConfig{
        ServiceAddr: ":9000",
        EtcdConfig: config.EtcdConfig{
            Endpoints: []string{"127.0.0.1:2379"},
            Namespace: "myapp",
        },
    }

    // 创建 etcd 注册中心
    registry, err := etcd.NewEtcdRegistry(conf)
    if err != nil {
        log.Fatal(err)
    }

    // 创建 RPC 服务
    srv := rpc.NewServer(conf, registry)

    // 注册 gRPC 服务
    // srv.RegisterService(&YourService{}, pb.RegisterYourServiceServer)

    log.Println("server starting on", conf.ServiceAddr)
    if err := srv.Serve(); err != nil {
        log.Fatal(err)
    }
}
客户端
package main

import (
    "log"

    "github.com/dysodeng/rpc"
    "github.com/dysodeng/rpc/breaker"
    "github.com/dysodeng/rpc/config"
    "github.com/dysodeng/rpc/naming/etcd"
    "github.com/dysodeng/rpc/retry"
)

func main() {
    etcdConf := &config.EtcdConfig{
        Endpoints: []string{"127.0.0.1:2379"},
        Namespace: "myapp",
    }

    // 创建 etcd 服务发现
    builder := etcd.NewEtcdBuilder(etcdConf)
    discovery := rpc.NewServiceDiscovery(builder)

    // 获取服务连接
    conn, err := discovery.ServiceConn("your-service",
        rpc.WithServiceDiscoveryLB(rpc.RoundRobin),
        rpc.WithServiceDiscoveryBreaker(breaker.NewCircuitBreaker()),
        rpc.WithServiceDiscoveryRetry(retry.DefaultRetryPolicy),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // 使用 conn 创建 gRPC client
    // client := pb.NewYourServiceClient(conn)
}

配置

服务端配置
&config.ServerConfig{
    ServiceAddr:           ":9000",           // 服务监听地址
    OtelCollectorEndpoint: "localhost:4317",   // OpenTelemetry Collector 地址(可选)
    ServiceReflection:     false,             // 服务反射,生产环境建议关闭
    EtcdConfig: config.EtcdConfig{
        Endpoints: []string{"127.0.0.1:2379"},
        Namespace: "myapp",                   // 命名空间,用于隔离不同应用的服务
    },
}
服务端选项
// 自定义 gRPC Server 选项
rpc.WithServerGrpcServerOption(opts ...grpc.ServerOption)

// 启用限流
rpc.WithServerLimiter(limiter.NewTokenBucketLimiter(1000, 100))

// Keepalive 配置
rpc.WithServerKeepalive(enforcementPolicy, serverParameters)
客户端选项
// 负载均衡策略
rpc.WithServiceDiscoveryLB(rpc.RoundRobin)  // 或 rpc.PickFirst

// 熔断器
rpc.WithServiceDiscoveryBreaker(breaker.NewCircuitBreaker(
    breaker.WithFailureThreshold(5),     // 5 次失败触发熔断
    breaker.WithSuccessThreshold(3),     // 半开状态 3 次成功恢复
    breaker.WithTimeout(10*time.Second), // 熔断超时时间
))

// 重试策略
rpc.WithServiceDiscoveryRetry(&retry.Policy{
    MaxAttempts:       3,
    InitialBackoff:    100 * time.Millisecond,
    MaxBackoff:        1 * time.Second,
    BackoffMultiplier: 2.0,
    RetryableErrors:   []codes.Code{codes.Unavailable, codes.DeadlineExceeded},
})

// Keepalive
rpc.WithServiceDiscoveryKeepalive(30*time.Second, 10*time.Second, backoff.DefaultConfig)

// TLS
rpc.WithServiceDiscoveryGrpcDialTransportCredentials(&creds)

错误处理

框架定义了统一的错误码体系:

// 系统级错误码 (10000-10999)
errors.Unknown         // 10000 未知错误
errors.Internal        // 10001 内部错误
errors.InvalidArgument // 10002 参数无效
errors.NotFound        // 10003 未找到
errors.Unauthorized    // 10004 未授权
errors.Timeout         // 10005 超时

// 业务级错误码 (11000-19999)
errors.BusinessError   // 11000 业务错误起始码

业务错误不会触发熔断器,日志以 INFO 级别记录:

import rpcErrors "github.com/dysodeng/rpc/errors"

// 创建业务错误
err := rpcErrors.New(11001, "用户不存在")

// 附加错误详情
err = rpcErrors.WithDetails(err, &pb.ErrorDetail{Field: "user_id"})

项目结构

.
├── server.go              # 服务端核心
├── discovery.go           # 客户端服务发现
├── config/                # 配置结构
├── metadata/              # 服务元数据
├── naming/                # 服务注册与发现
│   └── etcd/              # etcd 实现(registry/builder/resolver)
├── middleware/             # 中间件(recovery/logging/metrics)
├── breaker/               # 熔断器
├── retry/                 # 重试策略
├── limiter/               # 限流器
├── errors/                # 错误体系
├── health/                # 健康检查
├── logger/                # 日志(zap)
└── metrics/               # 指标(OpenTelemetry)

依赖

依赖 用途
google.golang.org/grpc gRPC 框架
go.etcd.io/etcd/client/v3 etcd 客户端
go.opentelemetry.io/otel 可观测性指标
go.uber.org/zap 结构化日志
golang.org/x/time 令牌桶限流
google.golang.org/protobuf Protocol Buffers

License

MIT License

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Server

type Server interface {
	RegisterService(serviceMetadata metadata.ServiceRegister, grpcRegister interface{}) error
	Serve() error
	Stop() error
}

Server grpc服务

func NewServer

func NewServer(conf *config.ServerConfig, registry naming.Registry, opts ...ServerOption) Server

type ServerOption

type ServerOption func(s *serverOption)

func WithServerGrpcServerOption

func WithServerGrpcServerOption(opts ...grpc.ServerOption) ServerOption

WithServerGrpcServerOption grpc服务配置

func WithServerKeepalive added in v0.2.1

WithServerKeepalive grpc服务端心跳配置

func WithServerLimiter added in v0.2.1

func WithServerLimiter(rateLimiter limiter.RateLimiter) ServerOption

WithServerLimiter 服务端限流

func WithServerOnStatusChange added in v0.2.6

func WithServerOnStatusChange(callback naming.StatusChangeCallback) ServerOption

WithServerOnStatusChange 注册服务状态变更回调

type ServiceDiscovery

type ServiceDiscovery interface {
	// ServiceConn 获取服务连接信息
	ServiceConn(serviceName string, opts ...ServiceDiscoveryOption) (*grpc.ClientConn, error)
}

ServiceDiscovery 服务发现

func NewServiceDiscovery

func NewServiceDiscovery(resolverBuilder resolver.Builder) ServiceDiscovery

type ServiceDiscoveryLB

type ServiceDiscoveryLB string

ServiceDiscoveryLB 负载均衡

const (
	PickFirst          ServiceDiscoveryLB = "pick_first"           // 选择第一个
	RoundRobin         ServiceDiscoveryLB = "round_robin"          // 轮询
	WeightedRoundRobin ServiceDiscoveryLB = "weighted_round_robin" // 加权轮询
)

type ServiceDiscoveryOption

type ServiceDiscoveryOption func(s *serviceDiscoveryOption)

func WithServiceDiscoveryBreaker

func WithServiceDiscoveryBreaker(cb breaker.CircuitBreaker) ServiceDiscoveryOption

WithServiceDiscoveryBreaker 设置熔断器

func WithServiceDiscoveryGrpcDialOption

func WithServiceDiscoveryGrpcDialOption(opts ...grpc.DialOption) ServiceDiscoveryOption

func WithServiceDiscoveryGrpcDialTransportCredentials

func WithServiceDiscoveryGrpcDialTransportCredentials(credentials *credentials.TransportCredentials) ServiceDiscoveryOption

func WithServiceDiscoveryKeepalive added in v0.2.1

func WithServiceDiscoveryKeepalive(keepaliveTime, timeout time.Duration, backoffConf backoff.Config) ServiceDiscoveryOption

WithServiceDiscoveryKeepalive 添加保活与超时选项

func WithServiceDiscoveryLB

func WithServiceDiscoveryLB(lb ServiceDiscoveryLB) ServiceDiscoveryOption

WithServiceDiscoveryLB 添加负载均衡选项

func WithServiceDiscoveryRetry

func WithServiceDiscoveryRetry(retryPolicy *retry.Policy) ServiceDiscoveryOption

WithServiceDiscoveryRetry 设置重试

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL