Documentation
¶
Overview ¶
Package idem 提供结果复用型幂等组件,用于抑制同一请求或消息的重复成功提交。
idem 在 Genesis 业务层中承担“去重和结果复用”职责。它的核心语义不是严格的 exactly-once 执行,而是:
- 同一 key 的成功结果会被缓存,后续请求直接复用
- 同一 key 的并发执行会被锁保护,避免并发穿透
- 业务执行失败不会缓存结果,后续允许重试
当前组件提供四个入口:
- Execute:手动幂等执行,适合业务逻辑直接调用
- Consume:消息消费去重,只关心“是否已执行”
- GinMiddleware:HTTP 幂等中间件
- UnaryServerInterceptor:gRPC 一元服务端幂等拦截器
组件同时支持 Redis 和 Memory 两种后端。Redis 适合分布式环境,Memory 适合单机、 本地开发和测试。
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrConfigNil 配置为空 ErrConfigNil = xerrors.New("idem: config is nil") // ErrKeyEmpty 幂等键为空 ErrKeyEmpty = xerrors.New("idem: key is empty") // ErrConcurrentRequest 并发请求 ErrConcurrentRequest = xerrors.New("idem: concurrent request detected") // ErrLockLost 表示执行过程中丢失了幂等锁 ErrLockLost = xerrors.New("idem: lock lost during execution") // ErrResultNotFound 结果未找到(内部使用) ErrResultNotFound = xerrors.New("idem: result not found") )
错误定义
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Driver 后端类型: "redis" | "memory" (默认 "redis")
Driver DriverType `json:"driver" yaml:"driver"`
// Prefix Redis Key 前缀,默认 "idem:"
// 例如:"myapp:idem:" 将使用 "myapp:idem:{key}" 作为存储键
Prefix string `json:"prefix" yaml:"prefix"`
// DefaultTTL 幂等记录有效期,默认 24h
// 超过此时间后,缓存的结果将被清理,后续相同请求将重新执行
DefaultTTL time.Duration `json:"default_ttl" yaml:"default_ttl"`
// LockTTL 处理过程中的锁超时时间,默认 30s
// 防止业务逻辑崩溃导致死锁,超时后锁自动释放
LockTTL time.Duration `json:"lock_ttl" yaml:"lock_ttl"`
// WaitTimeout 等待结果的最长时间,默认 0(仅受 ctx 影响)
// 当未获取到锁时,将阻塞等待结果或锁可用
WaitTimeout time.Duration `json:"wait_timeout" yaml:"wait_timeout"`
// WaitInterval 等待结果的轮询间隔,默认 50ms
WaitInterval time.Duration `json:"wait_interval" yaml:"wait_interval"`
}
Config 幂等性组件配置
type DeletableStore ¶ added in v0.5.0
DeletableStore 可删除缓存结果的存储实现。 用于清理损坏的缓存数据并触发重新执行。
type DriverType ¶
type DriverType string
DriverType 幂等组件驱动类型
const ( // DriverRedis 使用 Redis 作为后端 DriverRedis DriverType = "redis" // DriverMemory 使用内存作为后端(仅单机) DriverMemory DriverType = "memory" )
type Idempotency ¶
type Idempotency interface {
// Execute 执行幂等操作
//
// 工作流程:
// 1. 如果 key 已存在且完成 → 直接返回缓存结果
// 2. 如果 key 正在处理中 → 等待结果或重新尝试获取锁
// 3. 如果 key 不存在 → 执行 fn 并缓存成功结果
//
// 参数:
// - ctx: 上下文,用于取消和超时控制
// - key: 幂等性键,全局唯一标识这次操作
// - fn: 业务逻辑函数,只在第一次请求时执行
//
// 返回:
// - 执行结果或缓存的结果。为保证首次执行与缓存命中的类型一致,返回值会经过同一套 JSON 编解码规范化。
// - 错误:ErrKeyEmpty、上下文错误、锁丢失错误等
Execute(ctx context.Context, key string, fn func(ctx context.Context) (any, error)) (any, error)
// Consume 用于消息消费的幂等处理
//
// 工作流程:
// 1. 如果 key 已存在且完成 → 直接返回 false
// 2. 如果 key 正在处理中 → 返回 ErrConcurrentRequest
// 3. 如果 key 不存在 → 执行 fn 并标记已处理
//
// 返回:
// - executed: 是否执行了 fn
// - 错误:ErrKeyEmpty, ErrConcurrentRequest 等
Consume(ctx context.Context, key string, ttl time.Duration, fn func(ctx context.Context) error) (executed bool, err error)
// GinMiddleware 创建 Gin 框架中间件
//
// 使用示例:
// middleware := idem.GinMiddleware().(func(*gin.Context))
// router.POST("/orders", middleware, handler)
// // 或者直接使用(Gin 会自动处理):
// router.Use(idem.GinMiddleware().(func(*gin.Context)))
//
// 工作原理:
// 1. 从 HTTP 请求头 X-Idempotency-Key 提取幂等性键
// 2. 如果缓存命中,直接返回缓存的响应
// 3. 如果未命中,执行 handler 并按缓存策略缓存响应
//
// 参数:
// - opts: 中间件选项,可自定义请求头名称等
//
// 返回:
// - func(*gin.Context) 类型的中间件函数
//
// 注意:
// 返回类型为 interface{} 是为了避免强依赖 gin 包,
// 实际返回的是 func(*gin.Context) 类型。
// 传给 gin 的 router 时需要显式类型断言为 gin.HandlerFunc。
GinMiddleware(opts ...MiddlewareOption) any
// UnaryServerInterceptor 创建 gRPC 一元服务端拦截器
//
// 使用示例:
// server := grpc.NewServer(
// grpc.UnaryInterceptor(idem.UnaryServerInterceptor()),
// )
//
// 工作原理:
// 1. 从 gRPC metadata 提取 x-idem-key
// 2. 使用分布式锁防止并发执行
// 3. 如果缓存命中,返回缓存的 protobuf 响应
// 4. 如果未命中,执行 RPC handler 并按缓存策略缓存成功响应
//
// 参数:
// - opts: 拦截器选项,可自定义 metadata 键名称等
//
// 返回:
// - gRPC 一元服务端拦截器
//
// 注意:
// 只支持一元 RPC 调用,不支持流式 RPC(因为流式交互的复杂性)。
// 当前默认只缓存成功的 proto.Message 响应。
UnaryServerInterceptor(opts ...InterceptorOption) grpc.UnaryServerInterceptor
}
Idempotency 幂等性组件核心接口
支持三种使用方式: 1. Execute: 手动调用,适合业务层直接使用 2. GinMiddleware: Gin 框架中间件,自动处理 HTTP 请求幂等性 3. UnaryServerInterceptor: gRPC 一元拦截器,处理单次 RPC 调用幂等性
func New ¶
func New(cfg *Config, opts ...Option) (Idempotency, error)
New 创建幂等性组件实例
这是标准的工厂函数,支持配置驱动和显式依赖注入。
参数:
- cfg: 幂等性配置,不可为 nil
- opts: 可选配置,如 WithLogger(), WithRedisConnector()
返回:
- Idempotency 组件实例
- 错误:缺少必要连接器或配置非法
使用示例:
idem, err := idem.New(&idem.Config{
Driver: idem.DriverRedis,
Prefix: "myapp:idem:",
DefaultTTL: 24 * time.Hour,
LockTTL: 30 * time.Second,
}, idem.WithRedisConnector(redisConn), idem.WithLogger(logger))
type InterceptorOption ¶
type InterceptorOption func(*interceptorOptions)
InterceptorOption gRPC 拦截器选项函数
func WithGRPCResponseCacheFunc ¶ added in v0.5.0
func WithGRPCResponseCacheFunc(fn func(msg proto.Message) bool) InterceptorOption
WithGRPCResponseCacheFunc 设置 gRPC 拦截器的响应缓存策略。 只有满足该条件的 proto.Message 成功响应才会被缓存。
func WithMetadataKey ¶
func WithMetadataKey(metadataKey string) InterceptorOption
WithMetadataKey 设置 gRPC 拦截器的幂等键 metadata 键名。 默认为 "x-idem-key"。
type MiddlewareOption ¶
type MiddlewareOption func(*middlewareOptions)
MiddlewareOption Gin 中间件选项函数
func WithHTTPStatusCacheFunc ¶ added in v0.5.0
func WithHTTPStatusCacheFunc(fn func(status int) bool) MiddlewareOption
WithHTTPStatusCacheFunc 设置 Gin 中间件的 HTTP 响应缓存策略。 返回 true 表示该状态码的响应会被缓存。
func WithHeaderKey ¶
func WithHeaderKey(headerKey string) MiddlewareOption
WithHeaderKey 设置 Gin 中间件的幂等键 HTTP 头名称。 默认为 "X-Idempotency-Key"。
type Option ¶
type Option func(*options)
Option 组件初始化选项函数
func WithRedisConnector ¶
func WithRedisConnector(conn connector.RedisConnector) Option
WithRedisConnector 注入 Redis 连接器。
type RefreshableStore ¶
type RefreshableStore interface {
Store
Refresh(ctx context.Context, key string, token LockToken, ttl time.Duration) error
}
RefreshableStore 可刷新锁 TTL 的存储实现 用于长时间执行时保持锁不失效
type Store ¶
type Store interface {
// Lock 尝试获取锁(标记处理中)
// 返回 true 表示成功获取锁,false 表示已被其他请求锁定
Lock(ctx context.Context, key string, ttl time.Duration) (LockToken, bool, error)
// Unlock 释放锁(通常用于执行失败时清理)
Unlock(ctx context.Context, key string, token LockToken) error
// SetResult 保存执行结果并标记完成
// 同时会自动释放锁
SetResult(ctx context.Context, key string, val []byte, ttl time.Duration, token LockToken) error
// GetResult 获取已完成的结果
// 如果结果不存在,返回 ErrResultNotFound
GetResult(ctx context.Context, key string) ([]byte, error)
}
Store 幂等性存储接口
定义了幂等性组件与存储后端的交互方式。 存储后端需要支持三种状态:
- 锁定中(processing): Lock() 成功后的状态
- 已完成(completed): SetResult() 后的状态
- 不存在(absent): 初始状态或 TTL 过期后
默认提供 Redis / Memory 实现。