redis

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2025 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	redis.Cmdable
	Close() error
	Ping(ctx context.Context) *redis.StatusCmd
}

Client Redis客户端接口

type ClientFactory

type ClientFactory struct {
	// contains filtered or unexported fields
}

ClientFactory Redis客户端工厂

func NewClientFactory

func NewClientFactory(config config.RedisConfig) *ClientFactory

NewClientFactory 创建客户端工厂

func (*ClientFactory) CreateClient

func (f *ClientFactory) CreateClient() (Client, error)

CreateClient 创建Redis客户端

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer Redis消费者

func NewRedisConsumer

func NewRedisConsumer(
	client redis.Cmdable,
	observer observability.Observer,
	config config.RedisConfig,
	recorder *observability.MetricsRecorder,
	ser serializer.Serializer,
	keys *KeyGenerator,
) *Consumer

NewRedisConsumer 创建Redis消费者

func (*Consumer) Close

func (c *Consumer) Close() error

Close 关闭消费者

func (*Consumer) GetWorkerPoolStats

func (c *Consumer) GetWorkerPoolStats() (queueSize, queueCapacity int)

GetWorkerPoolStats 获取工作池统计信息

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(ctx context.Context, topic string, handler message.Handler) error

Subscribe 订阅消息

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe(topic string) error

Unsubscribe 取消订阅

type DelayProcessor

type DelayProcessor struct {
	// contains filtered or unexported fields
}

DelayProcessor 延时消息处理器

func NewDelayProcessor

func NewDelayProcessor(
	client Client,
	observer observability.Observer,
	recorder *observability.MetricsRecorder,
	serializer serializer.Serializer,
	keys *KeyGenerator,
) *DelayProcessor

NewDelayProcessor 创建延时消息处理器

func (*DelayProcessor) Start

func (dp *DelayProcessor) Start(ctx context.Context)

Start 启动延时消息处理

type DelayQueue

type DelayQueue struct {
	// contains filtered or unexported fields
}

DelayQueue Redis延时队列实现

func NewRedisDelayQueue

func NewRedisDelayQueue(
	client redis.Cmdable,
	observer observability.Observer,
	recorder *observability.MetricsRecorder,
	serializer serializer.Serializer,
	keys *KeyGenerator,
) *DelayQueue

NewRedisDelayQueue 创建Redis延时队列

func (*DelayQueue) Pop

func (dq *DelayQueue) Pop(ctx context.Context) (*message.Message, error)

Pop 弹出到期消息

func (*DelayQueue) Push

func (dq *DelayQueue) Push(ctx context.Context, msg *message.Message, delay time.Duration) error

Push 推送延时消息

func (*DelayQueue) Remove

func (dq *DelayQueue) Remove(ctx context.Context, msgID string) error

Remove 移除消息

func (*DelayQueue) Size

func (dq *DelayQueue) Size(ctx context.Context) (int64, error)

Size 获取队列大小

type KeyGenerator

type KeyGenerator struct {
	// contains filtered or unexported fields
}

KeyGenerator Redis键名生成器

func NewKeyGenerator

func NewKeyGenerator(keyPrefix string) *KeyGenerator

NewKeyGenerator 创建键名生成器

func (*KeyGenerator) DelayMessageKey

func (kg *KeyGenerator) DelayMessageKey(msgID string) string

DelayMessageKey 生成延时消息键名

func (*KeyGenerator) DelayQueueKey

func (kg *KeyGenerator) DelayQueueKey() string

DelayQueueKey 生成延时队列键名

func (*KeyGenerator) QueueKey

func (kg *KeyGenerator) QueueKey(topic string) string

QueueKey 生成队列键名

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer Redis生产者

func NewRedisProducer

func NewRedisProducer(
	client redis.Cmdable,
	observer observability.Observer,
	config config.RedisConfig,
	recorder *observability.MetricsRecorder,
	ser serializer.Serializer,
	keys *KeyGenerator,
) *Producer

func (*Producer) Close

func (p *Producer) Close() error

Close 关闭生产者

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, msg *message.Message) error

func (*Producer) SendBatch

func (p *Producer) SendBatch(ctx context.Context, messages []*message.Message) error

SendBatch 批量发送消息

func (*Producer) SendDelay

func (p *Producer) SendDelay(ctx context.Context, msg *message.Message, delay time.Duration) error

SendDelay 发送延时消息

type Redis

type Redis struct {
	// contains filtered or unexported fields
}

Redis Redis消息队列实现

func NewRedisMQ

func NewRedisMQ(cfg config.RedisConfig, observer observability.Observer, keyPrefix string) (*Redis, error)

NewRedisMQ 创建Redis消息队列

func (*Redis) Close

func (r *Redis) Close() error

Close 关闭连接

func (*Redis) Consumer

func (r *Redis) Consumer() contract.Consumer

Consumer 获取消费者

func (*Redis) DelayQueue

func (r *Redis) DelayQueue() contract.DelayQueue

DelayQueue 获取延时队列

func (*Redis) HealthCheck

func (r *Redis) HealthCheck() error

HealthCheck 健康检查

func (*Redis) Producer

func (r *Redis) Producer() contract.Producer

Producer 获取生产者

type Task

type Task struct {
	Message *message.Message
	Handler message.Handler
	Topic   string
}

Task 任务

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool 工作池

func NewWorkerPool

func NewWorkerPool(workerCount, bufferSize int, logger *zap.Logger, metrics *observability.MetricsRecorder) *WorkerPool

NewWorkerPool 创建工作池

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start 启动工作池

func (*WorkerPool) Stats

func (wp *WorkerPool) Stats() (int, int)

Stats 获取统计信息

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop 停止工作池

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(task *Task) bool

Submit 提交任务

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL