idem

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2026 License: MIT Imports: 19 Imported by: 0

README

idem

idem 是 Genesis 业务层的结果复用型幂等组件,用来抑制同一请求、同一消息或同一 RPC 的重复成功提交。它的核心机制是“结果缓存 + 锁保护”:第一次执行成功后缓存结果,后续相同 key 直接复用;如果执行还在进行中,则通过锁避免并发穿透。

这里的语义边界需要先说清。idem 不是严格的 exactly-once 执行器,它更准确地说是“防重复成功提交”和“结果复用”组件。成功结果会被缓存,失败结果不会缓存;如果执行过程中锁丢失或存储异常,也不能承诺绝对的一次且仅一次。

适用场景

适合的场景包括:HTTP 接口幂等提交、gRPC 一元调用去重、消息消费去重,以及业务层显式控制的“只希望成功一次”的操作。

不太适合的场景包括:你需要强类型结果恢复、复杂流式响应缓存、严格的分布式事务语义,或者希望组件替你保证数据库层面的 exactly-once 提交。当前 idem 更适合做应用层幂等保护,而不是事务系统。

快速开始

idemComp, err := idem.New(&idem.Config{
	Driver:     idem.DriverRedis,
	Prefix:     "myapp:idem:",
	DefaultTTL: 24 * time.Hour,
	LockTTL:    30 * time.Second,
}, idem.WithRedisConnector(redisConn), idem.WithLogger(logger))
if err != nil {
	return err
}

result, err := idemComp.Execute(ctx, "order:create:req-123", func(ctx context.Context) (any, error) {
	return map[string]any{"order_id": "123"}, nil
})

Execute 会把首次成功执行与缓存命中都统一成同一套 JSON 编解码后的结果形态,因此返回值适合按通用 JSON 结构读取,而不是依赖第一次执行时的原始 Go 类型。

核心能力

Execute 适合业务层直接调用。它会先查结果缓存,未命中时抢锁执行,成功后写入缓存并释放锁;失败则不缓存,后续允许重试。

Consume 适合消息消费去重。它只关心“是否已处理”,不会返回业务结果;如果发现同 key 已完成,直接返回 executed=false

GinMiddlewareUnaryServerInterceptor 则把这套逻辑分别接到 HTTP 和 gRPC 服务端入口。默认情况下,Gin 只缓存 2xx 响应,gRPC 只缓存成功的 proto.Message 响应。这两个策略现在都可以通过 option 显式调整。

配置说明

字段 类型 默认值 说明
Driver DriverType redis 后端类型,支持 redismemory
Prefix string idem: 存储 key 前缀。
DefaultTTL time.Duration 24h 成功结果的缓存有效期。
LockTTL time.Duration 30s 执行阶段锁的有效期。
WaitTimeout time.Duration 0 等待结果或锁的超时;0 表示仅受上层 ctx 控制。
WaitInterval time.Duration 50ms 等待轮询间隔。

负数配置现在会被显式拒绝,而不是静默回退默认值。

缓存策略

HTTP 中间件默认缓存 2xx 响应。如果你希望把某些 4xx 也视为可复用结果,可以通过 WithHTTPStatusCacheFunc 显式指定:

middleware := idemComp.GinMiddleware(
	idem.WithHTTPStatusCacheFunc(func(status int) bool {
		return status == http.StatusConflict
	}),
).(func(*gin.Context))

gRPC 拦截器默认缓存成功的 proto.Message 响应。你也可以通过 WithGRPCResponseCacheFunc 进一步缩小缓存范围:

interceptor := idemComp.UnaryServerInterceptor(
	idem.WithGRPCResponseCacheFunc(func(msg proto.Message) bool {
		return msg.ProtoReflect().Descriptor().FullName() == "demo.OrderReply"
	}),
)

需要注意的是,当前 gRPC 幂等缓存仍然只支持 proto.Message。非 proto 成功结果不会被缓存。

续期与异常边界

对于耗时较长的执行,idem 会在锁生命周期过半时尝试自动续期,避免执行过程中锁提前过期。如果续期失败,组件现在会把它视为真实错误,而不是只记 warning。对 ExecuteConsume 这类直接调用场景,这会阻止成功结果被继续缓存,降低“锁已经丢了但本地还在提交结果”的风险。

对 HTTP/gRPC 中间件场景,续期失败同样会阻止结果进入缓存,但如果业务 handler 已经把响应写给客户端,组件无法回滚已经发送出去的响应。这是应用层幂等组件的天然边界。

推荐实践

最重要的设计点仍然是 key 设计。幂等 key 必须和业务操作绑定,至少要能区分“同一个用户的同一次提交”和“两个不同请求”。常见做法是 source + business_id + request_id

第二个关键点是 把返回值当作 JSON 友好数据读取。如果你在 Execute 的返回值上依赖具体 Go 结构体类型断言,那么第一次执行和缓存命中都很容易出问题。对于强类型恢复需求,更合适的方向通常是业务层自带编解码,或者后续引入显式 codec。

Documentation

Overview

Package idem 提供结果复用型幂等组件,用于抑制同一请求或消息的重复成功提交。

idem 在 Genesis 业务层中承担“去重和结果复用”职责。它的核心语义不是严格的 exactly-once 执行,而是:

  • 同一 key 的成功结果会被缓存,后续请求直接复用
  • 同一 key 的并发执行会被锁保护,避免并发穿透
  • 业务执行失败不会缓存结果,后续允许重试

当前组件提供四个入口:

  • Execute:手动幂等执行,适合业务逻辑直接调用
  • Consume:消息消费去重,只关心“是否已执行”
  • GinMiddleware:HTTP 幂等中间件
  • UnaryServerInterceptor:gRPC 一元服务端幂等拦截器

组件同时支持 Redis 和 Memory 两种后端。Redis 适合分布式环境,Memory 适合单机、 本地开发和测试。

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConfigNil 配置为空
	ErrConfigNil = xerrors.New("idem: config is nil")

	// ErrKeyEmpty 幂等键为空
	ErrKeyEmpty = xerrors.New("idem: key is empty")

	// ErrConcurrentRequest 并发请求
	ErrConcurrentRequest = xerrors.New("idem: concurrent request detected")

	// ErrLockLost 表示执行过程中丢失了幂等锁
	ErrLockLost = xerrors.New("idem: lock lost during execution")

	// ErrResultNotFound 结果未找到(内部使用)
	ErrResultNotFound = xerrors.New("idem: result not found")
)

错误定义

Functions

This section is empty.

Types

type Config

type Config struct {
	// Driver 后端类型: "redis" | "memory" (默认 "redis")
	Driver DriverType `json:"driver" yaml:"driver"`

	// Prefix Redis Key 前缀,默认 "idem:"
	// 例如:"myapp:idem:" 将使用 "myapp:idem:{key}" 作为存储键
	Prefix string `json:"prefix" yaml:"prefix"`

	// DefaultTTL 幂等记录有效期,默认 24h
	// 超过此时间后,缓存的结果将被清理,后续相同请求将重新执行
	DefaultTTL time.Duration `json:"default_ttl" yaml:"default_ttl"`

	// LockTTL 处理过程中的锁超时时间,默认 30s
	// 防止业务逻辑崩溃导致死锁,超时后锁自动释放
	LockTTL time.Duration `json:"lock_ttl" yaml:"lock_ttl"`

	// WaitTimeout 等待结果的最长时间,默认 0(仅受 ctx 影响)
	// 当未获取到锁时,将阻塞等待结果或锁可用
	WaitTimeout time.Duration `json:"wait_timeout" yaml:"wait_timeout"`

	// WaitInterval 等待结果的轮询间隔,默认 50ms
	WaitInterval time.Duration `json:"wait_interval" yaml:"wait_interval"`
}

Config 幂等性组件配置

type DeletableStore added in v0.5.0

type DeletableStore interface {
	Store
	DeleteResult(ctx context.Context, key string) error
}

DeletableStore 可删除缓存结果的存储实现。 用于清理损坏的缓存数据并触发重新执行。

type DriverType

type DriverType string

DriverType 幂等组件驱动类型

const (
	// DriverRedis 使用 Redis 作为后端
	DriverRedis DriverType = "redis"
	// DriverMemory 使用内存作为后端(仅单机)
	DriverMemory DriverType = "memory"
)

type Idempotency

type Idempotency interface {
	// Execute 执行幂等操作
	//
	// 工作流程:
	//   1. 如果 key 已存在且完成 → 直接返回缓存结果
	//   2. 如果 key 正在处理中 → 等待结果或重新尝试获取锁
	//   3. 如果 key 不存在 → 执行 fn 并缓存成功结果
	//
	// 参数:
	//   - ctx: 上下文,用于取消和超时控制
	//   - key: 幂等性键,全局唯一标识这次操作
	//   - fn: 业务逻辑函数,只在第一次请求时执行
	//
	// 返回:
	//   - 执行结果或缓存的结果。为保证首次执行与缓存命中的类型一致,返回值会经过同一套 JSON 编解码规范化。
	//   - 错误:ErrKeyEmpty、上下文错误、锁丢失错误等
	Execute(ctx context.Context, key string, fn func(ctx context.Context) (any, error)) (any, error)

	// Consume 用于消息消费的幂等处理
	//
	// 工作流程:
	//   1. 如果 key 已存在且完成 → 直接返回 false
	//   2. 如果 key 正在处理中 → 返回 ErrConcurrentRequest
	//   3. 如果 key 不存在 → 执行 fn 并标记已处理
	//
	// 返回:
	//   - executed: 是否执行了 fn
	//   - 错误:ErrKeyEmpty, ErrConcurrentRequest 等
	Consume(ctx context.Context, key string, ttl time.Duration, fn func(ctx context.Context) error) (executed bool, err error)

	// GinMiddleware 创建 Gin 框架中间件
	//
	// 使用示例:
	//   middleware := idem.GinMiddleware().(func(*gin.Context))
	//   router.POST("/orders", middleware, handler)
	//   // 或者直接使用(Gin 会自动处理):
	//   router.Use(idem.GinMiddleware().(func(*gin.Context)))
	//
	// 工作原理:
	//   1. 从 HTTP 请求头 X-Idempotency-Key 提取幂等性键
	//   2. 如果缓存命中,直接返回缓存的响应
	//   3. 如果未命中,执行 handler 并按缓存策略缓存响应
	//
	// 参数:
	//   - opts: 中间件选项,可自定义请求头名称等
	//
	// 返回:
	//   - func(*gin.Context) 类型的中间件函数
	//
	// 注意:
	//   返回类型为 interface{} 是为了避免强依赖 gin 包,
	//   实际返回的是 func(*gin.Context) 类型。
	//   传给 gin 的 router 时需要显式类型断言为 gin.HandlerFunc。
	GinMiddleware(opts ...MiddlewareOption) any

	// UnaryServerInterceptor 创建 gRPC 一元服务端拦截器
	//
	// 使用示例:
	//   server := grpc.NewServer(
	//       grpc.UnaryInterceptor(idem.UnaryServerInterceptor()),
	//   )
	//
	// 工作原理:
	//   1. 从 gRPC metadata 提取 x-idem-key
	//   2. 使用分布式锁防止并发执行
	//   3. 如果缓存命中,返回缓存的 protobuf 响应
	//   4. 如果未命中,执行 RPC handler 并按缓存策略缓存成功响应
	//
	// 参数:
	//   - opts: 拦截器选项,可自定义 metadata 键名称等
	//
	// 返回:
	//   - gRPC 一元服务端拦截器
	//
	// 注意:
	//   只支持一元 RPC 调用,不支持流式 RPC(因为流式交互的复杂性)。
	//   当前默认只缓存成功的 proto.Message 响应。
	UnaryServerInterceptor(opts ...InterceptorOption) grpc.UnaryServerInterceptor
}

Idempotency 幂等性组件核心接口

支持三种使用方式: 1. Execute: 手动调用,适合业务层直接使用 2. GinMiddleware: Gin 框架中间件,自动处理 HTTP 请求幂等性 3. UnaryServerInterceptor: gRPC 一元拦截器,处理单次 RPC 调用幂等性

func New

func New(cfg *Config, opts ...Option) (Idempotency, error)

New 创建幂等性组件实例

这是标准的工厂函数,支持配置驱动和显式依赖注入。

参数:

  • cfg: 幂等性配置,不可为 nil
  • opts: 可选配置,如 WithLogger(), WithRedisConnector()

返回:

  • Idempotency 组件实例
  • 错误:缺少必要连接器或配置非法

使用示例:

idem, err := idem.New(&idem.Config{
    Driver:     idem.DriverRedis,
    Prefix:     "myapp:idem:",
    DefaultTTL: 24 * time.Hour,
    LockTTL:    30 * time.Second,
}, idem.WithRedisConnector(redisConn), idem.WithLogger(logger))

type InterceptorOption

type InterceptorOption func(*interceptorOptions)

InterceptorOption gRPC 拦截器选项函数

func WithGRPCResponseCacheFunc added in v0.5.0

func WithGRPCResponseCacheFunc(fn func(msg proto.Message) bool) InterceptorOption

WithGRPCResponseCacheFunc 设置 gRPC 拦截器的响应缓存策略。 只有满足该条件的 proto.Message 成功响应才会被缓存。

func WithMetadataKey

func WithMetadataKey(metadataKey string) InterceptorOption

WithMetadataKey 设置 gRPC 拦截器的幂等键 metadata 键名。 默认为 "x-idem-key"。

type LockToken

type LockToken string

LockToken 锁令牌,用于保证解锁安全

type MiddlewareOption

type MiddlewareOption func(*middlewareOptions)

MiddlewareOption Gin 中间件选项函数

func WithHTTPStatusCacheFunc added in v0.5.0

func WithHTTPStatusCacheFunc(fn func(status int) bool) MiddlewareOption

WithHTTPStatusCacheFunc 设置 Gin 中间件的 HTTP 响应缓存策略。 返回 true 表示该状态码的响应会被缓存。

func WithHeaderKey

func WithHeaderKey(headerKey string) MiddlewareOption

WithHeaderKey 设置 Gin 中间件的幂等键 HTTP 头名称。 默认为 "X-Idempotency-Key"。

type Option

type Option func(*options)

Option 组件初始化选项函数

func WithLogger

func WithLogger(logger clog.Logger) Option

WithLogger 设置 Logger。

func WithRedisConnector

func WithRedisConnector(conn connector.RedisConnector) Option

WithRedisConnector 注入 Redis 连接器。

type RefreshableStore

type RefreshableStore interface {
	Store
	Refresh(ctx context.Context, key string, token LockToken, ttl time.Duration) error
}

RefreshableStore 可刷新锁 TTL 的存储实现 用于长时间执行时保持锁不失效

type Store

type Store interface {
	// Lock 尝试获取锁(标记处理中)
	// 返回 true 表示成功获取锁,false 表示已被其他请求锁定
	Lock(ctx context.Context, key string, ttl time.Duration) (LockToken, bool, error)

	// Unlock 释放锁(通常用于执行失败时清理)
	Unlock(ctx context.Context, key string, token LockToken) error

	// SetResult 保存执行结果并标记完成
	// 同时会自动释放锁
	SetResult(ctx context.Context, key string, val []byte, ttl time.Duration, token LockToken) error

	// GetResult 获取已完成的结果
	// 如果结果不存在,返回 ErrResultNotFound
	GetResult(ctx context.Context, key string) ([]byte, error)
}

Store 幂等性存储接口

定义了幂等性组件与存储后端的交互方式。 存储后端需要支持三种状态:

  1. 锁定中(processing): Lock() 成功后的状态
  2. 已完成(completed): SetResult() 后的状态
  3. 不存在(absent): 初始状态或 TTL 过期后

默认提供 Redis / Memory 实现。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL