Documentation
¶
Index ¶
- type EventBus
- type EventBusAdapter
- type EventBusEnvelope
- type EventBusPublishResult
- type GetGormTxFunc
- type OutboxRepositoryAdapter
- func (a *OutboxRepositoryAdapter) BatchUpdate(ctx context.Context, events []*outbox.OutboxEvent) error
- func (a *OutboxRepositoryAdapter) Count(ctx context.Context, status outbox.EventStatus, tenantID string) (int64, error)
- func (a *OutboxRepositoryAdapter) CountByStatus(ctx context.Context, tenantID string) (map[outbox.EventStatus]int64, error)
- func (a *OutboxRepositoryAdapter) Delete(ctx context.Context, id string) error
- func (a *OutboxRepositoryAdapter) DeleteBatch(ctx context.Context, ids []string) error
- func (a *OutboxRepositoryAdapter) DeleteFailedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)
- func (a *OutboxRepositoryAdapter) DeletePublishedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)
- func (a *OutboxRepositoryAdapter) ExistsByIdempotencyKey(ctx context.Context, idempotencyKey string) (bool, error)
- func (a *OutboxRepositoryAdapter) FindByAggregateID(ctx context.Context, aggregateID string, tenantID string) ([]*outbox.OutboxEvent, error)
- func (a *OutboxRepositoryAdapter) FindByAggregateType(ctx context.Context, aggregateType string, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
- func (a *OutboxRepositoryAdapter) FindByID(ctx context.Context, id string) (*outbox.OutboxEvent, error)
- func (a *OutboxRepositoryAdapter) FindByIdempotencyKey(ctx context.Context, idempotencyKey string) (*outbox.OutboxEvent, error)
- func (a *OutboxRepositoryAdapter) FindByStatus(ctx context.Context, status outbox.EventStatus, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
- func (a *OutboxRepositoryAdapter) FindEventsForRetry(ctx context.Context, maxRetries int, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
- func (a *OutboxRepositoryAdapter) FindMaxRetryEvents(ctx context.Context, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
- func (a *OutboxRepositoryAdapter) FindPendingEvents(ctx context.Context, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
- func (a *OutboxRepositoryAdapter) FindPendingEventsWithDelay(ctx context.Context, limit int, tenantID string, delaySeconds int) ([]*outbox.OutboxEvent, error)
- func (a *OutboxRepositoryAdapter) IncrementRetry(ctx context.Context, eventID string, errorMsg string) error
- func (a *OutboxRepositoryAdapter) MarkAsMaxRetry(ctx context.Context, eventID string, errorMsg string) error
- func (a *OutboxRepositoryAdapter) Save(ctx context.Context, event *outbox.OutboxEvent) error
- func (a *OutboxRepositoryAdapter) SaveBatch(ctx context.Context, events []*outbox.OutboxEvent) error
- func (a *OutboxRepositoryAdapter) SaveInTx(ctx context.Context, tx TransactionInterface, event *outbox.OutboxEvent) error
- func (a *OutboxRepositoryAdapter) Update(ctx context.Context, event *outbox.OutboxEvent) error
- type OutboxService
- type TransactionInterface
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EventBus ¶
type EventBus interface {
// PublishEnvelope 发布 Envelope 消息(领域事件)
// ✅ 支持 Outbox 模式:通过 GetPublishResultChannel() 获取 ACK 结果
// ✅ 可靠投递:不容许丢失的领域事件必须使用此方法
PublishEnvelope(ctx context.Context, topic string, envelope *EventBusEnvelope) error
// GetPublishResultChannel 获取异步发布结果通道
// ⚠️ 仅 PublishEnvelope() 发送 ACK 结果到此通道
// ⚠️ Publish() 不发送 ACK 结果(不支持 Outbox 模式)
// 用于 Outbox Processor 监听发布结果并更新 Outbox 状态
GetPublishResultChannel() <-chan *EventBusPublishResult
}
EventBus 接口(简化版本,与 jxt-core/sdk/pkg/eventbus.EventBus 兼容) 这里只定义 Outbox 需要的方法
type EventBusAdapter ¶
type EventBusAdapter struct {
// contains filtered or unexported fields
}
EventBusAdapter EventBus 适配器 将 EventBus 接口适配为 Outbox 的 EventPublisher 接口
使用示例:
// 1. 创建 EventBus 实例
eventBus, err := eventbus.NewKafkaEventBus(kafkaConfig)
if err != nil {
panic(err)
}
// 2. 创建适配器
adapter := NewEventBusAdapter(eventBus)
// 3. 创建 Outbox Publisher
publisher := outbox.NewOutboxPublisher(repo, adapter, topicMapper, config)
// 4. 启动 ACK 监听器
publisher.StartACKListener(ctx)
// 5. 发布事件
event := outbox.NewOutboxEvent(...)
publisher.PublishEvent(ctx, event)
func NewEventBusAdapter ¶
func NewEventBusAdapter(eventBus EventBus) *EventBusAdapter
NewEventBusAdapter 创建 EventBus 适配器
func (*EventBusAdapter) GetPublishResultChannel ¶
func (a *EventBusAdapter) GetPublishResultChannel() <-chan *outbox.PublishResult
GetPublishResultChannel 实现 outbox.EventPublisher 接口
func (*EventBusAdapter) PublishEnvelope ¶
func (a *EventBusAdapter) PublishEnvelope(ctx context.Context, topic string, envelope *outbox.Envelope) error
PublishEnvelope 实现 outbox.EventPublisher 接口
type EventBusEnvelope ¶
type EventBusEnvelope struct {
EventID string `json:"event_id"`
AggregateID string `json:"aggregate_id"`
EventType string `json:"event_type"`
EventVersion int64 `json:"event_version"`
Payload []byte `json:"payload"`
Timestamp string `json:"timestamp"`
TraceID string `json:"trace_id,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
}
EventBusEnvelope EventBus Envelope 结构(与 eventbus.Envelope 兼容)
type EventBusPublishResult ¶
type EventBusPublishResult struct {
EventID string
Topic string
Success bool
Error error
Timestamp string
AggregateID string
EventType string
}
EventBusPublishResult EventBus 发布结果(与 eventbus.PublishResult 兼容)
type GetGormTxFunc ¶
type GetGormTxFunc func(tx TransactionInterface) interface{}
GetGormTxFunc 从事务中提取 *gorm.DB 的函数类型 实际使用时应该使用:persistence.GetTx(tx)
type OutboxRepositoryAdapter ¶
type OutboxRepositoryAdapter struct {
// contains filtered or unexported fields
}
OutboxRepositoryAdapter 适配器:将 evidence-management 的事务接口适配到 jxt-core
用途:
- 在 evidence-management 项目中使用 jxt-core 的 Outbox 组件
- 自动处理事务类型转换
- 保持与现有事务管理器的兼容性
使用示例:
// 1. 创建 jxt-core 的 Outbox 仓储
jxtRepo := gormadapter.NewGormOutboxRepository(db)
// 2. 创建适配器
adapter := NewOutboxRepositoryAdapter(jxtRepo, persistence.GetTx)
// 3. 在事务中使用
transaction.RunInTransaction(ctx, txManager, func(tx transaction.Transaction) error {
// 保存聚合
repo.CreateInTx(ctx, tx, aggregate)
// 保存 Outbox 事件(适配器自动处理事务转换)
for _, event := range aggregate.Events() {
outboxEvent := convertToJxtCoreOutboxEvent(event)
adapter.SaveInTx(ctx, tx, outboxEvent) // ⭐ 传入 transaction.Transaction
}
return nil
})
func NewOutboxRepositoryAdapter ¶
func NewOutboxRepositoryAdapter(repo outbox.OutboxRepository, getTxFunc GetGormTxFunc) *OutboxRepositoryAdapter
NewOutboxRepositoryAdapter 创建适配器
参数:
- repo: jxt-core 的 Outbox 仓储实现
- getTxFunc: 从 transaction.Transaction 中提取 *gorm.DB 的函数
返回:
- *OutboxRepositoryAdapter: 适配器实例
示例:
import (
"github.com/ChenBigdata421/jxt-core/sdk/pkg/outbox/adapters/gorm"
"jxt-evidence-system/evidence-management/command/internal/infrastructure/persistence/gorm"
)
jxtRepo := gormadapter.NewGormOutboxRepository(db)
adapter := NewOutboxRepositoryAdapter(jxtRepo, persistence.GetTx)
func (*OutboxRepositoryAdapter) BatchUpdate ¶
func (a *OutboxRepositoryAdapter) BatchUpdate(ctx context.Context, events []*outbox.OutboxEvent) error
BatchUpdate 批量更新事件(性能优化)
func (*OutboxRepositoryAdapter) Count ¶
func (a *OutboxRepositoryAdapter) Count(ctx context.Context, status outbox.EventStatus, tenantID string) (int64, error)
Count 统计事件数量
func (*OutboxRepositoryAdapter) CountByStatus ¶
func (a *OutboxRepositoryAdapter) CountByStatus(ctx context.Context, tenantID string) (map[outbox.EventStatus]int64, error)
CountByStatus 按状态统计事件数量
func (*OutboxRepositoryAdapter) Delete ¶
func (a *OutboxRepositoryAdapter) Delete(ctx context.Context, id string) error
Delete 删除事件
func (*OutboxRepositoryAdapter) DeleteBatch ¶
func (a *OutboxRepositoryAdapter) DeleteBatch(ctx context.Context, ids []string) error
DeleteBatch 批量删除事件
func (*OutboxRepositoryAdapter) DeleteFailedBefore ¶
func (a *OutboxRepositoryAdapter) DeleteFailedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)
DeleteFailedBefore 删除指定时间之前失败的事件
func (*OutboxRepositoryAdapter) DeletePublishedBefore ¶
func (a *OutboxRepositoryAdapter) DeletePublishedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)
DeletePublishedBefore 删除指定时间之前已发布的事件
func (*OutboxRepositoryAdapter) ExistsByIdempotencyKey ¶
func (a *OutboxRepositoryAdapter) ExistsByIdempotencyKey(ctx context.Context, idempotencyKey string) (bool, error)
ExistsByIdempotencyKey 检查幂等性键是否已存在
func (*OutboxRepositoryAdapter) FindByAggregateID ¶
func (a *OutboxRepositoryAdapter) FindByAggregateID(ctx context.Context, aggregateID string, tenantID string) ([]*outbox.OutboxEvent, error)
FindByAggregateID 根据聚合 ID 查找事件
func (*OutboxRepositoryAdapter) FindByAggregateType ¶
func (a *OutboxRepositoryAdapter) FindByAggregateType(ctx context.Context, aggregateType string, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
FindByAggregateType 根据聚合类型查找事件
func (*OutboxRepositoryAdapter) FindByID ¶
func (a *OutboxRepositoryAdapter) FindByID(ctx context.Context, id string) (*outbox.OutboxEvent, error)
FindByID 根据 ID 查找事件
func (*OutboxRepositoryAdapter) FindByIdempotencyKey ¶
func (a *OutboxRepositoryAdapter) FindByIdempotencyKey(ctx context.Context, idempotencyKey string) (*outbox.OutboxEvent, error)
FindByIdempotencyKey 根据幂等性键查找事件
func (*OutboxRepositoryAdapter) FindByStatus ¶
func (a *OutboxRepositoryAdapter) FindByStatus(ctx context.Context, status outbox.EventStatus, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
FindByStatus 根据状态查找事件
func (*OutboxRepositoryAdapter) FindEventsForRetry ¶
func (a *OutboxRepositoryAdapter) FindEventsForRetry(ctx context.Context, maxRetries int, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
FindEventsForRetry 查找需要重试的事件
func (*OutboxRepositoryAdapter) FindMaxRetryEvents ¶
func (a *OutboxRepositoryAdapter) FindMaxRetryEvents(ctx context.Context, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
FindMaxRetryEvents 查找超过最大重试次数的事件
func (*OutboxRepositoryAdapter) FindPendingEvents ¶
func (a *OutboxRepositoryAdapter) FindPendingEvents(ctx context.Context, limit int, tenantID string) ([]*outbox.OutboxEvent, error)
FindPendingEvents 查找待发布的事件
func (*OutboxRepositoryAdapter) FindPendingEventsWithDelay ¶
func (a *OutboxRepositoryAdapter) FindPendingEventsWithDelay(ctx context.Context, limit int, tenantID string, delaySeconds int) ([]*outbox.OutboxEvent, error)
FindPendingEventsWithDelay 查找待发布的事件(带延迟)
func (*OutboxRepositoryAdapter) IncrementRetry ¶
func (a *OutboxRepositoryAdapter) IncrementRetry(ctx context.Context, eventID string, errorMsg string) error
IncrementRetry 增加重试次数
func (*OutboxRepositoryAdapter) MarkAsMaxRetry ¶
func (a *OutboxRepositoryAdapter) MarkAsMaxRetry(ctx context.Context, eventID string, errorMsg string) error
MarkAsMaxRetry 标记为超过最大重试次数
func (*OutboxRepositoryAdapter) Save ¶
func (a *OutboxRepositoryAdapter) Save(ctx context.Context, event *outbox.OutboxEvent) error
Save 保存事件(非事务)
func (*OutboxRepositoryAdapter) SaveBatch ¶
func (a *OutboxRepositoryAdapter) SaveBatch(ctx context.Context, events []*outbox.OutboxEvent) error
SaveBatch 批量保存事件(非事务)
func (*OutboxRepositoryAdapter) SaveInTx ¶
func (a *OutboxRepositoryAdapter) SaveInTx(ctx context.Context, tx TransactionInterface, event *outbox.OutboxEvent) error
SaveInTx 在事务中保存事件(适配方法)
参数:
- ctx: 上下文
- tx: evidence-management 的 transaction.Transaction 接口
- event: jxt-core 的 OutboxEvent
返回:
- error: 保存失败时返回错误
工作流程:
- 使用 getTxFunc 从 transaction.Transaction 中提取 *gorm.DB
- 调用 jxt-core 仓储的 SaveInTx 方法(传入 *gorm.DB)
- 返回结果
示例:
err := adapter.SaveInTx(ctx, tx, outboxEvent)
func (*OutboxRepositoryAdapter) Update ¶
func (a *OutboxRepositoryAdapter) Update(ctx context.Context, event *outbox.OutboxEvent) error
Update 更新事件
type OutboxService ¶
type OutboxService struct {
// contains filtered or unexported fields
}
OutboxService Outbox 服务示例
func NewOutboxService ¶
func NewOutboxService( eventBus EventBus, repo outbox.OutboxRepository, topicMapper outbox.TopicMapper, ) *OutboxService
NewOutboxService 创建 Outbox 服务
func (*OutboxService) PublishEvent ¶
func (s *OutboxService) PublishEvent(ctx context.Context, event *outbox.OutboxEvent) error
PublishEvent 发布事件
func (*OutboxService) PublishPendingEvents ¶
func (s *OutboxService) PublishPendingEvents(ctx context.Context, limit int, tenantID string) (int, error)
PublishPendingEvents 发布待发布的事件
type TransactionInterface ¶
TransactionInterface 代表 evidence-management 的事务接口 实际使用时应该导入:jxt-evidence-system/evidence-management/shared/common/transaction