Documentation
¶
Index ¶
- type Client
- type ClientFactory
- type Consumer
- type DelayProcessor
- type DelayQueue
- func (dq *DelayQueue) Pop(ctx context.Context) (*message.Message, error)
- func (dq *DelayQueue) Push(ctx context.Context, msg *message.Message, delay time.Duration) error
- func (dq *DelayQueue) Remove(ctx context.Context, msgID string) error
- func (dq *DelayQueue) Size(ctx context.Context) (int64, error)
- type KeyGenerator
- type Producer
- type Redis
- type Task
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClientFactory ¶
type ClientFactory struct {
// contains filtered or unexported fields
}
ClientFactory Redis客户端工厂
func NewClientFactory ¶
func NewClientFactory(config config.RedisConfig) *ClientFactory
NewClientFactory 创建客户端工厂
func (*ClientFactory) CreateClient ¶
func (f *ClientFactory) CreateClient() (Client, error)
CreateClient 创建Redis客户端
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer Redis消费者
func NewRedisConsumer ¶
func NewRedisConsumer( client redis.Cmdable, observer observability.Observer, config config.RedisConfig, recorder *observability.MetricsRecorder, ser serializer.Serializer, keys *KeyGenerator, ) *Consumer
NewRedisConsumer 创建Redis消费者
func (*Consumer) GetWorkerPoolStats ¶
GetWorkerPoolStats 获取工作池统计信息
type DelayProcessor ¶
type DelayProcessor struct {
// contains filtered or unexported fields
}
DelayProcessor 延时消息处理器
func NewDelayProcessor ¶
func NewDelayProcessor( client Client, observer observability.Observer, recorder *observability.MetricsRecorder, serializer serializer.Serializer, keys *KeyGenerator, ) *DelayProcessor
NewDelayProcessor 创建延时消息处理器
type DelayQueue ¶
type DelayQueue struct {
// contains filtered or unexported fields
}
DelayQueue Redis延时队列实现
func NewRedisDelayQueue ¶
func NewRedisDelayQueue( client redis.Cmdable, observer observability.Observer, recorder *observability.MetricsRecorder, serializer serializer.Serializer, keys *KeyGenerator, ) *DelayQueue
NewRedisDelayQueue 创建Redis延时队列
type KeyGenerator ¶
type KeyGenerator struct {
// contains filtered or unexported fields
}
KeyGenerator Redis键名生成器
func (*KeyGenerator) DelayMessageKey ¶
func (kg *KeyGenerator) DelayMessageKey(msgID string) string
DelayMessageKey 生成延时消息键名
func (*KeyGenerator) DelayQueueKey ¶
func (kg *KeyGenerator) DelayQueueKey() string
DelayQueueKey 生成延时队列键名
func (*KeyGenerator) QueueKey ¶
func (kg *KeyGenerator) QueueKey(topic string) string
QueueKey 生成队列键名
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer Redis生产者
func NewRedisProducer ¶
func NewRedisProducer( client redis.Cmdable, observer observability.Observer, config config.RedisConfig, recorder *observability.MetricsRecorder, ser serializer.Serializer, keys *KeyGenerator, ) *Producer
type Redis ¶
type Redis struct {
// contains filtered or unexported fields
}
Redis Redis消息队列实现
func NewRedisMQ ¶
func NewRedisMQ(cfg config.RedisConfig, observer observability.Observer, keyPrefix string) (*Redis, error)
NewRedisMQ 创建Redis消息队列
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool 工作池
func NewWorkerPool ¶
func NewWorkerPool(workerCount, bufferSize int, logger *zap.Logger, metrics *observability.MetricsRecorder) *WorkerPool
NewWorkerPool 创建工作池
Click to show internal directories.
Click to hide internal directories.