postgres

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package postgres provides a PostgreSQL-based transport for protoflow.

Index

Constants

View Source
const (
	// DefaultPollInterval is the default interval for polling new messages.
	DefaultPollInterval = 100 * time.Millisecond
	// DefaultMaxRetries is the default number of retries before moving to DLQ.
	DefaultMaxRetries = 3
	// DefaultLockTimeout is the default duration a message is locked during processing.
	DefaultLockTimeout = 30 * time.Second
)
View Source
const TransportName = "postgres"

TransportName is the name used to register this transport.

Variables

This section is empty.

Functions

func Build

Build creates a new PostgreSQL transport.

func Capabilities

func Capabilities() transport.Capabilities

Capabilities returns the capabilities of this transport.

Types

type Config

type Config struct {
	// ConnectionString is the PostgreSQL connection string.
	ConnectionString string
	// PollInterval is the interval for polling new messages.
	PollInterval time.Duration
	// MaxRetries is the number of times to retry a message before moving to DLQ.
	MaxRetries int
	// LockTimeout is how long a message stays locked during processing.
	LockTimeout time.Duration
	// SchemaName is the schema to use for tables. Defaults to "protoflow".
	SchemaName string
	// MaxOpenConns sets the maximum number of open connections to the database.
	MaxOpenConns int
	// MaxIdleConns sets the maximum number of idle connections.
	MaxIdleConns int
}

Config holds PostgreSQL-specific configuration.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport implements both Publisher and Subscriber interfaces for PostgreSQL.

func New

func New(cfg Config, logger watermill.LoggerAdapter) (*Transport, error)

New creates a new PostgreSQL-based transport.

func (*Transport) CleanupExpiredLocks

func (t *Transport) CleanupExpiredLocks() (int64, error)

CleanupExpiredLocks unlocks messages that have been locked longer than the lock timeout.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the transport and releases resources.

func (*Transport) GetCapabilities

func (t *Transport) GetCapabilities() transport.Capabilities

GetCapabilities returns the capabilities of this transport instance.

func (*Transport) GetDB

func (t *Transport) GetDB() *sql.DB

GetDB returns the underlying database connection for advanced use cases.

func (*Transport) GetDLQCount

func (t *Transport) GetDLQCount(topic string) (int64, error)

GetDLQCount returns the number of messages in the dead letter queue for a topic.

func (*Transport) GetPendingCount

func (t *Transport) GetPendingCount(topic string) (int64, error)

GetPendingCount returns the number of pending messages for a topic.

func (*Transport) ListDLQMessages

func (t *Transport) ListDLQMessages(topic string, limit, offset int) ([]transport.DLQMessage, error)

ListDLQMessages returns messages from the dead letter queue with pagination.

func (*Transport) Publish

func (t *Transport) Publish(topic string, messages ...*message.Message) error

Publish publishes messages to the specified topic.

func (*Transport) PurgeDLQ

func (t *Transport) PurgeDLQ(topic string) (int64, error)

PurgeDLQ removes all messages from the dead letter queue for a topic.

func (*Transport) ReplayAllDLQ

func (t *Transport) ReplayAllDLQ(topic string) (int64, error)

ReplayAllDLQ moves all messages from DLQ back to the main queue for a topic.

func (*Transport) ReplayDLQMessage

func (t *Transport) ReplayDLQMessage(dlqID int64) error

ReplayDLQMessage moves a message from DLQ back to the main queue.

func (*Transport) Subscribe

func (t *Transport) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribes to messages from the specified topic.

func (*Transport) VacuumTables

func (t *Transport) VacuumTables() error

VacuumTables runs VACUUM on the message tables to reclaim space.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL