Documentation
¶
Overview ¶
Package pgxkit provides a production-ready PostgreSQL toolkit for Go applications.
pgxkit is a tool-agnostic PostgreSQL toolkit that works with any approach to PostgreSQL development - raw pgx usage, code generation tools like sqlc or Skimatik, or any other PostgreSQL development workflow.
Key Features:
- Read/Write Pool Abstraction: Safe by default with write pool, explicit read pool methods for optimization
- Extensible Hook System: Add logging, tracing, metrics, circuit breakers through hooks
- Smart Retry Logic: PostgreSQL-aware error detection with exponential backoff
- Testing Infrastructure: Golden test support for performance regression detection
- Type Helpers: Seamless pgx type conversions for clean architecture
- Health Checks: Built-in database connectivity monitoring
- Graceful Shutdown: Production-ready lifecycle management
Basic Usage:
db := pgxkit.NewDB()
err := db.Connect(ctx, "") // Uses POSTGRES_* env vars
if err != nil {
log.Fatal(err)
}
defer db.Shutdown(ctx)
// Execute queries (uses write pool by default - safe)
_, err = db.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "John")
// Optimize reads with explicit read pool usage
rows, err := db.ReadQuery(ctx, "SELECT id, name FROM users")
Hook System:
db.AddHook(pgxkit.BeforeOperation, func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
log.Printf("Executing: %s", sql)
return nil
})
The package follows a "safety first" design - all default methods use the write pool for consistency, with explicit ReadQuery() methods available for read optimization.
Index ¶
- Variables
- func CleanupGolden(testName string) error
- func CleanupTestData(sqlStatements ...string)
- func FromPgxBool(b pgtype.Bool) *bool
- func FromPgxBoolToBool(b pgtype.Bool) bool
- func FromPgxDate(d pgtype.Date) *time.Time
- func FromPgxFloat4(f pgtype.Float4) *float32
- func FromPgxFloat8(f pgtype.Float8) *float64
- func FromPgxInt2(i pgtype.Int2) *int16
- func FromPgxInt4(i pgtype.Int4) *int32
- func FromPgxInt4ToInt(i pgtype.Int4) *int
- func FromPgxInt8(i pgtype.Int8) *int64
- func FromPgxInt8Array(a pgtype.Array[pgtype.Int8]) []int64
- func FromPgxNumeric(n pgtype.Numeric) *float64
- func FromPgxText(t pgtype.Text) *string
- func FromPgxTextArray(a pgtype.Array[pgtype.Text]) []string
- func FromPgxTextToString(t pgtype.Text) string
- func FromPgxTime(t pgtype.Time) *time.Time
- func FromPgxTimestamp(t pgtype.Timestamp) *time.Time
- func FromPgxTimestamptz(t pgtype.Timestamptz) time.Time
- func FromPgxTimestamptzPtr(t pgtype.Timestamptz) *time.Time
- func FromPgxUUID(pgxID pgtype.UUID) uuid.UUID
- func FromPgxUUIDToPtr(pgxID pgtype.UUID) *uuid.UUID
- func GetDSN() string
- func IsRetryableError(err error) bool
- func RetryOperation(ctx context.Context, config *RetryConfig, ...) error
- func ToPgxBool(b *bool) pgtype.Bool
- func ToPgxBoolFromBool(b bool) pgtype.Bool
- func ToPgxDate(t *time.Time) pgtype.Date
- func ToPgxFloat4(f *float32) pgtype.Float4
- func ToPgxFloat8(f *float64) pgtype.Float8
- func ToPgxInt2(i *int16) pgtype.Int2
- func ToPgxInt4(i *int32) pgtype.Int4
- func ToPgxInt4FromInt(i *int) pgtype.Int4
- func ToPgxInt8(i *int64) pgtype.Int8
- func ToPgxInt8Array(s []int64) pgtype.Array[pgtype.Int8]
- func ToPgxNumeric(f *float64) pgtype.Numeric
- func ToPgxText(s *string) pgtype.Text
- func ToPgxTextArray(s []string) pgtype.Array[pgtype.Text]
- func ToPgxTextFromString(s string) pgtype.Text
- func ToPgxTime(t *time.Time) pgtype.Time
- func ToPgxTimestamp(t *time.Time) pgtype.Timestamp
- func ToPgxTimestamptz(t *time.Time) pgtype.Timestamptz
- func ToPgxUUID(id uuid.UUID) pgtype.UUID
- func ToPgxUUIDFromPtr(id *uuid.UUID) pgtype.UUID
- func WithTimeout[T any](ctx context.Context, timeout time.Duration, ...) (T, error)
- func WithTimeoutAndRetry[T any](ctx context.Context, timeout time.Duration, retryConfig *RetryConfig, ...) (T, error)
- type ConnectionHooks
- func (h *ConnectionHooks) AddOnAcquire(fn func(context.Context, *pgx.Conn) error)
- func (h *ConnectionHooks) AddOnConnect(fn func(*pgx.Conn) error)
- func (h *ConnectionHooks) AddOnDisconnect(fn func(*pgx.Conn))
- func (h *ConnectionHooks) AddOnRelease(fn func(*pgx.Conn))
- func (ch *ConnectionHooks) ConfigurePool(config *pgxpool.Config)
- func (h *ConnectionHooks) ExecuteOnAcquire(ctx context.Context, conn *pgx.Conn) error
- func (h *ConnectionHooks) ExecuteOnConnect(conn *pgx.Conn) error
- func (h *ConnectionHooks) ExecuteOnDisconnect(conn *pgx.Conn)
- func (h *ConnectionHooks) ExecuteOnRelease(conn *pgx.Conn)
- type DB
- func (db *DB) AddConnectionHook(hookType string, hookFunc interface{}) error
- func (db *DB) AddHook(hookType HookType, hookFunc HookFunc) *DB
- func (db *DB) AssertGolden(t *testing.T, testName string)
- func (db *DB) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
- func (db *DB) BeginTxWithRetry(ctx context.Context, config *RetryConfig, txOptions pgx.TxOptions) (pgx.Tx, error)
- func (db *DB) Connect(ctx context.Context, dsn string) error
- func (db *DB) ConnectReadWrite(ctx context.Context, readDSN, writeDSN string) error
- func (db *DB) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
- func (db *DB) ExecWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) (pgconn.CommandTag, error)
- func (db *DB) HealthCheck(ctx context.Context) error
- func (db *DB) IsReady(ctx context.Context) bool
- func (db *DB) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
- func (db *DB) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
- func (db *DB) QueryRowWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) pgx.Row
- func (db *DB) QueryWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) (pgx.Rows, error)
- func (db *DB) ReadQuery(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
- func (db *DB) ReadQueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
- func (db *DB) ReadQueryRowWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) pgx.Row
- func (db *DB) ReadQueryWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) (pgx.Rows, error)
- func (db *DB) ReadStats() *pgxpool.Stat
- func (db *DB) Shutdown(ctx context.Context) error
- func (db *DB) Stats() *pgxpool.Stat
- func (db *DB) WriteStats() *pgxpool.Stat
- type DBConfig
- type DatabaseError
- type HookFunc
- type HookType
- type NotFoundError
- type QueryPlan
- type RetryConfig
- type TestDB
- type ValidationError
Constants ¶
This section is empty.
Variables ¶
var (
ToPgxNumericFromFloat64Ptr = ToPgxNumeric
FromPgxNumericPtr = FromPgxNumeric
)
Legacy aliases for backward compatibility.
TODO: Consider deprecating these in favor of the new names.
Functions ¶
func CleanupGolden ¶ added in v1.1.0
CleanupGolden removes all golden files for a test
func CleanupTestData ¶
func CleanupTestData(sqlStatements ...string)
CleanupTestData executes cleanup SQL statements on the shared test database
func FromPgxBool ¶
FromPgxBool converts a pgtype.Bool to a bool pointer. If the pgtype.Bool is invalid (NULL), returns nil.
func FromPgxBoolToBool ¶ added in v1.1.0
FromPgxBoolToBool converts a pgtype.Bool to a bool value. If the pgtype.Bool is invalid (NULL), returns false.
func FromPgxDate ¶ added in v1.1.0
FromPgxDate converts a pgtype.Date to a time.Time pointer. If the pgtype.Date is invalid (NULL), returns nil.
func FromPgxFloat4 ¶ added in v1.1.0
FromPgxFloat4 converts a pgtype.Float4 to a float32 pointer. If the pgtype.Float4 is invalid (NULL), returns nil.
func FromPgxFloat8 ¶ added in v1.1.0
FromPgxFloat8 converts a pgtype.Float8 to a float64 pointer. If the pgtype.Float8 is invalid (NULL), returns nil.
func FromPgxInt2 ¶ added in v1.1.0
FromPgxInt2 converts a pgtype.Int2 to an int16 pointer. If the pgtype.Int2 is invalid (NULL), returns nil.
func FromPgxInt4 ¶
FromPgxInt4 converts a pgtype.Int4 to an int32 pointer. If the pgtype.Int4 is invalid (NULL), returns nil.
func FromPgxInt4ToInt ¶ added in v1.1.0
FromPgxInt4ToInt converts a pgtype.Int4 to an int pointer. If the pgtype.Int4 is invalid (NULL), returns nil.
func FromPgxInt8 ¶
FromPgxInt8 converts a pgtype.Int8 to an int64 pointer. If the pgtype.Int8 is invalid (NULL), returns nil.
func FromPgxInt8Array ¶ added in v1.1.0
FromPgxInt8Array converts a pgtype.Array[pgtype.Int8] to an int64 slice. If the array is invalid (NULL), returns nil.
func FromPgxNumeric ¶ added in v1.1.0
FromPgxNumeric converts a pgtype.Numeric to a float64 pointer. If the pgtype.Numeric is invalid (NULL), returns nil.
func FromPgxText ¶
FromPgxText converts a pgtype.Text to a string pointer. If the pgtype.Text is invalid (NULL), returns nil.
func FromPgxTextArray ¶ added in v1.1.0
FromPgxTextArray converts a pgtype.Array[pgtype.Text] to a string slice. If the array is invalid (NULL), returns nil.
func FromPgxTextToString ¶
FromPgxTextToString converts a pgtype.Text to a string value. If the pgtype.Text is invalid (NULL), returns empty string.
func FromPgxTime ¶ added in v1.1.0
FromPgxTime converts a pgtype.Time to a time.Time pointer. If the pgtype.Time is invalid (NULL), returns nil. The returned time will be on the current date with the time component.
func FromPgxTimestamp ¶ added in v1.1.0
FromPgxTimestamp converts a pgtype.Timestamp to a time.Time pointer. If the pgtype.Timestamp is invalid (NULL), returns nil.
func FromPgxTimestamptz ¶
func FromPgxTimestamptz(t pgtype.Timestamptz) time.Time
FromPgxTimestamptz converts a pgtype.Timestamptz to a time.Time value. If the pgtype.Timestamptz is invalid (NULL), returns zero time.
func FromPgxTimestamptzPtr ¶
func FromPgxTimestamptzPtr(t pgtype.Timestamptz) *time.Time
FromPgxTimestamptzPtr converts a pgtype.Timestamptz to a time.Time pointer. If the pgtype.Timestamptz is invalid (NULL), returns nil.
func FromPgxUUID ¶
FromPgxUUID converts a pgtype.UUID to uuid.UUID. Returns uuid.Nil if the pgtype.UUID is invalid or cannot be parsed.
func FromPgxUUIDToPtr ¶ added in v1.1.0
FromPgxUUIDToPtr converts a pgtype.UUID to a uuid.UUID pointer. If the pgtype.UUID is invalid (NULL), returns nil.
func GetDSN ¶
func GetDSN() string
GetDSN returns a PostgreSQL connection string built from environment variables. This is useful for integrating with external tools like golang-migrate that need a connection string rather than a pgxpool.Pool.
Environment variables used:
- POSTGRES_HOST (default: "localhost")
- POSTGRES_PORT (default: 5432)
- POSTGRES_USER (default: "postgres")
- POSTGRES_PASSWORD (default: "")
- POSTGRES_DB (default: "postgres")
- POSTGRES_SSLMODE (default: "disable")
Example:
dsn := pgxkit.GetDSN()
// Use with golang-migrate: migrate -database "$(pgxkit.GetDSN())" -path ./migrations up
func IsRetryableError ¶ added in v1.1.0
IsRetryableError determines if an error is worth retrying
func RetryOperation ¶ added in v1.1.0
func RetryOperation(ctx context.Context, config *RetryConfig, operation func(context.Context) error) error
RetryOperation is the generic retry function that can be used with any operation
func ToPgxBool ¶
ToPgxBool converts a bool pointer to pgtype.Bool. If the input is nil, returns an invalid pgtype.Bool (NULL in database).
func ToPgxBoolFromBool ¶ added in v1.1.0
ToPgxBoolFromBool converts a bool value to pgtype.Bool. Use this when you have a bool value instead of a pointer.
func ToPgxDate ¶ added in v1.1.0
ToPgxDate converts a time.Time pointer to pgtype.Date. If the input is nil, returns an invalid pgtype.Date (NULL in database).
func ToPgxFloat4 ¶ added in v1.1.0
ToPgxFloat4 converts a float32 pointer to pgtype.Float4. If the input is nil, returns an invalid pgtype.Float4 (NULL in database).
func ToPgxFloat8 ¶ added in v1.1.0
ToPgxFloat8 converts a float64 pointer to pgtype.Float8. If the input is nil, returns an invalid pgtype.Float8 (NULL in database).
func ToPgxInt2 ¶ added in v1.1.0
ToPgxInt2 converts an int16 pointer to pgtype.Int2. If the input is nil, returns an invalid pgtype.Int2 (NULL in database).
func ToPgxInt4 ¶ added in v1.1.0
ToPgxInt4 converts an int32 pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).
func ToPgxInt4FromInt ¶
ToPgxInt4FromInt converts an int pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).
func ToPgxInt8 ¶
ToPgxInt8 converts an int64 pointer to pgtype.Int8. If the input is nil, returns an invalid pgtype.Int8 (NULL in database).
func ToPgxInt8Array ¶ added in v1.1.0
ToPgxInt8Array converts an int64 slice to pgtype.Array[pgtype.Int8]. If the input is nil, returns an invalid array (NULL in database).
func ToPgxNumeric ¶ added in v1.1.0
ToPgxNumeric converts a float64 pointer to pgtype.Numeric. If the input is nil, returns an invalid pgtype.Numeric (NULL in database). Uses 6 decimal places as standard precision.
func ToPgxText ¶
ToPgxText converts a string pointer to pgtype.Text. If the input is nil, returns an invalid pgtype.Text (NULL in database).
func ToPgxTextArray ¶ added in v1.1.0
ToPgxTextArray converts a string slice to pgtype.Array[pgtype.Text]. If the input is nil, returns an invalid array (NULL in database).
func ToPgxTextFromString ¶ added in v1.1.0
ToPgxTextFromString converts a string value to pgtype.Text. Use this when you have a string value instead of a pointer.
func ToPgxTime ¶ added in v1.1.0
ToPgxTime converts a time.Time pointer to pgtype.Time. If the input is nil, returns an invalid pgtype.Time (NULL in database).
func ToPgxTimestamp ¶ added in v1.1.0
ToPgxTimestamp converts a time.Time pointer to pgtype.Timestamp. If the input is nil, returns an invalid pgtype.Timestamp (NULL in database).
func ToPgxTimestamptz ¶
func ToPgxTimestamptz(t *time.Time) pgtype.Timestamptz
ToPgxTimestamptz converts a time.Time pointer to pgtype.Timestamptz. If the input is nil, returns an invalid pgtype.Timestamptz (NULL in database).
func ToPgxUUIDFromPtr ¶ added in v1.1.0
ToPgxUUIDFromPtr converts a uuid.UUID pointer to pgtype.UUID. If the input is nil, returns an invalid pgtype.UUID (NULL in database).
func WithTimeout ¶
func WithTimeout[T any](ctx context.Context, timeout time.Duration, fn func(context.Context) (T, error)) (T, error)
WithTimeout executes a function with a timeout. This is a generic utility function that can be used with any operation.
Example:
result, err := pgxkit.WithTimeout(ctx, 5*time.Second, func(ctx context.Context) (*User, error) {
return getUserFromDatabase(ctx)
})
func WithTimeoutAndRetry ¶
func WithTimeoutAndRetry[T any](ctx context.Context, timeout time.Duration, retryConfig *RetryConfig, fn func(context.Context) (T, error)) (T, error)
WithTimeoutAndRetry executes a function with timeout and retry logic. This combines timeout handling with intelligent retry logic for transient failures.
Example:
config := pgxkit.DefaultRetryConfig()
result, err := pgxkit.WithTimeoutAndRetry(ctx, 5*time.Second, config, func(ctx context.Context) (*User, error) {
return getUserFromDatabase(ctx)
})
Types ¶
type ConnectionHooks ¶
type ConnectionHooks struct {
// contains filtered or unexported fields
}
ConnectionHooks manages connection lifecycle hooks. These hooks are integrated with pgx's connection lifecycle and are useful for connection setup, validation, and cleanup. They use pgx's native function signatures.
func CombineHooks ¶
func CombineHooks(hooksList ...*ConnectionHooks) *ConnectionHooks
CombineHooks combines multiple hook managers into one
func NewConnectionHooks ¶
func NewConnectionHooks() *ConnectionHooks
NewConnectionHooks creates a new connection hooks manager. This is used internally by the DB type but can also be used directly for advanced connection pool configuration.
Example:
hooks := pgxkit.NewConnectionHooks()
hooks.AddOnConnect(func(conn *pgx.Conn) error {
log.Println("New connection established")
return nil
})
func SetupHook ¶
func SetupHook(setupSQL string) *ConnectionHooks
SetupHook creates a hook that sets up connection-specific settings
func ValidationHook ¶
func ValidationHook() *ConnectionHooks
ValidationHook creates a hook that validates connections
func (*ConnectionHooks) AddOnAcquire ¶
AddOnAcquire adds a callback that will be called when a connection is acquired from the pool
func (*ConnectionHooks) AddOnConnect ¶
func (h *ConnectionHooks) AddOnConnect(fn func(*pgx.Conn) error)
AddOnConnect adds a callback that will be called when a new connection is established. This is useful for connection initialization, setting session variables, or validation. If the callback returns an error, the connection will be closed.
Example:
hooks.AddOnConnect(func(conn *pgx.Conn) error {
_, err := conn.Exec(context.Background(), "SET application_name = 'myapp'")
return err
})
func (*ConnectionHooks) AddOnDisconnect ¶
func (h *ConnectionHooks) AddOnDisconnect(fn func(*pgx.Conn))
AddOnDisconnect adds a callback that will be called when a connection is closed
func (*ConnectionHooks) AddOnRelease ¶
func (h *ConnectionHooks) AddOnRelease(fn func(*pgx.Conn))
AddOnRelease adds a callback that will be called when a connection is released back to the pool
func (*ConnectionHooks) ConfigurePool ¶ added in v1.1.0
func (ch *ConnectionHooks) ConfigurePool(config *pgxpool.Config)
ConfigurePool configures a pgxpool.Config with the connection hooks. This integrates the hooks with the actual pool lifecycle events.
func (*ConnectionHooks) ExecuteOnAcquire ¶
ExecuteOnAcquire executes all OnAcquire callbacks
func (*ConnectionHooks) ExecuteOnConnect ¶
func (h *ConnectionHooks) ExecuteOnConnect(conn *pgx.Conn) error
ExecuteOnConnect executes all OnConnect callbacks
func (*ConnectionHooks) ExecuteOnDisconnect ¶
func (h *ConnectionHooks) ExecuteOnDisconnect(conn *pgx.Conn)
ExecuteOnDisconnect executes all OnDisconnect callbacks
func (*ConnectionHooks) ExecuteOnRelease ¶
func (h *ConnectionHooks) ExecuteOnRelease(conn *pgx.Conn)
ExecuteOnRelease executes all OnRelease callbacks
type DB ¶ added in v1.1.0
type DB struct {
// contains filtered or unexported fields
}
DB represents a database connection with read/write pool abstraction. It provides a safe-by-default approach where all operations use the write pool unless explicitly using Read* methods for optimization.
The DB supports:
- Single pool mode (same pool for read/write)
- Read/write split mode (separate pools for optimization)
- Extensible hook system for logging, tracing, metrics
- Graceful shutdown with active operation tracking
- Built-in retry logic for transient failures
- Health checks and connection statistics
func NewDB ¶ added in v1.1.0
func NewDB() *DB
NewDB creates a new unconnected DB instance. Add hooks to this instance, then call Connect() to establish the database connection.
Example:
db := pgxkit.NewDB()
db.AddHook(pgxkit.BeforeOperation, myLoggingHook)
err := db.Connect(ctx, "postgres://user:pass@localhost/db")
func NewDBWithPool ¶ added in v1.1.0
NewDBWithPool creates a new DB instance with a single pool (same pool for read/write).
Deprecated: Use NewDB() + Connect() instead for proper hook integration.
func NewReadWriteDB ¶ added in v1.1.0
NewReadWriteDB creates a new DB instance with separate read and write pools.
Deprecated: Use NewDB() + ConnectReadWrite() instead for proper hook integration.
func (*DB) AddConnectionHook ¶ added in v1.1.0
AddConnectionHook adds a connection-level hook to the database. These hooks are integrated with pgx's connection lifecycle and are useful for connection setup, validation, and cleanup.
Available hook types:
- "OnConnect": Called when a new connection is established
- "OnDisconnect": Called when a connection is closed
- "OnAcquire": Called when a connection is acquired from the pool
- "OnRelease": Called when a connection is returned to the pool
Example:
db.AddConnectionHook("OnConnect", func(conn *pgx.Conn) error {
log.Println("New connection established")
return nil
})
TODO: hookType needs to be an enum.
TODO: This doc needs to be updated to reflect the new hookType enum and that this is for pgx hooks.
TODO: This should be split up into the different pgx hooks, and then we don't need the enum.
func (*DB) AddHook ¶ added in v1.1.0
AddHook adds an operation-level hook to the database. Hooks are executed in the order they are added and provide extensibility for logging, tracing, metrics, circuit breakers, and other cross-cutting concerns.
Available hook types:
- BeforeOperation: Called before any query/exec operation
- AfterOperation: Called after any query/exec operation
- BeforeTransaction: Called before starting a transaction
- AfterTransaction: Called after a transaction completes
- OnShutdown: Called during graceful shutdown
Example:
db.AddHook(pgxkit.BeforeOperation, func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
log.Printf("Executing: %s", sql)
return nil
})
func (*DB) AssertGolden ¶ added in v1.1.0
AssertGolden compares captured query plans with existing golden file
func (*DB) BeginTx ¶ added in v1.1.0
BeginTx starts a transaction using the write pool. Transactions always use the write pool to ensure consistency. The transaction will execute BeforeTransaction and AfterTransaction hooks.
Example:
tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return err
}
defer tx.Rollback(ctx) // Safe to call even after commit
_, err = tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", name)
if err != nil {
return err
}
return tx.Commit(ctx)
func (*DB) BeginTxWithRetry ¶ added in v1.1.0
func (db *DB) BeginTxWithRetry(ctx context.Context, config *RetryConfig, txOptions pgx.TxOptions) (pgx.Tx, error)
BeginTxWithRetry starts a transaction using the write pool with retry logic
func (*DB) Connect ¶ added in v1.1.0
Connect establishes a database connection with a single pool (same pool for read/write). If dsn is empty, it uses environment variables to construct the connection string. The hooks are configured at pool creation time for proper integration.
This is the recommended approach for most applications as it provides safety by default while still allowing read optimization through ReadQuery methods.
Example:
db := pgxkit.NewDB()
err := db.Connect(ctx, "postgres://user:pass@localhost/db")
// Or use environment variables:
err := db.Connect(ctx, "")
func (*DB) ConnectReadWrite ¶ added in v1.1.0
ConnectReadWrite establishes database connections with separate read and write pools. If readDSN or writeDSN is empty, it uses environment variables to construct the connection string. The hooks are configured at pool creation time for proper integration.
This is useful for applications that want to optimize read performance by routing read queries to read replicas while ensuring writes go to the primary database.
Example:
db := pgxkit.NewDB()
err := db.ConnectReadWrite(ctx, "postgres://user:pass@read-replica/db", "postgres://user:pass@primary/db")
// Now ReadQuery methods will use the read pool, while Query/Exec use the write pool
func (*DB) Exec ¶ added in v1.1.0
Exec executes a statement using the write pool. This method is used for INSERT, UPDATE, DELETE, and other write operations.
Example:
tag, err := db.Exec(ctx, "INSERT INTO users (name, email) VALUES ($1, $2)", name, email)
if err != nil {
return err
}
fmt.Printf("Inserted %d rows\n", tag.RowsAffected())
func (*DB) ExecWithRetry ¶ added in v1.1.0
func (db *DB) ExecWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) (pgconn.CommandTag, error)
ExecWithRetry executes a statement using the write pool with retry logic. This automatically retries transient failures like connection errors, deadlocks, and serialization failures using exponential backoff.
Example:
config := pgxkit.DefaultRetryConfig()
tag, err := db.ExecWithRetry(ctx, config, "INSERT INTO users (name) VALUES ($1)", name)
if err != nil {
return fmt.Errorf("failed to insert user after retries: %w", err)
}
func (*DB) HealthCheck ¶ added in v1.1.0
HealthCheck performs a simple health check by pinging the database. This is useful for health check endpoints and monitoring systems. It returns an error if the database is not connected, shutting down, or unreachable.
Example:
if err := db.HealthCheck(ctx); err != nil {
log.Printf("Database health check failed: %v", err)
http.Error(w, "Database unavailable", http.StatusServiceUnavailable)
return
}
func (*DB) IsReady ¶ added in v1.1.0
IsReady checks if the database connection is ready to accept queries. This is a convenience method that returns true if HealthCheck() succeeds. It's useful for readiness probes and quick status checks.
Example:
if db.IsReady(ctx) {
log.Println("Database is ready to accept queries")
}
func (*DB) Query ¶ added in v1.1.0
Query executes a query using the write pool (safe by default). This ensures consistency by always using the primary database connection. Use ReadQuery for read-only queries that can benefit from read replicas.
Example:
rows, err := db.Query(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
return err
}
defer rows.Close()
func (*DB) QueryRow ¶ added in v1.1.0
QueryRow executes a query that returns a single row using the write pool. This ensures consistency by always using the primary database connection. Use ReadQueryRow for read-only queries that can benefit from read replicas.
Example:
var userID int
err := db.QueryRow(ctx, "SELECT id FROM users WHERE email = $1", email).Scan(&userID)
func (*DB) QueryRowWithRetry ¶ added in v1.1.0
func (db *DB) QueryRowWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) pgx.Row
QueryRowWithRetry executes a query that returns a single row using the write pool with retry logic. This automatically retries transient failures like connection errors, deadlocks, and serialization failures using exponential backoff.
Example:
config := pgxkit.DefaultRetryConfig()
row := db.QueryRowWithRetry(ctx, config, "SELECT id FROM users WHERE email = $1", email)
var userID int
err := row.Scan(&userID)
func (*DB) QueryWithRetry ¶ added in v1.1.0
func (db *DB) QueryWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) (pgx.Rows, error)
QueryWithRetry executes a query using the write pool with retry logic. This automatically retries transient failures like connection errors, deadlocks, and serialization failures using exponential backoff.
Example:
config := pgxkit.DefaultRetryConfig()
rows, err := db.QueryWithRetry(ctx, config, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
return fmt.Errorf("failed to query users after retries: %w", err)
}
defer rows.Close()
func (*DB) ReadQuery ¶ added in v1.1.0
ReadQuery executes a query using the read pool (explicit optimization). This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.
Example:
rows, err := db.ReadQuery(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
return err
}
defer rows.Close()
func (*DB) ReadQueryRow ¶ added in v1.1.0
ReadQueryRow executes a query that returns a single row using the read pool. This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.
Example:
var count int
err := db.ReadQueryRow(ctx, "SELECT COUNT(*) FROM users").Scan(&count)
func (*DB) ReadQueryRowWithRetry ¶ added in v1.1.0
func (db *DB) ReadQueryRowWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) pgx.Row
ReadQueryRowWithRetry executes a query that returns a single row using the read pool with retry logic
func (*DB) ReadQueryWithRetry ¶ added in v1.1.0
func (db *DB) ReadQueryWithRetry(ctx context.Context, config *RetryConfig, sql string, args ...interface{}) (pgx.Rows, error)
ReadQueryWithRetry executes a query using the read pool with retry logic
func (*DB) ReadStats ¶ added in v1.1.0
ReadStats returns statistics for the read pool. This provides information about read connection usage, which is useful for monitoring read replica performance and connection pool health.
Example:
stats := db.ReadStats()
if stats != nil {
log.Printf("Read pool active connections: %d", stats.AcquiredConns())
}
func (*DB) Shutdown ¶ added in v1.1.0
Shutdown gracefully shuts down the database connections. It waits for active operations to complete, respecting the context timeout. If the context times out, shutdown proceeds anyway to prevent hanging.
The shutdown process:
1. Marks the database as shutting down (new operations will fail)
2. Waits for active operations to complete (respects context timeout)
3. Executes OnShutdown hooks
4. Closes connection pools
Example:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := db.Shutdown(ctx)
func (*DB) Stats ¶ added in v1.1.0
Stats returns statistics for the write pool. This provides information about connection usage, which is useful for monitoring and debugging connection pool performance.
Example:
stats := db.Stats()
if stats != nil {
log.Printf("Active connections: %d", stats.AcquiredConns())
log.Printf("Idle connections: %d", stats.IdleConns())
}
func (*DB) WriteStats ¶ added in v1.1.0
WriteStats returns statistics for the write pool. This is an alias for Stats() provided for consistency with ReadStats().
type DBConfig ¶ added in v1.1.0
type DBConfig struct {
MaxConns int32 // Maximum number of connections in the pool
MinConns int32 // Minimum number of connections in the pool
MaxConnLifetime time.Duration // Maximum lifetime of a connection
MaxConnIdleTime time.Duration // Maximum idle time for a connection
}
DBConfig holds configuration options for database connections. These options are applied when creating connection pools.
type DatabaseError ¶
type DatabaseError struct {
Entity string
Operation string // "create", "update", "delete", "query"
Err error
}
DatabaseError represents database operation failures such as connection errors, constraint violations, or other database-specific errors.
func NewDatabaseError ¶
func NewDatabaseError(entity, operation string, err error) *DatabaseError
NewDatabaseError creates a new DatabaseError with the given entity, operation, and underlying error.
func (*DatabaseError) Error ¶
func (e *DatabaseError) Error() string
func (*DatabaseError) Unwrap ¶
func (e *DatabaseError) Unwrap() error
type HookFunc ¶ added in v1.1.0
HookFunc is the universal hook function signature for operation-level hooks. All operation-level hooks use this signature for consistency and simplicity.
Parameters:
- ctx: The context for the operation
- sql: The SQL statement being executed (empty for shutdown hooks)
- args: The arguments for the SQL statement (nil for shutdown hooks)
- operationErr: The error from the operation (nil for before hooks)
The hook should return an error if it wants to abort the operation. For after hooks, returning an error will not affect the original operation result.
type HookType ¶ added in v1.1.0
type HookType int
HookType represents the type of hook for operation-level hooks. These hooks are executed during database operations and provide extensibility for logging, tracing, metrics, circuit breakers, and other cross-cutting concerns.
const (
// BeforeOperation is called before any query/exec operation.
// The operationErr parameter will always be nil.
BeforeOperation HookType = iota
// AfterOperation is called after any query/exec operation.
// The operationErr parameter contains the result of the operation.
AfterOperation
// BeforeTransaction is called before starting a transaction.
// The operationErr parameter will always be nil.
BeforeTransaction
// AfterTransaction is called after a transaction completes.
// The operationErr parameter contains the result of the transaction.
AfterTransaction
// OnShutdown is called during graceful shutdown.
// The sql and args parameters will be empty, operationErr will be nil.
OnShutdown
)
type NotFoundError ¶
type NotFoundError struct {
Entity string
Identifier interface{}
}
NotFoundError represents when a requested entity is not found in the database. This error should be used instead of returning pgx.ErrNoRows directly.
Example:
if err == pgx.ErrNoRows {
return nil, pgxkit.NewNotFoundError("User", userID)
}
func NewNotFoundError ¶
func NewNotFoundError(entity string, identifier interface{}) *NotFoundError
NewNotFoundError creates a new NotFoundError with the given entity and identifier.
func (*NotFoundError) Error ¶
func (e *NotFoundError) Error() string
type QueryPlan ¶ added in v1.1.0
type QueryPlan struct {
Query int `json:"query"`
SQL string `json:"sql"`
Plan []map[string]interface{} `json:"plan"`
ExecutionMS float64 `json:"execution_ms,omitempty"`
PlanningMS float64 `json:"planning_ms,omitempty"`
}
QueryPlan represents a captured query and its EXPLAIN plan
type RetryConfig ¶
type RetryConfig struct {
MaxRetries int // Maximum number of retry attempts
BaseDelay time.Duration // Initial delay between retries
MaxDelay time.Duration // Maximum delay between retries
Multiplier float64 // Multiplier for exponential backoff
}
RetryConfig holds configuration for retry logic. It uses exponential backoff with jitter to avoid thundering herd problems.
func DefaultRetryConfig ¶
func DefaultRetryConfig() *RetryConfig
DefaultRetryConfig returns a sensible default retry configuration. It provides 3 retries with exponential backoff starting at 100ms, capped at 1 second with a 2x multiplier.
Example:
config := pgxkit.DefaultRetryConfig()
// Customize if needed:
config.MaxRetries = 5
config.MaxDelay = 5 * time.Second
type TestDB ¶ added in v1.1.0
type TestDB struct {
*DB
}
TestDB is a testing utility that wraps DB with testing-specific functionality. It provides simple methods for test setup, cleanup, and golden test support. TestDB automatically manages test database connections and provides utilities for performance regression testing through golden tests.
func NewTestDB ¶ added in v1.1.0
func NewTestDB() *TestDB
NewTestDB creates a new TestDB instance using the shared test pool. The test pool is automatically configured from the TEST_DATABASE_URL environment variable. If no test database is available, the TestDB will have nil pools and tests should skip.
Example:
func TestUserOperations(t *testing.T) {
testDB := pgxkit.NewTestDB()
err := testDB.Setup()
if err != nil {
t.Skip("Test database not available")
}
defer testDB.Clean()
// ... test code
}
func (*TestDB) EnableGolden ¶ added in v1.1.0
EnableGolden returns a new DB with golden test hooks added
func (*TestDB) Setup ¶ added in v1.1.0
Setup prepares the database for testing. This method verifies the database connection and can be extended to run migrations, seed data, or perform other test setup tasks. Returns an error if the database is not available or not ready for testing.
Example:
err := testDB.Setup()
if err != nil {
t.Skip("Test database not available")
}
type ValidationError ¶
ValidationError represents validation failures that occur before database operations. Use this for input validation, constraint violations, or business rule failures.
func NewValidationError ¶
func NewValidationError(entity, operation, field, reason string, err error) *ValidationError
NewValidationError creates a new ValidationError with the given parameters.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
func (*ValidationError) Unwrap ¶
func (e *ValidationError) Unwrap() error