Documentation
¶
Overview ¶
Package idem 提供了幂等性组件,用于确保在分布式环境中操作的"一次且仅一次"执行。
idem 是 Genesis 业务层的核心组件,它提供了: - 统一的 Idempotency 接口,支持手动调用、Gin 中间件、gRPC 拦截器 - 结果缓存:自动缓存执行结果,重复请求直接返回缓存数据 - 并发控制:内置分布式锁机制,防止同一幂等键的并发穿透 - 后端可配置:支持 Redis / Memory - 与 L0 基础组件(日志)的深度集成
## 基本使用
// 创建幂等组件
idem, _ := idem.New(&idem.Config{
Driver: idem.DriverRedis,
Prefix: "myapp:idem:",
DefaultTTL: 24 * time.Hour,
}, idem.WithRedisConnector(redisConn), idem.WithLogger(logger))
// 执行幂等操作
result, err := idem.Execute(ctx, "order:create:12345", func(ctx context.Context) (interface{}, error) {
// 业务逻辑
return map[string]interface{}{"order_id": "12345"}, nil
})
## Gin 中间件
r := gin.Default()
r.POST("/orders", idem.GinMiddleware(), func(c *gin.Context) {
c.JSON(200, gin.H{"order_id": "123"})
})
## gRPC 拦截器
s := grpc.NewServer(
grpc.UnaryInterceptor(idem.UnaryServerInterceptor()),
)
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrConfigNil 配置为空 ErrConfigNil = xerrors.New("idem: config is nil") // ErrKeyEmpty 幂等键为空 ErrKeyEmpty = xerrors.New("idem: key is empty") // ErrConcurrentRequest 并发请求 ErrConcurrentRequest = xerrors.New("idem: concurrent request detected") // ErrResultNotFound 结果未找到(内部使用) ErrResultNotFound = xerrors.New("idem: result not found") )
错误定义
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Driver 后端类型: "redis" | "memory" (默认 "redis")
Driver DriverType `json:"driver" yaml:"driver"`
// Prefix Redis Key 前缀,默认 "idem:"
// 例如:"myapp:idem:" 将使用 "myapp:idem:{key}" 作为存储键
Prefix string `json:"prefix" yaml:"prefix"`
// DefaultTTL 幂等记录有效期,默认 24h
// 超过此时间后,缓存的结果将被清理,后续相同请求将重新执行
DefaultTTL time.Duration `json:"default_ttl" yaml:"default_ttl"`
// LockTTL 处理过程中的锁超时时间,默认 30s
// 防止业务逻辑崩溃导致死锁,超时后锁自动释放
LockTTL time.Duration `json:"lock_ttl" yaml:"lock_ttl"`
// WaitTimeout 等待结果的最长时间,默认 0(仅受 ctx 影响)
// 当未获取到锁时,将阻塞等待结果或锁可用
WaitTimeout time.Duration `json:"wait_timeout" yaml:"wait_timeout"`
// WaitInterval 等待结果的轮询间隔,默认 50ms
WaitInterval time.Duration `json:"wait_interval" yaml:"wait_interval"`
}
Config 幂等性组件配置
type DriverType ¶
type DriverType string
DriverType 幂等组件驱动类型
const ( // DriverRedis 使用 Redis 作为后端 DriverRedis DriverType = "redis" // DriverMemory 使用内存作为后端(仅单机) DriverMemory DriverType = "memory" )
type Idempotency ¶
type Idempotency interface {
// Execute 执行幂等操作
//
// 工作流程:
// 1. 如果 key 已存在且完成 → 直接返回缓存结果
// 2. 如果 key 正在处理中 → 返回 ErrConcurrentRequest
// 3. 如果 key 不存在 → 执行 fn 并缓存结果
//
// 参数:
// - ctx: 上下文,用于取消和超时控制
// - key: 幂等性键,全局唯一标识这次操作
// - fn: 业务逻辑函数,只在第一次请求时执行
//
// 返回:
// - 执行结果或缓存的结果
// - 错误:ErrKeyEmpty, ErrConcurrentRequest 等
Execute(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (interface{}, error)
// Consume 用于消息消费的幂等处理
//
// 工作流程:
// 1. 如果 key 已存在且完成 → 直接返回 false
// 2. 如果 key 正在处理中 → 返回 ErrConcurrentRequest
// 3. 如果 key 不存在 → 执行 fn 并标记已处理
//
// 返回:
// - executed: 是否执行了 fn
// - 错误:ErrKeyEmpty, ErrConcurrentRequest 等
Consume(ctx context.Context, key string, ttl time.Duration, fn func(ctx context.Context) error) (executed bool, err error)
// GinMiddleware 创建 Gin 框架中间件
//
// 使用示例:
// middleware := idem.GinMiddleware().(func(*gin.Context))
// router.POST("/orders", middleware, handler)
// // 或者直接使用(Gin 会自动处理):
// router.Use(idem.GinMiddleware().(func(*gin.Context)))
//
// 工作原理:
// 1. 从 HTTP 请求头 X-Idempotency-Key 提取幂等性键
// 2. 如果缓存命中,直接返回缓存的响应
// 3. 如果未命中,执行 handler 并缓存响应
//
// 参数:
// - opts: 中间件选项,可自定义请求头名称等
//
// 返回:
// - func(*gin.Context) 类型的中间件函数
//
// 注意:
// 返回类型为 interface{} 是为了避免强依赖 gin 包,
// 实际返回的是 func(*gin.Context) 类型。
// 传给 gin 的 router 时需要显式类型断言为 gin.HandlerFunc。
GinMiddleware(opts ...MiddlewareOption) interface{}
// UnaryServerInterceptor 创建 gRPC 一元服务端拦截器
//
// 使用示例:
// server := grpc.NewServer(
// grpc.UnaryInterceptor(idem.UnaryServerInterceptor()),
// )
//
// 工作原理:
// 1. 从 gRPC metadata 提取 x-idem-key
// 2. 使用分布式锁防止并发执行
// 3. 如果缓存命中,返回缓存的 protobuf 响应
// 4. 如果未命中,执行 RPC handler 并缓存结果
//
// 参数:
// - opts: 拦截器选项,可自定义 metadata 键名称等
//
// 返回:
// - gRPC 一元服务端拦截器
//
// 注意:
// 只支持一元 RPC 调用,不支持流式 RPC(因为流式交互的复杂性)。
UnaryServerInterceptor(opts ...InterceptorOption) grpc.UnaryServerInterceptor
}
Idempotency 幂等性组件核心接口
支持三种使用方式: 1. Execute: 手动调用,适合业务层直接使用 2. GinMiddleware: Gin 框架中间件,自动处理 HTTP 请求幂等性 3. UnaryServerInterceptor: gRPC 一元拦截器,处理单次 RPC 调用幂等性
func New ¶
func New(cfg *Config, opts ...Option) (Idempotency, error)
New 创建幂等性组件实例
这是标准的工厂函数,支持配置驱动和显式依赖注入。
参数:
- cfg: 幂等性配置,不可为 nil
- opts: 可选配置,如 WithLogger(), WithRedisConnector()
返回:
- Idempotency 组件实例
- 错误:缺少必要连接器或配置非法
使用示例:
idem, err := idem.New(&idem.Config{
Driver: idem.DriverRedis,
Prefix: "myapp:idem:",
DefaultTTL: 24 * time.Hour,
LockTTL: 30 * time.Second,
}, idem.WithRedisConnector(redisConn), idem.WithLogger(logger))
type InterceptorOption ¶
type InterceptorOption func(*interceptorOptions)
InterceptorOption gRPC 拦截器选项函数
func WithMetadataKey ¶
func WithMetadataKey(metadataKey string) InterceptorOption
WithMetadataKey 设置 gRPC 拦截器的幂等键 metadata 键名 默认为 "x-idem-key"
type MiddlewareOption ¶
type MiddlewareOption func(*middlewareOptions)
MiddlewareOption Gin 中间件选项函数
func WithHeaderKey ¶
func WithHeaderKey(headerKey string) MiddlewareOption
WithHeaderKey 设置 Gin 中间件的幂等键 HTTP 头名称 默认为 "X-Idempotency-Key"
type Option ¶
type Option func(*options)
Option 组件初始化选项函数
func WithRedisConnector ¶
func WithRedisConnector(conn connector.RedisConnector) Option
WithRedisConnector 注入 Redis 连接器
type RefreshableStore ¶
type RefreshableStore interface {
Store
Refresh(ctx context.Context, key string, token LockToken, ttl time.Duration) error
}
RefreshableStore 可刷新锁 TTL 的存储实现 用于长时间执行时保持锁不失效
type Store ¶
type Store interface {
// Lock 尝试获取锁(标记处理中)
// 返回 true 表示成功获取锁,false 表示已被其他请求锁定
Lock(ctx context.Context, key string, ttl time.Duration) (LockToken, bool, error)
// Unlock 释放锁(通常用于执行失败时清理)
Unlock(ctx context.Context, key string, token LockToken) error
// SetResult 保存执行结果并标记完成
// 同时会自动释放锁
SetResult(ctx context.Context, key string, val []byte, ttl time.Duration, token LockToken) error
// GetResult 获取已完成的结果
// 如果结果不存在,返回 ErrResultNotFound
GetResult(ctx context.Context, key string) ([]byte, error)
}
Store 幂等性存储接口
定义了幂等性组件与存储后端的交互方式。 存储后端需要支持三种状态:
- 锁定中(processing): Lock() 成功后的状态
- 已完成(completed): SetResult() 后的状态
- 不存在(absent): 初始状态或 TTL 过期后
默认提供 Redis / Memory 实现。