examples

package
v1.1.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EventBus

type EventBus interface {
	// PublishEnvelope 发布 Envelope 消息(领域事件)
	// ✅ 支持 Outbox 模式:通过 GetPublishResultChannel() 获取 ACK 结果
	// ✅ 可靠投递:不容许丢失的领域事件必须使用此方法
	PublishEnvelope(ctx context.Context, topic string, envelope *EventBusEnvelope) error

	// GetPublishResultChannel 获取异步发布结果通道
	// ⚠️ 仅 PublishEnvelope() 发送 ACK 结果到此通道
	// ⚠️ Publish() 不发送 ACK 结果(不支持 Outbox 模式)
	// 用于 Outbox Processor 监听发布结果并更新 Outbox 状态
	GetPublishResultChannel() <-chan *EventBusPublishResult
}

EventBus 接口(简化版本,与 jxt-core/sdk/pkg/eventbus.EventBus 兼容) 这里只定义 Outbox 需要的方法

type EventBusAdapter

type EventBusAdapter struct {
	// contains filtered or unexported fields
}

EventBusAdapter EventBus 适配器 将 EventBus 接口适配为 Outbox 的 EventPublisher 接口

使用示例:

// 1. 创建 EventBus 实例
eventBus, err := eventbus.NewKafkaEventBus(kafkaConfig)
if err != nil {
    panic(err)
}

// 2. 创建适配器
adapter := NewEventBusAdapter(eventBus)

// 3. 创建 Outbox Publisher
publisher := outbox.NewOutboxPublisher(repo, adapter, topicMapper, config)

// 4. 启动 ACK 监听器
publisher.StartACKListener(ctx)

// 5. 发布事件
event := outbox.NewOutboxEvent(...)
publisher.PublishEvent(ctx, event)

func NewEventBusAdapter

func NewEventBusAdapter(eventBus EventBus) *EventBusAdapter

NewEventBusAdapter 创建 EventBus 适配器

func (*EventBusAdapter) GetPublishResultChannel

func (a *EventBusAdapter) GetPublishResultChannel() <-chan *outbox.PublishResult

GetPublishResultChannel 实现 outbox.EventPublisher 接口

func (*EventBusAdapter) PublishEnvelope

func (a *EventBusAdapter) PublishEnvelope(ctx context.Context, topic string, envelope *outbox.Envelope) error

PublishEnvelope 实现 outbox.EventPublisher 接口

type EventBusEnvelope

type EventBusEnvelope struct {
	EventID       string `json:"event_id"`
	AggregateID   string `json:"aggregate_id"`
	EventType     string `json:"event_type"`
	EventVersion  int64  `json:"event_version"`
	Payload       []byte `json:"payload"`
	Timestamp     string `json:"timestamp"`
	TraceID       string `json:"trace_id,omitempty"`
	CorrelationID string `json:"correlation_id,omitempty"`
}

EventBusEnvelope EventBus Envelope 结构(与 eventbus.Envelope 兼容)

type EventBusPublishResult

type EventBusPublishResult struct {
	EventID     string
	Topic       string
	Success     bool
	Error       error
	Timestamp   string
	AggregateID string
	EventType   string
}

EventBusPublishResult EventBus 发布结果(与 eventbus.PublishResult 兼容)

type GetGormTxFunc

type GetGormTxFunc func(tx TransactionInterface) interface{}

GetGormTxFunc 从事务中提取 *gorm.DB 的函数类型 实际使用时应该使用:persistence.GetTx(tx)

type OutboxRepositoryAdapter

type OutboxRepositoryAdapter struct {
	// contains filtered or unexported fields
}

OutboxRepositoryAdapter 适配器:将 evidence-management 的事务接口适配到 jxt-core

用途:

  • 在 evidence-management 项目中使用 jxt-core 的 Outbox 组件
  • 自动处理事务类型转换
  • 保持与现有事务管理器的兼容性

使用示例:

// 1. 创建 jxt-core 的 Outbox 仓储
jxtRepo := gormadapter.NewGormOutboxRepository(db)

// 2. 创建适配器
adapter := NewOutboxRepositoryAdapter(jxtRepo, persistence.GetTx)

// 3. 在事务中使用
transaction.RunInTransaction(ctx, txManager, func(tx transaction.Transaction) error {
    // 保存聚合
    repo.CreateInTx(ctx, tx, aggregate)

    // 保存 Outbox 事件(适配器自动处理事务转换)
    for _, event := range aggregate.Events() {
        outboxEvent := convertToJxtCoreOutboxEvent(event)
        adapter.SaveInTx(ctx, tx, outboxEvent)  // ⭐ 传入 transaction.Transaction
    }
    return nil
})

func NewOutboxRepositoryAdapter

func NewOutboxRepositoryAdapter(repo outbox.OutboxRepository, getTxFunc GetGormTxFunc) *OutboxRepositoryAdapter

NewOutboxRepositoryAdapter 创建适配器

参数:

  • repo: jxt-core 的 Outbox 仓储实现
  • getTxFunc: 从 transaction.Transaction 中提取 *gorm.DB 的函数

返回:

  • *OutboxRepositoryAdapter: 适配器实例

示例:

import (
    "github.com/ChenBigdata421/jxt-core/sdk/pkg/outbox/adapters/gorm"
    "jxt-evidence-system/evidence-management/command/internal/infrastructure/persistence/gorm"
)

jxtRepo := gormadapter.NewGormOutboxRepository(db)
adapter := NewOutboxRepositoryAdapter(jxtRepo, persistence.GetTx)

func (*OutboxRepositoryAdapter) BatchUpdate

func (a *OutboxRepositoryAdapter) BatchUpdate(ctx context.Context, events []*outbox.OutboxEvent) error

BatchUpdate 批量更新事件(性能优化)

func (*OutboxRepositoryAdapter) Count

func (a *OutboxRepositoryAdapter) Count(ctx context.Context, status outbox.EventStatus, tenantID string) (int64, error)

Count 统计事件数量

func (*OutboxRepositoryAdapter) CountByStatus

func (a *OutboxRepositoryAdapter) CountByStatus(ctx context.Context, tenantID string) (map[outbox.EventStatus]int64, error)

CountByStatus 按状态统计事件数量

func (*OutboxRepositoryAdapter) Delete

func (a *OutboxRepositoryAdapter) Delete(ctx context.Context, id string) error

Delete 删除事件

func (*OutboxRepositoryAdapter) DeleteBatch

func (a *OutboxRepositoryAdapter) DeleteBatch(ctx context.Context, ids []string) error

DeleteBatch 批量删除事件

func (*OutboxRepositoryAdapter) DeleteFailedBefore

func (a *OutboxRepositoryAdapter) DeleteFailedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)

DeleteFailedBefore 删除指定时间之前失败的事件

func (*OutboxRepositoryAdapter) DeletePublishedBefore

func (a *OutboxRepositoryAdapter) DeletePublishedBefore(ctx context.Context, before time.Time, tenantID string) (int64, error)

DeletePublishedBefore 删除指定时间之前已发布的事件

func (*OutboxRepositoryAdapter) ExistsByIdempotencyKey

func (a *OutboxRepositoryAdapter) ExistsByIdempotencyKey(ctx context.Context, idempotencyKey string) (bool, error)

ExistsByIdempotencyKey 检查幂等性键是否已存在

func (*OutboxRepositoryAdapter) FindByAggregateID

func (a *OutboxRepositoryAdapter) FindByAggregateID(ctx context.Context, aggregateID string, tenantID string) ([]*outbox.OutboxEvent, error)

FindByAggregateID 根据聚合 ID 查找事件

func (*OutboxRepositoryAdapter) FindByAggregateType

func (a *OutboxRepositoryAdapter) FindByAggregateType(ctx context.Context, aggregateType string, limit int, tenantID string) ([]*outbox.OutboxEvent, error)

FindByAggregateType 根据聚合类型查找事件

func (*OutboxRepositoryAdapter) FindByID

FindByID 根据 ID 查找事件

func (*OutboxRepositoryAdapter) FindByIdempotencyKey

func (a *OutboxRepositoryAdapter) FindByIdempotencyKey(ctx context.Context, idempotencyKey string) (*outbox.OutboxEvent, error)

FindByIdempotencyKey 根据幂等性键查找事件

func (*OutboxRepositoryAdapter) FindByStatus

func (a *OutboxRepositoryAdapter) FindByStatus(ctx context.Context, status outbox.EventStatus, limit int, tenantID string) ([]*outbox.OutboxEvent, error)

FindByStatus 根据状态查找事件

func (*OutboxRepositoryAdapter) FindEventsForRetry

func (a *OutboxRepositoryAdapter) FindEventsForRetry(ctx context.Context, maxRetries int, limit int, tenantID string) ([]*outbox.OutboxEvent, error)

FindEventsForRetry 查找需要重试的事件

func (*OutboxRepositoryAdapter) FindMaxRetryEvents

func (a *OutboxRepositoryAdapter) FindMaxRetryEvents(ctx context.Context, limit int, tenantID string) ([]*outbox.OutboxEvent, error)

FindMaxRetryEvents 查找超过最大重试次数的事件

func (*OutboxRepositoryAdapter) FindPendingEvents

func (a *OutboxRepositoryAdapter) FindPendingEvents(ctx context.Context, limit int, tenantID string) ([]*outbox.OutboxEvent, error)

FindPendingEvents 查找待发布的事件

func (*OutboxRepositoryAdapter) FindPendingEventsWithDelay

func (a *OutboxRepositoryAdapter) FindPendingEventsWithDelay(ctx context.Context, limit int, tenantID string, delaySeconds int) ([]*outbox.OutboxEvent, error)

FindPendingEventsWithDelay 查找待发布的事件(带延迟)

func (*OutboxRepositoryAdapter) IncrementRetry

func (a *OutboxRepositoryAdapter) IncrementRetry(ctx context.Context, eventID string, errorMsg string) error

IncrementRetry 增加重试次数

func (*OutboxRepositoryAdapter) MarkAsMaxRetry

func (a *OutboxRepositoryAdapter) MarkAsMaxRetry(ctx context.Context, eventID string, errorMsg string) error

MarkAsMaxRetry 标记为超过最大重试次数

func (*OutboxRepositoryAdapter) Save

Save 保存事件(非事务)

func (*OutboxRepositoryAdapter) SaveBatch

func (a *OutboxRepositoryAdapter) SaveBatch(ctx context.Context, events []*outbox.OutboxEvent) error

SaveBatch 批量保存事件(非事务)

func (*OutboxRepositoryAdapter) SaveInTx

SaveInTx 在事务中保存事件(适配方法)

参数:

  • ctx: 上下文
  • tx: evidence-management 的 transaction.Transaction 接口
  • event: jxt-core 的 OutboxEvent

返回:

  • error: 保存失败时返回错误

工作流程:

  1. 使用 getTxFunc 从 transaction.Transaction 中提取 *gorm.DB
  2. 调用 jxt-core 仓储的 SaveInTx 方法(传入 *gorm.DB)
  3. 返回结果

示例:

err := adapter.SaveInTx(ctx, tx, outboxEvent)

func (*OutboxRepositoryAdapter) Update

Update 更新事件

type OutboxService

type OutboxService struct {
	// contains filtered or unexported fields
}

OutboxService Outbox 服务示例

func NewOutboxService

func NewOutboxService(
	eventBus EventBus,
	repo outbox.OutboxRepository,
	topicMapper outbox.TopicMapper,
) *OutboxService

NewOutboxService 创建 Outbox 服务

func (*OutboxService) PublishEvent

func (s *OutboxService) PublishEvent(ctx context.Context, event *outbox.OutboxEvent) error

PublishEvent 发布事件

func (*OutboxService) PublishPendingEvents

func (s *OutboxService) PublishPendingEvents(ctx context.Context, limit int, tenantID string) (int, error)

PublishPendingEvents 发布待发布的事件

func (*OutboxService) Start

func (s *OutboxService) Start(ctx context.Context) error

Start 启动 Outbox 服务

func (*OutboxService) Stop

func (s *OutboxService) Stop()

Stop 停止 Outbox 服务

type TransactionInterface

type TransactionInterface interface {
	Commit() error
	Rollback() error
}

TransactionInterface 代表 evidence-management 的事务接口 实际使用时应该导入:jxt-evidence-system/evidence-management/shared/common/transaction

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL