Documentation ¶
Overview ¶
Package postgres provides a PostgreSQL adapter for the GoCell framework.
It wraps pgx/v5 to offer:
- Pool: connection pool with DSN/env-based configuration and Health() probe.
- TxManager: RunInTx with context-embedded pgx.Tx, savepoint nesting, and automatic panic rollback.
- Migrator: embed.FS-driven SQL migrations with up/down/status and a schema_migrations tracking table.
- RowScanner helper for reducing boilerplate in repository implementations.
For parameterized PostgreSQL query construction, see pkg/pgquery.Builder. For generic PG SQLSTATE wire-error classification, see pkg/pgquery.IsUniqueViolation and IsForeignKeyViolation. The last-admin trigger sentinel classification (effective_admin_invariant_fn, migration 024) is accesscore-internal: corecells/accesscore/internal/adapters/postgres.isLastAdminProtected.
Error codes use the ERR_ADAPTER_PG_* prefix (see errcode.go in this package).
ref: Watermill watermill-sql schema_adapter_postgresql.go — adopted advisory locking for migration init; diverged by using embed.FS instead of inline SQL.
Index ¶
- Constants
- Variables
- func CtxWithTx(ctx context.Context, tx pgx.Tx) context.Context
- func ExpectedVersion(fsys fs.FS) (int64, error)
- func InvalidIndexCheck(ctx context.Context, pool *Pool) error
- func MigrationsFS() (fs.FS, error)
- func NewJournalingOutboxWriter(inner *OutboxWriter, projectionTopics []string) outbox.Writer
- func VerifyExpectedShape(ctx context.Context, pool *Pool) error
- func VerifyExpectedVersion(ctx context.Context, pool *Pool, fsys fs.FS, ns migration.Namespace) error
- func VerifyNoInvalidIndexes(ctx context.Context, pool *Pool) error
- type AuditCrossTenantStore
- type Config
- type DestructiveDownPermit
- type ForwardRebuildPermit
- type InvalidIndex
- type LedgerStore
- func (s *LedgerStore) Append(ctx context.Context, e *ledger.Entry) error
- func (s *LedgerStore) GetBySeq(ctx context.Context, vis tenant.RowVisibility, seq int64) (*ledger.Entry, error)
- func (s *LedgerStore) Protocol() *ledger.Protocol
- func (s *LedgerStore) Query(ctx context.Context, t tenant.TenantID, vis tenant.RowVisibility, ...) ([]*ledger.Entry, error)
- func (s *LedgerStore) RepoReady(ctx context.Context) error
- func (s *LedgerStore) Tail(ctx context.Context) (ledger.TailSnapshot, error)
- func (s *LedgerStore) Verify(ctx context.Context, fromSeq, toSeq int64) (valid bool, firstInvalidSeq int64, err error)
- type MigrationDirection
- type MigrationSet
- type MigrationStatus
- type Migrator
- func (m *Migrator) Close() error
- func (m *Migrator) Down(ctx context.Context, permit DestructiveDownPermit) error
- func (m *Migrator) ForwardRebuild(ctx context.Context, permits ...ForwardRebuildPermit) error
- func (m *Migrator) Status(ctx context.Context) ([]MigrationStatus, error)
- func (m *Migrator) Up(ctx context.Context) error
- type OutboxWriter
- type PGCommandQueue
- func (q *PGCommandQueue) Ack(ctx context.Context, commandID string, reason command.AckReason, now time.Time) error
- func (q *PGCommandQueue) Cancel(ctx context.Context, commandID string, now time.Time) error
- func (q *PGCommandQueue) Dequeue(ctx context.Context, deviceID string, n int, leaseDuration time.Duration) ([]command.Entry, error)
- func (q *PGCommandQueue) Enqueue(ctx context.Context, entry command.Entry, opts command.EnqueueOptions) error
- func (q *PGCommandQueue) ExtendLease(ctx context.Context, commandID string, extension time.Duration, now time.Time) error
- func (q *PGCommandQueue) GetCommand(ctx context.Context, id string) (*command.Entry, error)
- func (q *PGCommandQueue) RepoReady(ctx context.Context) error
- func (q *PGCommandQueue) Report(ctx context.Context, commandID string, now time.Time) error
- func (q *PGCommandQueue) ScanActive(ctx context.Context, filter command.ScanFilter) ([]command.Entry, error)
- type PGOutboxStore
- func (s *PGOutboxStore) ClaimPending(ctx context.Context, batchSize int) ([]outbox.ClaimedEntry, error)
- func (s *PGOutboxStore) CleanupDead(ctx context.Context, cutoff time.Time, batchSize int) (int, error)
- func (s *PGOutboxStore) CleanupPublished(ctx context.Context, cutoff time.Time, batchSize int) (int, error)
- func (s *PGOutboxStore) CountPending(ctx context.Context) (int64, error)
- func (s *PGOutboxStore) MarkDead(ctx context.Context, id, leaseID string, attempts int, lastError string) (bool, error)
- func (s *PGOutboxStore) MarkPublished(ctx context.Context, id, leaseID string) (bool, error)
- func (s *PGOutboxStore) MarkRetry(ctx context.Context, id, leaseID string, attempts int, nextRetryAt time.Time, ...) (bool, error)
- func (s *PGOutboxStore) OldestEligibleAt(ctx context.Context, status kout.State) (time.Time, bool, error)
- func (s *PGOutboxStore) ReclaimStale(ctx context.Context, claimTTL time.Duration, maxAttempts int, ...) (int, error)
- type PGProjectionEventSource
- func (s *PGProjectionEventSource) Head(ctx context.Context) (int64, error)
- func (s *PGProjectionEventSource) Position(entry projection.ProjectionEvent) (int64, error)
- func (s *PGProjectionEventSource) Replay(ctx context.Context, fromOffset int64, ...) error
- func (s *PGProjectionEventSource) RepoReady(ctx context.Context) error
- func (s *PGProjectionEventSource) ResolveCarrier(ctx context.Context, entry projection.ProjectionEvent) (projection.ProjectionEvent, error)
- type PGRefreshStore
- func (s *PGRefreshStore) GC(ctx context.Context, olderThan time.Time) (int, error)
- func (s *PGRefreshStore) Issue(ctx context.Context, sessionID, subjectID string, authzEpochAtIssue int64) (string, *refresh.Token, error)
- func (s *PGRefreshStore) Peek(ctx context.Context, presented string) (*refresh.Token, error)
- func (s *PGRefreshStore) RevokeSession(ctx context.Context, sessionID string) error
- func (s *PGRefreshStore) RevokeSessionDetached(ctx context.Context, sessionID string) error
- func (s *PGRefreshStore) RevokeUser(ctx context.Context, subjectID string, tok credentialfence.FenceToken) error
- func (s *PGRefreshStore) Rotate(ctx context.Context, presented string) (string, *refresh.Token, error)
- type PGSessionStore
- func (s *PGSessionStore) Close(_ context.Context) error
- func (s *PGSessionStore) Create(ctx context.Context, t tenant.TenantID, sess *session.Session) error
- func (s *PGSessionStore) Get(ctx context.Context, id string) (*session.ValidateView, error)
- func (s *PGSessionStore) Probes() []healthz.Probe
- func (s *PGSessionStore) RepoReady(ctx context.Context) error
- func (s *PGSessionStore) Revoke(ctx context.Context, id string) error
- func (s *PGSessionStore) RevokeForSubject(ctx context.Context, subjectID string, event session.CredentialEvent, ...) error
- func (s *PGSessionStore) Worker() worker.Worker
- type Pool
- func (p *Pool) AppRoleRestrictedCheck(ctx context.Context) error
- func (p *Pool) AuditAdminReadyCheck(ctx context.Context) error
- func (p *Pool) Close(ctx context.Context) error
- func (p *Pool) DB() *pgxpool.Pool
- func (p *Pool) Health(ctx context.Context) error
- func (p *Pool) PoolStats() PoolStats
- func (p *Pool) Probes() []healthz.Probe
- func (p *Pool) Stats() string
- func (p *Pool) Statter(name string) poolstats.Statter
- func (p *Pool) Worker() kworker.Worker
- type PoolStats
- type ProjectionCheckpointStore
- func (s *ProjectionCheckpointStore) AdvanceIfOwner(ctx context.Context, cellID, projectionID, ownerToken string, offset int64) error
- func (s *ProjectionCheckpointStore) LoadOffset(ctx context.Context, cellID, projectionID string) (int64, error)
- func (s *ProjectionCheckpointStore) SaveOffset(ctx context.Context, cellID, projectionID string, offset int64) error
- type ReconcileElector
- type RowScanner
- type TxManager
- type WebhookSourceRepository
Constants ¶
const (
	// ErrAdapterPGConnect indicates a connection or pool initialization failure.
	ErrAdapterPGConnect errcode.Code = "ERR_ADAPTER_PG_CONNECT"

	// ErrAdapterPGConnectTimeout indicates a connection attempt exceeded the
	// adapter-level ConnectTimeout budget (default 5s). Distinct from
	// ErrAdapterPGConnect so operators can route timeout-vs-refusal differently;
	// always routed via errcode.WrapInfra → IsTransient(err) == true.
	ErrAdapterPGConnectTimeout errcode.Code = "ERR_ADAPTER_PG_CONNECT_TIMEOUT"

	// ErrAdapterPGQuery indicates a query execution failure.
	ErrAdapterPGQuery errcode.Code = "ERR_ADAPTER_PG_QUERY"

	// ErrAdapterPGMigrate indicates a migration execution or tracking failure.
	ErrAdapterPGMigrate errcode.Code = "ERR_ADAPTER_PG_MIGRATE"

	// ErrAdapterPGNoTx indicates outbox.Writer.Write was called outside a transaction.
	ErrAdapterPGNoTx errcode.Code = "ERR_ADAPTER_PG_NO_TX"

	// ErrAdapterPGMarshal indicates a JSON marshal failure for outbox entry.
	ErrAdapterPGMarshal errcode.Code = "ERR_ADAPTER_PG_MARSHAL"

	// ErrAdapterPGPublish indicates the outbox relay failed to publish an entry.
	ErrAdapterPGPublish errcode.Code = "ERR_ADAPTER_PG_PUBLISH"

	// ErrAdapterPGSchemaMismatch indicates the DB schema version does not match
	// the expected version derived from the embedded migration files.
	ErrAdapterPGSchemaMismatch errcode.Code = "ERR_ADAPTER_PG_SCHEMA_MISMATCH"

	// ErrAdapterPGSchemaShape indicates the DB schema's column / table shape
	// does not match the expected shape after migration. Distinct from
	// ErrAdapterPGSchemaMismatch (version-level) so operators can route
	// "binary expected sessions.jti but DB still has sessions.access_token"
	// (partial migration) separately from "binary is at version N+1 vs DB at N".
	ErrAdapterPGSchemaShape errcode.Code = "ERR_ADAPTER_PG_SCHEMA_SHAPE"

	// ErrAdapterPGInvalidIndex signals one or more `pg_index.indisvalid = false`
	// indexes detected at startup. Replaces the prior warn-continue behavior
	// (B2-X-03): invalid indexes typically indicate an aborted CREATE INDEX
	// CONCURRENTLY and must be DROPped manually before the binary may proceed.
	ErrAdapterPGInvalidIndex errcode.Code = "ERR_ADAPTER_PG_INVALID_INDEX"

	// ErrAdapterPGRoleBypassRLS signals that the serving connection's current_user
	// is a superuser or carries BYPASSRLS; either bypasses ROW LEVEL SECURITY,
	// making the FORCE RLS tenant_isolation policies (migrations 052/053) a runtime
	// no-op. Surfaced by the postgres_app_role_restricted_ready precondition probe
	// (#1676 [F-B11]); a serving pool that trips this must NOT be considered ready.
	//nolint:gosec // G101 false positive: errcode sentinel naming the PG BYPASSRLS role attribute, not a credential
	ErrAdapterPGRoleBypassRLS errcode.Code = "ERR_ADAPTER_PG_ROLE_BYPASS_RLS"

	// ErrAdapterPGAuditAdminSelectCheck signals that the audit admin pool's
	// current_user lacks SELECT privilege on audit_entries, or that the privilege
	// check itself failed. Surfaced by Pool.AuditAdminReadyCheck (#1810 F4): the
	// gocell_audit_admin role must be able to SELECT on audit_entries (via its
	// role-scoped permissive RLS policy, migration 065); a missing GRANT would
	// cause the admin pool to pass the role-attribute probe yet fail every
	// cross-tenant read request at runtime.
	ErrAdapterPGAuditAdminSelectCheck errcode.Code = "ERR_ADAPTER_PG_AUDIT_ADMIN_SELECT_CHECK"
)
PostgreSQL adapter error codes.
const (
	// ProbeReady probes basic pool liveness (Ping).
	ProbeReady healthz.ProbeName = "postgres_ready"

	// ProbeIndexesValidReady probes that all expected indexes are valid.
	ProbeIndexesValidReady healthz.ProbeName = "postgres_indexes_valid_ready"

	// ProbeAppRoleRestrictedReady probes that the serving connection's current_user
	// is neither a superuser nor BYPASSRLS, so the FORCE ROW LEVEL SECURITY
	// tenant_isolation policies (migrations 052/053) are actually enforced at
	// runtime (#1676 [F-B11]). Registered only when Config.RequireRestrictedRole is
	// set (a serving pool); admin/utility pools (e.g. tools/pg-migrate) leave it off.
	ProbeAppRoleRestrictedReady healthz.ProbeName = "postgres_app_role_restricted_ready"

	// ProbeAuditAdminRestrictedReady probes that the OPTIONAL cross-tenant audit
	// admin pool's current_user is neither a superuser nor BYPASSRLS (#1810). The
	// gocell_audit_admin role reads every tenant via a role-scoped permissive RLS
	// SELECT policy (migration 065), NOT via BYPASSRLS, so it must stay NOBYPASSRLS
	// and minimally privileged. This distinctly-named probe (vs the serving pool's
	// postgres_app_role_restricted_ready) asserts that at runtime AND makes the
	// optional admin pool's liveness visible on /readyz; it reuses
	// Pool.AppRoleRestrictedCheck. Registered by the auditcore module's admin-pool
	// ManagedResource only when GOCELL_AUDIT_ADMIN_DSN is provisioned.
	ProbeAuditAdminRestrictedReady healthz.ProbeName = "postgres_audit_admin_restricted_ready"
)
Pool readiness-probe names. These are healthz.ProbeName-typed constants, funneled by archtest PROBENAME-SEALED-FUNNEL-01 (snake_case names ending in _ready).
const ForwardRebuildAnnotationPattern = `(?m)^\s*--\s*\+gocell\s+forward-rebuild\s+target=([a-zA-Z_][a-zA-Z0-9_]*)\s*$`
ForwardRebuildAnnotationPattern is the single-source regex shared by the runtime permit gate (forwardRebuildTargets) and the archtest MIGRATION-FORWARD-REBUILD-ANNOTATION-01 (tools/archtest/pg_schema_guard_invariants_test.go). Exporting it as one const keeps the Go gate and the CI archtest from drifting.
Each annotation line names exactly ONE table. A migration that rebuilds multiple tables uses multiple annotation lines — one per table. The Go phase0 gate scans all matching lines with FindAllSubmatch and gates each target independently.
WARNING: changing this pattern is a wire-format change to the migration annotation. Every existing `-- +gocell forward-rebuild target=…` line in adapters/postgres/migrations/*.sql must be updated to match, or pending rebuilds silently stop being gated.
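The scanning behavior described above can be sketched with the standard regexp package. The pattern below is copied from the documented const; the helper name rebuildTargets and the sample migration text are hypothetical and are not part of this package.

```go
package main

import (
	"fmt"
	"regexp"
)

// Pattern copied verbatim from the documented ForwardRebuildAnnotationPattern const.
const forwardRebuildAnnotationPattern = `(?m)^\s*--\s*\+gocell\s+forward-rebuild\s+target=([a-zA-Z_][a-zA-Z0-9_]*)\s*$`

var annotationRE = regexp.MustCompile(forwardRebuildAnnotationPattern)

// rebuildTargets is a hypothetical helper: it returns the table named by
// every annotation line, one target per matching line.
func rebuildTargets(sql []byte) []string {
	var targets []string
	for _, m := range annotationRE.FindAllSubmatch(sql, -1) {
		targets = append(targets, string(m[1])) // m[1] is the captured table name
	}
	return targets
}

func main() {
	// Hypothetical migration body: two annotated tables, one line that
	// lacks the "+gocell" marker and is therefore not an annotation.
	migration := []byte(`-- +gocell forward-rebuild target=sessions
-- +gocell forward-rebuild target=refresh_tokens
-- forward-rebuild target=ignored
DROP TABLE sessions;
`)
	fmt.Println(rebuildTargets(migration))
}
```

Because each annotation names exactly one table, a migration that rebuilds two tables yields two matches, and a gate built on this list can check each target independently.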
const MaxMetadataBytes = 64 << 10
MaxMetadataBytes caps the JSON-encoded size of an outbox entry's metadata column. A bug or malicious producer could otherwise stuff multi-MB JSON into the column, amplifying relay memory pressure and PG replication delay. 64 KiB is comfortably above any legitimate envelope use (request id, correlation id, trace context, a handful of business labels) while preventing a single entry from dominating a relay batch.
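A size cap of this kind is typically enforced on the encoded bytes, just before the write. The sketch below assumes metadata is a flat string map and uses a hypothetical helper, encodeMetadata; the error the real writer returns for an oversized entry is not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// maxMetadataBytes mirrors the documented MaxMetadataBytes value (64 KiB).
const maxMetadataBytes = 64 << 10

// encodeMetadata is a hypothetical helper: it marshals metadata to JSON and
// rejects the result when the encoded form exceeds the cap.
func encodeMetadata(md map[string]string) ([]byte, error) {
	raw, err := json.Marshal(md)
	if err != nil {
		return nil, fmt.Errorf("marshal metadata: %w", err)
	}
	if len(raw) > maxMetadataBytes {
		return nil, fmt.Errorf("metadata is %d bytes, limit is %d", len(raw), maxMetadataBytes)
	}
	return raw, nil
}

func main() {
	// A typical envelope is far below the cap.
	raw, err := encodeMetadata(map[string]string{"request_id": "r-1", "trace_id": "t-9"})
	fmt.Println(len(raw), err)

	// A value as large as the cap itself cannot fit once JSON framing is added.
	_, err = encodeMetadata(map[string]string{"blob": strings.Repeat("x", maxMetadataBytes)})
	fmt.Println(err)
}
```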
const ProbeProjectionJournalReady healthz.ProbeName = "projection_journal_ready"
ProbeProjectionJournalReady is the typed readyz probe name for the durable projection_events journal (PROBENAME-SEALED-FUNNEL-01: adapter probe names are declared typed consts ending in _ready; bare strings never reach RegisterReadiness). The composition root registers it against PGProjectionEventSource.RepoReady when the durable source is wired (EPIC #1504 PR-03); PR-01 ships the capability + conformance.
Variables ¶
var PlatformTrackingTable = trackingTableFor(migration.PlatformNamespace)
PlatformTrackingTable is the tracking table for the platform's own embedded migrations: schema_migrations_platform. Platform is not special — it is the reserved migration.PlatformNamespace flowing through the same derivation as every external namespace (full symmetry, #1089 decision #3).
Functions ¶
func CtxWithTx ¶
func CtxWithTx(ctx context.Context, tx pgx.Tx) context.Context
CtxWithTx returns a new context carrying the given pgx.Tx. Downstream code (e.g. OutboxWriter) retrieves it via persistence.TxFromContext[pgx.Tx]. Uses persistence.TxCtxKey so cell-local adapters can participate in ambient transactions without importing adapters/postgres.
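The ambient-transaction pattern can be illustrated without a database. In this sketch txCtxKey, fakeTx, ctxWithTx, txFromContext, txName and rootCtx are all hypothetical stand-ins; the real package uses persistence.TxCtxKey and persistence.TxFromContext[pgx.Tx], whose implementations are not shown here.

```go
package main

import (
	"context"
	"fmt"
)

// txCtxKey is a hypothetical stand-in for the shared context key.
type txCtxKey struct{}

// fakeTx is a hypothetical stand-in for pgx.Tx so the sketch needs no database.
type fakeTx struct{ name string }

// rootCtx is the base context used by the demo.
var rootCtx = context.Background()

// ctxWithTx returns a child context carrying tx under the shared key.
func ctxWithTx[T any](ctx context.Context, tx T) context.Context {
	return context.WithValue(ctx, txCtxKey{}, tx)
}

// txFromContext retrieves a transaction of type T and reports whether the
// context carried one.
func txFromContext[T any](ctx context.Context) (T, bool) {
	tx, ok := ctx.Value(txCtxKey{}).(T)
	return tx, ok
}

// txName returns the ambient transaction's name, or "" when ctx has none.
func txName(ctx context.Context) string {
	if tx, ok := txFromContext[fakeTx](ctx); ok {
		return tx.name
	}
	return ""
}

func main() {
	ctx := ctxWithTx(rootCtx, fakeTx{name: "tx-1"})
	fmt.Printf("%q %q\n", txName(ctx), txName(rootCtx))
}
```

Keeping the key in a package that both sides already import is what lets a cell-local adapter join the transaction without depending on the adapter that started it.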
func ExpectedVersion ¶
func ExpectedVersion(fsys fs.FS) (int64, error)
ExpectedVersion scans the given fs.FS for .sql migration files and returns the maximum numeric prefix found. This represents the expected schema version that must be present in the database for the binary to start safely.
ref: pressly/goose v3.27 Provider — migrations embedded in FS.
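A minimal sketch of the "maximum numeric prefix" scan, using only the standard library. It assumes file names of the form NNN_name.sql in the FS root and assumes that an FS with no numbered migrations is an error; maxMigrationVersion, sampleFS and the file name 024_last_admin_trigger.up.sql are hypothetical, and the real ExpectedVersion may differ in both parsing and error handling.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"strconv"
	"strings"
	"testing/fstest"
)

// maxMigrationVersion is a hypothetical stand-in for ExpectedVersion: it
// returns the largest numeric prefix among the .sql files in the FS root.
func maxMigrationVersion(fsys fs.FS) (int64, error) {
	entries, err := fs.ReadDir(fsys, ".")
	if err != nil {
		return 0, fmt.Errorf("read migrations dir: %w", err)
	}
	var highest int64
	for _, e := range entries {
		name := e.Name()
		if e.IsDir() || !strings.HasSuffix(name, ".sql") {
			continue
		}
		prefix, _, found := strings.Cut(name, "_")
		if !found {
			continue
		}
		v, err := strconv.ParseInt(prefix, 10, 64)
		if err != nil {
			continue // not a numbered migration file
		}
		if v > highest {
			highest = v
		}
	}
	if highest == 0 {
		return 0, errors.New("no numbered .sql migrations found")
	}
	return highest, nil
}

// sampleFS builds an in-memory FS with hypothetical migration file names.
func sampleFS() fs.FS {
	return fstest.MapFS{
		"001_create_outbox_entries.up.sql":   {Data: []byte("-- up")},
		"001_create_outbox_entries.down.sql": {Data: []byte("-- down")},
		"024_last_admin_trigger.up.sql":      {Data: []byte("-- up")},
		"README.md":                          {Data: []byte("not a migration")},
	}
}

func main() {
	v, err := maxMigrationVersion(sampleFS())
	fmt.Println(v, err)
}
```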
func InvalidIndexCheck ¶
func InvalidIndexCheck(ctx context.Context, pool *Pool) error
InvalidIndexCheck wraps DetectInvalidIndexes for use as a readyz probe (func(context.Context) error signature). Returns:
- nil when no invalid indexes exist
- the underlying query error (KindInternal) when DetectInvalidIndexes fails — this is a real fault (connection, SQL error) and maps to "unhealthy"
- an errcode error when indisvalid=false rows are present. Invalid indexes are a schema fault, so runtime/http/health.runOneProbe classifies this as "unhealthy" and /readyz returns HTTP 503. Operators see the invalid-index list in /readyz?verbose diagnostics and DROP the index manually.
ref: kubernetes/kubernetes pkg/util/healthz — named health checkers return error.
func MigrationsFS ¶
func MigrationsFS() (fs.FS, error)
MigrationsFS returns an fs.FS rooted at the migrations/ directory so that file entries are directly accessible (e.g. "001_create_outbox_entries.up.sql" rather than "migrations/001_create_outbox_entries.up.sql"). Callers must handle the returned error before passing the filesystem to NewMigrator.
func NewJournalingOutboxWriter ¶
func NewJournalingOutboxWriter(inner *OutboxWriter, projectionTopics []string) outbox.Writer
NewJournalingOutboxWriter wraps a base OutboxWriter so projection-source events are journaled in the producer's transaction. projectionTopics is the set of routing topics (== projection contract ids) to journal; a nil or empty set means the decorator forwards writes unchanged (journals nothing). Exported because the composition root (a different package) wires it; construction is locked to the capability provider funnel (CAPABILITY-PROVIDER-FUNNEL-01).
inner is a strong dependency: a nil inner is a composition-root programmer error (NewOutboxWriter never returns nil), so the constructor fails fast rather than deferring to a nil-deref on the first Write (Option-pattern fail-fast; mirrors clock.MustHaveClock).
func VerifyExpectedShape ¶
func VerifyExpectedShape(ctx context.Context, pool *Pool) error
VerifyExpectedShape checks that the post-migration column / table shape matches the binary's expectation across 9 structural dimensions: columns (type + nullability), primary keys, unique indexes, foreign keys (with ON DELETE action), non-unique indexes, triggers (enabled state + function), trigger functions, and CHECK constraints.
Run **after** VerifyExpectedVersion (which gates migration version) — VerifyExpectedShape catches "version table says N but my migration's DDL never reached the column" drift, e.g. partial migration that did not abort, or a 3rd-party tool applying SQL out-of-band.
ADR-credential §5.1.3 deployment playbook mandates these checks for the S3+S5 schema. Each fault returns ErrAdapterPGSchemaShape so operators see the precise dimension at fault.
func VerifyExpectedVersion ¶
func VerifyExpectedVersion(ctx context.Context, pool *Pool, fsys fs.FS, ns migration.Namespace) error
VerifyExpectedVersion compares the database's current goose schema version against the expected version derived from the embedded migration FS for the given namespace.
ns identifies the migration lineage; the goose tracking table is derived as schema_migrations_<namespace> (see trackingTableFor) and must match the table NewMigrator(_, _, ns) writes. Taking a typed migration.Namespace rather than a free table string keeps the read and write sides on the same derivation.
Returns ErrAdapterPGSchemaMismatch if:
- actual < expected: DB schema is behind the binary (migrations not run).
- actual > expected: binary is behind the DB (binary rollback without migration rollback).
Returns nil when actual == expected.
ref: pressly/goose v3.27 Provider.GetDBVersion — GetDBVersion reads max version from the goose version table.
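The three outcomes above reduce to a single comparison once both versions are known. In this sketch compareVersions and errSchemaMismatch are hypothetical stand-ins (the real function reads the actual version from the tracking table and returns an errcode error); only the comparison logic follows the documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errSchemaMismatch is a hypothetical stand-in for ErrAdapterPGSchemaMismatch.
var errSchemaMismatch = errors.New("ERR_ADAPTER_PG_SCHEMA_MISMATCH")

// compareVersions returns nil only when the database version equals the
// version expected by the binary; both directions of drift are errors.
func compareVersions(actual, expected int64) error {
	switch {
	case actual < expected:
		return fmt.Errorf("%w: database at %d is behind binary at %d (migrations not run)",
			errSchemaMismatch, actual, expected)
	case actual > expected:
		return fmt.Errorf("%w: database at %d is ahead of binary at %d (binary rolled back without migration rollback)",
			errSchemaMismatch, actual, expected)
	default:
		return nil
	}
}

func main() {
	fmt.Println(compareVersions(24, 24))
	fmt.Println(compareVersions(23, 24))
	fmt.Println(compareVersions(25, 24))
}
```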
func VerifyNoInvalidIndexes ¶
func VerifyNoInvalidIndexes(ctx context.Context, pool *Pool) error
VerifyNoInvalidIndexes is the fail-fast counterpart of DetectInvalidIndexes for the cmd/corebundle startup path. It returns ErrAdapterPGInvalidIndex when any pg_index row has indisvalid=false, replacing the prior warn-continue defense (B2-X-03). Operators must DROP the invalid index manually before the binary will start.
Use DetectInvalidIndexes when you need the index list for diagnostics (e.g. /readyz?verbose response). Use VerifyNoInvalidIndexes when you want startup to abort.
Types ¶
type AuditCrossTenantStore ¶
type AuditCrossTenantStore struct {
	// contains filtered or unexported fields
}
AuditCrossTenantStore is the PostgreSQL implementation of ledger.CrossTenantQueryStore for super-admin cross-tenant audit reads (#1810). It is backed by a DEDICATED admin connection pool (the gocell_audit_admin role), which carries a permissive RLS SELECT policy USING(true) so it can read every tenant's rows. FORCE RLS stays ON for the serving role — the cross-tenant visibility comes from an explicit pg_policy row, not from bypassing RLS.
Unlike LedgerStore, AuditCrossTenantStore:
- issues NO namespace predicate (returns rows from BOTH the relay and bootstrap namespace chains in one scan)
- issues NO tenant predicate (all tenants are visible via the admin-role policy)
- is READ-ONLY: no Append, no RunInTx, no clock
- does NOT honor the ordinary serving Store.Query path
The dedicated admin pool must be configured with the gocell_audit_admin role. Composition-root wiring is responsible for provisioning that pool; if the pool is not configured, callers remain fail-closed at HTTP 501 (RowScopeAllUnsupportedError) exactly as before #1810.
func NewAuditCrossTenantStore ¶
func NewAuditCrossTenantStore(pool *pgxpool.Pool) (*AuditCrossTenantStore, error)
NewAuditCrossTenantStore constructs an AuditCrossTenantStore backed by the supplied admin pool. pool must not be nil. The pool must be configured with the gocell_audit_admin role (or equivalent permissive RLS SELECT policy) so QueryCrossTenant can read across all tenants without bypassing FORCE RLS.
func (*AuditCrossTenantStore) QueryCrossTenant ¶
func (s *AuditCrossTenantStore) QueryCrossTenant(
	ctx context.Context,
	ctv tenant.CrossTenantVisibility,
	filters ledger.AuditFilters,
	params query.ListParams,
) ([]*ledger.Entry, error)
QueryCrossTenant lists audit entries across ALL tenants and BOTH namespace chains matching AuditFilters, using keyset cursor pagination. params.Sort must be non-empty (callers pass ledger.QuerySort). Returns up to params.FetchLimit() rows for N+1 hasMore detection. Returns an empty (non-nil) slice when no entries match.
ctv carries the sealed RowScopeAll obligation. This method re-validates it fail-closed (ctv.Validate) before reading — the data-layer PEP (F2): the typed param makes "forge/forget the grant" a compile error, and this runtime check rejects Go's constructable zero value, so a zero/invalid obligation can never produce a cross-tenant read (mirrors LedgerStore.Query's defensive Validate). The owner dimension is unrestricted (RowScopeAll), so AuditFilters is the only narrowing applied.
NO SET LOCAL, NO RunInTx, NO Protocol — this is a pure read-only path.
type Config ¶
type Config struct {
	// DSN is the PostgreSQL connection string (e.g.
	// "postgres://user:pass@localhost:5432/dbname?sslmode=disable").
	DSN string

	// MaxConns is the maximum number of connections in the pool.
	// Default (applied by applyDefaults): 10.
	MaxConns int32

	// IdleTimeout is how long an idle connection may remain in the pool.
	// Default (applied by applyDefaults): 5m.
	IdleTimeout time.Duration

	// MaxLifetime is the maximum lifetime of a connection.
	// Default (applied by applyDefaults): 1h.
	MaxLifetime time.Duration

	// ConnectTimeout bounds a single connection-establishment attempt
	// (TCP+TLS+startup) at the pgconn layer. It applies to the pool's initial
	// connect AND every subsequent on-demand connection top-up at runtime.
	// Without it, pgxpool falls back to its 2 min internal default and a TCP
	// SYN-blackhole route can hang any acquire for that long.
	//
	// The total budget for NewPool itself (parse + initial dial + Ping) is
	// the caller's ctx; this field only governs per-connection handshakes.
	//
	// Precedence: when set > 0, the value is written to
	// pgconn.Config.ConnectTimeout unconditionally, overriding any
	// connect_timeout=N in the DSN. Default (applied by applyDefaults): 5s.
	ConnectTimeout time.Duration

	// RequireRestrictedRole declares this pool as a SERVING pool for
	// RLS-protected tables: when true, Probes() additionally exposes
	// ProbeAppRoleRestrictedReady, which fails /readyz if the connection's
	// current_user is a superuser or carries BYPASSRLS (either bypasses the
	// FORCE ROW LEVEL SECURITY tenant_isolation policies, making them a runtime
	// no-op; #1676 [F-B11]). Leave false for admin/migration pools (e.g.
	// tools/pg-migrate) and utility pools, which legitimately connect as an
	// owner/superuser. This is a present capability axis (serving vs admin), not
	// a backward-compat toggle: corebundle's serving pool sets it true because
	// its schema (migrations 052/053) always carries RLS.
	RequireRestrictedRole bool
}
Config holds PostgreSQL connection pool settings. All fields must be set explicitly by the caller; there is no global env-reading constructor.
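The documented defaults can be summarized in a small sketch. The type poolConfig and the function withDefaults are hypothetical mirrors of Config and applyDefaults; treating zero or negative values as "unset" is an assumption, since the real applyDefaults is unexported and not shown.

```go
package main

import (
	"fmt"
	"time"
)

// poolConfig is a hypothetical mirror of the documented Config fields that
// carry defaults.
type poolConfig struct {
	DSN            string
	MaxConns       int32
	IdleTimeout    time.Duration
	MaxLifetime    time.Duration
	ConnectTimeout time.Duration
}

// withDefaults fills unset fields with the defaults listed in the Config
// field comments. Assumption: a value <= 0 means "unset".
func withDefaults(c poolConfig) poolConfig {
	if c.MaxConns <= 0 {
		c.MaxConns = 10
	}
	if c.IdleTimeout <= 0 {
		c.IdleTimeout = 5 * time.Minute
	}
	if c.MaxLifetime <= 0 {
		c.MaxLifetime = time.Hour
	}
	if c.ConnectTimeout <= 0 {
		c.ConnectTimeout = 5 * time.Second
	}
	return c
}

func main() {
	// Only MaxConns is set explicitly; the durations fall back to defaults.
	cfg := withDefaults(poolConfig{
		DSN:      "postgres://user:pass@localhost:5432/dbname?sslmode=disable",
		MaxConns: 25,
	})
	fmt.Println(cfg.MaxConns, cfg.IdleTimeout, cfg.MaxLifetime, cfg.ConnectTimeout)
}
```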
type DestructiveDownPermit ¶
type DestructiveDownPermit interface {
	Reason() string
	// contains filtered or unexported methods
}
DestructiveDownPermit is an explicit break-glass token required for any schema rollback. The unexported marker makes the permit sealed: callers outside this package cannot fabricate one and must go through AllowDestructiveDown.
func AllowDestructiveDown ¶
func AllowDestructiveDown(reason string) (DestructiveDownPermit, error)
AllowDestructiveDown constructs the explicit permit required by Migrator.Down. The reason is kept for audit/log plumbing and must be non-empty.
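The sealed-permit idea can be shown in a few lines. All names here (destructiveDownPermit, downPermit, sealed, allowDestructiveDown) are hypothetical; the real marker method and error type are unexported or not shown. In a single-file demo the seal is not observable, but once the interface lives in its own package, code outside it cannot define the unexported method and so cannot implement the interface.

```go
package main

import (
	"errors"
	"fmt"
)

// destructiveDownPermit is a hypothetical mirror of the sealed interface.
type destructiveDownPermit interface {
	Reason() string
	sealed() // unexported marker: only this package can satisfy the interface
}

// downPermit is the only implementation; it is created by the constructor.
type downPermit struct{ reason string }

func (p downPermit) Reason() string { return p.reason }
func (downPermit) sealed()          {}

// allowDestructiveDown is the single funnel for obtaining a permit. It
// rejects an empty reason, as the documentation requires.
func allowDestructiveDown(reason string) (destructiveDownPermit, error) {
	if reason == "" {
		return nil, errors.New("destructive down permit requires a non-empty reason")
	}
	return downPermit{reason: reason}, nil
}

func main() {
	permit, err := allowDestructiveDown("roll back failed deploy")
	if err != nil {
		fmt.Println("no permit:", err)
		return
	}
	fmt.Println("permit granted:", permit.Reason())

	_, err = allowDestructiveDown("")
	fmt.Println(err)
}
```

The design choice is that a rollback call site has to name its justification in code, which makes the break-glass action visible in review and in logs.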
type ForwardRebuildPermit ¶
type ForwardRebuildPermit interface {
	MigrationNumber() int64
	Reason() string
	// contains filtered or unexported methods
}
ForwardRebuildPermit is an explicit break-glass token required to run a forward-rebuild migration (DROP+CREATE / TRUNCATE) when the target table already holds rows. Like DestructiveDownPermit, the unexported marker seals the permit: callers outside this package cannot fabricate one.
func AllowForwardRebuild ¶
func AllowForwardRebuild(migrationNumber int64, reason string) (ForwardRebuildPermit, error)
AllowForwardRebuild constructs the explicit permit required by Migrator.ForwardRebuild for a populated-table rebuild. migrationNumber must be positive (goose Source.Version starts at 1) and reason must be non-empty.
type InvalidIndex ¶
type InvalidIndex struct {
	// Index is the qualified name of the invalid index (e.g. "public.idx_foo").
	Index string
	// Table is the qualified name of the table the index belongs to.
	Table string
}
InvalidIndex describes an index that is marked as invalid in pg_index. Invalid indexes can occur when CREATE INDEX CONCURRENTLY is interrupted.
func DetectInvalidIndexes ¶
func DetectInvalidIndexes(ctx context.Context, pool *Pool) ([]InvalidIndex, error)
DetectInvalidIndexes queries pg_index for any indexes marked as invalid (indisvalid = false) within the current schema. These can occur when CREATE INDEX CONCURRENTLY is interrupted (orphan invalid index). The caller should log a warning and consider manual cleanup (DROP INDEX).
Rollout safety: a peer pod or a manual operator running CREATE INDEX CONCURRENTLY on a live table will momentarily produce a row in pg_stat_progress_create_index while indisvalid=false. We LEFT JOIN that view and exclude any index that currently has an active build session so that a normal rolling deploy does not block startup. Only fully orphaned invalid indexes (no active builder) are reported.
pg_stat_progress_create_index is available in PostgreSQL 12+. GoCell requires PostgreSQL 14+, so this join is always safe.
The check is scoped to current_schema() so that in-progress CONCURRENTLY builds in other schemas (e.g. parallel test schemas) do not block migrations in unrelated schemas. The returned Index/Table fields are schema-qualified ("public.idx_foo") so multi-schema deployments do not observe spurious matches across schemas with reused names.
Returns an empty slice when no invalid indexes are found.
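The filtering rules above are implemented in SQL in the real function (a LEFT JOIN against pg_stat_progress_create_index). The sketch below restates the same rules over in-memory rows so they can be read and tested in isolation; indexRow, invalidIndex and orphanedInvalidIndexes are hypothetical names, and the actual query text is not shown in this documentation.

```go
package main

import "fmt"

// indexRow is a hypothetical flattened view of one pg_index row joined with
// the build-progress view.
type indexRow struct {
	Schema, Index, Table string
	Valid                bool // pg_index.indisvalid
	ActiveBuilder        bool // a pg_stat_progress_create_index row exists
}

// invalidIndex mirrors the documented InvalidIndex shape.
type invalidIndex struct{ Index, Table string }

// orphanedInvalidIndexes reports invalid indexes in currentSchema that have
// no active CREATE INDEX CONCURRENTLY session. Names are schema-qualified.
func orphanedInvalidIndexes(rows []indexRow, currentSchema string) []invalidIndex {
	out := []invalidIndex{} // empty slice when nothing is found
	for _, r := range rows {
		if r.Schema != currentSchema || r.Valid || r.ActiveBuilder {
			continue
		}
		out = append(out, invalidIndex{
			Index: r.Schema + "." + r.Index,
			Table: r.Schema + "." + r.Table,
		})
	}
	return out
}

func main() {
	rows := []indexRow{
		{Schema: "public", Index: "idx_ok", Table: "t", Valid: true},
		{Schema: "public", Index: "idx_building", Table: "t", ActiveBuilder: true},
		{Schema: "public", Index: "idx_orphan", Table: "t"},
		{Schema: "test_a", Index: "idx_orphan", Table: "t"},
	}
	fmt.Println(orphanedInvalidIndexes(rows, "public"))
}
```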
type LedgerStore ¶
type LedgerStore struct {
	// contains filtered or unexported fields
}
LedgerStore is a PostgreSQL implementation of ledger.Store. It persists audit entries in a tamper-evident hash chain using the following design:
- pg_advisory_xact_lock(hashtext(namespace), hashtext(tenant_id)) serializes concurrent Append calls within the same (namespace, tenant) sub-chain. Different (namespace, tenant) pairs use different int32 hash key pairs, so their advisory locks never contend (B2-C-10).
- SELECT ... FOR UPDATE on the tail row fences the read-modify-write cycle.
- All DML runs inside the caller's ambient transaction via txRunner.RunInTx.
- Idempotency uses a stable EventID fingerprint check before inserting. EventID (UUID from the outbox entry) is the same across at-least-once redeliveries; Timestamp changes per retry so it is excluded (F-CR-2). A DB-level UNIQUE INDEX uq_audit_ns_tenant_event_id on (namespace, tenant_id, event_id) (migration 055) is the second-line guard against concurrent bypass of the application check.
Consistency level: L1 LocalTx — Append is a single-transaction write that participates in the caller's ambient transaction. L2 callers compose this store with an outbox.Writer inside the same RunInTx block.
ref: google/trillian storage/log_storage.go ReadWriteTransaction pattern. ref: adapters/postgres/refresh_store.go — advisory lock + ambient tx model.
func NewLedgerStore ¶
func NewLedgerStore(
	pool *pgxpool.Pool,
	txRunner persistence.TxRunner,
	protocol *ledger.Protocol,
	clk clock.Clock,
) (*LedgerStore, error)
NewLedgerStore constructs a LedgerStore. Returns a non-nil error when any required dependency is absent (fail-fast at construction, not at runtime):
- pool nil → ErrValidationFailed
- txRunner nil or typed-nil → ErrValidationFailed
- protocol nil → ErrValidationFailed
- clk nil or typed-nil → ErrValidationFailed
pool is wrapped in a pgExecutor so all SQL paths (Append, Tail, GetBySeq, Query, Verify, RepoReady) route through the ambient transaction when ctx carries one, and fall back to pool otherwise.
func (*LedgerStore) Append ¶
func (s *LedgerStore) Append(ctx context.Context, e *ledger.Entry) error
Append persists a new audit entry in the namespace's hash chain.
Algorithm (all within txRunner.RunInTx):
1. Validate payload is valid JSON.
2. Acquire pg_advisory_xact_lock(hashtext(namespace), hashtext(tenant_id)) to serialize within the (namespace, tenant) sub-chain.
3. Check idempotency fingerprint (event_id only) inside the lock to eliminate TOCTOU between concurrent Appends. Timestamp is excluded from the check: at-least-once redelivery produces the same EventID but a new clk.Now(), so fingerprinting on Timestamp would defeat idempotency (F-CR-2).
4. SELECT tail row FOR UPDATE (prevents concurrent tail reads in the same namespace).
5. Compute next seq_no = tail.seq_no + 1 (or 1 for empty).
6. Compute hash = protocol.ComputeHash(tail.hash, entry).
7. INSERT.
F2: advisory lock (step 2) must precede fingerprint check (step 3) so that concurrent Appends with identical content cannot both pass the fingerprint check and both insert. MemStore acquires its mutex before the fingerprint check — PG Append now mirrors that ordering.
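The ordering of the steps can be sketched with an in-memory chain, in the spirit of the MemStore mentioned above. Everything here is hypothetical: memChain, chainKey, entry and computeHash are illustrative names, a single mutex stands in for the advisory lock plus FOR UPDATE (it is coarser than the per-chain lock), the SHA-256 concatenation is a toy and not the ledger.Protocol hash, and returning the stored entry on a duplicate is an assumption.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
)

type entry struct {
	EventID string
	Payload []byte
	SeqNo   int64
	Hash    string
}

type chainKey struct{ namespace, tenantID string }

type memChain struct {
	mu     sync.Mutex // stands in for pg_advisory_xact_lock + FOR UPDATE
	chains map[chainKey][]entry
}

func newMemChain() *memChain { return &memChain{chains: map[chainKey][]entry{}} }

// computeHash is a toy stand-in for protocol.ComputeHash.
func computeHash(prevHash string, e entry) string {
	h := sha256.New()
	h.Write([]byte(prevHash))
	h.Write([]byte(e.EventID))
	h.Write(e.Payload)
	return hex.EncodeToString(h.Sum(nil))
}

func (c *memChain) Append(key chainKey, e entry) (entry, error) {
	// Step 1: validate the payload before taking any lock.
	if !json.Valid(e.Payload) {
		return entry{}, errors.New("payload is not valid JSON")
	}
	// Step 2: serialize writers before the idempotency check.
	c.mu.Lock()
	defer c.mu.Unlock()
	chain := c.chains[key]
	// Step 3: idempotency on EventID only; timestamps differ across retries.
	for _, prev := range chain {
		if prev.EventID == e.EventID {
			return prev, nil
		}
	}
	// Steps 4-5: read the tail and derive the next sequence number.
	var tail entry
	if len(chain) > 0 {
		tail = chain[len(chain)-1]
	}
	e.SeqNo = tail.SeqNo + 1
	// Step 6: link the new entry to the previous hash.
	e.Hash = computeHash(tail.Hash, e)
	// Step 7: insert.
	c.chains[key] = append(chain, e)
	return e, nil
}

func main() {
	c := newMemChain()
	k := chainKey{namespace: "relay", tenantID: "t1"}
	a, _ := c.Append(k, entry{EventID: "e1", Payload: []byte(`{"a":1}`)})
	b, _ := c.Append(k, entry{EventID: "e2", Payload: []byte(`{}`)})
	dup, _ := c.Append(k, entry{EventID: "e1", Payload: []byte(`{"a":1}`)})
	fmt.Println(a.SeqNo, b.SeqNo, dup.SeqNo)
}
```

If the lock were taken after the EventID check, two concurrent redeliveries of the same event could both see "not present" and both insert; taking it first closes that window.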
func (*LedgerStore) GetBySeq ¶
func (s *LedgerStore) GetBySeq(ctx context.Context, vis tenant.RowVisibility, seq int64) (*ledger.Entry, error)
GetBySeq fetches a single entry by sequence number. Returns ErrAuditLedgerNotFound when the seq does not exist, or when the entry exists but vis.Allows(entry.ActorID) is false (IDOR-safe collapse).
func (*LedgerStore) Protocol ¶
func (s *LedgerStore) Protocol() *ledger.Protocol
Protocol returns the immutable protocol decisions backing this store.
func (*LedgerStore) Query ¶
func (s *LedgerStore) Query(
	ctx context.Context,
	t tenant.TenantID,
	vis tenant.RowVisibility,
	filters ledger.AuditFilters,
	params query.ListParams,
) ([]*ledger.Entry, error)
Query lists entries matching the supplied AuditFilters within tenant t using keyset cursor pagination pushed into SQL via pgquery.AppendKeyset: params.Sort drives the ORDER BY and the keyset WHERE predicate, and params.FetchLimit() (Limit+1) is the LIMIT for N+1 hasMore detection. The idx_audit_namespace_ts_id composite index covers the (timestamp DESC, id ASC) keyset, so this is an index scan with tenant_id applied as an in-scan filter (the `OR tenant_id=''` disjunction precludes a single ordered tenant-leading scan). params.Sort must be non-empty (callers pass ledger.QuerySort); AppendKeyset returns ErrValidationFailed on empty Sort. Returns an empty (non-nil) slice when no entries match.
t (#1618) is the mandatory tenant axis (param[1], TENANT-REPO-PARAM-FUNNEL-01): the app-layer half of the dual-layer tenant isolation. FORCE RLS on the app.tenant_id GUC (set from the post-auth ctxkeys.TenantID when this runs inside the auditquery RunInTx) is the DB-Hard primary; this explicit predicate is the defense-in-depth backstop and the sole isolation on the superuser/mem paths.
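The N+1 hasMore detection described for Query can be illustrated with a small sketch. It assumes a single ascending integer key instead of the real (timestamp DESC, id ASC) keyset; fetchLimit, trimPage, and queryAfter are hypothetical helpers and not part of pkg/pgquery or pkg/query.

```go
package main

import "fmt"

// fetchLimit mirrors the Limit+1 idea described for params.FetchLimit().
func fetchLimit(limit int) int { return limit + 1 }

// trimPage drops the extra probe row and reports whether more rows exist.
func trimPage(rows []int, limit int) (page []int, hasMore bool) {
	if len(rows) > limit {
		return rows[:limit], true
	}
	return rows, false
}

// queryAfter simulates "WHERE id > cursor ORDER BY id LIMIT n" over sorted ids.
func queryAfter(ids []int, cursor, n int) []int {
	out := []int{}
	for _, id := range ids {
		if id > cursor && len(out) < n {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	ids := []int{1, 2, 3, 4, 5}
	cursor, limit := 0, 2
	for {
		page, more := trimPage(queryAfter(ids, cursor, fetchLimit(limit)), limit)
		fmt.Println(page, more)
		if !more {
			break
		}
		cursor = page[len(page)-1] // the last returned key becomes the next cursor
	}
}
```

Fetching one row beyond the page size avoids a separate COUNT query: the extra row only signals that another page exists and is never returned to the caller.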
func (*LedgerStore) RepoReady ¶
func (s *LedgerStore) RepoReady(ctx context.Context) error
RepoReady implements healthz.RepoProber. It issues a cheap non-transactional representative query against the audit_entries table so that schema/migration drift and table-level permission loss are surfaced as a differentiated failure domain distinct from the pool-level postgres_ready probe registered by *Pool.
The ambient-tx fallback in pgExecutor is a no-op for this probe: health handler contexts never carry a pgx.Tx, so pgExecutor routes directly to the pool, keeping the health check independent of any caller transaction state.
func (*LedgerStore) Tail ¶
func (s *LedgerStore) Tail(ctx context.Context) (ledger.TailSnapshot, error)
Tail returns the current tail snapshot of the ctx-scoped tenant chain (the tenant.WithScope value, or the "" system chain when unscoped — #1618). Returns zero TailSnapshot for an empty/absent chain (not an error). Routes through pgExecutor (ambient-tx aware; falls back to pool when no tx in ctx).
Per-(namespace, tenant) chains have no single "namespace tail"; startup tail-verify runs unscoped and thus reads the "" system/bootstrap chain.
F13: uses a single SQL query to retrieve seq_no, hash, and total count.
Caveat: when called within a caller's ambient transaction, the result reflects that transaction's uncommitted chain state. Callers requiring post-commit integrity verification must call Tail after the transaction commits.
func (*LedgerStore) Verify ¶
func (s *LedgerStore) Verify(ctx context.Context, fromSeq, toSeq int64) (valid bool, firstInvalidSeq int64, err error)
Verify re-computes HMAC-SHA256 hash for each entry in [fromSeq, toSeq] and checks chain linkage (PrevHash). Returns valid=true and firstInvalidSeq=-1 when all entries are intact. Routes through pgExecutor (ambient-tx aware).
Sub-range correctness: when fromSeq > 1 the first entry in the range has a non-empty PrevHash pointing at entries[fromSeq-1]. Verify fetches that predecessor's hash as the baseline so the first PrevHash linkage check is evaluated against the correct expected value rather than the empty string used for the chain's genesis entry.
Caveat: when called within a caller's ambient transaction, the result reflects that transaction's uncommitted chain state. Callers requiring post-commit integrity verification must call Verify after the transaction commits.
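The sub-range baseline rule in Verify can be sketched as follows. The hash input used here (HMAC-SHA256 over PrevHash followed by a payload string) is an assumption made for illustration; the actual encoding is defined by ledger.Protocol and is not described in this section. row, computeHash, appendRow, and verify are hypothetical names.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// row is a hypothetical, reduced audit row.
type row struct {
	Seq      int64
	Payload  string
	PrevHash string
	Hash     string
}

// computeHash uses an assumed encoding: HMAC-SHA256(key, prevHash || payload).
func computeHash(key []byte, prevHash, payload string) string {
	m := hmac.New(sha256.New, key)
	m.Write([]byte(prevHash))
	m.Write([]byte(payload))
	return hex.EncodeToString(m.Sum(nil))
}

// appendRow links a new row onto the end of the chain.
func appendRow(key []byte, chain []row, payload string) []row {
	prev := ""
	if len(chain) > 0 {
		prev = chain[len(chain)-1].Hash
	}
	return append(chain, row{
		Seq: int64(len(chain)) + 1, Payload: payload,
		PrevHash: prev, Hash: computeHash(key, prev, payload),
	})
}

// verify checks rows with Seq in [from, to], assuming 1 <= from <= to <= len(chain).
// The baseline is the predecessor's hash when from > 1, and "" for the genesis row.
func verify(key []byte, chain []row, from, to int64) (bool, int64) {
	baseline := ""
	if from > 1 {
		baseline = chain[from-2].Hash
	}
	for seq := from; seq <= to; seq++ {
		r := chain[seq-1]
		if r.PrevHash != baseline || r.Hash != computeHash(key, r.PrevHash, r.Payload) {
			return false, seq
		}
		baseline = r.Hash
	}
	return true, -1
}

func main() {
	key := []byte("demo-key")
	var chain []row
	for _, p := range []string{"login", "grant", "revoke"} {
		chain = appendRow(key, chain, p)
	}
	fmt.Println(verify(key, chain, 2, 3))
	chain[1].Payload = "tampered"
	fmt.Println(verify(key, chain, 1, 3))
}
```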
type MigrationDirection ¶
type MigrationDirection string
MigrationDirection indicates whether a migration is applied or rolled back.
const (
	// MigrationUp applies a migration.
	MigrationUp MigrationDirection = "up"
	// MigrationDown rolls back a migration.
	MigrationDown MigrationDirection = "down"
)
type MigrationSet ¶
type MigrationSet struct {
// contains filtered or unexported fields
}
MigrationSet is an ordered registry of per-namespace migration sources. Each namespace owns an independent 001..N version lineage tracked in its own schema_migrations_<namespace> table (Django-style (namespace, version) composite key, #1089). There is no global version sequence: namespaces are independent, applied in registration order.
ref: pressly/goose v3 Provider.WithTableName — per-table version counter.
The zero value (MigrationSet{}) is usable: Add appends to a nil slice and dedups by scanning entries, so there is no nil-map hazard for a caller who constructs MigrationSet{} directly instead of via NewMigrationSet.
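Why a slice-backed registry is safe at its zero value can be shown with a reduced sketch. registry and add are hypothetical names that only mimic the append-and-scan behavior described above; they are not the MigrationSet implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a hypothetical reduction of MigrationSet: an ordered slice of names.
type registry struct {
	names []string
}

// add appends name, rejecting duplicates with a linear scan. It works on the
// zero value because append accepts a nil slice, unlike a write to a nil map.
func (r *registry) add(name string) error {
	for _, n := range r.names {
		if n == name {
			return errors.New("duplicate namespace: " + name)
		}
	}
	r.names = append(r.names, name)
	return nil
}

func main() {
	var r registry // zero value, no constructor call
	fmt.Println(r.add("billing"))
	fmt.Println(r.add("billing") != nil)
	fmt.Println(r.names)
}
```

The linear scan also preserves registration order, which a map-based dedup would lose.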
func NewMigrationSet ¶
func NewMigrationSet() *MigrationSet
NewMigrationSet returns an empty MigrationSet. Use NewMigrationSetWithPlatform for the common case (platform + external namespaces). The zero value is also usable (see MigrationSet docs), so this is a convenience, not a requirement.
func NewMigrationSetWithPlatform ¶
func NewMigrationSetWithPlatform() (*MigrationSet, error)
NewMigrationSetWithPlatform returns a MigrationSet pre-seeded with the platform namespace (adapters/postgres embedded migrations) FIRST. Platform applies before any external namespace registered via Add, so an external Cell module's migrations may reference platform tables (e.g. foreign keys) — platform-first ordering is the concrete cross-namespace guarantee.
func (*MigrationSet) Add ¶
Add registers an external Cell module's migration set under ns. The reserved platform namespace is rejected (only the internal seed in NewMigrationSetWithPlatform may use it); ns must be valid and fsys non-nil; duplicate namespaces are rejected.
func (*MigrationSet) ApplyAll ¶
func (s *MigrationSet) ApplyAll(ctx context.Context, pool *Pool) error
ApplyAll applies every namespace's pending migrations in registration order, each against its own schema_migrations_<namespace> tracking table. It uses plain Up per namespace; a populated-table forward-rebuild stays a platform-only break-glass operation via Migrator.ForwardRebuild, and Up's fail-closed gate still refuses an unpermitted populated rebuild. Idempotent: already-applied namespaces are no-ops.
Fails fast on the first namespace error; subsequent namespaces are NOT attempted. This is intentional with platform-first ordering: if platform migrations fail, external namespaces that FK platform tables must not proceed. An external namespace whose migration is a forward-rebuild of a populated table must be applied directly via NewMigrator(pool, fs, ns).ForwardRebuild (with a permit), not through ApplyAll's plain Up.
func (*MigrationSet) Namespaces ¶
func (s *MigrationSet) Namespaces() []migration.Namespace
Namespaces returns the registered namespaces in registration (application) order.
func (*MigrationSet) VerifyAll ¶
func (s *MigrationSet) VerifyAll(ctx context.Context, pool *Pool) error
VerifyAll checks each namespace's database version against the max version in its embedded migration FS (see VerifyExpectedVersion). Used by the schema guard on startup paths that verify (not apply) migrations.
type MigrationStatus ¶
type MigrationStatus struct {
// Version is the migration prefix (e.g. "001").
Version string
// Name is the descriptive part (e.g. "create_outbox_entries").
Name string
// Applied indicates whether this migration has been executed.
Applied bool
// AppliedAt is when the migration was applied (zero if not applied).
AppliedAt time.Time
}
MigrationStatus describes the state of a single migration file.
type Migrator ¶
type Migrator struct {
// contains filtered or unexported fields
}
Migrator manages SQL database migrations using goose v3 and an embed.FS source. It tracks applied migrations in a configurable table using goose's built-in advisory locking.
func NewMigrator ¶
NewMigrator creates a Migrator that reads SQL files from the given fs.FS. Migration files must follow the goose annotated format with -- +goose Up and -- +goose Down sections.
ns identifies the migration lineage; the goose tracking table is derived as schema_migrations_<namespace> (see trackingTableFor). Passing a typed migration.Namespace — not a free table-name string — makes "track migrations in the bare global schema_migrations table" (the #1089 / R3 collision footgun) unexpressible by construction. ns must be valid (fail-fast).
func (*Migrator) Down ¶
func (m *Migrator) Down(ctx context.Context, permit DestructiveDownPermit) error
Down rolls back the last applied migration. If no migrations have been applied (version 0), Down is a no-op and returns nil. Callers must pass an explicit DestructiveDownPermit because rollback files may drop production data even when they only move the schema back by one version.
func (*Migrator) ForwardRebuild ¶
func (m *Migrator) ForwardRebuild(ctx context.Context, permits ...ForwardRebuildPermit) error
ForwardRebuild applies all unapplied migrations, authorizing the supplied forward-rebuild permits for populated target tables.
A permit is only needed when a forward-rebuild migration's target table already holds rows — an empty or missing target is rebuilt automatically by Up alone, so fresh provisioning needs no permit. Supplying more permits than required is fine; a permit that is nil, duplicated, or references a migration which is not a pending forward-rebuild is a misconfiguration error.
type OutboxWriter ¶
type OutboxWriter struct {
// contains filtered or unexported fields
}
OutboxWriter writes outbox entries within a PostgreSQL transaction. It relies on persistence.TxFromContext[pgx.Tx] to obtain the current transaction, ensuring atomicity with the business state write (same DB transaction).
ref: ThreeDotsLabs/watermill-sql offset_adapter_postgresql.go (transactional outbox insert).
Adopted: INSERT within caller-provided transaction, JSON metadata serialization.
Deviated: explicit fail-fast on missing tx instead of auto-begin.
func NewOutboxWriter ¶
func NewOutboxWriter(clk clock.Clock) *OutboxWriter
NewOutboxWriter creates an OutboxWriter.
func (*OutboxWriter) Write ¶
Write inserts an outbox entry into the outbox_entries table using the transaction from the context. Returns ErrAdapterPGNoTx if no transaction is present.
func (*OutboxWriter) WriteBatch ¶
WriteBatch inserts multiple outbox entries within the caller's transaction. All entries are validated upfront (ID format + Entry.Validate); if any entry is invalid, no entries are written.
For batches exceeding writeBatchChunkSize, entries are split into chunks and each chunk is inserted with a separate multi-row INSERT within the same transaction, preserving all-or-nothing semantics.
An empty entries slice is a no-op and returns nil.
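The chunking step in WriteBatch can be sketched generically. The chunk helper and the size of 2 are illustrative assumptions; the value of the unexported writeBatchChunkSize is not documented here.

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements.
// size stands in for writeBatchChunkSize, whose real value is not shown here.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	entries := []string{"a", "b", "c", "d", "e"}
	for i, c := range chunk(entries, 2) {
		// Each chunk would become one multi-row INSERT on the same transaction.
		fmt.Printf("INSERT #%d with %d rows\n", i+1, len(c))
	}
}
```

Because every chunk runs on the caller's single transaction, a failure in a later chunk still rolls back the earlier ones when the transaction aborts.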
type PGCommandQueue ¶
type PGCommandQueue struct {
// contains filtered or unexported fields
}
PGCommandQueue implements kernel/command.Queue + command.ActiveScanner backed by PostgreSQL. All mutating methods (Enqueue/Dequeue/Report/Ack/ExtendLease/Cancel) self-wrap in a short RunInTx so callers require no ambient transaction. When the caller already holds an ambient tx, the inner RunInTx walks savepoint semantics via PGTxManager.RunInTx (adapters/postgres/tx_manager.go). Read-only methods (ScanActive, GetCommand) run directly against the pool.
Consistency: L4 DeviceLatent — commands traverse the state machine Pending→Sent→Delivered→{Succeeded,Failed,Expired,Canceled} through distinct Queue method calls. Lease management and timeout detection are the sweeper's responsibility; Dequeue does NOT reclaim expired leases inline.
ref: adapters/postgres/outbox_store.go (FOR UPDATE SKIP LOCKED pattern)
ref: adapters/postgres/tx_manager.go (savepoint / nested RunInTx)
func NewCommandQueue ¶
func NewCommandQueue(pool *pgxpool.Pool, txRunner persistence.TxRunner, clk clock.Clock) (*PGCommandQueue, error)
NewCommandQueue constructs a PGCommandQueue backed by pool. Returns ErrValidationFailed when any required parameter is nil.
func (*PGCommandQueue) Ack ¶
func (q *PGCommandQueue) Ack(ctx context.Context, commandID string, reason command.AckReason, now time.Time) error
Ack finalizes a command in a single transition to a terminal status. Same-target idempotent; different-target on terminal returns ErrValidationFailed. The method self-manages a short transaction; nested calls walk savepoint semantics.
func (*PGCommandQueue) Cancel ¶
Cancel transitions a non-terminal command to StatusCanceled. Returns ErrValidationFailed when the command is already terminal. The method self-manages a short transaction; nested calls walk savepoint semantics.
func (*PGCommandQueue) Dequeue ¶
func (q *PGCommandQueue) Dequeue(ctx context.Context, deviceID string, n int, leaseDuration time.Duration) ([]command.Entry, error)
Dequeue atomically claims up to n Pending entries for deviceID (oldest FIFO), advancing them to StatusSent and setting a lease. The method self-manages a short transaction; nested calls walk savepoint semantics.
func (*PGCommandQueue) Enqueue ¶
func (q *PGCommandQueue) Enqueue(ctx context.Context, entry command.Entry, opts command.EnqueueOptions) error
Enqueue stores entry in the commands table with status=Pending. If opts.Authz is non-nil, it is invoked before the transaction opens. If opts.IdempotencyKey is set, an existing entry with the same key is a no-op. Duplicate PK returns ErrConflict; unknown device_id returns ErrDeviceNotFound. The method self-manages a short transaction; nested calls walk savepoint semantics.
func (*PGCommandQueue) ExtendLease ¶
func (q *PGCommandQueue) ExtendLease(ctx context.Context, commandID string, extension time.Duration, now time.Time) error
ExtendLease renews the lease for a Sent or Delivered command. Returns ErrCommandNotFound when not found; ErrValidationFailed when no lease. The method self-manages a short transaction; nested calls walk savepoint semantics.
func (*PGCommandQueue) GetCommand ¶
GetCommand returns a single command entry by ID. Returns ErrCommandNotFound when not found.
func (*PGCommandQueue) RepoReady ¶
func (q *PGCommandQueue) RepoReady(ctx context.Context) error
RepoReady verifies that the commands table is reachable by executing a lightweight probe query. Registered as "command_queue_ready" via the cellgen-emitted typed helper in the devicecell Init path.
func (*PGCommandQueue) Report ¶
Report advances a command from Sent to Delivered. Idempotent when the command is already Delivered (preserves the original delivered_at timestamp). Returns ErrCommandNotFound when commandID does not exist. The method self-manages a short transaction; nested calls walk savepoint semantics.
func (*PGCommandQueue) ScanActive ¶
func (q *PGCommandQueue) ScanActive(ctx context.Context, filter command.ScanFilter) ([]command.Entry, error)
ScanActive returns non-terminal entries matching filter, ordered by CreatedAt ascending. Does not require an ambient transaction.
type PGOutboxStore ¶
type PGOutboxStore struct {
// contains filtered or unexported fields
}
PGOutboxStore implements runtime/outbox.Store over PostgreSQL using pgx.
Each method opens its own short transaction; methods do not compose into a larger transaction. The backing DB handle is typically a *pgxpool.Pool.
Consistency level: L2 (OutboxFact) — adapts the outbox state machine from the relay layer into discrete, testable DB operations.
func NewOutboxStore ¶
func NewOutboxStore(db relayDB, clk clock.Clock) *PGOutboxStore
NewOutboxStore constructs a Store backed by the supplied database handle. The handle is typically a *pgxpool.Pool; it must support short-lived transactions (Begin).
func (*PGOutboxStore) ClaimPending ¶
func (s *PGOutboxStore) ClaimPending(ctx context.Context, batchSize int) ([]outbox.ClaimedEntry, error)
ClaimPending atomically transitions up to batchSize rows from pending to claiming status, stamping each with a fresh lease_id (UUID). The lease is the fencing token that callers MUST echo through subsequent Mark* calls; without it, an in-flight worker whose claim was already reclaimed cannot overwrite a new owner's outcome. Returns empty slice + nil when nothing is claimable.
func (*PGOutboxStore) CleanupDead ¶
func (s *PGOutboxStore) CleanupDead(ctx context.Context, cutoff time.Time, batchSize int) (int, error)
CleanupDead deletes a batch of dead rows older than cutoff. Caller is responsible for looping until deleted < batchSize.
func (*PGOutboxStore) CleanupPublished ¶
func (s *PGOutboxStore) CleanupPublished(ctx context.Context, cutoff time.Time, batchSize int) (int, error)
CleanupPublished deletes a batch of published rows older than cutoff. Caller is responsible for looping until deleted < batchSize.
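The caller-side loop that CleanupDead and CleanupPublished expect can be sketched like this. deleteBatch and drain are hypothetical names; the function value stands in for either store method with ctx and cutoff already bound.

```go
package main

import "fmt"

// deleteBatch is a hypothetical stand-in for CleanupPublished or CleanupDead:
// it removes up to batchSize rows and returns how many were deleted.
type deleteBatch func(batchSize int) (int, error)

// drain calls del until a short batch signals that nothing eligible remains.
func drain(del deleteBatch, batchSize int) (int, error) {
	if batchSize <= 0 {
		return 0, fmt.Errorf("batchSize must be positive, got %d", batchSize)
	}
	total := 0
	for {
		n, err := del(batchSize)
		if err != nil {
			return total, err
		}
		total += n
		if n < batchSize { // deleted < batchSize: no more rows older than cutoff
			return total, nil
		}
	}
}

func main() {
	remaining := 7 // pretend seven rows are older than the cutoff
	del := func(batchSize int) (int, error) {
		n := batchSize
		if remaining < n {
			n = remaining
		}
		remaining -= n
		return n, nil
	}
	total, err := drain(del, 3)
	fmt.Println(total, err)
}
```

When the eligible row count is an exact multiple of batchSize, the loop makes one extra call that deletes zero rows before it stops.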
func (*PGOutboxStore) CountPending ¶
func (s *PGOutboxStore) CountPending(ctx context.Context) (int64, error)
CountPending returns the number of rows in pending status. The count may be approximate under high concurrency; callers should treat errors as transient and skip the metric update rather than panicking.
func (*PGOutboxStore) MarkDead ¶
func (s *PGOutboxStore) MarkDead(ctx context.Context, id, leaseID string, attempts int, lastError string) (bool, error)
MarkDead transitions a failing entry to dead status, fencing on leaseID. updated=false when the lease was lost.
func (*PGOutboxStore) MarkPublished ¶
MarkPublished transitions an entry from claiming to published, fencing on leaseID. updated=false means the lease was lost (ReclaimStale + re-Claim, or row already in a terminal state) — silent at-least-once OK; callers must not treat it as error.
func (*PGOutboxStore) MarkRetry ¶
func (s *PGOutboxStore) MarkRetry(
	ctx context.Context,
	id, leaseID string,
	attempts int,
	nextRetryAt time.Time,
	lastError string,
) (bool, error)
MarkRetry transitions a failing entry back to pending with the supplied nextRetryAt and attempts count, fencing on leaseID. updated=false has the same meaning as for MarkPublished: the lease was lost, and callers must not treat it as an error.
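Lease fencing in the Mark* methods can be illustrated with an in-memory sketch. The SQL in the comment is an assumed shape, not the package's actual statement; store, outboxRow, and markPublished are hypothetical names.

```go
package main

import "fmt"

// outboxRow is a hypothetical, reduced outbox row.
type outboxRow struct {
	Status  string
	LeaseID string
}

type store struct{ rows map[string]*outboxRow }

// markPublished mimics an assumed statement of the form
// "UPDATE ... SET status='published' WHERE id=$1 AND status='claiming' AND lease_id=$2".
// It returns updated=false when the presented lease no longer owns the row.
func (s *store) markPublished(id, leaseID string) bool {
	r, ok := s.rows[id]
	if !ok || r.Status != "claiming" || r.LeaseID != leaseID {
		return false
	}
	r.Status = "published"
	return true
}

func main() {
	s := &store{rows: map[string]*outboxRow{
		"e1": {Status: "claiming", LeaseID: "lease-A"},
	}}
	// Simulate a stale reclaim followed by a new claim from another worker.
	s.rows["e1"].LeaseID = "lease-B"
	fmt.Println(s.markPublished("e1", "lease-A")) // the slow original worker
	fmt.Println(s.markPublished("e1", "lease-B")) // the current owner
}
```

The stale worker's update matches no row, so it cannot overwrite the new owner's outcome; it only learns that its lease was lost.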
func (*PGOutboxStore) OldestEligibleAt ¶
func (s *PGOutboxStore) OldestEligibleAt(ctx context.Context, status kout.State) (time.Time, bool, error)
OldestEligibleAt returns the oldest published_at (status=kout.StatePublished) or dead_at (status=kout.StateDead) in the table. Used by the relay's data-driven cleanup loop to schedule the next wake-up at oldest+retention instead of a fixed timer.
status MUST be kout.StatePublished or kout.StateDead. Other values return an error immediately.
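The data-driven wake-up that OldestEligibleAt supports can be sketched as a small scheduling helper. nextWake, the idle fallback, and the reading of the bool result as "a row was found" are assumptions made for illustration; the relay's real loop is not shown in this section.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is a fixed reference time so the example is reproducible.
var epoch = time.Unix(0, 0)

// nextWake returns how long to sleep until the oldest row passes retention.
// found=false (assumed to mean "no eligible row") falls back to idle.
func nextWake(now, oldest time.Time, found bool, retention, idle time.Duration) time.Duration {
	if !found {
		return idle
	}
	d := oldest.Add(retention).Sub(now)
	if d < 0 {
		return 0 // already past retention: clean up immediately
	}
	return d
}

func main() {
	retention := 24 * time.Hour
	now := epoch.Add(23 * time.Hour)
	fmt.Println(nextWake(now, epoch, true, retention, time.Minute))
	fmt.Println(nextWake(now, epoch, false, retention, time.Minute))
}
```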
func (*PGOutboxStore) ReclaimStale ¶
func (s *PGOutboxStore) ReclaimStale(
	ctx context.Context,
	claimTTL time.Duration,
	maxAttempts int,
	baseDelay, maxDelay time.Duration,
	batchSize int,
) (int, error)
ReclaimStale transitions up to batchSize claiming rows whose claimed_at is older than claimTTL back to pending (with attempts+1 and next_retry_at = backoff) or to dead (when attempts+1 >= maxAttempts). Returns the count of rows recovered. When more than batchSize rows are stale, the caller's reclaim loop re-runs to drain the remainder; a returned count below batchSize signals that none remain.
type PGProjectionEventSource ¶
type PGProjectionEventSource struct {
// contains filtered or unexported fields
}
PGProjectionEventSource is the durable production projection.ReplaySource + projection.LiveCursor backed by the append-only projection_events journal (EPIC #1504). Unlike the outbox-backed replay source it replaced (deleted in PR-03), it reads its monotonic position from projection_events.global_seq — a never-deleted journal — so a position can never depend on a row the relay may have deleted (the #1504 root bug: a cleaned outbox row → SELECT seq WHERE id=$1 → ErrNoRows → permanent error → dead-letter).
Position reads global_seq off the JournalEvent carrier that Replay produced, with NO SQL lookup. That structurally closes the REBUILD-from-0 gap, because Replay itself constructs the position-bearing carrier from each row. The LIVE-path gap is closed by ResolveCarrier: the live broker delivers a raw outbox.Entry that carries no global_seq, which PositionFromCarrier would reject as a permanent error, so the Coordinator resolves it at the delivery boundary via ResolveCarrier into a position-bearing JournalEvent before Position is consulted (this source is wired as the Coordinator's projection.LiveCursor). The durable journal makes that resolution safe: the D4 same-transaction double-write commits the projection_events row before the event is delivered, and the journal is never cleaned, so the lookup cannot spuriously ErrNoRows (the exact failure mode that broke the transient-outbox path). The live-carrier protocol is recorded in ADR 202606071600-1504 §4.2.
It holds a pgexec.PGExecutor (the sealed pool funnel, PG-REPO-AMBIENT-TX-01) rather than a raw pool. Replay is NOT transactional: the Coordinator invokes it outside RunInTx and wraps each fn callback in its own transaction. The SAME instance is wired as both the Coordinator's ReplaySource and its LiveCursor so the global_seq encoding is consistent across the interfaces (mirrors SagaJournalSource).
ref: AxonFramework JdbcEventStore + TrackingToken — retained event store as projection source.
func NewProjectionEventSource ¶
func NewProjectionEventSource(pool *pgxpool.Pool) (*PGProjectionEventSource, error)
NewProjectionEventSource wraps the pool in the sealed pgexec funnel. A nil pool is rejected with ErrValidationFailed (same shape as NewProjectionCheckpointStore).
func (*PGProjectionEventSource) Head ¶
func (s *PGProjectionEventSource) Head(ctx context.Context) (int64, error)
Head returns the highest available global_seq, or 0 if the journal is empty. Used to compute the rebuild cutoff and the pending_events metric.
func (*PGProjectionEventSource) Position ¶
func (s *PGProjectionEventSource) Position(entry projection.ProjectionEvent) (int64, error)
Position resolves the event's monotonic stream position by reading the global_seq off the JournalEvent carrier that Replay produced — issuing NO SQL. This is the #1504 structural fix: the position can never depend on a projection_events row (and a fortiori never on a deleted transient outbox row). An unresolvable carrier is permanent (Cursor invariant #4), delegated to the shared projection.PositionFromCarrier.
func (*PGProjectionEventSource) Replay ¶
func (s *PGProjectionEventSource) Replay(ctx context.Context, fromOffset int64, fn func(projection.ProjectionEvent) error) error
Replay iterates events with global_seq strictly greater than fromOffset in ascending order, calling fn for each. fromOffset==0 replays all history. If fn returns an error, Replay stops immediately and returns it; events already passed to fn are NOT retried. ctx cancellation is honored between rows (pgx aborts the streaming query when ctx is done). Each row is rebuilt through EntryScan.ToEntry and wrapped in a projection.JournalEvent carrying its global_seq.
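The iteration contract of Replay (strictly greater than fromOffset, ascending, stop on the first callback error) can be sketched over an in-memory journal. event, replay, and errStop are hypothetical names; the real method streams rows from projection_events and wraps each in a projection.JournalEvent.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical, reduced journal row.
type event struct {
	GlobalSeq int64
	Topic     string
}

// errStop is an illustrative callback error.
var errStop = errors.New("stop")

// replay calls fn for each event with GlobalSeq > fromOffset, assuming the
// journal slice is already in ascending order. It stops at the first fn error
// and returns it; events already handed to fn are not retried.
func replay(journal []event, fromOffset int64, fn func(event) error) error {
	for _, e := range journal {
		if e.GlobalSeq <= fromOffset {
			continue
		}
		if err := fn(e); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	journal := []event{{1, "a"}, {2, "b"}, {3, "c"}}
	err := replay(journal, 1, func(e event) error {
		fmt.Println(e.GlobalSeq, e.Topic)
		if e.GlobalSeq == 2 {
			return errStop
		}
		return nil
	})
	fmt.Println(err)
}
```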
func (*PGProjectionEventSource) RepoReady ¶
func (s *PGProjectionEventSource) RepoReady(ctx context.Context) error
RepoReady implements healthz.RepoProber. It issues a cheap non-transactional representative query against projection_events so schema/migration drift and table-level permission loss surface as a differentiated failure domain distinct from the pool-level postgres_ready probe. Health handler contexts carry no pgx.Tx, so pgexec routes directly to the pool.
func (*PGProjectionEventSource) ResolveCarrier ¶
func (s *PGProjectionEventSource) ResolveCarrier(
	ctx context.Context,
	entry projection.ProjectionEvent,
) (projection.ProjectionEvent, error)
ResolveCarrier normalizes a live-delivered entry into a position-bearing carrier (projection.LiveCarrierResolver). An already-positioned *JournalEvent is returned unchanged (idempotent — a rebuild carrier needs no lookup). A bare live outbox.Entry is resolved by querying its global_seq from the durable journal and wrapping it in a JournalEvent, so the Coordinator's carrier-intrinsic Position then succeeds.
Error semantics mirror the position lookup the transient-outbox source used, but against the never-cleaned journal: pgx.ErrNoRows is a PERMANENT error (an entry genuinely absent from the journal cannot be assigned a position and retry cannot fix it — the D4 emit-time double-write guarantees a real projection-source event was committed before delivery, so a miss is a true invariant violation, not the spurious cleaned-row case); any other query failure is transient (wrapped, requeued).
Transaction visibility: the lookup runs on the caller's ambient tx ctx (pgexec routes Query to the in-flight tx when one is present). The projection_events row was committed by the producer's D4 same-transaction double-write in a PRIOR transaction, before this event was delivered, so it is visible regardless of the txRunner's isolation level (it predates the consumer tx snapshot under REPEATABLE READ/SERIALIZABLE just as it does under READ COMMITTED). The lookup is read-only; it neither needs nor takes a separate connection. It is bounded by projectionEventLookupTimeout (deriving a child ctx that preserves the ambient tx value, so a stall surfaces as a transient timeout the Coordinator requeues rather than hanging the worker; a shorter caller deadline wins).
Bootstrap-gap caveat (ADR 202606071600-1504 §5/§8): events produced BEFORE the PR-02 journaling decorator was deployed have no projection_events row, so a redelivery of such a historical event resolves to ErrNoRows → PERMANENT → dead-letter. That is the documented v1 limitation, not a system fault — the journal intentionally retains only events from the decorator's deployment forward, so such a DLX entry is expected and benign, not a recoverable transient.
type PGRefreshStore ¶
type PGRefreshStore struct {
// contains filtered or unexported fields
}
PGRefreshStore implements refresh.Store over PostgreSQL using pgx.
All time values come from the injected clock (never PG's now()), so the FakeClock in storetest drives deterministic behavior.
Consistency: L1 LocalTx — Rotate is atomic within a single transaction; Issue and revoke paths are single-statement writes.
Transaction contract (B2-A-08): PGRefreshStore uses the ambient transaction for business operations. Multi-statement operations (Peek, Rotate) delegate to the injected TxRunner. When the caller already holds an ambient transaction, TxManager creates a savepoint instead of a new top-level transaction, so refresh operations are fully nesting-safe. The only explicit bypass is RevokeSessionDetached and reuse-detection cascade revoke, which commit independently as security/compensation responses.
ref: ory/fosite token/hmac/hmacsha.go (base64url nopad + constant-time compare)
ref: ory/hydra persistence/sql/persister_oauth2.go (CAS chain + reuse cascade)
func NewRefreshStore ¶
func NewRefreshStore(
	pool *pgxpool.Pool,
	txRunner persistence.TxRunner,
	policy refresh.Policy,
	clk clock.Clock,
	randReader io.Reader,
) (*PGRefreshStore, error)
NewRefreshStore constructs a PGRefreshStore.
Returns a non-nil error if pool, txRunner, or clock is nil, or if policy values are out of range.
SQL operations go through the typed executor, which joins the ambient transaction from context unless a method explicitly asks for a direct bypass.
txRunner is required: Peek and Rotate need a transaction boundary. Pass NewTxManager(pool) for standalone callers; ambient-tx callers (e.g. session login) provide a shared TxManager whose RunInTx will create a savepoint when a top-level transaction is already in context.
func (*PGRefreshStore) GC ¶
GC removes rows whose expires_at < olderThan in batches of gcBatchSize. Uses the ambient transaction from ctx when present (F1).
func (*PGRefreshStore) Issue ¶
func (s *PGRefreshStore) Issue(ctx context.Context, sessionID, subjectID string, authzEpochAtIssue int64) (string, *refresh.Token, error)
Issue creates a new refresh chain root. L1 LocalTx.
authzEpochAtIssue must be > 0 (S4d): zero indicates the caller did not snapshot users.authz_epoch; storing zero would let stale grants validate forever once user.epoch is bumped (PR #490 review P1-#2).
func (*PGRefreshStore) Peek ¶
Peek validates the presented wire token without advancing the lineage.
Callers MUST call Peek (and Rotate) within an ambient transaction created by the injected TxRunner. PGRefreshStore no longer acquires its own transactions (B2-A-08 ambient-only model).
ErrReused contract: when the reuse branches inside validatePresentedInTx fire, row is already loaded (selector_miss / verifier_miss reject paths return earlier with ErrRejected, NOT ErrReused). The reuse return carries the row metadata so the service layer can drive a user-wide credential invalidation cascade — see refresh.Store godoc.
func (*PGRefreshStore) RevokeSession ¶
func (s *PGRefreshStore) RevokeSession(ctx context.Context, sessionID string) error
RevokeSession marks every row in the session_id lineage as revoked. Uses the ambient transaction from ctx when present (F1).
func (*PGRefreshStore) RevokeSessionDetached ¶
func (s *PGRefreshStore) RevokeSessionDetached(ctx context.Context, sessionID string) error
RevokeSessionDetached marks every row in the session_id lineage as revoked, bypassing any ambient transaction and caller cancellation. It is used only for security cascade/compensation paths that must commit independently of the surrounding business transaction.
func (*PGRefreshStore) RevokeUser ¶
func (s *PGRefreshStore) RevokeUser(ctx context.Context, subjectID string, tok credentialfence.FenceToken) error
RevokeUser marks every row owned by subjectID as revoked. Uses the ambient transaction from ctx when present (F1).
func (*PGRefreshStore) Rotate ¶
func (s *PGRefreshStore) Rotate(ctx context.Context, presented string) (string, *refresh.Token, error)
Rotate advances the chain. See Store.Rotate contract for branch behavior.
Non-happy paths funnel through rejectWithReason. ErrRejected paths return ("", nil) alongside the error; the reuse branches (ErrReused) return the row metadata so the service layer can drive the user-wide cascade (see refresh.Store godoc). The transaction is committed uniformly on ErrRejected/ErrReused so that commit-vs-rollback latency is not an oracle on whether a cascade-revoke happened.
Callers MUST call Rotate within an ambient transaction or with a standalone context. PGRefreshStore delegates transaction management to the injected TxRunner (B2-A-08 ambient-only model).
type PGSessionStore ¶
type PGSessionStore struct {
// contains filtered or unexported fields
}
PGSessionStore implements session.Store over PostgreSQL using pgx.
All time values come from the injected clock (never PG's now()), so the FakeClock in storetest drives deterministic behavior.
Consistency: L1 LocalTx — Create/Revoke/RevokeForSubject are single-statement writes that participate in the ambient transaction (ADR-credential D5 same-tx revoke).
Transaction contract: PGSessionStore uses the typed executor for all SQL. The executor joins the ambient tx when present, enabling write atomicity and consistent reads within a business transaction.
ref: dexidp/dex storage/sql/sql.go (session row model)
ref: ory/hydra persistence/sql/persister_oauth2.go (ambient-tx pattern)
func NewSessionStore ¶
func NewSessionStore(
	pool *pgxpool.Pool,
	txRunner persistence.TxRunner,
	protocol *session.Protocol,
	clk clock.Clock,
) (*PGSessionStore, error)
NewSessionStore constructs a PGSessionStore.
Returns a non-nil error if pool, txRunner, protocol, or clock is nil.
pool is retained for direct exec when no ambient transaction is in context; txRunner is required for callers that need transaction scoping (e.g. session login service). protocol drives fingerprint-shape validation on Create. clock is used exclusively for revoked_at timestamps — never PG's NOW().
func (*PGSessionStore) Close ¶
func (s *PGSessionStore) Close(_ context.Context) error
Close is a no-op: PGSessionStore does not own its pool. Pool teardown is handled by the *Pool ManagedResource registered at the composition root, so closing the pool from the store as well would close it twice.
func (*PGSessionStore) Create ¶
func (s *PGSessionStore) Create(ctx context.Context, t tenant.TenantID, sess *session.Session) error
Create persists a new session row with the given tenant t. t must be a non-empty canonical UUID; empty TenantID returns ErrValidationFailed. Nil session, empty Session.ID, or empty Session.SubjectID return ErrValidationFailed. FingerprintMode shape violations (e.g. empty JTI under FingerprintJTIRef) return ErrValidationFailed. Duplicate Session.ID returns ErrSessionConflict (SQLSTATE 23505).
func (*PGSessionStore) Get ¶
func (s *PGSessionStore) Get(ctx context.Context, id string) (*session.ValidateView, error)
Get fetches the validate projection by ID. Returns ErrSessionNotFound when the ID is not present. Revoked sessions are still returned (caller checks RevokedAt). GC eligibility (expires_at) is intentionally not exposed — validate paths must not gate on it.
func (*PGSessionStore) Probes ¶
func (s *PGSessionStore) Probes() []healthz.Probe
Probes returns nil: pool-level connection liveness is *Pool's concern. Cell-level differentiated readiness is exposed through RepoReady, which is registered as "accesscore_repo_ready" via the cellgen-emitted typed helper in the accesscore cell init path.
func (*PGSessionStore) RepoReady ¶
func (s *PGSessionStore) RepoReady(ctx context.Context) error
RepoReady implements healthz.RepoProber. It issues a cheap non-transactional representative query against the sessions table so that schema/migration drift and table-level permission loss are surfaced as a differentiated failure domain distinct from the pool-level postgres_ready probe registered by *Pool. Errors are wrapped via pkg/errcode.
func (*PGSessionStore) Revoke ¶
func (s *PGSessionStore) Revoke(ctx context.Context, id string) error
Revoke marks the session dead. Idempotent: already-revoked or missing IDs both return nil (anti-enumeration: the call must not leak whether an ID exists). RevokedAt is set exactly once via the WHERE revoked_at IS NULL guard; subsequent Revoke calls are no-ops at the SQL level.
func (*PGSessionStore) RevokeForSubject ¶
func (s *PGSessionStore) RevokeForSubject( ctx context.Context, subjectID string, event session.CredentialEvent, tok credentialfence.FenceToken, ) error
RevokeForSubject marks every active session for subjectID dead. Empty subjectID returns ErrValidationFailed. Unknown CredentialEvent values return ErrValidationFailed. Returns nil even when the subject has no sessions. Pre-revoked sessions preserve their original RevokedAt timestamp (append-only revoke semantics — ADR-Session D3). The event argument is informational under the current protocol; the UPDATE covers all active sessions regardless of event.
func (*PGSessionStore) Worker ¶
func (s *PGSessionStore) Worker() worker.Worker
Worker returns nil: PGSessionStore has no background goroutine.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool wraps a pgxpool.Pool with health checking and lifecycle management. It implements lifecycle.ManagedResource directly, so callers can pass a *Pool to bootstrap.WithManagedResource without a wrapper.
func NewPool ¶
NewPool creates a new connection pool from the supplied Config. It validates the DSN, applies defaults, and pings the database to confirm connectivity.
func (*Pool) AppRoleRestrictedCheck ¶
AppRoleRestrictedCheck verifies that the pool's serving connection runs as a role that ROW LEVEL SECURITY actually constrains: neither a superuser nor a BYPASSRLS role. Either bypasses RLS regardless of FORCE, so the tenant_isolation policies (migrations 052/053) would silently leak across tenants at runtime.
It backs the ProbeAppRoleRestrictedReady readyz probe (#1676 [F-B11]) and runs only on pools that opt in via Config.RequireRestrictedRole. The catalog read is SELECT-only on pg_roles (readable by any role) keyed on current_user, so it is safe for the restricted serving role itself to execute.
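A rough sketch of the decision this check makes, under stated assumptions: rolsuper and rolbypassrls are real pg_roles columns, but the exact SQL the package runs is not shown in this documentation, so roleAttrsSQL, checkRestricted, and errRoleNotRestricted are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// roleAttrsSQL is an assumed query shape; the package's actual SQL may differ.
const roleAttrsSQL = `SELECT rolsuper, rolbypassrls FROM pg_roles WHERE rolname = current_user`

// errRoleNotRestricted is a hypothetical sentinel for this sketch.
var errRoleNotRestricted = errors.New("serving role bypasses row level security")

// checkRestricted evaluates the two attributes read by roleAttrsSQL.
// Either attribute alone is enough to bypass RLS, so both must be false.
func checkRestricted(isSuperuser, bypassRLS bool) error {
	if isSuperuser || bypassRLS {
		return errRoleNotRestricted
	}
	return nil
}

func main() {
	fmt.Println(roleAttrsSQL)
	fmt.Println(checkRestricted(false, false)) // restricted role: probe passes
	fmt.Println(checkRestricted(true, false))  // superuser: probe fails
	fmt.Println(checkRestricted(false, true))  // BYPASSRLS: probe fails
}
```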
func (*Pool) AuditAdminReadyCheck ¶
AuditAdminReadyCheck is a combined readiness check for the optional cross-tenant audit admin pool (#1810). It proves the pool's connection is the EXACT admin identity the cross-tenant read relies on, asserting:
- Role identity and attributes (checkAuditAdminRole): current_user is exactly gocell_audit_admin, neither superuser nor BYPASSRLS, with SELECT on audit_entries. The role must be gocell_audit_admin specifically, because that is the role audit_admin_read_all is scoped TO; any other role passes the attribute probe yet reads nothing cross-tenant (#1810 F3).
- Policy shape (checkAuditAdminPolicy): the audit_admin_read_all RLS policy exists on audit_entries with the full expected shape (PERMISSIVE, FOR SELECT, TO gocell_audit_admin, USING(true), no WITH CHECK), reusing the same checkAuditAdminPolicyShape that the migration-time schema guard applies. A missing or mis-shaped policy means the pool would read nothing (or the wrong scope) at runtime; readyz must catch it (#1810 F3).
This single method is reused at both composition-time fail-fast (buildCrossTenantStore calls it once on startup) and in the ProbeAuditAdminRestrictedReady /readyz probe (auditAdminPoolResource.Probes).
func (*Pool) Close ¶
Close gracefully shuts down the connection pool, bounded by ctx.
pgxpool.Pool.Close() performs a synchronous drain with no context parameter. Close wraps it in a goroutine so the caller's shutdown budget is honored; if ctx expires, the pool's connection resources are abandoned (process-exit cleanup semantics, acceptable under orchestrator-restart SLO).
Close is idempotent: calling it on an already-closed pool is safe.
ref: uber-go/fx app.go StopTimeout — ctx as shared shutdown budget. ref: uber-go/fx lifecycle OnStop(ctx) — ContextCloser pattern.
func (*Pool) PoolStats ¶
PoolStats returns structured pool statistics suitable for metrics collection and operational dashboards. Returns zero-value PoolStats if the pool is not initialized (defensive guard, consistent with Redis adapter pattern).
func (*Pool) Probes ¶
Probes returns the pool's typed readiness probes contributing to /readyz:
- ProbeReady (= "postgres_ready") — pings the PG pool connection via Pool.Health.
- ProbeIndexesValidReady (= "postgres_indexes_valid_ready") — calls InvalidIndexCheck to surface any indexes left invalid by an interrupted CREATE INDEX CONCURRENTLY.
- ProbeAppRoleRestrictedReady (= "postgres_app_role_restricted_ready") — present ONLY when Config.RequireRestrictedRole is set (a serving pool): verifies current_user is neither superuser nor BYPASSRLS so FORCE RLS is effective at runtime (#1676 [F-B11]). Admin/migration pools leave the flag off and so do not register it.
Every probe caps its inner wait at adapterutil.DefaultProbeTimeout (5 s) so a slow PG does not hold the /readyz response indefinitely.
ref: kubernetes/kubernetes pkg/util/healthz — named health checkers. ref: uber-go/fx app.go StopTimeout — shared shutdown budget via ctx.
func (*Pool) Statter ¶
Statter returns a poolstats.Statter bound to this pool with the supplied human-readable name (e.g. "postgres-main"). The OTel pool collector uses this adapter-neutral interface to emit gocell.pool.* metrics without knowing each adapter's internal stats type.
ref: jackc/pgx pgxpool/stat.go (TotalConns/IdleConns/AcquiredConns/MaxConns/EmptyAcquireCount map cleanly onto poolstats.Snapshot).
type PoolStats ¶
type PoolStats struct {
AcquireCount int64 `json:"acquireCount"`
AcquireDuration time.Duration `json:"acquireDuration"`
AcquiredConns int32 `json:"acquiredConns"`
CanceledAcquireCount int64 `json:"canceledAcquireCount"`
ConstructingConns int32 `json:"constructingConns"`
EmptyAcquireCount int64 `json:"emptyAcquireCount"`
IdleConns int32 `json:"idleConns"`
MaxConns int32 `json:"maxConns"`
TotalConns int32 `json:"totalConns"`
NewConnsCount int64 `json:"newConnsCount"`
MaxLifetimeDestroyCount int64 `json:"maxLifetimeDestroyCount"`
MaxIdleDestroyCount int64 `json:"maxIdleDestroyCount"`
}
PoolStats holds structured connection pool statistics.
ref: pgxpool Stat() — adopted same field set for operational dashboards and Prometheus/OTel metric collectors.
type ProjectionCheckpointStore ¶
type ProjectionCheckpointStore struct {
// contains filtered or unexported fields
}
ProjectionCheckpointStore is the PostgreSQL projection.CheckpointStore and projection.OwnerCheckpointStore backend for the CQRS projection lifecycle harness (epic #1100 / #1609). It holds no raw pool: the sole field is a pgexec.PGExecutor that routes every statement through the ambient transaction carried by ctx (persistence.TxFromContext) when present, or the pool directly when not.
CheckpointStore (SaveOffset) path: the harness Coordinator wraps LoadOffset → apply → SaveOffset in one TxRunner.RunInTx, so all three run in the caller's transaction and the offset advance commits atomically with the Apply mutation (exactly-once).
OwnerCheckpointStore (AdvanceIfOwner) path: the saga Tailer wraps its apply + AdvanceIfOwner in one TxRunner.RunInTx so the fenced checkpoint advance commits atomically with the Apply mutation (D5(a) / D5(b)). The CAS uses a read-FOR UPDATE then conditional write within the ambient tx to prevent concurrent handoff races at the SQL layer — see AdvanceIfOwner.
Holding pgexec.PGExecutor (not *pgxpool.Pool) is enforced by archtest PROJECTION-CHECKPOINT-TX-BOUND-01; the pool stays sealed in internal/pgexec (PG-REPO-AMBIENT-TX-01 R1/R2).
ref: AxonFramework JdbcTokenStore — same-transaction checkpoint commit. ref: ThreeDotsLabs/watermill-sql offset_adapter_postgresql.go — transactional offset upsert. ref: JasperFx/marten EventStore Async Daemon pg_advisory_lock + token claim. ref: docs/architecture/202605261620-adr-cqrs-projection-lifecycle-harness.md §3/§Q5. ref: docs/architecture/202606051200-1609-adr-saga-journal-projection-source.md §D5(b).
func NewProjectionCheckpointStore ¶
func NewProjectionCheckpointStore(pool *pgxpool.Pool) (*ProjectionCheckpointStore, error)
NewProjectionCheckpointStore wraps the pool in the sealed pgexec funnel (PG-REPO-AMBIENT-TX-01 R2). A nil pool is rejected with ErrValidationFailed.
func (*ProjectionCheckpointStore) AdvanceIfOwner ¶
func (s *ProjectionCheckpointStore) AdvanceIfOwner(ctx context.Context, cellID, projectionID, ownerToken string, offset int64) error
AdvanceIfOwner advances the committed offset fenced by ownerToken (semantics B, per the projection.OwnerCheckpointStore interface godoc — the authoritative spec is projectiontest.RunOwnerCheckpointConformance).
Fencing logic (mirrors MemOwnerCheckpointStore, verified by the shared conformance harness):
- empty ownerToken → KindInvalid (fail-closed; a real Tailer always mints a non-empty UUID per lock acquisition)
- no existing row AND offset > 0 → INSERT (cold claim)
- no existing row AND offset == 0 → ErrStaleOwner (0 is not strictly ahead of the virtual cold committed offset 0)
- existing row AND (ownerToken == recorded owner OR offset > recorded offset) → UPDATE (accept: same leader re-advance or new leader claiming ahead)
- existing row otherwise → ErrStaleOwner (different token, not ahead)
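The fencing rules listed above can be written as a pure decision function, which makes them easy to check case by case. This is a sketch of the predicate only, assuming the row has already been read under the lock; checkpoint, action, decide, errInvalid, and errStaleOwner are hypothetical stand-ins for the package's own types and error kinds.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the package's error kinds.
var (
	errInvalid    = errors.New("invalid: empty owner token")
	errStaleOwner = errors.New("stale owner")
)

// checkpoint models one projection checkpoint row.
type checkpoint struct {
	Owner  string
	Offset int64
}

// action is the statement the store would run once the predicate accepts.
type action string

const (
	actionInsert action = "INSERT"
	actionUpdate action = "UPDATE"
)

// decide applies the fencing rules to the row read under the lock.
// existing is nil when no row exists yet (cold start).
func decide(existing *checkpoint, ownerToken string, offset int64) (action, error) {
	if ownerToken == "" {
		return "", errInvalid // fail closed
	}
	if existing == nil {
		if offset > 0 {
			return actionInsert, nil // cold claim
		}
		return "", errStaleOwner // 0 is not ahead of the virtual offset 0
	}
	if ownerToken == existing.Owner || offset > existing.Offset {
		return actionUpdate, nil // same leader, or a new leader claiming ahead
	}
	return "", errStaleOwner // different token and not ahead
}

func main() {
	row := &checkpoint{Owner: "tok-a", Offset: 10}
	fmt.Println(decide(nil, "tok-a", 5))  // cold claim
	fmt.Println(decide(nil, "tok-a", 0))  // cold, not ahead
	fmt.Println(decide(row, "tok-a", 10)) // same leader re-advance
	fmt.Println(decide(row, "tok-b", 11)) // new leader, ahead
	fmt.Println(decide(row, "tok-b", 10)) // new leader, not ahead
}
```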
The read uses SELECT … FOR UPDATE so that concurrent AdvanceIfOwner calls within the same leader-handoff window are serialized at the SQL layer: the second caller blocks on the lock, then re-reads the row written by the first (which may have changed the owner and/or offset), and the fencing predicate is re-evaluated against the fresh state. This makes the CAS race-free without requiring an optimistic retry loop in the application layer.
Cold-start INSERT race: if two concurrent leaders both reach the cold-INSERT path (both saw ErrNoRows before the lock was contended), the INSERT uses ON CONFLICT (cell_id, projection_id) DO NOTHING. The loser gets RowsAffected==0 and is mapped to ErrStaleOwner — equivalent to a deposed claimant losing the race.
Warm UPDATE defense-in-depth: updateCheckpointWithOwnerSQL carries a SQL WHERE CAS predicate (owner=$token OR offset>committed) that mirrors the Go predicate. RowsAffected==0 from the UPDATE is also mapped to ErrStaleOwner. Both the Go predicate and the SQL CAS must agree (they're the same predicate expressed in two layers); the SQL CAS is the belt-and-suspenders layer that remains correct even if a future refactor removes the Go short-circuit.
SaveOffset interaction: SaveOffset (base CheckpointStore) never writes the owner column. A cold row created by SaveOffset has an empty owner and offset=0, so a later AdvanceIfOwner with a real token needs offset>0 to claim (the strictly-ahead predicate treats an empty owner as "no owner yet"). This is correct fencing, not a bug: offset>0 is the first real advance past the base row.
AdvanceIfOwner participates in the ambient transaction via s.db (pgexec.PGExecutor, PROJECTION-CHECKPOINT-TX-BOUND-01): both the SELECT FOR UPDATE and the INSERT/UPDATE see and join the caller's transaction, so the offset advance commits atomically with the Apply mutation (D5(a) + D5(b)).
func (*ProjectionCheckpointStore) LoadOffset ¶
func (s *ProjectionCheckpointStore) LoadOffset(ctx context.Context, cellID, projectionID string) (int64, error)
LoadOffset returns the committed offset for (cellID, projectionID), or 0 if no row exists yet (cold start). It runs within the ambient transaction when ctx carries one.
cellID and projectionID are opaque caller-owned UTF-8 keys stored verbatim as the TEXT primary key; the no-dash projectionID convention is the caller's (cellgen's) responsibility, not validated here. On query failure the error carries cell_id/projection_id as server-only internal detail so ops can locate the failing projection from slog without exposing the keys on the wire.
func (*ProjectionCheckpointStore) SaveOffset ¶
func (s *ProjectionCheckpointStore) SaveOffset(ctx context.Context, cellID, projectionID string, offset int64) error
SaveOffset advances the projection's committed offset, upserting on the (cell_id, projection_id) primary key. It runs within the ambient transaction when ctx carries one (the exactly-once path); the owner column is never written (base CheckpointStore contract — AdvanceIfOwner is the fenced path). On failure the error carries cell_id/projection_id as server-only internal detail for ops correlation.
type ReconcileElector ¶
type ReconcileElector struct {
// contains filtered or unexported fields
}
ReconcileElector implements reconcile.LeaderElector using the reconcile_leases ROW's expires_at TTL as the lease authority (row-TTL CAS), NOT a session advisory lock. It is STATELESS — every call is one pool query, so it is safe for concurrent use across reconcilerIDs and holds no connection between calls. Failover triggers on TTL expiry (a hung-but-alive leader's lease lapses and a follower takes over), matching the Redis and fake electors.
func NewReconcileElector ¶
func NewReconcileElector(pool *Pool, lease reconcile.LeaseTTL) (*ReconcileElector, error)
NewReconcileElector builds a PG leader elector. The holderID (this replica's identity) is minted INTERNALLY as a fresh UUID — making accidental cross-process holderID reuse (which would be treated as the same holder, defeating mutual exclusion — PR-A6 review C4) impossible by construction rather than relying on a caller contract. lease is the validated lease TTL window (a sealed reconcile.LeaseTTL, so a sub-millisecond window — which would truncate to a 0ms interval and break mutual exclusion — is unrepresentable). Unlike the Redis elector, no clock.Clock is needed — lease timestamps are computed by the DB (now()), the single wall-clock authority across replicas.
func (*ReconcileElector) AcquireLease ¶
func (e *ReconcileElector) AcquireLease(ctx context.Context, reconcilerID string) (reconcile.LeaseToken, error)
AcquireLease implements reconcile.LeaderElector via the row-TTL UPSERT CAS.
func (*ReconcileElector) ReleaseLease ¶
func (e *ReconcileElector) ReleaseLease(ctx context.Context, token reconcile.LeaseToken) error
ReleaseLease implements reconcile.LeaderElector (marks our lease expired for a fast handoff; idempotent — 0 rows if no longer ours). The epoch row persists so the next holder observes a strictly higher epoch on takeover.
func (*ReconcileElector) RenewLease ¶
func (e *ReconcileElector) RenewLease(ctx context.Context, token reconcile.LeaseToken) error
RenewLease implements reconcile.LeaderElector (holder + still-live guarded). 0 rows ⟹ lease lapsed or taken over ⟹ ErrReconcileLeaseLost.
type RowScanner ¶
type RowScanner interface {
// Scan reads column values into dest, analogous to pgx.Row.Scan.
Scan(dest ...any) error
}
RowScanner abstracts a single-row scan target, satisfied by both pgx.Row and pgx.Rows (both are interfaces in pgx/v5 and both expose Scan). Repository implementations use this to write scan logic that works for both QueryRow and Query result sets.
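A short sketch of how a repository might use the interface: one scan function holds the column-to-field mapping, and any value with a matching Scan method can feed it. The user type, scanUser, and fakeRow are hypothetical; fakeRow is a test double so the example runs without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// RowScanner is copied from the declaration above so the sketch is self-contained.
type RowScanner interface {
	Scan(dest ...any) error
}

// user is a hypothetical domain type.
type user struct {
	ID   string
	Name string
}

// scanUser keeps the column-to-field mapping in one place, so a QueryRow
// caller and a Query loop can share it.
func scanUser(row RowScanner) (user, error) {
	var u user
	if err := row.Scan(&u.ID, &u.Name); err != nil {
		return user{}, fmt.Errorf("scan user: %w", err)
	}
	return u, nil
}

// fakeRow is a test double that satisfies RowScanner without a database.
type fakeRow []string

func (f fakeRow) Scan(dest ...any) error {
	if len(dest) != len(f) {
		return errors.New("column count mismatch")
	}
	for i, d := range dest {
		p, ok := d.(*string)
		if !ok {
			return errors.New("unsupported destination type")
		}
		*p = f[i]
	}
	return nil
}

func main() {
	u, err := scanUser(fakeRow{"u-1", "Ada"})
	fmt.Println(u.ID, u.Name, err)

	_, err = scanUser(fakeRow{"only-one-column"})
	fmt.Println(err)
}
```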
type TxManager ¶
type TxManager struct {
// contains filtered or unexported fields
}
TxManager provides transactional execution with context-embedded pgx.Tx, savepoint-based nesting, and automatic panic rollback.
func NewTxManager ¶
NewTxManager creates a TxManager backed by the given Pool.
func (*TxManager) ApplyTenantScope ¶
ApplyTenantScope sets the RLS app.tenant_id GUC on the AMBIENT transaction mid-flight. It exists for paths that cannot know the tenant at tx-start because they must read a non-RLS row inside the tx to learn it. Specifically, sessionrefresh Peeks the refresh token and reads sessions.tenant_id (neither table is under FORCE RLS) inside the cross-store tx (REFRESH-CROSS-STORE-TX-01), then derives the tenant and scopes the subsequent users/roles reads with this call.
PRECONDITIONS (caller responsibility): (1) called inside a RunInTx (an ambient pgx.Tx must be in ctx, else this returns an error — it never silently runs unscoped); (2) every statement executed in the tx BEFORE this call touches only non-RLS tables. Calling it after an RLS-table statement would mean that earlier statement ran fail-closed (0 rows) — the late scope cannot retroactively fix it.
is_local=true keeps the GUC transaction-scoped (auto-reset on COMMIT/ROLLBACK), identical to setLocalTenant.
func (*TxManager) RunInTx ¶
func (tm *TxManager) RunInTx(ctx context.Context, fn func(ctx context.Context) error) (retErr error)
RunInTx executes fn inside a database transaction. The pgx.Tx is stored in the context so that downstream code can retrieve it via persistence.TxFromContext[pgx.Tx].
Nesting: if the context already carries a transaction, RunInTx creates a savepoint instead of a new top-level transaction. Savepoints are released on success and rolled back on error or panic.
Panic safety: panics inside fn trigger a rollback (or savepoint rollback) before being re-raised.
type WebhookSourceRepository ¶
type WebhookSourceRepository struct {
// contains filtered or unexported fields
}
WebhookSourceRepository is the PostgreSQL runtimewebhook.SourceRepo backing for persistent webhook source secrets (#1540). It holds the sealed pgexec funnel (never a raw *pgxpool.Pool — PG-REPO-AMBIENT-TX-01) and the kcrypto.ValueTransformer that seals/unseals each secret.
The repository never handles a plaintext secret: Upsert seals through kwh.Source.Encrypt and writes only the resulting ciphertext columns; LoadAll reads the ciphertext columns and reconstructs sealed kwh.Source values via kwh.NewSourceFromCiphertext. The AAD binding each ciphertext to its source id is computed inside the kernel funnel, so this adapter cannot transplant a ciphertext across sources or weaken the binding.
func NewWebhookSourceRepository ¶
func NewWebhookSourceRepository(pool *pgxpool.Pool, transformer kcrypto.ValueTransformer) (*WebhookSourceRepository, error)
NewWebhookSourceRepository wraps pool in the sealed pgexec funnel and binds the value transformer used to seal/unseal secrets.
transformer is validated first: nil is rejected (no plaintext fallback), and runtimecrypto.NoopTransformer is rejected by a concrete-type guard — passing the passthrough transformer would persist webhook source secrets in plaintext, which the table schema structurally prevents (no plaintext column). Both the value form (runtimecrypto.NoopTransformer) and the pointer form (*runtimecrypto.NoopTransformer) are rejected: NoopTransformer's methods have value receivers, so *NoopTransformer also satisfies kcrypto.ValueTransformer and a value-only guard would let the pointer slip through. This rejection is ENFORCED by a runtime guard, not merely narrated (contrast configcore, which permits Noop for non-sensitive entries). pool must also be non-nil.
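The reason a value-only guard is not enough can be shown with a type switch. The sketch below uses hypothetical local types (valueTransformer, noopTransformer, xorTransformer, validateTransformer) in place of kcrypto.ValueTransformer and runtimecrypto.NoopTransformer; xorTransformer is a toy used only to have a non-passthrough case and is not encryption.

```go
package main

import (
	"errors"
	"fmt"
)

// valueTransformer is a hypothetical stand-in for kcrypto.ValueTransformer.
type valueTransformer interface {
	Seal(plaintext []byte) ([]byte, error)
}

// noopTransformer is a passthrough with a value receiver, so both
// noopTransformer and *noopTransformer satisfy valueTransformer.
type noopTransformer struct{}

func (noopTransformer) Seal(p []byte) ([]byte, error) { return p, nil }

// xorTransformer is a toy non-passthrough transformer for the demo only.
type xorTransformer struct{ key byte }

func (x xorTransformer) Seal(p []byte) ([]byte, error) {
	out := make([]byte, len(p))
	for i, b := range p {
		out[i] = b ^ x.key
	}
	return out, nil
}

var errRejected = errors.New("transformer rejected")

// validateTransformer rejects nil and both forms of the passthrough type.
func validateTransformer(t valueTransformer) error {
	switch t.(type) {
	case nil:
		return errRejected // no plaintext fallback
	case noopTransformer, *noopTransformer:
		return errRejected // value form and pointer form
	}
	return nil
}

func main() {
	fmt.Println(validateTransformer(nil))
	fmt.Println(validateTransformer(noopTransformer{}))
	fmt.Println(validateTransformer(&noopTransformer{})) // would pass a value-only guard
	fmt.Println(validateTransformer(xorTransformer{key: 0x5a}))
}
```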
func (*WebhookSourceRepository) Delete ¶
Delete implements runtimewebhook.SourceRepo.
func (*WebhookSourceRepository) LoadAll ¶
LoadAll implements runtimewebhook.SourceRepo.
func (*WebhookSourceRepository) Upsert ¶
Upsert implements runtimewebhook.SourceRepo.
Source Files ¶
- audit_cross_tenant_store.go
- audit_ledger_store.go
- classify.go
- command_queue_store.go
- doc.go
- embed.go
- errors.go
- helpers.go
- journaling_outbox_writer.go
- migration_set.go
- migrator.go
- outbox_db.go
- outbox_store.go
- outbox_writer.go
- pool.go
- pool_statter.go
- projection_checkpoint_store.go
- projection_event_source.go
- reconcile_leader.go
- refresh_store.go
- schema_guard.go
- session_store.go
- tx_manager.go
- webhook_source_repo.go
Directories ¶
| Path | Synopsis |
|---|---|
| internal | |
| pgexec | Package pgexec is the sealed PostgreSQL executor funnel for adapters/postgres. |
| | Package saga implements kernel/saga/journal.Journal over PostgreSQL. |
| internal/pgexec | Package pgexec is the sealed PostgreSQL executor funnel for the saga adapter. |