Documentation
¶
Index ¶
- Variables
- type Message
- type PriorityQueue
- func (bpq *PriorityQueue) Ack(id uint64) error
- func (bpq *PriorityQueue) Create(queueType, queueName string) error
- func (bpq *PriorityQueue) Delete(id uint64) error
- func (bpq *PriorityQueue) DeleteQueue() error
- func (bpq *PriorityQueue) Dequeue(ack bool) (*Message, error)
- func (bpq *PriorityQueue) Enqueue(id uint64, group string, priority int64, content string, ...) (*Message, error)
- func (bpq *PriorityQueue) Get(id uint64) (*Message, error)
- func (bpq *PriorityQueue) GetMessagesKey(id uint64) []byte
- func (bpq *PriorityQueue) GetQueueKey(queueName string) []byte
- func (bpq *PriorityQueue) GetStats() *QueueInfo
- func (bpq *PriorityQueue) Init(queueType, queueName string) error
- func (bpq *PriorityQueue) Len() int
- func (bpq *PriorityQueue) Load(queueName string, loadMessages bool) error
- func (bpq *PriorityQueue) Nack(id uint64, priority int64, metadata map[string]string) error
- func (bpq *PriorityQueue) PersistSnapshot(sink raft.SnapshotSink) error
- func (bpq *PriorityQueue) StartAckQueueMonitoring()
- func (bpq *PriorityQueue) StopAckQueueMonitoring()
- func (bpq *PriorityQueue) UpdatePriority(id uint64, newPriority int64) error
- type QueueConfig
- type QueueInfo
- type QueueManager
- func (qm *QueueManager) CreateQueue(queueType, queueName string) (*PriorityQueue, error)
- func (qm *QueueManager) DeleteQueue(queueName string) error
- func (qm *QueueManager) GetQueue(queueName string) (*PriorityQueue, error)
- func (qm *QueueManager) GetQueues() []*QueueInfo
- func (qm *QueueManager) LoadQueues()
- func (qm *QueueManager) PersistSnapshot(sink raft.SnapshotSink) error
- func (qm *QueueManager) RunValueLogGC()
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrEmptyQueue = fmt.Errorf("queue is empty")
View Source
var ErrMessageNotFound = fmt.Errorf("message not found")
View Source
var ErrQueueNotFound = fmt.Errorf("queue not found")
Functions ¶
This section is empty.
Types ¶
type Message ¶
type Message struct {
Group string
ID uint64
Priority int64
Content string
Metadata map[string]string
QueueName string `json:"QueueName,omitempty"`
QueueType string `json:"QueueType,omitempty"`
}
func MessageFromBytes ¶
func (*Message) UpdatePriority ¶
type PriorityQueue ¶
type PriorityQueue struct {
PrometheusMetrics *metrics.PrometheusMetrics
// contains filtered or unexported fields
}
func NewPriorityQueue ¶
func NewPriorityQueue( db *badger.DB, cfg *config.Config, promMetrics *metrics.PrometheusMetrics, ) *PriorityQueue
func (*PriorityQueue) Ack ¶ added in v0.3.6
func (bpq *PriorityQueue) Ack(id uint64) error
func (*PriorityQueue) Create ¶ added in v0.3.6
func (bpq *PriorityQueue) Create(queueType, queueName string) error
func (*PriorityQueue) Delete ¶ added in v0.3.0
func (bpq *PriorityQueue) Delete(id uint64) error
func (*PriorityQueue) DeleteQueue ¶ added in v0.3.6
func (bpq *PriorityQueue) DeleteQueue() error
func (*PriorityQueue) Dequeue ¶ added in v0.3.6
func (bpq *PriorityQueue) Dequeue(ack bool) (*Message, error)
func (*PriorityQueue) Get ¶ added in v0.3.0
func (bpq *PriorityQueue) Get(id uint64) (*Message, error)
func (*PriorityQueue) GetMessagesKey ¶ added in v0.3.6
func (bpq *PriorityQueue) GetMessagesKey(id uint64) []byte
func (*PriorityQueue) GetQueueKey ¶ added in v0.3.6
func (bpq *PriorityQueue) GetQueueKey(queueName string) []byte
func (*PriorityQueue) GetStats ¶ added in v0.3.6
func (bpq *PriorityQueue) GetStats() *QueueInfo
func (*PriorityQueue) Init ¶ added in v0.3.6
func (bpq *PriorityQueue) Init(queueType, queueName string) error
func (*PriorityQueue) Len ¶
func (bpq *PriorityQueue) Len() int
func (*PriorityQueue) Load ¶ added in v0.3.6
func (bpq *PriorityQueue) Load(queueName string, loadMessages bool) error
func (*PriorityQueue) PersistSnapshot ¶ added in v0.3.6
func (bpq *PriorityQueue) PersistSnapshot(sink raft.SnapshotSink) error
func (*PriorityQueue) StartAckQueueMonitoring ¶ added in v0.3.6
func (bpq *PriorityQueue) StartAckQueueMonitoring()
func (*PriorityQueue) StopAckQueueMonitoring ¶ added in v0.3.6
func (bpq *PriorityQueue) StopAckQueueMonitoring()
func (*PriorityQueue) UpdatePriority ¶
func (bpq *PriorityQueue) UpdatePriority(id uint64, newPriority int64) error
type QueueConfig ¶
func QueueConfigFromBytes ¶
func QueueConfigFromBytes(data []byte) (*QueueConfig, error)
func (*QueueConfig) ToBytes ¶
func (qc *QueueConfig) ToBytes() ([]byte, error)
type QueueManager ¶
type QueueManager struct {
PrometheusMetrics *metrics.PrometheusMetrics
// contains filtered or unexported fields
}
func NewQueueManager ¶
func NewQueueManager(db *badger.DB, cfg *config.Config, metrics *metrics.PrometheusMetrics) *QueueManager
func (*QueueManager) CreateQueue ¶ added in v0.3.0
func (qm *QueueManager) CreateQueue(queueType, queueName string) (*PriorityQueue, error)
func (*QueueManager) DeleteQueue ¶ added in v0.3.0
func (qm *QueueManager) DeleteQueue(queueName string) error
func (*QueueManager) GetQueue ¶
func (qm *QueueManager) GetQueue(queueName string) (*PriorityQueue, error)
func (*QueueManager) GetQueues ¶ added in v0.2.0
func (qm *QueueManager) GetQueues() []*QueueInfo
func (*QueueManager) LoadQueues ¶ added in v0.2.0
func (qm *QueueManager) LoadQueues()
func (*QueueManager) PersistSnapshot ¶ added in v0.3.2
func (qm *QueueManager) PersistSnapshot(sink raft.SnapshotSink) error
func (*QueueManager) RunValueLogGC ¶
func (qm *QueueManager) RunValueLogGC()
Source Files
¶
Click to show internal directories.
Click to hide internal directories.