pgxkit

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 16, 2025 License: MIT Imports: 17 Imported by: 0

README

pgxkit

Go Version CI Status Go Report Card Release

A production-ready PostgreSQL toolkit for Go applications — tool-agnostic utilities for connection pooling, observability, testing, and type helpers.

Overview

pgxkit is a tool-agnostic PostgreSQL toolkit that works with any approach to PostgreSQL development:

  • Raw pgx usage - Use pgxkit directly with pgx for maximum control
  • Any code generation tool - Works with sqlc, Skimatik, or any other tool
  • Clean architecture - Separate your database layer from your business logic
  • Production-ready - Built-in observability, retry logic, and graceful shutdown

Key Features

  • 🔄 Read/Write Pool Abstraction - Safe by default, optimized when needed
  • 🎣 Extensible Hook System - Add logging, tracing, metrics, circuit breakers
  • 🔁 Smart Retry Logic - PostgreSQL-aware error detection and exponential backoff
  • 🧪 Testing Infrastructure - Golden test support for performance regression detection
  • 🔧 Type Helpers - Seamless pgx type conversions
  • 📊 Health Checks - Built-in database connectivity monitoring
  • 🛡️ Graceful Shutdown - Production-ready lifecycle management

Installation

go get github.com/nhalm/pgxkit

Quick Start

Basic Usage
package main

import (
    "context"
    "log"
    
    "github.com/nhalm/pgxkit"
)

func main() {
    ctx := context.Background()
    
    // Create and connect to database
    db := pgxkit.NewDB()
    
    // Connect using environment variables or explicit DSN
    err := db.Connect(ctx, "") // Uses POSTGRES_* env vars
    if err != nil {
        log.Fatal(err)
    }
    defer db.Shutdown(ctx)
    
    // Execute queries (uses write pool by default - safe)
    _, err = db.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "John")
    if err != nil {
        log.Fatal(err)
    }
    
    // Optimize reads with explicit read pool usage
    rows, err := db.ReadQuery(ctx, "SELECT id, name FROM users")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()
    
    // Process results...
}
With Read/Write Splitting
db := pgxkit.NewDB()

// Connect with separate read and write pools
err := db.ConnectReadWrite(ctx, readDSN, writeDSN)
if err != nil {
    log.Fatal(err)
}

// Writes go to write pool (safe by default)
db.Exec(ctx, "INSERT INTO users ...")

// Reads can use read pool for optimization
db.ReadQuery(ctx, "SELECT * FROM users")

Configuration

Environment Variables

pgxkit uses these environment variables when no DSN is provided:

  • POSTGRES_HOST (default: "localhost")
  • POSTGRES_PORT (default: 5432)
  • POSTGRES_USER (default: "postgres")
  • POSTGRES_PASSWORD (default: "")
  • POSTGRES_DB (default: "postgres")
  • POSTGRES_SSLMODE (default: "disable")
DSN Utilities
// Get DSN from environment variables
dsn := pgxkit.GetDSN()

// Use with external tools like golang-migrate
migrate -database "$(pgxkit.GetDSN())" -path ./migrations up

Hooks System

Add observability and custom functionality through hooks:

db := pgxkit.NewDB()

// Add logging hook
db.AddHook(pgxkit.BeforeOperation, func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
    log.Printf("Executing: %s", sql)
    return nil
})

// Add metrics hook
db.AddHook(pgxkit.AfterOperation, func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
    if operationErr != nil {
        metrics.IncrementCounter("db.errors")
    }
    return nil
})

// Add connection-level hooks
db.AddConnectionHook("OnConnect", func(conn *pgx.Conn) error {
    log.Println("New connection established")
    return nil
})
Available Hook Types

Operation-level hooks:

  • BeforeOperation - Before any query/exec
  • AfterOperation - After any query/exec
  • BeforeTransaction - Before transaction starts
  • AfterTransaction - After transaction completes
  • OnShutdown - During graceful shutdown

Connection-level hooks:

  • OnConnect - When new connection is established
  • OnDisconnect - When connection is closed
  • OnAcquire - When connection is acquired from pool
  • OnRelease - When connection is returned to pool

Retry Logic

Built-in Retry Methods
config := pgxkit.DefaultRetryConfig() // 3 retries, exponential backoff

// Retry database operations
result, err := db.ExecWithRetry(ctx, config, "INSERT INTO users ...")
rows, err := db.QueryWithRetry(ctx, config, "SELECT * FROM users")
rows, err := db.ReadQueryWithRetry(ctx, config, "SELECT * FROM users") // Uses read pool
tx, err := db.BeginTxWithRetry(ctx, config, pgx.TxOptions{})
Generic Retry Functions
// Retry any operation
err := pgxkit.RetryOperation(ctx, config, func(ctx context.Context) error {
    return someComplexDatabaseOperation(ctx)
})

// Retry with timeout
result, err := pgxkit.WithTimeoutAndRetry(ctx, 5*time.Second, config, func(ctx context.Context) (*User, error) {
    return getUserFromDatabase(ctx)
})
Smart Error Detection

pgxkit automatically detects which PostgreSQL errors are worth retrying:

if pgxkit.IsRetryableError(err) {
    // Connection errors, deadlocks, serialization failures, etc.
}

Health Checks

// Check database connectivity
if db.IsReady(ctx) {
    log.Println("Database is ready")
}

// Detailed health check
if err := db.HealthCheck(ctx); err != nil {
    log.Printf("Database health check failed: %v", err)
}

Testing

Basic Testing
func TestUserOperations(t *testing.T) {
    testDB := pgxkit.NewTestDB()
    err := testDB.Setup()
    if err != nil {
        t.Skip("Test database not available")
    }
    defer testDB.Clean()
    
    // Use testDB.DB for your tests
    _, err = testDB.Exec(ctx, "INSERT INTO users ...")
    // ... test assertions
}
Golden Testing (Performance Regression Detection)
func TestUserQueries(t *testing.T) {
    testDB := pgxkit.NewTestDB()
    testDB.Setup()
    defer testDB.Clean()
    
    // Enable golden test hooks - captures EXPLAIN plans automatically
    db := testDB.EnableGolden(t, "TestUserQueries")
    
    // These queries will have their EXPLAIN plans captured
    rows, err := db.Query(ctx, "SELECT * FROM users WHERE active = true")
    // ... more queries
    
    // Plans are saved to testdata/golden/TestUserQueries_query_1.json, etc.
    // Future runs compare plans to detect performance regressions
}

Type Helpers

Seamless conversions between Go types and pgx types:

// String conversions
pgxText := pgxkit.ToPgxText(&myString)
stringPtr := pgxkit.FromPgxText(pgxText)

// Numeric conversions
pgxInt := pgxkit.ToPgxInt8(&myInt64)
intPtr := pgxkit.FromPgxInt8(pgxInt)

// UUID conversions
pgxUUID := pgxkit.ToPgxUUID(myUUID)
uuid := pgxkit.FromPgxUUID(pgxUUID)

// Time conversions
pgxTime := pgxkit.ToPgxTimestamptz(&myTime)
timePtr := pgxkit.FromPgxTimestamptz(pgxTime)

// Array conversions
pgxArray := pgxkit.ToPgxTextArray(myStringSlice)
stringSlice := pgxkit.FromPgxTextArray(pgxArray)

Production Features

Graceful Shutdown
// Graceful shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err := db.Shutdown(ctx)
if err != nil {
    log.Printf("Shutdown completed with warnings: %v", err)
}
Connection Statistics
// Get pool statistics
stats := db.Stats()          // Write pool stats
readStats := db.ReadStats()  // Read pool stats (if using read/write split)

log.Printf("Active connections: %d", stats.AcquiredConns())
log.Printf("Idle connections: %d", stats.IdleConns())
Error Handling
// Structured error types
err := pgxkit.NewNotFoundError("User", userID)
err := pgxkit.NewValidationError("Email", "create", "address", "invalid format", nil)
err := pgxkit.NewDatabaseError("Order", "query", originalErr)

// Type checking
var notFoundErr *pgxkit.NotFoundError
if errors.As(err, &notFoundErr) {
    // Handle not found error
}

Architecture

pgxkit follows these design principles:

  1. Safety First - All default methods use write pool for consistency
  2. Explicit Optimization - Use ReadQuery() methods for read optimization
  3. Tool Agnostic - Works with any PostgreSQL development approach
  4. Extensible - Hook system for custom functionality
  5. Production Ready - Built-in observability and lifecycle management

Examples

With Raw pgx
db := pgxkit.NewDB()
db.Connect(ctx, "postgres://...")

// Use pgx directly with pgxkit utilities
rows, err := db.Query(ctx, "SELECT * FROM users")
for rows.Next() {
    var user User
    err := rows.Scan(&user.ID, &user.Name)
    // ...
}
With Any Code Generation Tool
// Works with sqlc, Skimatik, or any other tool
db := pgxkit.NewDB()
db.Connect(ctx, "postgres://...")

// Use your generated code with pgxkit's connection
queries := sqlc.New(db.GetPool()) // or your tool's constructor
users, err := queries.GetAllUsers(ctx)
With Hooks for Observability
db := pgxkit.NewDB()

// Add tracing
db.AddHook(pgxkit.BeforeOperation, func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
    span := trace.SpanFromContext(ctx)
    span.SetAttributes(attribute.String("db.statement", sql))
    return nil
})

// Add circuit breaker
db.AddHook(pgxkit.BeforeOperation, func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
    return circuitBreaker.Execute(func() error {
        // Operation will be executed by pgxkit
        return nil
    })
})

Contributing

We welcome contributions! Please see our Contributing Guide for details.

License

MIT License - see LICENSE file for details.

Documentation

Overview

Package pgxkit provides a production-ready PostgreSQL toolkit for Go applications.

pgxkit is a tool-agnostic PostgreSQL toolkit that works with any approach to PostgreSQL development - raw pgx usage, code generation tools like sqlc or Skimatik, or any other PostgreSQL development workflow.

Key Features:

  • Read/Write Pool Abstraction: Safe by default with write pool, explicit read pool methods for optimization
  • Extensible Hook System: Add logging, tracing, metrics, circuit breakers through hooks
  • Smart Retry Logic: PostgreSQL-aware error detection with exponential backoff
  • Testing Infrastructure: Golden test support for performance regression detection
  • Type Helpers: Seamless pgx type conversions for clean architecture
  • Health Checks: Built-in database connectivity monitoring
  • Graceful Shutdown: Production-ready lifecycle management

Basic Usage:

db := pgxkit.NewDB()
err := db.Connect(ctx, "") // Uses POSTGRES_* env vars
if err != nil {
    log.Fatal(err)
}
defer db.Shutdown(ctx)

// Execute queries (uses write pool by default - safe)
_, err = db.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "John")

// Optimize reads with explicit read pool usage
rows, err := db.ReadQuery(ctx, "SELECT id, name FROM users")

Hook System:

db.AddHook(pgxkit.BeforeOperation, func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
    log.Printf("Executing: %s", sql)
    return nil
})

The package follows a "safety first" design - all default methods use the write pool for consistency, with explicit ReadQuery() methods available for read optimization.

Index

Constants

This section is empty.

Variables

View Source
var (
	ToPgxNumericFromFloat64Ptr = ToPgxNumeric
	FromPgxNumericPtr          = FromPgxNumeric
)

Legacy aliases for backward compatibility TODO: Consider deprecating these in favor of the new names

Functions

func CleanupGolden added in v1.1.0

func CleanupGolden(testName string) error

CleanupGolden removes all golden files for a test

func CleanupTestData

func CleanupTestData(sqlStatements ...string)

CleanupTestData executes cleanup SQL statements on the shared test database

func FromPgxBool

func FromPgxBool(b pgtype.Bool) *bool

FromPgxBool converts a pgtype.Bool to a bool pointer. If the pgtype.Bool is invalid (NULL), returns nil.

func FromPgxBoolToBool added in v1.1.0

func FromPgxBoolToBool(b pgtype.Bool) bool

FromPgxBoolToBool converts a pgtype.Bool to a bool value. If the pgtype.Bool is invalid (NULL), returns false.

func FromPgxDate added in v1.1.0

func FromPgxDate(d pgtype.Date) *time.Time

FromPgxDate converts a pgtype.Date to a time.Time pointer. If the pgtype.Date is invalid (NULL), returns nil.

func FromPgxFloat4 added in v1.1.0

func FromPgxFloat4(f pgtype.Float4) *float32

FromPgxFloat4 converts a pgtype.Float4 to a float32 pointer. If the pgtype.Float4 is invalid (NULL), returns nil.

func FromPgxFloat8 added in v1.1.0

func FromPgxFloat8(f pgtype.Float8) *float64

FromPgxFloat8 converts a pgtype.Float8 to a float64 pointer. If the pgtype.Float8 is invalid (NULL), returns nil.

func FromPgxInt2 added in v1.1.0

func FromPgxInt2(i pgtype.Int2) *int16

FromPgxInt2 converts a pgtype.Int2 to an int16 pointer. If the pgtype.Int2 is invalid (NULL), returns nil.

func FromPgxInt4

func FromPgxInt4(i pgtype.Int4) *int32

FromPgxInt4 converts a pgtype.Int4 to an int32 pointer. If the pgtype.Int4 is invalid (NULL), returns nil.

func FromPgxInt4ToInt added in v1.1.0

func FromPgxInt4ToInt(i pgtype.Int4) *int

FromPgxInt4ToInt converts a pgtype.Int4 to an int pointer. If the pgtype.Int4 is invalid (NULL), returns nil.

func FromPgxInt8

func FromPgxInt8(i pgtype.Int8) *int64

FromPgxInt8 converts a pgtype.Int8 to an int64 pointer. If the pgtype.Int8 is invalid (NULL), returns nil.

func FromPgxInt8Array added in v1.1.0

func FromPgxInt8Array(a pgtype.Array[pgtype.Int8]) []int64

FromPgxInt8Array converts a pgtype.Array[pgtype.Int8] to an int64 slice. If the array is invalid (NULL), returns nil.

func FromPgxNumeric added in v1.1.0

func FromPgxNumeric(n pgtype.Numeric) *float64

FromPgxNumeric converts a pgtype.Numeric to a float64 pointer. If the pgtype.Numeric is invalid (NULL), returns nil.

func FromPgxText

func FromPgxText(t pgtype.Text) *string

FromPgxText converts a pgtype.Text to a string pointer. If the pgtype.Text is invalid (NULL), returns nil.

func FromPgxTextArray added in v1.1.0

func FromPgxTextArray(a pgtype.Array[pgtype.Text]) []string

FromPgxTextArray converts a pgtype.Array[pgtype.Text] to a string slice. If the array is invalid (NULL), returns nil.

func FromPgxTextToString

func FromPgxTextToString(t pgtype.Text) string

FromPgxTextToString converts a pgtype.Text to a string value. If the pgtype.Text is invalid (NULL), returns empty string.

func FromPgxTime added in v1.1.0

func FromPgxTime(t pgtype.Time) *time.Time

FromPgxTime converts a pgtype.Time to a time.Time pointer. If the pgtype.Time is invalid (NULL), returns nil. The returned time will be on the current date with the time component.

func FromPgxTimestamp added in v1.1.0

func FromPgxTimestamp(t pgtype.Timestamp) *time.Time

FromPgxTimestamp converts a pgtype.Timestamp to a time.Time pointer. If the pgtype.Timestamp is invalid (NULL), returns nil.

func FromPgxTimestamptz

func FromPgxTimestamptz(t pgtype.Timestamptz) time.Time

FromPgxTimestamptz converts a pgtype.Timestamptz to a time.Time value. If the pgtype.Timestamptz is invalid (NULL), returns zero time.

func FromPgxTimestamptzPtr

func FromPgxTimestamptzPtr(t pgtype.Timestamptz) *time.Time

FromPgxTimestamptzPtr converts a pgtype.Timestamptz to a time.Time pointer. If the pgtype.Timestamptz is invalid (NULL), returns nil.

func FromPgxUUID

func FromPgxUUID(pgxID pgtype.UUID) uuid.UUID

FromPgxUUID converts a pgtype.UUID to uuid.UUID. Returns uuid.Nil if the pgtype.UUID is invalid or cannot be parsed.

func FromPgxUUIDToPtr added in v1.1.0

func FromPgxUUIDToPtr(pgxID pgtype.UUID) *uuid.UUID

FromPgxUUIDToPtr converts a pgtype.UUID to a uuid.UUID pointer. If the pgtype.UUID is invalid (NULL), returns nil.

func GetDSN

func GetDSN() string

GetDSN returns a PostgreSQL connection string built from environment variables. This is useful for integrating with external tools like golang-migrate that need a connection string rather than a pgxpool.Pool.

Environment variables used:

  • POSTGRES_HOST (default: "localhost")
  • POSTGRES_PORT (default: 5432)
  • POSTGRES_USER (default: "postgres")
  • POSTGRES_PASSWORD (default: "")
  • POSTGRES_DB (default: "postgres")
  • POSTGRES_SSLMODE (default: "disable")

Example:

dsn := pgxkit.GetDSN()
// Use with golang-migrate: migrate -database "$(pgxkit.GetDSN())" -path ./migrations up

func IsRetryableError added in v1.1.0

func IsRetryableError(err error) bool

IsRetryableError determines if an error is worth retrying

func RetryOperation added in v1.1.0

func RetryOperation(ctx context.Context, config *RetryConfig, operation func(context.Context) error) error

RetryOperation is the generic retry function that can be used with any operation

func ToPgxBool

func ToPgxBool(b *bool) pgtype.Bool

ToPgxBool converts a bool pointer to pgtype.Bool. If the input is nil, returns an invalid pgtype.Bool (NULL in database).

func ToPgxBoolFromBool added in v1.1.0

func ToPgxBoolFromBool(b bool) pgtype.Bool

ToPgxBoolFromBool converts a bool value to pgtype.Bool. Use this when you have a bool value instead of a pointer.

func ToPgxDate added in v1.1.0

func ToPgxDate(t *time.Time) pgtype.Date

ToPgxDate converts a time.Time pointer to pgtype.Date. If the input is nil, returns an invalid pgtype.Date (NULL in database).

func ToPgxFloat4 added in v1.1.0

func ToPgxFloat4(f *float32) pgtype.Float4

ToPgxFloat4 converts a float32 pointer to pgtype.Float4. If the input is nil, returns an invalid pgtype.Float4 (NULL in database).

func ToPgxFloat8 added in v1.1.0

func ToPgxFloat8(f *float64) pgtype.Float8

ToPgxFloat8 converts a float64 pointer to pgtype.Float8. If the input is nil, returns an invalid pgtype.Float8 (NULL in database).

func ToPgxInt2 added in v1.1.0

func ToPgxInt2(i *int16) pgtype.Int2

ToPgxInt2 converts an int16 pointer to pgtype.Int2. If the input is nil, returns an invalid pgtype.Int2 (NULL in database).

func ToPgxInt4 added in v1.1.0

func ToPgxInt4(i *int32) pgtype.Int4

ToPgxInt4 converts an int32 pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).

func ToPgxInt4FromInt

func ToPgxInt4FromInt(i *int) pgtype.Int4

ToPgxInt4FromInt converts an int pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).

func ToPgxInt8

func ToPgxInt8(i *int64) pgtype.Int8

ToPgxInt8 converts an int64 pointer to pgtype.Int8. If the input is nil, returns an invalid pgtype.Int8 (NULL in database).

func ToPgxInt8Array added in v1.1.0

func ToPgxInt8Array(s []int64) pgtype.Array[pgtype.Int8]

ToPgxInt8Array converts an int64 slice to pgtype.Array[pgtype.Int8]. If the input is nil, returns an invalid array (NULL in database).

func ToPgxNumeric added in v1.1.0

func ToPgxNumeric(f *float64) pgtype.Numeric

ToPgxNumeric converts a float64 pointer to pgtype.Numeric. If the input is nil, returns an invalid pgtype.Numeric (NULL in database). Uses 6 decimal places as standard precision.

func ToPgxText

func ToPgxText(s *string) pgtype.Text

ToPgxText converts a string pointer to pgtype.Text. If the input is nil, returns an invalid pgtype.Text (NULL in database).

func ToPgxTextArray added in v1.1.0

func ToPgxTextArray(s []string) pgtype.Array[pgtype.Text]

ToPgxTextArray converts a string slice to pgtype.Array[pgtype.Text]. If the input is nil, returns an invalid array (NULL in database).

func ToPgxTextFromString added in v1.1.0

func ToPgxTextFromString(s string) pgtype.Text

ToPgxTextFromString converts a string value to pgtype.Text. Use this when you have a string value instead of a pointer.

func ToPgxTime added in v1.1.0

func ToPgxTime(t *time.Time) pgtype.Time

ToPgxTime converts a time.Time pointer to pgtype.Time. If the input is nil, returns an invalid pgtype.Time (NULL in database).

func ToPgxTimestamp added in v1.1.0

func ToPgxTimestamp(t *time.Time) pgtype.Timestamp

ToPgxTimestamp converts a time.Time pointer to pgtype.Timestamp. If the input is nil, returns an invalid pgtype.Timestamp (NULL in database).

func ToPgxTimestamptz

func ToPgxTimestamptz(t *time.Time) pgtype.Timestamptz

ToPgxTimestamptz converts a time.Time pointer to pgtype.Timestamptz. If the input is nil, returns an invalid pgtype.Timestamptz (NULL in database).

func ToPgxUUID

func ToPgxUUID(id uuid.UUID) pgtype.UUID

ToPgxUUID converts a uuid.UUID to pgtype.UUID.

func ToPgxUUIDFromPtr added in v1.1.0

func ToPgxUUIDFromPtr(id *uuid.UUID) pgtype.UUID

ToPgxUUIDFromPtr converts a uuid.UUID pointer to pgtype.UUID. If the input is nil, returns an invalid pgtype.UUID (NULL in database).

func WithTimeout

func WithTimeout[T any](ctx context.Context, timeout time.Duration, fn func(context.Context) (T, error)) (T, error)

WithTimeout executes a function with a timeout. This is a generic utility function that can be used with any operation.

Example:

result, err := pgxkit.WithTimeout(ctx, 5*time.Second, func(ctx context.Context) (*User, error) {
    return getUserFromDatabase(ctx)
})

func WithTimeoutAndRetry

func WithTimeoutAndRetry[T any](ctx context.Context, timeout time.Duration, retryConfig *RetryConfig, fn func(context.Context) (T, error)) (T, error)

WithTimeoutAndRetry executes a function with timeout and retry logic. This combines timeout handling with intelligent retry logic for transient failures.

Example:

config := pgxkit.DefaultRetryConfig()
result, err := pgxkit.WithTimeoutAndRetry(ctx, 5*time.Second, config, func(ctx context.Context) (*User, error) {
    return getUserFromDatabase(ctx)
})

Types

type ConnectionHooks

type ConnectionHooks struct {
	// contains filtered or unexported fields
}

ConnectionHooks manages connection lifecycle hooks. These hooks are integrated with pgx's connection lifecycle and are useful for connection setup, validation, and cleanup. They use pgx's native function signatures.

func CombineHooks

func CombineHooks(hooksList ...*ConnectionHooks) *ConnectionHooks

CombineHooks combines multiple hook managers into one

func NewConnectionHooks

func NewConnectionHooks() *ConnectionHooks

NewConnectionHooks creates a new connection hooks manager. This is used internally by the DB type but can also be used directly for advanced connection pool configuration.

Example:

hooks := pgxkit.NewConnectionHooks()
hooks.AddOnConnect(func(conn *pgx.Conn) error {
    log.Println("New connection established")
    return nil
})

func SetupHook

func SetupHook(setupSQL string) *ConnectionHooks

SetupHook creates a hook that sets up connection-specific settings

func ValidationHook

func ValidationHook() *ConnectionHooks

ValidationHook creates a hook that validates connections

func (*ConnectionHooks) AddOnAcquire

func (h *ConnectionHooks) AddOnAcquire(fn func(context.Context, *pgx.Conn) error)

AddOnAcquire adds a callback that will be called when a connection is acquired from the pool

func (*ConnectionHooks) AddOnConnect

func (h *ConnectionHooks) AddOnConnect(fn func(*pgx.Conn) error)

AddOnConnect adds a callback that will be called when a new connection is established. This is useful for connection initialization, setting session variables, or validation. If the callback returns an error, the connection will be closed.

Example:

hooks.AddOnConnect(func(conn *pgx.Conn) error {
    _, err := conn.Exec(context.Background(), "SET application_name = 'myapp'")
    return err
})

func (*ConnectionHooks) AddOnDisconnect

func (h *ConnectionHooks) AddOnDisconnect(fn func(*pgx.Conn))

AddOnDisconnect adds a callback that will be called when a connection is closed

func (*ConnectionHooks) AddOnRelease

func (h *ConnectionHooks) AddOnRelease(fn func(*pgx.Conn))

AddOnRelease adds a callback that will be called when a connection is released back to the pool

func (*ConnectionHooks) ConfigurePool added in v1.1.0

func (ch *ConnectionHooks) ConfigurePool(config *pgxpool.Config)

ConfigurePool configures a pgxpool.Config with the connection hooks This integrates the hooks with the actual pool lifecycle events

func (*ConnectionHooks) ExecuteOnAcquire

func (h *ConnectionHooks) ExecuteOnAcquire(ctx context.Context, conn *pgx.Conn) error

ExecuteOnAcquire executes all OnAcquire callbacks

func (*ConnectionHooks) ExecuteOnConnect

func (h *ConnectionHooks) ExecuteOnConnect(conn *pgx.Conn) error

ExecuteOnConnect executes all OnConnect callbacks

func (*ConnectionHooks) ExecuteOnDisconnect

func (h *ConnectionHooks) ExecuteOnDisconnect(conn *pgx.Conn)

ExecuteOnDisconnect executes all OnDisconnect callbacks

func (*ConnectionHooks) ExecuteOnRelease

func (h *ConnectionHooks) ExecuteOnRelease(conn *pgx.Conn)

ExecuteOnRelease executes all OnRelease callbacks

type DB added in v1.1.0

type DB struct {
	// contains filtered or unexported fields
}

DB represents a database connection with read/write pool abstraction. It provides a safe-by-default approach where all operations use the write pool unless explicitly using Read* methods for optimization.

The DB supports:

  • Single pool mode (same pool for read/write)
  • Read/write split mode (separate pools for optimization)
  • Extensible hook system for logging, tracing, metrics
  • Graceful shutdown with active operation tracking
  • Built-in retry logic for transient failures
  • Health checks and connection statistics

func NewDB added in v1.1.0

func NewDB() *DB

NewDB creates a new unconnected DB instance. Add hooks to this instance, then call Connect() to establish the database connection.

Example:

db := pgxkit.NewDB()
db.AddHook(pgxkit.BeforeOperation, myLoggingHook)
err := db.Connect(ctx, "postgres://user:pass@localhost/db")

func NewDBWithPool added in v1.1.0

func NewDBWithPool(pool *pgxpool.Pool) *DB

NewDBWithPool creates a new DB instance with a single pool (same pool for read/write). Deprecated: Use NewDB() + Connect() instead for proper hook integration.

func NewReadWriteDB added in v1.1.0

func NewReadWriteDB(readPool, writePool *pgxpool.Pool) *DB

NewReadWriteDB creates a new DB instance with separate read and write pools. Deprecated: Use NewDB() + ConnectReadWrite() instead for proper hook integration.

func (*DB) AddConnectionHook added in v1.1.0

func (db *DB) AddConnectionHook(hookType string, hookFunc interface{}) error

AddConnectionHook adds a connection-level hook to the database. These hooks are integrated with pgx's connection lifecycle and are useful for connection setup, validation, and cleanup.

Available hook types:

  • "OnConnect": Called when a new connection is established
  • "OnDisconnect": Called when a connection is closed
  • "OnAcquire": Called when a connection is acquired from the pool
  • "OnRelease": Called when a connection is returned to the pool

Example:

db.AddConnectionHook("OnConnect", func(conn *pgx.Conn) error {
    log.Println("New connection established")
    return nil
})

TODO: hookType needs to be an Enum. TODO: this doc needs to be updated to reflect the new hookType Enum and that this is for pgx hooks. TODO: this should be split up into the different pgx hooks. and then we don't need the enum.

func (*DB) AddHook added in v1.1.0

func (db *DB) AddHook(hookType HookType, hookFunc HookFunc) *DB

AddHook adds an operation-level hook to the database. Hooks are executed in the order they are added and provide extensibility for logging, tracing, metrics, circuit breakers, and other cross-cutting concerns.

Available hook types:

  • BeforeOperation: Called before any query/exec operation
  • AfterOperation: Called after any query/exec operation
  • BeforeTransaction: Called before starting a transaction
  • AfterTransaction: Called after a transaction completes
  • OnShutdown: Called during graceful shutdown

Example:

db.AddHook(pgxkit.BeforeOperation, func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
    log.Printf("Executing: %s", sql)
    return nil
})

func (*DB) AssertGolden added in v1.1.0

func (db *DB) AssertGolden(t *testing.T, testName string)

AssertGolden compares captured query plans with existing golden file

func (*DB) BeginTx added in v1.1.0

func (db *DB) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)

BeginTx starts a transaction using the write pool. Transactions always use the write pool to ensure consistency. The transaction will execute BeforeTransaction and AfterTransaction hooks.

Example:

tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
    return err
}
defer tx.Rollback(ctx) // Safe to call even after commit

_, err = tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", name)
if err != nil {
    return err
}
return tx.Commit(ctx)

func (*DB) BeginTxWithRetry added in v1.1.0

func (db *DB) BeginTxWithRetry(ctx context.Context, config *RetryConfig, txOptions pgx.TxOptions) (pgx.Tx, error)

BeginTxWithRetry starts a transaction using the write pool with retry logic

func (*DB) Connect added in v1.1.0

func (db *DB) Connect(ctx context.Context, dsn string) error

Connect establishes a database connection with a single pool (same pool for read/write). If dsn is empty, it uses environment variables to construct the connection string. The hooks are configured at pool creation time for proper integration.

This is the recommended approach for most applications as it provides safety by default while still allowing read optimization through ReadQuery methods.

Example:

db := pgxkit.NewDB()
err := db.Connect(ctx, "postgres://user:pass@localhost/db")
// Or use environment variables:
err := db.Connect(ctx, "")

func (*DB) ConnectReadWrite added in v1.1.0

func (db *DB) ConnectReadWrite(ctx context.Context, readDSN, writeDSN string) error

ConnectReadWrite establishes database connections with separate read and write pools. If readDSN or writeDSN is empty, it uses environment variables to construct the connection string. The hooks are configured at pool creation time for proper integration.

This is useful for applications that want to optimize read performance by routing read queries to read replicas while ensuring writes go to the primary database.

Example:

db := pgxkit.NewDB()
err := db.ConnectReadWrite(ctx, "postgres://user:pass@read-replica/db", "postgres://user:pass@primary/db")
// Now ReadQuery methods will use the read pool, while Query/Exec use the write pool

func (*DB) Exec added in v1.1.0

func (db *DB) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)

Exec executes a statement using the write pool. This method is used for INSERT, UPDATE, DELETE, and other write operations.

Example:

tag, err := db.Exec(ctx, "INSERT INTO users (name, email) VALUES ($1, $2)", name, email)
if err != nil {
    return err
}
fmt.Printf("Inserted %d rows\n", tag.RowsAffected())

func (*DB) ExecWithRetry added in v1.1.0

func (db *DB) ExecWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) (pgconn.CommandTag, error)

ExecWithRetry executes a statement using the write pool with retry logic. This automatically retries transient failures like connection errors, deadlocks, and serialization failures using exponential backoff.

Example:

config := pgxkit.DefaultRetryConfig()
tag, err := db.ExecWithRetry(ctx, config, "INSERT INTO users (name) VALUES ($1)", name)
if err != nil {
    return fmt.Errorf("failed to insert user after retries: %w", err)
}

func (*DB) HealthCheck added in v1.1.0

func (db *DB) HealthCheck(ctx context.Context) error

HealthCheck performs a simple health check by pinging the database. This is useful for health check endpoints and monitoring systems. It returns an error if the database is not connected, shutting down, or unreachable.

Example:

if err := db.HealthCheck(ctx); err != nil {
    log.Printf("Database health check failed: %v", err)
    http.Error(w, "Database unavailable", http.StatusServiceUnavailable)
    return
}

func (*DB) IsReady added in v1.1.0

func (db *DB) IsReady(ctx context.Context) bool

IsReady checks if the database connection is ready to accept queries. This is a convenience method that returns true if HealthCheck() succeeds. It's useful for readiness probes and quick status checks.

Example:

if db.IsReady(ctx) {
    log.Println("Database is ready to accept queries")
}

func (*DB) Query added in v1.1.0

func (db *DB) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes a query using the write pool (safe by default). This ensures consistency by always using the primary database connection. Use ReadQuery for read-only queries that can benefit from read replicas.

Example:

rows, err := db.Query(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
    return err
}
defer rows.Close()

func (*DB) QueryRow added in v1.1.0

func (db *DB) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that returns a single row using the write pool. This ensures consistency by always using the primary database connection. Use ReadQueryRow for read-only queries that can benefit from read replicas.

Example:

var userID int
err := db.QueryRow(ctx, "SELECT id FROM users WHERE email = $1", email).Scan(&userID)

func (*DB) QueryRowWithRetry added in v1.1.0

func (db *DB) QueryRowWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) pgx.Row

QueryRowWithRetry executes a query that returns a single row using the write pool with retry logic. This automatically retries transient failures like connection errors, deadlocks, and serialization failures using exponential backoff.

Example:

config := pgxkit.DefaultRetryConfig()
row := db.QueryRowWithRetry(ctx, config, "SELECT id FROM users WHERE email = $1", email)
var userID int
err := row.Scan(&userID)

func (*DB) QueryWithRetry added in v1.1.0

func (db *DB) QueryWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) (pgx.Rows, error)

QueryWithRetry executes a query using the write pool with retry logic. This automatically retries transient failures like connection errors, deadlocks, and serialization failures using exponential backoff.

Example:

config := pgxkit.DefaultRetryConfig()
rows, err := db.QueryWithRetry(ctx, config, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
    return fmt.Errorf("failed to query users after retries: %w", err)
}
defer rows.Close()

func (*DB) ReadQuery added in v1.1.0

func (db *DB) ReadQuery(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

ReadQuery executes a query using the read pool (explicit optimization). This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.

Example:

rows, err := db.ReadQuery(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
    return err
}
defer rows.Close()

func (*DB) ReadQueryRow added in v1.1.0

func (db *DB) ReadQueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

ReadQueryRow executes a query that returns a single row using the read pool. This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.

Example:

var count int
err := db.ReadQueryRow(ctx, "SELECT COUNT(*) FROM users").Scan(&count)

func (*DB) ReadQueryRowWithRetry added in v1.1.0

func (db *DB) ReadQueryRowWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) pgx.Row

ReadQueryRowWithRetry executes a query that returns a single row using the read pool with retry logic

func (*DB) ReadQueryWithRetry added in v1.1.0

func (db *DB) ReadQueryWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) (pgx.Rows, error)

ReadQueryWithRetry executes a query using the read pool with retry logic

func (*DB) ReadStats added in v1.1.0

func (db *DB) ReadStats() *pgxpool.Stat

ReadStats returns statistics for the read pool. This provides information about read connection usage, which is useful for monitoring read replica performance and connection pool health.

Example:

stats := db.ReadStats()
if stats != nil {
    log.Printf("Read pool active connections: %d", stats.AcquiredConns())
}

func (*DB) Shutdown added in v1.1.0

func (db *DB) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the database connections. It waits for active operations to complete, respecting the context timeout. If the context times out, shutdown proceeds anyway to prevent hanging.

The shutdown process: 1. Marks the database as shutting down (new operations will fail) 2. Waits for active operations to complete (respects context timeout) 3. Executes OnShutdown hooks 4. Closes connection pools

Example:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := db.Shutdown(ctx)

func (*DB) Stats added in v1.1.0

func (db *DB) Stats() *pgxpool.Stat

Stats returns statistics for the write pool. This provides information about connection usage, which is useful for monitoring and debugging connection pool performance.

Example:

stats := db.Stats()
if stats != nil {
    log.Printf("Active connections: %d", stats.AcquiredConns())
    log.Printf("Idle connections: %d", stats.IdleConns())
}

func (*DB) WriteStats added in v1.1.0

func (db *DB) WriteStats() *pgxpool.Stat

WriteStats returns statistics for the write pool. This is an alias for Stats() provided for consistency with ReadStats().

type DBConfig added in v1.1.0

type DBConfig struct {
	MaxConns        int32         // Maximum number of connections in the pool
	MinConns        int32         // Minimum number of connections in the pool
	MaxConnLifetime time.Duration // Maximum lifetime of a connection
	MaxConnIdleTime time.Duration // Maximum idle time for a connection
}

DBConfig holds configuration options for database connections. These options are applied when creating connection pools.

type DatabaseError

type DatabaseError struct {
	Entity    string
	Operation string // "create", "update", "delete", "query"
	Err       error
}

DatabaseError represents database operation failures such as connection errors, constraint violations, or other database-specific errors.

func NewDatabaseError

func NewDatabaseError(entity, operation string, err error) *DatabaseError

NewDatabaseError creates a new DatabaseError with the given entity, operation, and underlying error.

func (*DatabaseError) Error

func (e *DatabaseError) Error() string

func (*DatabaseError) Unwrap

func (e *DatabaseError) Unwrap() error

type HookFunc added in v1.1.0

type HookFunc func(ctx context.Context, sql string, args []interface{}, operationErr error) error

HookFunc is the universal hook function signature for operation-level hooks. All operation-level hooks use this signature for consistency and simplicity.

Parameters:

  • ctx: The context for the operation
  • sql: The SQL statement being executed (empty for shutdown hooks)
  • args: The arguments for the SQL statement (nil for shutdown hooks)
  • operationErr: The error from the operation (nil for before hooks)

The hook should return an error if it wants to abort the operation. For after hooks, returning an error will not affect the original operation result.

type HookType added in v1.1.0

type HookType int

HookType represents the type of hook for operation-level hooks. These hooks are executed during database operations and provide extensibility for logging, tracing, metrics, circuit breakers, and other cross-cutting concerns.

const (
	// BeforeOperation is called before any query/exec operation.
	// The operationErr parameter will always be nil.
	BeforeOperation HookType = iota

	// AfterOperation is called after any query/exec operation.
	// The operationErr parameter contains the result of the operation.
	AfterOperation

	// BeforeTransaction is called before starting a transaction.
	// The operationErr parameter will always be nil.
	BeforeTransaction

	// AfterTransaction is called after a transaction completes.
	// The operationErr parameter contains the result of the transaction.
	AfterTransaction

	// OnShutdown is called during graceful shutdown.
	// The sql and args parameters will be empty, operationErr will be nil.
	OnShutdown
)

type NotFoundError

type NotFoundError struct {
	Entity     string
	Identifier interface{}
}

NotFoundError represents when a requested entity is not found in the database. This error should be used instead of returning pgx.ErrNoRows directly.

Example:

if err == pgx.ErrNoRows {
    return nil, pgxkit.NewNotFoundError("User", userID)
}

func NewNotFoundError

func NewNotFoundError(entity string, identifier interface{}) *NotFoundError

NewNotFoundError creates a new NotFoundError with the given entity and identifier.

func (*NotFoundError) Error

func (e *NotFoundError) Error() string

type QueryPlan added in v1.1.0

type QueryPlan struct {
	Query       int                      `json:"query"`
	SQL         string                   `json:"sql"`
	Plan        []map[string]interface{} `json:"plan"`
	ExecutionMS float64                  `json:"execution_ms,omitempty"`
	PlanningMS  float64                  `json:"planning_ms,omitempty"`
}

QueryPlan represents a captured query and its EXPLAIN plan

type RetryConfig

type RetryConfig struct {
	MaxRetries int           // Maximum number of retry attempts
	BaseDelay  time.Duration // Initial delay between retries
	MaxDelay   time.Duration // Maximum delay between retries
	Multiplier float64       // Multiplier for exponential backoff
}

RetryConfig holds configuration for retry logic. It uses exponential backoff with jitter to avoid thundering herd problems.

func DefaultRetryConfig

func DefaultRetryConfig() *RetryConfig

DefaultRetryConfig returns a sensible default retry configuration. It provides 3 retries with exponential backoff starting at 100ms, capped at 1 second with a 2x multiplier.

Example:

config := pgxkit.DefaultRetryConfig()
// Customize if needed:
config.MaxRetries = 5
config.MaxDelay = 5 * time.Second

type TestDB added in v1.1.0

type TestDB struct {
	*DB
}

TestDB is a testing utility that wraps DB with testing-specific functionality. It provides simple methods for test setup, cleanup, and golden test support. TestDB automatically manages test database connections and provides utilities for performance regression testing through golden tests.

func NewTestDB added in v1.1.0

func NewTestDB() *TestDB

NewTestDB creates a new TestDB instance using the shared test pool. The test pool is automatically configured from the TEST_DATABASE_URL environment variable. If no test database is available, the TestDB will have nil pools and tests should skip.

Example:

func TestUserOperations(t *testing.T) {
    testDB := pgxkit.NewTestDB()
    err := testDB.Setup()
    if err != nil {
        t.Skip("Test database not available")
    }
    defer testDB.Clean()
    // ... test code
}

func RequireDB added in v1.1.0

func RequireDB(t *testing.T) *TestDB

RequireDB ensures a test database is available or skips the test

func (*TestDB) Clean added in v1.1.0

func (tdb *TestDB) Clean() error

Clean cleans the database after the test

func (*TestDB) EnableGolden added in v1.1.0

func (tdb *TestDB) EnableGolden(testName string) *DB

EnableGolden returns a new DB with golden test hooks added

func (*TestDB) Setup added in v1.1.0

func (tdb *TestDB) Setup() error

Setup prepares the database for testing. This method verifies the database connection and can be extended to run migrations, seed data, or perform other test setup tasks. Returns an error if the database is not available or not ready for testing.

Example:

err := testDB.Setup()
if err != nil {
    t.Skip("Test database not available")
}

type ValidationError

type ValidationError struct {
	Entity    string
	Operation string
	Field     string
	Reason    string
	Err       error
}

ValidationError represents validation failures that occur before database operations. Use this for input validation, constraint violations, or business rule failures.

func NewValidationError

func NewValidationError(entity, operation, field, reason string, err error) *ValidationError

NewValidationError creates a new ValidationError with the given parameters.

func (*ValidationError) Error

func (e *ValidationError) Error() string

func (*ValidationError) Unwrap

func (e *ValidationError) Unwrap() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL