pgxkit

package module
v2.0.0
Published: Jan 22, 2026 License: MIT Imports: 19 Imported by: 0

README

pgxkit

A production-ready PostgreSQL toolkit for Go applications — tool-agnostic utilities for connection pooling, observability, testing, and type helpers.

Overview

pgxkit is a tool-agnostic PostgreSQL toolkit that works with any approach to PostgreSQL development:

  • Raw pgx usage - Use pgxkit directly with pgx for maximum control
  • Any code generation tool - Works with sqlc, Skimatik, or any other tool
  • Clean architecture - Separate your database layer from your business logic
  • Production-ready - Built-in observability, retry logic, and graceful shutdown

Key Features

  • 🔄 Read/Write Pool Abstraction - Safe by default, optimized when needed
  • 🎣 Extensible Hook System - Add logging, tracing, metrics, circuit breakers
  • 🔁 Smart Retry Logic - PostgreSQL-aware error detection and exponential backoff
  • 🧪 Testing Infrastructure - Golden test support for performance regression detection
  • 🔧 Type Helpers - Seamless pgx type conversions
  • 📊 Health Checks - Built-in database connectivity monitoring
  • 🛡️ Graceful Shutdown - Production-ready lifecycle management
  • 🔀 Executor Interface - Write functions that work with both *DB and *Tx

Installation

go get github.com/nhalm/pgxkit

Quick Start

Basic Usage
package main

import (
    "context"
    "log"

    "github.com/nhalm/pgxkit"
)

func main() {
    ctx := context.Background()

    // Create and connect to database
    db := pgxkit.NewDB()

    // Connect using environment variables or explicit DSN
    err := db.Connect(ctx, "") // Uses POSTGRES_* env vars
    if err != nil {
        log.Fatal(err)
    }
    defer db.Shutdown(ctx)

    // Execute queries (uses write pool by default - safe)
    _, err = db.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "John")
    if err != nil {
        log.Fatal(err)
    }

    // Optimize reads with explicit read pool usage
    rows, err := db.ReadQuery(ctx, "SELECT id, name FROM users")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    // Process results...
}
With Read/Write Splitting
db := pgxkit.NewDB()

// Connect with separate read and write pools
err := db.ConnectReadWrite(ctx, readDSN, writeDSN)
if err != nil {
    log.Fatal(err)
}

// Writes go to write pool (safe by default)
db.Exec(ctx, "INSERT INTO users ...")

// Reads can use read pool for optimization
db.ReadQuery(ctx, "SELECT * FROM users")

Configuration

Environment Variables

pgxkit uses these environment variables when no DSN is provided:

  • POSTGRES_HOST (default: "localhost")
  • POSTGRES_PORT (default: 5432)
  • POSTGRES_USER (default: "postgres")
  • POSTGRES_PASSWORD (default: "")
  • POSTGRES_DB (default: "postgres")
  • POSTGRES_SSLMODE (default: "disable")
DSN Utilities
// Get the DSN built from environment variables,
// e.g. to hand to external tools that need a connection string
dsn := pgxkit.GetDSN()
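
As a rough illustration of how a connection string can be assembled from the variables listed above, the sketch below builds a URL-style DSN with the same defaults. The helper names (buildDSN, envOr) and the output format are assumptions made for this example; the string GetDSN actually returns may be formatted differently.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
)

// envOr returns the value reported by getenv, or def when it is empty.
func envOr(getenv func(string) string, key, def string) string {
	if v := getenv(key); v != "" {
		return v
	}
	return def
}

// buildDSN is a hypothetical helper (not part of pgxkit) that applies the
// documented POSTGRES_* defaults and renders a postgres:// URL.
func buildDSN(getenv func(string) string) string {
	user := envOr(getenv, "POSTGRES_USER", "postgres")
	password := getenv("POSTGRES_PASSWORD") // documented default: ""

	u := url.URL{
		Scheme: "postgres",
		User:   url.User(user),
		Host: net.JoinHostPort(
			envOr(getenv, "POSTGRES_HOST", "localhost"),
			envOr(getenv, "POSTGRES_PORT", "5432"),
		),
		Path:     "/" + envOr(getenv, "POSTGRES_DB", "postgres"),
		RawQuery: url.Values{"sslmode": {envOr(getenv, "POSTGRES_SSLMODE", "disable")}}.Encode(),
	}
	if password != "" {
		u.User = url.UserPassword(user, password)
	}
	return u.String()
}

func main() {
	fmt.Println(buildDSN(os.Getenv))
}
```

Passing the lookup function in (rather than calling os.Getenv directly) keeps the helper easy to exercise with fixed values.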

Hooks System

Add observability and custom functionality through connect options:

db := pgxkit.NewDB()
err := db.Connect(ctx, dsn,
    // Logging hook
    pgxkit.WithBeforeOperation(func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
        log.Printf("Executing: %s", sql)
        return nil
    }),
    // Metrics hook
    pgxkit.WithAfterOperation(func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
        if operationErr != nil {
            metrics.IncrementCounter("db.errors")
        }
        return nil
    }),
    // Connection setup
    pgxkit.WithOnConnect(func(conn *pgx.Conn) error {
        log.Println("New connection established")
        return nil
    }),
)
Available Options

Pool configuration:

  • WithMaxConns(n int32) - Maximum connections in pool
  • WithMinConns(n int32) - Minimum connections in pool
  • WithMaxConnLifetime(d time.Duration) - Maximum connection lifetime
  • WithMaxConnIdleTime(d time.Duration) - Maximum idle time

Operation hooks:

  • WithBeforeOperation(fn) - Before any query/exec
  • WithAfterOperation(fn) - After any query/exec
  • WithBeforeTransaction(fn) - Before transaction starts
  • WithAfterTransaction(fn) - After transaction completes
  • WithOnShutdown(fn) - During graceful shutdown

Connection hooks:

  • WithOnConnect(fn) - When new connection is established
  • WithOnDisconnect(fn) - When connection is closed
  • WithOnAcquire(fn) - When connection is acquired from pool
  • WithOnRelease(fn) - When connection is returned to pool

Retry Logic

RetryOperation Function
// Retry with default settings:
// - 3 retry attempts
// - 100ms initial delay
// - 1s maximum delay
// - 2x exponential backoff
err := pgxkit.RetryOperation(ctx, func(ctx context.Context) error {
    _, err := db.Exec(ctx, "INSERT INTO users (name, email) VALUES ($1, $2)",
        "John Doe", "john@example.com")
    return err
})

// Retry with custom configuration
err = pgxkit.RetryOperation(ctx, func(ctx context.Context) error {
    rows, err := db.Query(ctx, "SELECT * FROM users")
    if err != nil {
        return err
    }
    defer rows.Close()
    // Process rows...
    return nil
}, pgxkit.WithMaxRetries(5), pgxkit.WithMaxDelay(5*time.Second))

// Retry with timeout using context.WithTimeout
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
user, err := pgxkit.Retry(ctx, func(ctx context.Context) (*User, error) {
    return getUserFromDatabase(ctx)
}, pgxkit.WithMaxRetries(3))
Timeout Behavior

The timeout (set via context.WithTimeout) applies to all retry attempts combined, not per-attempt. If your timeout is 5 seconds and the first attempt takes 3 seconds, subsequent retries share the remaining 2 seconds.

Smart Error Detection

pgxkit automatically detects which PostgreSQL errors are worth retrying:

| Error Type | Retries? | Examples |
|---|---|---|
| Network timeouts | Yes | context deadline exceeded during dial |
| Connection failures | Yes | connection refused, connection reset |
| PostgreSQL connection errors | Yes | 08000, 08003, 08006 |
| Server shutdown | Yes | 57P01, 57P02, 57P03 |
| Serialization/deadlock | Yes | 40001, 40P01 |
| Context cancellation | No | context canceled |
| No rows found | No | pgx.ErrNoRows |
| Constraint violations | No | unique_violation, foreign_key_violation |
| Syntax errors | No | syntax_error |

// Check if an error would be retried
if pgxkit.IsRetryableError(err) {
    log.Println("Transient error - would retry")
} else {
    log.Println("Permanent error - would not retry")
}

Health Checks

// Check database connectivity
if db.IsReady(ctx) {
    log.Println("Database is ready")
}

// Detailed health check
if err := db.HealthCheck(ctx); err != nil {
    log.Printf("Database health check failed: %v", err)
}

Testing

Basic Testing
func TestUserOperations(t *testing.T) {
    testDB := pgxkit.NewTestDB()
    ctx := context.Background()
    err := testDB.Connect(ctx, "") // Uses TEST_DATABASE_URL env var
    if err != nil {
        t.Skip("Test database not available")
    }
    defer testDB.Shutdown(ctx)

    err = testDB.Setup()
    if err != nil {
        t.Skip("Test database setup failed")
    }
    defer testDB.Clean()

    // Use testDB.DB for your tests
    _, err = testDB.Exec(ctx, "INSERT INTO users ...")
    // ... test assertions
}
Golden Testing (Performance Regression Detection)
func TestUserQueries(t *testing.T) {
    testDB := pgxkit.NewTestDB()
    ctx := context.Background()
    err := testDB.Connect(ctx, "")
    if err != nil {
        t.Skip("Test database not available")
    }
    defer testDB.Shutdown(ctx)

    testDB.Setup()
    defer testDB.Clean()

    // Enable golden test hooks - captures EXPLAIN plans automatically
    db := testDB.EnableGolden("TestUserQueries")

    // These queries will have their EXPLAIN plans captured
    rows, err := db.Query(ctx, "SELECT * FROM users WHERE active = true")
    // ... more queries

    // Plans are saved to testdata/golden/TestUserQueries_query_1.json, etc.
    // Future runs compare plans to detect performance regressions
}

Type Helpers

Seamless conversions between Go types and pgx types:

// String conversions
pgxText := pgxkit.ToPgxText(&myString)
stringPtr := pgxkit.FromPgxText(pgxText)

// Numeric conversions
pgxInt := pgxkit.ToPgxInt8(&myInt64)
intPtr := pgxkit.FromPgxInt8(pgxInt)

// UUID conversions
pgxUUID := pgxkit.ToPgxUUID(myUUID)
id := pgxkit.FromPgxUUID(pgxUUID)

// Time conversions (FromPgxTimestamptz returns a value; the Ptr variant returns *time.Time)
pgxTime := pgxkit.ToPgxTimestamptz(&myTime)
timePtr := pgxkit.FromPgxTimestamptzPtr(pgxTime)

// Array conversions
pgxArray := pgxkit.ToPgxTextArray(myStringSlice)
stringSlice := pgxkit.FromPgxTextArray(pgxArray)
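
The pointer-based helpers all follow the same pattern: a nil pointer maps to an invalid (NULL) value, and an invalid value maps back to nil. The standalone sketch below shows that pattern with a local text type shaped like pgtype.Text (a value plus a Valid flag). The type and function names are stand-ins for illustration, not pgxkit's code.

```go
package main

import "fmt"

// text is a local stand-in shaped like pgtype.Text.
type text struct {
	String string
	Valid  bool
}

// toText maps nil to an invalid (NULL) value and a non-nil pointer to a valid one.
func toText(s *string) text {
	if s == nil {
		return text{}
	}
	return text{String: *s, Valid: true}
}

// fromText maps an invalid (NULL) value back to nil.
func fromText(t text) *string {
	if !t.Valid {
		return nil
	}
	s := t.String
	return &s
}

// fromTextToString collapses NULL to the empty string, losing the distinction.
func fromTextToString(t text) string {
	if !t.Valid {
		return ""
	}
	return t.String
}

func main() {
	name := "Ada"
	fmt.Println(toText(&name).Valid, toText(nil).Valid)
	fmt.Println(*fromText(toText(&name)), fromText(toText(nil)) == nil)
	fmt.Printf("%q\n", fromTextToString(toText(nil)))
}
```

The pointer variants preserve the difference between NULL and an empty value; the value variants trade that distinction for convenience.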

Production Features

Graceful Shutdown
// Graceful shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err := db.Shutdown(ctx)
if err != nil {
    log.Printf("Shutdown completed with warnings: %v", err)
}
Connection Statistics
// Get pool statistics
stats := db.Stats()          // Write pool stats
readStats := db.ReadStats()  // Read pool stats (if using read/write split)

log.Printf("Active connections: %d", stats.AcquiredConns())
log.Printf("Idle connections: %d", stats.IdleConns())
Error Handling
// Structured error types
notFound := pgxkit.NewNotFoundError("User", userID)
invalid := pgxkit.NewValidationError("Email", "create", "address", "invalid format", nil)
dbErr := pgxkit.NewDatabaseError("Order", "query", originalErr)

// Type checking
var notFoundErr *pgxkit.NotFoundError
if errors.As(err, &notFoundErr) {
    // Handle not found error
}

Architecture

pgxkit follows these design principles:

  1. Safety First - All default methods use write pool for consistency
  2. Explicit Optimization - Use ReadQuery() methods for read optimization
  3. Tool Agnostic - Works with any PostgreSQL development approach
  4. Extensible - Hook system for custom functionality
  5. Production Ready - Built-in observability and lifecycle management

Examples

With Raw pgx
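db := pgxkit.NewDB()
if err := db.Connect(ctx, "postgres://..."); err != nil {
    log.Fatal(err)
}
// Use pgx directly with pgxkit utilities
rows, err := db.Query(ctx, "SELECT id, name FROM users")
if err != nil {
    log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
    var user User
    if err := rows.Scan(&user.ID, &user.Name); err != nil {
        log.Fatal(err)
    }
    // ...
}
if err := rows.Err(); err != nil {
    log.Fatal(err)
}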
With Any Code Generation Tool
// Works with sqlc, Skimatik, or any other tool
db := pgxkit.NewDB()
db.Connect(ctx, "postgres://...")

// Use your generated code with pgxkit's connection
queries := sqlc.New(db.WritePool()) // or your tool's constructor
users, err := queries.GetAllUsers(ctx)
With Hooks for Observability
db := pgxkit.NewDB()
err := db.Connect(ctx, dsn,
    // Add tracing
    pgxkit.WithBeforeOperation(func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
        span := trace.SpanFromContext(ctx)
        span.SetAttributes(attribute.String("db.statement", sql))
        return nil
    }),
    // Add metrics
    pgxkit.WithAfterOperation(func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
        status := "success"
        if operationErr != nil {
            status = "error"
        }
        queryCounter.WithLabelValues(status).Inc()
        return nil
    }),
)

Documentation

Visit the pgxkit Wiki - Complete documentation with examples and guides

Contributing

We welcome contributions! Please see our Contributing Guide for details.

License

MIT License - see LICENSE file for details.

Documentation

Overview

Package pgxkit provides a production-ready PostgreSQL toolkit for Go applications.

pgxkit is a tool-agnostic PostgreSQL toolkit that works with any approach to PostgreSQL development - raw pgx usage, code generation tools like sqlc or Skimatik, or any other PostgreSQL development workflow.

Key Features:

  • Read/Write Pool Abstraction: Safe by default with write pool, explicit read pool methods for optimization
  • Extensible Hook System: Add logging, tracing, metrics, circuit breakers through hooks
  • Smart Retry Logic: PostgreSQL-aware error detection with exponential backoff
  • Testing Infrastructure: Golden test support for performance regression detection
  • Type Helpers: Seamless pgx type conversions for clean architecture
  • Health Checks: Built-in database connectivity monitoring
  • Graceful Shutdown: Production-ready lifecycle management

Basic Usage:

db := pgxkit.NewDB()
err := db.Connect(ctx, "", pgxkit.WithMaxConns(25))
if err != nil {
    log.Fatal(err)
}
defer db.Shutdown(ctx)

// Execute queries (uses write pool by default - safe)
_, err = db.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "John")

// Optimize reads with explicit read pool usage
rows, err := db.ReadQuery(ctx, "SELECT id, name FROM users")

Hook System:

db := pgxkit.NewDB()
err := db.Connect(ctx, "",
    pgxkit.WithBeforeOperation(func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
        log.Printf("Executing: %s", sql)
        return nil
    }),
)

The package follows a "safety first" design - all default methods use the write pool for consistency, with explicit ReadQuery() methods available for read optimization.

Transaction Usage:

pgxkit provides a Tx type that wraps pgx.Tx and implements the Executor interface, allowing you to write functions that work with both *DB and *Tx interchangeably.

func CreateUser(ctx context.Context, exec pgxkit.Executor, name string) (int, error) {
    var id int
    err := exec.QueryRow(ctx, "INSERT INTO users (name) VALUES ($1) RETURNING id", name).Scan(&id)
    return id, err
}

// Works with *DB
id, err := CreateUser(ctx, db, "Alice")

// Works with *Tx
tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
    return err
}
id, err = CreateUser(ctx, tx, "Bob")

The recommended transaction pattern uses defer for safety:

tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
    return err
}
defer tx.Rollback(ctx) // Safe no-op if already committed

_, err = tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice")
if err != nil {
    return err
}

return tx.Commit(ctx)

Transactions are tracked by activeOps for graceful shutdown - Shutdown will wait for active transactions to complete. The *Tx type is NOT goroutine-safe.
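
The deferred-rollback pattern works because Rollback does nothing once the transaction has been committed. The sketch below demonstrates that control flow with a fake transaction. fakeTx and createUser are hypothetical stand-ins, and the error returned by a second Commit is an assumption that only echoes the ErrTxFinalized message.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeTx records which finalizing call actually took effect.
type fakeTx struct {
	finalized bool
	log       []string
}

// Commit finalizes the transaction; a second Commit is reported as an error.
func (tx *fakeTx) Commit() error {
	if tx.finalized {
		return errors.New("transaction already finalized")
	}
	tx.finalized = true
	tx.log = append(tx.log, "commit")
	return nil
}

// Rollback is a no-op once the transaction is finalized.
func (tx *fakeTx) Rollback() error {
	if tx.finalized {
		return nil
	}
	tx.finalized = true
	tx.log = append(tx.log, "rollback")
	return nil
}

// createUser follows the recommended shape: defer Rollback, Commit on success.
func createUser(tx *fakeTx, fail bool) error {
	defer tx.Rollback()
	if fail {
		return errors.New("insert failed")
	}
	return tx.Commit()
}

func main() {
	ok, bad := &fakeTx{}, &fakeTx{}
	fmt.Println(createUser(ok, false), ok.log)
	fmt.Println(createUser(bad, true), bad.log)
}
```

On the success path only the commit is recorded; on the error path the deferred call performs the rollback.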

Constants

const (
	TxCommit   = "TX:COMMIT"
	TxRollback = "TX:ROLLBACK"
)

Variables

var ErrTxFinalized = errors.New("transaction already finalized")

Functions

func CleanupGolden

func CleanupGolden(testName string) error

CleanupGolden removes all golden test files for the specified test name. This includes both the captured query plan file and its baseline file from the testdata/golden directory.

func CleanupTestData

func CleanupTestData(sqlStatements ...string)

CleanupTestData executes cleanup SQL statements on the shared test database.

func FromPgxBool

func FromPgxBool(b pgtype.Bool) *bool

FromPgxBool converts a pgtype.Bool to a bool pointer. If the pgtype.Bool is invalid (NULL), returns nil.

func FromPgxBoolToBool

func FromPgxBoolToBool(b pgtype.Bool) bool

FromPgxBoolToBool converts a pgtype.Bool to a bool value. If the pgtype.Bool is invalid (NULL), returns false.

func FromPgxDate

func FromPgxDate(d pgtype.Date) *time.Time

FromPgxDate converts a pgtype.Date to a time.Time pointer. If the pgtype.Date is invalid (NULL), returns nil.

func FromPgxFloat4

func FromPgxFloat4(f pgtype.Float4) *float32

FromPgxFloat4 converts a pgtype.Float4 to a float32 pointer. If the pgtype.Float4 is invalid (NULL), returns nil.

func FromPgxFloat8

func FromPgxFloat8(f pgtype.Float8) *float64

FromPgxFloat8 converts a pgtype.Float8 to a float64 pointer. If the pgtype.Float8 is invalid (NULL), returns nil.

func FromPgxInt2

func FromPgxInt2(i pgtype.Int2) *int16

FromPgxInt2 converts a pgtype.Int2 to an int16 pointer. If the pgtype.Int2 is invalid (NULL), returns nil.

func FromPgxInt4

func FromPgxInt4(i pgtype.Int4) *int32

FromPgxInt4 converts a pgtype.Int4 to an int32 pointer. If the pgtype.Int4 is invalid (NULL), returns nil.

func FromPgxInt4ToInt

func FromPgxInt4ToInt(i pgtype.Int4) *int

FromPgxInt4ToInt converts a pgtype.Int4 to an int pointer. If the pgtype.Int4 is invalid (NULL), returns nil.

func FromPgxInt8

func FromPgxInt8(i pgtype.Int8) *int64

FromPgxInt8 converts a pgtype.Int8 to an int64 pointer. If the pgtype.Int8 is invalid (NULL), returns nil.

func FromPgxInt8Array

func FromPgxInt8Array(a pgtype.Array[pgtype.Int8]) []int64

FromPgxInt8Array converts a pgtype.Array[pgtype.Int8] to an int64 slice. If the array is invalid (NULL), returns nil.

func FromPgxNumeric

func FromPgxNumeric(n pgtype.Numeric) *float64

FromPgxNumeric converts a pgtype.Numeric to a float64 pointer. If the pgtype.Numeric is invalid (NULL), returns nil.

func FromPgxText

func FromPgxText(t pgtype.Text) *string

FromPgxText converts a pgtype.Text to a string pointer. If the pgtype.Text is invalid (NULL), returns nil.

func FromPgxTextArray

func FromPgxTextArray(a pgtype.Array[pgtype.Text]) []string

FromPgxTextArray converts a pgtype.Array[pgtype.Text] to a string slice. If the array is invalid (NULL), returns nil.

func FromPgxTextToString

func FromPgxTextToString(t pgtype.Text) string

FromPgxTextToString converts a pgtype.Text to a string value. If the pgtype.Text is invalid (NULL), returns empty string.

func FromPgxTime

func FromPgxTime(t pgtype.Time) *time.Time

FromPgxTime converts a pgtype.Time to a time.Time pointer. If the pgtype.Time is invalid (NULL), returns nil. The returned time will be on the current date with the time component.

func FromPgxTimestamp

func FromPgxTimestamp(t pgtype.Timestamp) *time.Time

FromPgxTimestamp converts a pgtype.Timestamp to a time.Time pointer. If the pgtype.Timestamp is invalid (NULL), returns nil.

func FromPgxTimestamptz

func FromPgxTimestamptz(t pgtype.Timestamptz) time.Time

FromPgxTimestamptz converts a pgtype.Timestamptz to a time.Time value. If the pgtype.Timestamptz is invalid (NULL), returns zero time.

func FromPgxTimestamptzPtr

func FromPgxTimestamptzPtr(t pgtype.Timestamptz) *time.Time

FromPgxTimestamptzPtr converts a pgtype.Timestamptz to a time.Time pointer. If the pgtype.Timestamptz is invalid (NULL), returns nil.

func FromPgxUUID

func FromPgxUUID(pgxID pgtype.UUID) uuid.UUID

FromPgxUUID converts a pgtype.UUID to uuid.UUID. Returns uuid.Nil if the pgtype.UUID is invalid or cannot be parsed.

func FromPgxUUIDToPtr

func FromPgxUUIDToPtr(pgxID pgtype.UUID) *uuid.UUID

FromPgxUUIDToPtr converts a pgtype.UUID to a uuid.UUID pointer. If the pgtype.UUID is invalid (NULL), returns nil.

func GetDSN

func GetDSN() string

GetDSN returns a PostgreSQL connection string built from environment variables. This is useful for scripts and tools that need a connection string rather than a pgxpool.Pool.

Environment variables used:

  • POSTGRES_HOST (default: "localhost")
  • POSTGRES_PORT (default: 5432)
  • POSTGRES_USER (default: "postgres")
  • POSTGRES_PASSWORD (default: "")
  • POSTGRES_DB (default: "postgres")
  • POSTGRES_SSLMODE (default: "disable")

Example:

dsn := pgxkit.GetDSN()

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError determines if an error is worth retrying.

func Retry

func Retry[T any](ctx context.Context, fn func(context.Context) (T, error), opts ...RetryOption) (T, error)

Retry executes a generic operation with configurable retry logic. It uses exponential backoff to avoid thundering herd problems.

func RetryOperation

func RetryOperation(ctx context.Context, operation func(context.Context) error, opts ...RetryOption) error

RetryOperation executes an operation with configurable retry logic. It uses exponential backoff to avoid thundering herd problems.

Example:

err := pgxkit.RetryOperation(ctx, func(ctx context.Context) error {
    return doSomething(ctx)
}, pgxkit.WithMaxRetries(5), pgxkit.WithMaxDelay(5*time.Second))

func ToPgxBool

func ToPgxBool(b *bool) pgtype.Bool

ToPgxBool converts a bool pointer to pgtype.Bool. If the input is nil, returns an invalid pgtype.Bool (NULL in database).

func ToPgxBoolFromBool

func ToPgxBoolFromBool(b bool) pgtype.Bool

ToPgxBoolFromBool converts a bool value to pgtype.Bool. Use this when you have a bool value instead of a pointer.

func ToPgxDate

func ToPgxDate(t *time.Time) pgtype.Date

ToPgxDate converts a time.Time pointer to pgtype.Date. If the input is nil, returns an invalid pgtype.Date (NULL in database).

func ToPgxFloat4

func ToPgxFloat4(f *float32) pgtype.Float4

ToPgxFloat4 converts a float32 pointer to pgtype.Float4. If the input is nil, returns an invalid pgtype.Float4 (NULL in database).

func ToPgxFloat8

func ToPgxFloat8(f *float64) pgtype.Float8

ToPgxFloat8 converts a float64 pointer to pgtype.Float8. If the input is nil, returns an invalid pgtype.Float8 (NULL in database).

func ToPgxInt2

func ToPgxInt2(i *int16) pgtype.Int2

ToPgxInt2 converts an int16 pointer to pgtype.Int2. If the input is nil, returns an invalid pgtype.Int2 (NULL in database).

func ToPgxInt4

func ToPgxInt4(i *int32) pgtype.Int4

ToPgxInt4 converts an int32 pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).

func ToPgxInt4FromInt

func ToPgxInt4FromInt(i *int) pgtype.Int4

ToPgxInt4FromInt converts an int pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).

func ToPgxInt8

func ToPgxInt8(i *int64) pgtype.Int8

ToPgxInt8 converts an int64 pointer to pgtype.Int8. If the input is nil, returns an invalid pgtype.Int8 (NULL in database).

func ToPgxInt8Array

func ToPgxInt8Array(s []int64) pgtype.Array[pgtype.Int8]

ToPgxInt8Array converts an int64 slice to pgtype.Array[pgtype.Int8]. If the input is nil, returns an invalid array (NULL in database).

func ToPgxNumeric

func ToPgxNumeric(f *float64) pgtype.Numeric

ToPgxNumeric converts a float64 pointer to pgtype.Numeric. If the input is nil, returns an invalid pgtype.Numeric (NULL in database). Uses 6 decimal places as standard precision.
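
The six-decimal precision has a practical consequence: digits beyond the sixth decimal place do not survive a round trip. The sketch below illustrates the effect with the standard library only. roundTo6 is a hypothetical helper, and formatting through a string is an assumption of this example rather than a description of how ToPgxNumeric is implemented.

```go
package main

import (
	"fmt"
	"strconv"
)

// roundTo6 renders f with exactly six decimal places and parses it back,
// showing both the text form and the value that survives the round trip.
func roundTo6(f float64) (string, float64, error) {
	s := strconv.FormatFloat(f, 'f', 6, 64)
	back, err := strconv.ParseFloat(s, 64)
	return s, back, err
}

func main() {
	s, v, err := roundTo6(3.14159265358979)
	fmt.Println(s, v, err)
}
```

Values that need more than six decimal places, or exact decimal arithmetic, are better handled with pgtype.Numeric directly than through a float64.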

func ToPgxText

func ToPgxText(s *string) pgtype.Text

ToPgxText converts a string pointer to pgtype.Text. If the input is nil, returns an invalid pgtype.Text (NULL in database).

func ToPgxTextArray

func ToPgxTextArray(s []string) pgtype.Array[pgtype.Text]

ToPgxTextArray converts a string slice to pgtype.Array[pgtype.Text]. If the input is nil, returns an invalid array (NULL in database).

func ToPgxTextFromString

func ToPgxTextFromString(s string) pgtype.Text

ToPgxTextFromString converts a string value to pgtype.Text. Use this when you have a string value instead of a pointer.

func ToPgxTime

func ToPgxTime(t *time.Time) pgtype.Time

ToPgxTime converts a time.Time pointer to pgtype.Time. If the input is nil, returns an invalid pgtype.Time (NULL in database).

func ToPgxTimestamp

func ToPgxTimestamp(t *time.Time) pgtype.Timestamp

ToPgxTimestamp converts a time.Time pointer to pgtype.Timestamp. If the input is nil, returns an invalid pgtype.Timestamp (NULL in database).

func ToPgxTimestamptz

func ToPgxTimestamptz(t *time.Time) pgtype.Timestamptz

ToPgxTimestamptz converts a time.Time pointer to pgtype.Timestamptz. If the input is nil, returns an invalid pgtype.Timestamptz (NULL in database).

func ToPgxUUID

func ToPgxUUID(id uuid.UUID) pgtype.UUID

ToPgxUUID converts a uuid.UUID to pgtype.UUID.

func ToPgxUUIDFromPtr

func ToPgxUUIDFromPtr(id *uuid.UUID) pgtype.UUID

ToPgxUUIDFromPtr converts a uuid.UUID pointer to pgtype.UUID. If the input is nil, returns an invalid pgtype.UUID (NULL in database).

Types

type ConnectOption

type ConnectOption func(*connectConfig)

ConnectOption configures a database connection.

func WithAfterOperation

func WithAfterOperation(fn HookFunc) ConnectOption

func WithAfterTransaction

func WithAfterTransaction(fn HookFunc) ConnectOption

func WithBeforeOperation

func WithBeforeOperation(fn HookFunc) ConnectOption

func WithBeforeTransaction

func WithBeforeTransaction(fn HookFunc) ConnectOption

func WithMaxConnIdleTime

func WithMaxConnIdleTime(d time.Duration) ConnectOption

func WithMaxConnLifetime

func WithMaxConnLifetime(d time.Duration) ConnectOption

func WithMaxConns

func WithMaxConns(n int32) ConnectOption

func WithMinConns

func WithMinConns(n int32) ConnectOption

func WithOnAcquire

func WithOnAcquire(fn func(context.Context, *pgx.Conn) error) ConnectOption

func WithOnConnect

func WithOnConnect(fn func(*pgx.Conn) error) ConnectOption

func WithOnDisconnect

func WithOnDisconnect(fn func(*pgx.Conn)) ConnectOption

func WithOnRelease

func WithOnRelease(fn func(*pgx.Conn)) ConnectOption

func WithOnShutdown

func WithOnShutdown(fn HookFunc) ConnectOption

func WithReadMaxConns

func WithReadMaxConns(n int32) ConnectOption

func WithReadMinConns

func WithReadMinConns(n int32) ConnectOption

func WithWriteMaxConns

func WithWriteMaxConns(n int32) ConnectOption

func WithWriteMinConns

func WithWriteMinConns(n int32) ConnectOption

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB represents a database connection with read/write pool abstraction. It provides a safe-by-default approach where all operations use the write pool unless explicitly using Read* methods for optimization.

The DB supports:

  • Single pool mode (same pool for read/write)
  • Read/write split mode (separate pools for optimization)
  • Extensible hook system for logging, tracing, metrics
  • Graceful shutdown with active operation tracking
  • Built-in retry logic for transient failures
  • Health checks and connection statistics

func NewDB

func NewDB() *DB

NewDB creates a new unconnected DB instance. Call Connect() with options to establish the database connection.

Example:

db := pgxkit.NewDB()
err := db.Connect(ctx, "postgres://user:pass@localhost/db",
    pgxkit.WithMaxConns(25),
    pgxkit.WithBeforeOperation(myLoggingHook),
)

func (*DB) AssertGolden

func (db *DB) AssertGolden(t *testing.T, testName string)

AssertGolden compares captured query plans against a baseline file. On first run, it creates a baseline file from the current golden output. On subsequent runs, it compares the current plans against the baseline and reports test failures for any query count, SQL, or plan changes.

func (*DB) BeginTx

func (db *DB) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (*Tx, error)

BeginTx starts a transaction using the write pool. Transactions always use the write pool to ensure consistency. The transaction will execute BeforeTransaction hook on start and AfterTransaction hook on Commit/Rollback.

Example:

tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
    return err
}
defer tx.Rollback(ctx) // Safe to call even after commit

_, err = tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", name)
if err != nil {
    return err
}
return tx.Commit(ctx)

func (*DB) Connect

func (db *DB) Connect(ctx context.Context, dsn string, opts ...ConnectOption) error

Connect establishes a database connection with a single pool (same pool for read/write). If dsn is empty, it uses environment variables to construct the connection string. Options are applied to configure pool settings and hooks.

This is the recommended approach for most applications as it provides safety by default while still allowing read optimization through ReadQuery methods.

Example:

db := pgxkit.NewDB()
err := db.Connect(ctx, "postgres://user:pass@localhost/db",
    pgxkit.WithMaxConns(25),
    pgxkit.WithOnConnect(func(conn *pgx.Conn) error {
        _, err := conn.Exec(context.Background(), "SET application_name = 'myapp'")
        return err
    }),
)
// Or use environment variables:
err = db.Connect(ctx, "")

func (*DB) ConnectReadWrite

func (db *DB) ConnectReadWrite(ctx context.Context, readDSN, writeDSN string, opts ...ConnectOption) error

ConnectReadWrite establishes database connections with separate read and write pools. If readDSN or writeDSN is empty, it uses environment variables to construct the connection string. Options are applied to both pools.

This is useful for applications that want to optimize read performance by routing read queries to read replicas while ensuring writes go to the primary database.

Example:

db := pgxkit.NewDB()
err := db.ConnectReadWrite(ctx, "postgres://user:pass@read-replica/db", "postgres://user:pass@primary/db",
    pgxkit.WithMaxConns(25),
)
// Now ReadQuery methods will use the read pool, while Query/Exec use the write pool

func (*DB) Exec

func (db *DB) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)

Exec executes a statement using the write pool. This method is used for INSERT, UPDATE, DELETE, and other write operations.

Example:

tag, err := db.Exec(ctx, "INSERT INTO users (name, email) VALUES ($1, $2)", name, email)
if err != nil {
    return err
}
fmt.Printf("Inserted %d rows\n", tag.RowsAffected())

func (*DB) HealthCheck

func (db *DB) HealthCheck(ctx context.Context) error

HealthCheck performs a simple health check by pinging the database. This is useful for health check endpoints and monitoring systems. It returns an error if the database is not connected, shutting down, or unreachable.

Example:

if err := db.HealthCheck(ctx); err != nil {
    log.Printf("Database health check failed: %v", err)
    http.Error(w, "Database unavailable", http.StatusServiceUnavailable)
    return
}

func (*DB) IsReady

func (db *DB) IsReady(ctx context.Context) bool

IsReady checks if the database connection is ready to accept queries. This is a convenience method that returns true if HealthCheck() succeeds. It's useful for readiness probes and quick status checks.

Example:

if db.IsReady(ctx) {
    log.Println("Database is ready to accept queries")
}

func (*DB) Query

func (db *DB) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes a query using the write pool (safe by default). This ensures consistency by always using the primary database connection. Use ReadQuery for read-only queries that can benefit from read replicas.

Example:

rows, err := db.Query(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
    return err
}
defer rows.Close()

func (*DB) QueryRow

func (db *DB) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that returns a single row using the write pool. This ensures consistency by always using the primary database connection. Use ReadQueryRow for read-only queries that can benefit from read replicas.

Example:

var userID int
err := db.QueryRow(ctx, "SELECT id FROM users WHERE email = $1", email).Scan(&userID)

func (*DB) ReadPool

func (db *DB) ReadPool() *pgxpool.Pool

ReadPool returns the underlying read connection pool. Returns nil if no separate read pool is configured.

func (*DB) ReadQuery

func (db *DB) ReadQuery(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

ReadQuery executes a query using the read pool (explicit optimization). This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.

Example:

rows, err := db.ReadQuery(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
    return err
}
defer rows.Close()
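
The routing rule can be pictured as a small pool selector: writes always go to the write pool, and reads go to the read pool when one exists. The sketch below is a standalone illustration with stand-in types; falling back to the write pool in single pool mode is an assumption drawn from the description of that mode, not a copy of pgxkit's code.

```go
package main

import "fmt"

// pool is a stand-in for a connection pool.
type pool struct{ name string }

// router holds a mandatory write pool and an optional read pool.
type router struct {
	write *pool
	read  *pool // nil when no separate read pool is configured
}

// forWrite always selects the write pool (the safe default).
func (r router) forWrite() *pool { return r.write }

// forRead selects the read pool when present, otherwise the write pool.
func (r router) forRead() *pool {
	if r.read != nil {
		return r.read
	}
	return r.write
}

func main() {
	primary, replica := &pool{name: "primary"}, &pool{name: "replica"}

	single := router{write: primary}
	split := router{write: primary, read: replica}

	fmt.Println(single.forRead().name, split.forRead().name, split.forWrite().name)
}
```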

func (*DB) ReadQueryRow

func (db *DB) ReadQueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

ReadQueryRow executes a query that returns a single row using the read pool. This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.

Example:

var count int
err := db.ReadQueryRow(ctx, "SELECT COUNT(*) FROM users").Scan(&count)

func (*DB) ReadStats

func (db *DB) ReadStats() *pgxpool.Stat

ReadStats returns statistics for the read pool. This provides information about read connection usage, which is useful for monitoring read replica performance and connection pool health.

Example:

stats := db.ReadStats()
if stats != nil {
    log.Printf("Read pool active connections: %d", stats.AcquiredConns())
}

func (*DB) Shutdown

func (db *DB) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the database connections. It waits for active operations to complete, respecting the context timeout. If the context times out, shutdown proceeds anyway to prevent hanging.

The shutdown process:

  1. Marks the database as shutting down (new operations will fail)
  2. Waits for active operations to complete (respects context timeout)
  3. Executes OnShutdown hooks
  4. Closes connection pools

Example:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := db.Shutdown(ctx)

func (*DB) Stats

func (db *DB) Stats() *pgxpool.Stat

Stats returns statistics for the write pool. This provides information about connection usage, which is useful for monitoring and debugging connection pool performance.

Example:

stats := db.Stats()
if stats != nil {
    log.Printf("Active connections: %d", stats.AcquiredConns())
    log.Printf("Idle connections: %d", stats.IdleConns())
}

func (*DB) WritePool

func (db *DB) WritePool() *pgxpool.Pool

WritePool returns the underlying write connection pool. Useful for integrating with code generation tools like sqlc.

type DatabaseError

type DatabaseError struct {
	Entity    string
	Operation string // "create", "update", "delete", "query"
	Err       error
}

DatabaseError represents database operation failures such as connection errors, constraint violations, or other database-specific errors.

func NewDatabaseError

func NewDatabaseError(entity, operation string, err error) *DatabaseError

NewDatabaseError creates a new DatabaseError with the given entity, operation, and underlying error.

func (*DatabaseError) Error

func (e *DatabaseError) Error() string

func (*DatabaseError) Unwrap

func (e *DatabaseError) Unwrap() error

type Executor

type Executor interface {
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
	Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
}

Executor is a unified interface for database operations that both *DB and *Tx implement. This allows passing a single interface for database operations whether in a transaction or not.

type HookFunc

type HookFunc func(ctx context.Context, sql string, args []interface{}, operationErr error) error

HookFunc is the universal hook function signature for operation-level hooks. All operation-level hooks use this signature for consistency and simplicity.

Parameters:

  • ctx: The context for the operation
  • sql: The SQL statement being executed (empty for shutdown hooks)
  • args: The arguments for the SQL statement (nil for shutdown hooks)
  • operationErr: The error from the operation (nil for before hooks)

A before hook returns a non-nil error to abort the operation. An error returned from an after hook does not change the result of the original operation.

type HookType

type HookType int

HookType represents the type of hook for operation-level hooks. These hooks are executed during database operations and provide extensibility for logging, tracing, metrics, circuit breakers, and other cross-cutting concerns.

const (
	// BeforeOperation is called before any query/exec operation.
	// The operationErr parameter will always be nil.
	BeforeOperation HookType = iota

	// AfterOperation is called after any query/exec operation.
	// The operationErr parameter contains the result of the operation.
	AfterOperation

	// BeforeTransaction is called before starting a transaction.
	// The operationErr parameter will always be nil.
	BeforeTransaction

	// AfterTransaction is called after a transaction completes.
	// The operationErr parameter contains the result of the transaction.
	AfterTransaction

	// OnShutdown is called during graceful shutdown.
	// The sql and args parameters will be empty, operationErr will be nil.
	OnShutdown
)

type NotFoundError

type NotFoundError struct {
	Entity     string
	Identifier interface{}
}

NotFoundError represents when a requested entity is not found in the database. This error should be used instead of returning pgx.ErrNoRows directly.

Example:

if errors.Is(err, pgx.ErrNoRows) {
    return nil, pgxkit.NewNotFoundError("User", userID)
}

func NewNotFoundError

func NewNotFoundError(entity string, identifier interface{}) *NotFoundError

NewNotFoundError creates a new NotFoundError with the given entity and identifier.

func (*NotFoundError) Error

func (e *NotFoundError) Error() string

type QueryPlan

type QueryPlan struct {
	Query       int                      `json:"query"`
	SQL         string                   `json:"sql"`
	Plan        []map[string]interface{} `json:"plan"`
	ExecutionMS float64                  `json:"execution_ms,omitempty"`
	PlanningMS  float64                  `json:"planning_ms,omitempty"`
}

QueryPlan represents a captured query execution plan from EXPLAIN ANALYZE. It stores the SQL statement, the full JSON plan output, and timing metrics for use in golden test comparisons.
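
The struct tags imply the JSON shape shown below. This sketch redeclares the struct locally and encodes one value with encoding/json. The sample plan content and the encodePlan helper are made up for illustration, and how pgxkit lays out its golden files is not covered here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// QueryPlan repeats the documented struct so the sketch compiles alone.
type QueryPlan struct {
	Query       int                      `json:"query"`
	SQL         string                   `json:"sql"`
	Plan        []map[string]interface{} `json:"plan"`
	ExecutionMS float64                  `json:"execution_ms,omitempty"`
	PlanningMS  float64                  `json:"planning_ms,omitempty"`
}

// encodePlan is a hypothetical helper that renders a plan as compact JSON.
func encodePlan(p QueryPlan) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	p := QueryPlan{
		Query: 1,
		SQL:   "SELECT 1",
		Plan:  []map[string]interface{}{{"Node Type": "Result"}},
		// Timings left at zero: omitempty drops them from the output.
	}
	out, err := encodePlan(p)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// Prints: {"query":1,"sql":"SELECT 1","plan":[{"Node Type":"Result"}]}
}
```

Because both timing fields use omitempty, a plan with zero timings encodes without them.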

type RetryOption

type RetryOption func(*retryConfig)

RetryOption configures retry behavior for operations.

func WithBackoffMultiplier

func WithBackoffMultiplier(m float64) RetryOption

WithBackoffMultiplier sets the multiplier for exponential backoff.

func WithBaseDelay

func WithBaseDelay(d time.Duration) RetryOption

WithBaseDelay sets the initial delay between retries.

func WithMaxDelay

func WithMaxDelay(d time.Duration) RetryOption

WithMaxDelay sets the maximum delay between retries.

func WithMaxRetries

func WithMaxRetries(n int) RetryOption

WithMaxRetries sets the maximum number of retry attempts.

type TestDB

type TestDB struct {
	*DB
}

TestDB is a testing utility that wraps DB with testing-specific functionality. It provides simple methods for test setup, cleanup, and golden test support. TestDB automatically manages test database connections and provides utilities for performance regression testing through golden tests.

func NewTestDB

func NewTestDB() *TestDB

NewTestDB creates a new unconnected TestDB instance. Call Connect() to establish the database connection.

Example:

func TestUserOperations(t *testing.T) {
    testDB := pgxkit.NewTestDB()
    err := testDB.Connect(context.Background(), "") // uses TEST_DATABASE_URL env var
    if err != nil {
        t.Skip("Test database not available")
    }
    defer testDB.Shutdown(context.Background())
    // ... test code
}

func RequireDB

func RequireDB(t *testing.T) *TestDB

RequireDB ensures a test database is available or skips the test. It creates a TestDB and connects using the TEST_DATABASE_URL environment variable.

func (*TestDB) Clean

func (tdb *TestDB) Clean() error

Clean performs cleanup operations after a test completes. It verifies the database connection is still active and can be extended to truncate tables or reset test data. Returns nil if no pool is configured.

func (*TestDB) EnableGolden

func (tdb *TestDB) EnableGolden(testName string) *DB

EnableGolden returns a new DB instance configured with golden test hooks. Golden tests capture EXPLAIN ANALYZE output for each query, enabling detection of query plan regressions. The testName is used to name the golden file. Use AssertGolden after test execution to compare against baseline plans.

Example:

goldenDB := testDB.EnableGolden("user_queries")
// run queries using goldenDB
goldenDB.AssertGolden(t, "user_queries")

func (*TestDB) Setup

func (tdb *TestDB) Setup() error

Setup prepares the database for testing. This method verifies the database connection and can be extended to seed data or perform other test setup tasks. Returns an error if the database is not available or not ready for testing.

Example:

err := testDB.Setup()
if err != nil {
    t.Skip("Test database not available")
}

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

Tx wraps a pgx.Tx to implement the Executor interface and provide transaction lifecycle management integrated with pgxkit's activeOps tracking.

func (*Tx) Commit

func (t *Tx) Commit(ctx context.Context) error

Commit commits the transaction. It fires the AfterTransaction hook and uses atomic finalization to ensure activeOps.Done() is called exactly once, making it safe for the "defer Rollback() + explicit Commit()" pattern.

func (*Tx) Exec

func (t *Tx) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)

Exec executes a statement within the transaction. Unlike DB.Exec, this does not fire BeforeOperation/AfterOperation hooks.

func (*Tx) IsFinalized

func (t *Tx) IsFinalized() bool

IsFinalized returns true if the transaction has been committed or rolled back.

func (*Tx) Query

func (t *Tx) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes a query within the transaction. Unlike DB.Query, this does not fire BeforeOperation/AfterOperation hooks.

func (*Tx) QueryRow

func (t *Tx) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that returns a single row within the transaction. Unlike DB.QueryRow, this does not fire BeforeOperation/AfterOperation hooks.

func (*Tx) Rollback

func (t *Tx) Rollback(ctx context.Context) error

Rollback rolls back the transaction. It fires the AfterTransaction hook and uses atomic finalization to ensure activeOps.Done() is called exactly once, making it safe for the "defer Rollback() + explicit Commit()" pattern.

func (*Tx) Tx

func (t *Tx) Tx() pgx.Tx

Tx returns the underlying pgx.Tx for advanced use cases that require direct access to pgx transaction functionality.

type ValidationError

type ValidationError struct {
	Entity    string
	Operation string
	Field     string
	Reason    string
	Err       error
}

ValidationError represents validation failures that occur before database operations. Use this for input validation, constraint violations, or business rule failures.

func NewValidationError

func NewValidationError(entity, operation, field, reason string, err error) *ValidationError

NewValidationError creates a new ValidationError with the given parameters.

func (*ValidationError) Error

func (e *ValidationError) Error() string

func (*ValidationError) Unwrap

func (e *ValidationError) Unwrap() error
