eventbus

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: MIT Imports: 7 Imported by: 1

README

RabbitMQ 事件总线组件

组件介绍

这是一个基于 RabbitMQ 的事件总线实现,提供了简单的发布/订阅功能,使用 Watermill 库作为底层 AMQP 协议实现。

主要特性
  • 接口抽象:定义了通用的 EventBus 接口,便于替换不同的消息中间件实现
  • 发布订阅:支持消息的发布和订阅功能
  • 元数据支持:在消息中携带额外的元数据信息
  • 自动确认:订阅者处理完消息后自动发送 ACK 确认
核心组件
  • EventBus:事件总线接口定义
  • RabbitMQEventBus:RabbitMQ 实现的具体结构体
  • NewRabbitMQEventBus:创建事件总线实例的工厂函数

Event Bus

基于 RabbitMQ 的事件总线实现,使用 Watermill 库提供 AMQP 支持。

功能特性

  • 消息发布/订阅模式
  • 支持消息元数据
  • 自动消息确认机制
  • 接口化设计,易于扩展

安装

bash
go get github.com/your-repo/eventbus

使用示例

go
package main

import (
    "context"
    "eventbus"
    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    // 创建事件总线实例
    eb, err := eventbus.NewRabbitMQEventBus("amqp://user:password@localhost:5672/")
    if err != nil {
        panic(err)
    }

    // 订阅主题
    err = eb.Subscribe(context.Background(), "test-topic", func(msg *message.Message) {
        println("Received:", string(msg.Payload))
    })
    if err != nil {
        panic(err)
    }

    // 发布消息
    err = eb.Publish(context.Background(), "test-topic", []byte("Hello World"), nil)
    if err != nil {
        panic(err)
    }
}

API 接口

  • Publish(ctx, topic, payload, metadata) - 发布消息到指定主题
  • Subscribe(ctx, topic, handler) - 订阅指定主题的消息

依赖

  • github.com/ThreeDotsLabs/watermill
  • github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp

许可证

MIT

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EventMetadataToMap added in v0.2.0

func EventMetadataToMap(em *EventMetadata) map[string]string

EventMetadataToMap 将 EventMetadata 转换为 map[string]string

Types

type EventBus

type EventBus interface {
	Publish(ctx context.Context, topic string, payload []byte, metadata map[string]string) error
	Subscribe(ctx context.Context, topic string, handler EventHandler) error
}

EventBus 接口定义

func NewRabbitMQEventBus

func NewRabbitMQEventBus(amqpUri string, prefetchCount int) (EventBus, error)

NewRabbitMQEventBus 初始化事件总线 prefetchCount 控制 RabbitMQ QoS PrefetchCount 及并发消费数,<=0 时默认为 1

type EventHandler

type EventHandler func(eventID string, payload []byte, metadata map[string]string) error

type EventMetadata added in v0.2.0

type EventMetadata struct {
	//EventID      string  `json:"event_id"` // UUID
	EventType   string `json:"event_type"`
	TenantID    uint32 `json:"tenant_id"`
	UserID      uint32 `json:"user_id"`
	TaskID      uint32 `json:"task_id"` // 执行任务WorkTaskID
	WorkTaskID  uint32 `json:"work_task_id"`
	CompanyID   uint32 `json:"company_id"`
	CompanyName string `json:"company_name"`
}

EventMetadata 事件元数据,结果入库用

func MapToEventMetadata added in v0.2.0

func MapToEventMetadata(m map[string]string) (*EventMetadata, error)

type RabbitMQEventBus

type RabbitMQEventBus struct {
	// contains filtered or unexported fields
}

RabbitMQEventBus 实现

func (*RabbitMQEventBus) Publish

func (e *RabbitMQEventBus) Publish(ctx context.Context, topic string, payload []byte, metadata map[string]string) error

Publish 发布事件

func (*RabbitMQEventBus) Subscribe

func (e *RabbitMQEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) error

Subscribe 订阅事件,并发消费,并发度与 PrefetchCount 一致

type TaskPayload added in v0.2.0

type TaskPayload struct {
	OssPath string      `json:"oss_path"`
	Options interface{} `json:"options,omitempty"`
}

TaskPayload 任务信息,启动任务用

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL