ratelimit

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2026 License: MIT Imports: 16 Imported by: 0

README

ratelimit

Go Reference

ratelimit 是 Genesis 的治理层(L3)组件,提供两种限流能力:

  • standalone:基于内存的进程内限流
  • distributed:基于 Redis 的分布式限流

它解决的不是“所有限流问题”,而是最常见的接口保护问题:给某个业务键配置 Rate/Burst,然后在请求进入业务逻辑之前做一次非阻塞检查。

如果你需要的是:

  • HTTP / gRPC 请求入口保护;
  • 按 IP、用户、方法名、路径等维度限流;
  • 单机或 Redis 共享限流;
  • 在限流器异常时显式选择 fail_openfail_closed

那么当前 ratelimit 是合适的。

如果你需要的是:

  • 多种分布式限流算法切换;
  • 复杂配额体系;
  • 精细的剩余令牌、重试时间和窗口统计接口;
  • 分布式 Wait 或排队语义;

那么当前组件不覆盖这些能力。

快速开始

1. 单机模式
limiter, err := ratelimit.New(&ratelimit.Config{
	Driver: ratelimit.DriverStandalone,
	Standalone: &ratelimit.StandaloneConfig{
		CleanupInterval: time.Minute,
		IdleTimeout:     5 * time.Minute,
	},
}, ratelimit.WithLogger(logger))
if err != nil {
	panic(err)
}
defer limiter.Close()

allowed, err := limiter.Allow(ctx, "user:123", ratelimit.Limit{
	Rate:  10,
	Burst: 20,
})
if err != nil {
	panic(err)
}
if !allowed {
	return
}
2. 分布式模式
redisConn, err := connector.NewRedis(&cfg.Redis, connector.WithLogger(logger))
if err != nil {
	panic(err)
}
defer redisConn.Close()

limiter, err := ratelimit.New(&ratelimit.Config{
	Driver: ratelimit.DriverDistributed,
	Distributed: &ratelimit.DistributedConfig{
		Prefix: "myapp:ratelimit:",
	},
}, ratelimit.WithRedisConnector(redisConn), ratelimit.WithLogger(logger))
if err != nil {
	panic(err)
}

allowed, err := limiter.Allow(ctx, "user:123", ratelimit.Limit{
	Rate:  100,
	Burst: 200,
})

分布式模式当前仍然是令牌桶语义,但有两个实现细节需要知道:

  • Redis 键会把 key + rate + burst 一起编码进去,避免同一个业务键在不同规则下互相串扰。
  • 脚本使用 Redis TIME 作为统一时钟,而不是各节点本地时间。

Gin 集成

r := gin.New()
r.Use(ratelimit.GinMiddleware(limiter, &ratelimit.GinMiddlewareOptions{
	KeyFunc: func(c *gin.Context) string {
		return c.ClientIP()
	},
	LimitFunc: func(c *gin.Context) ratelimit.Limit {
		return ratelimit.Limit{Rate: 100, Burst: 200}
	},
}))

默认行为是:

  • KeyFunc 留空时使用 ClientIP()
  • LimitFunc 留空时视为无效规则并放行
  • 限流器内部异常时采用 fail_open

如果你希望限流器异常时直接拒绝请求,可以切换到 fail_closed

r.Use(ratelimit.GinMiddleware(limiter, &ratelimit.GinMiddlewareOptions{
	ErrorPolicy: ratelimit.ErrorPolicyFailClosed,
	KeyFunc: func(c *gin.Context) string {
		return c.ClientIP()
	},
	LimitFunc: func(c *gin.Context) ratelimit.Limit {
		return ratelimit.Limit{Rate: 100, Burst: 200}
	},
}))

gRPC 集成

最简单的接法是使用默认 fail_open 的拦截器:

server := grpc.NewServer(
	grpc.ChainUnaryInterceptor(
		ratelimit.UnaryServerInterceptor(limiter, nil, func(ctx context.Context, fullMethod string) ratelimit.Limit {
			return ratelimit.Limit{Rate: 100, Burst: 200}
		}),
	),
)

如果你需要显式错误策略,可以使用带 Options 的版本:

server := grpc.NewServer(
	grpc.ChainUnaryInterceptor(
		ratelimit.UnaryServerInterceptorWithOptions(
			limiter,
			nil,
			func(ctx context.Context, fullMethod string) ratelimit.Limit {
				return ratelimit.Limit{Rate: 100, Burst: 200}
			},
			&ratelimit.GRPCInterceptorOptions{
				ErrorPolicy: ratelimit.ErrorPolicyFailClosed,
				Logger:      logger,
			},
		),
	),
)

流式拦截器当前是 per-stream 限流,也就是只在流建立时检查一次,不对流中的每条消息逐条限流。

使用边界

  • Allow / AllowN 是核心能力,适用于两种驱动。
  • Wait 只适用于单机模式;分布式模式返回 ErrNotSupported
  • 当前分布式实现只有 Redis 令牌桶,没有滑动窗口、漏桶等可切换算法。
  • 中间件和拦截器默认 fail_open,这是为了把限流器故障和业务故障隔离开;如果你的场景更重保护而不是可用性,应显式改成 fail_closed
  • MetricAllowTotalMetricErrors 等更细粒度指标常量已经定义,但当前实现主要记录的是允许/拒绝计数。

更完整的设计背景、分布式语义和工程取舍见:Genesis ratelimit:请求入口限流组件的设计与取舍

Documentation

Overview

Package ratelimit 提供 Genesis 的限流组件。

`ratelimit` 位于治理层(L3),面向两类常见需求: 1. 进程内的轻量限流; 2. 基于 Redis 的集群共享限流。

这个包的核心能力是非阻塞的 `Allow` / `AllowN` 检查。单机模式使用 `golang.org/x/time/rate`,分布式模式使用 Redis Lua 脚本维护共享桶状态。

分布式模式有几个重要语义: - 桶状态按 `key + limit` 隔离,不同 `Rate/Burst` 不会共享同一个 Redis 键。 - 脚本使用 Redis `TIME` 作为统一时钟,避免多节点本地时钟漂移破坏限流精度。 - `Wait` 不是分布式能力,调用会返回 `ErrNotSupported`。

Gin 中间件和 gRPC 拦截器默认采用 `fail_open`,即限流器内部异常时放行业务请求; 如果希望把限流器异常视为保护失败,可切换到 `fail_closed`。

基本用法:

limiter, _ := ratelimit.New(&ratelimit.Config{
    Driver: ratelimit.DriverStandalone,
}, ratelimit.WithLogger(logger))

allowed, err := limiter.Allow(ctx, "user:123", ratelimit.Limit{
    Rate:  10,
    Burst: 20,
})

分布式用法:

redisConn, _ := connector.NewRedis(&cfg.Redis, connector.WithLogger(logger))
defer redisConn.Close()

limiter, _ := ratelimit.New(&ratelimit.Config{
    Driver: ratelimit.DriverDistributed,
    Distributed: &ratelimit.DistributedConfig{
        Prefix: "myapp:ratelimit:",
    },
}, ratelimit.WithRedisConnector(redisConn), ratelimit.WithLogger(logger))

Index

Constants

View Source
const (
	// MetricAllowTotal 限流检查总次数 (Counter)
	MetricAllowTotal = "ratelimit_allow_total"

	// MetricAllowed 允许通过的请求数 (Counter)
	MetricAllowed = "ratelimit_allowed_total"

	// MetricDenied 被拒绝的请求数 (Counter)
	MetricDenied = "ratelimit_denied_total"

	// MetricErrors 限流器错误数 (Counter)
	MetricErrors = "ratelimit_errors_total"

	// LabelMode 模式标签 (standalone/distributed)
	LabelMode = "mode"

	// LabelKey 限流键标签
	LabelKey = "key"

	// LabelErrorType 错误类型标签
	LabelErrorType = "error_type"
)

Metrics 指标常量定义

Variables

View Source
var (
	// ErrConfigNil 配置为空
	ErrConfigNil = xerrors.New("ratelimit: config is nil")

	// ErrConnectorNil 连接器为空
	ErrConnectorNil = xerrors.New("ratelimit: connector is nil")

	// ErrNotSupported 操作不支持
	ErrNotSupported = xerrors.New("ratelimit: operation not supported")

	// ErrKeyEmpty 限流键为空
	ErrKeyEmpty = xerrors.New("ratelimit: key is empty")

	// ErrInvalidLimit 限流规则无效
	ErrInvalidLimit = xerrors.New("ratelimit: invalid limit")

	// ErrRateLimitExceeded 限流阈值超出
	ErrRateLimitExceeded = xerrors.New("ratelimit: rate limit exceeded")
)

错误定义

Functions

func GinMiddleware

func GinMiddleware(limiter Limiter, opts *GinMiddlewareOptions) gin.HandlerFunc

GinMiddleware 创建 Gin 限流中间件

参数:

  • limiter: 限流器实例,为 nil 时自动使用 Discard()(始终放行)
  • opts: 中间件配置(可为空)

使用示例:

r := gin.New()
r.Use(ratelimit.GinMiddleware(limiter, &ratelimit.GinMiddlewareOptions{
    KeyFunc: func(c *gin.Context) string {
        return c.ClientIP()
    },
    LimitFunc: func(c *gin.Context) ratelimit.Limit {
        return ratelimit.Limit{Rate: 10, Burst: 20}
    },
}))

func StreamClientInterceptor

func StreamClientInterceptor(
	limiter Limiter,
	keyFunc GRPCKeyFunc,
	limitFunc GRPCLimitFunc,
) grpc.StreamClientInterceptor

StreamClientInterceptor 返回 gRPC 流式调用客户端拦截器 在流建立时进行一次限流检查(Per-Stream 限流);keyFunc 为空时使用 fullMethod

注意:采用 Per-Stream 限流而非 Per-Message 限流,原因: 1. 流式请求通常是高频场景,Per-Message 会快速耗尽令牌 2. 避免流中途被限流中断,导致不可预期的错误

func StreamClientInterceptorWithOptions added in v0.5.0

func StreamClientInterceptorWithOptions(
	limiter Limiter,
	keyFunc GRPCKeyFunc,
	limitFunc GRPCLimitFunc,
	opts *GRPCInterceptorOptions,
) grpc.StreamClientInterceptor

StreamClientInterceptorWithOptions 返回带错误策略的 gRPC 流式调用客户端拦截器。

func StreamServerInterceptor

func StreamServerInterceptor(
	limiter Limiter,
	keyFunc GRPCKeyFunc,
	limitFunc GRPCLimitFunc,
) grpc.StreamServerInterceptor

StreamServerInterceptor 返回 gRPC 流式调用服务端拦截器 在流建立时进行一次限流检查(Per-Stream 限流);keyFunc 为空时使用 fullMethod

注意:采用 Per-Stream 限流而非 Per-Message 限流,原因: 1. 流式请求通常是高频场景,Per-Message 会快速耗尽令牌 2. 避免流中途被限流中断,导致不可预期的错误

func StreamServerInterceptorWithOptions added in v0.5.0

func StreamServerInterceptorWithOptions(
	limiter Limiter,
	keyFunc GRPCKeyFunc,
	limitFunc GRPCLimitFunc,
	opts *GRPCInterceptorOptions,
) grpc.StreamServerInterceptor

StreamServerInterceptorWithOptions 返回带错误策略的 gRPC 流式调用服务端拦截器。

func UnaryClientInterceptor

func UnaryClientInterceptor(
	limiter Limiter,
	keyFunc GRPCKeyFunc,
	limitFunc GRPCLimitFunc,
) grpc.UnaryClientInterceptor

UnaryClientInterceptor 返回 gRPC 一元调用客户端拦截器

参数:

  • limiter: 限流器实例
  • keyFunc: 从请求中提取限流键的函数,如果为 nil,默认使用 fullMethod
  • limitFunc: 获取限流规则的函数

使用示例:

conn, _ := grpc.NewClient(
    "localhost:9001",
    grpc.WithUnaryInterceptor(
        ratelimit.UnaryClientInterceptor(limiter,
            nil,
            func(ctx context.Context, fullMethod string) ratelimit.Limit {
                return ratelimit.Limit{Rate: 100, Burst: 200}
            }),
    ),
)

func UnaryClientInterceptorWithOptions added in v0.5.0

func UnaryClientInterceptorWithOptions(
	limiter Limiter,
	keyFunc GRPCKeyFunc,
	limitFunc GRPCLimitFunc,
	opts *GRPCInterceptorOptions,
) grpc.UnaryClientInterceptor

UnaryClientInterceptorWithOptions 返回带错误策略的 gRPC 一元调用客户端拦截器。

func UnaryServerInterceptor

func UnaryServerInterceptor(
	limiter Limiter,
	keyFunc GRPCKeyFunc,
	limitFunc GRPCLimitFunc,
) grpc.UnaryServerInterceptor

UnaryServerInterceptor 返回 gRPC 一元调用服务端拦截器

参数:

  • limiter: 限流器实例
  • keyFunc: 从请求中提取限流键的函数,如果为 nil,默认使用 fullMethod
  • limitFunc: 获取限流规则的函数

使用示例:

server := grpc.NewServer(
    grpc.ChainUnaryInterceptor(
        ratelimit.UnaryServerInterceptor(limiter,
            nil,
            func(ctx context.Context, fullMethod string) ratelimit.Limit {
                return ratelimit.Limit{Rate: 100, Burst: 200}
            }),
    ),
)

func UnaryServerInterceptorWithOptions added in v0.5.0

func UnaryServerInterceptorWithOptions(
	limiter Limiter,
	keyFunc GRPCKeyFunc,
	limitFunc GRPCLimitFunc,
	opts *GRPCInterceptorOptions,
) grpc.UnaryServerInterceptor

UnaryServerInterceptorWithOptions 返回带错误策略的 gRPC 一元调用服务端拦截器。

Types

type Config

type Config struct {
	// Driver 限流模式: "standalone" | "distributed"
	Driver DriverType `json:"driver" yaml:"driver"`

	// Standalone 单机限流配置
	Standalone *StandaloneConfig `json:"standalone" yaml:"standalone"`

	// Distributed 分布式限流配置
	Distributed *DistributedConfig `json:"distributed" yaml:"distributed"`
}

Config 限流组件统一配置

type DistributedConfig

type DistributedConfig struct {
	// Prefix Redis Key 前缀(默认:"ratelimit:")
	Prefix string `json:"prefix" yaml:"prefix"`
}

DistributedConfig 分布式限流配置

type DriverType

type DriverType string

DriverType 限流驱动类型

const (
	// DriverStandalone 单机限流
	DriverStandalone DriverType = "standalone"
	// DriverDistributed 分布式限流
	DriverDistributed DriverType = "distributed"
)

type ErrorPolicy added in v0.5.0

type ErrorPolicy string

ErrorPolicy 定义限流检查出错时的处理策略。

const (
	// ErrorPolicyFailOpen 表示限流器出错时放行请求。
	ErrorPolicyFailOpen ErrorPolicy = "fail_open"
	// ErrorPolicyFailClosed 表示限流器出错时拒绝请求。
	ErrorPolicyFailClosed ErrorPolicy = "fail_closed"
)

type GRPCInterceptorOptions added in v0.5.0

type GRPCInterceptorOptions struct {
	ErrorPolicy ErrorPolicy
	Logger      clog.Logger
}

GRPCInterceptorOptions 定义 gRPC 限流拦截器的可选行为。

type GRPCKeyFunc

type GRPCKeyFunc func(ctx context.Context, fullMethod string) string

GRPCKeyFunc 从请求中提取限流键的函数类型

type GRPCLimitFunc

type GRPCLimitFunc func(ctx context.Context, fullMethod string) Limit

GRPCLimitFunc 获取限流规则的函数类型

type GinMiddlewareOptions

type GinMiddlewareOptions struct {
	WithHeaders bool
	KeyFunc     func(*gin.Context) string
	LimitFunc   func(*gin.Context) Limit
	ErrorPolicy ErrorPolicy
	Logger      clog.Logger
}

GinMiddlewareOptions Gin 限流中间件配置

type Limit

type Limit struct {
	Rate  float64 // 令牌生成速率(每秒生成多少个令牌)
	Burst int     // 令牌桶容量(突发最大请求数)
}

Limit 定义限流规则(令牌桶算法)

type Limiter

type Limiter interface {
	// Allow 尝试获取 1 个令牌(非阻塞)
	// key: 限流标识(如 IP, UserID, ServiceName)
	// limit: 限流规则
	// 返回: allowed(是否允许), error(系统错误)
	//
	// 使用示例:
	//
	//	allowed, err := limiter.Allow(ctx, "user:123", ratelimit.Limit{Rate: 10, Burst: 20})
	//	if err != nil {
	//	    // 处理系统错误
	//	}
	//	if !allowed {
	//	    // 请求被限流
	//	}
	Allow(ctx context.Context, key string, limit Limit) (bool, error)

	// AllowN 尝试获取 N 个令牌(非阻塞)
	AllowN(ctx context.Context, key string, limit Limit, n int) (bool, error)

	// Wait 阻塞等待直到获取 1 个令牌
	Wait(ctx context.Context, key string, limit Limit) error

	// Close 释放资源(如后台清理协程)
	Close() error
}

Limiter 限流器核心接口

func Discard

func Discard() Limiter

Discard 返回一个静默的限流器实例(No-op 实现) 返回的 Limiter 实现了接口,但所有方法始终返回 true(允许通过),零开销

使用场景: 配置驱动的条件启用

var limiter ratelimit.Limiter
if cfg.RateLimitEnabled {
    limiter, _ = ratelimit.New(&ratelimit.Config{
        Driver: ratelimit.DriverStandalone,
        Standalone: &cfg.Standalone,
    }, ratelimit.WithLogger(logger))
} else {
    limiter = ratelimit.Discard()  // 零开销
}

func New

func New(cfg *Config, opts ...Option) (Limiter, error)

New 根据配置创建限流器

使用示例:

// 单机模式
limiter, _ := ratelimit.New(&ratelimit.Config{
    Driver: ratelimit.DriverStandalone,
    Standalone: &ratelimit.StandaloneConfig{
        CleanupInterval: 1 * time.Minute,
    },
}, ratelimit.WithLogger(logger))

// 分布式模式(需注入 Redis 连接器)
redisConn, _ := connector.NewRedis(&cfg.Redis)
limiter, _ := ratelimit.New(&ratelimit.Config{
    Driver: ratelimit.DriverDistributed,
    Distributed: &ratelimit.DistributedConfig{Prefix: "myapp:"},
}, ratelimit.WithRedisConnector(redisConn), ratelimit.WithLogger(logger))

type Option

type Option func(*options)

Option 组件初始化选项函数

func WithLogger

func WithLogger(logger clog.Logger) Option

WithLogger 设置 Logger

func WithMeter

func WithMeter(meter metrics.Meter) Option

WithMeter 设置 Meter

func WithRedisConnector

func WithRedisConnector(redisConn connector.RedisConnector) Option

WithRedisConnector 设置 Redis 连接器(用于分布式限流)

type StandaloneConfig

type StandaloneConfig struct {
	// CleanupInterval 清理过期限流器的间隔(默认:1 分钟)
	CleanupInterval time.Duration `json:"cleanup_interval" yaml:"cleanup_interval"`

	// IdleTimeout 限流器空闲超时时间(默认:5 分钟)
	IdleTimeout time.Duration `json:"idle_timeout" yaml:"idle_timeout"`
}

StandaloneConfig 单机限流配置

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL