Documentation
¶
Index ¶
Constants ¶
View Source
const ( DefaultConsumerAwaitDuration = time.Second * 5 DefaultConsumerMaxMessageNum int32 = 16 DefaultConsumerInvisibleDuration = time.Second * 20 ConsumerMaxMessageNum int32 = 32 //rocketmq sdk限制最大只能是32 // DefaultConsumerStartTimeout 默认的 Consumer Start() 超时时间 DefaultConsumerStartTimeout = time.Second * 10 )
View Source
const ( // DefaultProducerStartTimeout 默认的 Producer Start() 超时时间 // 如果遇到连接问题,可以尝试增加到 30-60 秒 DefaultProducerStartTimeout = time.Second * 30 // DefaultDialTimeout 默认的 gRPC 连接超时时间 // 如果遇到连接问题,可以尝试增加到 20-30 秒 DefaultDialTimeout = time.Second * 20 )
View Source
const (
CodeMessageNotFound = v2.Code_MESSAGE_NOT_FOUND
)
Variables ¶
View Source
var AsErrRpcStatus = rmqClient.AsErrRpcStatus
View Source
var NewFilterExpression = rmqClient.NewFilterExpression
View Source
var NewFilterExpressionWithType = rmqClient.NewFilterExpressionWithType
View Source
var SubAll = rmqClient.SUB_ALL
Functions ¶
This section is empty.
Types ¶
type AsyncSendHandler ¶
type AsyncSendHandler = func(context.Context, []*SendReceipt, error)
type Consumer ¶
type Consumer interface {
Start() error
Receive() ([]*MessageView, error)
ReceiveWithContext(ctx context.Context) ([]*MessageView, error)
GetSimpleConsumer() SimpleConsumer
Ack(ctx context.Context, messageView *MessageView) error
Close() error
}
func NewSimpleConsumer ¶
func NewSimpleConsumer(conf *ConsumerConfig) (Consumer, error)
NewSimpleConsumer 创建新的简单消费者 conf: 消费者配置
- StartTimeout: Start() 方法的超时时间,如果为 0 则使用默认值 10 秒
返回: Consumer 实例和错误 使用示例:
// 使用默认配置(超时 10 秒)
consumer, err := NewSimpleConsumer(&ConsumerConfig{
Group: "your-group",
Endpoint: "your-endpoint",
AccessKey: "your-key",
SecretKey: "your-secret",
Topic: "your-topic",
})
// 自定义超时时间
consumer, err := NewSimpleConsumer(&ConsumerConfig{
Group: "your-group",
Endpoint: "your-endpoint",
AccessKey: "your-key",
SecretKey: "your-secret",
Topic: "your-topic",
StartTimeout: 30 * time.Second, // 自定义启动超时为 30 秒
})
type ConsumerConfig ¶
type ConsumerConfig struct {
Topic string
Group string
Endpoint string
AccessKey string
SecretKey string
SubscriptionExpressions map[string]*FilterExpression
AwaitDuration time.Duration // maximum waiting time for receive func
MaxMessageNum int32 // maximum number of messages received at one time
InvisibleDuration time.Duration // invisibleDuration should > 20s
// StartTimeout Start() 方法的超时时间
// 如果为 0,则使用默认值 DefaultConsumerStartTimeout (10秒)
// 注意:这是应用层的超时控制,用于防止 Start() 方法无限阻塞
// 官方 SDK 的 Start() 方法本身没有内置超时机制
// 建议设置为 10-30 秒,避免启动时长时间阻塞
StartTimeout time.Duration
}
type ErrRpcStatus ¶ added in v0.0.4
type ErrRpcStatus = rmqClient.ErrRpcStatus
type FilterExpression ¶ added in v0.0.3
type FilterExpression = rmqClient.FilterExpression
type MessageView ¶
type MessageView = rmqClient.MessageView
type MessageViewFunc ¶
type MessageViewFunc func(*MessageView) bool
type Producer ¶
type Producer interface {
Start() error
Close() error
GetProducer() RProducer
// SendNormalMessage 同步发送普通消息(阻塞方法)
// 注意:此方法会阻塞等待服务器响应,建议使用 context 设置超时时间
// 如果需要非阻塞发送,请使用 AsyncSendNormalMessage
SendNormalMessage(context.Context, *Message) ([]*SendReceipt, error)
// AsyncSendNormalMessage 异步发送普通消息(非阻塞方法)
// handler: 发送完成后的回调函数
AsyncSendNormalMessage(context.Context, *Message, AsyncSendHandler)
// SendFifoMessage 同步发送顺序消息(阻塞方法)
// 注意:此方法会阻塞等待服务器响应,建议使用 context 设置超时时间
SendFifoMessage(context.Context, *Message) ([]*SendReceipt, error)
// SendDelayMessage 同步发送延时消息(阻塞方法)
// 注意:此方法会阻塞等待服务器响应,建议使用 context 设置超时时间
SendDelayMessage(context.Context, *Message, time.Time) ([]*SendReceipt, error)
}
func NewProducer ¶
func NewProducer(conf *ProducerConfig) (Producer, error)
NewProducer 创建新的生产者 conf: 生产者配置
- StartTimeout: Start() 方法的超时时间,如果为 0 则使用默认值 30 秒
- DialTimeout: gRPC 连接建立的超时时间,如果为 0 则使用默认值 20 秒
返回: Producer 实例和错误 使用示例:
// 使用默认配置(超时 30/20 秒)
producer, err := NewProducer(&ProducerConfig{
Endpoint: "your-endpoint",
AccessKey: "your-key",
SecretKey: "your-secret",
})
// 自定义超时时间
producer, err := NewProducer(&ProducerConfig{
Endpoint: "your-endpoint",
AccessKey: "your-key",
SecretKey: "your-secret",
StartTimeout: 30 * time.Second, // 自定义启动超时为 30 秒
DialTimeout: 15 * time.Second, // 自定义连接超时为 15 秒
})
type ProducerConfig ¶
type ProducerConfig struct {
//Topic string
Endpoint string
AccessKey string
SecretKey string
// StartTimeout Start() 方法的超时时间
// 如果为 0,则使用默认值 DefaultProducerStartTimeout (30秒)
// 注意:这是应用层的超时控制,用于防止 Start() 方法无限阻塞
// 官方 SDK 的 Start() 方法本身没有内置超时机制
// 如果遇到连接问题,建议设置为 30-60 秒
StartTimeout time.Duration
// DialTimeout gRPC 连接建立的超时时间
// 如果为 0,则使用默认值 DefaultDialTimeout (20秒)
// 注意:这是 SDK 层面的连接超时,用于控制 gRPC Dial 操作的超时
// 如果遇到连接问题,建议设置为 20-30 秒,应该小于或等于 StartTimeout
DialTimeout time.Duration
// MaxAttempts 消息发送的最大重试次数(默认 SDK 为 3)
// 注意:此参数只用于消息发送重试,不用于 Start() 连接过程
// Start() 的连接过程没有最大尝试次数限制,会无限重试
// 因此仍需要 StartTimeout 来控制 Start() 的超时
MaxAttempts int32
}
type SendReceipt ¶
type SendReceipt = rmqClient.SendReceipt
type SimpleConsumer ¶
type SimpleConsumer = rmqClient.SimpleConsumer
Click to show internal directories.
Click to hide internal directories.