Documentation
¶
Index ¶
- Variables
- func NewQueueStats(windowSize int) *queueStats
- type BadgerPriorityQueue
- func (bpq *BadgerPriorityQueue) Ack(id uint64) error
- func (bpq *BadgerPriorityQueue) Create(queueType, queueName string) error
- func (bpq *BadgerPriorityQueue) Delete(id uint64) error
- func (bpq *BadgerPriorityQueue) DeleteQueue() error
- func (bpq *BadgerPriorityQueue) Dequeue(ack bool) (*Message, error)
- func (bpq *BadgerPriorityQueue) Enqueue(id uint64, group string, priority int64, content string, ...) (*Message, error)
- func (bpq *BadgerPriorityQueue) Get(id uint64) (*Message, error)
- func (bpq *BadgerPriorityQueue) GetMessagesKey(id uint64) []byte
- func (bpq *BadgerPriorityQueue) GetQueueKey(queueName string) []byte
- func (bpq *BadgerPriorityQueue) GetStats() *QueueInfo
- func (bpq *BadgerPriorityQueue) Init(queueType, queueName string) error
- func (bpq *BadgerPriorityQueue) Len() int
- func (bpq *BadgerPriorityQueue) Load(queueName string, loadMessages bool) error
- func (bpq *BadgerPriorityQueue) Nack(id uint64, priority int64, metadata map[string]string) error
- func (bpq *BadgerPriorityQueue) PersistSnapshot(sink raft.SnapshotSink) error
- func (bpq *BadgerPriorityQueue) StartAckQueueMonitoring()
- func (bpq *BadgerPriorityQueue) StopAckQueueMonitoring()
- func (bpq *BadgerPriorityQueue) UpdatePriority(id uint64, newPriority int64) error
- type DelayedPriorityQueue
- func (pq *DelayedPriorityQueue) Delete(group string, id uint64) *Item
- func (pq *DelayedPriorityQueue) Dequeue() *Item
- func (pq *DelayedPriorityQueue) Enqueue(group string, item *Item)
- func (pq *DelayedPriorityQueue) Get(group string, id uint64) *Item
- func (pq *DelayedPriorityQueue) Len() uint64
- func (pq *DelayedPriorityQueue) UpdatePriority(group string, id uint64, priority int64)
- type FairPriorityQueue
- func (fq *FairPriorityQueue) Delete(group string, id uint64) *Item
- func (fq *FairPriorityQueue) Dequeue() *Item
- func (fq *FairPriorityQueue) Enqueue(group string, item *Item)
- func (fq *FairPriorityQueue) Get(group string, id uint64) *Item
- func (fq *FairPriorityQueue) Len() uint64
- func (fq *FairPriorityQueue) UpdatePriority(group string, id uint64, priority int64)
- type Item
- type LinkedList
- type LinkedListNode
- type Message
- type PriorityQueue
- func (pq *PriorityQueue) Delete(id uint64) *Item
- func (pq *PriorityQueue) Get(id uint64) *Item
- func (pq PriorityQueue) Len() int
- func (pq PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Peek() any
- func (pq *PriorityQueue) Pop() any
- func (pq *PriorityQueue) Push(x any)
- func (pq PriorityQueue) Swap(i, j int)
- func (pq *PriorityQueue) UpdatePriority(id uint64, priority int64)
- type PrometheusMetrics
- type Queue
- type QueueConfig
- type QueueInfo
- type QueueManager
- func (qm *QueueManager) CreateQueue(queueType, queueName string) (*BadgerPriorityQueue, error)
- func (qm *QueueManager) DeleteQueue(queueName string) error
- func (qm *QueueManager) GetQueue(queueName string) (*BadgerPriorityQueue, error)
- func (qm *QueueManager) GetQueues() []*QueueInfo
- func (qm *QueueManager) LoadQueues()
- func (qm *QueueManager) PersistSnapshot(sink raft.SnapshotSink) error
- func (qm *QueueManager) RunValueLogGC()
- type QueueStats
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrEmptyQueue = fmt.Errorf("queue is empty")
View Source
var ErrMessageNotFound = fmt.Errorf("message not found")
View Source
var ErrQueueNotFound = fmt.Errorf("queue not found")
Functions ¶
func NewQueueStats ¶ added in v0.2.0
func NewQueueStats(windowSize int) *queueStats
Types ¶
type BadgerPriorityQueue ¶
type BadgerPriorityQueue struct {
PrometheusMetrics *PrometheusMetrics
// contains filtered or unexported fields
}
func NewBadgerPriorityQueue ¶
func NewBadgerPriorityQueue(db *badger.DB, cfg *config.Config, metrics *PrometheusMetrics) *BadgerPriorityQueue
func (*BadgerPriorityQueue) Ack ¶
func (bpq *BadgerPriorityQueue) Ack(id uint64) error
func (*BadgerPriorityQueue) Create ¶
func (bpq *BadgerPriorityQueue) Create(queueType, queueName string) error
func (*BadgerPriorityQueue) Delete ¶
func (bpq *BadgerPriorityQueue) Delete(id uint64) error
func (*BadgerPriorityQueue) DeleteQueue ¶ added in v0.3.0
func (bpq *BadgerPriorityQueue) DeleteQueue() error
func (*BadgerPriorityQueue) Dequeue ¶
func (bpq *BadgerPriorityQueue) Dequeue(ack bool) (*Message, error)
func (*BadgerPriorityQueue) Get ¶ added in v0.3.0
func (bpq *BadgerPriorityQueue) Get(id uint64) (*Message, error)
func (*BadgerPriorityQueue) GetMessagesKey ¶
func (bpq *BadgerPriorityQueue) GetMessagesKey(id uint64) []byte
func (*BadgerPriorityQueue) GetQueueKey ¶
func (bpq *BadgerPriorityQueue) GetQueueKey(queueName string) []byte
func (*BadgerPriorityQueue) GetStats ¶ added in v0.2.0
func (bpq *BadgerPriorityQueue) GetStats() *QueueInfo
func (*BadgerPriorityQueue) Init ¶
func (bpq *BadgerPriorityQueue) Init(queueType, queueName string) error
func (*BadgerPriorityQueue) Len ¶
func (bpq *BadgerPriorityQueue) Len() int
func (*BadgerPriorityQueue) Load ¶
func (bpq *BadgerPriorityQueue) Load(queueName string, loadMessages bool) error
func (*BadgerPriorityQueue) PersistSnapshot ¶ added in v0.3.2
func (bpq *BadgerPriorityQueue) PersistSnapshot(sink raft.SnapshotSink) error
func (*BadgerPriorityQueue) StartAckQueueMonitoring ¶ added in v0.1.12
func (bpq *BadgerPriorityQueue) StartAckQueueMonitoring()
func (*BadgerPriorityQueue) StopAckQueueMonitoring ¶ added in v0.1.12
func (bpq *BadgerPriorityQueue) StopAckQueueMonitoring()
func (*BadgerPriorityQueue) UpdatePriority ¶
func (bpq *BadgerPriorityQueue) UpdatePriority(id uint64, newPriority int64) error
type DelayedPriorityQueue ¶
type DelayedPriorityQueue struct {
// contains filtered or unexported fields
}
func NewDelayedPriorityQueue ¶
func NewDelayedPriorityQueue(minFirst bool) *DelayedPriorityQueue
func (*DelayedPriorityQueue) Delete ¶ added in v0.3.0
func (pq *DelayedPriorityQueue) Delete(group string, id uint64) *Item
func (*DelayedPriorityQueue) Dequeue ¶
func (pq *DelayedPriorityQueue) Dequeue() *Item
func (*DelayedPriorityQueue) Enqueue ¶
func (pq *DelayedPriorityQueue) Enqueue(group string, item *Item)
func (*DelayedPriorityQueue) Get ¶ added in v0.3.0
func (pq *DelayedPriorityQueue) Get(group string, id uint64) *Item
func (*DelayedPriorityQueue) Len ¶
func (pq *DelayedPriorityQueue) Len() uint64
func (*DelayedPriorityQueue) UpdatePriority ¶
func (pq *DelayedPriorityQueue) UpdatePriority(group string, id uint64, priority int64)
type FairPriorityQueue ¶
type FairPriorityQueue struct {
// contains filtered or unexported fields
}
FairPriorityQueue is a fair queue that balances between different groups.
func NewFairPriorityQueue ¶
func NewFairPriorityQueue() *FairPriorityQueue
NewFairPriorityQueue creates a new FairPriorityQueue.
func (*FairPriorityQueue) Delete ¶ added in v0.3.0
func (fq *FairPriorityQueue) Delete(group string, id uint64) *Item
func (*FairPriorityQueue) Dequeue ¶
func (fq *FairPriorityQueue) Dequeue() *Item
Dequeue removes and returns the next message in a fair way
func (*FairPriorityQueue) Enqueue ¶
func (fq *FairPriorityQueue) Enqueue(group string, item *Item)
Enqueue adds a message to the queue
func (*FairPriorityQueue) Get ¶ added in v0.3.0
func (fq *FairPriorityQueue) Get(group string, id uint64) *Item
func (*FairPriorityQueue) Len ¶
func (fq *FairPriorityQueue) Len() uint64
func (*FairPriorityQueue) UpdatePriority ¶
func (fq *FairPriorityQueue) UpdatePriority(group string, id uint64, priority int64)
type Item ¶
func (*Item) UpdatePriority ¶
type LinkedList ¶
type LinkedList struct {
// contains filtered or unexported fields
}
func (*LinkedList) Append ¶
func (l *LinkedList) Append(node *LinkedListNode)
func (*LinkedList) Len ¶
func (l *LinkedList) Len() uint64
func (*LinkedList) Remove ¶
func (l *LinkedList) Remove(node *LinkedListNode)
type LinkedListNode ¶
type LinkedListNode struct {
// contains filtered or unexported fields
}
func NewLinkedListNode ¶
func NewLinkedListNode(group string, queue *PriorityQueue) *LinkedListNode
func (*LinkedListNode) Queue ¶
func (n *LinkedListNode) Queue() *PriorityQueue
type Message ¶
type Message struct {
Group string
ID uint64
Priority int64
Content string
Metadata map[string]string
QueueName string `json:"QueueName,omitempty"`
QueueType string `json:"QueueType,omitempty"`
}
func MessageFromBytes ¶
func (*Message) UpdatePriority ¶
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
func NewPriorityQueue ¶
func NewPriorityQueue(minFirst bool) *PriorityQueue
func (*PriorityQueue) Delete ¶ added in v0.3.0
func (pq *PriorityQueue) Delete(id uint64) *Item
func (*PriorityQueue) Get ¶ added in v0.3.0
func (pq *PriorityQueue) Get(id uint64) *Item
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Peek ¶
func (pq *PriorityQueue) Peek() any
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() any
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x any)
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
func (*PriorityQueue) UpdatePriority ¶
func (pq *PriorityQueue) UpdatePriority(id uint64, priority int64)
type PrometheusMetrics ¶ added in v0.2.0
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
func NewPrometheusMetrics ¶ added in v0.2.0
func NewPrometheusMetrics(registry prometheus.Registerer, namespace, subsystem string) *PrometheusMetrics
type QueueConfig ¶
func QueueConfigFromBytes ¶
func QueueConfigFromBytes(data []byte) (*QueueConfig, error)
func (*QueueConfig) ToBytes ¶
func (qc *QueueConfig) ToBytes() ([]byte, error)
type QueueManager ¶
type QueueManager struct {
PrometheusMetrics *PrometheusMetrics
// contains filtered or unexported fields
}
func NewQueueManager ¶
func NewQueueManager(db *badger.DB, cfg *config.Config, metrics *PrometheusMetrics) *QueueManager
func (*QueueManager) CreateQueue ¶ added in v0.3.0
func (qm *QueueManager) CreateQueue(queueType, queueName string) (*BadgerPriorityQueue, error)
func (*QueueManager) DeleteQueue ¶ added in v0.3.0
func (qm *QueueManager) DeleteQueue(queueName string) error
func (*QueueManager) GetQueue ¶
func (qm *QueueManager) GetQueue(queueName string) (*BadgerPriorityQueue, error)
func (*QueueManager) GetQueues ¶ added in v0.2.0
func (qm *QueueManager) GetQueues() []*QueueInfo
func (*QueueManager) LoadQueues ¶ added in v0.2.0
func (qm *QueueManager) LoadQueues()
func (*QueueManager) PersistSnapshot ¶ added in v0.3.2
func (qm *QueueManager) PersistSnapshot(sink raft.SnapshotSink) error
func (*QueueManager) RunValueLogGC ¶
func (qm *QueueManager) RunValueLogGC()
Click to show internal directories.
Click to hide internal directories.