Documentation
¶
Index ¶
- Variables
- func Archive(ctx context.Context, db DB, queue string, msgID int64) (bool, error)
- func ArchiveBatch(ctx context.Context, db DB, queue string, msgIDs []int64) ([]int64, error)
- func CreatePGMQExtension(ctx context.Context, db DB) error
- func CreateQueue(ctx context.Context, db DB, queue string) error
- func CreateUnloggedQueue(ctx context.Context, db DB, queue string) error
- func Delete(ctx context.Context, db DB, queue string, msgID int64) (bool, error)
- func DeleteBatch(ctx context.Context, db DB, queue string, msgIDs []int64) ([]int64, error)
- func DropQueue(ctx context.Context, db DB, queue string) error
- func NewPgxPool(ctx context.Context, connString string) (*pgxpool.Pool, error)
- func Send(ctx context.Context, db DB, queue string, msg json.RawMessage) (int64, error)
- func SendBatch(ctx context.Context, db DB, queue string, msgs []json.RawMessage) ([]int64, error)
- func SendBatchWithDelay(ctx context.Context, db DB, queue string, msgs []json.RawMessage, delay int) ([]int64, error)
- func SendBatchWithDelayTimestamp(ctx context.Context, db DB, queue string, msgs []json.RawMessage, ...) ([]int64, error)
- func SendWithDelay(ctx context.Context, db DB, queue string, msg json.RawMessage, delay int) (int64, error)
- func SendWithDelayTimestamp(ctx context.Context, db DB, queue string, msg json.RawMessage, delay time.Time) (int64, error)
- type DB
- type Database
- func (d *Database) Init()
- func (d *Database) TestArchive(t *testing.T)
- func (d *Database) TestArchiveBatch(t *testing.T)
- func (d *Database) TestArchiveNotExist(t *testing.T)
- func (d *Database) TestCreateAndDropQueue(t *testing.T)
- func (d *Database) TestCreateUnloggedAndDropQueue(t *testing.T)
- func (d *Database) TestDelete(t *testing.T)
- func (d *Database) TestDeleteBatch(t *testing.T)
- func (d *Database) TestDeleteNotExist(t *testing.T)
- func (d *Database) TestDropQueueWhichDoesNotExist(t *testing.T)
- func (d *Database) TestPing(t *testing.T)
- func (d *Database) TestPop(t *testing.T)
- func (d *Database) TestPopEmptyQueueReturnsNoRows(t *testing.T)
- func (d *Database) TestRead(t *testing.T)
- func (d *Database) TestReadBatch(t *testing.T)
- func (d *Database) TestReadEmptyQueueReturnsNoRows(t *testing.T)
- func (d *Database) TestSend(t *testing.T)
- func (d *Database) TestSendAMarshalledStruct(t *testing.T)
- func (d *Database) TestSendBatch(t *testing.T)
- func (d *Database) TestSendBatchWithDelayTimestamp(t *testing.T)
- func (d *Database) TestSendInvalidJSONFails(t *testing.T)
- func (d *Database) TestSendWithDelayTimestamp(t *testing.T)
- type Message
Constants ¶
This section is empty.
Variables ¶
var ErrNoRows = errors.New("pgmq: no rows in result set")
Functions ¶
func Archive ¶
Archive moves a message from the queue table to the archive table by its id. View messages on the archive table with SQL:
SELECT * FROM pgmq.a_<queue_name>;
func ArchiveBatch ¶
ArchiveBatch moves a batch of messages from the queue table to the archive table by their ids. View messages on the archive table with SQL:
SELECT * FROM pgmq.a_<queue_name>;
func CreatePGMQExtension ¶
CreatePGMQExtension will create the PGMQ extension using the provided DB.
func CreateQueue ¶
CreateQueue creates a new queue. This sets up the queue's tables, indexes, and metadata.
func CreateUnloggedQueue ¶
CreateUnloggedQueue creates a new unlogged queue, which uses an unlogged table under the hood. This sets up the queue's tables, indexes, and metadata.
func Delete ¶
Delete deletes a message from the queue by its id. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the Archive method.
func DeleteBatch ¶
DeleteBatch deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. If you want to retain a log of the messages, use the ArchiveBatch method.
func DropQueue ¶
DropQueue deletes the given queue. It deletes the queue's tables, indexes, and metadata. It will return an error if the queue does not exist.
func NewPgxPool ¶
NewPgxPool is a convenience function for creating a new *pgxpool.Pool.
func Send ¶
Send sends a single message to a queue. The message id, unique to the queue, is returned.
func SendBatch ¶
SendBatch sends a batch of messages to a queue. The message ids, unique to the queue, are returned.
func SendBatchWithDelay ¶
func SendBatchWithDelay(ctx context.Context, db DB, queue string, msgs []json.RawMessage, delay int) ([]int64, error)
SendBatchWithDelay sends a batch of messages to a queue with a delay. The delay is specified in seconds. The message ids, unique to the queue, are returned.
func SendBatchWithDelayTimestamp ¶
func SendBatchWithDelayTimestamp(ctx context.Context, db DB, queue string, msgs []json.RawMessage, delay time.Time) ([]int64, error)
SendBatchWithDelayTimestamp sends a batch of messages to a queue with a delay. The delay is specified as a timestamp. The message ids, unique to the queue, are returned.
func SendWithDelay ¶
func SendWithDelay(ctx context.Context, db DB, queue string, msg json.RawMessage, delay int) (int64, error)
SendWithDelay sends a single message to a queue with a delay. The delay is specified in seconds. The message id, unique to the queue, is returned.
func SendWithDelayTimestamp ¶
func SendWithDelayTimestamp(ctx context.Context, db DB, queue string, msg json.RawMessage, delay time.Time) (int64, error)
SendWithDelayTimestamp sends a single message to a queue with a delay. The delay is specified as a timestamp. The message id, unique to the queue, is returned. Only supported in pgmq-pg17 and above.
Types ¶
type Database ¶
func (*Database) TestArchive ¶
func (*Database) TestArchiveBatch ¶
func (*Database) TestArchiveNotExist ¶
func (*Database) TestCreateAndDropQueue ¶
func (*Database) TestCreateUnloggedAndDropQueue ¶
func (*Database) TestDelete ¶
func (*Database) TestDeleteBatch ¶
func (*Database) TestDeleteNotExist ¶
func (*Database) TestDropQueueWhichDoesNotExist ¶
func (*Database) TestPopEmptyQueueReturnsNoRows ¶
func (*Database) TestReadBatch ¶
func (*Database) TestReadEmptyQueueReturnsNoRows ¶
func (*Database) TestSendAMarshalledStruct ¶
func (*Database) TestSendBatch ¶
func (*Database) TestSendBatchWithDelayTimestamp ¶
func (*Database) TestSendInvalidJSONFails ¶
func (*Database) TestSendWithDelayTimestamp ¶
type Message ¶
type Message struct {
MsgID int64
ReadCount int64
EnqueuedAt time.Time
// VT is "visibility time". The UTC timestamp at which the message will
// be available for reading again.
VT time.Time
Message json.RawMessage
Headers json.RawMessage // Only supported in pgmq-pg17 and above
}
func Pop ¶
Pop reads a single message from the queue and deletes it at the same time. As with Read and ReadBatch, if no messages are available, ErrNoRows is returned. Unlike those functions, the visibility timeout does not apply, because the message is deleted immediately.
func Read ¶
Read reads a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows error is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds.
func ReadBatch ¶
func ReadBatch(ctx context.Context, db DB, queue string, vt int64, numMsgs int64) ([]*Message, error)
ReadBatch reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. If vt is 0 it will be set to the default value, vtDefault.