gpq

package module
v1.0.0-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2025 License: MIT Imports: 9 Imported by: 2

README

GPQ is an extremely fast and flexible priority queue, capable of millions transactions a second. GPQ supports a complex "Double Priority Queue" which allows for priorities to be distributed across N buckets, with each bucket holding a second priority queue which allows for internal escalation and timeouts of items based on parameters the user can specify during submission combined with how frequently you ask GPQ to prioritize the queue.

Go version Go Reference GitHub License GitHub Release GitHub Issues GitHub Branch Status

Notice

While GPQ is largely stable, bugs are more than likely present at this early stage, and you should carefully consider if your application can tolerate any down time or lost messages that may result from adopting this project into a production workflow. If you run into any bugs please submit an issue or better a PR! Check out the guide to contributing below.

Table of Contents

Background

GPQ was written as an experiment when I was playing with Fibonacci Heaps and wanted to find something faster. I was disappointed by the state of research and libraries being used by most common applications, so GPQ is meant to be a highly flexible framework that can support a multitude of workloads.

Should I Use GPQ?

GPQ is a concurrency safe, embeddable priority queue that can be used in a variety of applications. GPQ might be the right choice if:

  • Your data requires strict ordering guarantees
  • You need to prioritize items that are in the queue too long
  • You need to timeout items
  • You have multiple writers and readers that need to access the queue concurrently
  • You run critical workloads and need to store the queue on disk in case of a crash

Sister Projects

Benchmarks

Due to the fact that most operations are done in constant time O(1) or logarithmic time O(log n), with the exception of the prioritize function which happens in linear time O(n), all GPQ operations are extremely fast. A single GPQ can handle a few million transactions a second and can be tuned depending on your work load. I have included some basic benchmarks using C++, Rust, Zig, Python, and Go to measure GPQ's performance against the standard implementations of other languages that can be found here at: pq-bench

Time-Spent Queue-Speed-WITHOUT-Reprioritize
Queue-Speed-WITH-Reprioritize Average Time to Send and Receive VS Bucket Count

Usage

GPQ at the core is a embeddable priority queue meant to be used at the core of critical workloads that require complex queueing and delivery order guarantees. The best way to use it is just to import it.

import "github.com/JustinTimperio/gpq"

Prerequisites

For this you will need Go >= 1.22 and gpq itself uses hashmap, btree and BadgerDB.

API Reference

  • NewGPQ[d any](options schema.GPQOptions) (uint, *GPQ[d], error) - Creates a new GPQ with the specified options and returns the number of restored items, the GPQ, and an error if one occurred.
    • ItemsInQueue() uint - Returns the number of items in the queue.
    • ItemsInDB() (uint, error) - Returns the number of items in the database.
    • ActiveBuckets() uint - Returns the number of active buckets.
    • Enqueue(item schema.Item[d]) error - Enqueues an item into the queue.
    • EnqueueBatch(items []schema.Item[d]) error - Enqueues a batch of items into the queue.
    • Dequeue() (*schema.Item[d], error) - Dequeues an item from the queue.
    • DequeueBatch(batchSize uint) ([]*schema.Item[d], error) - Dequeues a batch of items from the queue.
    • Prioritize() error - Prioritizes the queue based on the values in each item.
    • Close() - Closes the queue and saves the queue to disk.

Submitting Items to the Queue

Once you have an initialized queue you can easily submit items like the following:

package main

import (
  "log"
  "time"

  "github.com/JustinTimperio/gpq"
)

func main() {
	defaultMessageOptions := schema.EnqueueOptions{
		ShouldEscalate: true,
		EscalationRate: time.Duration(time.Second),
		CanTimeout:     true,
		Timeout:        time.Duration(time.Second * 5),
	}

	opts := schema.GPQOptions{
		MaxPriority: maxBuckets,

		DiskCacheEnabled:      true,
		DiskCachePath:         "/tmp/gpq",
		DiskCacheCompression:  true,
		DiskEncryptionEnabled: true,
		DiskEncryptionKey:     []byte("12345678901234567890123456789012"),
		DiskWriteDelay:       time.Duration(time.Second * 5),

		LazyDiskCacheEnabled: true,
		LazyDiskCacheChannelSize:  1_000_000,
		LazyDiskBatchSize:    10_000,
	}

	_, queue, err := gpq.NewGPQ[uint](opts)
	if err != nil {
		log.Fatalln(err)
	}

	for i := uint(0); i < total; i++ {
		p := i % maxBuckets
		item := schema.NewItem(p, i, defaultMessageOptions)

		err := queue.Enqueue(item)
		if err != nil {
			log.Fatalln(err)
		}
	}

	for i := uint(0); i < total; i++ {
		item, err := queue.Dequeue()
		if err != nil {
			log.Fatalln(err)
		}
	}

	queue.Close()
}

You have a few options when you submit a job such as if the item should escalate over time if not sent, or inversely can timeout if it has been enqueued to long to be relevant anymore.

Contributing

GPQ is actively looking for maintainers so feel free to help out when:

  • Reporting a bug
  • Discussing the current state of the code
  • Submitting a fix
  • Proposing new features

We Develop with Github

We use github to host code, to track issues and feature requests, as well as accept pull requests.

All Code Changes Happen Through Pull Requests

  1. Fork the repo and create your branch from master.
  2. If you've added code that should be tested, add tests.
  3. If you've changed APIs, update the documentation.
  4. Ensure the test suite passes.
  5. Make sure your code lints.
  6. Issue that pull request!

Any contributions you make will be under the MIT Software License

In short, when you submit code changes, your submissions are understood to be under the same MIT License that covers the project. Feel free to contact the maintainers if that's a concern.

Report bugs using Github's Issues

We use GitHub issues to track public bugs. Report a bug by opening a new issue; it's that easy!

Write bug reports with detail, background, and sample code

Great Bug Reports tend to have:

  • A quick summary and/or background
  • Steps to reproduce
    • Be specific!
    • Give sample code if you can.
  • What you expected would happen
  • What actually happens
  • Notes (possibly including why you think this might be happening, or stuff you tried that didn't work)

License

All code here was originally written by me, Justin Timperio, under an MIT license with the exception of some code directly forked under a BSD license from the Go maintainers.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type GPQ

type GPQ[d any] struct {
	// contains filtered or unexported fields
}

GPQ is a generic priority queue that supports priority levels and timeouts It is implemented using a heap for each priority level and a priority queue of non-empty buckets It also supports disk caching using badgerDB with the option to lazily disk writes and deletes The GPQ is thread-safe and supports concurrent access

func NewGPQ

func NewGPQ[d any](Options schema.GPQOptions) (uint, *GPQ[d], error)

NewGPQ creates a new GPQ with the given number of buckets The number of buckets is the number of priority levels you want to support You must provide the number of buckets ahead of time and all priorities you submit must be within the range of 0 to NumOfBuckets

func (*GPQ[d]) ActiveBuckets

func (g *GPQ[d]) ActiveBuckets() uint

ActiveBuckets returns the total number of buckets(priorities) that have messages within

func (*GPQ[d]) Close

func (g *GPQ[d]) Close()

Close performs a safe shutdown of the GPQ and the disk cache preventing data loss

func (*GPQ[d]) Dequeue

func (g *GPQ[d]) Dequeue() (item *schema.Item[d], err error)

Dequeue removes and returns the item with the highest priority in the queue

func (*GPQ[d]) DequeueBatch

func (g *GPQ[d]) DequeueBatch(batchSize uint) (items []*schema.Item[d], errs []error)

DequeueBatch takes a batch size, and returns a slice ordered by priority up to the batchSize provided enough messages are present to fill the batch. Partial batches will be returned if a error is encountered.

func (*GPQ[d]) Enqueue

func (g *GPQ[d]) Enqueue(item schema.Item[d]) error

Enqueue adds an item to the queue with the given options

func (*GPQ[d]) EnqueueBatch

func (g *GPQ[d]) EnqueueBatch(items []schema.Item[d]) []error

EnqueueBatch takes a slice of items and attempts to enqueue them in their perspective buckets If a error is generated, it is attached to a slice of errors. Currently the batch will be commit in the partial state, and it is up to the user to parse the errors and resend messages that failed. In the future this will most likely change with the addition of transactions.

func (*GPQ[d]) ItemsInDB

func (g *GPQ[d]) ItemsInDB() (uint, error)

ItemsInDB returns the total number of items currently commit to disk

func (*GPQ[d]) ItemsInQueue

func (g *GPQ[d]) ItemsInQueue() uint

ItemsInQueue returns the total number of items in the queue

func (*GPQ[d]) Prioritize

func (g *GPQ[d]) Prioritize() (escalated, removed uint, err error)

Prioritize orders the queue based on the individual options added to every message in the queue. Prioritizing the queue is a stop-the-world event, so consider your usage carefully.

Directories

Path Synopsis
gheap
Package heap provides heap operations for any type that implements heap.Interface.
Package heap provides heap operations for any type that implements heap.Interface.
Package timekeeper provides a high-performance cached time implementation optimized for high-throughput applications that need fast time access without the overhead of syscalls on every time.Now() call.
Package timekeeper provides a high-performance cached time implementation optimized for high-throughput applications that need fast time access without the overhead of syscalls on every time.Now() call.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL