idem

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: MIT Imports: 19 Imported by: 0

README

idem - 幂等性组件

Go Reference

分布式幂等性组件,确保操作的"一次且仅一次"执行。支持手动调用、Gin 中间件、gRPC 拦截器。

特性

  • 多场景支持:手动调用、Gin 中间件、gRPC 拦截器
  • 结果缓存:自动缓存执行结果,重复请求直接返回
  • 并发控制:内置分布式锁,防止并发穿透
  • 双驱动:Redis / Memory(Memory 仅单机)

快速开始

手动模式

适用于 MQ 消费、RPC 调用等需要显式控制的场景。

redisConn, _ := connector.NewRedis(&cfg.Redis, connector.WithLogger(logger))
defer redisConn.Close()

idem, _ := idem.New(&idem.Config{
    Driver:     idem.DriverRedis,
    Prefix:     "myapp:idem:",
    DefaultTTL: 24 * time.Hour,
}, idem.WithRedisConnector(redisConn), idem.WithLogger(logger))

result, err := idem.Execute(ctx, "order:create:12345", func(ctx context.Context) (interface{}, error) {
    // 只在第一次请求时执行
    return createOrder(ctx, req)
})
Gin 中间件

自动从 X-Idempotency-Key 头提取幂等键,缓存 HTTP 响应。

r := gin.Default()
r.POST("/orders",
    gin.HandlerFunc(idem.GinMiddleware().(func(*gin.Context))),
    handler,
)
消息消费去重

仅需判断是否消费过,不缓存结果。

executed, err := idem.Consume(ctx, "msg:"+msgID, 30*time.Minute, func(ctx context.Context) error {
    return handleMessage(ctx, msg)
})
if !executed {
    return // 已消费过
}
gRPC 拦截器

客户端在 metadata 中传递幂等键,服务端自动处理。

s := grpc.NewServer(
    grpc.UnaryInterceptor(idem.UnaryServerInterceptor()),
)

核心 API

Idempotency
type Idempotency interface {
    // Execute 执行幂等操作,返回结果或缓存
    Execute(ctx, key string, fn func(ctx) (interface{}, error)) (interface{}, error)

    // Consume 消息消费去重,返回是否执行了 fn
    Consume(ctx, key string, ttl time.Duration, fn func(ctx) error) (bool, error)

    // GinMiddleware 返回 Gin 中间件
    GinMiddleware(opts ...MiddlewareOption) interface{}

    // UnaryServerInterceptor 返回 gRPC 拦截器
    UnaryServerInterceptor(opts ...InterceptorOption) grpc.UnaryServerInterceptor
}
Config
type Config struct {
    Driver      DriverType   // redis | memory
    Prefix      string       // Key 前缀,默认 "idem:"
    DefaultTTL  time.Duration // 结果有效期,默认 24h
    LockTTL     time.Duration // 锁超时,默认 30s
    WaitTimeout time.Duration // 等待结果超时,默认 0
    WaitInterval time.Duration // 轮询间隔,默认 50ms
}

工作原理

状态 Redis Key 说明
锁定中 {prefix}{key}:lock 正在处理,其他请求等待
已完成 {prefix}{key}:result 处理完成,返回缓存结果

锁使用随机 token 保证安全性,避免误删。

中间件选项

// 自定义 HTTP 头名称
idem.GinMiddleware(idem.WithHeaderKey("X-Request-ID"))

// 自定义 gRPC metadata 键
idem.UnaryServerInterceptor(idem.WithMetadataKey("idem-key"))

标准错误

var (
    ErrConfigNil        = xerrors.New("idem: config is nil")
    ErrKeyEmpty         = xerrors.New("idem: key is empty")
    ErrConcurrentRequest = xerrors.New("idem: concurrent request detected")
    ErrResultNotFound   = xerrors.New("idem: result not found")
)

最佳实践

  1. Key 设计:确保全局唯一,如 source_id:event_iduser_id:request_id
  2. TTL 设置:根据业务"重复窗口"设置,订单支付 1h,财务操作可能更长
  3. 错误不缓存fn 返回 error 时不会缓存结果,允许重试
  4. 响应大小:Gin/gRPC 会缓存完整响应,注意 Redis 存储压力
  5. 4xx 响应:HTTP 中间件仅缓存 2xx 响应,4xx 不缓存(客户端参数错误)

测试

go test -v ./idem

示例

参考 examples/idem

Documentation

Overview

Package idem 提供了幂等性组件,用于确保在分布式环境中操作的"一次且仅一次"执行。

idem 是 Genesis 业务层的核心组件,它提供了: - 统一的 Idempotency 接口,支持手动调用、Gin 中间件、gRPC 拦截器 - 结果缓存:自动缓存执行结果,重复请求直接返回缓存数据 - 并发控制:内置分布式锁机制,防止同一幂等键的并发穿透 - 后端可配置:支持 Redis / Memory - 与 L0 基础组件(日志)的深度集成

## 基本使用

// 创建幂等组件
idem, _ := idem.New(&idem.Config{
    Driver:     idem.DriverRedis,
    Prefix:     "myapp:idem:",
    DefaultTTL: 24 * time.Hour,
}, idem.WithRedisConnector(redisConn), idem.WithLogger(logger))

// 执行幂等操作
result, err := idem.Execute(ctx, "order:create:12345", func(ctx context.Context) (interface{}, error) {
    // 业务逻辑
    return map[string]interface{}{"order_id": "12345"}, nil
})

## Gin 中间件

r := gin.Default()
r.POST("/orders", idem.GinMiddleware(), func(c *gin.Context) {
    c.JSON(200, gin.H{"order_id": "123"})
})

## gRPC 拦截器

s := grpc.NewServer(
    grpc.UnaryInterceptor(idem.UnaryServerInterceptor()),
)

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConfigNil 配置为空
	ErrConfigNil = xerrors.New("idem: config is nil")

	// ErrKeyEmpty 幂等键为空
	ErrKeyEmpty = xerrors.New("idem: key is empty")

	// ErrConcurrentRequest 并发请求
	ErrConcurrentRequest = xerrors.New("idem: concurrent request detected")

	// ErrResultNotFound 结果未找到(内部使用)
	ErrResultNotFound = xerrors.New("idem: result not found")
)

错误定义

Functions

This section is empty.

Types

type Config

type Config struct {
	// Driver 后端类型: "redis" | "memory" (默认 "redis")
	Driver DriverType `json:"driver" yaml:"driver"`

	// Prefix Redis Key 前缀,默认 "idem:"
	// 例如:"myapp:idem:" 将使用 "myapp:idem:{key}" 作为存储键
	Prefix string `json:"prefix" yaml:"prefix"`

	// DefaultTTL 幂等记录有效期,默认 24h
	// 超过此时间后,缓存的结果将被清理,后续相同请求将重新执行
	DefaultTTL time.Duration `json:"default_ttl" yaml:"default_ttl"`

	// LockTTL 处理过程中的锁超时时间,默认 30s
	// 防止业务逻辑崩溃导致死锁,超时后锁自动释放
	LockTTL time.Duration `json:"lock_ttl" yaml:"lock_ttl"`

	// WaitTimeout 等待结果的最长时间,默认 0(仅受 ctx 影响)
	// 当未获取到锁时,将阻塞等待结果或锁可用
	WaitTimeout time.Duration `json:"wait_timeout" yaml:"wait_timeout"`

	// WaitInterval 等待结果的轮询间隔,默认 50ms
	WaitInterval time.Duration `json:"wait_interval" yaml:"wait_interval"`
}

Config 幂等性组件配置

type DriverType

type DriverType string

DriverType 幂等组件驱动类型

const (
	// DriverRedis 使用 Redis 作为后端
	DriverRedis DriverType = "redis"
	// DriverMemory 使用内存作为后端(仅单机)
	DriverMemory DriverType = "memory"
)

type Idempotency

type Idempotency interface {
	// Execute 执行幂等操作
	//
	// 工作流程:
	//   1. 如果 key 已存在且完成 → 直接返回缓存结果
	//   2. 如果 key 正在处理中 → 返回 ErrConcurrentRequest
	//   3. 如果 key 不存在 → 执行 fn 并缓存结果
	//
	// 参数:
	//   - ctx: 上下文,用于取消和超时控制
	//   - key: 幂等性键,全局唯一标识这次操作
	//   - fn: 业务逻辑函数,只在第一次请求时执行
	//
	// 返回:
	//   - 执行结果或缓存的结果
	//   - 错误:ErrKeyEmpty, ErrConcurrentRequest 等
	Execute(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (interface{}, error)

	// Consume 用于消息消费的幂等处理
	//
	// 工作流程:
	//   1. 如果 key 已存在且完成 → 直接返回 false
	//   2. 如果 key 正在处理中 → 返回 ErrConcurrentRequest
	//   3. 如果 key 不存在 → 执行 fn 并标记已处理
	//
	// 返回:
	//   - executed: 是否执行了 fn
	//   - 错误:ErrKeyEmpty, ErrConcurrentRequest 等
	Consume(ctx context.Context, key string, ttl time.Duration, fn func(ctx context.Context) error) (executed bool, err error)

	// GinMiddleware 创建 Gin 框架中间件
	//
	// 使用示例:
	//   middleware := idem.GinMiddleware().(func(*gin.Context))
	//   router.POST("/orders", middleware, handler)
	//   // 或者直接使用(Gin 会自动处理):
	//   router.Use(idem.GinMiddleware().(func(*gin.Context)))
	//
	// 工作原理:
	//   1. 从 HTTP 请求头 X-Idempotency-Key 提取幂等性键
	//   2. 如果缓存命中,直接返回缓存的响应
	//   3. 如果未命中,执行 handler 并缓存响应
	//
	// 参数:
	//   - opts: 中间件选项,可自定义请求头名称等
	//
	// 返回:
	//   - func(*gin.Context) 类型的中间件函数
	//
	// 注意:
	//   返回类型为 interface{} 是为了避免强依赖 gin 包,
	//   实际返回的是 func(*gin.Context) 类型。
	//   传给 gin 的 router 时需要显式类型断言为 gin.HandlerFunc。
	GinMiddleware(opts ...MiddlewareOption) interface{}

	// UnaryServerInterceptor 创建 gRPC 一元服务端拦截器
	//
	// 使用示例:
	//   server := grpc.NewServer(
	//       grpc.UnaryInterceptor(idem.UnaryServerInterceptor()),
	//   )
	//
	// 工作原理:
	//   1. 从 gRPC metadata 提取 x-idem-key
	//   2. 使用分布式锁防止并发执行
	//   3. 如果缓存命中,返回缓存的 protobuf 响应
	//   4. 如果未命中,执行 RPC handler 并缓存结果
	//
	// 参数:
	//   - opts: 拦截器选项,可自定义 metadata 键名称等
	//
	// 返回:
	//   - gRPC 一元服务端拦截器
	//
	// 注意:
	//   只支持一元 RPC 调用,不支持流式 RPC(因为流式交互的复杂性)。
	UnaryServerInterceptor(opts ...InterceptorOption) grpc.UnaryServerInterceptor
}

Idempotency 幂等性组件核心接口

支持三种使用方式: 1. Execute: 手动调用,适合业务层直接使用 2. GinMiddleware: Gin 框架中间件,自动处理 HTTP 请求幂等性 3. UnaryServerInterceptor: gRPC 一元拦截器,处理单次 RPC 调用幂等性

func New

func New(cfg *Config, opts ...Option) (Idempotency, error)

New 创建幂等性组件实例

这是标准的工厂函数,支持配置驱动和显式依赖注入。

参数:

  • cfg: 幂等性配置,不可为 nil
  • opts: 可选配置,如 WithLogger(), WithRedisConnector()

返回:

  • Idempotency 组件实例
  • 错误:缺少必要连接器或配置非法

使用示例:

idem, err := idem.New(&idem.Config{
    Driver:     idem.DriverRedis,
    Prefix:     "myapp:idem:",
    DefaultTTL: 24 * time.Hour,
    LockTTL:    30 * time.Second,
}, idem.WithRedisConnector(redisConn), idem.WithLogger(logger))

type InterceptorOption

type InterceptorOption func(*interceptorOptions)

InterceptorOption gRPC 拦截器选项函数

func WithMetadataKey

func WithMetadataKey(metadataKey string) InterceptorOption

WithMetadataKey 设置 gRPC 拦截器的幂等键 metadata 键名 默认为 "x-idem-key"

type LockToken

type LockToken string

LockToken 锁令牌,用于保证解锁安全

type MiddlewareOption

type MiddlewareOption func(*middlewareOptions)

MiddlewareOption Gin 中间件选项函数

func WithHeaderKey

func WithHeaderKey(headerKey string) MiddlewareOption

WithHeaderKey 设置 Gin 中间件的幂等键 HTTP 头名称 默认为 "X-Idempotency-Key"

type Option

type Option func(*options)

Option 组件初始化选项函数

func WithLogger

func WithLogger(logger clog.Logger) Option

WithLogger 设置 Logger

func WithRedisConnector

func WithRedisConnector(conn connector.RedisConnector) Option

WithRedisConnector 注入 Redis 连接器

type RefreshableStore

type RefreshableStore interface {
	Store
	Refresh(ctx context.Context, key string, token LockToken, ttl time.Duration) error
}

RefreshableStore 可刷新锁 TTL 的存储实现 用于长时间执行时保持锁不失效

type Store

type Store interface {
	// Lock 尝试获取锁(标记处理中)
	// 返回 true 表示成功获取锁,false 表示已被其他请求锁定
	Lock(ctx context.Context, key string, ttl time.Duration) (LockToken, bool, error)

	// Unlock 释放锁(通常用于执行失败时清理)
	Unlock(ctx context.Context, key string, token LockToken) error

	// SetResult 保存执行结果并标记完成
	// 同时会自动释放锁
	SetResult(ctx context.Context, key string, val []byte, ttl time.Duration, token LockToken) error

	// GetResult 获取已完成的结果
	// 如果结果不存在,返回 ErrResultNotFound
	GetResult(ctx context.Context, key string) ([]byte, error)
}

Store 幂等性存储接口

定义了幂等性组件与存储后端的交互方式。 存储后端需要支持三种状态:

  1. 锁定中(processing): Lock() 成功后的状态
  2. 已完成(completed): SetResult() 后的状态
  3. 不存在(absent): 初始状态或 TTL 过期后

默认提供 Redis / Memory 实现。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL