Documentation
Overview ¶
Package postgres provides a PostgreSQL adapter for the GoCell framework.
It wraps pgx/v5 to offer:
- Pool: connection pool with DSN-based configuration and a Health() probe.
- TxManager: RunInTx with context-embedded pgx.Tx, savepoint nesting, and automatic panic rollback.
- Migrator: embed.FS-driven SQL migrations with up/down/status and a schema_migrations tracking table.
- RowScanner: helper for reducing boilerplate in repository implementations.
For parameterized SQL query construction, see pkg/query.Builder.
Error codes use the ERR_ADAPTER_PG_* prefix (see errcode.go in this package).
ref: Watermill watermill-sql schema_adapter_postgresql.go — adopted advisory locking for migration init; diverged by using embed.FS instead of inline SQL.
Index ¶
- Constants
- func CtxWithTx(ctx context.Context, tx pgx.Tx) context.Context
- func ExpectedVersion(fsys fs.FS) (int64, error)
- func MigrationsFS() fs.FS
- func TxFromContext(ctx context.Context) (pgx.Tx, bool)
- func VerifyExpectedVersion(ctx context.Context, pool *Pool, fsys fs.FS, tableName ...string) error
- type Config
- type InvalidIndex
- type MigrationDirection
- type MigrationStatus
- type Migrator
- type OutboxWriter
- type PGOutboxStore
- func (s *PGOutboxStore) ClaimPending(ctx context.Context, batchSize int) ([]outbox.ClaimedEntry, error)
- func (s *PGOutboxStore) CleanupDead(ctx context.Context, cutoff time.Time, batchSize int) (int, error)
- func (s *PGOutboxStore) CleanupPublished(ctx context.Context, cutoff time.Time, batchSize int) (int, error)
- func (s *PGOutboxStore) MarkDead(ctx context.Context, id string, attempts int, lastError string) (bool, error)
- func (s *PGOutboxStore) MarkPublished(ctx context.Context, id string) (bool, error)
- func (s *PGOutboxStore) MarkRetry(ctx context.Context, id string, attempts int, nextRetryAt time.Time, ...) (bool, error)
- func (s *PGOutboxStore) OldestEligibleAt(ctx context.Context, status string) (time.Time, bool, error)
- func (s *PGOutboxStore) ReclaimStale(ctx context.Context, claimTTL time.Duration, maxAttempts int, ...) (int, error)
- type PGRefreshStore
- func (s *PGRefreshStore) GC(ctx context.Context, olderThan time.Time) (int, error)
- func (s *PGRefreshStore) Issue(ctx context.Context, sessionID, subjectID string) (*refresh.Token, error)
- func (s *PGRefreshStore) Revoke(ctx context.Context, sessionID string) error
- func (s *PGRefreshStore) Rotate(ctx context.Context, presentedToken string) (*refresh.Token, error)
- type PGResource
- type Pool
- type PoolStats
- type RowScanner
- type TxManager
Constants ¶
const (
	// ErrAdapterPGConnect indicates a connection or pool initialization failure.
	ErrAdapterPGConnect errcode.Code = "ERR_ADAPTER_PG_CONNECT"

	// ErrAdapterPGQuery indicates a query execution failure.
	ErrAdapterPGQuery errcode.Code = "ERR_ADAPTER_PG_QUERY"

	// ErrAdapterPGMigrate indicates a migration execution or tracking failure.
	ErrAdapterPGMigrate errcode.Code = "ERR_ADAPTER_PG_MIGRATE"

	// ErrAdapterPGNoTx indicates outbox.Writer.Write was called outside a transaction.
	ErrAdapterPGNoTx errcode.Code = "ERR_ADAPTER_PG_NO_TX"

	// ErrAdapterPGMarshal indicates a JSON marshal failure for outbox entry.
	ErrAdapterPGMarshal errcode.Code = "ERR_ADAPTER_PG_MARSHAL"

	// ErrAdapterPGPublish indicates the outbox relay failed to publish an entry.
	ErrAdapterPGPublish errcode.Code = "ERR_ADAPTER_PG_PUBLISH"

	// ErrAdapterPGSchemaMismatch indicates the DB schema version does not match
	// the expected version derived from the embedded migration files.
	ErrAdapterPGSchemaMismatch errcode.Code = "ERR_ADAPTER_PG_SCHEMA_MISMATCH"
)
PostgreSQL adapter error codes.
Variables ¶
This section is empty.
Functions ¶
func CtxWithTx ¶
CtxWithTx returns a new context carrying the given pgx.Tx. Downstream code (e.g. OutboxWriter) retrieves it via TxFromContext. Uses persistence.TxCtxKey so cell-local adapters can participate in ambient transactions without importing adapters/postgres.
func ExpectedVersion ¶
ExpectedVersion scans the given fs.FS for .sql migration files and returns the maximum numeric prefix found. This represents the expected schema version that must be present in the database for the binary to start safely.
ref: pressly/goose v3.27 Provider — migrations embedded in FS.
func MigrationsFS ¶
MigrationsFS returns an fs.FS rooted at the migrations/ directory so that file entries are directly accessible (e.g. "001_create_outbox_entries.up.sql" rather than "migrations/001_create_outbox_entries.up.sql"). Callers can pass the result directly to NewMigrator.
func TxFromContext ¶
TxFromContext extracts a pgx.Tx from the context. The boolean return indicates whether a transaction was present.
func VerifyExpectedVersion ¶
VerifyExpectedVersion compares the database's current goose schema version against the expected version derived from the embedded migration FS.
tableName is the goose tracking table (pass "" to use the default "schema_migrations"). It must match the table used by NewMigrator.
Returns ErrAdapterPGSchemaMismatch if:
- actual < expected: DB schema is behind the binary (migrations not run).
- actual > expected: binary is behind the DB (binary rollback without migration rollback).
Returns nil when actual == expected.
ref: pressly/goose v3.27 Provider.GetDBVersion — GetDBVersion reads max version from the goose version table (schema_migrations by default).
Types ¶
type Config ¶
type Config struct {
// DSN is the PostgreSQL connection string (e.g.
// "postgres://user:pass@localhost:5432/dbname?sslmode=disable").
DSN string
// MaxConns is the maximum number of connections in the pool.
// Default (applied by applyDefaults): 10.
MaxConns int32
// IdleTimeout is how long an idle connection may remain in the pool.
// Default (applied by applyDefaults): 5m.
IdleTimeout time.Duration
// MaxLifetime is the maximum lifetime of a connection.
// Default (applied by applyDefaults): 1h.
MaxLifetime time.Duration
}
Config holds PostgreSQL connection pool settings. All fields must be set explicitly by the caller; there is no global env-reading constructor.
type InvalidIndex ¶
type InvalidIndex struct {
// Index is the qualified name of the invalid index (e.g. "public.idx_foo").
Index string
// Table is the qualified name of the table the index belongs to.
Table string
}
InvalidIndex describes an index that is marked as invalid in pg_index. Invalid indexes can occur when CREATE INDEX CONCURRENTLY is interrupted.
func DetectInvalidIndexes ¶
func DetectInvalidIndexes(ctx context.Context, pool *Pool) ([]InvalidIndex, error)
DetectInvalidIndexes queries pg_index for any indexes marked as invalid (indisvalid = false) within the current schema. These can occur when CREATE INDEX CONCURRENTLY is interrupted. The caller should log a warning and consider manual cleanup.
The check is scoped to current_schema() so that in-progress CONCURRENTLY builds in other schemas (e.g. parallel test schemas) do not block migrations in unrelated schemas.
Returns an empty slice when no invalid indexes are found.
type MigrationDirection ¶
type MigrationDirection string
MigrationDirection indicates whether a migration is applied or rolled back.
const (
	// MigrationUp applies a migration.
	MigrationUp MigrationDirection = "up"

	// MigrationDown rolls back a migration.
	MigrationDown MigrationDirection = "down"
)
type MigrationStatus ¶
type MigrationStatus struct {
// Version is the migration prefix (e.g. "001").
Version string
// Name is the descriptive part (e.g. "create_outbox_entries").
Name string
// Applied indicates whether this migration has been executed.
Applied bool
// AppliedAt is when the migration was applied (zero if not applied).
AppliedAt time.Time
}
MigrationStatus describes the state of a single migration file.
type Migrator ¶
type Migrator struct {
// contains filtered or unexported fields
}
Migrator manages SQL database migrations using goose v3 and an embed.FS source. It tracks applied migrations in a configurable table using goose's built-in advisory locking.
func NewMigrator ¶
NewMigrator creates a Migrator that reads SQL files from the given fs.FS. Migration files must follow the goose annotated format with -- +goose Up and -- +goose Down sections.
The tableName parameter controls the tracking table name (default: "schema_migrations"). It must be a valid SQL identifier ([a-zA-Z_][a-zA-Z0-9_]*) to prevent SQL injection.
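The identifier check can be sketched directly from the documented character class (validTableName is illustrative; the real validation is unexported):

```go
package main

import (
	"fmt"
	"regexp"
)

// identRe matches the documented identifier shape for the tracking table.
var identRe = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)

// validTableName reports whether name is safe to interpolate as the
// tracking table. An empty name means "use the default schema_migrations".
func validTableName(name string) bool {
	if name == "" {
		return true
	}
	return identRe.MatchString(name)
}

func main() {
	fmt.Println(validTableName("schema_migrations"))        // true
	fmt.Println(validTableName(`x"; DROP TABLE users; --`)) // false
}
```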
func (*Migrator) Down ¶
Down rolls back the last applied migration. If no migrations have been applied (version 0), Down is a no-op and returns nil.
func (*Migrator) Status ¶
func (m *Migrator) Status(ctx context.Context) ([]MigrationStatus, error)
Status returns the status of all discovered migrations.
func (*Migrator) Up ¶
Up applies all unapplied migrations in order. It performs a pre-check for INVALID indexes before advancing the schema version: if any index is found with indisvalid=false, Up returns an error and does not execute any migrations. Manual cleanup is required before re-running.
ref: pressly/goose migration workflow boundary — fail before advancing version, not after; same principle as Atlas lint gate. ref: golang-migrate Source.Read — validate preconditions before applying.
type OutboxWriter ¶
type OutboxWriter struct{}
OutboxWriter writes outbox entries within a PostgreSQL transaction. It relies on TxFromContext to obtain the current transaction, ensuring atomicity with the business state write (same DB transaction).
ref: ThreeDotsLabs/watermill-sql offset_adapter_postgresql.go — transactional outbox insert Adopted: INSERT within caller-provided transaction, JSON metadata serialization. Deviated: explicit fail-fast on missing tx instead of auto-begin.
func NewOutboxWriter ¶
func NewOutboxWriter() *OutboxWriter
NewOutboxWriter creates an OutboxWriter.
func (*OutboxWriter) Write ¶
Write inserts an outbox entry into the outbox_entries table using the transaction from the context. Returns ErrAdapterPGNoTx if no transaction is present.
func (*OutboxWriter) WriteBatch ¶
WriteBatch inserts multiple outbox entries within the caller's transaction. All entries are validated upfront (ID format + Entry.Validate); if any entry is invalid, no entries are written.
For batches exceeding writeBatchChunkSize, entries are split into chunks and each chunk is inserted with a separate multi-row INSERT within the same transaction, preserving all-or-nothing semantics.
An empty entries slice is a no-op and returns nil.
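The chunking described above is a generic slice split; a minimal sketch (writeBatchChunkSize is unexported in the real package, so the size here is illustrative):

```go
package main

import "fmt"

// chunk splits entries into size-bounded slices, mirroring how WriteBatch
// issues one multi-row INSERT per chunk inside the same transaction.
func chunk[T any](entries []T, size int) [][]T {
	var out [][]T
	for len(entries) > size {
		out = append(out, entries[:size])
		entries = entries[size:]
	}
	if len(entries) > 0 {
		out = append(out, entries)
	}
	return out
}

func main() {
	ids := []string{"a", "b", "c", "d", "e"}
	fmt.Println(len(chunk(ids, 3))) // 2
	fmt.Println(chunk(ids, 3)[1])   // [d e]
}
```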
type PGOutboxStore ¶
type PGOutboxStore struct {
// contains filtered or unexported fields
}
PGOutboxStore implements runtime/outbox.Store over PostgreSQL using pgx.
Each method opens its own short transaction; methods do not compose into a larger transaction. The backing DB handle is typically a *pgxpool.Pool.
Consistency level: L2 (OutboxFact) — adapts the outbox state machine from the relay layer into discrete, testable DB operations.
func NewOutboxStore ¶
func NewOutboxStore(db relayDB) *PGOutboxStore
NewOutboxStore constructs a Store backed by the supplied database handle. The handle is typically a *pgxpool.Pool; it must support short-lived transactions (Begin).
func (*PGOutboxStore) ClaimPending ¶
func (s *PGOutboxStore) ClaimPending(ctx context.Context, batchSize int) ([]outbox.ClaimedEntry, error)
ClaimPending atomically transitions up to batchSize rows from pending to claiming status. Returns an empty slice and a nil error when nothing is claimable.
func (*PGOutboxStore) CleanupDead ¶
func (s *PGOutboxStore) CleanupDead(ctx context.Context, cutoff time.Time, batchSize int) (int, error)
CleanupDead deletes a batch of dead rows older than cutoff. Caller is responsible for looping until deleted < batchSize.
func (*PGOutboxStore) CleanupPublished ¶
func (s *PGOutboxStore) CleanupPublished(ctx context.Context, cutoff time.Time, batchSize int) (int, error)
CleanupPublished deletes a batch of published rows older than cutoff. Caller is responsible for looping until deleted < batchSize.
func (*PGOutboxStore) MarkDead ¶
func (s *PGOutboxStore) MarkDead(ctx context.Context, id string, attempts int, lastError string) (bool, error)
MarkDead transitions a failing entry to dead status. updated is false when the entry is no longer in claiming status.
func (*PGOutboxStore) MarkPublished ¶
MarkPublished transitions an entry from claiming to published. updated=false means the entry was reclaimed by ReclaimStale (not an error).
func (*PGOutboxStore) MarkRetry ¶
func (s *PGOutboxStore) MarkRetry(ctx context.Context, id string, attempts int, nextRetryAt time.Time, lastError string) (bool, error)
MarkRetry transitions a failing entry back to pending with the supplied nextRetryAt and attempts count. updated is false when the entry is no longer in claiming status.
func (*PGOutboxStore) OldestEligibleAt ¶
func (s *PGOutboxStore) OldestEligibleAt(ctx context.Context, status string) (time.Time, bool, error)
OldestEligibleAt returns the oldest published_at (status="published") or dead_at (status="dead") in the table. Used by the relay's data-driven cleanup loop to schedule the next wake-up at oldest+retention instead of a fixed timer.
func (*PGOutboxStore) ReclaimStale ¶
func (s *PGOutboxStore) ReclaimStale(ctx context.Context, claimTTL time.Duration, maxAttempts int, baseDelay, maxDelay time.Duration) (int, error)
ReclaimStale transitions claiming rows whose claimed_at is older than claimTTL back to pending (with attempts+1 and next_retry_at = backoff) or to dead (when attempts+1 >= maxAttempts). Returns count of rows recovered.
type PGRefreshStore ¶
type PGRefreshStore struct {
// contains filtered or unexported fields
}
PGRefreshStore implements refresh.Store over PostgreSQL using pgx.
All time values are sourced from the injected clock (never PG's now()), so that the FakeClock in storetest can drive deterministic behaviour.
Consistency: L1 LocalTx — Rotate is atomic within a single transaction; Issue and Revoke are single-statement writes.
Token values are stored in plaintext. See backlog X11 (REFRESH-HMAC-SPLIT-01) for the planned HMAC-split upgrade that stores only hash(verifier).
ref: dexidp/dex storage/sql/sql.go (pgx-based refresh token CAS pattern) ref: F2 contract C1-C7 from docs/plans/202604191515-auth-federated-whistle.md
func NewRefreshStore ¶
func NewRefreshStore(pool *pgxpool.Pool, policy refresh.Policy, clock refresh.Clock, randReader io.Reader) *PGRefreshStore
NewRefreshStore constructs a PGRefreshStore.
clock must not be nil. policy.MaxAge must be positive. If randReader is nil, crypto/rand.Reader is used.
func (*PGRefreshStore) GC ¶
GC removes tokens whose expires_at < olderThan in batches of gcBatchSize. Returns the total count of rows deleted.
Consistency: L0 LocalOnly — best-effort cleanup, no transactional guarantee.
func (*PGRefreshStore) Issue ¶
func (s *PGRefreshStore) Issue(ctx context.Context, sessionID, subjectID string) (*refresh.Token, error)
Issue creates a new refresh chain for (sessionID, subjectID).
Consistency: L1 LocalTx — single INSERT, no outbox event.
func (*PGRefreshStore) Revoke ¶
func (s *PGRefreshStore) Revoke(ctx context.Context, sessionID string) error
Revoke marks all tokens in the session as revoked.
Idempotent: 0 rows affected is not an error. Consistency: L1 LocalTx — single UPDATE within one statement.
func (*PGRefreshStore) Rotate ¶
Rotate advances the chain one generation using a single transaction.
State machine branches (see refresh.Store interface for full contract):
- CAS active — UPDATE the current active token; returns new token.
- Active exists but revoked/expired — surface ErrTokenRevoked/ErrTokenExpired.
- Obsolete grace retry — presented token is a previous-generation obsolete; within ReuseInterval → return current token idempotently.
- Obsolete reuse detection — grace window elapsed → cascade Revoke + ErrTokenReused.
- Not found in either index → ErrTokenNotFound.
Consistency: L1 LocalTx — all branches execute within one BEGIN/COMMIT block.
type PGResource ¶
type PGResource struct {
// contains filtered or unexported fields
}
PGResource wraps a Pool (and an optional relay worker) as a lifecycle.ManagedResource. Bootstrap uses it to:
- Register the pool health probe in /readyz under the "postgres" name.
- Start/stop the relay worker through the bootstrap WorkerGroup.
- Close the pool during LIFO shutdown.
Construct via NewPGResource; do not create the zero value directly.
ref: uber-go/fx lifecycle.go@master:L124-L310 — resource lifecycle managed by Hook registration; GoCell converges this into a single ManagedResource.
func NewPGResource ¶
func NewPGResource(pool *Pool, relay kworker.Worker) (*PGResource, error)
NewPGResource creates a PGResource. pool must be non-nil; passing nil returns ErrValidationFailed because Checkers() and Close() dereference pool at runtime, and a silently accepted nil would panic during the /readyz probe or during shutdown, the worst times to discover it.
relay may be nil when no relay worker is needed (e.g. in-memory outbox mode). The resource name is always "postgres".
ref: uber-go/fx internal/lifecycle/lifecycle.go Append — resource registration does no nil-substitution; bad inputs surface immediately.
func (*PGResource) Checkers ¶
func (r *PGResource) Checkers() map[string]func(context.Context) error
Checkers returns a single health probe, named after r.name, that pings the PG pool. The probe accepts the ctx supplied by the /readyz handler so that a deliberate deadline (e.g. WithReadyzDeadline) propagates into the probe. The handler derives that ctx from context.Background() to avoid kubelet/LB client-ctx cancellation; the probe additionally caps its own wait at 5s via an inner context.WithTimeout so that a slow PG does not hold the /readyz response indefinitely.
ref: cmd/corebundle/main.go:230-241 (pgHealthCheckerOpts) — same rationale, same 5s timeout, now centralised here. ref: Kubernetes readyz — external dependencies contribute named checks.
func (*PGResource) Close ¶
func (r *PGResource) Close(ctx context.Context) error
Close shuts down the pool, bounded by ctx. Delegates to Pool.Close(ctx) so the caller's shutdown budget propagates into pool drain. Uses the poolCloser interface so tests can inject a stub without a real DB.
ref: uber-go/fx app.go StopTimeout — shared shutdown budget via ctx.
func (*PGResource) Worker ¶
func (r *PGResource) Worker() kworker.Worker
Worker returns the relay worker (may be nil).
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool wraps a pgxpool.Pool with health checking and lifecycle management.
func NewPool ¶
NewPool creates a new connection pool from the supplied Config. It validates the DSN, applies defaults, and pings the database to confirm connectivity.
func (*Pool) Close ¶
Close gracefully shuts down the connection pool, bounded by ctx.
pgxpool.Pool.Close() performs a synchronous drain with no context parameter. Close wraps it in a goroutine so the caller's shutdown budget is honoured; if ctx expires, the pool's connection resources are abandoned (process-exit cleanup semantics, acceptable under orchestrator-restart SLO).
Close is idempotent: calling it on an already-closed pool is safe.
ref: uber-go/fx app.go StopTimeout — ctx as shared shutdown budget. ref: uber-go/fx lifecycle OnStop(ctx) — ContextCloser pattern.
func (*Pool) PoolStats ¶
PoolStats returns structured pool statistics suitable for metrics collection and operational dashboards. Returns zero-value PoolStats if the pool is not initialized (defensive guard, consistent with Redis adapter pattern).
func (*Pool) Statter ¶
Statter returns a poolstats.Statter bound to this pool with the supplied human-readable name (e.g. "postgres-main"). The OTel pool collector uses this adapter-neutral interface to emit gocell.pool.* metrics without knowing each adapter's internal stats type.
ref: jackc/pgx pgxpool/stat.go — TotalConns/IdleConns/AcquiredConns/ MaxConns/EmptyAcquireCount maps cleanly onto poolstats.Snapshot.
type PoolStats ¶
type PoolStats struct {
AcquireCount int64 `json:"acquireCount"`
AcquireDuration time.Duration `json:"acquireDuration"`
AcquiredConns int32 `json:"acquiredConns"`
CanceledAcquireCount int64 `json:"canceledAcquireCount"`
ConstructingConns int32 `json:"constructingConns"`
EmptyAcquireCount int64 `json:"emptyAcquireCount"`
IdleConns int32 `json:"idleConns"`
MaxConns int32 `json:"maxConns"`
TotalConns int32 `json:"totalConns"`
NewConnsCount int64 `json:"newConnsCount"`
MaxLifetimeDestroyCount int64 `json:"maxLifetimeDestroyCount"`
MaxIdleDestroyCount int64 `json:"maxIdleDestroyCount"`
}
PoolStats holds structured connection pool statistics.
ref: pgxpool Stat() — adopted same field set for operational dashboards and Prometheus/OTel metric collectors.
type RowScanner ¶
type RowScanner interface {
// Scan reads column values into dest, analogous to pgx.Row.Scan.
Scan(dest ...any) error
}
RowScanner abstracts a single-row scan target, satisfied by both pgx.Row and *pgx.Rows. Repository implementations use this to write scan logic that works for both QueryRow and Query result sets.
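A sketch of the shared-helper pattern the interface enables, using a stub row in place of a real pgx.Row (the user type and stubRow are illustrative):

```go
package main

import "fmt"

// RowScanner matches the interface above; both pgx.Row and pgx.Rows
// provide this Scan method.
type RowScanner interface {
	Scan(dest ...any) error
}

type user struct {
	ID   int64
	Name string
}

// scanUser is the kind of shared helper the interface enables: the same
// function serves QueryRow (single row) and Query (iterated rows).
func scanUser(row RowScanner) (user, error) {
	var u user
	err := row.Scan(&u.ID, &u.Name)
	return u, err
}

// stubRow simulates a database row for illustration.
type stubRow struct{ vals []any }

func (r stubRow) Scan(dest ...any) error {
	*(dest[0].(*int64)) = r.vals[0].(int64)
	*(dest[1].(*string)) = r.vals[1].(string)
	return nil
}

func main() {
	u, _ := scanUser(stubRow{vals: []any{int64(42), "ada"}})
	fmt.Println(u.ID, u.Name) // 42 ada
}
```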
type TxManager ¶
type TxManager struct {
// contains filtered or unexported fields
}
TxManager provides transactional execution with context-embedded pgx.Tx, savepoint-based nesting, and automatic panic rollback.
func NewTxManager ¶
NewTxManager creates a TxManager backed by the given Pool.
func (*TxManager) RunInTx ¶
func (tm *TxManager) RunInTx(ctx context.Context, fn func(ctx context.Context) error) (retErr error)
RunInTx executes fn inside a database transaction. The pgx.Tx is stored in the context so that downstream code can retrieve it via TxFromContext.
Nesting: if the context already carries a transaction, RunInTx creates a savepoint instead of a new top-level transaction. Savepoints are released on success and rolled back on error or panic.
Panic safety: panics inside fn trigger a rollback (or savepoint rollback) before being re-raised.