Documentation ¶
Overview ¶
Package fabriq is the TWINOS data fabric: the single module through which application code talks to every datastore.
Fabriq enforces three invariants across stores:
- Every write emits exactly one versioned event (transactional outbox).
- Every access is tenant-scoped (structural stamping + RLS + hook backstop).
- Projections (graph, search) are derived from Postgres and always rebuildable from it.
The kernel in core/ is domain-agnostic and engine-agnostic: capability ports (Relational, Graph, Search, Timeseries, Vector, Document), a declarative schema registry, a command plane, and a subscription hub. Engine dialects live exclusively under adapters/. The TWINOS domain pack lives in domain/ and is the only TWINOS-aware package.
// Open returns the facade, the worker-plane Stores handle, and an error.
f, stores, err := fabriq.Open(ctx, reg, cfg,
	fabriq.WithConflationWindow(150*time.Millisecond),
)
Fabriq is built on the Forge ecosystem: storage on github.com/xraph/grove, binaries on github.com/xraph/forge (apps) and forge/cli (CLI).
Index ¶
- Variables
- func For[T any](f *Fabriq) (*query.Repo[T], error)
- func Open(ctx context.Context, reg *registry.Registry, cfg Config, opts ...Option) (*Fabriq, *Stores, error)
- type BlobRef
- type BlobSourceView
- type CacheConfig
- type Config
- type CreateFileOpts
- type CreateShareInput
- type ElasticsearchConfig
- type EncryptionConfig
- type Fabriq
- func (f *Fabriq) AddBookmark(ctx context.Context, userID, nodeID string, sortOrder int) (string, error)
- func (f *Fabriq) Ancestors(ctx context.Context, id string) ([]domain.FsNode, error)
- func (f *Fabriq) Blob() blob.Store
- func (f *Fabriq) CatchUp(ctx context.Context, scope query.SubscribeScope, afterID string, limit int) ([]query.Delta, error)
- func (f *Fabriq) Close() error
- func (f *Fabriq) CreateFile(ctx context.Context, parentID, name string, r io.Reader, opts CreateFileOpts) (FsRef, error)
- func (f *Fabriq) CreateFolder(ctx context.Context, parentID, name string) (FsRef, error)
- func (f *Fabriq) CreateMount(ctx context.Context, parentID, name string, mountConfig map[string]any) (FsRef, error)
- func (f *Fabriq) CreateShare(ctx context.Context, in CreateShareInput) (string, error)
- func (f *Fabriq) CreateSource(ctx context.Context, in SourceInput) (SourceRef, error)
- func (f *Fabriq) DeleteBlob(ctx context.Context, blobObjectID string) error
- func (f *Fabriq) DeleteShare(ctx context.Context, id string) error
- func (f *Fabriq) DeleteSource(ctx context.Context, id string) error
- func (f *Fabriq) Descendants(ctx context.Context, id string) ([]domain.FsNode, error)
- func (f *Fabriq) Document() document.Store
- func (f *Fabriq) Exec(ctx context.Context, cmd command.Command) (command.Result, error)
- func (f *Fabriq) ExecBatch(ctx context.Context, cmds []command.Command) ([]command.Result, error)
- func (f *Fabriq) GetBlob(ctx context.Context, blobObjectID string) (io.ReadCloser, BlobRef, error)
- func (f *Fabriq) GetNode(ctx context.Context, id string) (domain.FsNode, error)
- func (f *Fabriq) GetNodeByPath(ctx context.Context, path string) (domain.FsNode, error)
- func (f *Fabriq) GetShareByToken(ctx context.Context, token string) (domain.FsShare, error)
- func (f *Fabriq) GetSource(ctx context.Context, id string) (BlobSourceView, error)
- func (f *Fabriq) GrantPermission(ctx context.Context, ...) (string, error)
- func (f *Fabriq) Graph() query.GraphQuerier
- func (f *Fabriq) Hub() *subscribe.Hub
- func (f *Fabriq) IncrementShareDownload(ctx context.Context, id string) error
- func (f *Fabriq) ListChildren(ctx context.Context, parentID string, limit, offset int) ([]domain.FsNode, error)
- func (f *Fabriq) ListNodePermissions(ctx context.Context, nodeID string) ([]domain.FsPermission, error)
- func (f *Fabriq) ListNodeShares(ctx context.Context, nodeID string) ([]domain.FsShare, error)
- func (f *Fabriq) ListPrincipalPermissions(ctx context.Context, principalType, principalID string) ([]domain.FsPermission, error)
- func (f *Fabriq) ListSources(ctx context.Context) ([]BlobSourceView, error)
- func (f *Fabriq) ListUserBookmarks(ctx context.Context, userID string) ([]domain.FsBookmark, error)
- func (f *Fabriq) LiveQuery(ctx context.Context, q livequery.LiveQuery) (livequery.Snapshot, <-chan livequery.LiveDelta, *livequery.Handle, error)
- func (f *Fabriq) LockNode(ctx context.Context, id, by string) error
- func (f *Fabriq) MoveNode(ctx context.Context, id, newParentID string) (FsRef, error)
- func (f *Fabriq) PermanentDeleteNode(ctx context.Context, id string) error
- func (f *Fabriq) PutBlob(ctx context.Context, r io.Reader, opts PutBlobOpts) (BlobRef, error)
- func (f *Fabriq) ReconcileLiveQueries(ctx context.Context) (int, error)
- func (f *Fabriq) Registry() *registry.Registry
- func (f *Fabriq) Relational() query.RelationalQuerier
- func (f *Fabriq) RemoveBookmark(ctx context.Context, id string) error
- func (f *Fabriq) RenameNode(ctx context.Context, id, newName string) (FsRef, error)
- func (f *Fabriq) ReplaceFile(ctx context.Context, id string, r io.Reader, opts CreateFileOpts) (FsRef, error)
- func (f *Fabriq) ResolveMount(ctx context.Context, id string) (map[string]any, error)
- func (f *Fabriq) RestoreNode(ctx context.Context, id string) error
- func (f *Fabriq) RevokePermission(ctx context.Context, id string) error
- func (f *Fabriq) Search() query.SearchQuerier
- func (f *Fabriq) SearchNodesByName(ctx context.Context, q string, limit int) ([]domain.FsNode, error)
- func (f *Fabriq) Spatial() query.SpatialQuerier
- func (f *Fabriq) Subscribe(ctx context.Context, scope query.SubscribeScope) (<-chan query.Delta, error)
- func (f *Fabriq) SubscribeDocument(ctx context.Context, docID string) (<-chan query.Delta, error)
- func (f *Fabriq) Timeseries() query.TSQuerier
- func (f *Fabriq) TrashNode(ctx context.Context, id string) error
- func (f *Fabriq) UnlockNode(ctx context.Context, id string) error
- func (f *Fabriq) Upcasters() *event.UpcasterChain
- func (f *Fabriq) UpdateMount(ctx context.Context, id string, mountConfig map[string]any) (FsRef, error)
- func (f *Fabriq) UpdateSource(ctx context.Context, id string, in SourceInput) (SourceRef, error)
- func (f *Fabriq) Vector() query.VectorQuerier
- func (f *Fabriq) WaitForProjection(ctx context.Context, proj, aggregate, aggID string, version int64) error
- func (f *Fabriq) WatchChildren(ctx context.Context, parentID string, limit int) (livequery.Snapshot, <-chan livequery.LiveDelta, *livequery.Handle, error)
- type FalkorDBConfig
- type FsRef
- type LiveReader
- type NotFoundError
- type Option
- func WithAuthz(fn subscribe.AuthzFunc) Option
- func WithClock(now func() time.Time) Option
- func WithConflationWindow(d time.Duration) Option
- func WithDocumentAuthz(fn func(ctx context.Context, docID string) error) Option
- func WithEncryptor(e crypto.Encryptor) Option
- func WithLifecycleHook(hooks ...command.LifecycleHook) Option
- func WithLiveAuthz(fn livequery.AuthzFunc) Option
- func WithLiveCushion(n int) Option
- func WithStreamMaxLen(n int64) Option
- func WithSubscribeBuffer(n int) Option
- func WithTraceparent(fn func(context.Context) string) Option
- func WithUpcasters(chain *event.UpcasterChain) Option
- func WithWaitPollInterval(d time.Duration) Option
- type Ports
- type PostgresConfig
- type ProjectionsConfig
- type PutBlobOpts
- type RedisConfig
- type ShardConfig
- type ShardPG
- type SourceInput
- type SourceRef
- type StorageConfig
- type Stores
- func (s *Stores) AllTenants(ctx context.Context) ([]string, error)
- func (s *Stores) BlobReconciler(grace time.Duration) (*trovestore.BlobReconciler, error)
- func (s *Stores) Close() error
- func (s *Stores) GraphEngine(reg *registry.Registry, upcasters *event.UpcasterChain) (*projection.Engine, error)
- func (s *Stores) GraphRebuilder(reg *registry.Registry) (*projection.Rebuilder, error)
- func (s *Stores) GraphReconciler(reg *registry.Registry) (*projection.Reconciler, error)
- func (s *Stores) SearchEngine(reg *registry.Registry, upcasters *event.UpcasterChain) (*projection.Engine, error)
- func (s *Stores) SearchRebuilder(reg *registry.Registry) (*projection.Rebuilder, error)
- func (s *Stores) SearchReconciler(reg *registry.Registry) (*projection.Reconciler, error)
- func (s *Stores) ShardPGs() []ShardPG
- type SubscriptionsConfig
- type VersionConflictError
Variables ¶
var (
	// ErrNoTenant is returned for any fabric call whose context was not
	// stamped with a tenant by auth middleware.
	ErrNoTenant = tenant.ErrNoTenant
	// ErrTenantHookTripped is returned when the grove pre-query backstop
	// detects a relational query without a tenant predicate. It firing
	// means a bug in fabriq itself: the structural stamping was bypassed.
	ErrTenantHookTripped = tenant.ErrTenantHookTripped
	// ErrNotFound is returned when an aggregate or row does not exist
	// within the calling tenant's scope.
	ErrNotFound = fabriqerr.ErrNotFound
	// ErrVersionConflict is returned when a command's expected version
	// does not match the stored aggregate version.
	ErrVersionConflict = fabriqerr.ErrVersionConflict
	// ErrProjectionLag is returned by WaitForProjection when the deadline
	// expires before the projection catches up to the requested version.
	ErrProjectionLag = fabriqerr.ErrProjectionLag
	// ErrStoreNotConfigured is returned by capability ports whose backing
	// store was not configured on Open (e.g. Graph() without FalkorDB).
	ErrStoreNotConfigured = fabriqerr.ErrStoreNotConfigured
)
Sentinel errors form fabriq's typed taxonomy. Callers branch with errors.Is; rich variants (VersionConflictError, NotFoundError) carry detail and still match their sentinel. The canonical values live in core packages (root imports core, never the reverse) and are aliased here so application code uniformly writes fabriq.ErrX.
var (
	// ErrNodeNameConflict is returned when a live sibling already has the name.
	ErrNodeNameConflict = errors.New("fabriq: fs_node name already exists in folder")
	// ErrNotContainer is returned when a child is created under a non-folder.
	ErrNotContainer = errors.New("fabriq: parent is not a folder")
	// ErrNodeLocked is returned when a mutating op targets a locked node.
	ErrNodeLocked = errors.New("fabriq: fs_node is locked")
)
Functions ¶
func For ¶
func For[T any](f *Fabriq) (*query.Repo[T], error)
For returns a type-safe repository over the entity whose grove model is T — the typed counterpart to f.Relational() and the projection ports. The entity is resolved from T (no string), and reads return *T / []*T:
repo, _ := fabriq.For[domain.Asset](f)
asset, err := repo.Get(ctx, id) // *domain.Asset
pump, err := repo.One(ctx, query.Eq("serial", sn))
kids, err := repo.Traverse(ctx, cypher, params) // graph -> hydrated
hits, err := repo.Search(ctx, "centrifugal", 20) // search -> hydrated
near, err := repo.Similar(ctx, embedding, 10) // vector -> hydrated
It is a free function, not a method, because Go methods cannot introduce type parameters. (The lower-level query.For takes a registry + querier directly; attach projection ports with WithGraph/WithSearch/WithVector.)
func Open ¶
func Open(ctx context.Context, reg *registry.Registry, cfg Config, opts ...Option) (*Fabriq, *Stores, error)
Open dials the configured stores and assembles a Fabriq:
- Postgres (required): command store, relational/timeseries/vector/spatial ports, projection bookkeeping.
- Redis (optional; needed only for live subscriptions and projections): change-channel tailer feeding the hub.
- FalkorDB / Elasticsearch ports come with phases 4–5; until then Graph()/Search() return ErrStoreNotConfigured.
The returned Stores handle also exposes the adapters the worker plane needs (relay, elector, consumer groups) — application services should only ever hold the *Fabriq.
Types ¶
type BlobRef ¶
type BlobRef struct {
ID string `json:"id"`
Hash string `json:"hash"`
Size int64 `json:"size"`
Version int64 `json:"version"`
}
BlobRef identifies a stored blob_object and its content.
type BlobSourceView ¶
type BlobSourceView struct {
ID string `json:"id"`
ProjectID string `json:"projectId"`
Name string `json:"name"`
Provider string `json:"provider"`
Endpoint string `json:"endpoint"`
BasePath string `json:"basePath"`
Auth map[string]any `json:"auth"`
WatchConfig map[string]any `json:"watchConfig"`
FileFilter map[string]any `json:"fileFilter"`
Tags map[string]string `json:"tags"`
Enabled bool `json:"enabled"`
Version int64 `json:"version"`
}
BlobSourceView is a decrypted read of a blob_source.
type CacheConfig ¶
type CacheConfig struct {
// L1Enabled gates the per-node LRU tier. Requires Redis.Addr to be set.
L1Enabled bool `yaml:"l1_enabled" json:"l1_enabled"`
// L1Size is the maximum number of entries the LRU holds (default 0 = no
// entries, so always set a positive value when L1Enabled is true).
L1Size int `yaml:"l1_size" json:"l1_size"`
// L1TTL is the per-entry time-to-live in the in-process store. It also
// bounds the cold-start cross-node staleness window: commits that land
// between Open() returning and the L1 evict tailer's first XRead attach
// are missed on this node and will remain stale until at most L1TTL
// elapses. Defaults to 5 minutes when L1Enabled is true and this is <= 0.
L1TTL time.Duration `yaml:"l1_ttl" json:"l1_ttl"`
}
CacheConfig controls the optional in-process L1 cache tier that sits in front of the shared Redis L2. When L1Enabled is false (the default) the engine uses the L2 adapter directly and behaviour is identical to P1-P3.
type Config ¶
type Config struct {
Postgres PostgresConfig `yaml:"postgres" json:"postgres"`
Shards []ShardConfig `yaml:"shards" json:"shards"`
Redis RedisConfig `yaml:"redis" json:"redis"`
FalkorDB FalkorDBConfig `yaml:"falkordb" json:"falkordb"`
Elasticsearch ElasticsearchConfig `yaml:"elasticsearch" json:"elasticsearch"`
Storage StorageConfig `yaml:"storage" json:"storage"`
Projections ProjectionsConfig `yaml:"projections" json:"projections"`
Subscriptions SubscriptionsConfig `yaml:"subscriptions" json:"subscriptions"`
Cache CacheConfig `yaml:"cache" json:"cache"`
Encryption EncryptionConfig `yaml:"encryption" json:"encryption"`
// CustomAppliers are consumer-supplied projection appliers, unioned after
// the built-in declarative applier for their Target. They MUST be pure (see
// projection.CustomApplier). The same set feeds both the live engines and
// the rebuilders, so live and rebuilt projections stay identical.
CustomAppliers []projection.CustomApplier `yaml:"-" json:"-"`
}
Config is fabriq's declarative configuration: which stores exist and which projections run. It is the same schema a future standalone `fabriqd` loads from YAML, hence the yaml tags. Entities are not configured here — they are registered in code via the registry.
type CreateFileOpts ¶
type CreateFileOpts struct {
ContentType string `json:"contentType"`
}
CreateFileOpts carries optional metadata for a file create.
type CreateShareInput ¶
type CreateShareInput struct {
}
CreateShareInput carries a share's fields. The seam supplies Token (generated) and PasswordHash (bcrypt) — fabriq persists them verbatim.
type ElasticsearchConfig ¶
type ElasticsearchConfig struct {
Addrs []string `yaml:"addrs" json:"addrs"`
Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"`
}
ElasticsearchConfig locates the search projection engine.
type EncryptionConfig ¶
type EncryptionConfig struct {
Key string `yaml:"key" json:"key"`
}
EncryptionConfig configures field-level encryption (blob_source credentials). Key is a base64-encoded 32-byte AES-256 data-encryption key; empty disables encryption (writes that carry credentials then fail closed).
type Fabriq ¶
type Fabriq struct {
// contains filtered or unexported fields
}
Fabriq is the facade implementing query.Fabric: the single object application code holds to reach every datastore.
func New ¶
New assembles a Fabriq from explicit ports. Most services use Open (config-driven adapters) instead; New is the seam for tests, embedding and partial deployments.
func (*Fabriq) AddBookmark ¶
func (f *Fabriq) AddBookmark(ctx context.Context, userID, nodeID string, sortOrder int) (string, error)
AddBookmark bookmarks nodeID for userID. The (tenant, user, node) unique index rejects duplicates (surfaced as an error).
func (*Fabriq) Ancestors ¶
Ancestors returns the chain from the root down to (but excluding) the node by walking parent_id. This costs O(depth) reads, which is fine for filesystem depths. The returned slice is in root→node order.
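The parent_id walk can be sketched over an in-memory map. The node type and the ancestors helper below are hypothetical stand-ins for fs_node rows and the real query path; the cycle guard is an addition of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a hypothetical stand-in for an fs_node row.
type node struct {
	ID       string
	ParentID string // "" marks a root-level node
	Name     string
}

// ancestors walks parent_id upward from id, then reverses the chain so the
// result is in root-first order. The node itself is excluded.
func ancestors(byID map[string]node, id string) ([]node, error) {
	cur, ok := byID[id]
	if !ok {
		return nil, errors.New("node not found")
	}
	var chain []node
	for cur.ParentID != "" {
		if len(chain) >= len(byID) {
			return nil, errors.New("parent_id cycle")
		}
		parent, ok := byID[cur.ParentID]
		if !ok {
			return nil, errors.New("dangling parent_id")
		}
		chain = append(chain, parent)
		cur = parent
	}
	// Reverse in place: the walk collected nearest-parent first.
	for i, j := 0, len(chain)-1; i < j; i, j = i+1, j-1 {
		chain[i], chain[j] = chain[j], chain[i]
	}
	return chain, nil
}

func main() {
	byID := map[string]node{
		"d": {ID: "d", Name: "docs"},
		"r": {ID: "r", ParentID: "d", Name: "reports"},
		"f": {ID: "f", ParentID: "r", Name: "q1.pdf"},
	}
	chain, err := ancestors(byID, "f")
	if err != nil {
		panic(err)
	}
	for _, n := range chain {
		fmt.Println(n.Name)
	}
}
```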
func (*Fabriq) CatchUp ¶
func (f *Fabriq) CatchUp(ctx context.Context, scope query.SubscribeScope, afterID string, limit int) ([]query.Delta, error)
CatchUp reads the deltas a reconnecting client missed on a scope's channel since afterID (its SSE Last-Event-ID), through the same authz gate as Subscribe. An empty slice with no error means the client is current; channels are short (MAXLEN~), so callers must treat a full page as "refetch instead". Delivery overlap with a live Subscribe is possible — consumers dedupe by StreamID.
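A reconnecting consumer might combine a CatchUp page with deltas already received from a live Subscribe roughly as follows. This is a sketch over slices rather than a channel; the delta type, its fields other than StreamID, and the merge helper are assumptions for illustration.

```go
package main

import "fmt"

// delta is a hypothetical stand-in for query.Delta; only StreamID is named
// by the documentation above.
type delta struct {
	StreamID string
	Payload  string
}

// merge returns the caught-up deltas followed by the live ones, skipping any
// StreamID already seen. refetch is true when the page is full, in which case
// the caller should reload state instead of replaying deltas.
func merge(page, live []delta, limit int) (out []delta, refetch bool) {
	if len(page) >= limit {
		return nil, true
	}
	seen := make(map[string]struct{}, len(page)+len(live))
	for _, batch := range [][]delta{page, live} {
		for _, d := range batch {
			if _, dup := seen[d.StreamID]; dup {
				continue
			}
			seen[d.StreamID] = struct{}{}
			out = append(out, d)
		}
	}
	return out, false
}

func main() {
	page := []delta{{"1-0", "a"}, {"2-0", "b"}}
	live := []delta{{"2-0", "b"}, {"3-0", "c"}} // "2-0" overlaps with the page
	out, refetch := merge(page, live, 100)
	fmt.Println(len(out), refetch)
}
```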
func (*Fabriq) Close ¶
Close drains the subscription hub and, when this Fabriq was built by Open, closes the underlying stores.
func (*Fabriq) CreateFile ¶
func (f *Fabriq) CreateFile(ctx context.Context, parentID, name string, r io.Reader, opts CreateFileOpts) (FsRef, error)
CreateFile stores bytes (PutBlob → blob_object) then creates a file node referencing it (1:1), with denormalized facets. One blob event + one node event.
func (*Fabriq) CreateFolder ¶
CreateFolder creates a folder node under parentID ("" = root). One event.
func (*Fabriq) CreateMount ¶
func (f *Fabriq) CreateMount(ctx context.Context, parentID, name string, mountConfig map[string]any) (FsRef, error)
CreateMount creates a mount node (node_type=mount) under parentID with the given mount configuration. The sync engine that consumes the config lives in the seam, not in fabriq.
func (*Fabriq) CreateShare ¶
CreateShare persists a share record.
func (*Fabriq) CreateSource ¶
CreateSource persists a new blob_source with encrypted credentials.
func (*Fabriq) DeleteBlob ¶
DeleteBlob removes the blob_object catalog row (one versioned event). Byte GC and ref-count decrement are deferred to Phase 4.
func (*Fabriq) DeleteShare ¶
DeleteShare removes a share record.
func (*Fabriq) DeleteSource ¶
DeleteSource removes a blob_source.
func (*Fabriq) Descendants ¶
Descendants returns all live nodes under id (by path prefix), ordered by path.
func (*Fabriq) GetBlob ¶
GetBlob resolves the blob_object catalog row by ID, then streams its bytes from CAS. Returns ErrStoreNotConfigured when CAS is not wired.
func (*Fabriq) GetNodeByPath ¶
GetNodeByPath resolves a live node by its materialized path.
func (*Fabriq) GetShareByToken ¶
GetShareByToken resolves a share by its token (DATA only — expiry/cap/password enforcement is the seam's job).
func (*Fabriq) GrantPermission ¶
func (f *Fabriq) GrantPermission(ctx context.Context, nodeID, principalType, principalID, permission, grantedBy string) (string, error)
GrantPermission grants principal (type+id) `permission` on nodeID.
func (*Fabriq) Hub ¶
Hub exposes the subscription hub for the delta pump (fabriq-worker / the redis stream bridge) and for shutdown draining. Application code subscribes through Subscribe, never directly.
func (*Fabriq) IncrementShareDownload ¶
IncrementShareDownload atomically bumps download_count via a command-plane read-modify-write with optimistic concurrency (one retry on version conflict).
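The read-modify-write with a single retry can be illustrated against an in-memory versioned row. Everything here (store, increment, the simulated race) is a hypothetical, single-goroutine sketch of the pattern, not the command plane's code.

```go
package main

import (
	"errors"
	"fmt"
)

var errVersionConflict = errors.New("version conflict")

// store is an in-memory stand-in for a versioned aggregate row.
type store struct {
	count    int64
	version  int64
	raceOnce bool // when true, the next write loses to a simulated concurrent writer
}

func (s *store) read() (count, version int64) { return s.count, s.version }

// write succeeds only if expected matches the stored version.
func (s *store) write(newCount, expected int64) error {
	if s.raceOnce {
		s.raceOnce = false
		s.count++ // the competing writer's increment lands first
		s.version++
	}
	if expected != s.version {
		return errVersionConflict
	}
	s.count = newCount
	s.version++
	return nil
}

// increment performs read-modify-write and retries once on a version conflict.
func increment(s *store) error {
	const attempts = 2 // the first try plus one retry
	var err error
	for i := 0; i < attempts; i++ {
		count, version := s.read()
		err = s.write(count+1, version)
		if !errors.Is(err, errVersionConflict) {
			return err // success or a non-retryable error
		}
	}
	return err
}

func main() {
	s := &store{raceOnce: true}
	if err := increment(s); err != nil {
		panic(err)
	}
	fmt.Println(s.count, s.version)
}
```

The retry re-reads before writing, so the competing increment is preserved rather than overwritten.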
func (*Fabriq) ListChildren ¶
func (f *Fabriq) ListChildren(ctx context.Context, parentID string, limit, offset int) ([]domain.FsNode, error)
ListChildren returns the live children of parentID, ordered by name.
func (*Fabriq) ListNodePermissions ¶
func (f *Fabriq) ListNodePermissions(ctx context.Context, nodeID string) ([]domain.FsPermission, error)
ListNodePermissions returns all grants on nodeID.
func (*Fabriq) ListNodeShares ¶
ListNodeShares returns all shares for a node.
func (*Fabriq) ListPrincipalPermissions ¶
func (f *Fabriq) ListPrincipalPermissions(ctx context.Context, principalType, principalID string) ([]domain.FsPermission, error)
ListPrincipalPermissions returns all grants to a principal.
func (*Fabriq) ListSources ¶
func (f *Fabriq) ListSources(ctx context.Context) ([]BlobSourceView, error)
ListSources reads and decrypts all of the tenant's blob_sources.
func (*Fabriq) ListUserBookmarks ¶
ListUserBookmarks returns a user's bookmarks, ordered by sort_order.
func (*Fabriq) LiveQuery ¶
func (f *Fabriq) LiveQuery(ctx context.Context, q livequery.LiveQuery) (livequery.Snapshot, <-chan livequery.LiveDelta, *livequery.Handle, error)
LiveQuery registers a maintained-result-set subscription: it validates the query against the entity's LiveSpec, takes an RLS-enforced snapshot, and returns the snapshot plus a live channel of enter/leave/move/update deltas. Close the returned *livequery.Handle to tear the subscription down; its Reanchor method slides a maintained window for deep/infinite scroll.
func (*Fabriq) MoveNode ¶
MoveNode re-parents a node under newParentID, rewriting its own path and all descendant paths in the same tx. Rejects moving into its own subtree.
func (*Fabriq) PermanentDeleteNode ¶
PermanentDeleteNode hard-deletes a node and its subtree (one OpDelete event each) and deletes each file node's blob_object so Phase-4 GC reclaims bytes.
func (*Fabriq) PutBlob ¶
PutBlob stores bytes (content-addressed, deduped) then creates the blob_object catalog row — one versioned event. Bytes-first, command-authoritative. Returns ErrStoreNotConfigured when CAS is not wired.
func (*Fabriq) ReconcileLiveQueries ¶
ReconcileLiveQueries runs the live query drift backstop: every maintained subscription is re-checked against Postgres truth and re-snapshotted where it diverged. Returns the number of subscriptions repaired. Intended to be called on a low-cadence schedule by the worker. A no-op (0, nil) when live queries are not configured.
func (*Fabriq) Relational ¶
func (f *Fabriq) Relational() query.RelationalQuerier
Relational implements query.Fabric.
func (*Fabriq) RemoveBookmark ¶
RemoveBookmark removes a bookmark by id.
func (*Fabriq) RenameNode ¶
RenameNode renames a node in place, rewriting its own path and (in the same tx) all descendant paths. One event for the node.
func (*Fabriq) ReplaceFile ¶
func (f *Fabriq) ReplaceFile(ctx context.Context, id string, r io.Reader, opts CreateFileOpts) (FsRef, error)
ReplaceFile swaps a file node's bytes for new content: PutBlob a new blob_object, repoint blob_id + denormalized facets, bump version. The previous blob_object is intentionally NOT deleted (a prior version may still be referenced elsewhere; Phase-4 GC reclaims genuinely-unreferenced bytes).
func (*Fabriq) ResolveMount ¶
ResolveMount returns a mount node's configuration.
func (*Fabriq) RestoreNode ¶
RestoreNode clears soft-delete on a node and its whole subtree.
func (*Fabriq) RevokePermission ¶
RevokePermission removes a permission grant by id.
func (*Fabriq) Search ¶
func (f *Fabriq) Search() query.SearchQuerier
Search implements query.Fabric.
func (*Fabriq) SearchNodesByName ¶
func (f *Fabriq) SearchNodesByName(ctx context.Context, q string, limit int) ([]domain.FsNode, error)
SearchNodesByName does a live SQL ILIKE name search. The Elasticsearch projection (Search Spec) is the scalable/fuzzy path when ES is configured.
func (*Fabriq) Spatial ¶
func (f *Fabriq) Spatial() query.SpatialQuerier
Spatial implements query.Fabric.
func (*Fabriq) Subscribe ¶
func (f *Fabriq) Subscribe(ctx context.Context, scope query.SubscribeScope) (<-chan query.Delta, error)
Subscribe implements query.Fabric: authz hook, server-side channel resolution, conflated delivery.
func (*Fabriq) SubscribeDocument ¶
SubscribeDocument attaches to a document's live sync frames: RAW delivery (every frame, in order, no conflation), channel resolved server-side from the validated doc id and the context tenant, through the same authz hook as Subscribe (scope name "doc"). Frame payloads are the update blobs; Version is the log seq — a gap means "call Document().Sync and resume".
func (*Fabriq) Timeseries ¶
Timeseries implements query.Fabric.
func (*Fabriq) UnlockNode ¶
UnlockNode clears a node's lock.
func (*Fabriq) Upcasters ¶
func (f *Fabriq) Upcasters() *event.UpcasterChain
Upcasters exposes the registered payload upcaster chain (nil when none) — the worker hands it to projection engines.
func (*Fabriq) UpdateMount ¶
func (f *Fabriq) UpdateMount(ctx context.Context, id string, mountConfig map[string]any) (FsRef, error)
UpdateMount replaces a mount node's configuration.
func (*Fabriq) UpdateSource ¶
UpdateSource replaces a blob_source (re-encrypting auth), bumping version.
func (*Fabriq) Vector ¶
func (f *Fabriq) Vector() query.VectorQuerier
Vector implements query.Fabric.
func (*Fabriq) WaitForProjection ¶
func (f *Fabriq) WaitForProjection(ctx context.Context, proj, aggregate, aggID string, version int64) error
WaitForProjection implements query.Fabric by polling the projection state port until the aggregate reaches version or ctx expires.
func (*Fabriq) WatchChildren ¶
func (f *Fabriq) WatchChildren(ctx context.Context, parentID string, limit int) (livequery.Snapshot, <-chan livequery.LiveDelta, *livequery.Handle, error)
WatchChildren returns a maintained result set of a folder's live children, ordered by name — the per-folder live view the explorer subscribes to. Trashed children (deleted_at IS NOT NULL) are excluded from the live window. Single-shard deployments only (the LiveQuery constraint).
type FalkorDBConfig ¶
type FalkorDBConfig struct {
Addr string `yaml:"addr" json:"addr"`
Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"`
}
FalkorDBConfig locates the graph projection engine.
type FsRef ¶
type FsRef struct {
ID string `json:"id"`
ParentID string `json:"parentId"`
Name string `json:"name"`
Path string `json:"path"`
NodeType string `json:"nodeType"`
Version int64 `json:"version"`
}
FsRef identifies a created/updated fs_node (analog of BlobRef).
type LiveReader ¶
type LiveReader interface {
livequery.Snapshotter
livequery.Refiller
}
LiveReader is the snapshot + boundary-refill oracle the live query engine reads from (implemented by adapters/postgres LiveStore). Postgres stays authoritative for ordering and exact top-N.
type NotFoundError ¶
type NotFoundError = fabriqerr.NotFoundError
NotFoundError reports a missing aggregate within the tenant's scope.
type Option ¶
type Option func(*settings)
Option customizes a Fabriq.
func WithConflationWindow ¶
WithConflationWindow tunes the hub's LWW flush window (spec range 100–250ms; default 150ms).
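Last-write-wins conflation over a flush window can be sketched as below. This assumes conflation is keyed per entity and that the most recently received delta wins; the delta and conflator types and the run loop are illustrative, not the hub's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// delta is a hypothetical change notification keyed by entity.
type delta struct {
	Key     string
	Version int64
}

// conflator keeps only the latest delta per key until Flush is called.
type conflator struct {
	pending map[string]delta
	order   []string // first-seen order of keys in the current window
}

func newConflator() *conflator {
	return &conflator{pending: make(map[string]delta)}
}

// Add records d, overwriting any earlier delta for the same key.
func (c *conflator) Add(d delta) {
	if _, ok := c.pending[d.Key]; !ok {
		c.order = append(c.order, d.Key)
	}
	c.pending[d.Key] = d
}

// Flush returns one delta per key and resets the window.
func (c *conflator) Flush() []delta {
	out := make([]delta, 0, len(c.order))
	for _, k := range c.order {
		out = append(out, c.pending[k])
	}
	c.pending = make(map[string]delta)
	c.order = nil
	return out
}

// run flushes once per window until in is closed, then flushes the remainder.
func run(in <-chan delta, window time.Duration, emit func([]delta)) {
	c := newConflator()
	t := time.NewTicker(window)
	defer t.Stop()
	for {
		select {
		case d, ok := <-in:
			if !ok {
				if batch := c.Flush(); len(batch) > 0 {
					emit(batch)
				}
				return
			}
			c.Add(d)
		case <-t.C:
			if batch := c.Flush(); len(batch) > 0 {
				emit(batch)
			}
		}
	}
}

func main() {
	in := make(chan delta)
	done := make(chan struct{})
	go func() {
		defer close(done)
		run(in, 150*time.Millisecond, func(b []delta) { fmt.Println(b) })
	}()
	in <- delta{"asset:1", 1}
	in <- delta{"asset:2", 1}
	in <- delta{"asset:1", 2} // supersedes version 1 within the window
	close(in)
	<-done
}
```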
func WithDocumentAuthz ¶
WithDocumentAuthz installs the document-plane authorization hook, consulted for BOTH ApplyUpdate (writes) and SubscribeDocument (reads). Without it, any authenticated member of the tenant may touch any of the tenant's documents.
func WithEncryptor ¶
WithEncryptor sets the field encryptor used for blob_source credentials.
func WithLifecycleHook ¶
func WithLifecycleHook(hooks ...command.LifecycleHook) Option
WithLifecycleHook appends data-lifecycle hooks to the command plane. Each hook runs INSIDE the write transaction after every change is staged: it may write its own rows atomically (via the tx handle) or veto the write by returning an error (which rolls the whole command back). Hooks fire in registration order across all WithLifecycleHook calls. This is the in-tx, cross-cutting seam an auditing/chronicle extension hooks into.
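The hook contract (run in registration order inside the transaction, write extra rows, or veto by returning an error) can be sketched with an in-memory transaction. The tx, hook, and commit names are hypothetical and do not mirror command.LifecycleHook's real signature.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is an in-memory stand-in for a write transaction holding staged rows.
type tx struct {
	rows []string
}

// hook is a hypothetical lifecycle hook: it may stage rows or veto.
type hook func(t *tx) error

// commit stages change, runs hooks in registration order, and only then
// publishes the staged rows. Any hook error discards everything staged.
func commit(committed *[]string, change string, hooks ...hook) error {
	t := &tx{rows: []string{change}}
	for _, h := range hooks {
		if err := h(t); err != nil {
			return fmt.Errorf("lifecycle hook vetoed write: %w", err) // rollback
		}
	}
	*committed = append(*committed, t.rows...)
	return nil
}

// audit appends an audit row atomically with the change.
func audit(t *tx) error {
	t.rows = append(t.rows, "audit:"+t.rows[0])
	return nil
}

// denyForbidden vetoes a specific change.
func denyForbidden(t *tx) error {
	if t.rows[0] == "forbidden" {
		return errors.New("policy violation")
	}
	return nil
}

func main() {
	var committed []string
	fmt.Println(commit(&committed, "rename", denyForbidden, audit), committed)
	fmt.Println(commit(&committed, "forbidden", denyForbidden, audit) != nil, len(committed))
}
```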
func WithLiveAuthz ¶
WithLiveAuthz installs the authorization hook run before a live query's snapshot. It may also be used (in later phases) to inject mandatory row-visibility predicates into the query's filter.
func WithLiveCushion ¶
WithLiveCushion sets how many extra rows beyond the visible window each maintained live query buffers, to absorb boundary churn before a Postgres refill is needed (default 16).
func WithStreamMaxLen ¶
WithStreamMaxLen sets the approximate MAXLEN for per-channel Redis streams (catch-up depth before clients must refetch; default 500).
func WithSubscribeBuffer ¶
WithSubscribeBuffer sets the per-subscriber delta buffer; full buffers drop (clients refetch + resume by Last-Event-ID).
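A drop-on-full buffer is commonly implemented with a non-blocking channel send. The sketch assumes that "drop" means the individual delta is discarded for that subscriber; offer is a hypothetical helper.

```go
package main

import "fmt"

// offer tries to enqueue d without blocking. It reports false when the
// subscriber's buffer is full and the delta was dropped.
func offer(ch chan<- string, d string) bool {
	select {
	case ch <- d:
		return true
	default:
		return false
	}
}

func main() {
	sub := make(chan string, 2) // per-subscriber buffer of 2
	for _, d := range []string{"d1", "d2", "d3"} {
		fmt.Println(d, "delivered:", offer(sub, d))
	}
}
```

A slow subscriber therefore never blocks the publisher; it detects the gap and recovers through the refetch path.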
func WithTraceparent ¶
WithTraceparent supplies the W3C traceparent extractor stamped into event envelopes (internal/otel provides the production one).
func WithUpcasters ¶
func WithUpcasters(chain *event.UpcasterChain) Option
WithUpcasters registers the event payload upcaster chain. Projection engines apply it at decode, so appliers only ever see the latest payload shape; register one vN->vN+1 step per evolved event type.
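A chain of vN->vN+1 steps applied at decode time can be sketched as a lookup by event type and version. The payload, step, and chain types and the example event type are assumptions for illustration and are not event.UpcasterChain's API.

```go
package main

import "fmt"

type payload map[string]any

// step upgrades a payload from one version to the next.
type step func(payload) payload

// chain maps event type -> source version -> step to version+1.
type chain map[string]map[int]step

// upcast applies registered steps starting at version v until none remains,
// returning the latest payload shape and its version.
func (c chain) upcast(eventType string, v int, p payload) (payload, int) {
	for {
		s, ok := c[eventType][v]
		if !ok {
			return p, v
		}
		p = s(p)
		v++
	}
}

func main() {
	c := chain{
		"asset.created": {
			1: func(p payload) payload { // v1 -> v2: rename "name" to "title"
				p["title"] = p["name"]
				delete(p, "name")
				return p
			},
			2: func(p payload) payload { // v2 -> v3: add a default field
				p["tags"] = []string{}
				return p
			},
		},
	}
	p, v := c.upcast("asset.created", 1, payload{"name": "pump"})
	fmt.Println(v, p["title"])
}
```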
func WithWaitPollInterval ¶
WithWaitPollInterval tunes WaitForProjection's poll cadence.
type Ports ¶
type Ports struct {
Store command.Store
Relational query.RelationalQuerier
Graph query.GraphQuerier
Search query.SearchQuerier
Timeseries query.TSQuerier
Vector query.VectorQuerier
Spatial query.SpatialQuerier
Documents document.Store
// Blob is the byte-plane port; nil degrades to ErrStoreNotConfigured.
Blob blob.Store
// CAS is the content-addressable store port; nil when EnableCas is false.
CAS blob.CAS
ProjectionState projection.StateReader
// Live is the snapshot/refill oracle for live queries; when set (and a
// tailer is configured) the facade exposes LiveQuery.
Live LiveReader
// Cache is the engine cache (nil when not configured); enables result-set
// caching at Repo[T] for opted-in entities.
Cache cache.Cache
}
Ports bundles the port implementations a Fabriq is assembled from. Open() fills them from configured adapters; tests and embedders may supply fabriqtest fakes or custom implementations directly. Store and Relational are mandatory (Postgres is the source of truth); every other port degrades to a typed ErrStoreNotConfigured.
type PostgresConfig ¶
type PostgresConfig struct {
DSN string `yaml:"dsn" json:"dsn"`
PoolSize int `yaml:"pool_size" json:"pool_size"`
}
PostgresConfig locates the source of truth. Required unless Shards is set (it is the one-shard shorthand).
type ProjectionsConfig ¶
type ProjectionsConfig struct {
Graph bool `yaml:"graph" json:"graph"`
Search bool `yaml:"search" json:"search"`
}
ProjectionsConfig switches projection planes on.
type PutBlobOpts ¶
type PutBlobOpts struct {
ContentType string `json:"contentType"`
}
PutBlobOpts carries optional metadata for a blob write.
type RedisConfig ¶
type RedisConfig struct {
Addr string `yaml:"addr" json:"addr"`
DB int `yaml:"db" json:"db"`
Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"`
}
RedisConfig locates the event fan-out / cache store.
type ShardConfig ¶
type ShardConfig struct {
ID string `yaml:"id" json:"id"`
DSN string `yaml:"dsn" json:"dsn"`
PoolSize int `yaml:"pool_size" json:"pool_size"`
}
ShardConfig locates one source-of-truth shard. When Config.Shards is non-empty, tenants are routed across these by the directory (ADR 0007); each shard should be its own Postgres database so its advisory-lock leadership (relay) is independent. Leaving Shards empty and setting Postgres is the degenerate one-shard deployment.
type ShardPG ¶
ShardPG pairs a shard id with its concrete Postgres adapter — what the worker iterates to start a per-shard relay.
type SourceInput ¶
type SourceInput struct {
ProjectID string `json:"projectId"`
Name string `json:"name"`
Provider string `json:"provider"`
Endpoint string `json:"endpoint"`
BasePath string `json:"basePath"`
Auth map[string]any `json:"auth"`
WatchConfig map[string]any `json:"watchConfig"`
FileFilter map[string]any `json:"fileFilter"`
Tags map[string]string `json:"tags"`
Enabled bool `json:"enabled"`
}
SourceInput carries a blob_source's fields with PLAINTEXT auth (encrypted at the boundary).
type StorageConfig ¶
type StorageConfig struct {
	StorageDriver string `yaml:"storageDriver" json:"storageDriver"`
	DefaultBucket string `yaml:"defaultBucket" json:"defaultBucket"`
	// EnableCas gates the content-addressable store layer. When true, Open()
	// will wire a CASStore backed by the blob_cas ledger (requires a Postgres
	// adapter). The open.go wiring that reads this field lands in Phase 3b.
	EnableCas bool `yaml:"enableCas" json:"enableCas"`
}
StorageConfig configures the object-store backend that fills f.Blob(). Empty StorageDriver leaves the blob port unconfigured (shipped dark).
type Stores ¶
type Stores struct {
	// Postgres is the PRIMARY shard: health checks, the migrations CLI, and
	// the document plane (which stays single-shard in step 2) use it. For
	// per-tenant work use Shards / ShardPGs.
	Postgres *postgres.Adapter
	Redis    *redis.Adapter
	Falkor   *falkordb.Adapter
	Elastic  *elastic.Adapter
	Blob     *trovestore.Adapter // nil when Storage not configured
	// CAS is the content-addressable store (nil when EnableCas is false).
	CAS *trovestore.CASStore
	// Cache is the engine cache (nil when Redis is not configured).
	Cache corecache.Cache
	// Shards is the tenant -> source-of-truth routing table backing the
	// facade's relational/command/vector/timeseries/spatial ports (ADR 0007).
	Shards *shard.Set
	// contains filtered or unexported fields
}
Stores exposes the opened adapters for worker-plane wiring (relay, electors, projection consumers) and shutdown.
func (*Stores) AllTenants ¶
AllTenants unions every shard's known tenants (the outbox-derived discovery the reconciler and rebuild --all-tenants scan). Sorted, deduped.
func (*Stores) BlobReconciler ¶
func (s *Stores) BlobReconciler(grace time.Duration) (*trovestore.BlobReconciler, error)
BlobReconciler assembles the per-tenant blob CAS reconciler (ref-count recompute, byte GC, broken-row + orphan detection). Returns an error when the CAS layer is not configured (Storage.EnableCas false).
func (*Stores) GraphEngine ¶
func (s *Stores) GraphEngine(reg *registry.Registry, upcasters *event.UpcasterChain) (*projection.Engine, error)
GraphEngine assembles the graph projection consumer over the opened stores: Redis consumer group -> registry-derived applier -> FalkorDB sink, with projection_state bookkeeping and rebuild-aware dual targets. Run one per worker replica (consumer groups scale without election).
func (*Stores) GraphRebuilder ¶
GraphRebuilder assembles the blue-green rebuilder for the graph projection (used by `fabriq rebuild` and tests).
func (*Stores) GraphReconciler ¶
func (s *Stores) GraphReconciler(reg *registry.Registry) (*projection.Reconciler, error)
GraphReconciler assembles drift detection + repair for the graph projection.
func (*Stores) SearchEngine ¶
func (s *Stores) SearchEngine(reg *registry.Registry, upcasters *event.UpcasterChain) (*projection.Engine, error)
SearchEngine assembles the search projection consumer: Redis consumer group -> registry-derived applier -> Elasticsearch sink with external version gating. Run one per worker replica.
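External version gating means the sink accepts a write only when the supplied version is strictly greater than the one already stored, which is how Elasticsearch's external versioning behaves. The in-memory `gatedSink` below is a hypothetical model of that rule, not fabriq's sink; it shows why redelivered or out-of-order events cannot overwrite newer state.

```go
package main

import "fmt"

// doc is a stored document with the version that wrote it.
type doc struct {
	Version int64
	Body    string
}

// gatedSink is a hypothetical in-memory model of a version-gated index.
type gatedSink struct {
	docs map[string]doc
}

func newGatedSink() *gatedSink {
	return &gatedSink{docs: make(map[string]doc)}
}

// apply stores body under id only when version is strictly greater than
// the stored version. It reports whether the write was accepted.
func (s *gatedSink) apply(id string, version int64, body string) bool {
	if cur, ok := s.docs[id]; ok && version <= cur.Version {
		return false // stale or duplicate delivery: keep the newer state
	}
	s.docs[id] = doc{Version: version, Body: body}
	return true
}

func main() {
	s := newGatedSink()
	fmt.Println(s.apply("asset-1", 2, "v2")) // true: first write
	fmt.Println(s.apply("asset-1", 1, "v1")) // false: older event arrives late
	fmt.Println(s.apply("asset-1", 2, "v2")) // false: redelivery of the same event
	fmt.Println(s.docs["asset-1"].Body)      // v2
}
```

Because rejected writes leave state untouched, replaying the same stream is idempotent in this model.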
func (*Stores) SearchRebuilder ¶
SearchRebuilder assembles the blue-green rebuilder for the search projection; the alias swap rides the flip (OnFlip).
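A blue-green rebuild fills an inactive target while reads continue against the active one, then flips. The sketch below models that with two in-memory maps and a flip callback; the `rebuilder` type, its fields, and the `onFlip` signature are hypothetical and only illustrate where something like an alias swap could hook into the flip.

```go
package main

import "fmt"

// rebuilder is a hypothetical blue-green rebuilder: reads are served from
// targets[active] while a rebuild fills the other target.
type rebuilder struct {
	targets [2]map[string]string
	active  int
	onFlip  func(newActive int) // called once the new target becomes active
}

// rebuild copies source into the inactive target, then flips to it.
func (r *rebuilder) rebuild(source map[string]string) {
	next := 1 - r.active
	fresh := make(map[string]string, len(source))
	for k, v := range source {
		fresh[k] = v
	}
	r.targets[next] = fresh
	r.active = next
	if r.onFlip != nil {
		r.onFlip(next)
	}
}

// get reads from whichever target is currently active.
func (r *rebuilder) get(id string) (string, bool) {
	v, ok := r.targets[r.active][id]
	return v, ok
}

func main() {
	r := &rebuilder{onFlip: func(n int) { fmt.Println("flipped to target", n) }}
	r.targets[0] = map[string]string{"asset-1": "stale"}
	r.rebuild(map[string]string{"asset-1": "fresh"})
	v, _ := r.get("asset-1")
	fmt.Println(v) // fresh
}
```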
func (*Stores) SearchReconciler ¶
func (s *Stores) SearchReconciler(reg *registry.Registry) (*projection.Reconciler, error)
SearchReconciler assembles drift detection + repair for the search projection.
type SubscriptionsConfig ¶
type SubscriptionsConfig struct {
	ConflationWindow time.Duration `yaml:"conflation_window" json:"conflation_window"`
	StreamMaxLen     int64         `yaml:"stream_max_len" json:"stream_max_len"`
	SubscribeBuffer  int           `yaml:"subscribe_buffer" json:"subscribe_buffer"`
}
SubscriptionsConfig tunes the delta plane.
type VersionConflictError ¶
type VersionConflictError = fabriqerr.VersionConflictError
VersionConflictError reports an optimistic-concurrency failure.
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| adapters | |
| cache | Package cache is fabriq's caching adapter: the core/cache port implemented over grove kv's Store (redis driver). |
| elastic | Package elastic is fabriq's search adapter on go-elasticsearch. |
| falkordb | Package falkordb is fabriq's graph adapter. |
| graphtest | Package graphtest is the EXPORTED graph-dialect conformance suite: the gate every GraphQuerier must pass before fabriq will project into it, and the contract that keeps shipped Cypher inside the openCypher common subset so FalkorDB can later be swapped for Memgraph, Neo4j or Kùzu without touching appliers or call sites. |
| postgres | Package postgres is fabriq's Postgres adapter, built on grove's pg driver. |
| redis | Package redis is fabriq's Redis adapter: event fan-out over Streams (publisher for the relay, tailer for the hub, consumer groups for projections), versioned-prefix caching, and ephemeral presence pub/sub. |
| shard | Package shard routes a tenant's source-of-truth operations to the Postgres shard that holds it. |
| trove | Package trovestore implements fabriq's core/blob.Store over the Trove byte engine used as a LIBRARY (trove.Open + the driver registry). |
| | Package cachequery decorates the relational read port with a per-id, opt-in, read-through row cache. |
| cmd | |
| api-example (command) | Command api-example is a minimal TWINOS-style API service on Forge, demonstrating fabriq's data plane: commands, queries, and SSE fetch-then-subscribe. |
| fabriq (command) | Command fabriq is the data fabric's single binary: a Forge app wrapped in a CLI (forge/cli RunApp). |
| | Package conformance is fabriq's cross-port conformance kit: one exported Case table per capability port, run against BOTH the in-memory fabriqtest fakes and the real adapters, so the fakes cannot silently drift from Postgres / FalkorDB / Elasticsearch truth. |
| core | |
| agent | core/agent/altitude.go |
| blob | Package blob is fabriq's byte-plane capability port. |
| cache | Package cache is fabriq's first-class caching port: a byte-level, scope-aware cache with per-keyspace freshness policy. |
| command | Package command is fabriq's only write path for KindAggregate entities. |
| crypto | Package crypto provides field-level encryption for fabriq (e.g. |
| document | Package document is the CRDT document plane for KindDocument entities (collaborative documents: page-builder documents, annotations). |
| event | Package event defines fabriq's versioned event envelope, its codec, and the upcaster chain that migrates old payload schemas at decode time. |
| fabriqerr | Package fabriqerr holds the canonical shared error values used across fabriq's core packages. |
| livequery | Package livequery is fabriq's maintained-result-set live query engine: a client supplies a filter + sort + limit/cursor and receives a snapshot followed by a live stream of enter/leave/move/update deltas. |
| livequery/cluster | Package cluster holds the engine-neutral coordination primitives for the sharded live query matcher tier: how data maps to partitions, and how live shards divide those partitions among themselves by rendezvous (HRW) hashing. |
| livequery/match | Package match compiles a query.Where (fabriq's engine-neutral filter AST) into a Go predicate evaluated against a column-keyed map. |
| projection | Package projection turns domain events into engine-neutral mutations and (in later phases) drives the consumer loops that apply them. |
| query | Package query defines fabriq's capability ports (explicit, engine-typed interfaces per storage capability) and the Fabric facade that exposes them. |
| registry | Package registry is fabriq's declarative schema registry. |
| subscribe | Package subscribe is fabriq's subscription plane: server-side channel resolution, the subscribe-time authorization gate, the conflating fan-out hub, and the SSE bridge. |
| tenant | Package tenant carries the tenant identity on context.Context and is the single structural enforcement point for fabriq's tenancy invariant. |
| | Package domain is the TWINOS domain pack: the only fabriq package with TWINOS-specific knowledge. |
| | Package fabriqtest is fabriq's exported test kit. |
| agentmcp | Package agentmcp exposes the fabriq agent toolkit over MCP (JSON-RPC 2.0). |
| shieldguard | Package shieldguard adapts github.com/xraph/shield to the agent.Guard seam. |
| | Package gateway is the transport shell that turns the in-process sharded live-query Gateway into a deployable edge tier: it terminates client SSE and WebSocket connections and forwards the maintained/streamed delta stream over them. |
| internal | |
| metrics | Package metrics defines fabriq's Prometheus instruments. |
| otel | Package otel carries W3C trace context across fabriq's async hop: the command executor stamps the active traceparent into the event envelope, and consumers (relay, projections) restore it so ONE trace spans command -> outbox -> relay -> projection apply. |
| | Package migrations is fabriq's DDL authority: grove Go-code migrations for every fabriq-owned table. |
| | Package remote is the OPTIONAL server face of fabriq: it lets backend services talk to a central, connection-owning fabriq deployment over gRPC instead of embedding the library and owning their own datastore pools. |