queue_reliable

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2022 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type RedisQueueStatus

type RedisQueueStatus struct {
	Key         string // 标识消费者的唯一Key
	MachineName string // 机器名
	UserName    string // 用户名
	ProcessId   int    // 进程Id
	Ip          string // Ip地址
	CreateTime  int64  // 开始时间
	LastActive  int64  // 最后活跃时间
	Consumes    int64  // 消费消息数
	Acks        int64  // 确认消息数
}

func CreateStatus

func CreateStatus() RedisQueueStatus

type RedisReliableQueue

type RedisReliableQueue struct {
	DB                          int
	ThrowOnFailure              bool   // 失败时抛出异常。默认false
	RetryTimesWhenSendFailed    int    //发送消息失败时的重试次数。默认3次
	RetryIntervalWhenSendFailed int    // 重试间隔。默认1000ms
	AckKey                      string // 用于确认的列表
	RetryInterval               int64  // 重新处理确认队列中死信的间隔。默认60s
	MinPipeline                 int64  // 最小管道阈值,达到该值时使用管道,默认3

	IsEmpty bool             // 是否为空
	Status  RedisQueueStatus // 消费状态
	// contains filtered or unexported fields
}

func NewRedisReliable

func NewRedisReliable(client *redis.Client, key string, logger glog.ILogger) *RedisReliableQueue

func (*RedisReliableQueue) Add

func (r *RedisReliableQueue) Add(values ...interface{}) int64

Add 批量生产添加

func (*RedisReliableQueue) RetryAck

func (r *RedisReliableQueue) RetryAck()

RetryAck 消费获取,从Key弹出并备份到AckKey,支持阻塞 假定前面获取的消息已经确认,因该方法内部可能回滚确认队列,避免误杀 超时时间,默认0秒永远阻塞;负数表示直接返回,不阻塞。

func (*RedisReliableQueue) RollbackAck

func (r *RedisReliableQueue) RollbackAck(key, ackKey string) []string

RollbackAck 回滚指定AckKey内的消息到Key

func (*RedisReliableQueue) RollbackAllAck

func (r *RedisReliableQueue) RollbackAllAck() int64

RollbackAllAck 全局回滚死信,一般由单一线程执行,避免干扰处理中数据

func (*RedisReliableQueue) Take

func (r *RedisReliableQueue) Take(count ...int64)

func (*RedisReliableQueue) TakeOne

func (r *RedisReliableQueue) TakeOne(timeout ...int64) string

func (*RedisReliableQueue) UpdateStatus

func (r *RedisReliableQueue) UpdateStatus()

UpdateStatus 更新状态

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL