queue

package
v0.3.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2025 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrEmptyQueue = fmt.Errorf("queue is empty")
View Source
var ErrMessageNotFound = fmt.Errorf("message not found")
View Source
var ErrQueueNotFound = fmt.Errorf("queue not found")

Functions

This section is empty.

Types

type Message

type Message struct {
	Group    string
	ID       uint64
	Priority int64
	Content  string
	Metadata map[string]string

	QueueName string `json:"QueueName,omitempty"`
	QueueType string `json:"QueueType,omitempty"`
}

func MessageFromBytes

func MessageFromBytes(data []byte) (*Message, error)

func (*Message) ToBytes

func (m *Message) ToBytes() ([]byte, error)

func (*Message) UpdatePriority

func (m *Message) UpdatePriority(newPriority int64)

type PriorityQueue

type PriorityQueue struct {
	PrometheusMetrics *metrics.PrometheusMetrics
	// contains filtered or unexported fields
}

func NewPriorityQueue

func NewPriorityQueue(
	db *badger.DB, cfg *config.Config, promMetrics *metrics.PrometheusMetrics,
) *PriorityQueue

func (*PriorityQueue) Ack added in v0.3.6

func (bpq *PriorityQueue) Ack(id uint64) error

func (*PriorityQueue) Create added in v0.3.6

func (bpq *PriorityQueue) Create(queueType, queueName string) error

func (*PriorityQueue) Delete added in v0.3.0

func (bpq *PriorityQueue) Delete(id uint64) error

func (*PriorityQueue) DeleteQueue added in v0.3.6

func (bpq *PriorityQueue) DeleteQueue() error

func (*PriorityQueue) Dequeue added in v0.3.6

func (bpq *PriorityQueue) Dequeue(ack bool) (*Message, error)

func (*PriorityQueue) Enqueue added in v0.3.6

func (bpq *PriorityQueue) Enqueue(
	id uint64,
	group string,
	priority int64,
	content string,
	metadata map[string]string,
) (*Message, error)

func (*PriorityQueue) Get added in v0.3.0

func (bpq *PriorityQueue) Get(id uint64) (*Message, error)

func (*PriorityQueue) GetMessagesKey added in v0.3.6

func (bpq *PriorityQueue) GetMessagesKey(id uint64) []byte

func (*PriorityQueue) GetQueueKey added in v0.3.6

func (bpq *PriorityQueue) GetQueueKey(queueName string) []byte

func (*PriorityQueue) GetStats added in v0.3.6

func (bpq *PriorityQueue) GetStats() *QueueInfo

func (*PriorityQueue) Init added in v0.3.6

func (bpq *PriorityQueue) Init(queueType, queueName string) error

func (*PriorityQueue) Len

func (bpq *PriorityQueue) Len() int

func (*PriorityQueue) Load added in v0.3.6

func (bpq *PriorityQueue) Load(queueName string, loadMessages bool) error

func (*PriorityQueue) Nack added in v0.3.6

func (bpq *PriorityQueue) Nack(id uint64, priority int64, metadata map[string]string) error

func (*PriorityQueue) PersistSnapshot added in v0.3.6

func (bpq *PriorityQueue) PersistSnapshot(sink raft.SnapshotSink) error

func (*PriorityQueue) StartAckQueueMonitoring added in v0.3.6

func (bpq *PriorityQueue) StartAckQueueMonitoring()

func (*PriorityQueue) StopAckQueueMonitoring added in v0.3.6

func (bpq *PriorityQueue) StopAckQueueMonitoring()

func (*PriorityQueue) UpdatePriority

func (bpq *PriorityQueue) UpdatePriority(id uint64, newPriority int64) error

type QueueConfig

type QueueConfig struct {
	Name string
	Type string
}

func QueueConfigFromBytes

func QueueConfigFromBytes(data []byte) (*QueueConfig, error)

func (*QueueConfig) ToBytes

func (qc *QueueConfig) ToBytes() ([]byte, error)

type QueueInfo added in v0.2.0

type QueueInfo struct {
	Name    string
	Type    string
	Stats   *metrics.Stats
	Ready   int64
	Unacked int64
	Total   int64
}

type QueueManager

type QueueManager struct {
	PrometheusMetrics *metrics.PrometheusMetrics
	// contains filtered or unexported fields
}

func NewQueueManager

func NewQueueManager(db *badger.DB, cfg *config.Config, metrics *metrics.PrometheusMetrics) *QueueManager

func (*QueueManager) CreateQueue added in v0.3.0

func (qm *QueueManager) CreateQueue(queueType, queueName string) (*PriorityQueue, error)

func (*QueueManager) DeleteQueue added in v0.3.0

func (qm *QueueManager) DeleteQueue(queueName string) error

func (*QueueManager) GetQueue

func (qm *QueueManager) GetQueue(queueName string) (*PriorityQueue, error)

func (*QueueManager) GetQueues added in v0.2.0

func (qm *QueueManager) GetQueues() []*QueueInfo

func (*QueueManager) LoadQueues added in v0.2.0

func (qm *QueueManager) LoadQueues()

func (*QueueManager) PersistSnapshot added in v0.3.2

func (qm *QueueManager) PersistSnapshot(sink raft.SnapshotSink) error

func (*QueueManager) RunValueLogGC

func (qm *QueueManager) RunValueLogGC()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL