Documentation
¶
Overview ¶
Package postgres is fabriq's Postgres adapter, built on grove's pg driver. It implements the relational, timeseries, vector and command store ports against the source of truth.
Tenancy is enforced in layers:
- Structurally: every operation runs inside a transaction stamped with SET LOCAL app.tenant_id (set_config(..., true)), and every generated query carries an explicit tenant predicate where applicable.
- In the database: RLS policies (FORCE) key on that setting, so even raw SQL through the escape hatch cannot cross tenants.
- Backstop: a grove pre-query/pre-mutation hook observes every query on both paths (grove >= a01144a fires hooks inside transactions too). It allows the stamped transaction path — RLS is the guard there — and DENIES any pool-path access to a tenant table, which in this architecture is always a bug, returning ErrTenantHookTripped and counting the trip. See docs/decisions/0002-tenancy-layers.md.
Dynamic-entity reads (IsDynamic() == true) scan rows into *[]map[string]any rather than typed structs. Grove's schema-aware Scan only handles struct/slice-of-struct destinations, so dynamic reads use the PgTx.QueryRows path: a driver.Rows cursor iterated manually with Columns()+Scan into individual any destinations, then assembled into maps. Static reads are byte-unchanged.
Index ¶
- type Adapter
- func (a *Adapter) AggregateVersions(ctx context.Context, tenantID, entity string) (map[string]int64, error)
- func (a *Adapter) BackstopTrips() int64
- func (a *Adapter) BulkWrite(ctx context.Context, series string, points []query.Point) error
- func (a *Adapter) Close() error
- func (a *Adapter) Delete(ctx context.Context, entity, id string) error
- func (a *Adapter) Documents() *DocStore
- func (a *Adapter) Driver() *pgdriver.PgDB
- func (a *Adapter) EnsureDynamic(ctx context.Context, ent *registry.Entity) error
- func (a *Adapter) Get(ctx context.Context, entity, id string, into any) error
- func (a *Adapter) GetMany(ctx context.Context, entity string, ids []string, into any) error
- func (a *Adapter) Grove() *grove.DB
- func (a *Adapter) InTenantTx(ctx context.Context, fn func(ctx context.Context, tx command.Tx) error) error
- func (a *Adapter) List(ctx context.Context, entity string, q query.ListQuery, into any) error
- func (a *Adapter) NewLiveStore() *LiveStore
- func (a *Adapter) ProjectionState() *StateRepo
- func (a *Adapter) Query(ctx context.Context, into any, sql string, args ...any) error
- func (a *Adapter) Range(ctx context.Context, q query.RangeQuery, into any) error
- func (a *Adapter) Repair(ctx context.Context, tenantID string, d projection.Drift) error
- func (a *Adapter) Similar(ctx context.Context, q query.VectorQuery, into any) error
- func (a *Adapter) SnapshotEntities(ctx context.Context, tenantID string, fn func(env event.Envelope) error) error
- func (a *Adapter) TenantTxRaw(ctx context.Context, fn func(tx *pgdriver.PgTx) error) error
- func (a *Adapter) Upsert(ctx context.Context, entity, id string, embedding []float32, ...) error
- type DocStore
- func (d *DocStore) ApplyUpdate(ctx context.Context, docID string, update []byte) error
- func (d *DocStore) ApplyUpdateWithSeq(ctx context.Context, docID string, update []byte) (int64, error)
- func (d *DocStore) Compact(ctx context.Context, docID string) error
- func (d *DocStore) MaterializeQuiet(ctx context.Context, validate ValidateFunc) (int, error)
- func (d *DocStore) Snapshot(ctx context.Context, docID string) (document.Materialized, error)
- func (d *DocStore) Sync(ctx context.Context, docID string, stateVector []byte) ([]byte, error)
- type Elector
- type ElectorOption
- type LiveStore
- func (s *LiveStore) After(ctx context.Context, q livequery.LiveQuery, after livequery.Cursor, limit int) ([]livequery.Row, error)
- func (s *LiveStore) Members(ctx context.Context, q livequery.LiveQuery) ([]string, error)
- func (s *LiveStore) Snapshot(ctx context.Context, q livequery.LiveQuery, limit int) ([]livequery.Row, error)
- type LiveSubscriptionRegistry
- func (r *LiveSubscriptionRegistry) ByGateway(ctx context.Context, gatewayID string) ([]livequery.Registration, error)
- func (r *LiveSubscriptionRegistry) ByPartition(ctx context.Context, tenantID, entity string) ([]livequery.Registration, error)
- func (r *LiveSubscriptionRegistry) ByPartitionNum(ctx context.Context, p int) ([]livequery.Registration, error)
- func (r *LiveSubscriptionRegistry) Delete(ctx context.Context, subID string) error
- func (r *LiveSubscriptionRegistry) Put(ctx context.Context, reg livequery.Registration) error
- type Option
- type Relay
- type RelayOption
- type SpatialAdapter
- type StateRepo
- func (r *StateRepo) AppliedVersion(ctx context.Context, tenantID, proj, aggregate, aggID string) (int64, error)
- func (r *StateRepo) Get(ctx context.Context, tenantID, proj string) (projection.State, error)
- func (r *StateRepo) SetApplied(ctx context.Context, tenantID, proj, aggregate, aggID string, version int64) error
- func (r *StateRepo) Tenants(ctx context.Context) ([]string, error)
- func (r *StateRepo) Upsert(ctx context.Context, s projection.State) error
- type ValidateFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Adapter ¶
type Adapter struct {
// contains filtered or unexported fields
}
Adapter implements command.Store, query.RelationalQuerier, query.TSQuerier and query.VectorQuerier on grove/pgdriver.
func Open ¶
func Open(ctx context.Context, dsn string, reg *registry.Registry, opts ...Option) (*Adapter, error)
Open connects to Postgres and wires the tenant backstop.
func (*Adapter) AggregateVersions ¶
func (a *Adapter) AggregateVersions(ctx context.Context, tenantID, entity string) (map[string]int64, error)
AggregateVersions reads id -> version for one entity of a tenant — the reconciler's truth side (projection.TruthVersions).
func (*Adapter) BackstopTrips ¶
BackstopTrips reports how many times the tenant backstop fired.
func (*Adapter) BulkWrite ¶
BulkWrite implements query.TSQuerier — the event-bypass telemetry path. One multi-row INSERT per call; no per-row outbox events (the worker publishes conflated deltas instead).
func (*Adapter) Driver ¶
Driver exposes the typed pg driver for worker-plane components living in this module (relay, leader, migrations CLI). Never hand it to application code.
func (*Adapter) EnsureDynamic ¶
EnsureDynamic creates or ADDITIVELY EVOLVES the Postgres table for a dynamic entity from its descriptor: structural columns (id, tenant_id, version), declared domain columns, a tenant index, any descriptor-declared secondary indexes, and tenant-isolation RLS — mirroring the patterns in migrations/0003_site_asset_tag.go and migrations/0004_rls_policies.go.
Additive-evolution policy ¶
When called on an already-existing table (e.g. because the consumer changed the descriptor by adding columns or indexes), EnsureDynamic reconciles the schema ADDITIVELY:
- Each domain column is emitted as "ALTER TABLE … ADD COLUMN IF NOT EXISTS …", so new columns appear and existing ones are left untouched.
- Each index is emitted as "CREATE INDEX IF NOT EXISTS …", so new indexes are created and existing ones are left untouched.
DROPS, RENAMES, and TYPE CHANGES are NOT auto-applied. If a column or index is removed from the descriptor, the physical column/index remains — evolution is strictly additive. Consumers that need destructive changes must write an explicit migration.
Attempting to add a NOT-NULL column (without a DEFAULT) to a table that already has rows will fail at the Postgres level. This is intentional: the consumer must either supply a Default expression in the descriptor or add the column as nullable. fabriq does not work around this safety boundary.
This is the FENCED managed-DDL lane: fabriq manages DDL ONLY for dynamic entities. Static entities keep migrations as the authority; calling EnsureDynamic for a non-dynamic entity (Spec.Schema == nil) is an error.
Run as the schema owner (superuser DSN), not the RLS-constrained app role.
The entity must be registered in the registry BEFORE the adapter is constructed (fabriq.Open), so the pool-path tenant backstop includes the dynamic table in its guarded set; EnsureDynamic only creates the physical table, it does not register the entity.
func (*Adapter) GetMany ¶
GetMany implements the batched hydration contract: ONE query, results in ids order, missing rows skipped.
func (*Adapter) Grove ¶
Grove exposes the grove handle (hook-guarded pool path) for advanced embedding. Tenant tables are NOT reachable through it — the backstop denies them; use the fabric ports.
func (*Adapter) InTenantTx ¶
func (a *Adapter) InTenantTx(ctx context.Context, fn func(ctx context.Context, tx command.Tx) error) error
InTenantTx implements command.Store: one Postgres transaction, tenant stamped via SET LOCAL, row writes and outbox appends inside it.
func (*Adapter) List ¶
List implements equality-filtered, ordered paging. Filter and order columns are validated against the binding — unknown columns are rejected, which is also the SQL-injection guard.
func (*Adapter) NewLiveStore ¶
NewLiveStore returns the live query snapshot/refill store for this adapter.
func (*Adapter) ProjectionState ¶
ProjectionState returns the projection bookkeeping repo.
func (*Adapter) Query ¶
Query is the raw SQL escape hatch for reads. It still runs inside a tenant-stamped transaction, so RLS contains it; tables outside RLS (guarded tables) additionally require a literal tenant_id reference.
func (*Adapter) Range ¶
Range implements query.TSQuerier for raw points over [From, To). Bucketed aggregates land with the projection phase (time_bucket).
func (*Adapter) Repair ¶
Repair heals one drift through the ordinary pipeline (projection.RepairFunc):
- missing/stale: upsert the aggregate's CURRENT state as its version's event and mark it unpublished — the relay republishes, version-gated consumers converge.
- zombie (row gone): emit a synthetic <entity>.deleted one version past what the projection holds, so the delete applies everywhere.
Reconciliation never writes a projection engine directly.
func (*Adapter) Similar ¶
Similar implements query.VectorQuerier: cosine nearest neighbours through the HNSW index.
func (*Adapter) SnapshotEntities ¶
func (a *Adapter) SnapshotEntities(ctx context.Context, tenantID string, fn func(env event.Envelope) error) error
SnapshotEntities streams every aggregate row of a tenant as synthetic <entity>.updated envelopes at the row's CURRENT version and shape — the rebuild source (projections are always rebuilt from Postgres, never from another projection). Rows are paged in id order inside stamped transactions; because appliers are pure and sinks version-gate, a row that changes mid-snapshot is healed by the live catch-up applies.
func (*Adapter) TenantTxRaw ¶
TenantTxRaw opens a tenant-stamped transaction for NON-command components (e.g. the CAS index) that need raw SQL under app.tenant_id + app.scope_id. It is the only sanctioned way to run blob_cas SQL outside the command plane; FORCE RLS isolates the work to the context tenant.
type DocStore ¶
type DocStore struct {
// contains filtered or unexported fields
}
DocStore is the Postgres document plane: an append-only update log (fabriq_crdt_updates) folded through grove's CRDT merge engine, with compacted snapshots and quiet-window materialization into ordinary entity rows + outbox events.
Update blobs are JSON-encoded []crdt.ChangeRecord (the "grove-crdt" engine named by CRDTSpec). Document ids carry their entity: "<entity>/<ulid>" — the registry's KindDocument entry binds the relational shape materialization writes.
func (*DocStore) ApplyUpdate ¶
ApplyUpdate implements document.Store: append one update to the log and touch the doc's activity timestamp (the quiet-window clock).
func (*DocStore) ApplyUpdateWithSeq ¶
func (d *DocStore) ApplyUpdateWithSeq(ctx context.Context, docID string, update []byte) (int64, error)
ApplyUpdateWithSeq is ApplyUpdate returning the assigned log seq — the live fan-out decorator stamps it on the published sync frame so clients can detect gaps and fall back to Sync.
func (*DocStore) Compact ¶
Compact implements document.Store: fold the log into the snapshot row and trim it, one transaction. Merge results never change — only their storage shape.
func (*DocStore) MaterializeQuiet ¶
MaterializeQuiet materializes every unflagged document whose last activity is older than its entity's QuietWindow and which has updates beyond the last materialization: merged state -> validation -> entity row write + ONE <entity>.updated event (version++) through the outbox. Returns the number of documents materialized.
func (*DocStore) Snapshot ¶
Snapshot implements document.Store: merged current state (compacted snapshot + log tail) and the materialized aggregate version.
type Elector ¶
type Elector struct {
// contains filtered or unexported fields
}
Elector provides advisory-lock leadership for singleton runners (the outbox relay, the reconciler). It holds pg_try_advisory_lock on a DEDICATED pooled connection (grove's ConnAcquirer), so the session-level lock cannot leak to other pool users; a connection-liveness watchdog abdicates if the session dies (the lock died with it).
fabriq-worker can therefore run any number of replicas: exactly one holds each role.
func NewElector ¶
func NewElector(a *Adapter, key int64, opts ...ElectorOption) *Elector
NewElector builds an elector for an advisory lock key. Pick one key per role (e.g. relay, reconciler) and keep them stable across versions.
type ElectorOption ¶
type ElectorOption func(*Elector)
ElectorOption tunes the elector.
func WithElectorHeartbeat ¶
func WithElectorHeartbeat(d time.Duration) ElectorOption
WithElectorHeartbeat sets the leader's session-liveness check cadence (default 5s).
func WithElectorRetry ¶
func WithElectorRetry(d time.Duration) ElectorOption
WithElectorRetry sets how often a non-leader retries acquisition (default 5s).
type LiveStore ¶
type LiveStore struct {
// contains filtered or unexported fields
}
LiveStore implements livequery.Snapshotter and livequery.Refiller over Postgres via the existing tenant-stamped read path. Ordering and the keyset boundary are Postgres-authoritative — this is the exact-top-N oracle. Reads run inside an RLS-scoped transaction (set_config app.tenant_id), so the app role only ever sees its own tenant's rows.
func (*LiveStore) After ¶
func (s *LiveStore) After(ctx context.Context, q livequery.LiveQuery, after livequery.Cursor, limit int) ([]livequery.Row, error)
After returns up to `limit` rows strictly after `after` in total order — the bounded keyset boundary refill.
type LiveSubscriptionRegistry ¶
type LiveSubscriptionRegistry struct {
// contains filtered or unexported fields
}
LiveSubscriptionRegistry is the Postgres-backed durable live subscription registry (fabriq_live_subscriptions) — the backbone the sharded matcher tier uses to recover subscriptions after failover. It is worker-plane (the table has no RLS); construct it on the worker/owner connection.
func NewLiveSubscriptionRegistry ¶
func NewLiveSubscriptionRegistry(db *pgdriver.PgDB) *LiveSubscriptionRegistry
NewLiveSubscriptionRegistry returns the durable registry over a Postgres handle (e.g. adapter.Driver()).
func (*LiveSubscriptionRegistry) ByGateway ¶
func (r *LiveSubscriptionRegistry) ByGateway(ctx context.Context, gatewayID string) ([]livequery.Registration, error)
func (*LiveSubscriptionRegistry) ByPartition ¶
func (r *LiveSubscriptionRegistry) ByPartition(ctx context.Context, tenantID, entity string) ([]livequery.Registration, error)
func (*LiveSubscriptionRegistry) ByPartitionNum ¶
func (r *LiveSubscriptionRegistry) ByPartitionNum(ctx context.Context, p int) ([]livequery.Registration, error)
func (*LiveSubscriptionRegistry) Delete ¶
func (r *LiveSubscriptionRegistry) Delete(ctx context.Context, subID string) error
func (*LiveSubscriptionRegistry) Put ¶
func (r *LiveSubscriptionRegistry) Put(ctx context.Context, reg livequery.Registration) error
type Option ¶
type Option func(*openConfig)
Option configures Open.
func WithGuardedTables ¶
WithGuardedTables adds tables to the tenant guard beyond the registry's (e.g. the telemetry hypertable, which has no RLS because Timescale columnstore forbids it).
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
Relay is the outbox relay: it drains unpublished outbox rows (FOR UPDATE SKIP LOCKED, ULID order) and publishes them through an event.Publisher, woken by LISTEN/NOTIFY with interval polling as the safety net.
Delivery is at-least-once: rows are published before being marked, so a crash between the two replays the event; consumers are version-gated idempotent by contract. Run exactly one active relay (wrap Run in an Elector) — multiple relays are safe (SKIP LOCKED) but waste publishes.
type RelayOption ¶
type RelayOption func(*Relay)
RelayOption tunes the relay.
func WithRelayBatch ¶
func WithRelayBatch(n int) RelayOption
WithRelayBatch sets the per-transaction drain batch (default 128).
func WithRelayOnPublish ¶
func WithRelayOnPublish(fn func(n int)) RelayOption
WithRelayOnPublish installs a per-batch callback (metrics).
func WithRelayPollInterval ¶
func WithRelayPollInterval(d time.Duration) RelayOption
WithRelayPollInterval sets the fallback poll cadence (default 1s; NOTIFY normally wakes the relay first).
type SpatialAdapter ¶
type SpatialAdapter struct {
// contains filtered or unexported fields
}
SpatialAdapter wraps Adapter to implement query.SpatialQuerier. A separate type is required because *Adapter already carries Upsert for query.VectorQuerier ([]float32 embedding) — Go does not allow two methods with the same name on one type, so the spatial variant lives here.
func NewSpatialAdapter ¶
func NewSpatialAdapter(a *Adapter) *SpatialAdapter
NewSpatialAdapter wraps an existing Postgres adapter for geometry operations.
func (*SpatialAdapter) Delete ¶
func (s *SpatialAdapter) Delete(ctx context.Context, entity, id string) error
Delete implements query.SpatialQuerier.
func (*SpatialAdapter) Upsert ¶
func (s *SpatialAdapter) Upsert(ctx context.Context, entity, id string, geom query.Geometry, meta map[string]any) error
Upsert implements query.SpatialQuerier: store/replace a geometry from WKT+SRID.
func (*SpatialAdapter) Within ¶
func (s *SpatialAdapter) Within(ctx context.Context, q query.SpatialQuery, into any) error
Within implements query.SpatialQuerier: GiST-accelerated radius search, nearest-first. For SRID 4326 distance/predicate use the geography cast (true metres); otherwise planar metres in the geometry's own units.
type StateRepo ¶
type StateRepo struct {
// contains filtered or unexported fields
}
StateRepo is the Postgres-backed projection bookkeeping (worker-plane tables, no RLS — consumers and the reconciler are cross-tenant by design). It implements projection.StateRepo.
func (*StateRepo) AppliedVersion ¶
func (r *StateRepo) AppliedVersion(ctx context.Context, tenantID, proj, aggregate, aggID string) (int64, error)
AppliedVersion implements projection.StateReader.
func (*StateRepo) SetApplied ¶
func (r *StateRepo) SetApplied(ctx context.Context, tenantID, proj, aggregate, aggID string, version int64) error
SetApplied records a projection apply; the watermark never regresses.