Documentation
Overview ¶
Package postgres provides a PostgreSQL-based transport for protoflow.
Index ¶
- Constants
- func Build(ctx context.Context, cfg transport.Config, logger watermill.LoggerAdapter) (transport.Transport, error)
- func Capabilities() transport.Capabilities
- func Register()
- type Config
- type Transport
- func New(cfg Config, logger watermill.LoggerAdapter) (*Transport, error)
- func (t *Transport) CleanupExpiredLocks() (int64, error)
- func (t *Transport) Close() error
- func (t *Transport) GetCapabilities() transport.Capabilities
- func (t *Transport) GetDB() *sql.DB
- func (t *Transport) GetDLQCount(topic string) (int64, error)
- func (t *Transport) GetPendingCount(topic string) (int64, error)
- func (t *Transport) ListDLQMessages(topic string, limit, offset int) ([]transport.DLQMessage, error)
- func (t *Transport) Publish(topic string, messages ...*message.Message) error
- func (t *Transport) PurgeDLQ(topic string) (int64, error)
- func (t *Transport) ReplayAllDLQ(topic string) (int64, error)
- func (t *Transport) ReplayDLQMessage(dlqID int64) error
- func (t *Transport) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
- func (t *Transport) VacuumTables() error
Constants ¶
const (
	// DefaultPollInterval is the default interval for polling new messages.
	DefaultPollInterval = 100 * time.Millisecond
	// DefaultMaxRetries is the default number of retries before moving to DLQ.
	DefaultMaxRetries = 3
	// DefaultLockTimeout is the default duration a message is locked during processing.
	DefaultLockTimeout = 30 * time.Second
)
const TransportName = "postgres"
TransportName is the name used to register this transport.
Functions ¶
func Build ¶
func Build(ctx context.Context, cfg transport.Config, logger watermill.LoggerAdapter) (transport.Transport, error)
Build creates a new PostgreSQL transport.
func Capabilities ¶
func Capabilities() transport.Capabilities
Capabilities returns the capabilities of this transport.
Types ¶
type Config ¶
type Config struct {
	// ConnectionString is the PostgreSQL connection string.
	ConnectionString string
	// PollInterval is the interval for polling new messages.
	PollInterval time.Duration
	// MaxRetries is the number of times to retry a message before moving to DLQ.
	MaxRetries int
	// LockTimeout is how long a message stays locked during processing.
	LockTimeout time.Duration
	// SchemaName is the schema to use for tables. Defaults to "protoflow".
	SchemaName string
	// MaxOpenConns sets the maximum number of open connections to the database.
	MaxOpenConns int
	// MaxIdleConns sets the maximum number of idle connections.
	MaxIdleConns int
}
Config holds PostgreSQL-specific configuration.
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport implements both the Publisher and Subscriber interfaces on top of PostgreSQL.
func New ¶
func New(cfg Config, logger watermill.LoggerAdapter) (*Transport, error)
New creates a new PostgreSQL-based transport.
func (*Transport) CleanupExpiredLocks ¶
func (t *Transport) CleanupExpiredLocks() (int64, error)
CleanupExpiredLocks unlocks messages that have been locked longer than the lock timeout.
func (*Transport) GetCapabilities ¶
func (t *Transport) GetCapabilities() transport.Capabilities
GetCapabilities returns the capabilities of this transport instance.
func (*Transport) GetDLQCount ¶
func (t *Transport) GetDLQCount(topic string) (int64, error)
GetDLQCount returns the number of messages in the dead letter queue for a topic.
func (*Transport) GetPendingCount ¶
func (t *Transport) GetPendingCount(topic string) (int64, error)
GetPendingCount returns the number of pending messages for a topic.
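GetPendingCount and GetDLQCount can be combined into a simple backlog snapshot, for example to feed a metrics endpoint. The sketch below is written against an interface so it runs without a database. queueCounter, topicStats, collectStats, and fakeCounter are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// queueCounter is the subset of *Transport this sketch depends on.
type queueCounter interface {
	GetPendingCount(topic string) (int64, error)
	GetDLQCount(topic string) (int64, error)
}

// topicStats holds both counts for one topic.
type topicStats struct {
	Topic   string
	Pending int64
	DLQ     int64
}

// collectStats queries both counters for every topic, in order, and stops
// at the first error.
func collectStats(q queueCounter, topics ...string) ([]topicStats, error) {
	stats := make([]topicStats, 0, len(topics))
	for _, topic := range topics {
		pending, err := q.GetPendingCount(topic)
		if err != nil {
			return nil, fmt.Errorf("pending count for %q: %w", topic, err)
		}
		dlq, err := q.GetDLQCount(topic)
		if err != nil {
			return nil, fmt.Errorf("DLQ count for %q: %w", topic, err)
		}
		stats = append(stats, topicStats{Topic: topic, Pending: pending, DLQ: dlq})
	}
	return stats, nil
}

// fakeCounter is an in-memory stand-in for a real transport.
type fakeCounter struct {
	pending map[string]int64
	dlq     map[string]int64
}

func (f fakeCounter) GetPendingCount(topic string) (int64, error) { return f.pending[topic], nil }
func (f fakeCounter) GetDLQCount(topic string) (int64, error)     { return f.dlq[topic], nil }

func main() {
	q := fakeCounter{
		pending: map[string]int64{"orders": 4},
		dlq:     map[string]int64{"orders": 1},
	}
	stats, err := collectStats(q, "orders", "emails")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, s := range stats {
		fmt.Printf("%s pending=%d dlq=%d\n", s.Topic, s.Pending, s.DLQ)
	}
}
```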
func (*Transport) ListDLQMessages ¶
func (t *Transport) ListDLQMessages(topic string, limit, offset int) ([]transport.DLQMessage, error)
ListDLQMessages returns messages from the dead letter queue with pagination.
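The limit and offset parameters allow the dead letter queue to be read page by page. The sketch below walks all pages until a short page signals the end. The fields of transport.DLQMessage are not shown on this page, so a local DLQMessage stand-in with a single hypothetical ID field is used. dlqLister, listAllDLQ, and fakeDLQ are also hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// DLQMessage is a local stand-in for transport.DLQMessage; the ID field
// is an assumption made for this example.
type DLQMessage struct {
	ID int64
}

// dlqLister is the subset of *Transport this sketch depends on.
type dlqLister interface {
	ListDLQMessages(topic string, limit, offset int) ([]DLQMessage, error)
}

// listAllDLQ fetches pages of pageSize messages until a page comes back
// with fewer than pageSize entries.
func listAllDLQ(l dlqLister, topic string, pageSize int) ([]DLQMessage, error) {
	if pageSize <= 0 {
		return nil, errors.New("pageSize must be positive")
	}
	var all []DLQMessage
	for offset := 0; ; offset += pageSize {
		page, err := l.ListDLQMessages(topic, pageSize, offset)
		if err != nil {
			return nil, fmt.Errorf("list DLQ at offset %d: %w", offset, err)
		}
		all = append(all, page...)
		if len(page) < pageSize {
			return all, nil
		}
	}
}

// fakeDLQ serves pages from an in-memory slice and ignores the topic.
type fakeDLQ struct {
	msgs []DLQMessage
}

func (f fakeDLQ) ListDLQMessages(topic string, limit, offset int) ([]DLQMessage, error) {
	if offset >= len(f.msgs) {
		return nil, nil
	}
	end := offset + limit
	if end > len(f.msgs) {
		end = len(f.msgs)
	}
	return f.msgs[offset:end], nil
}

func main() {
	dlq := fakeDLQ{msgs: []DLQMessage{{1}, {2}, {3}, {4}, {5}}}
	msgs, err := listAllDLQ(dlq, "orders", 2)
	fmt.Println(len(msgs), err)
}
```

Offset-based paging can skip or repeat entries if messages are replayed or purged while the loop is running, so it is best suited to inspection rather than exact bookkeeping.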
func (*Transport) ReplayAllDLQ ¶
func (t *Transport) ReplayAllDLQ(topic string) (int64, error)
ReplayAllDLQ moves all messages for a topic from the DLQ back to the main queue.
func (*Transport) ReplayDLQMessage ¶
func (t *Transport) ReplayDLQMessage(dlqID int64) error
ReplayDLQMessage moves the message identified by dlqID from the DLQ back to the main queue.
func (*Transport) VacuumTables ¶
func (t *Transport) VacuumTables() error
VacuumTables runs VACUUM on the message tables to reclaim space.