Documentation
¶
Index ¶
- type GPQ
- func (g *GPQ[d]) ActiveBuckets() uint
- func (g *GPQ[d]) Close()
- func (g *GPQ[d]) Dequeue() (item *schema.Item[d], err error)
- func (g *GPQ[d]) DequeueBatch(batchSize uint) (items []*schema.Item[d], errs []error)
- func (g *GPQ[d]) Enqueue(item schema.Item[d]) error
- func (g *GPQ[d]) EnqueueBatch(items []schema.Item[d]) []error
- func (g *GPQ[d]) ItemsInDB() (uint, error)
- func (g *GPQ[d]) ItemsInQueue() uint
- func (g *GPQ[d]) Prioritize() (escalated, removed uint, err error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type GPQ ¶
type GPQ[d any] struct { // contains filtered or unexported fields }
GPQ is a generic priority queue that supports priority levels and timeouts It is implemented using a heap for each priority level and a priority queue of non-empty buckets It also supports disk caching using badgerDB with the option to lazily disk writes and deletes The GPQ is thread-safe and supports concurrent access
func NewGPQ ¶
NewGPQ creates a new GPQ with the given number of buckets The number of buckets is the number of priority levels you want to support You must provide the number of buckets ahead of time and all priorities you submit must be within the range of 0 to NumOfBuckets
func (*GPQ[d]) ActiveBuckets ¶
ActiveBuckets returns the total number of buckets(priorities) that have messages within
func (*GPQ[d]) Close ¶
func (g *GPQ[d]) Close()
Close performs a safe shutdown of the GPQ and the disk cache preventing data loss
func (*GPQ[d]) Dequeue ¶
Dequeue removes and returns the item with the highest priority in the queue
func (*GPQ[d]) DequeueBatch ¶
DequeueBatch takes a batch size, and returns a slice ordered by priority up to the batchSize provided enough messages are present to fill the batch. Partial batches will be returned if a error is encountered.
func (*GPQ[d]) EnqueueBatch ¶
EnqueueBatch takes a slice of items and attempts to enqueue them in their perspective buckets If a error is generated, it is attached to a slice of errors. Currently the batch will be commit in the partial state, and it is up to the user to parse the errors and resend messages that failed. In the future this will most likely change with the addition of transactions.
func (*GPQ[d]) ItemsInQueue ¶
ItemsInQueue returns the total number of items in the queue
func (*GPQ[d]) Prioritize ¶
Prioritize orders the queue based on the individual options added to every message in the queue. Prioritizing the queue is a stop-the-world event, so consider your usage carefully.
Directories
¶
| Path | Synopsis |
|---|---|
|
gheap
Package heap provides heap operations for any type that implements heap.Interface.
|
Package heap provides heap operations for any type that implements heap.Interface. |
|
Package timekeeper provides a high-performance cached time implementation optimized for high-throughput applications that need fast time access without the overhead of syscalls on every time.Now() call.
|
Package timekeeper provides a high-performance cached time implementation optimized for high-throughput applications that need fast time access without the overhead of syscalls on every time.Now() call. |



