Documentation ¶
Index ¶
- func HandleTransactionRollback(ctx context.Context, tx TransactionRollback, logger *slog.Logger)
- func RollbackMigrations(databaseURL, migrationsPath string) error
- func RollbackTransaction(ctx context.Context, tx TransactionRollback, logger *slog.Logger, ...) error
- func RunMigrations(databaseURL, migrationsPath string) error
- func WithTransaction(ctx context.Context, conn *Connection, logger *slog.Logger, ...) error
- type Connection
- func (db *Connection) Acquire(ctx context.Context) (*ConnectionWrapper, error)
- func (db *Connection) AverageAcquireTime() time.Duration
- func (c *Connection) Begin(ctx context.Context) (pgx.Tx, error)
- func (c *Connection) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
- func (db *Connection) CheckConnections() error
- func (c *Connection) Close()
- func (c *Connection) Connect(ctx context.Context) error
- func (db *Connection) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, ...) (int64, error)
- func (db *Connection) DetailedHealth(ctx context.Context) *HealthStatus
- func (c *Connection) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
- func (db *Connection) GetMetrics() *PoolMetrics
- func (c *Connection) Health(ctx context.Context) error
- func (db *Connection) IsHealthy(ctx context.Context) bool
- func (c *Connection) Pool() *pgxpool.Pool
- func (c *Connection) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
- func (c *Connection) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
- func (db *Connection) ResetPool(ctx context.Context) error
- func (db *Connection) SendBatch(ctx context.Context, batch *pgx.Batch) pgx.BatchResults
- func (c *Connection) Stats() *pgxpool.Stat
- func (db *Connection) WaitForReady(ctx context.Context, timeout time.Duration) error
- func (db *Connection) WithConnection(ctx context.Context, fn func(*pgxpool.Conn) error) error
- func (db *Connection) WithTransaction(ctx context.Context, fn func(pgx.Tx) error) error
- type ConnectionWrapper
- type HealthStatus
- type Option
- type PoolMetrics
- type TransactionRollback
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HandleTransactionRollback ¶
func HandleTransactionRollback(ctx context.Context, tx TransactionRollback, logger *slog.Logger)
HandleTransactionRollback safely rolls back a transaction in deferred cleanup.
Designed for use in defer blocks where err != nil. Logs but doesn't propagate rollback errors to prevent masking the original failure. The pgx driver automatically rolls back uncommitted transactions on connection close, making this a best-effort operation for explicit cleanup.
Expected rollback failures (tx already closed, context canceled, or connection busy) are logged at WARN level to reduce noise. Unexpected failures are logged at ERROR level.
Example usage:
    tx, err := conn.Begin(ctx)
    if err != nil {
        return err
    }
    defer func() {
        if err != nil {
            pgxutils.HandleTransactionRollback(ctx, tx, logger)
        }
    }()
func RollbackMigrations ¶ added in v1.0.2
func RollbackMigrations(databaseURL, migrationsPath string) error
RollbackMigrations rolls back the last migration.
func RollbackTransaction ¶
func RollbackTransaction(ctx context.Context, tx TransactionRollback, logger *slog.Logger, originalErr error) error
RollbackTransaction is a convenience wrapper around HandleTransactionRollback that also wraps the original error with transaction context.
This function is useful when you want to both roll back a transaction and preserve the original error with additional context.
Example usage:
    tx, err := conn.Begin(ctx)
    if err != nil {
        return err
    }
    defer func() {
        if err != nil {
            err = pgxutils.RollbackTransaction(ctx, tx, logger, err)
        }
    }()
func RunMigrations ¶ added in v1.0.2
func RunMigrations(databaseURL, migrationsPath string) error
RunMigrations runs database migrations using golang-migrate.
func WithTransaction ¶
func WithTransaction(ctx context.Context, conn *Connection, logger *slog.Logger, fn func(pgx.Tx) error) error
WithTransaction executes a function within a database transaction. If the function returns an error, the transaction is rolled back. Otherwise, the transaction is committed.
This helper simplifies transaction management by handling the Begin/Commit/Rollback boilerplate.
Example usage:
    err := pgxutils.WithTransaction(ctx, conn, logger, func(tx pgx.Tx) error {
        _, err := tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice")
        if err != nil {
            return err
        }
        _, err = tx.Exec(ctx, "INSERT INTO logs (message) VALUES ($1)", "User created")
        return err
    })
Types ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection manages a PostgreSQL connection pool with automatic retry and health checking. Thread-safe after Connect() succeeds.
func NewConnection ¶
func NewConnection(cfg *config.DatabaseConfig, opts ...Option) (*Connection, error)
NewConnection creates a new Connection instance without establishing any connections.
The pool is established later, in Connect(), so that retry behavior can be configured first. Use functional options to customize behavior.
func (*Connection) Acquire ¶ added in v1.0.2
func (db *Connection) Acquire(ctx context.Context) (*ConnectionWrapper, error)
Acquire obtains a connection from the pool using the given context. Call Release on the returned ConnectionWrapper when finished.
func (*Connection) AverageAcquireTime ¶ added in v1.0.2
func (db *Connection) AverageAcquireTime() time.Duration
AverageAcquireTime returns the average time taken to acquire a connection.
func (*Connection) CheckConnections ¶ added in v1.0.2
func (db *Connection) CheckConnections() error
CheckConnections verifies that the connection pool is within healthy thresholds.
func (*Connection) Close ¶
func (c *Connection) Close()
Close closes all pool connections immediately.
func (*Connection) Connect ¶
func (c *Connection) Connect(ctx context.Context) error
Connect establishes the connection pool with exponential backoff retry.
Retries for up to RetryTimeout (default 30s), waiting at most 10s between attempts. Returns an error if the connection cannot be established within the timeout.
func (*Connection) CopyFrom ¶ added in v1.0.2
func (db *Connection) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
CopyFrom performs a bulk insert using the PostgreSQL COPY protocol.
func (*Connection) DetailedHealth ¶ added in v1.0.2
func (db *Connection) DetailedHealth(ctx context.Context) *HealthStatus
DetailedHealth performs a comprehensive health check and returns a detailed status.
func (*Connection) Exec ¶
func (c *Connection) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
Exec executes queries without result rows (INSERT, UPDATE, DELETE).
func (*Connection) GetMetrics ¶ added in v1.0.2
func (db *Connection) GetMetrics() *PoolMetrics
GetMetrics returns the current pool metrics.
func (*Connection) Health ¶
func (c *Connection) Health(ctx context.Context) error
Health verifies database connectivity with a configurable timeout.
Returns an error if the pool is uninitialized or SELECT 1 fails.
func (*Connection) IsHealthy ¶ added in v1.0.2
func (db *Connection) IsHealthy(ctx context.Context) bool
IsHealthy is a simple boolean check for database health.
func (*Connection) Pool ¶
func (c *Connection) Pool() *pgxpool.Pool
Pool exposes the underlying pgxpool for advanced operations.
Prefer Connection methods; direct pool access bypasses initialization checks.
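func (*Connection) QueryRow ¶
func (c *Connection) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
QueryRow executes a query that is expected to return a single row.
Returns an emptyRow carrying an error if the pool is uninitialized.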
func (*Connection) ResetPool ¶ added in v1.0.2
func (db *Connection) ResetPool(ctx context.Context) error
ResetPool closes and recreates the connection pool. This can be useful for handling certain types of connection errors.
func (*Connection) SendBatch ¶ added in v1.0.2
func (db *Connection) SendBatch(ctx context.Context, batch *pgx.Batch) pgx.BatchResults
SendBatch sends a batch of queries to be executed.
func (*Connection) Stats ¶
func (c *Connection) Stats() *pgxpool.Stat
Stats returns pool metrics or nil if uninitialized.
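func (*Connection) WaitForReady ¶ added in v1.0.2
func (db *Connection) WaitForReady(ctx context.Context, timeout time.Duration) error
WaitForReady waits for the database to become ready, giving up after timeout.
func (*Connection) WithConnection ¶ added in v1.0.2
func (db *Connection) WithConnection(ctx context.Context, fn func(*pgxpool.Conn) error) error
WithConnection executes a function with a database connection.
func (*Connection) WithTransaction ¶ added in v1.0.2
func (db *Connection) WithTransaction(ctx context.Context, fn func(pgx.Tx) error) error
WithTransaction executes a function within a database transaction.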
type ConnectionWrapper ¶ added in v1.0.2
type ConnectionWrapper struct {
// contains filtered or unexported fields
}
ConnectionWrapper wraps a pooled connection with additional functionality.
func (*ConnectionWrapper) Conn ¶ added in v1.0.2
func (cw *ConnectionWrapper) Conn() *pgxpool.Conn
Conn returns the underlying connection.
func (*ConnectionWrapper) Release ¶ added in v1.0.2
func (cw *ConnectionWrapper) Release()
Release returns the connection to the pool.
type HealthStatus ¶ added in v1.0.2
type HealthStatus struct {
    Healthy     bool          `json:"healthy"`
    Message     string        `json:"message"`
    Latency     time.Duration `json:"latency_ms"`
    Connections int32         `json:"connections"`
    IdleConns   int32         `json:"idle_connections"`
    MaxConns    int32         `json:"max_connections"`
    LastChecked time.Time     `json:"last_checked"`
}
HealthStatus represents the health status of the database.
type Option ¶
type Option func(*connectionOptions)
Option is a functional option for configuring Connection.
func WithHealthTimeout ¶
WithHealthTimeout sets a custom timeout for health checks. Default is 5 seconds.
func WithLogger ¶
WithLogger sets a custom logger for the connection.
func WithRetryTimeout ¶
WithRetryTimeout sets a custom timeout for connection retry logic. Default is 30 seconds.
type PoolMetrics ¶ added in v1.0.2
type PoolMetrics struct {
    TotalConns           int32         `json:"total_connections"`
    AcquiredConns        int32         `json:"acquired_connections"`
    IdleConns            int32         `json:"idle_connections"`
    MaxConns             int32         `json:"max_connections"`
    TotalAcquireCount    int64         `json:"total_acquire_count"`
    TotalAcquireTime     time.Duration `json:"total_acquire_time"`
    EmptyAcquireCount    int64         `json:"empty_acquire_count"`
    CanceledAcquireCount int64         `json:"canceled_acquire_count"`
}
PoolMetrics provides detailed metrics about the connection pool.
type TransactionRollback ¶
TransactionRollback defines the minimal interface needed for transaction rollback. This allows for easier testing while maintaining compatibility with pgx.Tx.