Documentation
¶
Index ¶
- Variables
- type ChainTopicMapper
- type CompositeDLQHandler
- type DLQAlertHandler
- type DLQAlertHandlerFunc
- type DLQHandler
- type DLQHandlerFunc
- type EmailAlertHandler
- type Envelope
- type EventPublisher
- type EventStatus
- type FuncTopicMapper
- type InMemoryMetricsCollector
- func (c *InMemoryMetricsCollector) GetSnapshot() map[string]interface{}
- func (c *InMemoryMetricsCollector) RecordDLQ(tenantID, aggregateType, eventType string)
- func (c *InMemoryMetricsCollector) RecordFailed(tenantID, aggregateType, eventType string, err error)
- func (c *InMemoryMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)
- func (c *InMemoryMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string)
- func (c *InMemoryMetricsCollector) RecordRetry(tenantID, aggregateType, eventType string)
- func (c *InMemoryMetricsCollector) Reset()
- func (c *InMemoryMetricsCollector) SetDLQCount(tenantID string, count int64)
- func (c *InMemoryMetricsCollector) SetFailedCount(tenantID string, count int64)
- func (c *InMemoryMetricsCollector) SetPendingCount(tenantID string, count int64)
- type LoggingDLQHandler
- type MapBasedTopicMapper
- type MetricsCollector
- type NoOpDLQAlertHandler
- type NoOpDLQHandler
- type NoOpEventPublisher
- type NoOpMetricsCollector
- func (n *NoOpMetricsCollector) RecordDLQ(tenantID, aggregateType, eventType string)
- func (n *NoOpMetricsCollector) RecordFailed(tenantID, aggregateType, eventType string, err error)
- func (n *NoOpMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)
- func (n *NoOpMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string)
- func (n *NoOpMetricsCollector) RecordRetry(tenantID, aggregateType, eventType string)
- func (n *NoOpMetricsCollector) SetDLQCount(tenantID string, count int64)
- func (n *NoOpMetricsCollector) SetFailedCount(tenantID string, count int64)
- func (n *NoOpMetricsCollector) SetPendingCount(tenantID string, count int64)
- type OutboxEvent
- func (e *OutboxEvent) CanRetry() bool
- func (e *OutboxEvent) Clone() *OutboxEvent
- func (e *OutboxEvent) GetIdempotencyKey() string
- func (e *OutboxEvent) GetPayloadAs(v interface{}) error
- func (e *OutboxEvent) IncrementRetry(errorMessage string)
- func (e *OutboxEvent) IsExpired(maxAge time.Duration) bool
- func (e *OutboxEvent) IsFailed() bool
- func (e *OutboxEvent) IsMaxRetry() bool
- func (e *OutboxEvent) IsPending() bool
- func (e *OutboxEvent) IsPublished() bool
- func (e *OutboxEvent) MarkAsFailed(err error)
- func (e *OutboxEvent) MarkAsMaxRetry(errorMessage string)
- func (e *OutboxEvent) MarkAsPublished()
- func (e *OutboxEvent) ResetForRetry()
- func (e *OutboxEvent) SetPayload(payload interface{}) error
- func (e *OutboxEvent) ShouldPublishNow() bool
- func (e *OutboxEvent) ToEnvelope() interface{}
- func (e *OutboxEvent) WithCorrelationID(correlationID string) *OutboxEvent
- func (e *OutboxEvent) WithIdempotencyKey(idempotencyKey string) *OutboxEvent
- func (e *OutboxEvent) WithTraceID(traceID string) *OutboxEvent
- type OutboxPublisher
- func (p *OutboxPublisher) GetMetrics() *PublisherMetrics
- func (p *OutboxPublisher) PublishBatch(ctx context.Context, events []*OutboxEvent) (int, error)
- func (p *OutboxPublisher) PublishEvent(ctx context.Context, event *OutboxEvent) error
- func (p *OutboxPublisher) PublishPendingEvents(ctx context.Context, limit int, tenantID string) (int, error)
- func (p *OutboxPublisher) ResetMetrics()
- func (p *OutboxPublisher) RetryFailedEvent(ctx context.Context, event *OutboxEvent) error
- func (p *OutboxPublisher) StartACKListener(ctx context.Context)
- func (p *OutboxPublisher) StopACKListener()
- type OutboxRepository
- type OutboxScheduler
- type PrefixTopicMapper
- type PrometheusMetricsCollector
- func (c *PrometheusMetricsCollector) RecordDLQ(tenantID, aggregateType, eventType string)
- func (c *PrometheusMetricsCollector) RecordFailed(tenantID, aggregateType, eventType string, err error)
- func (c *PrometheusMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)
- func (c *PrometheusMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string)
- func (c *PrometheusMetricsCollector) RecordRetry(tenantID, aggregateType, eventType string)
- func (c *PrometheusMetricsCollector) SetDLQCount(tenantID string, count int64)
- func (c *PrometheusMetricsCollector) SetFailedCount(tenantID string, count int64)
- func (c *PrometheusMetricsCollector) SetPendingCount(tenantID string, count int64)
- type PublishResult
- type PublisherConfig
- type PublisherMetrics
- type RepositoryStats
- type RepositoryStatsProvider
- type SchedulerConfig
- type SchedulerMetrics
- type SchedulerMetricsSnapshot
- type SchedulerOption
- func WithBatchSize(size int) SchedulerOption
- func WithCleanupEnabled(enabled bool) SchedulerOption
- func WithEventPublisher(eventPublisher EventPublisher) SchedulerOption
- func WithHealthCheckEnabled(enabled bool) SchedulerOption
- func WithMetricsEnabled(enabled bool) SchedulerOption
- func WithPollInterval(interval time.Duration) SchedulerOption
- func WithPublisherConfig(config *PublisherConfig) SchedulerOption
- func WithRepository(repo OutboxRepository) SchedulerOption
- func WithSchedulerConfig(config *SchedulerConfig) SchedulerOption
- func WithTenantID(tenantID string) SchedulerOption
- func WithTopicMapper(topicMapper TopicMapper) SchedulerOption
- type StaticTopicMapper
- type TopicMapper
- type TransactionalRepository
Constants ¶
This section is empty.
Variables ¶
var DefaultTopicMapper = FuncTopicMapper(func(aggregateType string) string { return fmt.Sprintf("%s-events", aggregateType) })
DefaultTopicMapper 默认 TopicMapper 使用简单的命名规则:{aggregateType}-events
Functions ¶
This section is empty.
Types ¶
type ChainTopicMapper ¶
type ChainTopicMapper struct {
// contains filtered or unexported fields
}
ChainTopicMapper 链式 TopicMapper 依次尝试多个 TopicMapper,直到找到非空 Topic
func (*ChainTopicMapper) GetTopic ¶
func (c *ChainTopicMapper) GetTopic(aggregateType string) string
GetTopic 实现 TopicMapper 接口
type CompositeDLQHandler ¶
type CompositeDLQHandler struct {
// contains filtered or unexported fields
}
CompositeDLQHandler 组合 DLQ 处理器(示例实现) 允许同时使用多个 DLQ 处理器
func NewCompositeDLQHandler ¶
func NewCompositeDLQHandler(handlers ...DLQHandler) *CompositeDLQHandler
NewCompositeDLQHandler 创建组合 DLQ 处理器
func (*CompositeDLQHandler) Handle ¶
func (h *CompositeDLQHandler) Handle(ctx context.Context, event *OutboxEvent) error
Handle 处理死信事件(依次调用所有处理器)
type DLQAlertHandler ¶
type DLQAlertHandler interface {
// Alert 发送告警
// ctx: 上下文
// event: 失败的事件
// 返回:发送失败时返回错误
Alert(ctx context.Context, event *OutboxEvent) error
}
DLQAlertHandler 死信队列告警处理器接口 用于发送告警通知(邮件、短信、钉钉、企业微信等)
实现示例:
type EmailAlertHandler struct {
emailService EmailService
}
func (h *EmailAlertHandler) Alert(ctx context.Context, event *OutboxEvent) error {
subject := fmt.Sprintf("Outbox Event Failed: %s", event.EventType)
body := fmt.Sprintf("Event ID: %s\nError: %s", event.ID, event.LastError)
return h.emailService.Send(subject, body)
}
type DLQAlertHandlerFunc ¶
type DLQAlertHandlerFunc func(ctx context.Context, event *OutboxEvent) error
DLQAlertHandlerFunc 函数式 DLQAlertHandler 允许使用函数作为 DLQAlertHandler 实现
func (DLQAlertHandlerFunc) Alert ¶
func (f DLQAlertHandlerFunc) Alert(ctx context.Context, event *OutboxEvent) error
Alert 实现 DLQAlertHandler 接口
type DLQHandler ¶
type DLQHandler interface {
// Handle 处理死信事件
// ctx: 上下文
// event: 失败的事件
// 返回:处理失败时返回错误
Handle(ctx context.Context, event *OutboxEvent) error
}
DLQHandler 死信队列处理器接口 用于处理超过最大重试次数的失败事件
实现示例:
type MyDLQHandler struct {
logger *zap.Logger
}
func (h *MyDLQHandler) Handle(ctx context.Context, event *OutboxEvent) error {
// 1. 记录到专门的死信队列表
// 2. 发送到死信队列 Topic
// 3. 记录详细日志
h.logger.Error("Event moved to DLQ",
zap.String("event_id", event.ID),
zap.String("error", event.LastError))
return nil
}
type DLQHandlerFunc ¶
type DLQHandlerFunc func(ctx context.Context, event *OutboxEvent) error
DLQHandlerFunc 函数式 DLQHandler 允许使用函数作为 DLQHandler 实现
func (DLQHandlerFunc) Handle ¶
func (f DLQHandlerFunc) Handle(ctx context.Context, event *OutboxEvent) error
Handle 实现 DLQHandler 接口
type EmailAlertHandler ¶
type EmailAlertHandler struct {
// contains filtered or unexported fields
}
EmailAlertHandler 邮件告警处理器(示例实现) 发送邮件告警
func NewEmailAlertHandler ¶
func NewEmailAlertHandler(smtpHost string, smtpPort int, from string, to []string) *EmailAlertHandler
NewEmailAlertHandler 创建邮件告警处理器
func (*EmailAlertHandler) Alert ¶
func (h *EmailAlertHandler) Alert(ctx context.Context, event *OutboxEvent) error
Alert 发送告警
type Envelope ¶
type Envelope struct {
// EventID 事件唯一ID(必填,用于 Outbox 模式)
EventID string `json:"event_id"`
// AggregateID 聚合根ID
AggregateID string `json:"aggregate_id"`
// EventType 事件类型
EventType string `json:"event_type"`
// EventVersion 事件版本
EventVersion int64 `json:"event_version"`
// Payload 事件负载
Payload []byte `json:"payload"`
// Timestamp 事件时间戳
Timestamp time.Time `json:"timestamp"`
// TraceID 链路追踪ID(可选)
TraceID string `json:"trace_id,omitempty"`
// CorrelationID 关联ID(可选)
CorrelationID string `json:"correlation_id,omitempty"`
}
Envelope Envelope 消息结构(与 eventbus.Envelope 兼容) 为了避免 Outbox 包依赖 EventBus 包,这里定义一个简化的 Envelope 结构
type EventPublisher ¶
type EventPublisher interface {
// PublishEnvelope 发布 Envelope 消息(推荐)
// ✅ 支持 Outbox 模式:通过 GetPublishResultChannel() 获取 ACK 结果
// ✅ 可靠投递:不容许丢失的领域事件必须使用此方法
//
// 参数:
// ctx: 上下文(可能包含租户信息、追踪信息等)
// topic: 目标 topic(由 TopicMapper 提供)
// envelope: Envelope 消息(包含 EventID、AggregateID、EventType 等元数据)
//
// 返回:
// error: 提交失败时返回错误(注意:立即返回,不等待 ACK)
//
// 注意:
// - 此方法是异步的,立即返回,不等待 ACK
// - ACK 结果通过 GetPublishResultChannel() 异步通知
// - 实现者应该记录日志
// - 实现者应该处理超时
PublishEnvelope(ctx context.Context, topic string, envelope *Envelope) error
// GetPublishResultChannel 获取异步发布结果通道
// ⚠️ 仅 PublishEnvelope() 发送 ACK 结果到此通道
// 用于 Outbox Processor 监听发布结果并更新 Outbox 状态
//
// 返回:
// <-chan *PublishResult: 只读的发布结果通道
GetPublishResultChannel() <-chan *PublishResult
}
EventPublisher 事件发布器接口 Outbox 组件通过这个接口发布事件,不依赖具体的 EventBus 实现 这是依赖注入的核心,符合依赖倒置原则(DIP)
设计原则: 1. 最小化接口:只包含 Outbox 需要的方法 2. 零外部依赖:不依赖任何其他包 3. 清晰语义:方法签名简单明了
使用方式: 业务微服务需要创建一个适配器,将具体的 EventBus 实现适配为 EventPublisher 接口
示例(推荐使用 PublishEnvelope):
type EventBusAdapter struct {
eventBus eventbus.EventBus
}
func (a *EventBusAdapter) PublishEnvelope(ctx context.Context, topic string, envelope *Envelope) error {
return a.eventBus.PublishEnvelope(ctx, topic, envelope)
}
func (a *EventBusAdapter) GetPublishResultChannel() <-chan *PublishResult {
return a.eventBus.GetPublishResultChannel()
}
func NewNoOpEventPublisher ¶
func NewNoOpEventPublisher() EventPublisher
NewNoOpEventPublisher 创建空操作 EventPublisher
type EventStatus ¶
type EventStatus string
EventStatus 事件状态
const ( // EventStatusPending 待发布 EventStatusPending EventStatus = "pending" // EventStatusPublished 已发布 EventStatusPublished EventStatus = "published" // EventStatusFailed 发布失败 EventStatusFailed EventStatus = "failed" // EventStatusMaxRetry 超过最大重试次数 EventStatusMaxRetry EventStatus = "max_retry" )
type FuncTopicMapper ¶
FuncTopicMapper 函数式 TopicMapper 允许使用函数作为 TopicMapper 实现
func (FuncTopicMapper) GetTopic ¶
func (f FuncTopicMapper) GetTopic(aggregateType string) string
GetTopic 实现 TopicMapper 接口
type InMemoryMetricsCollector ¶
type InMemoryMetricsCollector struct {
// 计数器
PublishedCount int64
FailedCount int64
RetryCount int64
DLQCount int64
// 按租户统计
PendingByTenant map[string]int64
FailedByTenant map[string]int64
DLQByTenant map[string]int64
// 按事件类型统计
PublishedByType map[string]int64
FailedByType map[string]int64
// 耗时统计
TotalDuration time.Duration
AvgDuration time.Duration
MaxDuration time.Duration
MinDuration time.Duration
DurationCount int64
// contains filtered or unexported fields
}
InMemoryMetricsCollector 内存指标收集器(用于测试和简单场景)
func NewInMemoryMetricsCollector ¶
func NewInMemoryMetricsCollector() *InMemoryMetricsCollector
NewInMemoryMetricsCollector 创建内存指标收集器
func (*InMemoryMetricsCollector) GetSnapshot ¶
func (c *InMemoryMetricsCollector) GetSnapshot() map[string]interface{}
GetSnapshot 获取指标快照(用于测试和调试)
func (*InMemoryMetricsCollector) RecordDLQ ¶
func (c *InMemoryMetricsCollector) RecordDLQ(tenantID, aggregateType, eventType string)
RecordDLQ 记录事件进入死信队列
func (*InMemoryMetricsCollector) RecordFailed ¶
func (c *InMemoryMetricsCollector) RecordFailed(tenantID, aggregateType, eventType string, err error)
RecordFailed 记录事件发布失败
func (*InMemoryMetricsCollector) RecordPublishDuration ¶
func (c *InMemoryMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)
RecordPublishDuration 记录发布耗时
func (*InMemoryMetricsCollector) RecordPublished ¶
func (c *InMemoryMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string)
RecordPublished 记录事件发布成功
func (*InMemoryMetricsCollector) RecordRetry ¶
func (c *InMemoryMetricsCollector) RecordRetry(tenantID, aggregateType, eventType string)
RecordRetry 记录事件重试
func (*InMemoryMetricsCollector) Reset ¶
func (c *InMemoryMetricsCollector) Reset()
Reset 重置所有指标(用于测试)
func (*InMemoryMetricsCollector) SetDLQCount ¶
func (c *InMemoryMetricsCollector) SetDLQCount(tenantID string, count int64)
SetDLQCount 设置死信队列事件数量
func (*InMemoryMetricsCollector) SetFailedCount ¶
func (c *InMemoryMetricsCollector) SetFailedCount(tenantID string, count int64)
SetFailedCount 设置失败事件数量
func (*InMemoryMetricsCollector) SetPendingCount ¶
func (c *InMemoryMetricsCollector) SetPendingCount(tenantID string, count int64)
SetPendingCount 设置待发布事件数量
type LoggingDLQHandler ¶
type LoggingDLQHandler struct {
// contains filtered or unexported fields
}
LoggingDLQHandler 日志记录 DLQ 处理器(示例实现) 将死信事件记录到日志中
func NewLoggingDLQHandler ¶
func NewLoggingDLQHandler() *LoggingDLQHandler
NewLoggingDLQHandler 创建日志记录 DLQ 处理器
func (*LoggingDLQHandler) Handle ¶
func (h *LoggingDLQHandler) Handle(ctx context.Context, event *OutboxEvent) error
Handle 处理死信事件
type MapBasedTopicMapper ¶
type MapBasedTopicMapper struct {
// contains filtered or unexported fields
}
MapBasedTopicMapper 基于 Map 的 TopicMapper 实现 提供简单的映射表实现
func (*MapBasedTopicMapper) GetTopic ¶
func (m *MapBasedTopicMapper) GetTopic(aggregateType string) string
GetTopic 实现 TopicMapper 接口
type MetricsCollector ¶
type MetricsCollector interface {
// RecordPublished 记录事件发布成功
RecordPublished(tenantID, aggregateType, eventType string)
// RecordFailed 记录事件发布失败
RecordFailed(tenantID, aggregateType, eventType string, err error)
// RecordRetry 记录事件重试
RecordRetry(tenantID, aggregateType, eventType string)
// RecordDLQ 记录事件进入死信队列
RecordDLQ(tenantID, aggregateType, eventType string)
// RecordPublishDuration 记录发布耗时
RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)
// SetPendingCount 设置待发布事件数量
SetPendingCount(tenantID string, count int64)
// SetFailedCount 设置失败事件数量
SetFailedCount(tenantID string, count int64)
// SetDLQCount 设置死信队列事件数量
SetDLQCount(tenantID string, count int64)
}
MetricsCollector Outbox 指标收集器接口 用于集成 Prometheus、StatsD 等监控系统
实现示例(Prometheus):
type PrometheusMetricsCollector struct {
publishedTotal prometheus.Counter
failedTotal prometheus.Counter
retryTotal prometheus.Counter
publishDuration prometheus.Histogram
pendingGauge prometheus.Gauge
}
func (c *PrometheusMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string) {
c.publishedTotal.With(prometheus.Labels{
"tenant_id": tenantID,
"aggregate_type": aggregateType,
"event_type": eventType,
}).Inc()
}
type NoOpDLQAlertHandler ¶
type NoOpDLQAlertHandler struct{}
NoOpDLQAlertHandler 空操作 DLQAlertHandler(默认实现)
func (*NoOpDLQAlertHandler) Alert ¶
func (n *NoOpDLQAlertHandler) Alert(ctx context.Context, event *OutboxEvent) error
Alert 实现 DLQAlertHandler 接口(什么都不做)
type NoOpDLQHandler ¶
type NoOpDLQHandler struct{}
NoOpDLQHandler 空操作 DLQHandler(默认实现)
func (*NoOpDLQHandler) Handle ¶
func (n *NoOpDLQHandler) Handle(ctx context.Context, event *OutboxEvent) error
Handle 实现 DLQHandler 接口(什么都不做)
type NoOpEventPublisher ¶
type NoOpEventPublisher struct {
// contains filtered or unexported fields
}
NoOpEventPublisher 空操作 EventPublisher(用于测试)
func (*NoOpEventPublisher) GetPublishResultChannel ¶
func (n *NoOpEventPublisher) GetPublishResultChannel() <-chan *PublishResult
GetPublishResultChannel 实现 EventPublisher 接口
func (*NoOpEventPublisher) PublishEnvelope ¶
func (n *NoOpEventPublisher) PublishEnvelope(ctx context.Context, topic string, envelope *Envelope) error
PublishEnvelope 实现 EventPublisher 接口(什么都不做,立即返回成功)
type NoOpMetricsCollector ¶
type NoOpMetricsCollector struct{}
NoOpMetricsCollector 空操作指标收集器(默认实现)
func (*NoOpMetricsCollector) RecordDLQ ¶
func (n *NoOpMetricsCollector) RecordDLQ(tenantID, aggregateType, eventType string)
RecordDLQ 实现 MetricsCollector 接口
func (*NoOpMetricsCollector) RecordFailed ¶
func (n *NoOpMetricsCollector) RecordFailed(tenantID, aggregateType, eventType string, err error)
RecordFailed 实现 MetricsCollector 接口
func (*NoOpMetricsCollector) RecordPublishDuration ¶
func (n *NoOpMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)
RecordPublishDuration 实现 MetricsCollector 接口
func (*NoOpMetricsCollector) RecordPublished ¶
func (n *NoOpMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string)
RecordPublished 实现 MetricsCollector 接口
func (*NoOpMetricsCollector) RecordRetry ¶
func (n *NoOpMetricsCollector) RecordRetry(tenantID, aggregateType, eventType string)
RecordRetry 实现 MetricsCollector 接口
func (*NoOpMetricsCollector) SetDLQCount ¶
func (n *NoOpMetricsCollector) SetDLQCount(tenantID string, count int64)
SetDLQCount 实现 MetricsCollector 接口
func (*NoOpMetricsCollector) SetFailedCount ¶
func (n *NoOpMetricsCollector) SetFailedCount(tenantID string, count int64)
SetFailedCount 实现 MetricsCollector 接口
func (*NoOpMetricsCollector) SetPendingCount ¶
func (n *NoOpMetricsCollector) SetPendingCount(tenantID string, count int64)
SetPendingCount 实现 MetricsCollector 接口
type OutboxEvent ¶
type OutboxEvent struct {
// ID 事件唯一标识(UUID)
ID string
// TenantID 租户 ID(多租户支持)
TenantID string
// AggregateID 聚合根 ID
AggregateID string
// AggregateType 聚合根类型(用于 Topic 映射)
// 例如:"Archive", "Media", "User"
AggregateType string
// EventType 事件类型
// 例如:"ArchiveCreated", "MediaUploaded", "UserRegistered"
EventType string
// Payload 事件负载(JSON 格式)
Payload json.RawMessage
// Status 事件状态
Status EventStatus
// RetryCount 重试次数
RetryCount int
// MaxRetries 最大重试次数
MaxRetries int
// LastError 最后一次错误信息
LastError string
// CreatedAt 创建时间
CreatedAt time.Time
// UpdatedAt 更新时间
UpdatedAt time.Time
// PublishedAt 发布时间
PublishedAt *time.Time
// ScheduledAt 计划发布时间(延迟发布)
ScheduledAt *time.Time
// LastRetryAt 最后重试时间
LastRetryAt *time.Time
// Version 事件版本(用于事件演化)
Version int64
// TraceID 链路追踪ID(用于分布式追踪)
// 用于追踪事件在多个微服务之间的流转
TraceID string
// CorrelationID 关联ID(用于关联相关事件)
// 用于关联同一业务流程中的多个事件(例如:Saga 模式)
CorrelationID string
// IdempotencyKey 幂等性键(用于防止重复发布)
// 格式建议:{TenantID}:{AggregateType}:{AggregateID}:{EventType}:{Timestamp}
// 或者使用业务自定义的唯一标识
// 用于在发布前检查是否已经发布过相同的事件
IdempotencyKey string
}
OutboxEvent Outbox 事件领域模型 完全数据库无关,不包含任何数据库标签
func NewOutboxEvent ¶
func NewOutboxEvent( tenantID string, aggregateID string, aggregateType string, eventType string, payload interface{}, ) (*OutboxEvent, error)
NewOutboxEvent 创建新的 Outbox 事件
func (*OutboxEvent) GetIdempotencyKey ¶
func (e *OutboxEvent) GetIdempotencyKey() string
GetIdempotencyKey 获取幂等性键
func (*OutboxEvent) GetPayloadAs ¶
func (e *OutboxEvent) GetPayloadAs(v interface{}) error
GetPayloadAs 将 Payload 反序列化为指定类型
func (*OutboxEvent) IncrementRetry ¶
func (e *OutboxEvent) IncrementRetry(errorMessage string)
IncrementRetry 增加重试次数
func (*OutboxEvent) IsExpired ¶
func (e *OutboxEvent) IsExpired(maxAge time.Duration) bool
IsExpired 判断事件是否过期(超过指定时间未发布)
func (*OutboxEvent) MarkAsMaxRetry ¶
func (e *OutboxEvent) MarkAsMaxRetry(errorMessage string)
MarkAsMaxRetry 标记为超过最大重试次数
func (*OutboxEvent) MarkAsPublished ¶
func (e *OutboxEvent) MarkAsPublished()
MarkAsPublished 标记为已发布
func (*OutboxEvent) ResetForRetry ¶
func (e *OutboxEvent) ResetForRetry()
ResetForRetry 重置为待发布状态(用于重试)
func (*OutboxEvent) SetPayload ¶
func (e *OutboxEvent) SetPayload(payload interface{}) error
SetPayload 设置 Payload
func (*OutboxEvent) ShouldPublishNow ¶
func (e *OutboxEvent) ShouldPublishNow() bool
ShouldPublishNow 判断是否应该立即发布
func (*OutboxEvent) ToEnvelope ¶
func (e *OutboxEvent) ToEnvelope() interface{}
ToEnvelope 转换为 EventBus Envelope 用于将 OutboxEvent 转换为 EventBus 的统一消息包络格式
func (*OutboxEvent) WithCorrelationID ¶
func (e *OutboxEvent) WithCorrelationID(correlationID string) *OutboxEvent
WithCorrelationID 设置关联ID(支持链式调用)
func (*OutboxEvent) WithIdempotencyKey ¶
func (e *OutboxEvent) WithIdempotencyKey(idempotencyKey string) *OutboxEvent
WithIdempotencyKey 设置自定义幂等性键(支持链式调用) 用于业务代码自定义幂等性键的生成逻辑
参数:
- idempotencyKey: 自定义的幂等性键
返回:
- *OutboxEvent: 事件对象(支持链式调用)
示例:
event.WithIdempotencyKey("custom-key-123")
func (*OutboxEvent) WithTraceID ¶
func (e *OutboxEvent) WithTraceID(traceID string) *OutboxEvent
WithTraceID 设置链路追踪ID(支持链式调用)
type OutboxPublisher ¶
type OutboxPublisher struct {
// contains filtered or unexported fields
}
OutboxPublisher Outbox 事件发布器 负责将 Outbox 事件发布到 EventBus
func NewOutboxPublisher ¶
func NewOutboxPublisher( repo OutboxRepository, eventPublisher EventPublisher, topicMapper TopicMapper, config *PublisherConfig, ) *OutboxPublisher
NewOutboxPublisher 创建 Outbox 发布器
参数:
repo: 仓储 eventPublisher: 事件发布器(接口) topicMapper: Topic 映射器 config: 配置(可选,nil 表示使用默认配置)
返回:
*OutboxPublisher: 发布器实例
Panics:
如果配置验证失败,会 panic
func (*OutboxPublisher) GetMetrics ¶
func (p *OutboxPublisher) GetMetrics() *PublisherMetrics
GetMetrics 获取发布器指标
func (*OutboxPublisher) PublishBatch ¶
func (p *OutboxPublisher) PublishBatch(ctx context.Context, events []*OutboxEvent) (int, error)
PublishBatch 批量发布事件(优化版本)
参数:
ctx: 上下文 events: 要发布的事件列表
返回:
int: 成功发布的事件数量 error: 发布失败时返回错误
性能优化:
- 批量幂等性检查(一次查询)
- 批量发布到 EventBus
- 批量更新数据库状态(一次更新)
func (*OutboxPublisher) PublishEvent ¶
func (p *OutboxPublisher) PublishEvent(ctx context.Context, event *OutboxEvent) error
PublishEvent 发布单个事件
参数:
ctx: 上下文 event: 要发布的事件
返回:
error: 发布失败时返回错误
func (*OutboxPublisher) PublishPendingEvents ¶
func (p *OutboxPublisher) PublishPendingEvents(ctx context.Context, limit int, tenantID string) (int, error)
PublishPendingEvents 发布所有待发布的事件
参数:
ctx: 上下文 limit: 最大发布数量 tenantID: 租户 ID(可选)
返回:
int: 成功发布的事件数量 error: 发布失败时返回错误
func (*OutboxPublisher) RetryFailedEvent ¶
func (p *OutboxPublisher) RetryFailedEvent(ctx context.Context, event *OutboxEvent) error
RetryFailedEvent 重试失败的事件
参数:
ctx: 上下文 event: 要重试的事件
返回:
error: 重试失败时返回错误
func (*OutboxPublisher) StartACKListener ¶
func (p *OutboxPublisher) StartACKListener(ctx context.Context)
StartACKListener 启动 ACK 监听器 监听 EventBus 的异步发布结果,并更新 Outbox 事件状态
参数:
ctx: 上下文(用于控制监听器生命周期)
注意:
- 此方法应该在应用启动时调用一次
- 监听器会在后台运行,直到 ctx 被取消或调用 StopACKListener()
- 重复调用此方法是安全的(会忽略后续调用)
func (*OutboxPublisher) StopACKListener ¶
func (p *OutboxPublisher) StopACKListener()
StopACKListener 停止 ACK 监听器
type OutboxRepository ¶
type OutboxRepository interface {
// Save 保存事件(在事务中)
// ctx: 上下文(可能包含事务信息)
// event: 要保存的事件
Save(ctx context.Context, event *OutboxEvent) error
// SaveBatch 批量保存事件(在事务中)
// ctx: 上下文(可能包含事务信息)
// events: 要保存的事件列表
SaveBatch(ctx context.Context, events []*OutboxEvent) error
// FindPendingEvents 查找待发布的事件
// ctx: 上下文
// limit: 最大返回数量
// tenantID: 租户 ID(可选,空字符串表示查询所有租户)
FindPendingEvents(ctx context.Context, limit int, tenantID string) ([]*OutboxEvent, error)
// FindPendingEventsWithDelay 查找创建时间超过指定延迟的待发布事件
// 用于调度器避让机制,防止与立即发布产生竞态
// ctx: 上下文
// tenantID: 租户 ID(可选)
// delaySeconds: 延迟秒数(只查询创建时间超过此延迟的事件)
// limit: 最大返回数量
FindPendingEventsWithDelay(ctx context.Context, tenantID string, delaySeconds int, limit int) ([]*OutboxEvent, error)
// FindEventsForRetry 查找需要重试的失败事件
// ctx: 上下文
// maxRetries: 最大重试次数(只查询重试次数小于此值的事件)
// limit: 最大返回数量
FindEventsForRetry(ctx context.Context, maxRetries int, limit int) ([]*OutboxEvent, error)
// FindByAggregateType 根据聚合类型查找待发布事件
// ctx: 上下文
// aggregateType: 聚合类型
// limit: 最大返回数量
FindByAggregateType(ctx context.Context, aggregateType string, limit int) ([]*OutboxEvent, error)
// FindByID 根据 ID 查找事件
// ctx: 上下文
// id: 事件 ID
FindByID(ctx context.Context, id string) (*OutboxEvent, error)
// FindByAggregateID 根据聚合根 ID 查找事件
// ctx: 上下文
// aggregateID: 聚合根 ID
// tenantID: 租户 ID(可选)
FindByAggregateID(ctx context.Context, aggregateID string, tenantID string) ([]*OutboxEvent, error)
// Update 更新事件
// ctx: 上下文
// event: 要更新的事件
Update(ctx context.Context, event *OutboxEvent) error
// MarkAsPublished 标记事件为已发布
// ctx: 上下文
// id: 事件 ID
MarkAsPublished(ctx context.Context, id string) error
// MarkAsFailed 标记事件为失败
// ctx: 上下文
// id: 事件 ID
// err: 错误信息
MarkAsFailed(ctx context.Context, id string, err error) error
// IncrementRetry 增加重试次数(同时更新错误信息和最后重试时间)
// ctx: 上下文
// id: 事件 ID
// errorMsg: 错误信息
IncrementRetry(ctx context.Context, id string, errorMsg string) error
// MarkAsMaxRetry 标记事件为超过最大重试次数
// ctx: 上下文
// id: 事件 ID
// errorMsg: 错误信息
MarkAsMaxRetry(ctx context.Context, id string, errorMsg string) error
// IncrementRetryCount 增加重试次数(已废弃,使用 IncrementRetry 代替)
// ctx: 上下文
// id: 事件 ID
// Deprecated: 使用 IncrementRetry 代替
IncrementRetryCount(ctx context.Context, id string) error
// Delete 删除事件
// ctx: 上下文
// id: 事件 ID
Delete(ctx context.Context, id string) error
// DeleteBatch 批量删除事件
// ctx: 上下文
// ids: 事件 ID 列表
DeleteBatch(ctx context.Context, ids []string) error
// DeletePublishedBefore 删除指定时间之前已发布的事件
// ctx: 上下文
// before: 时间阈值
// tenantID: 租户 ID(可选)
DeletePublishedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)
// DeleteFailedBefore 删除指定时间之前失败的事件
// ctx: 上下文
// before: 时间阈值
// tenantID: 租户 ID(可选)
DeleteFailedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)
// Count 统计事件数量
// ctx: 上下文
// status: 事件状态(可选,空字符串表示所有状态)
// tenantID: 租户 ID(可选)
Count(ctx context.Context, status EventStatus, tenantID string) (int64, error)
// CountByStatus 按状态统计事件数量
// ctx: 上下文
// tenantID: 租户 ID(可选)
// 返回:map[EventStatus]int64
CountByStatus(ctx context.Context, tenantID string) (map[EventStatus]int64, error)
// FindByIdempotencyKey 根据幂等性键查找事件
// ctx: 上下文
// idempotencyKey: 幂等性键
// 返回:事件对象(如果不存在返回 nil)
FindByIdempotencyKey(ctx context.Context, idempotencyKey string) (*OutboxEvent, error)
// ExistsByIdempotencyKey 检查幂等性键是否已存在
// ctx: 上下文
// idempotencyKey: 幂等性键
// 返回:true 表示已存在,false 表示不存在
ExistsByIdempotencyKey(ctx context.Context, idempotencyKey string) (bool, error)
// FindMaxRetryEvents 查找超过最大重试次数的事件
// ctx: 上下文
// limit: 最大返回数量
// tenantID: 租户 ID(可选)
// 返回:事件列表
FindMaxRetryEvents(ctx context.Context, limit int, tenantID string) ([]*OutboxEvent, error)
// BatchUpdate 批量更新事件(可选接口,用于性能优化)
// ctx: 上下文
// events: 要更新的事件列表
// 返回:更新失败时返回错误
BatchUpdate(ctx context.Context, events []*OutboxEvent) error
}
OutboxRepository Outbox 仓储接口 定义数据访问契约,不依赖具体数据库实现
type OutboxScheduler ¶
type OutboxScheduler struct {
// contains filtered or unexported fields
}
OutboxScheduler Outbox 调度器 负责定时轮询待发布的事件并触发发布
func NewScheduler ¶
func NewScheduler(options ...SchedulerOption) *OutboxScheduler
NewScheduler 创建调度器
参数:
options: 函数式选项
示例:
scheduler := outbox.NewScheduler(
outbox.WithRepository(repo),
outbox.WithEventPublisher(eventPublisher),
outbox.WithTopicMapper(topicMapper),
outbox.WithSchedulerConfig(config),
)
func (*OutboxScheduler) GetMetrics ¶
func (s *OutboxScheduler) GetMetrics() *SchedulerMetricsSnapshot
GetMetrics 获取调度器指标快照
func (*OutboxScheduler) IsRunning ¶
func (s *OutboxScheduler) IsRunning() bool
IsRunning 判断是否正在运行(免锁版本)
type PrefixTopicMapper ¶
type PrefixTopicMapper struct {
// contains filtered or unexported fields
}
PrefixTopicMapper 基于前缀的 TopicMapper 自动为聚合类型添加前缀和后缀
func (*PrefixTopicMapper) GetTopic ¶
func (p *PrefixTopicMapper) GetTopic(aggregateType string) string
GetTopic 实现 TopicMapper 接口
type PrometheusMetricsCollector ¶
type PrometheusMetricsCollector struct {
// contains filtered or unexported fields
}
PrometheusMetricsCollector Prometheus 指标收集器(示例实现)
使用方法:
添加依赖: go get github.com/prometheus/client_golang/prometheus go get github.com/prometheus/client_golang/prometheus/promauto
创建收集器: collector := outbox.NewPrometheusMetricsCollector("myapp")
注册到 Prometheus: prometheus.MustRegister(collector.publishedTotal) prometheus.MustRegister(collector.failedTotal) // ... 注册其他指标
配置发布器和调度器: publisher := outbox.NewOutboxPublisher(repo, eventPublisher, topicMapper, &outbox.PublisherConfig{ MetricsCollector: collector, })
暴露 metrics 端点: http.Handle("/metrics", promhttp.Handler()) http.ListenAndServe(":9090", nil)
注意:这是一个示例实现,实际使用时需要取消注释并添加 prometheus 依赖
func NewPrometheusMetricsCollector ¶
func NewPrometheusMetricsCollector(namespace string) *PrometheusMetricsCollector
NewPrometheusMetricsCollector 创建 Prometheus 指标收集器
实际实现示例(需要取消注释):
func NewPrometheusMetricsCollector(namespace string) *PrometheusMetricsCollector {
return &PrometheusMetricsCollector{
namespace: namespace,
publishedTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "outbox",
Name: "published_total",
Help: "Total number of published events",
},
[]string{"tenant_id", "aggregate_type", "event_type"},
),
failedTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "outbox",
Name: "failed_total",
Help: "Total number of failed events",
},
[]string{"tenant_id", "aggregate_type", "event_type"},
),
retryTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "outbox",
Name: "retry_total",
Help: "Total number of retried events",
},
[]string{"tenant_id", "aggregate_type", "event_type"},
),
dlqTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "outbox",
Name: "dlq_total",
Help: "Total number of events moved to DLQ",
},
[]string{"tenant_id", "aggregate_type", "event_type"},
),
publishDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: "outbox",
Name: "publish_duration_seconds",
Help: "Event publish duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"tenant_id", "aggregate_type", "event_type"},
),
pendingGauge: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "outbox",
Name: "pending_events",
Help: "Number of pending events",
},
[]string{"tenant_id"},
),
failedGauge: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "outbox",
Name: "failed_events",
Help: "Number of failed events",
},
[]string{"tenant_id"},
),
dlqGauge: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "outbox",
Name: "dlq_events",
Help: "Number of events in DLQ",
},
[]string{"tenant_id"},
),
}
}
func (*PrometheusMetricsCollector) RecordDLQ ¶
func (c *PrometheusMetricsCollector) RecordDLQ(tenantID, aggregateType, eventType string)
RecordDLQ 记录事件进入死信队列
func (*PrometheusMetricsCollector) RecordFailed ¶
func (c *PrometheusMetricsCollector) RecordFailed(tenantID, aggregateType, eventType string, err error)
RecordFailed 记录事件发布失败
func (*PrometheusMetricsCollector) RecordPublishDuration ¶
func (c *PrometheusMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration)
RecordPublishDuration 记录发布耗时
实际实现示例(需要取消注释):
func (c *PrometheusMetricsCollector) RecordPublishDuration(tenantID, aggregateType, eventType string, duration time.Duration) {
c.publishDuration.(*prometheus.HistogramVec).With(prometheus.Labels{
"tenant_id": tenantID,
"aggregate_type": aggregateType,
"event_type": eventType,
}).Observe(duration.Seconds())
}
func (*PrometheusMetricsCollector) RecordPublished ¶
func (c *PrometheusMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string)
RecordPublished 记录事件发布成功
实际实现示例(需要取消注释):
func (c *PrometheusMetricsCollector) RecordPublished(tenantID, aggregateType, eventType string) {
c.publishedTotal.(*prometheus.CounterVec).With(prometheus.Labels{
"tenant_id": tenantID,
"aggregate_type": aggregateType,
"event_type": eventType,
}).Inc()
}
func (*PrometheusMetricsCollector) RecordRetry ¶
func (c *PrometheusMetricsCollector) RecordRetry(tenantID, aggregateType, eventType string)
RecordRetry 记录事件重试
func (*PrometheusMetricsCollector) SetDLQCount ¶
func (c *PrometheusMetricsCollector) SetDLQCount(tenantID string, count int64)
SetDLQCount 设置死信队列事件数量
func (*PrometheusMetricsCollector) SetFailedCount ¶
func (c *PrometheusMetricsCollector) SetFailedCount(tenantID string, count int64)
SetFailedCount 设置失败事件数量
func (*PrometheusMetricsCollector) SetPendingCount ¶
func (c *PrometheusMetricsCollector) SetPendingCount(tenantID string, count int64)
SetPendingCount 设置待发布事件数量
type PublishResult ¶
type PublishResult struct {
// EventID 事件ID(来自 Envelope.EventID)
EventID string
// Topic 主题
Topic string
// Success 是否成功
Success bool
// Error 错误信息(失败时)
Error error
// Timestamp 发布时间戳
Timestamp time.Time
// AggregateID 聚合ID(可选,来自 Envelope)
AggregateID string
// EventType 事件类型(可选,来自 Envelope)
EventType string
}
PublishResult 异步发布结果
type PublisherConfig ¶
type PublisherConfig struct {
// MaxRetries 最大重试次数
MaxRetries int
// RetryDelay 重试延迟
RetryDelay time.Duration
// PublishTimeout 发布超时时间
PublishTimeout time.Duration
// EnableMetrics 是否启用指标收集
EnableMetrics bool
// MetricsCollector 指标收集器(可选)
// 用于集成 Prometheus、StatsD 等监控系统
MetricsCollector MetricsCollector
// ErrorHandler 错误处理器(可选)
ErrorHandler func(event *OutboxEvent, err error)
// ConcurrentPublish 是否启用并发发布(默认 false)
ConcurrentPublish bool
// PublishConcurrency 并发发布的并发数(默认 10)
// 只有当 ConcurrentPublish = true 时生效
PublishConcurrency int
}
PublisherConfig 发布器配置
func DefaultPublisherConfig ¶
func DefaultPublisherConfig() *PublisherConfig
DefaultPublisherConfig 默认发布器配置
type PublisherMetrics ¶
type PublisherMetrics struct {
// PublishedCount 已发布事件数量
PublishedCount int64
// FailedCount 失败事件数量
FailedCount int64
// ErrorCount 错误事件数量(批量发布时使用)
ErrorCount int64
// RetryCount 重试次数
RetryCount int64
// LastPublishTime 最后发布时间
LastPublishTime time.Time
// LastError 最后错误
LastError error
}
PublisherMetrics 发布器指标
type RepositoryStats ¶
type RepositoryStats struct {
// PendingCount 待发布事件数量
PendingCount int64
// PublishedCount 已发布事件数量
PublishedCount int64
// FailedCount 失败事件数量
FailedCount int64
// TotalCount 总事件数量
TotalCount int64
// OldestPendingAge 最老的待发布事件年龄
OldestPendingAge time.Duration
// TenantID 租户 ID
TenantID string
}
RepositoryStats 仓储统计信息
type RepositoryStatsProvider ¶
type RepositoryStatsProvider interface {
// GetStats 获取统计信息
// ctx: 上下文
// tenantID: 租户 ID(可选)
GetStats(ctx context.Context, tenantID string) (*RepositoryStats, error)
}
GetStats 获取仓储统计信息(可选方法) 实现此接口的仓储可以提供统计信息
type SchedulerConfig ¶
type SchedulerConfig struct {
// PollInterval 轮询间隔
PollInterval time.Duration
// BatchSize 每次处理的事件数量
BatchSize int
// TenantID 租户 ID(可选,空字符串表示处理所有租户)
TenantID string
// CleanupInterval 清理间隔
CleanupInterval time.Duration
// CleanupRetention 清理保留时间(已发布事件保留多久)
CleanupRetention time.Duration
// HealthCheckInterval 健康检查间隔
HealthCheckInterval time.Duration
// EnableHealthCheck 是否启用健康检查
EnableHealthCheck bool
// EnableCleanup 是否启用自动清理
EnableCleanup bool
// EnableMetrics 是否启用指标收集
EnableMetrics bool
// EnableRetry 是否启用失败重试
EnableRetry bool
// RetryInterval 重试间隔
RetryInterval time.Duration
// MaxRetries 最大重试次数
MaxRetries int
// EnableDLQ 是否启用死信队列
EnableDLQ bool
// DLQInterval 死信队列处理间隔
DLQInterval time.Duration
// DLQHandler 死信队列处理器(可选)
// 用于处理超过最大重试次数的失败事件
DLQHandler DLQHandler
// DLQAlertHandler 死信队列告警处理器(可选)
// 用于发送告警通知
DLQAlertHandler DLQAlertHandler
// MetricsCollector 指标收集器(可选)
// 用于集成 Prometheus、StatsD 等监控系统
MetricsCollector MetricsCollector
// ShutdownTimeout 优雅关闭超时时间
// 调度器停止时等待正在处理的任务完成的最大时间
// 默认:30 秒
ShutdownTimeout time.Duration
}
SchedulerConfig 调度器配置
func DefaultSchedulerConfig ¶
func DefaultSchedulerConfig() *SchedulerConfig
DefaultSchedulerConfig 默认调度器配置
type SchedulerMetrics ¶
type SchedulerMetrics struct {
// PollCount 轮询次数(原子操作)
PollCount atomic.Int64
// ProcessedCount 处理的事件数量(原子操作)
ProcessedCount atomic.Int64
// ErrorCount 错误次数(原子操作)
ErrorCount atomic.Int64
// LastPollTime 最后轮询时间(原子操作)
LastPollTime atomic.Value // time.Time
// LastCleanupTime 最后清理时间(原子操作)
LastCleanupTime atomic.Value // time.Time
// RetryCount 重试次数(原子操作)
RetryCount atomic.Int64
// RetriedCount 重试成功的事件数量(原子操作)
RetriedCount atomic.Int64
// LastRetryTime 最后重试时间(原子操作)
LastRetryTime atomic.Value // time.Time
// LastError 最后错误(原子操作)
LastError atomic.Value // error
}
SchedulerMetrics 调度器指标 使用 atomic 操作保证并发安全,避免数据竞争
type SchedulerMetricsSnapshot ¶
type SchedulerMetricsSnapshot struct {
PollCount int64
ProcessedCount int64
ErrorCount int64
LastPollTime time.Time
LastCleanupTime time.Time
RetryCount int64
RetriedCount int64
LastRetryTime time.Time
LastError error
}
SchedulerMetricsSnapshot 调度器指标快照(用于读取)
type SchedulerOption ¶
type SchedulerOption func(*schedulerOptions)
SchedulerOption 调度器选项
func WithCleanupEnabled ¶
func WithCleanupEnabled(enabled bool) SchedulerOption
WithCleanupEnabled 设置是否启用清理
func WithEventPublisher ¶
func WithEventPublisher(eventPublisher EventPublisher) SchedulerOption
WithEventPublisher 设置事件发布器
func WithHealthCheckEnabled ¶
func WithHealthCheckEnabled(enabled bool) SchedulerOption
WithHealthCheckEnabled 设置是否启用健康检查
func WithMetricsEnabled ¶
func WithMetricsEnabled(enabled bool) SchedulerOption
WithMetricsEnabled 设置是否启用指标收集
func WithPollInterval ¶
func WithPollInterval(interval time.Duration) SchedulerOption
WithPollInterval 设置轮询间隔
func WithPublisherConfig ¶
func WithPublisherConfig(config *PublisherConfig) SchedulerOption
WithPublisherConfig 设置发布器配置
func WithRepository ¶
func WithRepository(repo OutboxRepository) SchedulerOption
WithRepository 设置仓储
func WithSchedulerConfig ¶
func WithSchedulerConfig(config *SchedulerConfig) SchedulerOption
WithSchedulerConfig 设置调度器配置
func WithTopicMapper ¶
func WithTopicMapper(topicMapper TopicMapper) SchedulerOption
WithTopicMapper 设置 Topic 映射器
type StaticTopicMapper ¶
type StaticTopicMapper struct {
// contains filtered or unexported fields
}
StaticTopicMapper 静态 TopicMapper 所有聚合类型都映射到同一个 Topic
func (*StaticTopicMapper) GetTopic ¶
func (s *StaticTopicMapper) GetTopic(aggregateType string) string
GetTopic 实现 TopicMapper 接口
type TopicMapper ¶
type TopicMapper interface {
// GetTopic 根据聚合类型获取 Topic
//
// 参数:
// aggregateType: 聚合类型(例如:"Archive", "Media", "User")
//
// 返回:
// string: Topic 名称(例如:"archive-events", "media-events")
//
// 注意:
// - 如果聚合类型未映射,应该返回默认 Topic 或错误
// - Topic 名称应该符合 EventBus 的命名规范
GetTopic(aggregateType string) string
}
TopicMapper Topic 映射器接口 将聚合类型映射到 EventBus Topic 由业务微服务实现,定义业务特定的映射规则
func NewChainTopicMapper ¶
func NewChainTopicMapper(mappers ...TopicMapper) TopicMapper
NewChainTopicMapper 创建链式 TopicMapper
参数:
mappers: TopicMapper 列表
示例:
mapper := NewChainTopicMapper(
customMapper,
defaultMapper,
)
func NewMapBasedTopicMapper ¶
func NewMapBasedTopicMapper(mapping map[string]string, defaultTopic string) TopicMapper
NewMapBasedTopicMapper 创建基于 Map 的 TopicMapper
参数:
mapping: 聚合类型到 Topic 的映射表 defaultTopic: 默认 Topic(可选,空字符串表示无默认值)
示例:
mapper := NewMapBasedTopicMapper(map[string]string{
"Archive": "archive-events",
"Media": "media-events",
"User": "user-events",
}, "default-events")
func NewPrefixTopicMapper ¶
func NewPrefixTopicMapper(prefix, suffix, separator string) TopicMapper
NewPrefixTopicMapper 创建基于前缀的 TopicMapper
参数:
prefix: Topic 前缀 suffix: Topic 后缀 separator: 分隔符
示例:
mapper := NewPrefixTopicMapper("jxt", "events", ".")
// Archive -> jxt.archive.events
// Media -> jxt.media.events
func NewStaticTopicMapper ¶
func NewStaticTopicMapper(topic string) TopicMapper
NewStaticTopicMapper 创建静态 TopicMapper
参数:
topic: 固定的 Topic 名称
示例:
mapper := NewStaticTopicMapper("all-events")
// 所有聚合类型都映射到 "all-events"
type TransactionalRepository ¶
type TransactionalRepository interface {
OutboxRepository
// SaveInTx 在事务中保存事件
// ctx: 上下文
// tx: 事务对象(具体类型由实现决定,例如 *gorm.DB)
// event: 要保存的事件
SaveInTx(ctx context.Context, tx interface{}, event *OutboxEvent) error
// BeginTx 开始事务
// ctx: 上下文
// 返回:事务对象
BeginTx(ctx context.Context) (interface{}, error)
// CommitTx 提交事务
// tx: 事务对象
CommitTx(tx interface{}) error
// RollbackTx 回滚事务
// tx: 事务对象
RollbackTx(tx interface{}) error
}
TransactionalRepository 支持事务的仓储接口(可选) 如果数据库支持事务,可以实现此接口