outbox

package
v1.1.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

README

Outbox Pattern 通用框架

📋 概述

Outbox 模式是一种用于确保分布式系统中数据一致性的设计模式。本框架提供了一个通用的、可复用的 Outbox 实现,可以被所有微服务使用。

🎯 核心特性

  • 数据库无关:领域模型完全独立于数据库实现
  • 依赖注入:通过 EventPublisher 接口实现依赖倒置
  • 零外部依赖:核心包不依赖任何外部库
  • 多租户支持:内置租户隔离
  • 自动重试:支持失败事件自动重试
  • 定时发布:支持延迟发布
  • 自动清理:自动清理已发布的事件
  • 健康检查:内置健康检查机制
  • 指标收集:支持指标收集和监控

🏗️ 架构设计

核心组件
┌─────────────────────────────────────────────────────────┐
│              jxt-core/sdk/pkg/outbox/                   │
├─────────────────────────────────────────────────────────┤
│  ✅ 1. OutboxEvent(领域模型)                           │
│  ✅ 2. OutboxRepository(仓储接口)                      │
│  ✅ 3. EventPublisher(事件发布接口)⭐                  │
│  ✅ 4. OutboxPublisher(发布器)                         │
│  ✅ 5. OutboxScheduler(调度器)                         │
│  ✅ 6. TopicMapper(Topic 映射)                         │
│                                                         │
│  ✅ 7. GORM 适配器(adapters/gorm/)                    │
│     - OutboxEventModel(数据库模型)                    │
│     - GormOutboxRepository(仓储实现)                  │
└─────────────────────────────────────────────────────────┘
依赖注入设计
业务微服务
    │
    ├─ 创建 EventBus(jxt-core)
    │
    ├─ 创建 EventBusAdapter(实现 EventPublisher 接口)⭐
    │
    └─ 注入到 OutboxScheduler
           │
           └─ OutboxPublisher(依赖 EventPublisher 接口)

🚀 快速开始

步骤 1:定义 TopicMapper
// internal/outbox/topics.go
package outbox

import (
    "github.com/ChenBigdata421/jxt-core/sdk/pkg/outbox"
)

func NewTopicMapper() outbox.TopicMapper {
    return outbox.NewMapBasedTopicMapper(map[string]string{
        "Archive": "archive-events",
        "Media":   "media-events",
    }, "default-events")
}
步骤 2:创建 EventBus 适配器 ⭐
// internal/outbox/eventbus_adapter.go
package outbox

import (
    "context"
    "github.com/ChenBigdata421/jxt-core/sdk/pkg/eventbus"
    "github.com/ChenBigdata421/jxt-core/sdk/pkg/outbox"
)

type EventBusAdapter struct {
    eventBus eventbus.EventBus
}

func NewEventBusAdapter(eventBus eventbus.EventBus) outbox.EventPublisher {
    return &EventBusAdapter{eventBus: eventBus}
}

func (a *EventBusAdapter) Publish(ctx context.Context, topic string, data []byte) error {
    return a.eventBus.Publish(ctx, topic, data)
}
步骤 3:初始化 Outbox
// cmd/main.go
package main

import (
    "context"
    "github.com/ChenBigdata421/jxt-core/sdk"
    "github.com/ChenBigdata421/jxt-core/sdk/pkg/outbox"
    gormadapter "github.com/ChenBigdata421/jxt-core/sdk/pkg/outbox/adapters/gorm"
    localoutbox "your-service/internal/outbox"
)

func main() {
    ctx := context.Background()
    
    // 1. 获取数据库
    db := sdk.Runtime.GetDB()
    
    // 2. 自动迁移(创建表)
    db.AutoMigrate(&gormadapter.OutboxEventModel{})
    
    // 3. 创建仓储
    outboxRepo := gormadapter.NewGormOutboxRepository(db)
    
    // 4. 创建 EventBus 适配器 ⭐
    eventBus := sdk.Runtime.GetEventBus()
    eventPublisher := localoutbox.NewEventBusAdapter(eventBus)
    
    // 5. 创建并启动调度器
    scheduler := outbox.NewScheduler(
        outbox.WithRepository(outboxRepo),
        outbox.WithEventPublisher(eventPublisher),  // 注入 EventPublisher ⭐
        outbox.WithTopicMapper(localoutbox.NewTopicMapper()),
    )
    
    // 6. 启动调度器
    if err := scheduler.Start(ctx); err != nil {
        panic(err)
    }
    defer scheduler.Stop(ctx)
    
    // 7. 启动应用...
}
步骤 4:业务代码使用
// internal/service/archive_service.go
package service

import (
    "context"
    "github.com/ChenBigdata421/jxt-core/sdk/pkg/outbox"
    gormadapter "github.com/ChenBigdata421/jxt-core/sdk/pkg/outbox/adapters/gorm"
)

type ArchiveService struct {
    db          *gorm.DB
    outboxRepo  outbox.OutboxRepository
}

func (s *ArchiveService) CreateArchive(ctx context.Context, archive *Archive) error {
    // 开始事务
    return s.db.Transaction(func(tx *gorm.DB) error {
        // 1. 保存业务数据
        if err := tx.Create(archive).Error; err != nil {
            return err
        }
        
        // 2. 创建 Outbox 事件
        event, err := outbox.NewOutboxEvent(
            archive.TenantID,
            archive.ID,
            "Archive",
            "ArchiveCreated",
            archive,
        )
        if err != nil {
            return err
        }
        
        // 3. 在同一事务中保存 Outbox 事件
        if err := s.outboxRepo.Save(ctx, event); err != nil {
            return err
        }
        
        return nil
    })
}

📚 详细文档

🎁 核心优势

1. 符合 SOLID 原则
  • 依赖倒置原则(DIP):Outbox 依赖 EventPublisher 接口,不依赖具体实现
  • 开闭原则(OCP):可以扩展新的 EventPublisher 实现,无需修改 Outbox 代码
  • 单一职责原则(SRP):每个组件职责明确
2. 零外部依赖
// jxt-core/sdk/pkg/outbox/ 的依赖
import (
    "context"
    "time"
    "encoding/json"
    // 没有任何外部包依赖!
)
3. 易于测试
// 可以轻松 mock EventPublisher
type MockEventPublisher struct {
    publishedEvents []PublishedEvent
}

func (m *MockEventPublisher) Publish(ctx context.Context, topic string, data []byte) error {
    m.publishedEvents = append(m.publishedEvents, PublishedEvent{topic, data})
    return nil
}
4. 灵活扩展
  • ✅ 支持任何 EventBus 实现(Kafka/NATS/Memory/自定义)
  • ✅ 支持任何数据库(GORM/MongoDB/自定义)
  • ✅ 支持自定义 TopicMapper

📊 组件职责

组件 位置 业务微服务需要做什么
OutboxEvent jxt-core ❌ 无需实现
OutboxRepository jxt-core ❌ 无需实现
EventPublisher 接口 jxt-core ❌ 无需实现(只是接口定义)⭐
OutboxPublisher jxt-core ❌ 无需实现
OutboxScheduler jxt-core ❌ 无需实现
GORM 适配器 jxt-core ❌ 无需实现(可选使用)
TopicMapper 业务微服务 需要实现(定义映射)
EventBusAdapter 业务微服务 需要实现(5-10 行代码)⭐
配置 业务微服务 可选(有默认值)
启动调度器 业务微服务 需要启动(一行代码)

🔧 配置选项

调度器配置
config := &outbox.SchedulerConfig{
    PollInterval:        10 * time.Second,  // 轮询间隔
    BatchSize:           100,                // 批量大小
    TenantID:            "",                 // 租户 ID(可选)
    CleanupInterval:     1 * time.Hour,      // 清理间隔
    CleanupRetention:    24 * time.Hour,     // 清理保留时间
    HealthCheckInterval: 30 * time.Second,   // 健康检查间隔
    EnableHealthCheck:   true,               // 启用健康检查
    EnableCleanup:       true,               // 启用自动清理
    EnableMetrics:       true,               // 启用指标收集
}

scheduler := outbox.NewScheduler(
    outbox.WithRepository(repo),
    outbox.WithEventPublisher(eventPublisher),
    outbox.WithTopicMapper(topicMapper),
    outbox.WithSchedulerConfig(config),
)
发布器配置
config := &outbox.PublisherConfig{
    MaxRetries:     3,                    // 最大重试次数
    RetryDelay:     time.Second,          // 重试延迟
    PublishTimeout: 30 * time.Second,     // 发布超时
    EnableMetrics:  true,                 // 启用指标收集
    ErrorHandler: func(event *outbox.OutboxEvent, err error) {
        // 自定义错误处理
    },
}

scheduler := outbox.NewScheduler(
    outbox.WithRepository(repo),
    outbox.WithEventPublisher(eventPublisher),
    outbox.WithTopicMapper(topicMapper),
    outbox.WithPublisherConfig(config),
)

📈 监控指标

// 获取调度器指标
metrics := scheduler.GetMetrics()
fmt.Printf("Poll Count: %d\n", metrics.PollCount)
fmt.Printf("Processed Count: %d\n", metrics.ProcessedCount)
fmt.Printf("Error Count: %d\n", metrics.ErrorCount)

// 获取发布器指标
publisherMetrics := publisher.GetMetrics()
fmt.Printf("Published Count: %d\n", publisherMetrics.PublishedCount)
fmt.Printf("Failed Count: %d\n", publisherMetrics.FailedCount)

📝 许可证

MIT License

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultTopicMapper = FuncTopicMapper(func(aggregateType string) string {
	return fmt.Sprintf("%s-events", aggregateType)
})

DefaultTopicMapper 默认 TopicMapper 使用简单的命名规则:{aggregateType}-events

Functions

This section is empty.

Types

type ChainTopicMapper

type ChainTopicMapper struct {
	// contains filtered or unexported fields
}

ChainTopicMapper 链式 TopicMapper 依次尝试多个 TopicMapper,直到找到非空 Topic

func (*ChainTopicMapper) GetTopic

func (c *ChainTopicMapper) GetTopic(aggregateType string) string

GetTopic 实现 TopicMapper 接口

type CompositeDLQHandler

type CompositeDLQHandler struct {
	// contains filtered or unexported fields
}

CompositeDLQHandler 组合 DLQ 处理器(示例实现) 允许同时使用多个 DLQ 处理器

func NewCompositeDLQHandler

func NewCompositeDLQHandler(handlers ...DLQHandler) *CompositeDLQHandler

NewCompositeDLQHandler 创建组合 DLQ 处理器

func (*CompositeDLQHandler) Handle

func (h *CompositeDLQHandler) Handle(ctx context.Context, event *OutboxEvent) error

Handle 处理死信事件(依次调用所有处理器)

type DLQAlertHandler

type DLQAlertHandler interface {
	// Alert 发送告警
	// ctx: 上下文
	// event: 失败的事件
	// 返回:发送失败时返回错误
	Alert(ctx context.Context, event *OutboxEvent) error
}

DLQAlertHandler 死信队列告警处理器接口 用于发送告警通知(邮件、短信、钉钉、企业微信等)

实现示例:

type EmailAlertHandler struct {
    emailService EmailService
}

func (h *EmailAlertHandler) Alert(ctx context.Context, event *OutboxEvent) error {
    subject := fmt.Sprintf("Outbox Event Failed: %s", event.EventType)
    body := fmt.Sprintf("Event ID: %s\nError: %s", event.ID, event.LastError)
    return h.emailService.Send(subject, body)
}

type DLQAlertHandlerFunc

type DLQAlertHandlerFunc func(ctx context.Context, event *OutboxEvent) error

DLQAlertHandlerFunc 函数式 DLQAlertHandler 允许使用函数作为 DLQAlertHandler 实现

func (DLQAlertHandlerFunc) Alert

func (f DLQAlertHandlerFunc) Alert(ctx context.Context, event *OutboxEvent) error

Alert 实现 DLQAlertHandler 接口

type DLQHandler

type DLQHandler interface {
	// Handle 处理死信事件
	// ctx: 上下文
	// event: 失败的事件
	// 返回:处理失败时返回错误
	Handle(ctx context.Context, event *OutboxEvent) error
}

DLQHandler 死信队列处理器接口 用于处理超过最大重试次数的失败事件

实现示例:

type MyDLQHandler struct {
    logger *zap.Logger
}

func (h *MyDLQHandler) Handle(ctx context.Context, event *OutboxEvent) error {
    // 1. 记录到专门的死信队列表
    // 2. 发送到死信队列 Topic
    // 3. 记录详细日志
    h.logger.Error("Event moved to DLQ",
        zap.String("event_id", event.ID),
        zap.String("error", event.LastError))
    return nil
}

type DLQHandlerFunc

type DLQHandlerFunc func(ctx context.Context, event *OutboxEvent) error

DLQHandlerFunc 函数式 DLQHandler 允许使用函数作为 DLQHandler 实现

func (DLQHandlerFunc) Handle

func (f DLQHandlerFunc) Handle(ctx context.Context, event *OutboxEvent) error

Handle 实现 DLQHandler 接口

type EmailAlertHandler

type EmailAlertHandler struct {
	// contains filtered or unexported fields
}

EmailAlertHandler 邮件告警处理器(示例实现) 发送邮件告警

func NewEmailAlertHandler

func NewEmailAlertHandler(smtpHost string, smtpPort int, from string, to []string) *EmailAlertHandler

NewEmailAlertHandler 创建邮件告警处理器

func (*EmailAlertHandler) Alert

func (h *EmailAlertHandler) Alert(ctx context.Context, event *OutboxEvent) error

Alert 发送告警

type Envelope

type Envelope struct {
	// EventID 事件唯一ID(必填,用于 Outbox 模式)
	EventID string `json:"event_id"`
	// AggregateID 聚合根ID
	AggregateID string `json:"aggregate_id"`
	// EventType 事件类型
	EventType string `json:"event_type"`
	// EventVersion 事件版本
	EventVersion int64 `json:"event_version"`
	// Payload 事件负载
	Payload []byte `json:"payload"`
	// Timestamp 事件时间戳
	Timestamp time.Time `json:"timestamp"`
	// TraceID 链路追踪ID(可选)
	TraceID string `json:"trace_id,omitempty"`
	// CorrelationID 关联ID(可选)
	CorrelationID string `json:"correlation_id,omitempty"`
}

Envelope Envelope 消息结构(与 eventbus.Envelope 兼容) 为了避免 Outbox 包依赖 EventBus 包,这里定义一个简化的 Envelope 结构

type EventPublisher

type EventPublisher interface {
	// PublishEnvelope 发布 Envelope 消息(推荐)
	// ✅ 支持 Outbox 模式:通过 GetPublishResultChannel() 获取 ACK 结果
	// ✅ 可靠投递:不容许丢失的领域事件必须使用此方法
	//
	// 参数:
	//   ctx: 上下文(可能包含租户信息、追踪信息等)
	//   topic: 目标 topic(由 TopicMapper 提供)
	//   envelope: Envelope 消息(包含 EventID、AggregateID、EventType 等元数据)
	//
	// 返回:
	//   error: 提交失败时返回错误(注意:立即返回,不等待 ACK)
	//
	// 注意:
	//   - 此方法是异步的,立即返回,不等待 ACK
	//   - ACK 结果通过 GetPublishResultChannel() 异步通知
	//   - 实现者应该记录日志
	//   - 实现者应该处理超时
	PublishEnvelope(ctx context.Context, topic string, envelope *Envelope) error

	// GetPublishResultChannel 获取异步发布结果通道
	// ⚠️ 仅 PublishEnvelope() 发送 ACK 结果到此通道
	// 用于 Outbox Processor 监听发布结果并更新 Outbox 状态
	//
	// 返回:
	//   <-chan *PublishResult: 只读的发布结果通道
	GetPublishResultChannel() <-chan *PublishResult
}

EventPublisher 事件发布器接口 Outbox 组件通过这个接口发布事件,不依赖具体的 EventBus 实现 这是依赖注入的核心,符合依赖倒置原则(DIP)

设计原则: 1. 最小化接口:只包含 Outbox 需要的方法 2. 零外部依赖:不依赖任何其他包 3. 清晰语义:方法签名简单明了

使用方式: 业务微服务需要创建一个适配器,将具体的 EventBus 实现适配为 EventPublisher 接口

示例(推荐使用 PublishEnvelope):

type EventBusAdapter struct {
    eventBus eventbus.EventBus
}

func (a *EventBusAdapter) PublishEnvelope(ctx context.Context, topic string, envelope *Envelope) error {
    return a.eventBus.PublishEnvelope(ctx, topic, envelope)
}

func (a *EventBusAdapter) GetPublishResultChannel() <-chan *PublishResult {
    return a.eventBus.GetPublishResultChannel()
}

func NewNoOpEventPublisher

func NewNoOpEventPublisher() EventPublisher

NewNoOpEventPublisher 创建空操作 EventPublisher

type EventStatus

type EventStatus string

EventStatus 事件状态

const (
	// EventStatusPending 待发布
	EventStatusPending EventStatus = "pending"
	// EventStatusPublished 已发布
	EventStatusPublished EventStatus = "published"
	// EventStatusFailed 发布失败
	EventStatusFailed EventStatus = "failed"
	// EventStatusMaxRetry 超过最大重试次数
	EventStatusMaxRetry EventStatus = "max_retry"
)

type FuncTopicMapper

type FuncTopicMapper func(aggregateType string) string

FuncTopicMapper 函数式 TopicMapper 允许使用函数作为 TopicMapper 实现

func (FuncTopicMapper) GetTopic

func (f FuncTopicMapper) GetTopic(aggregateType string) string

GetTopic 实现 TopicMapper 接口

type InMemoryMetricsCollector

type InMemoryMetricsCollector struct {

	// 计数器
	PublishedCount int64
	FailedCount    int64
	RetryCount     int64
	DLQCount       int64

	// 按租户统计
	PendingByTenant map[string]int64
	FailedByTenant  map[string]int64
	DLQByTenant     map[string]int64

	// 按事件类型统计
	PublishedByType map[string]int64
	FailedByType    map[string]int64

	// 耗时统计
	TotalDuration time.Duration
	AvgDuration   time.Duration
	MaxDuration   time.Duration
	MinDuration   time.Duration
	DurationCount int64
	// contains filtered or unexported fields
}

InMemoryMetricsCollector 内存指标收集器(用于测试和简单场景)

func NewInMemoryMetricsCollector

func NewInMemoryMetricsCollector() *InMemoryMetricsCollector

NewInMemoryMetricsCollector 创建内存指标收集器

func (*InMemoryMetricsCollector) GetSnapshot

func (c *InMemoryMetricsCollector) GetSnapshot() map[string]interface{}

GetSnapshot 获取指标快照(用于测试和调试)

func (*InMemoryMetricsCollector) RecordDLQ

func (c *InMemoryMetricsCollector) RecordDLQ(tenantID, aggregateType, eventType string)

RecordDLQ 记录事件进入死信队列

func (*InMemoryMetricsCollector) RecordFailed

func (c *InMemoryMetricsCollector) RecordFailed(tenantID, aggregateType, eventType string, err error)

RecordFailed 记录事件发布失败

func (*InMemoryMetricsCollector) RecordPublishDuration

func (c *InMemoryMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)

RecordPublishDuration 记录发布耗时

func (*InMemoryMetricsCollector) RecordPublished

func (c *InMemoryMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string)

RecordPublished 记录事件发布成功

func (*InMemoryMetricsCollector) RecordRetry

func (c *InMemoryMetricsCollector) RecordRetry(tenantID, aggregateType, eventType string)

RecordRetry 记录事件重试

func (*InMemoryMetricsCollector) Reset

func (c *InMemoryMetricsCollector) Reset()

Reset 重置所有指标(用于测试)

func (*InMemoryMetricsCollector) SetDLQCount

func (c *InMemoryMetricsCollector) SetDLQCount(tenantID string, count int64)

SetDLQCount 设置死信队列事件数量

func (*InMemoryMetricsCollector) SetFailedCount

func (c *InMemoryMetricsCollector) SetFailedCount(tenantID string, count int64)

SetFailedCount 设置失败事件数量

func (*InMemoryMetricsCollector) SetPendingCount

func (c *InMemoryMetricsCollector) SetPendingCount(tenantID string, count int64)

SetPendingCount 设置待发布事件数量

type LoggingDLQHandler

type LoggingDLQHandler struct {
	// contains filtered or unexported fields
}

LoggingDLQHandler 日志记录 DLQ 处理器(示例实现) 将死信事件记录到日志中

func NewLoggingDLQHandler

func NewLoggingDLQHandler() *LoggingDLQHandler

NewLoggingDLQHandler 创建日志记录 DLQ 处理器

func (*LoggingDLQHandler) Handle

func (h *LoggingDLQHandler) Handle(ctx context.Context, event *OutboxEvent) error

Handle 处理死信事件

type MapBasedTopicMapper

type MapBasedTopicMapper struct {
	// contains filtered or unexported fields
}

MapBasedTopicMapper 基于 Map 的 TopicMapper 实现 提供简单的映射表实现

func (*MapBasedTopicMapper) GetTopic

func (m *MapBasedTopicMapper) GetTopic(aggregateType string) string

GetTopic 实现 TopicMapper 接口

type MetricsCollector

type MetricsCollector interface {
	// RecordPublished 记录事件发布成功
	RecordPublished(tenantID, aggregateType, eventType string)

	// RecordFailed 记录事件发布失败
	RecordFailed(tenantID, aggregateType, eventType string, err error)

	// RecordRetry 记录事件重试
	RecordRetry(tenantID, aggregateType, eventType string)

	// RecordDLQ 记录事件进入死信队列
	RecordDLQ(tenantID, aggregateType, eventType string)

	// RecordPublishDuration 记录发布耗时
	RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)

	// SetPendingCount 设置待发布事件数量
	SetPendingCount(tenantID string, count int64)

	// SetFailedCount 设置失败事件数量
	SetFailedCount(tenantID string, count int64)

	// SetDLQCount 设置死信队列事件数量
	SetDLQCount(tenantID string, count int64)
}

MetricsCollector Outbox 指标收集器接口 用于集成 Prometheus、StatsD 等监控系统

实现示例(Prometheus):

type PrometheusMetricsCollector struct {
    publishedTotal   prometheus.Counter
    failedTotal      prometheus.Counter
    retryTotal       prometheus.Counter
    publishDuration  prometheus.Histogram
    pendingGauge     prometheus.Gauge
}

func (c *PrometheusMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string) {
    c.publishedTotal.With(prometheus.Labels{
        "tenant_id":      tenantID,
        "aggregate_type": aggregateType,
        "event_type":     eventType,
    }).Inc()
}

type NoOpDLQAlertHandler

type NoOpDLQAlertHandler struct{}

NoOpDLQAlertHandler 空操作 DLQAlertHandler(默认实现)

func (*NoOpDLQAlertHandler) Alert

func (n *NoOpDLQAlertHandler) Alert(ctx context.Context, event *OutboxEvent) error

Alert 实现 DLQAlertHandler 接口(什么都不做)

type NoOpDLQHandler

type NoOpDLQHandler struct{}

NoOpDLQHandler 空操作 DLQHandler(默认实现)

func (*NoOpDLQHandler) Handle

func (n *NoOpDLQHandler) Handle(ctx context.Context, event *OutboxEvent) error

Handle 实现 DLQHandler 接口(什么都不做)

type NoOpEventPublisher

type NoOpEventPublisher struct {
	// contains filtered or unexported fields
}

NoOpEventPublisher 空操作 EventPublisher(用于测试)

func (*NoOpEventPublisher) GetPublishResultChannel

func (n *NoOpEventPublisher) GetPublishResultChannel() <-chan *PublishResult

GetPublishResultChannel 实现 EventPublisher 接口

func (*NoOpEventPublisher) PublishEnvelope

func (n *NoOpEventPublisher) PublishEnvelope(ctx context.Context, topic string, envelope *Envelope) error

PublishEnvelope 实现 EventPublisher 接口(什么都不做,立即返回成功)

type NoOpMetricsCollector

type NoOpMetricsCollector struct{}

NoOpMetricsCollector 空操作指标收集器(默认实现)

func (*NoOpMetricsCollector) RecordDLQ

func (n *NoOpMetricsCollector) RecordDLQ(tenantID, aggregateType, eventType string)

RecordDLQ 实现 MetricsCollector 接口

func (*NoOpMetricsCollector) RecordFailed

func (n *NoOpMetricsCollector) RecordFailed(tenantID, aggregateType, eventType string, err error)

RecordFailed 实现 MetricsCollector 接口

func (*NoOpMetricsCollector) RecordPublishDuration

func (n *NoOpMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)

RecordPublishDuration 实现 MetricsCollector 接口

func (*NoOpMetricsCollector) RecordPublished

func (n *NoOpMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string)

RecordPublished 实现 MetricsCollector 接口

func (*NoOpMetricsCollector) RecordRetry

func (n *NoOpMetricsCollector) RecordRetry(tenantID, aggregateType, eventType string)

RecordRetry 实现 MetricsCollector 接口

func (*NoOpMetricsCollector) SetDLQCount

func (n *NoOpMetricsCollector) SetDLQCount(tenantID string, count int64)

SetDLQCount 实现 MetricsCollector 接口

func (*NoOpMetricsCollector) SetFailedCount

func (n *NoOpMetricsCollector) SetFailedCount(tenantID string, count int64)

SetFailedCount 实现 MetricsCollector 接口

func (*NoOpMetricsCollector) SetPendingCount

func (n *NoOpMetricsCollector) SetPendingCount(tenantID string, count int64)

SetPendingCount 实现 MetricsCollector 接口

type OutboxEvent

type OutboxEvent struct {
	// ID 事件唯一标识(UUID)
	ID string

	// TenantID 租户 ID(多租户支持)
	TenantID string

	// AggregateID 聚合根 ID
	AggregateID string

	// AggregateType 聚合根类型(用于 Topic 映射)
	// 例如:"Archive", "Media", "User"
	AggregateType string

	// EventType 事件类型
	// 例如:"ArchiveCreated", "MediaUploaded", "UserRegistered"
	EventType string

	// Payload 事件负载(JSON 格式)
	Payload json.RawMessage

	// Status 事件状态
	Status EventStatus

	// RetryCount 重试次数
	RetryCount int

	// MaxRetries 最大重试次数
	MaxRetries int

	// LastError 最后一次错误信息
	LastError string

	// CreatedAt 创建时间
	CreatedAt time.Time

	// UpdatedAt 更新时间
	UpdatedAt time.Time

	// PublishedAt 发布时间
	PublishedAt *time.Time

	// ScheduledAt 计划发布时间(延迟发布)
	ScheduledAt *time.Time

	// LastRetryAt 最后重试时间
	LastRetryAt *time.Time

	// Version 事件版本(用于事件演化)
	Version int64

	// TraceID 链路追踪ID(用于分布式追踪)
	// 用于追踪事件在多个微服务之间的流转
	TraceID string

	// CorrelationID 关联ID(用于关联相关事件)
	// 用于关联同一业务流程中的多个事件(例如:Saga 模式)
	CorrelationID string

	// IdempotencyKey 幂等性键(用于防止重复发布)
	// 格式建议:{TenantID}:{AggregateType}:{AggregateID}:{EventType}:{Timestamp}
	// 或者使用业务自定义的唯一标识
	// 用于在发布前检查是否已经发布过相同的事件
	IdempotencyKey string
}

OutboxEvent Outbox 事件领域模型 完全数据库无关,不包含任何数据库标签

func NewOutboxEvent

func NewOutboxEvent(
	tenantID string,
	aggregateID string,
	aggregateType string,
	eventType string,
	payload interface{},
) (*OutboxEvent, error)

NewOutboxEvent 创建新的 Outbox 事件

func (*OutboxEvent) CanRetry

func (e *OutboxEvent) CanRetry() bool

CanRetry 判断是否可以重试

func (*OutboxEvent) Clone

func (e *OutboxEvent) Clone() *OutboxEvent

Clone 克隆事件(用于测试)

func (*OutboxEvent) GetIdempotencyKey

func (e *OutboxEvent) GetIdempotencyKey() string

GetIdempotencyKey 获取幂等性键

func (*OutboxEvent) GetPayloadAs

func (e *OutboxEvent) GetPayloadAs(v interface{}) error

GetPayloadAs 将 Payload 反序列化为指定类型

func (*OutboxEvent) IncrementRetry

func (e *OutboxEvent) IncrementRetry(errorMessage string)

IncrementRetry 增加重试次数

func (*OutboxEvent) IsExpired

func (e *OutboxEvent) IsExpired(maxAge time.Duration) bool

IsExpired 判断事件是否过期(超过指定时间未发布)

func (*OutboxEvent) IsFailed

func (e *OutboxEvent) IsFailed() bool

IsFailed 判断是否失败

func (*OutboxEvent) IsMaxRetry

func (e *OutboxEvent) IsMaxRetry() bool

IsMaxRetry 判断是否超过最大重试次数

func (*OutboxEvent) IsPending

func (e *OutboxEvent) IsPending() bool

IsPending 判断是否为待发布状态

func (*OutboxEvent) IsPublished

func (e *OutboxEvent) IsPublished() bool

IsPublished 判断是否已发布

func (*OutboxEvent) MarkAsFailed

func (e *OutboxEvent) MarkAsFailed(err error)

MarkAsFailed 标记为失败

func (*OutboxEvent) MarkAsMaxRetry

func (e *OutboxEvent) MarkAsMaxRetry(errorMessage string)

MarkAsMaxRetry 标记为超过最大重试次数

func (*OutboxEvent) MarkAsPublished

func (e *OutboxEvent) MarkAsPublished()

MarkAsPublished 标记为已发布

func (*OutboxEvent) ResetForRetry

func (e *OutboxEvent) ResetForRetry()

ResetForRetry 重置为待发布状态(用于重试)

func (*OutboxEvent) SetPayload

func (e *OutboxEvent) SetPayload(payload interface{}) error

SetPayload 设置 Payload

func (*OutboxEvent) ShouldPublishNow

func (e *OutboxEvent) ShouldPublishNow() bool

ShouldPublishNow 判断是否应该立即发布

func (*OutboxEvent) ToEnvelope

func (e *OutboxEvent) ToEnvelope() interface{}

ToEnvelope 转换为 EventBus Envelope 用于将 OutboxEvent 转换为 EventBus 的统一消息包络格式

func (*OutboxEvent) WithCorrelationID

func (e *OutboxEvent) WithCorrelationID(correlationID string) *OutboxEvent

WithCorrelationID 设置关联ID(支持链式调用)

func (*OutboxEvent) WithIdempotencyKey

func (e *OutboxEvent) WithIdempotencyKey(idempotencyKey string) *OutboxEvent

WithIdempotencyKey 设置自定义幂等性键(支持链式调用) 用于业务代码自定义幂等性键的生成逻辑

参数:

  • idempotencyKey: 自定义的幂等性键

返回:

  • *OutboxEvent: 事件对象(支持链式调用)

示例:

event.WithIdempotencyKey("custom-key-123")

func (*OutboxEvent) WithTraceID

func (e *OutboxEvent) WithTraceID(traceID string) *OutboxEvent

WithTraceID 设置链路追踪ID(支持链式调用)

type OutboxPublisher

type OutboxPublisher struct {
	// contains filtered or unexported fields
}

OutboxPublisher Outbox 事件发布器 负责将 Outbox 事件发布到 EventBus

func NewOutboxPublisher

func NewOutboxPublisher(
	repo OutboxRepository,
	eventPublisher EventPublisher,
	topicMapper TopicMapper,
	config *PublisherConfig,
) *OutboxPublisher

NewOutboxPublisher 创建 Outbox 发布器

参数:

repo: 仓储
eventPublisher: 事件发布器(接口)
topicMapper: Topic 映射器
config: 配置(可选,nil 表示使用默认配置)

返回:

*OutboxPublisher: 发布器实例

Panics:

如果配置验证失败,会 panic

func (*OutboxPublisher) GetMetrics

func (p *OutboxPublisher) GetMetrics() *PublisherMetrics

GetMetrics 获取发布器指标

func (*OutboxPublisher) PublishBatch

func (p *OutboxPublisher) PublishBatch(ctx context.Context, events []*OutboxEvent) (int, error)

PublishBatch 批量发布事件(优化版本)

参数:

ctx: 上下文
events: 要发布的事件列表

返回:

int: 成功发布的事件数量
error: 发布失败时返回错误

性能优化:

  1. 批量幂等性检查(一次查询)
  2. 批量发布到 EventBus
  3. 批量更新数据库状态(一次更新)

func (*OutboxPublisher) PublishEvent

func (p *OutboxPublisher) PublishEvent(ctx context.Context, event *OutboxEvent) error

PublishEvent 发布单个事件

参数:

ctx: 上下文
event: 要发布的事件

返回:

error: 发布失败时返回错误

func (*OutboxPublisher) PublishPendingEvents

func (p *OutboxPublisher) PublishPendingEvents(ctx context.Context, limit int, tenantID string) (int, error)

PublishPendingEvents 发布所有待发布的事件

参数:

ctx: 上下文
limit: 最大发布数量
tenantID: 租户 ID(可选)

返回:

int: 成功发布的事件数量
error: 发布失败时返回错误

func (*OutboxPublisher) ResetMetrics

func (p *OutboxPublisher) ResetMetrics()

ResetMetrics 重置指标

func (*OutboxPublisher) RetryFailedEvent

func (p *OutboxPublisher) RetryFailedEvent(ctx context.Context, event *OutboxEvent) error

RetryFailedEvent 重试失败的事件

参数:

ctx: 上下文
event: 要重试的事件

返回:

error: 重试失败时返回错误

func (*OutboxPublisher) StartACKListener

func (p *OutboxPublisher) StartACKListener(ctx context.Context)

StartACKListener 启动 ACK 监听器 监听 EventBus 的异步发布结果,并更新 Outbox 事件状态

参数:

ctx: 上下文(用于控制监听器生命周期)

注意:

  • 此方法应该在应用启动时调用一次
  • 监听器会在后台运行,直到 ctx 被取消或调用 StopACKListener()
  • 重复调用此方法是安全的(会忽略后续调用)

func (*OutboxPublisher) StopACKListener

func (p *OutboxPublisher) StopACKListener()

StopACKListener 停止 ACK 监听器

type OutboxRepository

type OutboxRepository interface {
	// Save 保存事件(在事务中)
	// ctx: 上下文(可能包含事务信息)
	// event: 要保存的事件
	Save(ctx context.Context, event *OutboxEvent) error

	// SaveBatch 批量保存事件(在事务中)
	// ctx: 上下文(可能包含事务信息)
	// events: 要保存的事件列表
	SaveBatch(ctx context.Context, events []*OutboxEvent) error

	// FindPendingEvents 查找待发布的事件
	// ctx: 上下文
	// limit: 最大返回数量
	// tenantID: 租户 ID(可选,空字符串表示查询所有租户)
	FindPendingEvents(ctx context.Context, limit int, tenantID string) ([]*OutboxEvent, error)

	// FindPendingEventsWithDelay 查找创建时间超过指定延迟的待发布事件
	// 用于调度器避让机制,防止与立即发布产生竞态
	// ctx: 上下文
	// tenantID: 租户 ID(可选)
	// delaySeconds: 延迟秒数(只查询创建时间超过此延迟的事件)
	// limit: 最大返回数量
	FindPendingEventsWithDelay(ctx context.Context, tenantID string, delaySeconds int, limit int) ([]*OutboxEvent, error)

	// FindEventsForRetry 查找需要重试的失败事件
	// ctx: 上下文
	// maxRetries: 最大重试次数(只查询重试次数小于此值的事件)
	// limit: 最大返回数量
	FindEventsForRetry(ctx context.Context, maxRetries int, limit int) ([]*OutboxEvent, error)

	// FindByAggregateType 根据聚合类型查找待发布事件
	// ctx: 上下文
	// aggregateType: 聚合类型
	// limit: 最大返回数量
	FindByAggregateType(ctx context.Context, aggregateType string, limit int) ([]*OutboxEvent, error)

	// FindByID 根据 ID 查找事件
	// ctx: 上下文
	// id: 事件 ID
	FindByID(ctx context.Context, id string) (*OutboxEvent, error)

	// FindByAggregateID 根据聚合根 ID 查找事件
	// ctx: 上下文
	// aggregateID: 聚合根 ID
	// tenantID: 租户 ID(可选)
	FindByAggregateID(ctx context.Context, aggregateID string, tenantID string) ([]*OutboxEvent, error)

	// Update 更新事件
	// ctx: 上下文
	// event: 要更新的事件
	Update(ctx context.Context, event *OutboxEvent) error

	// MarkAsPublished 标记事件为已发布
	// ctx: 上下文
	// id: 事件 ID
	MarkAsPublished(ctx context.Context, id string) error

	// MarkAsFailed 标记事件为失败
	// ctx: 上下文
	// id: 事件 ID
	// err: 错误信息
	MarkAsFailed(ctx context.Context, id string, err error) error

	// IncrementRetry 增加重试次数(同时更新错误信息和最后重试时间)
	// ctx: 上下文
	// id: 事件 ID
	// errorMsg: 错误信息
	IncrementRetry(ctx context.Context, id string, errorMsg string) error

	// MarkAsMaxRetry 标记事件为超过最大重试次数
	// ctx: 上下文
	// id: 事件 ID
	// errorMsg: 错误信息
	MarkAsMaxRetry(ctx context.Context, id string, errorMsg string) error

	// IncrementRetryCount 增加重试次数(已废弃,使用 IncrementRetry 代替)
	// ctx: 上下文
	// id: 事件 ID
	// Deprecated: 使用 IncrementRetry 代替
	IncrementRetryCount(ctx context.Context, id string) error

	// Delete 删除事件
	// ctx: 上下文
	// id: 事件 ID
	Delete(ctx context.Context, id string) error

	// DeleteBatch 批量删除事件
	// ctx: 上下文
	// ids: 事件 ID 列表
	DeleteBatch(ctx context.Context, ids []string) error

	// DeletePublishedBefore 删除指定时间之前已发布的事件
	// ctx: 上下文
	// before: 时间阈值
	// tenantID: 租户 ID(可选)
	DeletePublishedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)

	// DeleteFailedBefore 删除指定时间之前失败的事件
	// ctx: 上下文
	// before: 时间阈值
	// tenantID: 租户 ID(可选)
	DeleteFailedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)

	// Count 统计事件数量
	// ctx: 上下文
	// status: 事件状态(可选,空字符串表示所有状态)
	// tenantID: 租户 ID(可选)
	Count(ctx context.Context, status EventStatus, tenantID string) (int64, error)

	// CountByStatus 按状态统计事件数量
	// ctx: 上下文
	// tenantID: 租户 ID(可选)
	// 返回:map[EventStatus]int64
	CountByStatus(ctx context.Context, tenantID string) (map[EventStatus]int64, error)

	// FindByIdempotencyKey 根据幂等性键查找事件
	// ctx: 上下文
	// idempotencyKey: 幂等性键
	// 返回:事件对象(如果不存在返回 nil)
	FindByIdempotencyKey(ctx context.Context, idempotencyKey string) (*OutboxEvent, error)

	// ExistsByIdempotencyKey 检查幂等性键是否已存在
	// ctx: 上下文
	// idempotencyKey: 幂等性键
	// 返回:true 表示已存在,false 表示不存在
	ExistsByIdempotencyKey(ctx context.Context, idempotencyKey string) (bool, error)

	// FindMaxRetryEvents 查找超过最大重试次数的事件
	// ctx: 上下文
	// limit: 最大返回数量
	// tenantID: 租户 ID(可选)
	// 返回:事件列表
	FindMaxRetryEvents(ctx context.Context, limit int, tenantID string) ([]*OutboxEvent, error)

	// BatchUpdate 批量更新事件(可选接口,用于性能优化)
	// ctx: 上下文
	// events: 要更新的事件列表
	// 返回:更新失败时返回错误
	BatchUpdate(ctx context.Context, events []*OutboxEvent) error
}

OutboxRepository Outbox 仓储接口 定义数据访问契约,不依赖具体数据库实现

type OutboxScheduler

type OutboxScheduler struct {
	// contains filtered or unexported fields
}

OutboxScheduler Outbox 调度器 负责定时轮询待发布的事件并触发发布

func NewScheduler

func NewScheduler(options ...SchedulerOption) *OutboxScheduler

NewScheduler 创建调度器

参数:

options: 函数式选项

示例:

scheduler := outbox.NewScheduler(
    outbox.WithRepository(repo),
    outbox.WithEventPublisher(eventPublisher),
    outbox.WithTopicMapper(topicMapper),
    outbox.WithSchedulerConfig(config),
)

func (*OutboxScheduler) GetMetrics

func (s *OutboxScheduler) GetMetrics() *SchedulerMetricsSnapshot

GetMetrics 获取调度器指标快照

func (*OutboxScheduler) IsRunning

func (s *OutboxScheduler) IsRunning() bool

IsRunning 判断是否正在运行(免锁版本)

func (*OutboxScheduler) Start

func (s *OutboxScheduler) Start(ctx context.Context) error

Start 启动调度器

func (*OutboxScheduler) Stop

func (s *OutboxScheduler) Stop(ctx context.Context) error

Stop 停止调度器(优雅关闭)

优雅关闭流程: 1. 设置 running = false,阻止新任务启动 2. 发送停止信号到所有循环 3. 等待所有正在进行的任务完成(使用 WaitGroup) 4. 如果超时,强制退出

参数:

ctx: 上下文(用于外部取消)

返回:

error: 停止失败时返回错误

type PrefixTopicMapper

type PrefixTopicMapper struct {
	// contains filtered or unexported fields
}

PrefixTopicMapper 基于前缀的 TopicMapper 自动为聚合类型添加前缀和后缀

func (*PrefixTopicMapper) GetTopic

func (p *PrefixTopicMapper) GetTopic(aggregateType string) string

GetTopic 实现 TopicMapper 接口

type PrometheusMetricsCollector

type PrometheusMetricsCollector struct {
	// contains filtered or unexported fields
}

PrometheusMetricsCollector Prometheus 指标收集器(示例实现)

使用方法:

  1. 添加依赖: go get github.com/prometheus/client_golang/prometheus go get github.com/prometheus/client_golang/prometheus/promauto

  2. 创建收集器: collector := outbox.NewPrometheusMetricsCollector("myapp")

  3. 注册到 Prometheus: prometheus.MustRegister(collector.publishedTotal) prometheus.MustRegister(collector.failedTotal) // ... 注册其他指标

  4. 配置发布器和调度器: publisher := outbox.NewOutboxPublisher(repo, eventPublisher, topicMapper, &outbox.PublisherConfig{ MetricsCollector: collector, })

  5. 暴露 metrics 端点: http.Handle("/metrics", promhttp.Handler()) http.ListenAndServe(":9090", nil)

注意:这是一个示例实现,实际使用时需要取消注释并添加 prometheus 依赖

func NewPrometheusMetricsCollector

func NewPrometheusMetricsCollector(namespace string) *PrometheusMetricsCollector

NewPrometheusMetricsCollector 创建 Prometheus 指标收集器

实际实现示例(需要取消注释):

func NewPrometheusMetricsCollector(namespace string) *PrometheusMetricsCollector {
    return &PrometheusMetricsCollector{
        namespace: namespace,
        publishedTotal: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Namespace: namespace,
                Subsystem: "outbox",
                Name:      "published_total",
                Help:      "Total number of published events",
            },
            []string{"tenant_id", "aggregate_type", "event_type"},
        ),
        failedTotal: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Namespace: namespace,
                Subsystem: "outbox",
                Name:      "failed_total",
                Help:      "Total number of failed events",
            },
            []string{"tenant_id", "aggregate_type", "event_type"},
        ),
        retryTotal: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Namespace: namespace,
                Subsystem: "outbox",
                Name:      "retry_total",
                Help:      "Total number of retried events",
            },
            []string{"tenant_id", "aggregate_type", "event_type"},
        ),
        dlqTotal: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Namespace: namespace,
                Subsystem: "outbox",
                Name:      "dlq_total",
                Help:      "Total number of events moved to DLQ",
            },
            []string{"tenant_id", "aggregate_type", "event_type"},
        ),
        publishDuration: promauto.NewHistogramVec(
            prometheus.HistogramOpts{
                Namespace: namespace,
                Subsystem: "outbox",
                Name:      "publish_duration_seconds",
                Help:      "Event publish duration in seconds",
                Buckets:   prometheus.DefBuckets,
            },
            []string{"tenant_id", "aggregate_type", "event_type"},
        ),
        pendingGauge: promauto.NewGaugeVec(
            prometheus.GaugeOpts{
                Namespace: namespace,
                Subsystem: "outbox",
                Name:      "pending_events",
                Help:      "Number of pending events",
            },
            []string{"tenant_id"},
        ),
        failedGauge: promauto.NewGaugeVec(
            prometheus.GaugeOpts{
                Namespace: namespace,
                Subsystem: "outbox",
                Name:      "failed_events",
                Help:      "Number of failed events",
            },
            []string{"tenant_id"},
        ),
        dlqGauge: promauto.NewGaugeVec(
            prometheus.GaugeOpts{
                Namespace: namespace,
                Subsystem: "outbox",
                Name:      "dlq_events",
                Help:      "Number of events in DLQ",
            },
            []string{"tenant_id"},
        ),
    }
}

func (*PrometheusMetricsCollector) RecordDLQ

func (c *PrometheusMetricsCollector) RecordDLQ(tenantID, aggregateType, eventType string)

RecordDLQ 记录事件进入死信队列

func (*PrometheusMetricsCollector) RecordFailed

func (c *PrometheusMetricsCollector) RecordFailed(tenantID, aggregateType, eventType string, err error)

RecordFailed 记录事件发布失败

func (*PrometheusMetricsCollector) RecordPublishDuration

func (c *PrometheusMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)

RecordPublishDuration 记录发布耗时

实际实现示例(需要取消注释):

func (c *PrometheusMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration) {
    c.publishDuration.(*prometheus.HistogramVec).With(prometheus.Labels{
        "tenant_id":      tenantID,
        "aggregate_type": aggregateType,
        "event_type":     eventType,
    }).Observe(duration.Seconds())
}

func (*PrometheusMetricsCollector) RecordPublished

func (c *PrometheusMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string)

RecordPublished 记录事件发布成功

实际实现示例(需要取消注释):

func (c *PrometheusMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string) {
    c.publishedTotal.(*prometheus.CounterVec).With(prometheus.Labels{
        "tenant_id":      tenantID,
        "aggregate_type": aggregateType,
        "event_type":     eventType,
    }).Inc()
}

func (*PrometheusMetricsCollector) RecordRetry

func (c *PrometheusMetricsCollector) RecordRetry(tenantID, aggregateType, eventType string)

RecordRetry 记录事件重试

func (*PrometheusMetricsCollector) SetDLQCount

func (c *PrometheusMetricsCollector) SetDLQCount(tenantID string, count int64)

SetDLQCount 设置死信队列事件数量

func (*PrometheusMetricsCollector) SetFailedCount

func (c *PrometheusMetricsCollector) SetFailedCount(tenantID string, count int64)

SetFailedCount 设置失败事件数量

func (*PrometheusMetricsCollector) SetPendingCount

func (c *PrometheusMetricsCollector) SetPendingCount(tenantID string, count int64)

SetPendingCount 设置待发布事件数量

type PublishResult

type PublishResult struct {
	// EventID 事件ID(来自 Envelope.EventID)
	EventID string
	// Topic 主题
	Topic string
	// Success 是否成功
	Success bool
	// Error 错误信息(失败时)
	Error error
	// Timestamp 发布时间戳
	Timestamp time.Time
	// AggregateID 聚合ID(可选,来自 Envelope)
	AggregateID string
	// EventType 事件类型(可选,来自 Envelope)
	EventType string
}

PublishResult 异步发布结果

type PublisherConfig

type PublisherConfig struct {
	// MaxRetries 最大重试次数
	MaxRetries int

	// RetryDelay 重试延迟
	RetryDelay time.Duration

	// PublishTimeout 发布超时时间
	PublishTimeout time.Duration

	// EnableMetrics 是否启用指标收集
	EnableMetrics bool

	// MetricsCollector 指标收集器(可选)
	// 用于集成 Prometheus、StatsD 等监控系统
	MetricsCollector MetricsCollector

	// ErrorHandler 错误处理器(可选)
	ErrorHandler func(event *OutboxEvent, err error)

	// ConcurrentPublish 是否启用并发发布(默认 false)
	ConcurrentPublish bool

	// PublishConcurrency 并发发布的并发数(默认 10)
	// 只有当 ConcurrentPublish = true 时生效
	PublishConcurrency int
}

PublisherConfig 发布器配置

func DefaultPublisherConfig

func DefaultPublisherConfig() *PublisherConfig

DefaultPublisherConfig 默认发布器配置

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error

Validate 验证配置

type PublisherMetrics

type PublisherMetrics struct {
	// PublishedCount 已发布事件数量
	PublishedCount int64

	// FailedCount 失败事件数量
	FailedCount int64

	// ErrorCount 错误事件数量(批量发布时使用)
	ErrorCount int64

	// RetryCount 重试次数
	RetryCount int64

	// LastPublishTime 最后发布时间
	LastPublishTime time.Time

	// LastError 最后错误
	LastError error
}

PublisherMetrics 发布器指标

type RepositoryStats

type RepositoryStats struct {
	// PendingCount 待发布事件数量
	PendingCount int64

	// PublishedCount 已发布事件数量
	PublishedCount int64

	// FailedCount 失败事件数量
	FailedCount int64

	// TotalCount 总事件数量
	TotalCount int64

	// OldestPendingAge 最老的待发布事件年龄
	OldestPendingAge time.Duration

	// TenantID 租户 ID
	TenantID string
}

RepositoryStats 仓储统计信息

type RepositoryStatsProvider

type RepositoryStatsProvider interface {
	// GetStats 获取统计信息
	// ctx: 上下文
	// tenantID: 租户 ID(可选)
	GetStats(ctx context.Context, tenantID string) (*RepositoryStats, error)
}

GetStats 获取仓储统计信息(可选方法) 实现此接口的仓储可以提供统计信息

type SchedulerConfig

type SchedulerConfig struct {
	// PollInterval 轮询间隔
	PollInterval time.Duration

	// BatchSize 每次处理的事件数量
	BatchSize int

	// TenantID 租户 ID(可选,空字符串表示处理所有租户)
	TenantID string

	// CleanupInterval 清理间隔
	CleanupInterval time.Duration

	// CleanupRetention 清理保留时间(已发布事件保留多久)
	CleanupRetention time.Duration

	// HealthCheckInterval 健康检查间隔
	HealthCheckInterval time.Duration

	// EnableHealthCheck 是否启用健康检查
	EnableHealthCheck bool

	// EnableCleanup 是否启用自动清理
	EnableCleanup bool

	// EnableMetrics 是否启用指标收集
	EnableMetrics bool

	// EnableRetry 是否启用失败重试
	EnableRetry bool

	// RetryInterval 重试间隔
	RetryInterval time.Duration

	// MaxRetries 最大重试次数
	MaxRetries int

	// EnableDLQ 是否启用死信队列
	EnableDLQ bool

	// DLQInterval 死信队列处理间隔
	DLQInterval time.Duration

	// DLQHandler 死信队列处理器(可选)
	// 用于处理超过最大重试次数的失败事件
	DLQHandler DLQHandler

	// DLQAlertHandler 死信队列告警处理器(可选)
	// 用于发送告警通知
	DLQAlertHandler DLQAlertHandler

	// MetricsCollector 指标收集器(可选)
	// 用于集成 Prometheus、StatsD 等监控系统
	MetricsCollector MetricsCollector

	// ShutdownTimeout 优雅关闭超时时间
	// 调度器停止时等待正在处理的任务完成的最大时间
	// 默认:30 秒
	ShutdownTimeout time.Duration
}

SchedulerConfig 调度器配置

func DefaultSchedulerConfig

func DefaultSchedulerConfig() *SchedulerConfig

DefaultSchedulerConfig 默认调度器配置

func (*SchedulerConfig) Validate

func (c *SchedulerConfig) Validate() error

Validate 验证配置

type SchedulerMetrics

type SchedulerMetrics struct {
	// PollCount 轮询次数(原子操作)
	PollCount atomic.Int64

	// ProcessedCount 处理的事件数量(原子操作)
	ProcessedCount atomic.Int64

	// ErrorCount 错误次数(原子操作)
	ErrorCount atomic.Int64

	// LastPollTime 最后轮询时间(原子操作)
	LastPollTime atomic.Value // time.Time

	// LastCleanupTime 最后清理时间(原子操作)
	LastCleanupTime atomic.Value // time.Time

	// RetryCount 重试次数(原子操作)
	RetryCount atomic.Int64

	// RetriedCount 重试成功的事件数量(原子操作)
	RetriedCount atomic.Int64

	// LastRetryTime 最后重试时间(原子操作)
	LastRetryTime atomic.Value // time.Time

	// LastError 最后错误(原子操作)
	LastError atomic.Value // error
}

SchedulerMetrics 调度器指标 使用 atomic 操作保证并发安全,避免数据竞争

func NewSchedulerMetrics

func NewSchedulerMetrics() *SchedulerMetrics

NewSchedulerMetrics 创建调度器指标

type SchedulerMetricsSnapshot

type SchedulerMetricsSnapshot struct {
	PollCount       int64
	ProcessedCount  int64
	ErrorCount      int64
	LastPollTime    time.Time
	LastCleanupTime time.Time
	RetryCount      int64
	RetriedCount    int64
	LastRetryTime   time.Time
	LastError       error
}

SchedulerMetricsSnapshot 调度器指标快照(用于读取)

type SchedulerOption

type SchedulerOption func(*schedulerOptions)

SchedulerOption 调度器选项

func WithBatchSize

func WithBatchSize(size int) SchedulerOption

WithBatchSize 设置批量大小

func WithCleanupEnabled

func WithCleanupEnabled(enabled bool) SchedulerOption

WithCleanupEnabled 设置是否启用清理

func WithEventPublisher

func WithEventPublisher(eventPublisher EventPublisher) SchedulerOption

WithEventPublisher 设置事件发布器

func WithHealthCheckEnabled

func WithHealthCheckEnabled(enabled bool) SchedulerOption

WithHealthCheckEnabled 设置是否启用健康检查

func WithMetricsEnabled

func WithMetricsEnabled(enabled bool) SchedulerOption

WithMetricsEnabled 设置是否启用指标收集

func WithPollInterval

func WithPollInterval(interval time.Duration) SchedulerOption

WithPollInterval 设置轮询间隔

func WithPublisherConfig

func WithPublisherConfig(config *PublisherConfig) SchedulerOption

WithPublisherConfig 设置发布器配置

func WithRepository

func WithRepository(repo OutboxRepository) SchedulerOption

WithRepository 设置仓储

func WithSchedulerConfig

func WithSchedulerConfig(config *SchedulerConfig) SchedulerOption

WithSchedulerConfig 设置调度器配置

func WithTenantID

func WithTenantID(tenantID string) SchedulerOption

WithTenantID 设置租户 ID

func WithTopicMapper

func WithTopicMapper(topicMapper TopicMapper) SchedulerOption

WithTopicMapper 设置 Topic 映射器

type StaticTopicMapper

type StaticTopicMapper struct {
	// contains filtered or unexported fields
}

StaticTopicMapper 静态 TopicMapper 所有聚合类型都映射到同一个 Topic

func (*StaticTopicMapper) GetTopic

func (s *StaticTopicMapper) GetTopic(aggregateType string) string

GetTopic 实现 TopicMapper 接口

type TopicMapper

type TopicMapper interface {
	// GetTopic 根据聚合类型获取 Topic
	//
	// 参数:
	//   aggregateType: 聚合类型(例如:"Archive", "Media", "User")
	//
	// 返回:
	//   string: Topic 名称(例如:"archive-events", "media-events")
	//
	// 注意:
	//   - 如果聚合类型未映射,应该返回默认 Topic 或错误
	//   - Topic 名称应该符合 EventBus 的命名规范
	GetTopic(aggregateType string) string
}

TopicMapper Topic 映射器接口 将聚合类型映射到 EventBus Topic 由业务微服务实现,定义业务特定的映射规则

func NewChainTopicMapper

func NewChainTopicMapper(mappers ...TopicMapper) TopicMapper

NewChainTopicMapper 创建链式 TopicMapper

参数:

mappers: TopicMapper 列表

示例:

mapper := NewChainTopicMapper(
    customMapper,
    defaultMapper,
)

func NewMapBasedTopicMapper

func NewMapBasedTopicMapper(mapping map[string]string, defaultTopic string) TopicMapper

NewMapBasedTopicMapper 创建基于 Map 的 TopicMapper

参数:

mapping: 聚合类型到 Topic 的映射表
defaultTopic: 默认 Topic(可选,空字符串表示无默认值)

示例:

mapper := NewMapBasedTopicMapper(map[string]string{
    "Archive": "archive-events",
    "Media":   "media-events",
    "User":    "user-events",
}, "default-events")

func NewPrefixTopicMapper

func NewPrefixTopicMapper(prefix, suffix, separator string) TopicMapper

NewPrefixTopicMapper 创建基于前缀的 TopicMapper

参数:

prefix: Topic 前缀
suffix: Topic 后缀
separator: 分隔符

示例:

mapper := NewPrefixTopicMapper("jxt", "events", ".")
// Archive -> jxt.archive.events
// Media -> jxt.media.events

func NewStaticTopicMapper

func NewStaticTopicMapper(topic string) TopicMapper

NewStaticTopicMapper 创建静态 TopicMapper

参数:

topic: 固定的 Topic 名称

示例:

mapper := NewStaticTopicMapper("all-events")
// 所有聚合类型都映射到 "all-events"

type TransactionalRepository

type TransactionalRepository interface {
	OutboxRepository

	// SaveInTx 在事务中保存事件
	// ctx: 上下文
	// tx: 事务对象(具体类型由实现决定,例如 *gorm.DB)
	// event: 要保存的事件
	SaveInTx(ctx context.Context, tx interface{}, event *OutboxEvent) error

	// BeginTx 开始事务
	// ctx: 上下文
	// 返回:事务对象
	BeginTx(ctx context.Context) (interface{}, error)

	// CommitTx 提交事务
	// tx: 事务对象
	CommitTx(tx interface{}) error

	// RollbackTx 回滚事务
	// tx: 事务对象
	RollbackTx(tx interface{}) error
}

TransactionalRepository 支持事务的仓储接口(可选) 如果数据库支持事务,可以实现此接口

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL