postgres

package
v0.5.1

Note: this package is not in the latest version of its module.
Published: Dec 3, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package postgres provides a PostgreSQL-based transport for protoflow.

Index

Constants

const (
	// DefaultPollInterval is the default interval for polling new messages.
	DefaultPollInterval = 100 * time.Millisecond
	// DefaultMaxRetries is the default number of retries before moving to DLQ.
	DefaultMaxRetries = 3
	// DefaultLockTimeout is the default duration a message is locked during processing.
	DefaultLockTimeout = 30 * time.Second
)
const TransportName = "postgres"

TransportName is the name used to register this transport.

Variables

This section is empty.

Functions

func Build

Build creates a new PostgreSQL transport.

func Capabilities

func Capabilities() transport.Capabilities

Capabilities returns the capabilities of this transport.

func Register (added in v0.5.0)

func Register()

Register registers the PostgreSQL transport with the default registry. This should be called from an init() function in an importing package, or explicitly before using the transport.

Types

type Config

type Config struct {
	// ConnectionString is the PostgreSQL connection string.
	ConnectionString string
	// PollInterval is the interval for polling new messages.
	PollInterval time.Duration
	// MaxRetries is the number of times to retry a message before moving to DLQ.
	MaxRetries int
	// LockTimeout is how long a message stays locked during processing.
	LockTimeout time.Duration
	// SchemaName is the schema to use for tables. Defaults to "protoflow".
	SchemaName string
	// MaxOpenConns sets the maximum number of open connections to the database.
	MaxOpenConns int
	// MaxIdleConns sets the maximum number of idle connections.
	MaxIdleConns int
}

Config holds PostgreSQL-specific configuration.
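
The Default* constants suggest that zero-valued Config fields fall back to defaults, but this page does not say how New applies them. The following is a minimal sketch under that assumption: Config and the constants are local copies of the documented declarations, and withDefaults is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented constants, so the sketch compiles
// without importing the package.
const (
	DefaultPollInterval = 100 * time.Millisecond
	DefaultMaxRetries   = 3
	DefaultLockTimeout  = 30 * time.Second
)

// Config mirrors the documented fields of postgres.Config.
type Config struct {
	ConnectionString string
	PollInterval     time.Duration
	MaxRetries       int
	LockTimeout      time.Duration
	SchemaName       string
	MaxOpenConns     int
	MaxIdleConns     int
}

// withDefaults is a hypothetical helper: it fills unset fields with the
// documented defaults. Whether New behaves the same way is an assumption.
func withDefaults(cfg Config) Config {
	if cfg.PollInterval <= 0 {
		cfg.PollInterval = DefaultPollInterval
	}
	if cfg.MaxRetries <= 0 {
		cfg.MaxRetries = DefaultMaxRetries
	}
	if cfg.LockTimeout <= 0 {
		cfg.LockTimeout = DefaultLockTimeout
	}
	if cfg.SchemaName == "" {
		cfg.SchemaName = "protoflow" // default named in the field comment
	}
	return cfg
}

func main() {
	cfg := withDefaults(Config{
		// Placeholder connection string for illustration only.
		ConnectionString: "postgres://user:pass@localhost:5432/app?sslmode=disable",
		MaxRetries:       5,
	})
	fmt.Println(cfg.PollInterval, cfg.MaxRetries, cfg.LockTimeout, cfg.SchemaName)
	// prints: 100ms 5 30s protoflow
}
```

Setting fields explicitly, as MaxRetries is here, avoids depending on whichever defaulting rule the package actually uses.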

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport implements both Publisher and Subscriber interfaces for PostgreSQL.

func New

func New(cfg Config, logger watermill.LoggerAdapter) (*Transport, error)

New creates a new PostgreSQL-based transport.

func (*Transport) CleanupExpiredLocks

func (t *Transport) CleanupExpiredLocks() (int64, error)

CleanupExpiredLocks unlocks messages that have been locked longer than the lock timeout.
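
This page does not say whether the transport calls CleanupExpiredLocks on its own schedule. If it has to be driven by the application, a periodic loop along these lines is one option. The lockCleaner interface, runLockCleanup, and fakeCleaner are hypothetical names used for illustration; only the CleanupExpiredLocks signature comes from the documentation above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// lockCleaner is the single documented method this sketch needs.
// *Transport would satisfy it, given the signature shown above.
type lockCleaner interface {
	CleanupExpiredLocks() (int64, error)
}

// runLockCleanup calls CleanupExpiredLocks on every tick until ctx is
// cancelled and returns the total number of messages unlocked.
func runLockCleanup(ctx context.Context, c lockCleaner, every time.Duration) (int64, error) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()

	var total int64
	for ctx.Err() == nil {
		select {
		case <-ctx.Done():
		case <-ticker.C:
			n, err := c.CleanupExpiredLocks()
			if err != nil {
				return total, fmt.Errorf("cleanup expired locks: %w", err)
			}
			total += n
		}
	}
	return total, nil
}

// fakeCleaner stands in for a real transport so the sketch runs without
// a database. It cancels the context after stopAt calls.
type fakeCleaner struct {
	calls  int
	stopAt int
	stop   context.CancelFunc
}

func (f *fakeCleaner) CleanupExpiredLocks() (int64, error) {
	f.calls++
	if f.calls == f.stopAt {
		f.stop()
	}
	return 2, nil // pretend two messages were unlocked
}

// demo runs three cleanup rounds against the fake.
func demo() (int64, int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	f := &fakeCleaner{stopAt: 3, stop: cancel}
	total, err := runLockCleanup(ctx, f, time.Millisecond)
	return total, f.calls, err
}

func main() {
	total, calls, err := demo()
	fmt.Println(total, calls, err) // prints: 6 3 <nil>
}
```

In a real service the interval would more plausibly be on the order of the configured LockTimeout than one millisecond.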

func (*Transport) Close

func (t *Transport) Close() error

Close closes the transport and releases resources.

func (*Transport) GetCapabilities

func (t *Transport) GetCapabilities() transport.Capabilities

GetCapabilities returns the capabilities of this transport instance.

func (*Transport) GetDB

func (t *Transport) GetDB() *sql.DB

GetDB returns the underlying database connection for advanced use cases.

func (*Transport) GetDLQCount

func (t *Transport) GetDLQCount(topic string) (int64, error)

GetDLQCount returns the number of messages in the dead letter queue for a topic.

func (*Transport) GetPendingCount

func (t *Transport) GetPendingCount(topic string) (int64, error)

GetPendingCount returns the number of pending messages for a topic.
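
GetPendingCount and GetDLQCount together give a simple per-topic health snapshot. The sketch below shows one way to collect both. The queueCounter interface, topicStats, collectStats, and fakeCounter are hypothetical names; only the two method signatures are taken from this page.

```go
package main

import "fmt"

// queueCounter lists the two documented counting methods.
type queueCounter interface {
	GetPendingCount(topic string) (int64, error)
	GetDLQCount(topic string) (int64, error)
}

// topicStats is a hypothetical snapshot type for one topic.
type topicStats struct {
	Topic   string
	Pending int64
	DLQ     int64
}

// collectStats queries both counters for each topic and stops at the
// first error.
func collectStats(c queueCounter, topics ...string) ([]topicStats, error) {
	stats := make([]topicStats, 0, len(topics))
	for _, topic := range topics {
		pending, err := c.GetPendingCount(topic)
		if err != nil {
			return nil, fmt.Errorf("pending count for %q: %w", topic, err)
		}
		dlq, err := c.GetDLQCount(topic)
		if err != nil {
			return nil, fmt.Errorf("DLQ count for %q: %w", topic, err)
		}
		stats = append(stats, topicStats{Topic: topic, Pending: pending, DLQ: dlq})
	}
	return stats, nil
}

// fakeCounter stands in for a real transport so the sketch runs
// without a database.
type fakeCounter struct {
	pending map[string]int64
	dlq     map[string]int64
}

func (f fakeCounter) GetPendingCount(topic string) (int64, error) {
	return f.pending[topic], nil
}

func (f fakeCounter) GetDLQCount(topic string) (int64, error) {
	return f.dlq[topic], nil
}

func main() {
	c := fakeCounter{
		pending: map[string]int64{"orders": 12},
		dlq:     map[string]int64{"orders": 1},
	}
	stats, err := collectStats(c, "orders", "payments")
	fmt.Println(stats, err)
	// prints: [{orders 12 1} {payments 0 0}] <nil>
}
```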

func (*Transport) ListDLQMessages

func (t *Transport) ListDLQMessages(topic string, limit, offset int) ([]transport.DLQMessage, error)

ListDLQMessages returns messages from the dead letter queue with pagination.
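
Because ListDLQMessages takes a limit and an offset, reading the whole dead letter queue means paging until a short page comes back. The sketch below illustrates that loop. The fields of transport.DLQMessage are not shown on this page, so the helper is generic over the element type; dlqLister, listAllDLQ, and fakeLister are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// dlqLister captures the documented ListDLQMessages signature, with the
// element type left generic because transport.DLQMessage is not shown here.
type dlqLister[M any] interface {
	ListDLQMessages(topic string, limit, offset int) ([]M, error)
}

// listAllDLQ pages through the DLQ for a topic until a page comes back
// shorter than pageSize.
func listAllDLQ[M any](l dlqLister[M], topic string, pageSize int) ([]M, error) {
	if pageSize <= 0 {
		return nil, errors.New("pageSize must be positive")
	}
	var all []M
	for offset := 0; ; offset += pageSize {
		page, err := l.ListDLQMessages(topic, pageSize, offset)
		if err != nil {
			return nil, fmt.Errorf("list DLQ at offset %d: %w", offset, err)
		}
		all = append(all, page...)
		if len(page) < pageSize {
			return all, nil
		}
	}
}

// fakeLister serves pages from an in-memory slice so the sketch runs
// without a database.
type fakeLister struct{ msgs []string }

func (f fakeLister) ListDLQMessages(topic string, limit, offset int) ([]string, error) {
	if offset >= len(f.msgs) {
		return nil, nil
	}
	end := offset + limit
	if end > len(f.msgs) {
		end = len(f.msgs)
	}
	return f.msgs[offset:end], nil
}

func main() {
	l := fakeLister{msgs: []string{"a", "b", "c", "d", "e"}}
	all, err := listAllDLQ[string](l, "orders", 2)
	fmt.Println(all, err) // prints: [a b c d e] <nil>
}
```

Offset-based paging can skip or repeat entries if the queue changes between calls, for example while messages are being replayed or purged, so a loop like this is best suited to inspection rather than exact accounting.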

func (*Transport) Publish

func (t *Transport) Publish(topic string, messages ...*message.Message) error

Publish publishes messages to the specified topic.

func (*Transport) PurgeDLQ

func (t *Transport) PurgeDLQ(topic string) (int64, error)

PurgeDLQ removes all messages from the dead letter queue for a topic.

func (*Transport) ReplayAllDLQ

func (t *Transport) ReplayAllDLQ(topic string) (int64, error)

ReplayAllDLQ moves all messages from DLQ back to the main queue for a topic.

func (*Transport) ReplayDLQMessage

func (t *Transport) ReplayDLQMessage(dlqID int64) error

ReplayDLQMessage moves a message from DLQ back to the main queue.
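
When only some dead-lettered messages should be retried, ReplayDLQMessage can be called once per ID. The sketch below keeps going after a failure and reports every error at the end. The dlqReplayer interface, replayMany, and fakeReplayer are hypothetical names; errors.Join requires Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
)

// dlqReplayer captures the documented ReplayDLQMessage signature.
type dlqReplayer interface {
	ReplayDLQMessage(dlqID int64) error
}

// replayMany replays each ID in turn, continues past failures, and
// returns how many succeeded plus a joined error (nil if none failed).
func replayMany(r dlqReplayer, ids ...int64) (int, error) {
	var errs []error
	replayed := 0
	for _, id := range ids {
		if err := r.ReplayDLQMessage(id); err != nil {
			errs = append(errs, fmt.Errorf("replay DLQ message %d: %w", id, err))
			continue
		}
		replayed++
	}
	return replayed, errors.Join(errs...)
}

// fakeReplayer fails for IDs marked as missing, so the sketch runs
// without a database.
type fakeReplayer struct{ missing map[int64]bool }

func (f fakeReplayer) ReplayDLQMessage(dlqID int64) error {
	if f.missing[dlqID] {
		return errors.New("not found")
	}
	return nil
}

func main() {
	r := fakeReplayer{missing: map[int64]bool{2: true}}
	n, err := replayMany(r, 1, 2, 3)
	fmt.Println(n, err) // prints: 2 replay DLQ message 2: not found
}
```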

func (*Transport) Subscribe

func (t *Transport) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribes to messages from the specified topic.
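
Subscribe returns a channel of Watermill messages, which are normally acknowledged with Ack or negatively acknowledged with Nack. The sketch below shows a consumer loop over such a channel. It assumes, without confirmation from this page, that this transport uses Nack to drive its retry and DLQ handling. The acker interface, consume, and fakeMsg are hypothetical; acker matches the Ack() bool and Nack() bool methods of Watermill's *message.Message.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// acker matches the acknowledgement methods of Watermill's
// *message.Message, so the sketch compiles without that dependency.
type acker interface {
	Ack() bool
	Nack() bool
}

// consume reads from msgs until the channel closes or ctx is cancelled.
// It acks a message when handle succeeds and nacks it otherwise.
func consume[M acker](ctx context.Context, msgs <-chan M, handle func(M) error) (acked, nacked int) {
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-msgs:
			if !ok {
				return
			}
			if err := handle(m); err != nil {
				m.Nack()
				nacked++
				continue
			}
			m.Ack()
			acked++
		}
	}
}

// fakeMsg stands in for a Watermill message.
type fakeMsg struct {
	body  string
	state string
}

func (m *fakeMsg) Ack() bool {
	m.state = "acked"
	return true
}

func (m *fakeMsg) Nack() bool {
	m.state = "nacked"
	return true
}

// demo feeds three messages through consume; the handler rejects "bad".
func demo() (acked, nacked int) {
	msgs := make(chan *fakeMsg, 3)
	for _, body := range []string{"ok", "bad", "ok"} {
		msgs <- &fakeMsg{body: body}
	}
	close(msgs)
	return consume[*fakeMsg](context.Background(), msgs, func(m *fakeMsg) error {
		if m.body == "bad" {
			return errors.New("handler failed")
		}
		return nil
	})
}

func main() {
	acked, nacked := demo()
	fmt.Println(acked, nacked) // prints: 2 1
}
```

With the real transport, the channel passed to consume would be the one returned by Subscribe, and cancelling the context passed to Subscribe is the likely way to stop delivery.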

func (*Transport) VacuumTables

func (t *Transport) VacuumTables() error

VacuumTables runs VACUUM on the message tables to reclaim space.
