Documentation
¶
Index ¶
- Constants
- type CallBackHandler
- type Client
- type ConsumeOption
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) DirectSubscribe(ctx context.Context, exchange, queue string, cb CallBackHandler, ...) error
- func (c *Consumer) FanoutSubscribe(ctx context.Context, subject string, cb CallBackHandler, opts ...ConsumeOption) error
- func (c *Consumer) TopicSubscribe(ctx context.Context, exchange, routingKey string, cb CallBackHandler, ...) error
- type EXCHANGE_TYPE
- type Message
- type Publisher
- type RabbitConn
- type RabbitConnConfig
Constants ¶
View Source
const (
APP_NAME = "rabbitmq"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CallBackHandler ¶
CallBackHandler 消息处理回调
type Client ¶
type Client struct { ioc.ObjectImpl // 连接配置 RabbitConnConfig // contains filtered or unexported fields }
type ConsumeOption ¶
type ConsumeOption func(*consumerConfig)
ConsumeOption 消费者配置选项
func WithArgs ¶
func WithArgs(args amqp.Table) ConsumeOption
func WithAutoAck ¶
func WithAutoAck(autoAck bool) ConsumeOption
func WithExchangeType ¶
func WithExchangeType(exchangeType EXCHANGE_TYPE) ConsumeOption
func WithExclusive ¶
func WithExclusive(exclusive bool) ConsumeOption
func WithRoutingKey ¶
func WithRoutingKey(routingKey string) ConsumeOption
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer RabbitMQ消费者
func (*Consumer) DirectSubscribe ¶
func (c *Consumer) DirectSubscribe(ctx context.Context, exchange, queue string, cb CallBackHandler, opts ...ConsumeOption) error
DirectSubscribe 队列模式 (Competing Consumers) 简化使用, 直接使用默认exchange
func (*Consumer) FanoutSubscribe ¶
func (c *Consumer) FanoutSubscribe(ctx context.Context, subject string, cb CallBackHandler, opts ...ConsumeOption) error
FanoutSubscribe Fanout订阅模式 (Pub/Sub)
func (*Consumer) TopicSubscribe ¶
func (c *Consumer) TopicSubscribe(ctx context.Context, exchange, routingKey string, cb CallBackHandler, opts ...ConsumeOption) error
TopicSubscribe Topic订阅模式 (Pub/Sub with wildcard routing) exchange: 交换机名称 routingKey: 绑定队列的路由键(支持 * 和 # 通配符)
通配符规则: * 匹配一个单词(如 order.* 可匹配 order.paid,但不能匹配 order.paid.failed)。 # 匹配零或多个单词(如 order.# 可匹配 order.paid 和 order.paid.failed)。
cb: 消息处理回调函数 opts: 可选配置项(如自动确认、独占队列等)
type EXCHANGE_TYPE ¶
type EXCHANGE_TYPE string
const ( EXCHANGE_TYPE_DIRECT EXCHANGE_TYPE = "direct" // Routing Key 完全匹配 Binding Key - 精确路由(如任务分发) EXCHANGE_TYPE_FANOUT EXCHANGE_TYPE = "fanout" // 忽略 Routing Key,广播到所有队列 - 发布/订阅(如事件通知) EXCHANGE_TYPE_TOPIC EXCHANGE_TYPE = "topic" // Routing Key 通配符匹配 Binding Key - 动态路由(如分类日志) EXCHANGE_TYPE_HEADERS EXCHANGE_TYPE = "headers" // 匹配消息的 Headers 属性 - 复杂条件过滤(如协议适配) EXCHANGE_TYPE_DEFAULT EXCHANGE_TYPE = "" // 默认交换机: Routing Key = 队列名 - 简单测试或直接队列通信 )
func (EXCHANGE_TYPE) String ¶
func (e EXCHANGE_TYPE) String() string
type Message ¶
type Message struct { Exchange string RoutingKey string Mandatory bool Immediate bool Body []byte Headers amqp.Table }
Message 消息结构
func NewFanoutMessage ¶
func NewQueueMessage ¶
func NewTopicMessage ¶
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher 增强版,支持自动重连
type RabbitConn ¶
type RabbitConn struct {
// contains filtered or unexported fields
}
func GetConn ¶
func GetConn() *RabbitConn
func NewRabbitConn ¶
func NewRabbitConn(conf RabbitConnConfig) (*RabbitConn, error)
func (*RabbitConn) GetConnection ¶
func (rc *RabbitConn) GetConnection() *amqp.Connection
GetConnection 获取当前连接(线程安全)
func (*RabbitConn) RegisterReconnectCallback ¶
func (rc *RabbitConn) RegisterReconnectCallback(cb func(*amqp.Connection))
RegisterReconnectCallback 注册重连回调
type RabbitConnConfig ¶
type RabbitConnConfig struct { // RabbitMQ连接URL URL string `toml:"url" json:"url" yaml:"url" env:"URL"` // 连接超时时间 Timeout time.Duration `toml:"timeout" json:"timeout" yaml:"timeout" env:"TIMEOUT"` // 心跳间隔 Heartbeat time.Duration `toml:"heart_beat" json:"heart_beat" yaml:"heart_beat" env:"HEART_BEAT"` // 重连配置 ReconnectInterval time.Duration `toml:"reconect_interval" json:"reconect_interval" yaml:"reconect_interval" env:"RECONNECT_INTERVAL"` // 最大重连次数, 为0 时表示无限重连 MaxReconnectAttempts int `toml:"max_reconnect_attempts" json:"max_reconnect_attempts" yaml:"max_reconnect_attempts" env:"MAX_RECONNECT_ATTEMPTS"` // 开启链路追踪 Trace bool `toml:"trace" json:"trace" yaml:"trace" env:"TRACE"` }
Click to show internal directories.
Click to hide internal directories.