projection

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package projection turns domain events into engine-neutral mutations and (in later phases) drives the consumer loops that apply them.

Appliers are pure: apply(Event) -> []Mutation. Mutations carry no dialect — no Cypher, no ES DSL. Adapters translate them (FalkorDB -> MERGE, Elasticsearch -> bulk ops) and gate on Version for idempotency: a mutation is skipped when the stored aggregate version is >= the mutation's version.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AppliedRecorder

type AppliedRecorder interface {
	SetApplied(ctx context.Context, tenantID, projection, aggregate, aggID string, version int64) error
}

AppliedRecorder records per-aggregate applied versions (WaitForProjection reads them back). Implemented by the postgres StateRepo and fabriqtest.

type Applier

type Applier interface {
	Apply(env event.Envelope) ([]Mutation, error)
}

Applier is a pure event-to-mutations function. Implementations must be deterministic and side-effect free; all idempotency and dialect concerns live in adapters.

func GraphApplier

func GraphApplier(reg *registry.Registry) Applier

GraphApplier derives the graph projection from the registry: created/updated -> NodeUpsert + edge maintenance from EdgeSpecs (non-empty FK -> EdgeUpsert, empty FK -> EdgeDelete); deleted -> NodeDelete. Unknown or non-graph entities produce no mutations.

func SearchApplier

func SearchApplier(reg *registry.Registry) Applier

SearchApplier derives the search projection: created/updated -> DocIndex restricted to the declared search fields (plus the structural id, tenant_id and version); deleted -> DocDeindex.

type ApplierFunc

type ApplierFunc func(env event.Envelope) ([]Mutation, error)

ApplierFunc adapts a function to the Applier interface.

func (ApplierFunc) Apply

func (f ApplierFunc) Apply(env event.Envelope) ([]Mutation, error)

Apply implements Applier.

type CustomApplier

type CustomApplier struct {
	Target string // "" = any target, else must equal the engine's Projection
	Entity string // "" = any aggregate, else must equal the event's Aggregate
	Apply  Applier
}

CustomApplier contributes extra mutations for a target beyond the built-in declarative applier. Apply MUST be deterministic and side-effect-free: given the same Envelope it must always return the same Mutations regardless of wall-clock time, feature flags, or any external state — this is what makes blue-green rebuilds produce identical projections. Specifically forbidden: network/database I/O, model calls, time.Now(), cross-aggregate reads, and randomness. A CustomApplier must be wired into BOTH Engine.Custom and the matching Rebuilder.Custom, or the live and rebuilt projections will diverge.

type DocDeindex

type DocDeindex struct {
	Index   string
	ID      string
	Version int64
}

DocDeindex removes a document from the search projection. Version carries the originating event version so engines can gate stale replays (external_gte semantics in Elasticsearch).

type DocIndex

type DocIndex struct {
	Index   string
	ID      string
	Doc     map[string]any
	Version int64
}

DocIndex indexes a document into the search projection. Index is the logical base name; adapters derive the tenant-routed target. Version feeds external version gating.

type Drift

type Drift struct {
	Entity           string
	AggID            string
	TruthVersion     int64
	ProjectedVersion int64
}

Drift is one aggregate whose projection disagrees with Postgres. TruthVersion 0 means the row no longer exists (the projection holds a zombie); ProjectedVersion 0 means the projection never saw it.

type EdgeDelete

type EdgeDelete struct {
	Rel       string
	FromLabel string
	FromID    string
	Version   int64
}

EdgeDelete removes one outgoing relationship of the given type from a node, regardless of target. Version carries the originating event's version so adapters can gate stale replays (an old EdgeDelete must not remove an edge a newer event created).

type EdgeUpsert

type EdgeUpsert struct {
	Rel       string
	FromLabel string
	FromID    string
	ToLabel   string
	ToID      string
	Version   int64
}

EdgeUpsert creates or refreshes a relationship between two nodes.

type Engine

type Engine struct {
	Projection string // "graph" | "search"
	Group      string // "proj:graph" | "proj:search"
	Source     Source
	Sink       Sink
	Applier    Applier
	Custom     []CustomApplier      // optional; unioned after the built-in applier
	Upcasters  *event.UpcasterChain // optional; appliers see the latest shape
	State      AppliedRecorder

	// TargetsFor lists the sink targets for a tenant's events. Default:
	// [""] (live only). During a blue-green rebuild it returns the live
	// AND building targets, so live events catch the new target up while
	// the snapshot replays (version gating makes the overlap safe).
	TargetsFor func(ctx context.Context, tenantID string) ([]string, error)
}

Engine consumes one projection's group and applies events: upcast -> pure applier -> sink (per target) -> applied bookkeeping. Handler success acks; failure leaves the entry pending for redelivery — at-least-once end to end, made safe by version-gated sinks.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, consumer string) error

Run consumes until ctx ends. Scale by running replicas with distinct consumer names — consumer groups need no leader election.

type Mutation

type Mutation interface {
	// contains filtered or unexported methods
}

Mutation is the closed set of engine-neutral projection operations.

func ApplyChain

func ApplyChain(builtin Applier, custom []CustomApplier, projection string, env event.Envelope) ([]Mutation, error)

ApplyChain runs the built-in applier then every matching custom applier, unioning their mutations. Pure.

type NodeDelete

type NodeDelete struct {
	Label string
	ID    string
}

NodeDelete removes a node and, by contract, everything attached to it (graph adapters implement detach-delete semantics).

type NodeUpsert

type NodeUpsert struct {
	Label       string
	ExtraLabels []string // additional labels added via SET (e.g. []string{"Archived", "Featured"})
	ID          string
	Props       map[string]any
	Version     int64
}

NodeUpsert creates or updates a graph node. Props are column-keyed and always include "version"; adapters use Version for idempotency gating.

type ProjectedVersions

type ProjectedVersions func(ctx context.Context, tenantID string, ent *registry.Entity) (map[string]int64, error)

ProjectedVersions reads id->version from a projection engine — implemented by the graph/search adapters.

type Rebuilder

type Rebuilder struct {
	Projection string // "graph" | "search"
	State      StateRepo
	Sink       TargetSink
	Applier    Applier
	Custom     []CustomApplier // MUST mirror the live Engine.Custom so rebuilds stay identical
	Snapshot   Snapshotter
	// TargetName derives the versioned build target (registry naming).
	TargetName func(tenantID string, modelVersion int) string
	// OnFlip runs right after the state pointer flips — the seam for
	// engine-side cutovers that must accompany it (Elasticsearch swaps
	// the tenant aliases here, atomically, in one _aliases call).
	OnFlip func(ctx context.Context, tenantID string, oldModelVersion, newModelVersion int) error
}

Rebuilder performs blue-green projection rebuilds:

  1. Mark projection_state status=building. From this moment the live engine dual-applies every event to the live AND building targets (Engine.TargetsFor) — the live catch-up.
  2. Replay the Postgres snapshot into the building target. Version gating makes the overlap with live applies safe in both orders.
  3. Flip: model_version++, target_name=building target, status=soaking. Readers resolve targets through projection_state, so the flip is atomic for them.
  4. Finalize (after soak): drop the old target, status=live.

func (*Rebuilder) Finalize

func (r *Rebuilder) Finalize(ctx context.Context, tenantID, oldTarget string) error

Finalize ends the soak: drops the old target and marks the projection live. oldTarget may be empty (first rebuild: the unversioned default target was the implicit live one — pass it explicitly to drop it).

func (*Rebuilder) Rebuild

func (r *Rebuilder) Rebuild(ctx context.Context, tenantID string) (oldTarget, newTarget string, err error)

Rebuild builds and flips; it returns the old and new target names (the old one is dropped by Finalize after the soak window, or immediately if the operator passes --drop-old).

type Reconciler

type Reconciler struct {
	Projection string
	Registry   *registry.Registry
	Include    func(ent *registry.Entity) bool // which entities this projection carries
	Truth      TruthVersions
	Projected  ProjectedVersions
	Repair     RepairFunc
}

Reconciler compares per-aggregate versions between Postgres and one projection and optionally repairs the differences. Run it scheduled (leader-elected in fabriq-worker) or on demand (`fabriq reconcile`).

A projection AHEAD of the truth scan is not drift — events legitimately land between the two reads; the version-gated pipeline converges on its own.

func (*Reconciler) Reconcile

func (r *Reconciler) Reconcile(ctx context.Context, tenantID string, repair bool) ([]Drift, error)

Reconcile scans one tenant. With repair=false it only reports.

type RelDelete

type RelDelete struct {
	ID      string
	Version int64
}

RelDelete removes the relationship whose r.id matches ID. Keyed on id alone so payload-less delete events (which still carry AggID) can address it. A bare {id}-only relationship left by a stale/out-of-order upsert that missed its SET is also correctly swept.

type RelUpsert

type RelUpsert struct {
	ID        string
	Type      string
	FromLabel string
	FromID    string
	ToLabel   string
	ToID      string
	Props     map[string]any
	Version   int64
}

RelUpsert creates or refreshes ONE specific relationship of a reified-edge entity, identified by its own id (stored as r.id). Unlike EdgeUpsert (foreign-key semantics) it does not prune sibling edges and carries scalar properties. Endpoints are merged by id under their identity labels.

type RepairFunc

type RepairFunc func(ctx context.Context, tenantID string, d Drift) error

RepairFunc heals one drifted aggregate THROUGH THE NORMAL PIPELINE: republish the aggregate's latest event (missing/stale) or emit a synthetic deleted event (zombie) via the outbox — reconciliation never writes engines directly.

type Sink

type Sink interface {
	ApplyMutations(ctx context.Context, target string, muts []Mutation) error
}

Sink applies translated mutations to one engine target (implemented by the graph/search adapters). target "" means the tenant's live target, resolved by the sink from the tenant on ctx.

type Snapshotter

type Snapshotter interface {
	SnapshotEntities(ctx context.Context, tenantID string, fn func(env event.Envelope) error) error
}

Snapshotter streams a tenant's aggregates as synthetic envelopes at their current version (implemented by adapters/postgres — rebuilds always replay FROM POSTGRES, never from another projection).

type Source

type Source interface {
	EnsureGroup(ctx context.Context, group string) error
	Consume(ctx context.Context, group, consumer string, handle func(streamID string, env event.Envelope) error) error
}

Source is the consumer-group surface the engine reads from (implemented by adapters/redis).

type State

type State struct {
	TenantID     string
	Projection   string // "graph" | "search"
	ModelVersion int    // bumped by rebuilds (the _v{N} suffix)
	EventVersion string // last applied event ULID / stream position
	Status       string // "live" | "building" | "soaking" | "abandoned"
	TargetName   string // engine target currently receiving applies
}

State is one row of projection bookkeeping: the live pointer and stream position per (tenant, projection). target_name carries the blue-green pointer (tenant_{id}_v{N} graph or versioned index behind the alias).

type StateReader

type StateReader interface {
	// AppliedVersion returns the latest aggregate version the projection
	// has durably applied (0 if never seen).
	AppliedVersion(ctx context.Context, tenantID, projection, aggregate, aggID string) (int64, error)
}

StateReader reports how far a projection has applied a given aggregate. The Postgres-backed repo (phase 4) implements it from projection bookkeeping; fabriqtest provides an in-memory fake. WaitForProjection polls this port.

type StateRepo

type StateRepo interface {
	StateReader
	Get(ctx context.Context, tenantID, projection string) (State, error)
	Upsert(ctx context.Context, s State) error
}

StateRepo is the full bookkeeping port used by the projection engine, rebuild and reconcile (phase 4 implements it in adapters/postgres).

type TargetSink

type TargetSink interface {
	Sink
	DropTarget(ctx context.Context, target string) error
}

TargetSink is a Sink whose targets can also be dropped (rebuild cleanup).

type TruthVersions

type TruthVersions func(ctx context.Context, tenantID, entity string) (map[string]int64, error)

TruthVersions reads id->version for a tenant's entity from Postgres (the source of truth) — implemented by adapters/postgres.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL