pgmq

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	QueueName      string             `required:"true"`
	MessageHandler MessageHandlerFunc `required:"true"`
	DbPool         *sql.DB            `required:"true"`

	//-- configurable fields with defaults
	// PollingInterval is the number of seconds to wait between polling for new messages when none are found, default is 1 second
	PollingInterval int

	// VisibilityTimeout is the number of seconds a message is hidden from other consumers while being processed, default is 10 seconds
	VisibilityTimeout int

	// ConcurrentMsgs is the number of messages to process concurrently, default is 10
	ConcurrentMsgs int

	// ArchiveAfterHandle indicates whether to archive messages after they have been handled, default is false (messages are deleted)
	ArchiveAfterHandle bool

	// ExponentialBackoff is the number of seconds to increase the sleep time by when no messages are found, default is 0 seconds
	ExponentialBackoff int

	// ExponentialPollingLimit is the maximum number of seconds to sleep when no messages are found, default is 10 seconds
	ExponentialPollingLimit int
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(pool *sql.DB, queueName string, handlerFunc MessageHandlerFunc, mods ...ConsumerModifier) *Consumer

func (*Consumer) ArchiveMessage

func (s *Consumer) ArchiveMessage(msg *PgmqMessage)

func (*Consumer) DeleteMessage

func (s *Consumer) DeleteMessage(msg *PgmqMessage)

func (*Consumer) PurgeQueue

func (s *Consumer) PurgeQueue(msg *PgmqMessage)

func (*Consumer) ShutdownWithWait

func (s *Consumer) ShutdownWithWait()

func (*Consumer) Start

func (s *Consumer) Start()

type ConsumerModifier

type ConsumerModifier func(*Consumer)

func WithConcurrentMsgs

func WithConcurrentMsgs(count int) ConsumerModifier

func WithExponentialBackoff

func WithExponentialBackoff(secs int) ConsumerModifier

func WithPollingInterval

func WithPollingInterval(secs int) ConsumerModifier

func WithVisibilityTimeout

func WithVisibilityTimeout(secs int) ConsumerModifier

type MessageHandlerFunc

type MessageHandlerFunc func(ctx context.Context, msg *PgmqMessage)

type PgmqMessage

type PgmqMessage struct {
	MsgID      int64
	ReadCount  int
	EnqueuedAt *time.Time
	VT         *time.Time
	Message    *string
	Headers    *string
}

type Producer

type Producer struct {
	DbPool *sql.DB `required:"true"`
}

func NewProducer

func NewProducer(pool *sql.DB) *Producer

func (*Producer) Produce

func (s *Producer) Produce(queueName, message, headers string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL