Documentation
¶
Overview ¶
Package projection turns domain events into engine-neutral mutations and (in later phases) drives the consumer loops that apply them.
Appliers are pure: apply(Event) -> []Mutation. Mutations carry no dialect — no Cypher, no ES DSL. Adapters translate them (FalkorDB -> MERGE, Elasticsearch -> bulk ops) and gate on Version for idempotency: a mutation is skipped when the stored aggregate version is >= the mutation's version.
Index ¶
- type AppliedRecorder
- type Applier
- type ApplierFunc
- type CustomApplier
- type DocDeindex
- type DocIndex
- type Drift
- type EdgeDelete
- type EdgeUpsert
- type Engine
- type Mutation
- type NodeDelete
- type NodeUpsert
- type ProjectedVersions
- type Rebuilder
- type Reconciler
- type RelDelete
- type RelUpsert
- type RepairFunc
- type Sink
- type Snapshotter
- type Source
- type State
- type StateReader
- type StateRepo
- type TargetSink
- type TruthVersions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AppliedRecorder ¶
type AppliedRecorder interface {
SetApplied(ctx context.Context, tenantID, projection, aggregate, aggID string, version int64) error
}
AppliedRecorder records per-aggregate applied versions (WaitForProjection reads them back). Implemented by the postgres StateRepo and fabriqtest.
type Applier ¶
Applier is a pure event-to-mutations function. Implementations must be deterministic and side-effect free; all idempotency and dialect concerns live in adapters.
func GraphApplier ¶
GraphApplier derives the graph projection from the registry: created/updated -> NodeUpsert + edge maintenance from EdgeSpecs (non-empty FK -> EdgeUpsert, empty FK -> EdgeDelete); deleted -> NodeDelete. Unknown or non-graph entities produce no mutations.
func SearchApplier ¶
SearchApplier derives the search projection: created/updated -> DocIndex restricted to the declared search fields (plus the structural id, tenant_id and version); deleted -> DocDeindex.
type ApplierFunc ¶
ApplierFunc adapts a function to the Applier interface.
type CustomApplier ¶
type CustomApplier struct {
Target string // "" = any target, else must equal the engine's Projection
Entity string // "" = any aggregate, else must equal the event's Aggregate
Apply Applier
}
CustomApplier contributes extra mutations for a target beyond the built-in declarative applier. Apply MUST be deterministic and side-effect-free: given the same Envelope it must always return the same Mutations regardless of wall-clock time, feature flags, or any external state — this is what makes blue-green rebuilds produce identical projections. Specifically forbidden: network/database I/O, model calls, time.Now(), cross-aggregate reads, and randomness. A CustomApplier must be wired into BOTH Engine.Custom and the matching Rebuilder.Custom, or the live and rebuilt projections will diverge.
type DocDeindex ¶
DocDeindex removes a document from the search projection. Version carries the originating event version so engines can gate stale replays (external_gte semantics in Elasticsearch).
type DocIndex ¶
DocIndex indexes a document into the search projection. Index is the logical base name; adapters derive the tenant-routed target. Version feeds external version gating.
type Drift ¶
Drift is one aggregate whose projection disagrees with Postgres. TruthVersion 0 means the row no longer exists (the projection holds a zombie); ProjectedVersion 0 means the projection never saw it.
type EdgeDelete ¶
EdgeDelete removes one outgoing relationship of the given type from a node, regardless of target. Version carries the originating event's version so adapters can gate stale replays (an old EdgeDelete must not remove an edge a newer event created).
type EdgeUpsert ¶
type EdgeUpsert struct {
Rel string
FromLabel string
FromID string
ToLabel string
ToID string
Version int64
}
EdgeUpsert creates or refreshes a relationship between two nodes.
type Engine ¶
type Engine struct {
Projection string // "graph" | "search"
Group string // "proj:graph" | "proj:search"
Source Source
Sink Sink
Applier Applier
Custom []CustomApplier // optional; unioned after the built-in applier
Upcasters *event.UpcasterChain // optional; appliers see the latest shape
State AppliedRecorder
// TargetsFor lists the sink targets for a tenant's events. Default:
// [""] (live only). During a blue-green rebuild it returns the live
// AND building targets, so live events catch the new target up while
// the snapshot replays (version gating makes the overlap safe).
TargetsFor func(ctx context.Context, tenantID string) ([]string, error)
}
Engine consumes one projection's group and applies events: upcast -> pure applier -> sink (per target) -> applied bookkeeping. Handler success acks; failure leaves the entry pending for redelivery — at-least-once end to end, made safe by version-gated sinks.
type Mutation ¶
type Mutation interface {
// contains filtered or unexported methods
}
Mutation is the closed set of engine-neutral projection operations.
func ApplyChain ¶
func ApplyChain(builtin Applier, custom []CustomApplier, projection string, env event.Envelope) ([]Mutation, error)
ApplyChain runs the built-in applier then every matching custom applier, unioning their mutations. Pure.
type NodeDelete ¶
NodeDelete removes a node and, by contract, everything attached to it (graph adapters implement detach-delete semantics).
type NodeUpsert ¶
type NodeUpsert struct {
Label string
ExtraLabels []string // additional labels added via SET (e.g. []string{"Archived", "Featured"})
ID string
Props map[string]any
Version int64
}
NodeUpsert creates or updates a graph node. Props are column-keyed and always include "version"; adapters use Version for idempotency gating.
type ProjectedVersions ¶
type ProjectedVersions func(ctx context.Context, tenantID string, ent *registry.Entity) (map[string]int64, error)
ProjectedVersions reads id->version from a projection engine — implemented by the graph/search adapters.
type Rebuilder ¶
type Rebuilder struct {
Projection string // "graph" | "search"
State StateRepo
Sink TargetSink
Applier Applier
Custom []CustomApplier // MUST mirror the live Engine.Custom so rebuilds stay identical
Snapshot Snapshotter
// TargetName derives the versioned build target (registry naming).
TargetName func(tenantID string, modelVersion int) string
// OnFlip runs right after the state pointer flips — the seam for
// engine-side cutovers that must accompany it (Elasticsearch swaps
// the tenant aliases here, atomically, in one _aliases call).
OnFlip func(ctx context.Context, tenantID string, oldModelVersion, newModelVersion int) error
}
Rebuilder performs blue-green projection rebuilds:
- Mark projection_state status=building. From this moment the live engine dual-applies every event to the live AND building targets (Engine.TargetsFor) — the live catch-up.
- Replay the Postgres snapshot into the building target. Version gating makes the overlap with live applies safe in both orders.
- Flip: model_version++, target_name=building target, status=soaking. Readers resolve targets through projection_state, so the flip is atomic for them.
- Finalize (after soak): drop the old target, status=live.
func (*Rebuilder) Finalize ¶
Finalize ends the soak: drops the old target and marks the projection live. oldTarget may be empty (first rebuild: the unversioned default target was the implicit live one — pass it explicitly to drop it).
func (*Rebuilder) Rebuild ¶
func (r *Rebuilder) Rebuild(ctx context.Context, tenantID string) (oldTarget, newTarget string, err error)
Rebuild builds and flips; it returns the old and new target names (the old one is dropped by Finalize after the soak window, or immediately if the operator passes --drop-old).
type Reconciler ¶
type Reconciler struct {
Projection string
Registry *registry.Registry
Include func(ent *registry.Entity) bool // which entities this projection carries
Truth TruthVersions
Projected ProjectedVersions
Repair RepairFunc
}
Reconciler compares per-aggregate versions between Postgres and one projection and optionally repairs the differences. Run it scheduled (leader-elected in fabriq-worker) or on demand (`fabriq reconcile`).
A projection AHEAD of the truth scan is not drift — events legitimately land between the two reads; the version-gated pipeline converges on its own.
type RelDelete ¶
RelDelete removes the relationship whose r.id matches ID. Keyed on id alone so payload-less delete events (which still carry AggID) can address it. A bare {id}-only relationship left by a stale/out-of-order upsert that missed its SET is also correctly swept.
type RelUpsert ¶
type RelUpsert struct {
ID string
Type string
FromLabel string
FromID string
ToLabel string
ToID string
Props map[string]any
Version int64
}
RelUpsert creates or refreshes ONE specific relationship of a reified-edge entity, identified by its own id (stored as r.id). Unlike EdgeUpsert (foreign-key semantics) it does not prune sibling edges and carries scalar properties. Endpoints are merged by id under their identity labels.
type RepairFunc ¶
RepairFunc heals one drifted aggregate THROUGH THE NORMAL PIPELINE: republish the aggregate's latest event (missing/stale) or emit a synthetic deleted event (zombie) via the outbox — reconciliation never writes engines directly.
type Sink ¶
Sink applies translated mutations to one engine target (implemented by the graph/search adapters). target "" means the tenant's live target, resolved by the sink from the tenant on ctx.
type Snapshotter ¶
type Snapshotter interface {
SnapshotEntities(ctx context.Context, tenantID string, fn func(env event.Envelope) error) error
}
Snapshotter streams a tenant's aggregates as synthetic envelopes at their current version (implemented by adapters/postgres — rebuilds always replay FROM POSTGRES, never from another projection).
type Source ¶
type Source interface {
EnsureGroup(ctx context.Context, group string) error
Consume(ctx context.Context, group, consumer string, handle func(streamID string, env event.Envelope) error) error
}
Source is the consumer-group surface the engine reads from (implemented by adapters/redis).
type State ¶
type State struct {
TenantID string
Projection string // "graph" | "search"
ModelVersion int // bumped by rebuilds (the _v{N} suffix)
EventVersion string // last applied event ULID / stream position
Status string // "live" | "building" | "soaking" | "abandoned"
TargetName string // engine target currently receiving applies
}
State is one row of projection bookkeeping: the live pointer and stream position per (tenant, projection). target_name carries the blue-green pointer (tenant_{id}_v{N} graph or versioned index behind the alias).
type StateReader ¶
type StateReader interface {
// AppliedVersion returns the latest aggregate version the projection
// has durably applied (0 if never seen).
AppliedVersion(ctx context.Context, tenantID, projection, aggregate, aggID string) (int64, error)
}
StateReader reports how far a projection has applied a given aggregate. The Postgres-backed repo (phase 4) implements it from projection bookkeeping; fabriqtest provides an in-memory fake. WaitForProjection polls this port.
type StateRepo ¶
type StateRepo interface {
StateReader
Get(ctx context.Context, tenantID, projection string) (State, error)
Upsert(ctx context.Context, s State) error
}
StateRepo is the full bookkeeping port used by the projection engine, rebuild and reconcile (phase 4 implements it in adapters/postgres).
type TargetSink ¶
TargetSink is a Sink whose targets can also be dropped (rebuild cleanup).