pgxutils

Version: v1.0.4
Published: Nov 11, 2025 License: MIT Imports: 18 Imported by: 0

README

go-pgx-utils

Enterprise-grade PostgreSQL connection management for Go applications using pgx/v5.

Purpose

go-pgx-utils provides robust PostgreSQL connection management with automatic retry logic, health checking, and transaction utilities. Built on top of pgx/v5, this package integrates seamlessly with go-config for configuration and go-errors for structured error handling.

Features

  • Automatic Connection Retry: Exponential backoff retry logic with configurable timeout
  • Health Checking: Built-in health check with configurable timeout
  • Connection Pooling: Full support for pgxpool configuration
  • Transaction Management: Utilities for safe transaction handling and rollback
  • Error Handling: Integration with go-errors for structured error types
  • Testcontainers Support: Integration tests using testcontainers-go
  • Thread-Safe: Safe for concurrent use after initialization

Installation

go get github.com/JohnPlummer/go-pgx-utils@v1.0.4

Dependencies

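This package requires:

  • pgx/v5 - PostgreSQL driver and toolkit
  • go-config - Configuration management
  • go-errors - Structured error handling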

Quick Start

package main

import (
    "context"
    "log"
    "time"

    "github.com/JohnPlummer/go-config"
    pgxutils "github.com/JohnPlummer/go-pgx-utils"
)

func main() {
    // Create database configuration
    cfg := &config.DatabaseConfig{
        Host:        "localhost",
        Port:        5432,
        Name:        "mydb",
        User:        "myuser",
        Password:    "mypassword",
        SSLMode:     "disable",
        MaxConns:    25,
        MinConns:    5,
        MaxConnLife: time.Hour,
        MaxConnIdle: 30 * time.Minute,
    }

    // Create connection
    conn, err := pgxutils.NewConnection(cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Connect with automatic retry
    ctx := context.Background()
    if err := conn.Connect(ctx); err != nil {
        log.Fatal(err)
    }

    // Verify connection health
    if err := conn.Health(ctx); err != nil {
        log.Fatal(err)
    }

    // Execute queries
    _, err = conn.Exec(ctx, "SELECT 1")
    if err != nil {
        log.Fatal(err)
    }
}

Configuration

The package uses config.DatabaseConfig from go-config:

type DatabaseConfig struct {
    Host         string
    Port         int
    Name         string
    User         string
    Password     string
    SSLMode      string
    MaxConns     int
    MinConns     int
    MaxConnLife  time.Duration
    MaxConnIdle  time.Duration
    RetryTimeout time.Duration
}

Environment Variables

Configure via environment variables (using go-config):

DB_HOST=localhost
DB_PORT=5432
DB_NAME=mydb
DB_USER=myuser
DB_PASSWORD=mypassword
DB_SSLMODE=disable
DB_MAX_CONNS=25
DB_MIN_CONNS=5
DB_MAX_CONN_LIFE=1h
DB_MAX_CONN_IDLE=30m
DB_RETRY_TIMEOUT=30s

Or use a .env file or other configuration sources supported by go-config.

Connection Options

Customize connection behavior with functional options:

conn, err := pgxutils.NewConnection(cfg.Database,
    pgxutils.WithLogger(logger),              // Custom logger
    pgxutils.WithHealthTimeout(10*time.Second), // Health check timeout
    pgxutils.WithRetryTimeout(60*time.Second),  // Connection retry timeout
)

Available Options

  • WithLogger: Set a custom *slog.Logger for connection logs
  • WithHealthTimeout: Override default health check timeout (default: 5s)
  • WithRetryTimeout: Override default connection retry timeout (default: 30s)

Retry Logic

Connection retry uses exponential backoff:

  • Starts with 1-second delay
  • Increases exponentially up to 10-second maximum
  • Retries until RetryTimeout is reached (default: 30s)
  • Configurable via config.DatabaseConfig.RetryTimeout or WithRetryTimeout option

Example:

// Option 1: set the timeout on the config
cfg.Database.RetryTimeout = 60 * time.Second // Wait up to 60s
conn, err := pgxutils.NewConnection(cfg.Database)

// Option 2: override it with a functional option
conn, err = pgxutils.NewConnection(cfg.Database,
    pgxutils.WithRetryTimeout(60*time.Second),
)

Health Checking

Health checks verify database connectivity:

if err := conn.Health(ctx); err != nil {
    // Connection is unhealthy
    log.Printf("Database health check failed: %v", err)
}

The health check:

  • Executes SELECT 1 query
  • Uses configurable timeout (default: 5s)
  • Returns ErrUnavailable from go-errors on failure
  • Can be customized with WithHealthTimeout option

Transaction Management

Manual Transaction Handling

tx, err := conn.Begin(ctx)
if err != nil {
    return err
}

var txErr error
defer func() {
    if txErr != nil {
        pgxutils.HandleTransactionRollback(ctx, tx, logger)
    }
}()

_, txErr = tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice")
if txErr != nil {
    return txErr
}

if err := tx.Commit(ctx); err != nil {
    txErr = err
    return err
}

Using WithTransaction Helper

The WithTransaction helper automatically handles Begin/Commit/Rollback:

err := pgxutils.WithTransaction(ctx, conn, logger, func(tx pgx.Tx) error {
    _, err := tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice")
    if err != nil {
        return err
    }

    _, err = tx.Exec(ctx, "INSERT INTO audit_log (action) VALUES ($1)", "user_created")
    return err
})

if err != nil {
    // Transaction was automatically rolled back
    log.Printf("Transaction failed: %v", err)
}

Transaction Utilities

HandleTransactionRollback: Safe rollback in defer blocks

defer func() {
    if err != nil {
        pgxutils.HandleTransactionRollback(ctx, tx, logger)
    }
}()

RollbackTransaction: Rollback with error wrapping

defer func() {
    if err != nil {
        err = pgxutils.RollbackTransaction(ctx, tx, logger, err)
    }
}()

WithTransaction: Complete transaction management

err := pgxutils.WithTransaction(ctx, conn, logger, func(tx pgx.Tx) error {
    // Your transaction logic here
    return nil
})

Error Handling

All errors use go-errors for structured error handling:

import "github.com/JohnPlummer/go-errors"

err := conn.Connect(ctx)
if err != nil {
    switch {
    case errors.Is(err, errors.ErrTimeout):
        // Connection timed out after retry attempts
    case errors.Is(err, errors.ErrInvalidInput):
        // Invalid configuration
    case errors.Is(err, errors.ErrPreconditionFailed):
        // Pool not initialized
    case errors.Is(err, errors.ErrUnavailable):
        // Database unavailable
    case errors.Is(err, errors.ErrInternal):
        // Internal database error
    }
}

Error Types

  • ErrInvalidInput: Invalid configuration or parameters
  • ErrTimeout: Connection or operation timeout
  • ErrPreconditionFailed: Operation called before initialization
  • ErrUnavailable: Database unavailable or unhealthy
  • ErrInternal: Database query or transaction error
  • ErrCanceled: Context canceled during transaction
  • ErrDeadlineExceeded: Context deadline exceeded during transaction

Query Methods

The Connection type provides the standard pgx query methods:

// Execute a query without returning rows
tag, err := conn.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice")

// Query multiple rows; check the error before deferring Close
rows, err := conn.Query(ctx, "SELECT id, name FROM users")
if err != nil {
    return err
}
defer rows.Close()

// Query a single row
var name string
err = conn.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", 1).Scan(&name)

// Begin a transaction
tx, err := conn.Begin(ctx)

// Begin a transaction with options
tx, err = conn.BeginTx(ctx, pgx.TxOptions{
    IsoLevel: pgx.ReadCommitted,
})

Connection Pool Statistics

Monitor pool health with built-in statistics:

stats := conn.Stats()
if stats != nil {
    log.Printf("Pool stats: acquired=%d idle=%d max=%d total=%d",
        stats.AcquiredConns(),
        stats.IdleConns(),
        stats.MaxConns(),
        stats.TotalConns(),
    )
}
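
These counters can be turned into a simple saturation signal. In the sketch below, poolStats is a local interface covering two of the accessor methods used above, utilization is a hypothetical helper, and the 80% threshold is an arbitrary example value.

```go
package main

import "fmt"

// poolStats covers the two accessors this sketch needs; *pgxpool.Stat has
// methods with these names returning int32.
type poolStats interface {
	AcquiredConns() int32
	MaxConns() int32
}

// utilization returns the fraction of the pool currently checked out,
// or 0 when the pool reports no capacity.
func utilization(s poolStats) float64 {
	if s.MaxConns() == 0 {
		return 0
	}
	return float64(s.AcquiredConns()) / float64(s.MaxConns())
}

// fakeStats is a stand-in so the sketch runs without a database.
type fakeStats struct{ acquired, max int32 }

func (f fakeStats) AcquiredConns() int32 { return f.acquired }
func (f fakeStats) MaxConns() int32      { return f.max }

func main() {
	s := fakeStats{acquired: 24, max: 25}
	if u := utilization(s); u > 0.8 {
		fmt.Printf("pool is %.0f%% utilized\n", u*100)
	}
}
```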

Migration Guide

From Monorepo Pattern

If migrating from the monorepo pattern, update your imports:

Before:

import "github.com/YourOrg/your-app/pkg/database"

db, err := database.NewDB(cfg)

After:

import (
    "github.com/JohnPlummer/go-config"
    pgxutils "github.com/JohnPlummer/go-pgx-utils"
)

conn, err := pgxutils.NewConnection(cfg.Database)

Key Changes

  1. Package name: database → pgxutils
  2. Type name: DB → Connection
  3. Constructor: NewDB(cfg) → NewConnection(cfg.Database)
  4. Config: Custom config → config.DatabaseConfig from go-config
  5. Errors: Standard errors → go-errors integration

Testing

Unit Tests

go test -v -race -cover ./...

Integration Tests

Integration tests use testcontainers and require Docker:

go test -v -race -tags=integration ./...

Coverage

go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

This package maintains >80% test coverage.

Best Practices

  1. Use go-config for configuration: Load DatabaseConfig via go-config
  2. Check errors with go-errors: Use errors.Is() for error type checking
  3. Use WithTransaction for complex transactions: Automatic rollback on error
  4. Set appropriate connection pool sizes: Match your workload characteristics
  5. Monitor pool statistics: Use Stats() for observability
  6. Use health checks: Integrate health checks in readiness probes
  7. Configure retry timeout: Balance startup time vs. reliability
  8. Log transaction rollbacks: Use provided utilities for consistent logging

Thread Safety

The Connection type is thread-safe after Connect() succeeds. Multiple goroutines can safely:

  • Execute queries
  • Begin transactions
  • Check health
  • Access pool statistics

The underlying pgxpool handles concurrency automatically.

Performance Considerations

  • Connection pooling reduces overhead of connection establishment
  • The pool health check runs every 30s to detect stale connections
  • Retry logic adds startup latency in failure scenarios
  • Pool size should match expected concurrent query load
  • Use transactions for multi-statement operations

License

MIT License - See LICENSE for details.

Contributing

Contributions welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit a pull request

Support

For issues and questions, use the project repository (github.com/JohnPlummer/go-pgx-utils).

Related projects:

  • go-config - Configuration management
  • go-errors - Structured error handling
  • pgx - PostgreSQL driver and toolkit

Documentation

Functions

func HandleTransactionRollback

func HandleTransactionRollback(ctx context.Context, tx TransactionRollback, logger *slog.Logger)

HandleTransactionRollback safely rolls back a transaction in deferred cleanup.

Designed for use in defer blocks where err != nil. Logs but doesn't propagate rollback errors to prevent masking the original failure. The pgx driver automatically rolls back uncommitted transactions on connection close, making this a best-effort operation for explicit cleanup.

Expected rollback failures (tx already closed, context canceled, or connection busy) are logged at WARN level to reduce noise. Unexpected failures are logged at ERROR level.

Example usage:

tx, err := conn.Begin(ctx)
if err != nil {
    return err
}
defer func() {
    if err != nil {
        pgxutils.HandleTransactionRollback(ctx, tx, logger)
    }
}()

func RollbackMigrations added in v1.0.2

func RollbackMigrations(databaseURL, migrationsPath string) error

RollbackMigrations rolls back the last migration.

func RollbackTransaction

func RollbackTransaction(ctx context.Context, tx TransactionRollback, logger *slog.Logger, originalErr error) error

RollbackTransaction is a convenience wrapper around HandleTransactionRollback that also wraps the original error with transaction context.

This function is useful when you want to both roll back a transaction and preserve the original error with additional context.

Example usage:

tx, err := conn.Begin(ctx)
if err != nil {
    return err
}
defer func() {
    if err != nil {
        err = pgxutils.RollbackTransaction(ctx, tx, logger, err)
    }
}()

func RunMigrations added in v1.0.2

func RunMigrations(databaseURL, migrationsPath string) error

RunMigrations runs database migrations using golang-migrate.

func WithTransaction

func WithTransaction(ctx context.Context, conn *Connection, logger *slog.Logger, fn func(pgx.Tx) error) error

WithTransaction executes a function within a database transaction. If the function returns an error, the transaction is rolled back. Otherwise, the transaction is committed.

This helper simplifies transaction management by handling the Begin/Commit/Rollback boilerplate.

Example usage:

err := pgxutils.WithTransaction(ctx, conn, logger, func(tx pgx.Tx) error {
    _, err := tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice")
    if err != nil {
        return err
    }
    _, err = tx.Exec(ctx, "INSERT INTO logs (message) VALUES ($1)", "User created")
    return err
})

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection manages a PostgreSQL connection pool with automatic retry and health checking. Thread-safe after Connect() succeeds.

func NewConnection

func NewConnection(cfg *config.DatabaseConfig, opts ...Option) (*Connection, error)

NewConnection creates a new Connection instance without establishing connections.

Actual connection occurs in Connect() to allow retry configuration. Use functional options to customize behavior.

func (*Connection) Acquire added in v1.0.2

func (db *Connection) Acquire(ctx context.Context) (*ConnectionWrapper, error)

Acquire gets a connection from the pool, using ctx for cancellation.

func (*Connection) AverageAcquireTime added in v1.0.2

func (db *Connection) AverageAcquireTime() time.Duration

AverageAcquireTime returns the average time to acquire a connection.

func (*Connection) Begin

func (c *Connection) Begin(ctx context.Context) (pgx.Tx, error)

Begin starts a transaction with default isolation level.

func (*Connection) BeginTx

func (c *Connection) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)

BeginTx starts a transaction with custom isolation and access mode.

func (*Connection) CheckConnections added in v1.0.2

func (db *Connection) CheckConnections() error

CheckConnections verifies that the connection pool is within healthy thresholds.

func (*Connection) Close

func (c *Connection) Close()

Close closes all pool connections immediately.

func (*Connection) Connect

func (c *Connection) Connect(ctx context.Context) error

Connect establishes the connection pool with exponential backoff retry.

Retries for up to RetryTimeout (default 30s) with max 10s between attempts. Returns error if connection cannot be established within timeout.

func (*Connection) CopyFrom added in v1.0.2

func (db *Connection) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)

CopyFrom performs a bulk insert using the PostgreSQL COPY protocol.

func (*Connection) DetailedHealth added in v1.0.2

func (db *Connection) DetailedHealth(ctx context.Context) *HealthStatus

DetailedHealth performs a comprehensive health check and returns a detailed status.

func (*Connection) Exec

func (c *Connection) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)

Exec executes queries without result rows (INSERT, UPDATE, DELETE).

func (*Connection) GetMetrics added in v1.0.2

func (db *Connection) GetMetrics() *PoolMetrics

GetMetrics returns the current pool metrics.

func (*Connection) Health

func (c *Connection) Health(ctx context.Context) error

Health verifies database connectivity with a configurable timeout.

Returns error if pool uninitialized or SELECT 1 fails.

func (*Connection) IsHealthy added in v1.0.2

func (db *Connection) IsHealthy(ctx context.Context) bool

IsHealthy is a simple boolean check for database health.

func (*Connection) Pool

func (c *Connection) Pool() *pgxpool.Pool

Pool exposes the underlying pgxpool for advanced operations.

Prefer Connection methods; direct pool access bypasses initialization checks.

func (*Connection) Query

func (c *Connection) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes queries returning multiple rows.

func (*Connection) QueryRow

func (c *Connection) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that is expected to return a single row.

Returns emptyRow with error if pool uninitialized.

func (*Connection) ResetPool added in v1.0.2

func (db *Connection) ResetPool(ctx context.Context) error

ResetPool closes and recreates the connection pool. This can be useful for handling certain types of connection errors.

func (*Connection) SendBatch added in v1.0.2

func (db *Connection) SendBatch(ctx context.Context, batch *pgx.Batch) pgx.BatchResults

SendBatch sends a batch of queries to be executed.

func (*Connection) Stats

func (c *Connection) Stats() *pgxpool.Stat

Stats returns pool metrics or nil if uninitialized.

func (*Connection) WaitForReady added in v1.0.2

func (db *Connection) WaitForReady(ctx context.Context, timeout time.Duration) error

WaitForReady waits up to timeout for the database to become ready.

func (*Connection) WithConnection added in v1.0.2

func (db *Connection) WithConnection(ctx context.Context, fn func(*pgxpool.Conn) error) error

WithConnection executes a function with a database connection.

func (*Connection) WithTransaction added in v1.0.2

func (db *Connection) WithTransaction(ctx context.Context, fn func(pgx.Tx) error) error

WithTransaction executes a function within a database transaction.

type ConnectionWrapper added in v1.0.2

type ConnectionWrapper struct {
	// contains filtered or unexported fields
}

ConnectionWrapper wraps a connection with additional functionality.

func (*ConnectionWrapper) Conn added in v1.0.2

func (cw *ConnectionWrapper) Conn() *pgxpool.Conn

Conn returns the underlying connection.

func (*ConnectionWrapper) Release added in v1.0.2

func (cw *ConnectionWrapper) Release()

Release returns the connection to the pool.

type HealthStatus added in v1.0.2

type HealthStatus struct {
	Healthy     bool          `json:"healthy"`
	Message     string        `json:"message"`
	Latency     time.Duration `json:"latency_ms"`
	Connections int32         `json:"connections"`
	IdleConns   int32         `json:"idle_connections"`
	MaxConns    int32         `json:"max_connections"`
	LastChecked time.Time     `json:"last_checked"`
}

HealthStatus represents the health status of the database

type Option

type Option func(*connectionOptions)

Option is a functional option for configuring Connection.

func WithHealthTimeout

func WithHealthTimeout(timeout time.Duration) Option

WithHealthTimeout sets a custom timeout for health checks. Default is 5 seconds.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets a custom logger for the connection.

func WithRetryTimeout

func WithRetryTimeout(timeout time.Duration) Option

WithRetryTimeout sets a custom timeout for connection retry logic. Default is 30 seconds.

type PoolMetrics added in v1.0.2

type PoolMetrics struct {
	TotalConns           int32         `json:"total_connections"`
	AcquiredConns        int32         `json:"acquired_connections"`
	IdleConns            int32         `json:"idle_connections"`
	MaxConns             int32         `json:"max_connections"`
	TotalAcquireCount    int64         `json:"total_acquire_count"`
	TotalAcquireTime     time.Duration `json:"total_acquire_time"`
	EmptyAcquireCount    int64         `json:"empty_acquire_count"`
	CanceledAcquireCount int64         `json:"canceled_acquire_count"`
}

PoolMetrics provides detailed metrics about the connection pool

type TransactionRollback

type TransactionRollback interface {
	Rollback(ctx context.Context) error
}

TransactionRollback defines the minimal interface needed for transaction rollback. This allows for easier testing while maintaining compatibility with pgx.Tx.
