rabbitmq

package
v2.0.86 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2025 License: MIT Imports: 12 Imported by: 0

README

rabbitmq 客户端

开发环境搭建

docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=guest \
  rabbitmq:3-management

通过 http://localhost:15672 访问管理界面

Documentation

Index

Constants

View Source
const (
	APP_NAME = "rabbitmq"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CallBackHandler

type CallBackHandler func(ctx context.Context, msg *Message) error

CallBackHandler 消息处理回调

type Client

type Client struct {
	ioc.ObjectImpl

	// 连接配置
	RabbitConnConfig
	// contains filtered or unexported fields
}

func (*Client) Close

func (c *Client) Close(ctx context.Context)

func (*Client) Init

func (c *Client) Init() error

func (*Client) Name

func (c *Client) Name() string

func (*Client) Priority added in v2.0.72

func (i *Client) Priority() int

type ConsumeOption

type ConsumeOption func(*consumerConfig)

ConsumeOption 消费者配置选项

func WithArgs

func WithArgs(args amqp.Table) ConsumeOption

func WithAutoAck

func WithAutoAck(autoAck bool) ConsumeOption

func WithExchangeType

func WithExchangeType(exchangeType EXCHANGE_TYPE) ConsumeOption

func WithExclusive

func WithExclusive(exclusive bool) ConsumeOption

func WithRoutingKey

func WithRoutingKey(routingKey string) ConsumeOption

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer RabbitMQ消费者

func NewConsumer

func NewConsumer() (*Consumer, error)

NewConsumer 创建新的消费者实例

func (*Consumer) Close

func (c *Consumer) Close() error

Close 关闭消费者

func (*Consumer) DirectSubscribe

func (c *Consumer) DirectSubscribe(ctx context.Context, exchange, queue string, cb CallBackHandler, opts ...ConsumeOption) error

Queue 队列模式 (Competing Consumers) 简化使用, 直接使用默认exchange

func (*Consumer) FanoutSubscribe

func (c *Consumer) FanoutSubscribe(ctx context.Context, subject string, cb CallBackHandler, opts ...ConsumeOption) error

Subscribe Fanout订阅模式 (Pub/Sub)

func (*Consumer) TopicSubscribe

func (c *Consumer) TopicSubscribe(ctx context.Context, exchange, routingKey string, cb CallBackHandler, opts ...ConsumeOption) error

TopicSubscribe Topic订阅模式 (Pub/Sub with wildcard routing) subject: 交换机名称 routingKey: 绑定队列的路由键(支持 * 和 # 通配符)

通配符规则:
  * 匹配一个单词(如 order.* 可匹配 order.paid,但不能匹配 order.paid.failed)。
  # 匹配零或多个单词(如 order.# 可匹配 order.paid 和 order.paid.failed)。

cb: 消息处理回调函数 opts: 可选配置项(如自动确认、独占队列等)

type EXCHANGE_TYPE

type EXCHANGE_TYPE string
const (
	EXCHANGE_TYPE_DIRECT  EXCHANGE_TYPE = "direct"  // Routing Key 完全匹配 Binding Key - 精确路由(如任务分发)
	EXCHANGE_TYPE_FANOUT  EXCHANGE_TYPE = "fanout"  // 忽略 Routing Key,广播到所有队列 - 发布/订阅(如事件通知)
	EXCHANGE_TYPE_TOPIC   EXCHANGE_TYPE = "topic"   // Routing Key 通配符匹配 Binding Key - 动态路由(如分类日志)
	EXCHANGE_TYPE_HEADERS EXCHANGE_TYPE = "headers" // 匹配消息的 Headers 属性 - 复杂条件过滤(如协议适配)
	EXCHANGE_TYPE_DEFAULT EXCHANGE_TYPE = ""        // 默认交换机: Routing Key = 队列名 - 简单测试或直接队列通信
)

func (EXCHANGE_TYPE) String

func (e EXCHANGE_TYPE) String() string

type Message

type Message struct {
	Exchange   string
	RoutingKey string
	Mandatory  bool
	Immediate  bool
	Body       []byte
	Headers    amqp.Table
}

Message 消息结构

func NewFanoutMessage

func NewFanoutMessage(topic string, data []byte) *Message

func NewQueueMessage

func NewQueueMessage(queue string, data []byte) *Message

func NewTopicMessage

func NewTopicMessage(topic, routingKey string, data []byte) *Message

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher 增强版,支持自动重连

func NewPublisher

func NewPublisher() (*Publisher, error)

NewPublisher 创建支持自动重连的Publisher

func (*Publisher) Close

func (p *Publisher) Close() error

Close 关闭Publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, msg *Message) error

Publish 发布消息(支持自动重连)

type RabbitConn

type RabbitConn struct {
	// contains filtered or unexported fields
}

func GetConn

func GetConn() *RabbitConn

func NewRabbitConn

func NewRabbitConn(conf RabbitConnConfig) (*RabbitConn, error)

func (*RabbitConn) Close

func (rc *RabbitConn) Close() error

Close 关闭连接

func (*RabbitConn) GetConnection

func (rc *RabbitConn) GetConnection() *amqp.Connection

GetConnection 获取当前连接(线程安全)

func (*RabbitConn) IsClosed

func (rc *RabbitConn) IsClosed() bool

IsClosed 检查连接是否关闭

func (*RabbitConn) RegisterReconnectCallback

func (rc *RabbitConn) RegisterReconnectCallback(cb func(*amqp.Connection))

RegisterReconnectCallback 注册重连回调

type RabbitConnConfig

type RabbitConnConfig struct {
	// RabbitMQ连接URL
	URL string `toml:"url" json:"url" yaml:"url"  env:"URL"`
	// 连接超时时间
	Timeout time.Duration `toml:"timeout" json:"timeout" yaml:"timeout"  env:"TIMEOUT"`
	// 心跳间隔
	Heartbeat time.Duration `toml:"heart_beat" json:"heart_beat" yaml:"heart_beat"  env:"HEART_BEAT"`
	// 重连配置
	ReconnectInterval time.Duration `toml:"reconect_interval" json:"reconect_interval" yaml:"reconect_interval"  env:"RECONNECT_INTERVAL"`
	// 最大重连次数, 为0 时表示无限重连
	MaxReconnectAttempts int `toml:"max_reconnect_attempts" json:"max_reconnect_attempts" yaml:"max_reconnect_attempts"  env:"MAX_RECONNECT_ATTEMPTS"`
	// 开启链路追踪
	Trace bool `toml:"trace" json:"trace" yaml:"trace"  env:"TRACE"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL