memory

package
v0.3.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer 内存消费者

func NewMemoryConsumer

func NewMemoryConsumer(cfg config.MemoryConfig, recorder *observability.MetricsRecorder, keyPrefix string) *Consumer

NewMemoryConsumer 创建内存消费者

func (*Consumer) Close

func (c *Consumer) Close() error

Close 关闭消费者

func (*Consumer) SetProducer

func (c *Consumer) SetProducer(producer *Producer)

SetProducer 设置生产者引用(用于访问队列)

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(ctx context.Context, topic string, handler message.Handler) error

Subscribe 订阅消息

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe(topic string) error

Unsubscribe 取消订阅

type DelayHeap

type DelayHeap []*DelayMessage

DelayHeap 延时消息堆

func (DelayHeap) Len

func (h DelayHeap) Len() int

func (DelayHeap) Less

func (h DelayHeap) Less(i, j int) bool

func (*DelayHeap) Pop

func (h *DelayHeap) Pop() interface{}

func (*DelayHeap) Push

func (h *DelayHeap) Push(x interface{})

func (DelayHeap) Swap

func (h DelayHeap) Swap(i, j int)

type DelayMessage

type DelayMessage struct {
	Message   *message.Message
	ExecuteAt time.Time
	Index     int // heap中的索引
}

DelayMessage 延时消息

type DelayQueue

type DelayQueue struct {
	// contains filtered or unexported fields
}

DelayQueue 延时队列

func NewMemoryDelayQueue

func NewMemoryDelayQueue(cfg config.MemoryConfig, recorder *observability.MetricsRecorder, keyPrefix string) *DelayQueue

NewMemoryDelayQueue 创建内存延时队列

func (*DelayQueue) Pop

func (dq *DelayQueue) Pop(ctx context.Context) (*message.Message, error)

Pop 弹出到期消息

func (*DelayQueue) Push

func (dq *DelayQueue) Push(ctx context.Context, msg *message.Message, delay time.Duration) error

Push 推送延时消息

func (*DelayQueue) Remove

func (dq *DelayQueue) Remove(ctx context.Context, msgID string) error

Remove 移除消息

func (*DelayQueue) Size

func (dq *DelayQueue) Size(ctx context.Context) (int64, error)

Size 获取队列大小

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

Memory 内存消息队列实现

func NewMemoryMQ

func NewMemoryMQ(cfg config.MemoryConfig, observer observability.Observer, keyPrefix string) (*Memory, error)

NewMemoryMQ 创建内存消息队列

func (*Memory) Close

func (m *Memory) Close() error

Close 关闭消息队列

func (*Memory) Consumer

func (m *Memory) Consumer() contract.Consumer

Consumer 返回消费者

func (*Memory) DelayQueue

func (m *Memory) DelayQueue() contract.DelayQueue

DelayQueue 返回延时队列

func (*Memory) HealthCheck

func (m *Memory) HealthCheck() error

HealthCheck 健康检查

func (*Memory) Producer

func (m *Memory) Producer() contract.Producer

Producer 返回生产者

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer 内存生产者

func NewMemoryProducer

func NewMemoryProducer(cfg config.MemoryConfig, recorder *observability.MetricsRecorder, keyPrefix string) *Producer

NewMemoryProducer 创建内存生产者

func (*Producer) Close

func (p *Producer) Close() error

Close 关闭生产者

func (*Producer) GetQueue

func (p *Producer) GetQueue(topic string) *Queue

GetQueue 获取指定topic的队列(用于消费者)

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, msg *message.Message) error

Send 发送消息

func (*Producer) SendBatch

func (p *Producer) SendBatch(ctx context.Context, msgs []*message.Message) error

SendBatch 批量发送消息

func (*Producer) SendDelay

func (p *Producer) SendDelay(ctx context.Context, msg *message.Message, delay time.Duration) error

SendDelay 发送延时消息

func (*Producer) SetDelayQueue

func (p *Producer) SetDelayQueue(delayQueue *DelayQueue)

SetDelayQueue 设置延时队列引用

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue 内存队列

func NewQueue

func NewQueue(maxSize int) *Queue

NewQueue 创建新队列

func (*Queue) IsEmpty

func (q *Queue) IsEmpty() bool

IsEmpty 检查队列是否为空

func (*Queue) Pop

func (q *Queue) Pop() *message.Message

Pop 从队列弹出消息

func (*Queue) Push

func (q *Queue) Push(msg *message.Message) error

Push 推送消息到队列

func (*Queue) Size

func (q *Queue) Size() int

Size 获取队列大小

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL