rmq

package
v0.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultConsumerAwaitDuration           = time.Second * 5
	DefaultConsumerMaxMessageNum     int32 = 16
	DefaultConsumerInvisibleDuration       = time.Second * 20
	ConsumerMaxMessageNum            int32 = 32 //rocketmq sdk限制最大只能是32
	// DefaultConsumerStartTimeout 默认的 Consumer Start() 超时时间
	DefaultConsumerStartTimeout = time.Second * 10
)
View Source
const (
	// DefaultProducerStartTimeout 默认的 Producer Start() 超时时间
	// 如果遇到连接问题,可以尝试增加到 30-60 秒
	DefaultProducerStartTimeout = time.Second * 30
	// DefaultDialTimeout 默认的 gRPC 连接超时时间
	// 如果遇到连接问题,可以尝试增加到 20-30 秒
	DefaultDialTimeout = time.Second * 20
)
View Source
const (
	CodeMessageNotFound = v2.Code_MESSAGE_NOT_FOUND
)

Variables

View Source
var AsErrRpcStatus = rmqClient.AsErrRpcStatus
View Source
var NewFilterExpression = rmqClient.NewFilterExpression
View Source
var NewFilterExpressionWithType = rmqClient.NewFilterExpressionWithType

Functions

This section is empty.

Types

type AsyncSendHandler

type AsyncSendHandler = func(context.Context, []*SendReceipt, error)

type Consumer

type Consumer interface {
	Start() error
	Receive() ([]*MessageView, error)
	ReceiveWithContext(ctx context.Context) ([]*MessageView, error)
	GetSimpleConsumer() SimpleConsumer
	Ack(ctx context.Context, messageView *MessageView) error
	Close() error
}

func NewSimpleConsumer

func NewSimpleConsumer(conf *ConsumerConfig) (Consumer, error)

NewSimpleConsumer 创建新的简单消费者 conf: 消费者配置

  • StartTimeout: Start() 方法的超时时间,如果为 0 则使用默认值 10 秒

返回: Consumer 实例和错误 使用示例:

// 使用默认配置(超时 10 秒)
consumer, err := NewSimpleConsumer(&ConsumerConfig{
    Group:     "your-group",
    Endpoint:  "your-endpoint",
    AccessKey: "your-key",
    SecretKey: "your-secret",
    Topic:     "your-topic",
})

// 自定义超时时间
consumer, err := NewSimpleConsumer(&ConsumerConfig{
    Group:        "your-group",
    Endpoint:     "your-endpoint",
    AccessKey:    "your-key",
    SecretKey:    "your-secret",
    Topic:        "your-topic",
    StartTimeout: 30 * time.Second, // 自定义启动超时为 30 秒
})

type ConsumerConfig

type ConsumerConfig struct {
	Topic                   string
	Group                   string
	Endpoint                string
	AccessKey               string
	SecretKey               string
	SubscriptionExpressions map[string]*FilterExpression
	AwaitDuration           time.Duration // maximum waiting time for receive func
	MaxMessageNum           int32         // maximum number of messages received at one time
	InvisibleDuration       time.Duration // invisibleDuration should > 20s
	// StartTimeout Start() 方法的超时时间
	// 如果为 0,则使用默认值 DefaultConsumerStartTimeout (10秒)
	// 注意:这是应用层的超时控制,用于防止 Start() 方法无限阻塞
	// 官方 SDK 的 Start() 方法本身没有内置超时机制
	// 建议设置为 10-30 秒,避免启动时长时间阻塞
	StartTimeout time.Duration
}

type ErrRpcStatus added in v0.0.4

type ErrRpcStatus = rmqClient.ErrRpcStatus

type FilterExpression added in v0.0.3

type FilterExpression = rmqClient.FilterExpression

type Message

type Message = rmqClient.Message

type MessageView

type MessageView = rmqClient.MessageView

type MessageViewFunc

type MessageViewFunc func(*MessageView) bool

type Producer

type Producer interface {
	Start() error
	Close() error
	GetProducer() RProducer
	// SendNormalMessage 同步发送普通消息(阻塞方法)
	// 注意:此方法会阻塞等待服务器响应,建议使用 context 设置超时时间
	// 如果需要非阻塞发送,请使用 AsyncSendNormalMessage
	SendNormalMessage(context.Context, *Message) ([]*SendReceipt, error)
	// AsyncSendNormalMessage 异步发送普通消息(非阻塞方法)
	// handler: 发送完成后的回调函数
	AsyncSendNormalMessage(context.Context, *Message, AsyncSendHandler)
	// SendFifoMessage 同步发送顺序消息(阻塞方法)
	// 注意:此方法会阻塞等待服务器响应,建议使用 context 设置超时时间
	SendFifoMessage(context.Context, *Message) ([]*SendReceipt, error)
	// SendDelayMessage 同步发送延时消息(阻塞方法)
	// 注意:此方法会阻塞等待服务器响应,建议使用 context 设置超时时间
	SendDelayMessage(context.Context, *Message, time.Time) ([]*SendReceipt, error)
}

func NewProducer

func NewProducer(conf *ProducerConfig) (Producer, error)

NewProducer 创建新的生产者 conf: 生产者配置

  • StartTimeout: Start() 方法的超时时间,如果为 0 则使用默认值 30 秒
  • DialTimeout: gRPC 连接建立的超时时间,如果为 0 则使用默认值 20 秒

返回: Producer 实例和错误 使用示例:

// 使用默认配置(超时 30/20 秒)
producer, err := NewProducer(&ProducerConfig{
    Endpoint:  "your-endpoint",
    AccessKey: "your-key",
    SecretKey: "your-secret",
})

// 自定义超时时间
producer, err := NewProducer(&ProducerConfig{
    Endpoint:     "your-endpoint",
    AccessKey:    "your-key",
    SecretKey:    "your-secret",
    StartTimeout: 30 * time.Second, // 自定义启动超时为 30 秒
    DialTimeout:  15 * time.Second, // 自定义连接超时为 15 秒
})

type ProducerConfig

type ProducerConfig struct {
	//Topic     string
	Endpoint  string
	AccessKey string
	SecretKey string
	// StartTimeout Start() 方法的超时时间
	// 如果为 0,则使用默认值 DefaultProducerStartTimeout (30秒)
	// 注意:这是应用层的超时控制,用于防止 Start() 方法无限阻塞
	// 官方 SDK 的 Start() 方法本身没有内置超时机制
	// 如果遇到连接问题,建议设置为 30-60 秒
	StartTimeout time.Duration
	// DialTimeout gRPC 连接建立的超时时间
	// 如果为 0,则使用默认值 DefaultDialTimeout (20秒)
	// 注意:这是 SDK 层面的连接超时,用于控制 gRPC Dial 操作的超时
	// 如果遇到连接问题,建议设置为 20-30 秒,应该小于或等于 StartTimeout
	DialTimeout time.Duration
	// MaxAttempts 消息发送的最大重试次数(默认 SDK 为 3)
	// 注意:此参数只用于消息发送重试,不用于 Start() 连接过程
	// Start() 的连接过程没有最大尝试次数限制,会无限重试
	// 因此仍需要 StartTimeout 来控制 Start() 的超时
	MaxAttempts int32
}

type RProducer

type RProducer = rmqClient.Producer

type SendReceipt

type SendReceipt = rmqClient.SendReceipt

type SimpleConsumer

type SimpleConsumer = rmqClient.SimpleConsumer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL