postgres

package
v0.0.0-...-6c1af87 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: MIT Imports: 29 Imported by: 0

Documentation

Overview

Package postgres provides a PostgreSQL adapter for the GoCell framework.

It wraps pgx/v5 to offer:

  • Pool: connection pool with DSN/env-based configuration and Health() probe.
  • TxManager: RunInTx with context-embedded pgx.Tx, savepoint nesting, and automatic panic rollback.
  • Migrator: embed.FS-driven SQL migrations with up/down/status and a schema_migrations tracking table.
  • RowScanner helper for reducing boilerplate in repository implementations.

For parameterized SQL query construction, see pkg/query.Builder.

Error codes use the ERR_ADAPTER_PG_* prefix (see errcode.go in this package).

ref: Watermill watermill-sql schema_adapter_postgresql.go — adopted advisory locking for migration init; diverged by using embed.FS instead of inline SQL.

Index

Constants

View Source
const (
	// ErrAdapterPGConnect indicates a connection or pool initialization failure.
	ErrAdapterPGConnect errcode.Code = "ERR_ADAPTER_PG_CONNECT"

	// ErrAdapterPGQuery indicates a query execution failure.
	ErrAdapterPGQuery errcode.Code = "ERR_ADAPTER_PG_QUERY"

	// ErrAdapterPGMigrate indicates a migration execution or tracking failure.
	ErrAdapterPGMigrate errcode.Code = "ERR_ADAPTER_PG_MIGRATE"

	// ErrAdapterPGNoTx indicates outbox.Writer.Write was called outside a transaction.
	ErrAdapterPGNoTx errcode.Code = "ERR_ADAPTER_PG_NO_TX"

	// ErrAdapterPGMarshal indicates a JSON marshal failure for outbox entry.
	ErrAdapterPGMarshal errcode.Code = "ERR_ADAPTER_PG_MARSHAL"

	// ErrAdapterPGPublish indicates the outbox relay failed to publish an entry.
	ErrAdapterPGPublish errcode.Code = "ERR_ADAPTER_PG_PUBLISH"

	// ErrAdapterPGSchemaMismatch indicates the DB schema version does not match
	// the expected version derived from the embedded migration files.
	ErrAdapterPGSchemaMismatch errcode.Code = "ERR_ADAPTER_PG_SCHEMA_MISMATCH"
)

PostgreSQL adapter error codes.

Variables

This section is empty.

Functions

func CtxWithTx

func CtxWithTx(ctx context.Context, tx pgx.Tx) context.Context

CtxWithTx returns a new context carrying the given pgx.Tx. Downstream code (e.g. OutboxWriter) retrieves it via TxFromContext. Uses persistence.TxCtxKey so cell-local adapters can participate in ambient transactions without importing adapters/postgres.

func ExpectedVersion

func ExpectedVersion(fsys fs.FS) (int64, error)

ExpectedVersion scans the given fs.FS for .sql migration files and returns the maximum numeric prefix found. This represents the expected schema version that must be present in the database for the binary to start safely.

ref: pressly/goose v3.27 Provider — migrations embedded in FS.

func MigrationsFS

func MigrationsFS() fs.FS

MigrationsFS returns an fs.FS rooted at the migrations/ directory so that file entries are directly accessible (e.g. "001_create_outbox_entries.up.sql" rather than "migrations/001_create_outbox_entries.up.sql"). Callers can pass the result directly to NewMigrator.

func TxFromContext

func TxFromContext(ctx context.Context) (pgx.Tx, bool)

TxFromContext extracts a pgx.Tx from the context. The boolean return indicates whether a transaction was present.

func VerifyExpectedVersion

func VerifyExpectedVersion(ctx context.Context, pool *Pool, fsys fs.FS, tableName ...string) error

VerifyExpectedVersion compares the database's current goose schema version against the expected version derived from the embedded migration FS.

tableName is the goose tracking table (pass "" to use the default "schema_migrations"). It must match the table used by NewMigrator.

Returns ErrAdapterPGSchemaMismatch if:

  • actual < expected: DB schema is behind the binary (migrations not run).
  • actual > expected: binary is behind the DB (binary rollback without migration rollback).

Returns nil when actual == expected.

ref: pressly/goose v3.27 Provider.GetDBVersion — GetDBVersion reads max version from the goose version table (schema_migrations by default).

Types

type Config

type Config struct {
	// DSN is the PostgreSQL connection string (e.g.
	// "postgres://user:pass@localhost:5432/dbname?sslmode=disable").
	DSN string

	// MaxConns is the maximum number of connections in the pool.
	// Default (applied by applyDefaults): 10.
	MaxConns int32

	// IdleTimeout is how long an idle connection may remain in the pool.
	// Default (applied by applyDefaults): 5m.
	IdleTimeout time.Duration

	// MaxLifetime is the maximum lifetime of a connection.
	// Default (applied by applyDefaults): 1h.
	MaxLifetime time.Duration
}

Config holds PostgreSQL connection pool settings. All fields must be set explicitly by the caller; there is no global env-reading constructor.

type InvalidIndex

type InvalidIndex struct {
	// Index is the qualified name of the invalid index (e.g. "public.idx_foo").
	Index string
	// Table is the qualified name of the table the index belongs to.
	Table string
}

InvalidIndex describes an index that is marked as invalid in pg_index. Invalid indexes can occur when CREATE INDEX CONCURRENTLY is interrupted.

func DetectInvalidIndexes

func DetectInvalidIndexes(ctx context.Context, pool *Pool) ([]InvalidIndex, error)

DetectInvalidIndexes queries pg_index for any indexes marked as invalid (indisvalid = false) within the current schema. These can occur when CREATE INDEX CONCURRENTLY is interrupted. The caller should log a warning and consider manual cleanup.

The check is scoped to current_schema() so that in-progress CONCURRENTLY builds in other schemas (e.g. parallel test schemas) do not block migrations in unrelated schemas.

Returns an empty slice when no invalid indexes are found.

type MigrationDirection

type MigrationDirection string

MigrationDirection indicates whether a migration is applied or rolled back.

const (
	// MigrationUp applies a migration.
	MigrationUp MigrationDirection = "up"
	// MigrationDown rolls back a migration.
	MigrationDown MigrationDirection = "down"
)

type MigrationStatus

type MigrationStatus struct {
	// Version is the migration prefix (e.g. "001").
	Version string
	// Name is the descriptive part (e.g. "create_outbox_entries").
	Name string
	// Applied indicates whether this migration has been executed.
	Applied bool
	// AppliedAt is when the migration was applied (zero if not applied).
	AppliedAt time.Time
}

MigrationStatus describes the state of a single migration file.

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

Migrator manages SQL database migrations using goose v3 and an embed.FS source. It tracks applied migrations in a configurable table using goose's built-in advisory locking.

func NewMigrator

func NewMigrator(p *Pool, migrations fs.FS, tableName string) (*Migrator, error)

NewMigrator creates a Migrator that reads SQL files from the given fs.FS. Migration files must follow the goose annotated format with -- +goose Up and -- +goose Down sections.

The tableName parameter controls the tracking table name (default: "schema_migrations"). It must be a valid SQL identifier ([a-zA-Z_][a-zA-Z0-9_]*) to prevent SQL injection.

func (*Migrator) Close

func (m *Migrator) Close() error

Close releases the underlying *sql.DB created for goose.

func (*Migrator) Down

func (m *Migrator) Down(ctx context.Context) error

Down rolls back the last applied migration. If no migrations have been applied (version 0), Down is a no-op and returns nil.

func (*Migrator) Status

func (m *Migrator) Status(ctx context.Context) ([]MigrationStatus, error)

Status returns the status of all discovered migrations.

func (*Migrator) Up

func (m *Migrator) Up(ctx context.Context) error

Up applies all unapplied migrations in order. It performs a pre-check for INVALID indexes before advancing the schema version: if any index is found with indisvalid=false, Up returns an error and does not execute any migrations. Manual cleanup is required before re-running.

ref: pressly/goose migration workflow boundary — fail before advancing version, not after; same principle as Atlas lint gate. ref: golang-migrate Source.Read — validate preconditions before applying.

type OutboxWriter

type OutboxWriter struct{}

OutboxWriter writes outbox entries within a PostgreSQL transaction. It relies on TxFromContext to obtain the current transaction, ensuring atomicity with the business state write (same DB transaction).

ref: ThreeDotsLabs/watermill-sql offset_adapter_postgresql.go — transactional outbox insert Adopted: INSERT within caller-provided transaction, JSON metadata serialization. Deviated: explicit fail-fast on missing tx instead of auto-begin.

func NewOutboxWriter

func NewOutboxWriter() *OutboxWriter

NewOutboxWriter creates an OutboxWriter.

func (*OutboxWriter) Write

func (w *OutboxWriter) Write(ctx context.Context, entry outbox.Entry) error

Write inserts an outbox entry into the outbox_entries table using the transaction from the context. Returns ErrAdapterPGNoTx if no transaction is present.

func (*OutboxWriter) WriteBatch

func (w *OutboxWriter) WriteBatch(ctx context.Context, entries []outbox.Entry) error

WriteBatch inserts multiple outbox entries within the caller's transaction. All entries are validated upfront (ID format + Entry.Validate); if any entry is invalid, no entries are written.

For batches exceeding writeBatchChunkSize, entries are split into chunks and each chunk is inserted with a separate multi-row INSERT within the same transaction, preserving all-or-nothing semantics.

An empty entries slice is a no-op and returns nil.

type PGOutboxStore

type PGOutboxStore struct {
	// contains filtered or unexported fields
}

PGOutboxStore implements runtime/outbox.Store over PostgreSQL using pgx.

Each method opens its own short transaction; methods do not compose into a larger transaction. The backing DB handle is typically a *pgxpool.Pool.

Consistency level: L2 (OutboxFact) — adapts the outbox state machine from the relay layer into discrete, testable DB operations.

func NewOutboxStore

func NewOutboxStore(db relayDB) *PGOutboxStore

NewOutboxStore constructs a Store backed by the supplied database handle. The handle is typically a *pgxpool.Pool; it must support short-lived transactions (Begin).

func (*PGOutboxStore) ClaimPending

func (s *PGOutboxStore) ClaimPending(ctx context.Context, batchSize int) ([]outbox.ClaimedEntry, error)

ClaimPending atomically transitions up to batchSize rows from pending to claiming status. Returns empty slice + nil when nothing is claimable.

func (*PGOutboxStore) CleanupDead

func (s *PGOutboxStore) CleanupDead(ctx context.Context, cutoff time.Time, batchSize int) (int, error)

CleanupDead deletes a batch of dead rows older than cutoff. Caller is responsible for looping until deleted < batchSize.

func (*PGOutboxStore) CleanupPublished

func (s *PGOutboxStore) CleanupPublished(ctx context.Context, cutoff time.Time, batchSize int) (int, error)

CleanupPublished deletes a batch of published rows older than cutoff. Caller is responsible for looping until deleted < batchSize.

func (*PGOutboxStore) MarkDead

func (s *PGOutboxStore) MarkDead(ctx context.Context, id string, attempts int, lastError string) (bool, error)

MarkDead transitions a failing entry to dead status. updated=false when entry no longer in claiming.

func (*PGOutboxStore) MarkPublished

func (s *PGOutboxStore) MarkPublished(ctx context.Context, id string) (bool, error)

MarkPublished transitions an entry from claiming to published. updated=false means the entry was reclaimed by ReclaimStale (not an error).

func (*PGOutboxStore) MarkRetry

func (s *PGOutboxStore) MarkRetry(ctx context.Context, id string, attempts int, nextRetryAt time.Time, lastError string) (bool, error)

MarkRetry transitions a failing entry back to pending with the supplied nextRetryAt and attempts count. updated=false when entry no longer in claiming.

func (*PGOutboxStore) OldestEligibleAt

func (s *PGOutboxStore) OldestEligibleAt(ctx context.Context, status string) (time.Time, bool, error)

OldestEligibleAt returns the oldest published_at (status="published") or dead_at (status="dead") in the table. Used by the relay's data-driven cleanup loop to schedule the next wake-up at oldest+retention instead of a fixed timer.

func (*PGOutboxStore) ReclaimStale

func (s *PGOutboxStore) ReclaimStale(ctx context.Context, claimTTL time.Duration, maxAttempts int, baseDelay, maxDelay time.Duration) (int, error)

ReclaimStale transitions claiming rows whose claimed_at is older than claimTTL back to pending (with attempts+1 and next_retry_at = backoff) or to dead (when attempts+1 >= maxAttempts). Returns count of rows recovered.

type PGRefreshStore

type PGRefreshStore struct {
	// contains filtered or unexported fields
}

PGRefreshStore implements refresh.Store over PostgreSQL using pgx.

All time values are sourced from the injected clock (never PG's now()), so that the FakeClock in storetest can drive deterministic behaviour.

Consistency: L1 LocalTx — Rotate is atomic within a single transaction; Issue and Revoke are single-statement writes.

Token values are stored in plaintext. See backlog X11 (REFRESH-HMAC-SPLIT-01) for the planned HMAC-split upgrade that stores only hash(verifier).

ref: dexidp/dex storage/sql/sql.go (pgx-based refresh token CAS pattern) ref: F2 contract C1-C7 from docs/plans/202604191515-auth-federated-whistle.md

func NewRefreshStore

func NewRefreshStore(pool *pgxpool.Pool, policy refresh.Policy, clock refresh.Clock, randReader io.Reader) *PGRefreshStore

NewRefreshStore constructs a PGRefreshStore.

clock must not be nil. policy.MaxAge must be positive. If randReader is nil, crypto/rand.Reader is used.

func (*PGRefreshStore) GC

func (s *PGRefreshStore) GC(ctx context.Context, olderThan time.Time) (int, error)

GC removes tokens whose expires_at < olderThan in batches of gcBatchSize. Returns the total count of rows deleted.

Consistency: L0 LocalOnly — best-effort cleanup, no transactional guarantee.

func (*PGRefreshStore) Issue

func (s *PGRefreshStore) Issue(ctx context.Context, sessionID, subjectID string) (*refresh.Token, error)

Issue creates a new refresh chain for (sessionID, subjectID).

Consistency: L1 LocalTx — single INSERT, no outbox event.

func (*PGRefreshStore) Revoke

func (s *PGRefreshStore) Revoke(ctx context.Context, sessionID string) error

Revoke marks all tokens in the session as revoked.

Idempotent: 0 rows affected is not an error. Consistency: L1 LocalTx — single UPDATE within one statement.

func (*PGRefreshStore) Rotate

func (s *PGRefreshStore) Rotate(ctx context.Context, presentedToken string) (*refresh.Token, error)

Rotate advances the chain one generation using a single transaction.

State machine branches (see refresh.Store interface for full contract):

  1. CAS active — UPDATE the current active token; returns new token.
  2. Active exists but revoked/expired — surface ErrTokenRevoked/ErrTokenExpired.
  3. Obsolete grace retry — presented token is a previous-generation obsolete; within ReuseInterval → return current token idempotently.
  4. Obsolete reuse detection — grace window elapsed → cascade Revoke + ErrTokenReused.
  5. Not found in either index → ErrTokenNotFound.

Consistency: L1 LocalTx — all branches execute within one BEGIN/COMMIT block.

type PGResource

type PGResource struct {
	// contains filtered or unexported fields
}

PGResource wraps a Pool (and an optional relay worker) as a lifecycle.ManagedResource. Bootstrap uses it to:

  • Register the pool health probe in /readyz under the "postgres" name.
  • Start/stop the relay worker through the bootstrap WorkerGroup.
  • Close the pool during LIFO shutdown.

Construct via NewPGResource; do not create the zero value directly.

ref: uber-go/fx lifecycle.go@master:L124-L310 — resource lifecycle managed by Hook registration; GoCell converges this into a single ManagedResource.

func NewPGResource

func NewPGResource(pool *Pool, relay kworker.Worker) (*PGResource, error)

NewPGResource creates a PGResource. pool must be non-nil; passing nil returns ErrValidationFailed because Checkers() and Close() dereference pool at runtime — a silent nil would produce a panic during /readyz probe or shutdown, both of which are the worst times to discover it.

relay may be nil when no relay worker is needed (e.g. in-memory outbox mode). name is always "postgres".

ref: uber-go/fx internal/lifecycle/lifecycle.go Append — resource registration does no nil-substitution; bad inputs surface immediately.

func (*PGResource) Checkers

func (r *PGResource) Checkers() map[string]func(context.Context) error

Checkers returns a single health probe named after r.name that pings the PG pool. The probe accepts a ctx from the /readyz handler so that a deliberate deadline (e.g. WithReadyzDeadline) propagates into the probe; the probe further caps its own wait at 5 s via an inner context.WithTimeout so that a slow PG does not hold the /readyz response indefinitely.

ctx is derived from context.Background() by the readyz handler to avoid kubelet/LB client-ctx cancellation; this probe further bounds at 5s.

ref: cmd/corebundle/main.go:230-241 (pgHealthCheckerOpts) — same rationale, same 5s timeout, now centralised here. ref: Kubernetes readyz — external dependencies contribute named checks.

func (*PGResource) Close

func (r *PGResource) Close(ctx context.Context) error

Close shuts down the pool, bounded by ctx. Delegates to Pool.Close(ctx) so the caller's shutdown budget propagates into pool drain. Uses the poolCloser interface so tests can inject a stub without a real DB.

ref: uber-go/fx app.go StopTimeout — shared shutdown budget via ctx.

func (*PGResource) Worker

func (r *PGResource) Worker() kworker.Worker

Worker returns the relay worker (may be nil).

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool wraps a pgxpool.Pool with health checking and lifecycle management.

func NewPool

func NewPool(ctx context.Context, cfg Config) (*Pool, error)

NewPool creates a new connection pool from the supplied Config. It validates the DSN, applies defaults, and pings the database to confirm connectivity.

func (*Pool) Close

func (p *Pool) Close(ctx context.Context) error

Close gracefully shuts down the connection pool, bounded by ctx.

pgxpool.Pool.Close() performs a synchronous drain with no context parameter. Close wraps it in a goroutine so the caller's shutdown budget is honoured; if ctx expires, the pool's connection resources are abandoned (process-exit cleanup semantics, acceptable under orchestrator-restart SLO).

Close is idempotent: calling it on an already-closed pool is safe.

ref: uber-go/fx app.go StopTimeout — ctx as shared shutdown budget. ref: uber-go/fx lifecycle OnStop(ctx) — ContextCloser pattern.

func (*Pool) DB

func (p *Pool) DB() *pgxpool.Pool

DB returns the underlying pgxpool.Pool for direct access.

func (*Pool) Health

func (p *Pool) Health(ctx context.Context) error

Health performs a ping against the database and returns nil if healthy.

func (*Pool) PoolStats

func (p *Pool) PoolStats() PoolStats

PoolStats returns structured pool statistics suitable for metrics collection and operational dashboards. Returns zero-value PoolStats if the pool is not initialized (defensive guard, consistent with Redis adapter pattern).

func (*Pool) Stats

func (p *Pool) Stats() string

Stats returns pool statistics as a formatted string for diagnostics.

func (*Pool) Statter

func (p *Pool) Statter(name string) poolstats.Statter

Statter returns a poolstats.Statter bound to this pool with the supplied human-readable name (e.g. "postgres-main"). The OTel pool collector uses this adapter-neutral interface to emit gocell.pool.* metrics without knowing each adapter's internal stats type.

ref: jackc/pgx pgxpool/stat.go — TotalConns/IdleConns/AcquiredConns/ MaxConns/EmptyAcquireCount maps cleanly onto poolstats.Snapshot.

type PoolStats

type PoolStats struct {
	AcquireCount            int64         `json:"acquireCount"`
	AcquireDuration         time.Duration `json:"acquireDuration"`
	AcquiredConns           int32         `json:"acquiredConns"`
	CanceledAcquireCount    int64         `json:"canceledAcquireCount"`
	ConstructingConns       int32         `json:"constructingConns"`
	EmptyAcquireCount       int64         `json:"emptyAcquireCount"`
	IdleConns               int32         `json:"idleConns"`
	MaxConns                int32         `json:"maxConns"`
	TotalConns              int32         `json:"totalConns"`
	NewConnsCount           int64         `json:"newConnsCount"`
	MaxLifetimeDestroyCount int64         `json:"maxLifetimeDestroyCount"`
	MaxIdleDestroyCount     int64         `json:"maxIdleDestroyCount"`
}

PoolStats holds structured connection pool statistics.

ref: pgxpool Stat() — adopted same field set for operational dashboards and Prometheus/OTel metric collectors.

type RowScanner

type RowScanner interface {
	// Scan reads column values into dest, analogous to pgx.Row.Scan.
	Scan(dest ...any) error
}

RowScanner abstracts a single-row scan target, satisfied by both pgx.Row and *pgx.Rows. Repository implementations use this to write scan logic that works for both QueryRow and Query result sets.

type TxManager

type TxManager struct {
	// contains filtered or unexported fields
}

TxManager provides transactional execution with context-embedded pgx.Tx, savepoint-based nesting, and automatic panic rollback.

func NewTxManager

func NewTxManager(p *Pool) *TxManager

NewTxManager creates a TxManager backed by the given Pool.

func (*TxManager) RunInTx

func (tm *TxManager) RunInTx(ctx context.Context, fn func(ctx context.Context) error) (retErr error)

RunInTx executes fn inside a database transaction. The pgx.Tx is stored in the context so that downstream code can retrieve it via TxFromContext.

Nesting: if the context already carries a transaction, RunInTx creates a savepoint instead of a new top-level transaction. Savepoints are released on success and rolled back on error or panic.

Panic safety: panics inside fn trigger a rollback (or savepoint rollback) before being re-raised.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL