postgres

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2026 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package postgres is fabriq's Postgres adapter, built on grove's pg driver. It implements the relational, timeseries, vector and command store ports against the source of truth.

Tenancy is enforced in layers:

  1. Structurally: every operation runs inside a transaction stamped with SET LOCAL app.tenant_id (set_config(..., true)), and every generated query carries an explicit tenant predicate where applicable.
  2. In the database: RLS policies (FORCE) key on that setting, so even raw SQL through the escape hatch cannot cross tenants.
  3. Backstop: a grove pre-query/pre-mutation hook observes every query on both paths (grove >= a01144a fires hooks inside transactions too). It allows the stamped transaction path — RLS is the guard there — and DENIES any pool-path access to a tenant table, which in this architecture is always a bug, returning ErrTenantHookTripped and counting the trip. See docs/decisions/0002-tenancy-layers.md.

Dynamic-entity reads (IsDynamic() == true) scan rows into *[]map[string]any rather than typed structs. Grove's schema-aware Scan only handles struct/slice-of-struct destinations, so dynamic reads use the PgTx.QueryRows path: a driver.Rows cursor iterated manually with Columns()+Scan into individual any destinations, then assembled into maps. Static reads are byte-unchanged.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Adapter

type Adapter struct {
	// contains filtered or unexported fields
}

Adapter implements command.Store, query.RelationalQuerier, query.TSQuerier and query.VectorQuerier on grove/pgdriver.

func Open

func Open(ctx context.Context, dsn string, reg *registry.Registry, opts ...Option) (*Adapter, error)

Open connects to Postgres and wires the tenant backstop.

func (*Adapter) AggregateVersions

func (a *Adapter) AggregateVersions(ctx context.Context, tenantID, entity string) (map[string]int64, error)

AggregateVersions reads id -> version for one entity of a tenant — the reconciler's truth side (projection.TruthVersions).

func (*Adapter) BackstopTrips

func (a *Adapter) BackstopTrips() int64

BackstopTrips reports how many times the tenant backstop fired.

func (*Adapter) BulkWrite

func (a *Adapter) BulkWrite(ctx context.Context, series string, points []query.Point) error

BulkWrite implements query.TSQuerier — the event-bypass telemetry path. One multi-row INSERT per call; no per-row outbox events (the worker publishes conflated deltas instead).

func (*Adapter) Close

func (a *Adapter) Close() error

Close releases the connection pool.

func (*Adapter) Delete

func (a *Adapter) Delete(ctx context.Context, entity, id string) error

Delete implements query.VectorQuerier.

func (*Adapter) Documents

func (a *Adapter) Documents() *DocStore

Documents returns the document-plane store.

func (*Adapter) Driver

func (a *Adapter) Driver() *pgdriver.PgDB

Driver exposes the typed pg driver for worker-plane components living in this module (relay, leader, migrations CLI). Never hand it to application code.

func (*Adapter) EnsureDynamic

func (a *Adapter) EnsureDynamic(ctx context.Context, ent *registry.Entity) error

EnsureDynamic creates or ADDITIVELY EVOLVES the Postgres table for a dynamic entity from its descriptor: structural columns (id, tenant_id, version), declared domain columns, a tenant index, any descriptor-declared secondary indexes, and tenant-isolation RLS — mirroring the patterns in migrations/0003_site_asset_tag.go and migrations/0004_rls_policies.go.

Additive-evolution policy

When called on an already-existing table (e.g. because the consumer changed the descriptor by adding columns or indexes), EnsureDynamic reconciles the schema ADDITIVELY:

  • Each domain column is emitted as "ALTER TABLE … ADD COLUMN IF NOT EXISTS …", so new columns appear and existing ones are left untouched.
  • Each index is emitted as "CREATE INDEX IF NOT EXISTS …", so new indexes are created and existing ones are left untouched.

DROPS, RENAMES, and TYPE CHANGES are NOT auto-applied. If a column or index is removed from the descriptor, the physical column/index remains — evolution is strictly additive. Consumers that need destructive changes must write an explicit migration.

Attempting to add a NOT-NULL column (without a DEFAULT) to a table that already has rows will fail at the Postgres level. This is intentional: the consumer must either supply a Default expression in the descriptor or add the column as nullable. fabriq does not work around this safety boundary.

This is the FENCED managed-DDL lane: fabriq manages DDL ONLY for dynamic entities. Static entities keep migrations as the authority; calling EnsureDynamic for a non-dynamic entity (Spec.Schema == nil) is an error.

Run as the schema owner (superuser DSN), not the RLS-constrained app role.

The entity must be registered in the registry BEFORE the adapter is constructed (fabriq.Open), so the pool-path tenant backstop includes the dynamic table in its guarded set; EnsureDynamic only creates the physical table, it does not register the entity.

func (*Adapter) Get

func (a *Adapter) Get(ctx context.Context, entity, id string, into any) error

Get implements query.RelationalQuerier.

func (*Adapter) GetMany

func (a *Adapter) GetMany(ctx context.Context, entity string, ids []string, into any) error

GetMany implements the batched hydration contract: ONE query, results in ids order, missing rows skipped.

func (*Adapter) Grove

func (a *Adapter) Grove() *grove.DB

Grove exposes the grove handle (hook-guarded pool path) for advanced embedding. Tenant tables are NOT reachable through it — the backstop denies them; use the fabric ports.

func (*Adapter) InTenantTx

func (a *Adapter) InTenantTx(ctx context.Context, fn func(ctx context.Context, tx command.Tx) error) error

InTenantTx implements command.Store: one Postgres transaction, tenant stamped via SET LOCAL, row writes and outbox appends inside it.

func (*Adapter) List

func (a *Adapter) List(ctx context.Context, entity string, q query.ListQuery, into any) error

List implements equality-filtered, ordered paging. Filter and order columns are validated against the binding — unknown columns are rejected, which is also the SQL-injection guard.

func (*Adapter) NewLiveStore

func (a *Adapter) NewLiveStore() *LiveStore

NewLiveStore returns the live query snapshot/refill store for this adapter.

func (*Adapter) ProjectionState

func (a *Adapter) ProjectionState() *StateRepo

ProjectionState returns the projection bookkeeping repo.

func (*Adapter) Query

func (a *Adapter) Query(ctx context.Context, into any, sql string, args ...any) error

Query is the raw SQL escape hatch for reads. It still runs inside a tenant-stamped transaction, so RLS contains it; tables outside RLS (guarded tables) additionally require a literal tenant_id reference.

func (*Adapter) Range

func (a *Adapter) Range(ctx context.Context, q query.RangeQuery, into any) error

Range implements query.TSQuerier for raw points over [From, To). Bucketed aggregates land with the projection phase (time_bucket).

func (*Adapter) Repair

func (a *Adapter) Repair(ctx context.Context, tenantID string, d projection.Drift) error

Repair heals one drift through the ordinary pipeline (projection.RepairFunc):

  • missing/stale: upsert the aggregate's CURRENT state as its version's event and mark it unpublished — the relay republishes, version-gated consumers converge.
  • zombie (row gone): emit a synthetic <entity>.deleted one version past what the projection holds, so the delete applies everywhere.

Reconciliation never writes a projection engine directly.

func (*Adapter) Similar

func (a *Adapter) Similar(ctx context.Context, q query.VectorQuery, into any) error

Similar implements query.VectorQuerier: cosine nearest neighbours through the HNSW index.

func (*Adapter) SnapshotEntities

func (a *Adapter) SnapshotEntities(ctx context.Context, tenantID string, fn func(env event.Envelope) error) error

SnapshotEntities streams every aggregate row of a tenant as synthetic <entity>.updated envelopes at the row's CURRENT version and shape — the rebuild source (projections are always rebuilt from Postgres, never from another projection). Rows are paged in id order inside stamped transactions; because appliers are pure and sinks version-gate, a row that changes mid-snapshot is healed by the live catch-up applies.

func (*Adapter) TenantTxRaw

func (a *Adapter) TenantTxRaw(ctx context.Context, fn func(tx *pgdriver.PgTx) error) error

TenantTxRaw opens a tenant-stamped transaction for NON-command components (e.g. the CAS index) that need raw SQL under app.tenant_id + app.scope_id. It is the only sanctioned way to run blob_cas SQL outside the command plane; FORCE RLS isolates the work to the context tenant.

func (*Adapter) Upsert

func (a *Adapter) Upsert(ctx context.Context, entity, id string, embedding []float32, meta map[string]any) error

Upsert implements query.VectorQuerier.

type DocStore

type DocStore struct {
	// contains filtered or unexported fields
}

DocStore is the Postgres document plane: an append-only update log (fabriq_crdt_updates) folded through grove's CRDT merge engine, with compacted snapshots and quiet-window materialization into ordinary entity rows + outbox events.

Update blobs are JSON-encoded []crdt.ChangeRecord (the "grove-crdt" engine named by CRDTSpec). Document ids carry their entity: "<entity>/<ulid>" — the registry's KindDocument entry binds the relational shape materialization writes.

func (*DocStore) ApplyUpdate

func (d *DocStore) ApplyUpdate(ctx context.Context, docID string, update []byte) error

ApplyUpdate implements document.Store: append one update to the log and touch the doc's activity timestamp (the quiet-window clock).

func (*DocStore) ApplyUpdateWithSeq

func (d *DocStore) ApplyUpdateWithSeq(ctx context.Context, docID string, update []byte) (int64, error)

ApplyUpdateWithSeq is ApplyUpdate returning the assigned log seq — the live fan-out decorator stamps it on the published sync frame so clients can detect gaps and fall back to Sync.

func (*DocStore) Compact

func (d *DocStore) Compact(ctx context.Context, docID string) error

Compact implements document.Store: fold the log into the snapshot row and trim it, one transaction. Merge results never change — only their storage shape.

func (*DocStore) MaterializeQuiet

func (d *DocStore) MaterializeQuiet(ctx context.Context, validate ValidateFunc) (int, error)

MaterializeQuiet materializes every unflagged document whose last activity is older than its entity's QuietWindow and which has updates beyond the last materialization: merged state -> validation -> entity row write + ONE <entity>.updated event (version++) through the outbox. Returns the number of documents materialized.

func (*DocStore) Snapshot

func (d *DocStore) Snapshot(ctx context.Context, docID string) (document.Materialized, error)

Snapshot implements document.Store: merged current state (compacted snapshot + log tail) and the materialized aggregate version.

func (*DocStore) Sync

func (d *DocStore) Sync(ctx context.Context, docID string, stateVector []byte) ([]byte, error)

Sync implements document.Store: the state vector is an 8-byte big-endian last-seen seq (empty = from the beginning); the reply holds the compacted snapshot (when the client is behind it) plus every later update, and the new vector seq.

type Elector

type Elector struct {
	// contains filtered or unexported fields
}

Elector provides advisory-lock leadership for singleton runners (the outbox relay, the reconciler). It holds pg_try_advisory_lock on a DEDICATED pooled connection (grove's ConnAcquirer), so the session-level lock cannot leak to other pool users; a connection-liveness watchdog abdicates if the session dies (the lock died with it).

fabriq-worker can therefore run any number of replicas: exactly one holds each role.

func NewElector

func NewElector(a *Adapter, key int64, opts ...ElectorOption) *Elector

NewElector builds an elector for an advisory lock key. Pick one key per role (e.g. relay, reconciler) and keep them stable across versions.

func (*Elector) Run

func (e *Elector) Run(ctx context.Context, lead func(ctx context.Context) error) error

Run keeps trying to lead until ctx ends. While leading it runs lead with a context that is cancelled on abdication (session loss or ctx end). lead returning (even nil) abdicates and re-campaigns.

type ElectorOption

type ElectorOption func(*Elector)

ElectorOption tunes the elector.

func WithElectorHeartbeat

func WithElectorHeartbeat(d time.Duration) ElectorOption

WithElectorHeartbeat sets the leader's session-liveness check cadence (default 5s).

func WithElectorRetry

func WithElectorRetry(d time.Duration) ElectorOption

WithElectorRetry sets how often a non-leader retries acquisition (default 5s).

type LiveStore

type LiveStore struct {
	// contains filtered or unexported fields
}

LiveStore implements livequery.Snapshotter and livequery.Refiller over Postgres via the existing tenant-stamped read path. Ordering and the keyset boundary are Postgres-authoritative — this is the exact-top-N oracle. Reads run inside an RLS-scoped transaction (set_config app.tenant_id), so the app role only ever sees its own tenant's rows.

func (*LiveStore) After

func (s *LiveStore) After(ctx context.Context, q livequery.LiveQuery, after livequery.Cursor, limit int) ([]livequery.Row, error)

After returns up to `limit` rows strictly after `after` in total order — the bounded keyset boundary refill.

func (*LiveStore) Members

func (s *LiveStore) Members(ctx context.Context, q livequery.LiveQuery) ([]string, error)

Members returns every aggregate id currently matching the query's filter (tenant-scoped, RLS-enforced) — the membership seed for a Streamed subscription. No ordering or payloads; just ids.

func (*LiveStore) Snapshot

func (s *LiveStore) Snapshot(ctx context.Context, q livequery.LiveQuery, limit int) ([]livequery.Row, error)

Snapshot returns the first `limit` rows from the anchor in total order.

type LiveSubscriptionRegistry

type LiveSubscriptionRegistry struct {
	// contains filtered or unexported fields
}

LiveSubscriptionRegistry is the Postgres-backed durable live subscription registry (fabriq_live_subscriptions) — the backbone the sharded matcher tier uses to recover subscriptions after failover. It is worker-plane (the table has no RLS); construct it on the worker/owner connection.

func NewLiveSubscriptionRegistry

func NewLiveSubscriptionRegistry(db *pgdriver.PgDB) *LiveSubscriptionRegistry

NewLiveSubscriptionRegistry returns the durable registry over a Postgres handle (e.g. adapter.Driver()).

func (*LiveSubscriptionRegistry) ByGateway

func (r *LiveSubscriptionRegistry) ByGateway(ctx context.Context, gatewayID string) ([]livequery.Registration, error)

func (*LiveSubscriptionRegistry) ByPartition

func (r *LiveSubscriptionRegistry) ByPartition(ctx context.Context, tenantID, entity string) ([]livequery.Registration, error)

func (*LiveSubscriptionRegistry) ByPartitionNum

func (r *LiveSubscriptionRegistry) ByPartitionNum(ctx context.Context, p int) ([]livequery.Registration, error)

func (*LiveSubscriptionRegistry) Delete

func (r *LiveSubscriptionRegistry) Delete(ctx context.Context, subID string) error

func (*LiveSubscriptionRegistry) Put

type Option

type Option func(*openConfig)

Option configures Open.

func WithGuardedTables

func WithGuardedTables(tables ...string) Option

WithGuardedTables adds tables to the tenant guard beyond the registry's (e.g. the telemetry hypertable, which has no RLS because Timescale columnstore forbids it).

func WithPoolSize

func WithPoolSize(n int) Option

WithPoolSize sets the pgx pool size.

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay is the outbox relay: it drains unpublished outbox rows (FOR UPDATE SKIP LOCKED, ULID order) and publishes them through an event.Publisher, woken by LISTEN/NOTIFY with interval polling as the safety net.

Delivery is at-least-once: rows are published before being marked, so a crash between the two replays the event; consumers are version-gated idempotent by contract. Run exactly one active relay (wrap Run in an Elector) — multiple relays are safe (SKIP LOCKED) but waste publishes.

func NewRelay

func NewRelay(a *Adapter, reg *registry.Registry, pub event.Publisher, opts ...RelayOption) *Relay

NewRelay builds a relay on the adapter's pool.

func (*Relay) Backlog

func (r *Relay) Backlog(ctx context.Context) (int64, error)

Backlog reports the unpublished outbox depth (metrics).

func (*Relay) Run

func (r *Relay) Run(ctx context.Context) error

Run drains until ctx ends.

type RelayOption

type RelayOption func(*Relay)

RelayOption tunes the relay.

func WithRelayBatch

func WithRelayBatch(n int) RelayOption

WithRelayBatch sets the per-transaction drain batch (default 128).

func WithRelayOnPublish

func WithRelayOnPublish(fn func(n int)) RelayOption

WithRelayOnPublish installs a per-batch callback (metrics).

func WithRelayPollInterval

func WithRelayPollInterval(d time.Duration) RelayOption

WithRelayPollInterval sets the fallback poll cadence (default 1s; NOTIFY normally wakes the relay first).

type SpatialAdapter

type SpatialAdapter struct {
	// contains filtered or unexported fields
}

SpatialAdapter wraps Adapter to implement query.SpatialQuerier. A separate type is required because *Adapter already carries Upsert for query.VectorQuerier ([]float32 embedding) — Go does not allow two methods with the same name on one type, so the spatial variant lives here.

func NewSpatialAdapter

func NewSpatialAdapter(a *Adapter) *SpatialAdapter

NewSpatialAdapter wraps an existing Postgres adapter for geometry operations.

func (*SpatialAdapter) Delete

func (s *SpatialAdapter) Delete(ctx context.Context, entity, id string) error

Delete implements query.SpatialQuerier.

func (*SpatialAdapter) Upsert

func (s *SpatialAdapter) Upsert(ctx context.Context, entity, id string, geom query.Geometry, meta map[string]any) error

Upsert implements query.SpatialQuerier: store/replace a geometry from WKT+SRID.

func (*SpatialAdapter) Within

func (s *SpatialAdapter) Within(ctx context.Context, q query.SpatialQuery, into any) error

Within implements query.SpatialQuerier: GiST-accelerated radius search, nearest-first. For SRID 4326 distance/predicate use the geography cast (true metres); otherwise planar metres in the geometry's own units.

type StateRepo

type StateRepo struct {
	// contains filtered or unexported fields
}

StateRepo is the Postgres-backed projection bookkeeping (worker-plane tables, no RLS — consumers and the reconciler are cross-tenant by design). It implements projection.StateRepo.

func (*StateRepo) AppliedVersion

func (r *StateRepo) AppliedVersion(ctx context.Context, tenantID, proj, aggregate, aggID string) (int64, error)

AppliedVersion implements projection.StateReader.

func (*StateRepo) Get

func (r *StateRepo) Get(ctx context.Context, tenantID, proj string) (projection.State, error)

Get implements projection.StateRepo.

func (*StateRepo) SetApplied

func (r *StateRepo) SetApplied(ctx context.Context, tenantID, proj, aggregate, aggID string, version int64) error

SetApplied records a projection apply; the watermark never regresses.

func (*StateRepo) Tenants

func (r *StateRepo) Tenants(ctx context.Context) ([]string, error)

Tenants lists every tenant that has ever emitted an event (worker-plane discovery for rebuild --all-tenants and the reconciler; the outbox has no RLS, so this sees across tenants by design).

func (*StateRepo) Upsert

func (r *StateRepo) Upsert(ctx context.Context, s projection.State) error

Upsert implements projection.StateRepo.

type ValidateFunc

type ValidateFunc func(entity string, vals map[string]any) error

ValidateFunc is the post-merge validation hook: CRDTs converge but do not guarantee business validity. A non-nil error flags the document for resolution instead of materializing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL