pgmq

package module
v0.0.0-...-8ab7baa Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2025 License: MIT Imports: 13 Imported by: 0

README

pgmq-go

Go Reference Go Report Card CI codecov

A Go (Golang) client for Postgres Message Queue (PGMQ). Based loosely on the Rust client.

pgmq-go works with pgx. The second argument of most functions only needs to satisfy the DB interface, which means it can take, among others, a *pgx.Conn, *pgxpool.Pool, or pgx.Tx.

Usage

Start a Postgres instance with the PGMQ extension installed:

docker run -d --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 quay.io/tembo/pgmq-pg:latest

Then

package main

import (
    "context"
    "fmt"

    "github.com/notablecontr/pgmq-go"
)

func main() {
    ctx := context.Background()

    pool, err := pgmq.NewPgxPool(ctx, "postgres://postgres:password@localhost:5432/postgres")
    if err != nil {
        panic(err)
    }

    err = pgmq.CreatePGMQExtension(ctx, pool)
    if err != nil {
        panic(err)
    }

    err = pgmq.CreateQueue(ctx, pool, "my_queue")
    if err != nil {
        panic(err)
    }

    // We can perform various queue operations using a transaction.
    tx, err := pool.Begin(ctx)
    if err != nil {
        panic(err)
    }

    id, err := pgmq.Send(ctx, tx, "my_queue", json.RawMessage(`{"foo": "bar"}`))
    if err != nil {
        panic(err)
    }

    msg, err := pgmq.Read(ctx, tx, "my_queue", 30)
    if err != nil {
        panic(err)
    }

    // Archive the message by moving it to the "pgmq.a_<queue_name>" table.
    // Alternatively, you can `Delete` the message, or read and delete in one
    // call by using `Pop`.
    _, err = pgmq.Archive(ctx, tx, "my_queue", id)
    if err != nil {
        panic(err)
    }

    // Commit the transaction.
    err = tx.Commit(ctx)
    if err != nil {
        panic(err)
    }

    // Close the connection pool.
    pool.Close()
}

Contributions

We ❤ contributions.

See also

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoRows = errors.New("pgmq: no rows in result set")

Functions

func Archive

func Archive(ctx context.Context, db DB, queue string, msgID int64) (bool, error)

Archive moves a message from the queue table to the archive table by its id. View messages on the archive table with sql:

SELECT * FROM pgmq.a_<queue_name>;

func ArchiveBatch

func ArchiveBatch(ctx context.Context, db DB, queue string, msgIDs []int64) ([]int64, error)

ArchiveBatch moves a batch of messages from the queue table to the archive table by their ids. View messages on the archive table with sql:

SELECT * FROM pgmq.a_<queue_name>;

func CreatePGMQExtension

func CreatePGMQExtension(ctx context.Context, db DB) error

CreatePGMQExtension will create the PGMQ extension using the provided DB.

func CreateQueue

func CreateQueue(ctx context.Context, db DB, queue string) error

CreateQueue creates a new queue. This sets up the queue's tables, indexes, and metadata.

func CreateUnloggedQueue

func CreateUnloggedQueue(ctx context.Context, db DB, queue string) error

CreateUnloggedQueue creates a new unlogged queue, which uses an unlogged table under the hood. This sets up the queue's tables, indexes, and metadata.

func Delete

func Delete(ctx context.Context, db DB, queue string, msgID int64) (bool, error)

Delete deletes a message from the queue by its id. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the Archive method.

func DeleteBatch

func DeleteBatch(ctx context.Context, db DB, queue string, msgIDs []int64) ([]int64, error)

DeleteBatch deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. If you want to retain a log of the messages, use the ArchiveBatch method.

func DropQueue

func DropQueue(ctx context.Context, db DB, queue string) error

DropQueue deletes the given queue. It deletes the queue's tables, indices, and metadata. It will return an error if the queue does not exist.

func NewPgxPool

func NewPgxPool(ctx context.Context, connString string) (*pgxpool.Pool, error)

NewPgxPool is a convenience function for creating a new *pgxpool.Pool.

func Send

func Send(ctx context.Context, db DB, queue string, msg json.RawMessage) (int64, error)

Send sends a single message to a queue. The message id, unique to the queue, is returned.

func SendBatch

func SendBatch(ctx context.Context, db DB, queue string, msgs []json.RawMessage) ([]int64, error)

SendBatch sends a batch of messages to a queue. The message ids, unique to the queue, are returned.

func SendBatchWithDelay

func SendBatchWithDelay(ctx context.Context, db DB, queue string, msgs []json.RawMessage, delay int) ([]int64, error)

SendBatchWithDelay sends a batch of messages to a queue with a delay. The delay is specified in seconds. The message ids, unique to the queue, are returned.

func SendBatchWithDelayTimestamp

func SendBatchWithDelayTimestamp(ctx context.Context, db DB, queue string, msgs []json.RawMessage, delay time.Time) ([]int64, error)

SendBatchWithDelayTimestamp sends a batch of messages to a queue with a delay. The delay is specified as a timestamp. The message ids, unique to the queue, are returned.

func SendWithDelay

func SendWithDelay(ctx context.Context, db DB, queue string, msg json.RawMessage, delay int) (int64, error)

SendWithDelay sends a single message to a queue with a delay. The delay is specified in seconds. The message id, unique to the queue, is returned.

func SendWithDelayTimestamp

func SendWithDelayTimestamp(ctx context.Context, db DB, queue string, msg json.RawMessage, delay time.Time) (int64, error)

SendWithDelayTimestamp sends a single message to a queue with a delay. The delay is specified as a timestamp. The message id, unique to the queue, is returned. Only supported in pgmq-pg17 and above.

Types

type DB

type DB interface {
	Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

type Database

type Database struct {
	Pool *pgxpool.Pool
	// contains filtered or unexported fields
}

func (*Database) Init

func (d *Database) Init()

func (*Database) TestArchive

func (d *Database) TestArchive(t *testing.T)

func (*Database) TestArchiveBatch

func (d *Database) TestArchiveBatch(t *testing.T)

func (*Database) TestArchiveNotExist

func (d *Database) TestArchiveNotExist(t *testing.T)

func (*Database) TestCreateAndDropQueue

func (d *Database) TestCreateAndDropQueue(t *testing.T)

func (*Database) TestCreateUnloggedAndDropQueue

func (d *Database) TestCreateUnloggedAndDropQueue(t *testing.T)

func (*Database) TestDelete

func (d *Database) TestDelete(t *testing.T)

func (*Database) TestDeleteBatch

func (d *Database) TestDeleteBatch(t *testing.T)

func (*Database) TestDeleteNotExist

func (d *Database) TestDeleteNotExist(t *testing.T)

func (*Database) TestDropQueueWhichDoesNotExist

func (d *Database) TestDropQueueWhichDoesNotExist(t *testing.T)

func (*Database) TestPing

func (d *Database) TestPing(t *testing.T)

func (*Database) TestPop

func (d *Database) TestPop(t *testing.T)

func (*Database) TestPopEmptyQueueReturnsNoRows

func (d *Database) TestPopEmptyQueueReturnsNoRows(t *testing.T)

func (*Database) TestRead

func (d *Database) TestRead(t *testing.T)

func (*Database) TestReadBatch

func (d *Database) TestReadBatch(t *testing.T)

func (*Database) TestReadEmptyQueueReturnsNoRows

func (d *Database) TestReadEmptyQueueReturnsNoRows(t *testing.T)

func (*Database) TestSend

func (d *Database) TestSend(t *testing.T)

func (*Database) TestSendAMarshalledStruct

func (d *Database) TestSendAMarshalledStruct(t *testing.T)

func (*Database) TestSendBatch

func (d *Database) TestSendBatch(t *testing.T)

func (*Database) TestSendBatchWithDelayTimestamp

func (d *Database) TestSendBatchWithDelayTimestamp(t *testing.T)

func (*Database) TestSendInvalidJSONFails

func (d *Database) TestSendInvalidJSONFails(t *testing.T)

func (*Database) TestSendWithDelayTimestamp

func (d *Database) TestSendWithDelayTimestamp(t *testing.T)

type Message

type Message struct {
	MsgID      int64
	ReadCount  int64
	EnqueuedAt time.Time
	// VT is "visibility time". The UTC timestamp at which the message will
	// be available for reading again.
	VT      time.Time
	Message json.RawMessage
	Headers json.RawMessage // Only supported in pgmq-pg17 and above
}

func Pop

func Pop(ctx context.Context, db DB, queue string) (*Message, error)

Pop reads single message from the queue and deletes it at the same time. Similar to Read and ReadBatch if no messages are available an ErrNoRows is returned. Unlike these methods, the visibility timeout does not apply. This is because the message is immediately deleted.

func Read

func Read(ctx context.Context, db DB, queue string, vt int64) (*Message, error)

Read a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows errors is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds.

func ReadBatch

func ReadBatch(ctx context.Context, db DB, queue string, vt int64, numMsgs int64) ([]*Message, error)

ReadBatch reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. If vt is 0 it will be set to the default value, vtDefault.

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL