queue

package
v0.2.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrEmptyQueue = fmt.Errorf("queue is empty")
View Source
var ErrMessageNotFound = fmt.Errorf("message not found")
View Source
var ErrQueueNotFound = fmt.Errorf("queue not found")

Functions

func NewQueueStats added in v0.2.0

func NewQueueStats(windowSize int) *queueStats

Types

type BadgerPriorityQueue

type BadgerPriorityQueue struct {
	PrometheusMetrics *PrometheusMetrics
	// contains filtered or unexported fields
}

func NewBadgerPriorityQueue

func NewBadgerPriorityQueue(db *badger.DB, cfg *config.Config, metrics *PrometheusMetrics) *BadgerPriorityQueue

func (*BadgerPriorityQueue) Ack

func (bpq *BadgerPriorityQueue) Ack(id uint64) error

func (*BadgerPriorityQueue) Create

func (bpq *BadgerPriorityQueue) Create(queueType, queueName string) error

func (*BadgerPriorityQueue) Delete

func (bpq *BadgerPriorityQueue) Delete() error

func (*BadgerPriorityQueue) Dequeue

func (bpq *BadgerPriorityQueue) Dequeue(ack bool) (*Message, error)

func (*BadgerPriorityQueue) Enqueue

func (bpq *BadgerPriorityQueue) Enqueue(id uint64, group string, priority int64, content string) (*Message, error)

func (*BadgerPriorityQueue) GetByID

func (bpq *BadgerPriorityQueue) GetByID(id uint64) (*Message, error)

func (*BadgerPriorityQueue) GetMessagesKey

func (bpq *BadgerPriorityQueue) GetMessagesKey(id uint64) []byte

func (*BadgerPriorityQueue) GetQueueKey

func (bpq *BadgerPriorityQueue) GetQueueKey(queueName string) []byte

func (*BadgerPriorityQueue) GetStats added in v0.2.0

func (bpq *BadgerPriorityQueue) GetStats() *QueueInfo

func (*BadgerPriorityQueue) Init

func (bpq *BadgerPriorityQueue) Init(queueType, queueName string) error

func (*BadgerPriorityQueue) Len

func (bpq *BadgerPriorityQueue) Len() int

func (*BadgerPriorityQueue) Load

func (bpq *BadgerPriorityQueue) Load(queueName string, loadMessages bool) error

func (*BadgerPriorityQueue) Nack added in v0.2.4

func (bpq *BadgerPriorityQueue) Nack(id uint64) error

func (*BadgerPriorityQueue) StartAckQueueMonitoring added in v0.1.12

func (bpq *BadgerPriorityQueue) StartAckQueueMonitoring()

func (*BadgerPriorityQueue) StopAckQueueMonitoring added in v0.1.12

func (bpq *BadgerPriorityQueue) StopAckQueueMonitoring()

func (*BadgerPriorityQueue) UpdatePriority

func (bpq *BadgerPriorityQueue) UpdatePriority(id uint64, newPriority int64) error

type DelayedPriorityQueue

type DelayedPriorityQueue struct {
	// contains filtered or unexported fields
}

func NewDelayedPriorityQueue

func NewDelayedPriorityQueue(minFirst bool) *DelayedPriorityQueue

func (*DelayedPriorityQueue) DeleteByID

func (pq *DelayedPriorityQueue) DeleteByID(group string, id uint64) *Item

func (*DelayedPriorityQueue) Dequeue

func (pq *DelayedPriorityQueue) Dequeue() *Item

func (*DelayedPriorityQueue) Enqueue

func (pq *DelayedPriorityQueue) Enqueue(group string, item *Item)

func (*DelayedPriorityQueue) GetByID

func (pq *DelayedPriorityQueue) GetByID(group string, id uint64) *Item

func (*DelayedPriorityQueue) Len

func (pq *DelayedPriorityQueue) Len() uint64

func (*DelayedPriorityQueue) UpdatePriority

func (pq *DelayedPriorityQueue) UpdatePriority(group string, id uint64, priority int64)

type FairPriorityQueue

type FairPriorityQueue struct {
	// contains filtered or unexported fields
}

FairPriorityQueue is a fair queue that balances between different groups.

func NewFairPriorityQueue

func NewFairPriorityQueue() *FairPriorityQueue

NewFairPriorityQueue creates a new FairPriorityQueue.

func (*FairPriorityQueue) DeleteByID

func (fq *FairPriorityQueue) DeleteByID(group string, id uint64) *Item

func (*FairPriorityQueue) Dequeue

func (fq *FairPriorityQueue) Dequeue() *Item

Dequeue removes and returns the next item in a fair way.

func (*FairPriorityQueue) Enqueue

func (fq *FairPriorityQueue) Enqueue(group string, item *Item)

Enqueue adds an item to the queue for the given group.

func (*FairPriorityQueue) GetByID

func (fq *FairPriorityQueue) GetByID(group string, id uint64) *Item

func (*FairPriorityQueue) Len

func (fq *FairPriorityQueue) Len() uint64

func (*FairPriorityQueue) UpdatePriority

func (fq *FairPriorityQueue) UpdatePriority(group string, id uint64, priority int64)

type Item

type Item struct {
	ID       uint64
	Priority int64
	Group    int64
}

func (*Item) UpdatePriority

func (m *Item) UpdatePriority(newPriority int64)

type LinkedList

type LinkedList struct {
	// contains filtered or unexported fields
}

func (*LinkedList) Append

func (l *LinkedList) Append(node *LinkedListNode)

func (*LinkedList) Len

func (l *LinkedList) Len() uint64

func (*LinkedList) Remove

func (l *LinkedList) Remove(node *LinkedListNode)

type LinkedListNode

type LinkedListNode struct {
	// contains filtered or unexported fields
}

func NewLinkedListNode

func NewLinkedListNode(group string, queue *PriorityQueue) *LinkedListNode

func (*LinkedListNode) Queue

func (n *LinkedListNode) Queue() *PriorityQueue

type Message

type Message struct {
	Group    string
	ID       uint64
	Priority int64
	Content  string
}

func MessageFromBytes

func MessageFromBytes(data []byte) (*Message, error)

func (*Message) ToBytes

func (m *Message) ToBytes() ([]byte, error)

func (*Message) UpdatePriority

func (m *Message) UpdatePriority(newPriority int64)

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

func NewPriorityQueue

func NewPriorityQueue(minFirst bool) *PriorityQueue

func (*PriorityQueue) DeleteByID

func (pq *PriorityQueue) DeleteByID(id uint64) *Item

func (*PriorityQueue) GetByID

func (pq *PriorityQueue) GetByID(id uint64) *Item

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Peek

func (pq *PriorityQueue) Peek() any

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() any

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x any)

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

func (*PriorityQueue) UpdatePriority

func (pq *PriorityQueue) UpdatePriority(id uint64, priority int64)

type PrometheusMetrics added in v0.2.0

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

func NewPrometheusMetrics added in v0.2.0

func NewPrometheusMetrics(registry prometheus.Registerer, namespace, subsystem string) *PrometheusMetrics

type Queue

type Queue interface {
	Enqueue(group string, item *Item)
	Dequeue() *Item
	GetByID(group string, id uint64) *Item
	DeleteByID(group string, id uint64) *Item
	UpdatePriority(group string, id uint64, priority int64)
	Len() uint64
}

type QueueConfig

type QueueConfig struct {
	Name string
	Type string
}

func QueueConfigFromBytes

func QueueConfigFromBytes(data []byte) (*QueueConfig, error)

func (*QueueConfig) ToBytes

func (qc *QueueConfig) ToBytes() ([]byte, error)

type QueueInfo added in v0.2.0

type QueueInfo struct {
	Name    string
	Type    string
	Stats   *QueueStats
	Ready   int64
	Unacked int64
	Total   int64
}

type QueueManager

type QueueManager struct {
	PrometheusMetrics *PrometheusMetrics
	// contains filtered or unexported fields
}

func NewQueueManager

func NewQueueManager(db *badger.DB, cfg *config.Config, metrics *PrometheusMetrics) *QueueManager

func (*QueueManager) Create

func (qm *QueueManager) Create(queueType, queueName string) (*BadgerPriorityQueue, error)

func (*QueueManager) Delete

func (qm *QueueManager) Delete(queueName string) error

func (*QueueManager) GetQueue

func (qm *QueueManager) GetQueue(queueName string) (*BadgerPriorityQueue, error)

func (*QueueManager) GetQueues added in v0.2.0

func (qm *QueueManager) GetQueues() []*QueueInfo

func (*QueueManager) LoadQueues added in v0.2.0

func (qm *QueueManager) LoadQueues()

func (*QueueManager) RunValueLogGC

func (qm *QueueManager) RunValueLogGC()

type QueueStats added in v0.2.0

type QueueStats struct {
	EnqueueRPS float64
	DequeueRPS float64
	AckRPS     float64
	NackRPS    float64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL