fabriq

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2026 License: Apache-2.0 Imports: 33 Imported by: 0

README

Fabriq

One write path. Every engine, in step.

Go Version

Fabriq is a standalone data fabric for Go. Every command commits once through a transactional outbox, then fans out to relational, time-series, vector, graph, and search engines — versioned, tenant-scoped, and always rebuildable.

It gives an application a single write path and typed read ports across multiple storage engines while enforcing three structural invariants:

  • Every write emits exactly one versioned event — commands run in a Postgres transaction that appends to a transactional outbox; a leader-elected relay publishes the event to Redis Streams.
  • Every access is tenant-scoped — tenant rides on context.Context and is stamped structurally into every engine (Postgres SET LOCAL + row-level security, FalkorDB graph-per-tenant, Elasticsearch index routing, Redis key prefixes), with a pre-query hook as a loud backstop.
  • Projections are always rebuildable from Postgres — the knowledge graph and the search index are derived projections, never written directly.

Architecture

        commands                queries (capability ports)        deltas
           │                            │                           ▲
           ▼                            ▼                           │
┌──────────────────────────────────────────────────────────────────┴─────┐
│ fabriq (facade)                                                          │
│  core/registry  core/command  core/event  core/projection  core/subscribe
│  ─────────────────────────── ports ──────────────────────────────────  │
│  adapters/postgres  adapters/redis  adapters/falkordb  adapters/elastic  │
└──────────────────────────────────────────────────────────────────────────┘
     Postgres+Timescale+pgvector   Redis Streams   FalkorDB    Elasticsearch
        (source of truth)           (fan-out)      (projection) (projection)

Binaries:

  • cmd/fabriq — the data fabric in one binary. serve runs the worker (outbox relay, projection consumers, reconciler, document plane); migrate up|down|status, rebuild, reconcile, and inspect are the operator commands. The default (no args) is serve.
  • cmd/api-example — a demo API: commands, queries, and SSE fetch-then-subscribe.

Quick start

reg := registry.New()
_ = domain.RegisterAll(reg) // or your own entity pack

f, stores, err := fabriq.Open(ctx, reg, fabriq.Config{
    Postgres: fabriq.PostgresConfig{DSN: dsn},
    Redis:    fabriq.RedisConfig{Addr: redisAddr},
})

// Writes: the only path, one versioned event per command.
res, err := f.Exec(tenantCtx, command.Command{
    Entity: "asset", Op: command.OpCreate,
    Payload: &domain.Asset{Name: "Pump 7", SiteID: siteID},
})

// Reads: typed capability ports.
var a domain.Asset
err = f.Relational().Get(tenantCtx, "asset", res.AggID, &a)

// Live deltas: server-resolved channel, conflated and resumable.
deltas, err := f.Subscribe(tenantCtx, query.SubscribeScope{Entity: "asset", Scope: "site", ID: siteID})

Every call requires a tenant-stamped context (tenant.WithTenant), set only by auth middleware from validated claims.

Capabilities

All of the following are implemented and covered by integration tests:

  • Command plane & outbox — registry-driven commands, optimistic concurrency, atomic batches, and a transactional outbox in Postgres.
  • Postgres source of truth — row-level security verified as a non-superuser, Timescale hypertables for bulk telemetry, pgvector (HNSW) for similarity search, and PostGIS for geometry storage (Upsert/Within/Delete, GiST-accelerated, SRID 4326 → true metres, other SRIDs → planar metres), with migrations and a registry-conformance test.
  • Dynamic entities — entities defined at runtime from a schema descriptor instead of a Go struct, with fabriq-managed DDL (CREATE + additive ALTER), map-native writes/reads over real typed columns, and the full projection pipeline — a fenced, opt-in lane that leaves the static migrations-as-authority discipline intact.
  • Redis Streams fan-out — a leader-elected outbox relay (LISTEN/NOTIFY wake), consumer groups with XAUTOCLAIM recovery, and a subscription hub (delta conflation, SSE, Last-Event-ID resume).
  • Live queries — maintained result sets: a filter + sort + limit/cursor subscription returns a snapshot, then exact enter/leave/move/update deltas as data changes (changefeed-style). The in-engine window stays an exact prefix of the Postgres-ordered result via a cushion + keyset boundary refill, so top-N is exact at all times. (P1: single-node maintained mode; sharding, a streamed mode, and a predicate index are on the roadmap.)
  • Graph projection (FalkorDB) — an openCypher dialect behind a conformance suite (the engine-swap gate), a batched TraverseAndHydrate, and blue-green rebuilds verified to produce an identical graph.
  • Search projection (Elasticsearch) — version-gated bulk writes, multi-field full-text search, lazy per-tenant index + alias provisioning, and atomic alias-swap rebuilds.
  • Reconciler — per-aggregate drift detection (missing / stale / zombie) between Postgres and each projection, healed through the ordinary outbox rather than direct engine writes.
  • CRDT document plane — an append-only update log folded through a merge engine, with sequence-vector sync, compaction, and quiet-window materialization that emits a single ordinary versioned event, so collaborative documents are normal entities downstream.
  • Observability — a W3C traceparent stamped on every event envelope by default, plus Prometheus metrics exposed by the worker (fabriq serve, /metrics).

Development

make test              # unit tests (no Docker)
make test-integration  # testcontainers: PG+Timescale+pgvector, Redis, FalkorDB, Elasticsearch
make bench             # benchmarks
make lint              # incl. depguard architecture boundaries

Operational runbooks live in docs/OPERATIONS.md, and schema discipline in docs/MIGRATIONS.md.

Fabriq builds on grove for storage and forge for application and CLI scaffolding.

License

Licensed under the Apache License, Version 2.0. You may obtain a copy of the License in the LICENSE file or at http://www.apache.org/licenses/LICENSE-2.0.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

Copyright 2026 xraph.

Documentation

Overview

Package fabriq is the TWINOS data fabric: the single module through which application code talks to every datastore.

Fabriq enforces three invariants across stores:

  • Every write emits exactly one versioned event (transactional outbox).
  • Every access is tenant-scoped (structural stamping + RLS + hook backstop).
  • Projections (graph, search) are derived from Postgres and always rebuildable from it.

The kernel in core/ is domain-agnostic and engine-agnostic: capability ports (Relational, Graph, Search, Timeseries, Vector, Document), a declarative schema registry, a command plane, and a subscription hub. Engine dialects live exclusively under adapters/. The TWINOS domain pack lives in domain/ and is the only TWINOS-aware package.

f, err := fabriq.Open(ctx, cfg,
    fabriq.WithConflationWindow(150*time.Millisecond),
)

Fabriq is built on the Forge ecosystem: storage on github.com/xraph/grove, binaries on github.com/xraph/forge (apps) and forge/cli (CLI).

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoTenant is returned for any fabric call whose context was not
	// stamped with a tenant by auth middleware.
	ErrNoTenant = tenant.ErrNoTenant

	// ErrTenantHookTripped is returned when the grove pre-query backstop
	// detects a relational query without a tenant predicate. It firing
	// means a bug in fabriq itself: the structural stamping was bypassed.
	ErrTenantHookTripped = tenant.ErrTenantHookTripped

	// ErrNotFound is returned when an aggregate or row does not exist
	// within the calling tenant's scope.
	ErrNotFound = fabriqerr.ErrNotFound

	// ErrVersionConflict is returned when a command's expected version
	// does not match the stored aggregate version.
	ErrVersionConflict = fabriqerr.ErrVersionConflict

	// ErrProjectionLag is returned by WaitForProjection when the deadline
	// expires before the projection catches up to the requested version.
	ErrProjectionLag = fabriqerr.ErrProjectionLag

	// ErrStoreNotConfigured is returned by capability ports whose backing
	// store was not configured on Open (e.g. Graph() without FalkorDB).
	ErrStoreNotConfigured = fabriqerr.ErrStoreNotConfigured
)

Sentinel errors form fabriq's typed taxonomy. Callers branch with errors.Is; rich variants (VersionConflictError, NotFoundError) carry detail and still match their sentinel. The canonical values live in core packages (root imports core, never the reverse) and are aliased here so application code uniformly writes fabriq.ErrX.

View Source
var (
	// ErrNodeNameConflict is returned when a live sibling already has the name.
	ErrNodeNameConflict = errors.New("fabriq: fs_node name already exists in folder")
	// ErrNotContainer is returned when a child is created under a non-folder.
	ErrNotContainer = errors.New("fabriq: parent is not a folder")
	// ErrNodeLocked is returned when a mutating op targets a locked node.
	ErrNodeLocked = errors.New("fabriq: fs_node is locked")
)

Functions

func For

func For[T any](f *Fabriq) (*query.Repo[T], error)

For returns a type-safe repository over the entity whose grove model is T — the typed counterpart to f.Relational() and the projection ports. The entity is resolved from T (no string), and reads return *T / []*T:

repo, _ := fabriq.For[domain.Asset](f)
asset, err := repo.Get(ctx, id)                       // *domain.Asset
pump, err := repo.One(ctx, query.Eq("serial", sn))
kids, err := repo.Traverse(ctx, cypher, params)       // graph -> hydrated
hits, err := repo.Search(ctx, "centrifugal", 20)      // search -> hydrated
near, err := repo.Similar(ctx, embedding, 10)         // vector -> hydrated

It is a free function, not a method, because Go methods cannot introduce type parameters. (The lower-level query.For takes a registry + querier directly; attach projection ports with WithGraph/WithSearch/WithVector.)

func Open

func Open(ctx context.Context, reg *registry.Registry, cfg Config, opts ...Option) (*Fabriq, *Stores, error)

Open dials the configured stores and assembles a Fabriq:

  • Postgres (required): command store, relational/timeseries/vector/spatial ports, projection bookkeeping.
  • Redis (optional but required for live subscriptions and projections): change-channel tailer feeding the hub.
  • FalkorDB / Elasticsearch ports come with phases 4–5; until then Graph()/Search() return ErrStoreNotConfigured.

The returned Stores handle also exposes the adapters the worker plane needs (relay, elector, consumer groups) — application services should only ever hold the *Fabriq.

Types

type BlobRef

type BlobRef struct {
	ID      string `json:"id"`
	Hash    string `json:"hash"`
	Size    int64  `json:"size"`
	Version int64  `json:"version"`
}

BlobRef identifies a stored blob_object and its content.

type BlobSourceView

type BlobSourceView struct {
	ID          string            `json:"id"`
	ProjectID   string            `json:"projectId"`
	Name        string            `json:"name"`
	Provider    string            `json:"provider"`
	Endpoint    string            `json:"endpoint"`
	BasePath    string            `json:"basePath"`
	Auth        map[string]any    `json:"auth"`
	WatchConfig map[string]any    `json:"watchConfig"`
	FileFilter  map[string]any    `json:"fileFilter"`
	Tags        map[string]string `json:"tags"`
	Enabled     bool              `json:"enabled"`
	Version     int64             `json:"version"`
}

BlobSourceView is a decrypted read of a blob_source.

type CacheConfig

type CacheConfig struct {
	// L1Enabled gates the per-node LRU tier. Requires Redis.Addr to be set.
	L1Enabled bool `yaml:"l1_enabled" json:"l1_enabled"`
	// L1Size is the maximum number of entries the LRU holds (default 0 = no
	// entries, so always set a positive value when L1Enabled is true).
	L1Size int `yaml:"l1_size" json:"l1_size"`
	// L1TTL is the per-entry time-to-live in the in-process store. It also
	// bounds the cold-start cross-node staleness window: commits that land
	// between Open() returning and the L1 evict tailer's first XRead attach
	// are missed on this node and will remain stale until at most L1TTL
	// elapses. Defaults to 5 minutes when L1Enabled is true and this is <= 0.
	L1TTL time.Duration `yaml:"l1_ttl" json:"l1_ttl"`
}

CacheConfig controls the optional in-process L1 cache tier that sits in front of the shared Redis L2. When L1Enabled is false (the default) the engine uses the L2 adapter directly and behaviour is identical to P1-P3.

type Config

type Config struct {
	Postgres      PostgresConfig      `yaml:"postgres" json:"postgres"`
	Shards        []ShardConfig       `yaml:"shards" json:"shards"`
	Redis         RedisConfig         `yaml:"redis" json:"redis"`
	FalkorDB      FalkorDBConfig      `yaml:"falkordb" json:"falkordb"`
	Elasticsearch ElasticsearchConfig `yaml:"elasticsearch" json:"elasticsearch"`
	Storage       StorageConfig       `yaml:"storage" json:"storage"`
	Projections   ProjectionsConfig   `yaml:"projections" json:"projections"`
	Subscriptions SubscriptionsConfig `yaml:"subscriptions" json:"subscriptions"`
	Cache         CacheConfig         `yaml:"cache" json:"cache"`
	Encryption    EncryptionConfig    `yaml:"encryption" json:"encryption"`
	// CustomAppliers are consumer-supplied projection appliers, unioned after
	// the built-in declarative applier for their Target. They MUST be pure (see
	// projection.CustomApplier). The same set feeds both the live engines and
	// the rebuilders, so live and rebuilt projections stay identical.
	CustomAppliers []projection.CustomApplier `yaml:"-" json:"-"`
}

Config is fabriq's declarative configuration: which stores exist and which projections run. It is the same schema a future standalone `fabriqd` loads from YAML, hence the yaml tags. Entities are not configured here — they are registered in code via the registry.

func (Config) Options

func (c Config) Options() []Option

Options derives facade options from config tuning knobs.

func (Config) Validate

func (c Config) Validate() error

Validate checks cross-field consistency. It does not dial anything.

type CreateFileOpts

type CreateFileOpts struct {
	ContentType string `json:"contentType"`
}

CreateFileOpts carries optional metadata for a file create.

type CreateShareInput

type CreateShareInput struct {
	NodeID       string     `json:"nodeId"`
	Token        string     `json:"token"`
	Permission   string     `json:"permission"`
	ExpiresAt    *time.Time `json:"expiresAt"`
	MaxDownloads *int       `json:"maxDownloads"`
	PasswordHash string     `json:"-"`
	CreatedBy    string     `json:"createdBy"`
}

CreateShareInput carries a share's fields. The seam supplies Token (generated) and PasswordHash (bcrypt) — fabriq persists them verbatim.

type ElasticsearchConfig

type ElasticsearchConfig struct {
	Addrs    []string `yaml:"addrs" json:"addrs"`
	Username string   `yaml:"username" json:"username"`
	Password string   `yaml:"password" json:"password"`
}

ElasticsearchConfig locates the search projection engine.

type EncryptionConfig

type EncryptionConfig struct {
	Key string `yaml:"key" json:"key"`
}

EncryptionConfig configures field-level encryption (blob_source credentials). Key is a base64-encoded 32-byte AES-256 data-encryption key; empty disables encryption (writes that carry credentials then fail closed).

type Fabriq

type Fabriq struct {
	// contains filtered or unexported fields
}

Fabriq is the facade implementing query.Fabric: the single object application code holds to reach every datastore.

func New

func New(reg *registry.Registry, ports Ports, opts ...Option) (*Fabriq, error)

New assembles a Fabriq from explicit ports. Most services use Open (config-driven adapters) instead; New is the seam for tests, embedding and partial deployments.

func (*Fabriq) AddBookmark

func (f *Fabriq) AddBookmark(ctx context.Context, userID, nodeID string, sortOrder int) (string, error)

AddBookmark bookmarks nodeID for userID. The (tenant, user, node) unique index rejects duplicates (surfaced as an error).

func (*Fabriq) Ancestors

func (f *Fabriq) Ancestors(ctx context.Context, id string) ([]domain.FsNode, error)

Ancestors returns the chain from the root down to (but excluding) the node, by walking parent_id. O(depth) reads — fine for filesystem depths. The returned slice is root→node order.

func (*Fabriq) Blob

func (f *Fabriq) Blob() blob.Store

Blob implements query.Fabric.

func (*Fabriq) CatchUp

func (f *Fabriq) CatchUp(ctx context.Context, scope query.SubscribeScope, afterID string, limit int) ([]query.Delta, error)

CatchUp reads the deltas a reconnecting client missed on a scope's channel since afterID (its SSE Last-Event-ID), through the same authz gate as Subscribe. An empty slice with no error means the client is current; channels are short (MAXLEN~), so callers must treat a full page as "refetch instead". Delivery overlap with a live Subscribe is possible — consumers dedupe by StreamID.

func (*Fabriq) Close

func (f *Fabriq) Close() error

Close drains the subscription hub and, when this Fabriq was built by Open, closes the underlying stores.

func (*Fabriq) CreateFile

func (f *Fabriq) CreateFile(ctx context.Context, parentID, name string, r io.Reader, opts CreateFileOpts) (FsRef, error)

CreateFile stores bytes (PutBlob → blob_object) then creates a file node referencing it (1:1), with denormalized facets. One blob event + one node event.

func (*Fabriq) CreateFolder

func (f *Fabriq) CreateFolder(ctx context.Context, parentID, name string) (FsRef, error)

CreateFolder creates a folder node under parentID ("" = root). One event.

func (*Fabriq) CreateMount

func (f *Fabriq) CreateMount(ctx context.Context, parentID, name string, mountConfig map[string]any) (FsRef, error)

CreateMount creates a mount node (node_type=mount) under parentID with the given mount configuration. The sync engine that consumes the config lives in the seam, not in fabriq.

func (*Fabriq) CreateShare

func (f *Fabriq) CreateShare(ctx context.Context, in CreateShareInput) (string, error)

CreateShare persists a share record.

func (*Fabriq) CreateSource

func (f *Fabriq) CreateSource(ctx context.Context, in SourceInput) (SourceRef, error)

CreateSource persists a new blob_source with encrypted credentials.

func (*Fabriq) DeleteBlob

func (f *Fabriq) DeleteBlob(ctx context.Context, blobObjectID string) error

DeleteBlob removes the blob_object catalog row (one versioned event). Byte GC and ref-count decrement are deferred to Phase 4.

func (*Fabriq) DeleteShare

func (f *Fabriq) DeleteShare(ctx context.Context, id string) error

DeleteShare removes a share record.

func (*Fabriq) DeleteSource

func (f *Fabriq) DeleteSource(ctx context.Context, id string) error

DeleteSource removes a blob_source.

func (*Fabriq) Descendants

func (f *Fabriq) Descendants(ctx context.Context, id string) ([]domain.FsNode, error)

Descendants returns all live nodes under id (by path prefix), ordered by path.

func (*Fabriq) Document

func (f *Fabriq) Document() document.Store

Document implements query.Fabric.

func (*Fabriq) Exec

func (f *Fabriq) Exec(ctx context.Context, cmd command.Command) (command.Result, error)

Exec implements query.Fabric.

func (*Fabriq) ExecBatch

func (f *Fabriq) ExecBatch(ctx context.Context, cmds []command.Command) ([]command.Result, error)

ExecBatch implements query.Fabric.

func (*Fabriq) GetBlob

func (f *Fabriq) GetBlob(ctx context.Context, blobObjectID string) (io.ReadCloser, BlobRef, error)

GetBlob resolves the blob_object catalog row by ID, then streams its bytes from CAS. Returns ErrStoreNotConfigured when CAS is not wired.

func (*Fabriq) GetNode

func (f *Fabriq) GetNode(ctx context.Context, id string) (domain.FsNode, error)

GetNode loads a node by id (any state, including trashed).

func (*Fabriq) GetNodeByPath

func (f *Fabriq) GetNodeByPath(ctx context.Context, path string) (domain.FsNode, error)

GetNodeByPath resolves a live node by its materialized path.

func (*Fabriq) GetShareByToken

func (f *Fabriq) GetShareByToken(ctx context.Context, token string) (domain.FsShare, error)

GetShareByToken resolves a share by its token (DATA only — expiry/cap/password enforcement is the seam's job).

func (*Fabriq) GetSource

func (f *Fabriq) GetSource(ctx context.Context, id string) (BlobSourceView, error)

GetSource reads and decrypts a blob_source.

func (*Fabriq) GrantPermission

func (f *Fabriq) GrantPermission(ctx context.Context, nodeID, principalType, principalID, permission, grantedBy string) (string, error)

GrantPermission grants principal (type+id) `permission` on nodeID.

func (*Fabriq) Graph

func (f *Fabriq) Graph() query.GraphQuerier

Graph implements query.Fabric.

func (*Fabriq) Hub

func (f *Fabriq) Hub() *subscribe.Hub

Hub exposes the subscription hub for the delta pump (fabriq-worker / the redis stream bridge) and for shutdown draining. Application code subscribes through Subscribe, never directly.

func (*Fabriq) IncrementShareDownload

func (f *Fabriq) IncrementShareDownload(ctx context.Context, id string) error

IncrementShareDownload atomically bumps download_count via a command-plane read-modify-write with optimistic concurrency (one retry on version conflict).

func (*Fabriq) ListChildren

func (f *Fabriq) ListChildren(ctx context.Context, parentID string, limit, offset int) ([]domain.FsNode, error)

ListChildren returns the live children of parentID, ordered by name.

func (*Fabriq) ListNodePermissions

func (f *Fabriq) ListNodePermissions(ctx context.Context, nodeID string) ([]domain.FsPermission, error)

ListNodePermissions returns all grants on nodeID.

func (*Fabriq) ListNodeShares

func (f *Fabriq) ListNodeShares(ctx context.Context, nodeID string) ([]domain.FsShare, error)

ListNodeShares returns all shares for a node.

func (*Fabriq) ListPrincipalPermissions

func (f *Fabriq) ListPrincipalPermissions(ctx context.Context, principalType, principalID string) ([]domain.FsPermission, error)

ListPrincipalPermissions returns all grants to a principal.

func (*Fabriq) ListSources

func (f *Fabriq) ListSources(ctx context.Context) ([]BlobSourceView, error)

ListSources reads and decrypts all of the tenant's blob_sources.

func (*Fabriq) ListUserBookmarks

func (f *Fabriq) ListUserBookmarks(ctx context.Context, userID string) ([]domain.FsBookmark, error)

ListUserBookmarks returns a user's bookmarks, ordered by sort_order.

func (*Fabriq) LiveQuery

LiveQuery registers a maintained-result-set subscription: it validates the query against the entity's LiveSpec, takes an RLS-enforced snapshot, and returns the snapshot plus a live channel of enter/leave/move/update deltas. Close the returned *livequery.Handle to tear the subscription down; its Reanchor method slides a maintained window for deep/infinite scroll.

func (*Fabriq) LockNode

func (f *Fabriq) LockNode(ctx context.Context, id, by string) error

LockNode marks a node locked by `by`.

func (*Fabriq) MoveNode

func (f *Fabriq) MoveNode(ctx context.Context, id, newParentID string) (FsRef, error)

MoveNode re-parents a node under newParentID, rewriting its own path and all descendant paths in the same tx. Rejects moving into its own subtree.

func (*Fabriq) PermanentDeleteNode

func (f *Fabriq) PermanentDeleteNode(ctx context.Context, id string) error

PermanentDeleteNode hard-deletes a node and its subtree (one OpDelete event each) and deletes each file node's blob_object so Phase-4 GC reclaims bytes.

func (*Fabriq) PutBlob

func (f *Fabriq) PutBlob(ctx context.Context, r io.Reader, opts PutBlobOpts) (BlobRef, error)

PutBlob stores bytes (content-addressed, deduped) then creates the blob_object catalog row — one versioned event. Bytes-first, command-authoritative. Returns ErrStoreNotConfigured when CAS is not wired.

func (*Fabriq) ReconcileLiveQueries

func (f *Fabriq) ReconcileLiveQueries(ctx context.Context) (int, error)

ReconcileLiveQueries runs the live query drift backstop: every maintained subscription is re-checked against Postgres truth and re-snapshotted where it diverged. Returns the number of subscriptions repaired. Intended to be called on a low-cadence schedule by the worker. A no-op (0, nil) when live queries are not configured.

func (*Fabriq) Registry

func (f *Fabriq) Registry() *registry.Registry

Registry exposes the schema registry (read-only use).

func (*Fabriq) Relational

func (f *Fabriq) Relational() query.RelationalQuerier

Relational implements query.Fabric.

func (*Fabriq) RemoveBookmark

func (f *Fabriq) RemoveBookmark(ctx context.Context, id string) error

RemoveBookmark removes a bookmark by id.

func (*Fabriq) RenameNode

func (f *Fabriq) RenameNode(ctx context.Context, id, newName string) (FsRef, error)

RenameNode renames a node in place, rewriting its own path and (in the same tx) all descendant paths. One event for the node.

func (*Fabriq) ReplaceFile

func (f *Fabriq) ReplaceFile(ctx context.Context, id string, r io.Reader, opts CreateFileOpts) (FsRef, error)

ReplaceFile swaps a file node's bytes for new content: PutBlob a new blob_object, repoint blob_id + denormalized facets, bump version. The previous blob_object is intentionally NOT deleted (a prior version may still be referenced elsewhere; Phase-4 GC reclaims genuinely-unreferenced bytes).

func (*Fabriq) ResolveMount

func (f *Fabriq) ResolveMount(ctx context.Context, id string) (map[string]any, error)

ResolveMount returns a mount node's configuration.

func (*Fabriq) RestoreNode

func (f *Fabriq) RestoreNode(ctx context.Context, id string) error

RestoreNode clears soft-delete on a node and its whole subtree.

func (*Fabriq) RevokePermission

func (f *Fabriq) RevokePermission(ctx context.Context, id string) error

RevokePermission removes a permission grant by id.

func (*Fabriq) Search

func (f *Fabriq) Search() query.SearchQuerier

Search implements query.Fabric.

func (*Fabriq) SearchNodesByName

func (f *Fabriq) SearchNodesByName(ctx context.Context, q string, limit int) ([]domain.FsNode, error)

SearchNodesByName does a live SQL ILIKE name search. The Elasticsearch projection (Search Spec) is the scalable/fuzzy path when ES is configured.

func (*Fabriq) Spatial

func (f *Fabriq) Spatial() query.SpatialQuerier

Spatial implements query.Fabric.

func (*Fabriq) Subscribe

func (f *Fabriq) Subscribe(ctx context.Context, scope query.SubscribeScope) (<-chan query.Delta, error)

Subscribe implements query.Fabric: authz hook, server-side channel resolution, conflated delivery.

func (*Fabriq) SubscribeDocument

func (f *Fabriq) SubscribeDocument(ctx context.Context, docID string) (<-chan query.Delta, error)

SubscribeDocument attaches to a document's live sync frames: RAW delivery (every frame, in order, no conflation), channel resolved server-side from the validated doc id and the context tenant, through the same authz hook as Subscribe (scope name "doc"). Frame payloads are the update blobs; Version is the log seq — a gap means "call Document().Sync and resume".

func (*Fabriq) Timeseries

func (f *Fabriq) Timeseries() query.TSQuerier

Timeseries implements query.Fabric.

func (*Fabriq) TrashNode

func (f *Fabriq) TrashNode(ctx context.Context, id string) error

TrashNode soft-deletes a node and its whole subtree.

func (*Fabriq) UnlockNode

func (f *Fabriq) UnlockNode(ctx context.Context, id string) error

UnlockNode clears a node's lock.

func (*Fabriq) Upcasters

func (f *Fabriq) Upcasters() *event.UpcasterChain

Upcasters exposes the registered payload upcaster chain (nil when none) — the worker hands it to projection engines.

func (*Fabriq) UpdateMount

func (f *Fabriq) UpdateMount(ctx context.Context, id string, mountConfig map[string]any) (FsRef, error)

UpdateMount replaces a mount node's configuration.

func (*Fabriq) UpdateSource

func (f *Fabriq) UpdateSource(ctx context.Context, id string, in SourceInput) (SourceRef, error)

UpdateSource replaces a blob_source (re-encrypting auth), bumping version.

func (*Fabriq) Vector

func (f *Fabriq) Vector() query.VectorQuerier

Vector implements query.Fabric.

func (*Fabriq) WaitForProjection

func (f *Fabriq) WaitForProjection(ctx context.Context, proj, aggregate, aggID string, version int64) error

WaitForProjection implements query.Fabric by polling the projection state port until the aggregate reaches version or ctx expires.

func (*Fabriq) WatchChildren

func (f *Fabriq) WatchChildren(ctx context.Context, parentID string, limit int) (livequery.Snapshot, <-chan livequery.LiveDelta, *livequery.Handle, error)

WatchChildren returns a maintained result set of a folder's live children, ordered by name — the per-folder live view the explorer subscribes to. Trashed children (deleted_at IS NOT NULL) are excluded from the live window. Single-shard deployments only (the LiveQuery constraint).

type FalkorDBConfig

type FalkorDBConfig struct {
	Addr     string `yaml:"addr" json:"addr"`
	Username string `yaml:"username" json:"username"`
	Password string `yaml:"password" json:"password"`
}

FalkorDBConfig locates the graph projection engine.

type FsRef

type FsRef struct {
	ID       string `json:"id"`
	ParentID string `json:"parentId"`
	Name     string `json:"name"`
	Path     string `json:"path"`
	NodeType string `json:"nodeType"`
	Version  int64  `json:"version"`
}

FsRef identifies a created/updated fs_node (analog of BlobRef).

type LiveReader

type LiveReader interface {
	livequery.Snapshotter
	livequery.Refiller
}

LiveReader is the snapshot + boundary-refill oracle the live query engine reads from (implemented by adapters/postgres LiveStore). Postgres stays authoritative for ordering and exact top-N.

type NotFoundError

type NotFoundError = fabriqerr.NotFoundError

NotFoundError reports a missing aggregate within the tenant's scope.

type Option

type Option func(*settings)

Option customizes a Fabriq.

func WithAuthz

func WithAuthz(fn subscribe.AuthzFunc) Option

WithAuthz installs the subscribe-time authorization hook.

func WithClock

func WithClock(now func() time.Time) Option

WithClock overrides the command-plane clock (tests).

func WithConflationWindow

func WithConflationWindow(d time.Duration) Option

WithConflationWindow tunes the hub's LWW flush window (spec range 100–250ms; default 150ms).

func WithDocumentAuthz

func WithDocumentAuthz(fn func(ctx context.Context, docID string) error) Option

WithDocumentAuthz installs the document-plane authorization hook, consulted for BOTH ApplyUpdate (writes) and SubscribeDocument (reads). Without it, any authenticated member of the tenant may touch any of the tenant's documents.

func WithEncryptor

func WithEncryptor(e crypto.Encryptor) Option

WithEncryptor sets the field encryptor used for blob_source credentials.

func WithLifecycleHook

func WithLifecycleHook(hooks ...command.LifecycleHook) Option

WithLifecycleHook appends data-lifecycle hooks to the command plane. Each hook runs INSIDE the write transaction after every change is staged: it may write its own rows atomically (via the tx handle) or veto the write by returning an error (which rolls the whole command back). Hooks fire in registration order across all WithLifecycleHook calls. This is the in-tx, cross-cutting seam an auditing/chronicle extension hooks into.

func WithLiveAuthz

func WithLiveAuthz(fn livequery.AuthzFunc) Option

WithLiveAuthz installs the authorization hook run before a live query's snapshot. It may also be used (in later phases) to inject mandatory row-visibility predicates into the query's filter.

func WithLiveCushion

func WithLiveCushion(n int) Option

WithLiveCushion sets how many extra rows beyond the visible window each maintained live query buffers, to absorb boundary churn before a Postgres refill is needed (default 16).

func WithStreamMaxLen

func WithStreamMaxLen(n int64) Option

WithStreamMaxLen sets the approximate MAXLEN for per-channel Redis streams (catch-up depth before clients must refetch; default 500).

func WithSubscribeBuffer

func WithSubscribeBuffer(n int) Option

WithSubscribeBuffer sets the per-subscriber delta buffer; full buffers drop (clients refetch + resume by Last-Event-ID).

func WithTraceparent

func WithTraceparent(fn func(context.Context) string) Option

WithTraceparent supplies the W3C traceparent extractor stamped into event envelopes (internal/otel provides the production one).

func WithUpcasters

func WithUpcasters(chain *event.UpcasterChain) Option

WithUpcasters registers the event payload upcaster chain. Projection engines apply it at decode, so appliers only ever see the latest payload shape; register one vN->vN+1 step per evolved event type.

func WithWaitPollInterval

func WithWaitPollInterval(d time.Duration) Option

WithWaitPollInterval tunes WaitForProjection's poll cadence.

type Ports

type Ports struct {
	Store      command.Store
	Relational query.RelationalQuerier
	Graph      query.GraphQuerier
	Search     query.SearchQuerier
	Timeseries query.TSQuerier
	Vector     query.VectorQuerier
	Spatial    query.SpatialQuerier
	Documents  document.Store
	// Blob is the byte-plane port; nil degrades to ErrStoreNotConfigured.
	Blob blob.Store
	// CAS is the content-addressable store port; nil when EnableCas is false.
	CAS             blob.CAS
	ProjectionState projection.StateReader

	// Live is the snapshot/refill oracle for live queries; when set (and a
	// tailer is configured) the facade exposes LiveQuery.
	Live LiveReader

	// Cache is the engine cache (nil when not configured); enables result-set
	// caching at Repo[T] for opted-in entities.
	Cache cache.Cache
}

Ports bundles the port implementations a Fabriq is assembled from. Open() fills them from configured adapters; tests and embedders may supply fabriqtest fakes or custom implementations directly. Store and Relational are mandatory (Postgres is the source of truth); every other port degrades to a typed ErrStoreNotConfigured.

type PostgresConfig

type PostgresConfig struct {
	DSN      string `yaml:"dsn" json:"dsn"`
	PoolSize int    `yaml:"pool_size" json:"pool_size"`
}

PostgresConfig locates the source of truth. Required unless Shards is set (it is the one-shard shorthand).

type ProjectionsConfig

type ProjectionsConfig struct {
	Graph  bool `yaml:"graph" json:"graph"`
	Search bool `yaml:"search" json:"search"`
}

ProjectionsConfig switches projection planes on.

type PutBlobOpts

type PutBlobOpts struct {
	ContentType string `json:"contentType"`
}

PutBlobOpts carries optional metadata for a blob write.

type RedisConfig

type RedisConfig struct {
	Addr     string `yaml:"addr" json:"addr"`
	DB       int    `yaml:"db" json:"db"`
	Username string `yaml:"username" json:"username"`
	Password string `yaml:"password" json:"password"`
}

RedisConfig locates the event fan-out / cache store.

type ShardConfig

type ShardConfig struct {
	ID       string `yaml:"id" json:"id"`
	DSN      string `yaml:"dsn" json:"dsn"`
	PoolSize int    `yaml:"pool_size" json:"pool_size"`
}

ShardConfig locates one source-of-truth shard. When Config.Shards is non-empty, tenants are routed across these by the directory (ADR 0007); each shard should be its own Postgres database so its advisory-lock leadership (relay) is independent. Leaving Shards empty and setting Postgres is the degenerate one-shard deployment.

type ShardPG

type ShardPG struct {
	ID string
	PG *postgres.Adapter
}

ShardPG pairs a shard id with its concrete Postgres adapter — what the worker iterates to start a per-shard relay.

type SourceInput

type SourceInput struct {
	ProjectID   string            `json:"projectId"`
	Name        string            `json:"name"`
	Provider    string            `json:"provider"`
	Endpoint    string            `json:"endpoint"`
	BasePath    string            `json:"basePath"`
	Auth        map[string]any    `json:"auth"`
	WatchConfig map[string]any    `json:"watchConfig"`
	FileFilter  map[string]any    `json:"fileFilter"`
	Tags        map[string]string `json:"tags"`
	Enabled     bool              `json:"enabled"`
}

SourceInput carries a blob_source's fields with PLAINTEXT auth (encrypted at the boundary).

type SourceRef

type SourceRef struct {
	ID      string `json:"id"`
	Version int64  `json:"version"`
}

SourceRef identifies a created/updated blob_source.

type StorageConfig

type StorageConfig struct {
	StorageDriver string `yaml:"storageDriver" json:"storageDriver"`
	DefaultBucket string `yaml:"defaultBucket" json:"defaultBucket"`
	// EnableCas gates the content-addressable store layer. When true, Open()
	// will wire a CASStore backed by the blob_cas ledger (requires a Postgres
	// adapter). The open.go wiring that reads this field lands in Phase 3b.
	EnableCas bool `yaml:"enableCas" json:"enableCas"`
}

StorageConfig configures the object-store backend that fills f.Blob(). Empty StorageDriver leaves the blob port unconfigured (shipped dark).

type Stores

type Stores struct {
	// Postgres is the PRIMARY shard: health checks, the migrations CLI, and
	// the document plane (which stays single-shard in step 2) use it. For
	// per-tenant work use Shards / ShardPGs.
	Postgres *postgres.Adapter
	Redis    *redis.Adapter
	Falkor   *falkordb.Adapter
	Elastic  *elastic.Adapter
	Blob     *trovestore.Adapter // nil when Storage not configured
	// CAS is the content-addressable store (nil when EnableCas is false).
	CAS *trovestore.CASStore
	// Cache is the engine cache (nil when Redis is not configured).
	Cache corecache.Cache
	// Shards is the tenant -> source-of-truth routing table backing the
	// facade's relational/command/vector/timeseries/spatial ports (ADR 0007).
	Shards *shard.Set
	// contains filtered or unexported fields
}

Stores exposes the opened adapters for worker-plane wiring (relay, electors, projection consumers) and shutdown.

func (*Stores) AllTenants

func (s *Stores) AllTenants(ctx context.Context) ([]string, error)

AllTenants unions every shard's known tenants (the outbox-derived discovery the reconciler and rebuild --all-tenants scan). Sorted, deduped.

func (*Stores) BlobReconciler

func (s *Stores) BlobReconciler(grace time.Duration) (*trovestore.BlobReconciler, error)

BlobReconciler assembles the per-tenant blob CAS reconciler (ref-count recompute, byte GC, broken-row + orphan detection). Returns an error when the CAS layer is not configured (Storage.EnableCas false).

func (*Stores) Close

func (s *Stores) Close() error

Close releases every opened adapter (every shard, plus the projections).

func (*Stores) GraphEngine

func (s *Stores) GraphEngine(reg *registry.Registry, upcasters *event.UpcasterChain) (*projection.Engine, error)

GraphEngine assembles the graph projection consumer over the opened stores: Redis consumer group -> registry-derived applier -> FalkorDB sink, with projection_state bookkeeping and rebuild-aware dual targets. Run one per worker replica (consumer groups scale without election).

func (*Stores) GraphRebuilder

func (s *Stores) GraphRebuilder(reg *registry.Registry) (*projection.Rebuilder, error)

GraphRebuilder assembles the blue-green rebuilder for the graph projection (used by `fabriq rebuild` and tests).

func (*Stores) GraphReconciler

func (s *Stores) GraphReconciler(reg *registry.Registry) (*projection.Reconciler, error)

GraphReconciler assembles drift detection + repair for the graph projection.

func (*Stores) SearchEngine

func (s *Stores) SearchEngine(reg *registry.Registry, upcasters *event.UpcasterChain) (*projection.Engine, error)

SearchEngine assembles the search projection consumer: Redis consumer group -> registry-derived applier -> Elasticsearch sink with external version gating. Run one per worker replica.

func (*Stores) SearchRebuilder

func (s *Stores) SearchRebuilder(reg *registry.Registry) (*projection.Rebuilder, error)

SearchRebuilder assembles the blue-green rebuilder for the search projection; the alias swap rides the flip (OnFlip).

func (*Stores) SearchReconciler

func (s *Stores) SearchReconciler(reg *registry.Registry) (*projection.Reconciler, error)

SearchReconciler assembles drift detection + repair for the search projection.

func (*Stores) ShardPGs

func (s *Stores) ShardPGs() []ShardPG

ShardPGs returns the source-of-truth shards in id order.

type SubscriptionsConfig

type SubscriptionsConfig struct {
	ConflationWindow time.Duration `yaml:"conflation_window" json:"conflation_window"`
	StreamMaxLen     int64         `yaml:"stream_max_len" json:"stream_max_len"`
	SubscribeBuffer  int           `yaml:"subscribe_buffer" json:"subscribe_buffer"`
}

SubscriptionsConfig tunes the delta plane.

type VersionConflictError

type VersionConflictError = fabriqerr.VersionConflictError

VersionConflictError reports an optimistic-concurrency failure.

Directories

Path Synopsis
adapters
cache
Package cache is fabriq's caching adapter: the core/cache port implemented over grove kv's Store (redis driver).
Package cache is fabriq's caching adapter: the core/cache port implemented over grove kv's Store (redis driver).
elastic
Package elastic is fabriq's search adapter on go-elasticsearch.
Package elastic is fabriq's search adapter on go-elasticsearch.
falkordb
Package falkordb is fabriq's graph adapter.
Package falkordb is fabriq's graph adapter.
graphtest
Package graphtest is the EXPORTED graph-dialect conformance suite: the gate every GraphQuerier must pass before fabriq will project into it, and the contract that keeps shipped Cypher inside the openCypher common subset so FalkorDB can later be swapped for Memgraph, Neo4j or Kùzu without touching appliers or call sites.
Package graphtest is the EXPORTED graph-dialect conformance suite: the gate every GraphQuerier must pass before fabriq will project into it, and the contract that keeps shipped Cypher inside the openCypher common subset so FalkorDB can later be swapped for Memgraph, Neo4j or Kùzu without touching appliers or call sites.
postgres
Package postgres is fabriq's Postgres adapter, built on grove's pg driver.
Package postgres is fabriq's Postgres adapter, built on grove's pg driver.
redis
Package redis is fabriq's Redis adapter: event fan-out over Streams (publisher for the relay, tailer for the hub, consumer groups for projections), versioned-prefix caching, and ephemeral presence pub/sub.
Package redis is fabriq's Redis adapter: event fan-out over Streams (publisher for the relay, tailer for the hub, consumer groups for projections), versioned-prefix caching, and ephemeral presence pub/sub.
shard
Package shard routes a tenant's source-of-truth operations to the Postgres shard that holds it.
Package shard routes a tenant's source-of-truth operations to the Postgres shard that holds it.
trove
Package trovestore implements fabriq's core/blob.Store over the Trove byte engine used as a LIBRARY (trove.Open + the driver registry).
Package trovestore implements fabriq's core/blob.Store over the Trove byte engine used as a LIBRARY (trove.Open + the driver registry).
Package cachequery decorates the relational read port with a per-id, opt-in, read-through row cache.
Package cachequery decorates the relational read port with a per-id, opt-in, read-through row cache.
cmd
api-example command
Command api-example is a minimal TWINOS-style API service on Forge, demonstrating fabriq's data plane: commands, queries, and SSE fetch-then-subscribe.
Command api-example is a minimal TWINOS-style API service on Forge, demonstrating fabriq's data plane: commands, queries, and SSE fetch-then-subscribe.
fabriq command
Command fabriq is the data fabric's single binary: a Forge app wrapped in a CLI (forge/cli RunApp).
Command fabriq is the data fabric's single binary: a Forge app wrapped in a CLI (forge/cli RunApp).
Package conformance is fabriq's cross-port conformance kit: one exported Case table per capability port, run against BOTH the in-memory fabriqtest fakes and the real adapters, so the fakes cannot silently drift from Postgres / FalkorDB / Elasticsearch truth.
Package conformance is fabriq's cross-port conformance kit: one exported Case table per capability port, run against BOTH the in-memory fabriqtest fakes and the real adapters, so the fakes cannot silently drift from Postgres / FalkorDB / Elasticsearch truth.
core
agent
core/agent/altitude.go
core/agent/altitude.go
blob
Package blob is fabriq's byte-plane capability port.
Package blob is fabriq's byte-plane capability port.
cache
Package cache is fabriq's first-class caching port: a byte-level, scope-aware cache with per-keyspace freshness policy.
Package cache is fabriq's first-class caching port: a byte-level, scope-aware cache with per-keyspace freshness policy.
command
Package command is fabriq's only write path for KindAggregate entities.
Package command is fabriq's only write path for KindAggregate entities.
crypto
Package crypto provides field-level encryption for fabriq (e.g.
Package crypto provides field-level encryption for fabriq (e.g.
document
Package document is the CRDT document plane for KindDocument entities (collaborative documents: page-builder documents, annotations).
Package document is the CRDT document plane for KindDocument entities (collaborative documents: page-builder documents, annotations).
event
Package event defines fabriq's versioned event envelope, its codec, and the upcaster chain that migrates old payload schemas at decode time.
Package event defines fabriq's versioned event envelope, its codec, and the upcaster chain that migrates old payload schemas at decode time.
fabriqerr
Package fabriqerr holds the canonical shared error values used across fabriq's core packages.
Package fabriqerr holds the canonical shared error values used across fabriq's core packages.
livequery
Package livequery is fabriq's maintained-result-set live query engine: a client supplies a filter + sort + limit/cursor and receives a snapshot followed by a live stream of enter/leave/move/update deltas.
Package livequery is fabriq's maintained-result-set live query engine: a client supplies a filter + sort + limit/cursor and receives a snapshot followed by a live stream of enter/leave/move/update deltas.
livequery/cluster
Package cluster holds the engine-neutral coordination primitives for the sharded live query matcher tier: how data maps to partitions, and how live shards divide those partitions among themselves by rendezvous (HRW) hashing.
Package cluster holds the engine-neutral coordination primitives for the sharded live query matcher tier: how data maps to partitions, and how live shards divide those partitions among themselves by rendezvous (HRW) hashing.
livequery/match
Package match compiles a query.Where (fabriq's engine-neutral filter AST) into a Go predicate evaluated against a column-keyed map.
Package match compiles a query.Where (fabriq's engine-neutral filter AST) into a Go predicate evaluated against a column-keyed map.
projection
Package projection turns domain events into engine-neutral mutations and (in later phases) drives the consumer loops that apply them.
Package projection turns domain events into engine-neutral mutations and (in later phases) drives the consumer loops that apply them.
query
Package query defines fabriq's capability ports — explicit, engine-typed interfaces per storage capability — and the Fabric facade that exposes them.
Package query defines fabriq's capability ports — explicit, engine-typed interfaces per storage capability — and the Fabric facade that exposes them.
registry
Package registry is fabriq's declarative schema registry.
Package registry is fabriq's declarative schema registry.
subscribe
Package subscribe is fabriq's subscription plane: server-side channel resolution, the subscribe-time authorization gate, the conflating fan-out hub, and the SSE bridge.
Package subscribe is fabriq's subscription plane: server-side channel resolution, the subscribe-time authorization gate, the conflating fan-out hub, and the SSE bridge.
tenant
Package tenant carries the tenant identity on context.Context and is the single structural enforcement point for fabriq's tenancy invariant.
Package tenant carries the tenant identity on context.Context and is the single structural enforcement point for fabriq's tenancy invariant.
Package domain is the TWINOS domain pack: the only fabriq package with TWINOS-specific knowledge.
Package domain is the TWINOS domain pack: the only fabriq package with TWINOS-specific knowledge.
Package fabriqtest is fabriq's exported test kit.
Package fabriqtest is fabriq's exported test kit.
agentmcp
Package agentmcp exposes the fabriq agent toolkit over MCP (JSON-RPC 2.0).
Package agentmcp exposes the fabriq agent toolkit over MCP (JSON-RPC 2.0).
shieldguard
Package shieldguard adapts github.com/xraph/shield to the agent.Guard seam.
Package shieldguard adapts github.com/xraph/shield to the agent.Guard seam.
Package gateway is the transport shell that turns the in-process sharded live-query Gateway into a deployable edge tier: it terminates client SSE and WebSocket connections and forwards the maintained/streamed delta stream over them.
Package gateway is the transport shell that turns the in-process sharded live-query Gateway into a deployable edge tier: it terminates client SSE and WebSocket connections and forwards the maintained/streamed delta stream over them.
internal
metrics
Package metrics defines fabriq's Prometheus instruments.
Package metrics defines fabriq's Prometheus instruments.
otel
Package otel carries W3C trace context across fabriq's async hop: the command executor stamps the active traceparent into the event envelope, and consumers (relay, projections) restore it so ONE trace spans command -> outbox -> relay -> projection apply.
Package otel carries W3C trace context across fabriq's async hop: the command executor stamps the active traceparent into the event envelope, and consumers (relay, projections) restore it so ONE trace spans command -> outbox -> relay -> projection apply.
Package migrations is fabriq's DDL authority: grove Go-code migrations for every fabriq-owned table.
Package migrations is fabriq's DDL authority: grove Go-code migrations for every fabriq-owned table.
Package remote is the OPTIONAL server face of fabriq: it lets backend services talk to a central, connection-owning fabriq deployment over gRPC instead of embedding the library and owning their own datastore pools.
Package remote is the OPTIONAL server face of fabriq: it lets backend services talk to a central, connection-owning fabriq deployment over gRPC instead of embedding the library and owning their own datastore pools.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL