Documentation
¶
Index ¶
- type MsgStorage
- func (storage *MsgStorage) Add(message *amqp.Message, queue string) error
- func (storage *MsgStorage) Close() error
- func (storage *MsgStorage) Del(message *amqp.Message, queue string) error
- func (storage *MsgStorage) GetQueueLength(queue string) uint64
- func (storage *MsgStorage) Iterate(fn func(queue string, message *amqp.Message))
- func (storage *MsgStorage) IterateByQueue(queue string, limit uint64, fn func(message *amqp.Message))
- func (storage *MsgStorage) IterateByQueueFromMsgID(queue string, msgID uint64, limit uint64, fn func(message *amqp.Message)) uint64
- func (storage *MsgStorage) PurgeQueue(queue string)
- func (storage *MsgStorage) ReceiveConfirms() chan *amqp.Message
- func (storage *MsgStorage) Update(message *amqp.Message, queue string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MsgStorage ¶
type MsgStorage struct {
// contains filtered or unexported fields
}
MsgStorage represents storage for store all durable messages All operations (add, update and delete) store into little queues and periodically persist every 20ms If storage in confirm-mode - in every persisted message storage send confirm to vhost
func NewMsgStorage ¶
func NewMsgStorage(db interfaces.DbStorage, protoVersion string) *MsgStorage
NewMsgStorage returns new instance of message storage
func (*MsgStorage) Add ¶
func (storage *MsgStorage) Add(message *amqp.Message, queue string) error
Add append message into add-queue
func (*MsgStorage) Close ¶
func (storage *MsgStorage) Close() error
Close properly "stop" message storage
func (*MsgStorage) Del ¶
func (storage *MsgStorage) Del(message *amqp.Message, queue string) error
Del append message into del-queue
func (*MsgStorage) GetQueueLength ¶
func (storage *MsgStorage) GetQueueLength(queue string) uint64
GetQueueLength returns queue length in message storage
func (*MsgStorage) Iterate ¶
func (storage *MsgStorage) Iterate(fn func(queue string, message *amqp.Message))
Iterate iterates over all messages
func (*MsgStorage) IterateByQueue ¶
func (storage *MsgStorage) IterateByQueue(queue string, limit uint64, fn func(message *amqp.Message))
IterateByQueue iterates over queue and call fn on each message
func (*MsgStorage) IterateByQueueFromMsgID ¶
func (storage *MsgStorage) IterateByQueueFromMsgID(queue string, msgID uint64, limit uint64, fn func(message *amqp.Message)) uint64
IterateByQueueFromMsgID iterates over queue from specific msgId and call fn on each message
func (*MsgStorage) PurgeQueue ¶
func (storage *MsgStorage) PurgeQueue(queue string)
PurgeQueue delete messages
func (*MsgStorage) ReceiveConfirms ¶
func (storage *MsgStorage) ReceiveConfirms() chan *amqp.Message
ReceiveConfirms set message storage in confirm mode and return channel for receive confirms