Documentation
¶
Overview ¶
Package query defines fabriq's capability ports — explicit, engine-typed interfaces per storage capability — and the Fabric facade that exposes them. There is deliberately no unified query language: relational work speaks SQL through grove, graph work speaks openCypher, search speaks queries against declared fields. No engine types (pgx, grove, Falkor, go-elasticsearch) appear in any signature, so adapters stay swappable.
Index ¶
- func SortField(sort string) (column string, desc bool)
- func TraverseAndHydrate(ctx context.Context, reg *registry.Registry, g GraphQuerier, ...) error
- func ValidateConds(conds []Cond, has func(column string) bool) error
- func ValidateSearchQuery(q SearchQuery, searchFields []string) error
- type Cond
- func Eq(column string, value any) Cond
- func Gt(column string, value any) Cond
- func Gte(column string, value any) Cond
- func ILike(column, pattern string) Cond
- func In(column string, values any) Cond
- func IsNotNull(column string) Cond
- func IsNull(column string) Cond
- func Like(column, pattern string) Cond
- func Lt(column string, value any) Cond
- func Lte(column string, value any) Cond
- func Ne(column string, value any) Cond
- func NotIn(column string, values any) Cond
- func Or(conds ...Cond) Cond
- type Delta
- type Fabric
- type Geometry
- type GraphQuerier
- type ListQuery
- type Op
- type Point
- type RangeQuery
- type RelationalQuerier
- type Repo
- func (r *Repo[T]) Entity() string
- func (r *Repo[T]) Get(ctx context.Context, id string) (*T, error)
- func (r *Repo[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
- func (r *Repo[T]) In(ctx context.Context, id, rel string) ([]*T, error)
- func (r *Repo[T]) List(ctx context.Context, q ListQuery) ([]*T, error)
- func (r *Repo[T]) One(ctx context.Context, where ...Cond) (*T, error)
- func (r *Repo[T]) Out(ctx context.Context, id, rel string) ([]*T, error)
- func (r *Repo[T]) Reachable(ctx context.Context, id, rel string, minHops, maxHops int) ([]*T, error)
- func (r *Repo[T]) Search(ctx context.Context, text string, limit int) ([]*T, error)
- func (r *Repo[T]) SearchWith(ctx context.Context, req SearchRequest) ([]*T, error)
- func (r *Repo[T]) Similar(ctx context.Context, embedding []float32, k int) ([]*T, error)
- func (r *Repo[T]) Traverse(ctx context.Context, cypher string, params map[string]any) ([]*T, error)
- func (r *Repo[T]) WithGraph(g GraphQuerier) *Repo[T]
- func (r *Repo[T]) WithResultCache(c cache.Cache, ks cache.Keyspace) *Repo[T]
- func (r *Repo[T]) WithSearch(s SearchQuerier) *Repo[T]
- func (r *Repo[T]) WithVector(v VectorQuerier) *Repo[T]
- type SearchQuerier
- type SearchQuery
- type SearchRequest
- type SpatialMatch
- type SpatialQuerier
- type SpatialQuery
- type SubscribeScope
- type TSQuerier
- type VectorMatch
- type VectorQuerier
- type VectorQuery
- type Where
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SortField ¶
SortField splits a "column [DESC]" sort spec into the column and whether it is descending. Empty input yields an empty column — sort by relevance. Adapters and the validator share it so they agree on how a sort is read.
func TraverseAndHydrate ¶
func TraverseAndHydrate(ctx context.Context, reg *registry.Registry, g GraphQuerier, rel RelationalQuerier, cypher string, params map[string]any, into any, ) error
TraverseAndHydrate is the composed graph→relational read: run a Cypher traversal that RETURNs aggregate ids (single column), then hydrate the full rows from Postgres in exactly ONE batched relational query. The entity is inferred from into's element type via the registry, so the call site stays Graph().TraverseAndHydrate(ctx, cypher, params, &assets).
Adapters implement GraphQuerier.TraverseAndHydrate by delegating here.
func ValidateConds ¶
ValidateConds checks a filter tree against an entity's columns and the operator vocabulary. has reports whether a column exists. Adapters call this before translating, so the same rules guard every engine.
func ValidateSearchQuery ¶
func ValidateSearchQuery(q SearchQuery, searchFields []string) error
ValidateSearchQuery checks a SearchQuery's Filter and Sort against the entity's indexed fields and the operator vocabulary — so every search adapter rejects the same unknown-column / bad-operator inputs (and the same injection surface) before translating to its engine. searchFields is the entity's declared search fields; id/tenant_id/version are always allowed.
Types ¶
type Cond ¶
type Cond struct {
Column string
Op Op
Value any // a slice for In/NotIn; ignored for IsNull/IsNotNull
Or []Cond // when non-empty, an OR group (Column/Op/Value ignored)
}
Cond is one engine-neutral predicate on a column — or, when Or is set, an OR group of sub-predicates. Build them with the constructors (Eq, In, Like, Or, …) rather than by hand.
type Delta ¶
type Delta struct {
// StreamID is the transport position (Redis stream entry ID). It maps
// 1:1 onto the SSE "id:" field so Last-Event-ID resume works.
StreamID string `json:"stream_id"`
// Channel the delta was delivered on.
Channel string `json:"channel"`
TenantID string `json:"tenant_id"`
Aggregate string `json:"aggregate"`
AggID string `json:"agg_id"`
Version int64 `json:"version"`
Type string `json:"type"`
At time.Time `json:"at"`
Payload json.RawMessage `json:"payload"`
}
Delta is what Subscribe channels carry: one change notification, small enough to conflate, complete enough that simple UIs can patch state without a refetch (and rich UIs can refetch on demand).
type Fabric ¶
type Fabric interface {
// Exec runs one command: the only write path for aggregates.
Exec(ctx context.Context, cmd command.Command) (command.Result, error)
// ExecBatch runs N commands in one transaction, ordered, all-or-nothing.
ExecBatch(ctx context.Context, cmds []command.Command) ([]command.Result, error)
Relational() RelationalQuerier
Graph() GraphQuerier
Search() SearchQuerier
Timeseries() TSQuerier
Vector() VectorQuerier
Spatial() SpatialQuerier
Document() document.Store
Blob() blob.Store
// Subscribe resolves the scope to a channel server-side (authz hook
// included) and returns a conflated delta stream.
Subscribe(ctx context.Context, scope SubscribeScope) (<-chan Delta, error)
// WaitForProjection blocks until the named projection has applied the
// aggregate at or beyond version, or the context deadline expires
// (ErrProjectionLag). It is the read-your-writes helper for callers
// that need a projection-backed query right after a command.
WaitForProjection(ctx context.Context, projection, aggregate, aggID string, version int64) error
}
Fabric is the facade application code holds. Open() wires it from configured adapters; fabriqtest wires it from fakes.
type Geometry ¶
Geometry is an engine-neutral geometry value: WKT plus its SRID. e.g. {WKT: "POINT Z (10 20 3)", SRID: 0} (local/planar, metres) or {WKT: "POINT (-122.4 37.8)", SRID: 4326} (geographic).
type GraphQuerier ¶
type GraphQuerier interface {
// Query runs a read-only openCypher query. into may be *[]string for
// single-column id traversals, or a pointer to a slice of structs for
// multi-column rows (adapter-mapped).
Query(ctx context.Context, cypher string, params map[string]any, into any) error
// TraverseAndHydrate runs a traversal that RETURNs ids, then hydrates
// the rows from Postgres in one batched relational query. The target
// entity is inferred from into's element type via the registry. Never
// N+1.
TraverseAndHydrate(ctx context.Context, cypher string, params map[string]any, into any) error
// ApplyMutations applies engine-neutral projection mutations to the
// named target graph (projection consumers and rebuilds only).
ApplyMutations(ctx context.Context, target string, muts []projection.Mutation) error
}
GraphQuerier queries the knowledge-graph projection. Cypher shipped in this repo must stick to the openCypher common subset — the adapters/graphtest conformance suite is the gate for engine swaps.
type ListQuery ¶
type ListQuery struct {
// Where: conditions ANDed together. Use the constructors; Eqs(map)
// is the terse equality shorthand.
Where Where
// OrderBy: one or more comma-separated "col [ASC|DESC]" terms, e.g.
// "sort_order ASC, created_at ASC". Empty orders by id.
OrderBy string
Limit int
Offset int
}
ListQuery selects, filters, orders and paginates an entity's rows. The filter is a single structured, engine-neutral mechanism: Where is a list of conditions, ANDed, built with Eq, Ne, Gt/Lt, In, Like/ILike, IsNull, Or, … (and Eqs for the pure-equality case). Columns are validated against the entity — an unknown column is rejected, which is also the injection guard. Reads the structured filter cannot express drop to raw Query.
type Op ¶
type Op string
Op is the bounded, engine-neutral comparison vocabulary the RelationalQuerier accepts on List filters. It is deliberately a fixed allowlist (not arbitrary SQL): adapters map each Op to their dialect, columns are validated against the entity, and values travel as bound parameters — so a structured filter is as injection-safe as the equality shorthand, while covering what grove's builder expresses. Anything outside this vocabulary belongs in the raw Query escape hatch.
func (Op) NeedsValue ¶
NeedsValue reports whether the operator takes a value.
type Point ¶
type Point struct {
Key string // series key within the tenant, e.g. tag id
At time.Time
Value float64
Quality int
}
Point is one telemetry sample.
type RangeQuery ¶
type RangeQuery struct {
Series string
Key string
From time.Time
To time.Time
Bucket time.Duration // 0 = raw points
Agg string // "avg", "min", "max", "last" (when Bucket > 0)
}
RangeQuery reads a time window of a series, optionally bucketed.
type RelationalQuerier ¶
type RelationalQuerier interface {
// Get loads one aggregate row by id into a model pointer.
Get(ctx context.Context, entity, id string, into any) error
// GetMany loads many rows in ONE batched query (WHERE id = ANY($1)) —
// the dataloader-style hydration primitive. Order follows ids; missing
// rows are skipped.
GetMany(ctx context.Context, entity string, ids []string, into any) error
// List pages through an entity's rows with a structured, engine-neutral
// filter (Where/Filter), ordering and pagination. The filter covers
// grove's builder expressiveness — operators, IN, LIKE, null checks,
// OR groups — without leaking engine types; reads it cannot express
// drop to the raw Query escape hatch.
List(ctx context.Context, entity string, q ListQuery, into any) error
// Query is the raw SQL escape hatch (still tenant-guarded). Use it for
// reads the structured filter cannot express; writes belong to Exec.
Query(ctx context.Context, into any, sql string, args ...any) error
}
RelationalQuerier reads the source of truth through grove. Every method is tenant-scoped structurally; the grove hook backstop asserts it.
type Repo ¶
type Repo[T any] struct { // contains filtered or unexported fields }
Repo is a type-safe view over one entity, parameterised by its grove model T. It is a thin generic layer over RelationalQuerier — the interface stays string/any (Go interface methods cannot be generic, and the untyped form is what adapters and fakes implement), while Repo gives call sites the entity-from-type and typed results:
repo, _ := query.For[domain.Asset](reg, f.Relational())
asset, err := repo.Get(ctx, id) // *domain.Asset, not any
pumps, err := repo.List(ctx, query.ListQuery{Where: []query.Cond{query.Eq("kind", "pump")}})
It adds no query capability beyond the ports — just typing. The graph, search and vector queriers are optional; the relational one is required.
func For ¶
For builds a typed Repo by resolving T's registered entity. T is the grove model struct (value or pointer); an unregistered type errors. The repo is relational-only until you attach projection queriers via With* (fabriq.For wires them all from the facade).
func (*Repo[T]) GetMany ¶
GetMany loads many rows in one batched query, typed; order follows ids, missing rows are skipped.
func (*Repo[T]) In ¶
In is Out with the edge reversed: same-type neighbours one hop in along a self-edge — MATCH (n:L {id:$id})<-[:REL]-(m:L) RETURN m.id.
func (*Repo[T]) List ¶
List runs a structured query, typed. When result-set caching is enabled for the entity, the ordered id-list is cached (Versioned by the entity generation, TTL backstop) and hydrated through GetMany.
func (*Repo[T]) One ¶
One fetches the single row matching the given conditions (ANDed) — the "load one by something other than id" primitive (e.g. a unique serial):
pump, err := repo.One(ctx, query.Eq("serial", "SN-777"))
Zero matches is ErrNotFound; more than one is an error (One means one). It needs no ListQuery — order and pagination are meaningless for a single row — and caps the read at two to detect ambiguity cheaply.
func (*Repo[T]) Out ¶
Out returns the same-type neighbours one hop out along a self-edge, typed and hydrated from Postgres:
MATCH (n:Asset {id:$id})-[:CHILD_OF]->(m:Asset) RETURN m.id
parents, err := repo.Out(ctx, assetID, "CHILD_OF") // []*domain.Asset
rel must be an edge this entity declares whose Target is the entity itself (a self-edge) — that is what makes the []*T result sound. Edges to other entity types, and anything outside MATCH/edge/RETURN, drop to the raw Traverse escape hatch. These helpers emit only the openCypher common subset the graphtest conformance suite gates, so they stay portable across graph engines.
func (*Repo[T]) Reachable ¶
func (r *Repo[T]) Reachable(ctx context.Context, id, rel string, minHops, maxHops int) ([]*T, error)
Reachable returns the same-type nodes reachable from id by following a self-edge between minHops and maxHops times (a variable-length path) — the typed ancestors/descendants walk:
MATCH (n:Asset {id:$id})-[:CHILD_OF*1..3]->(m:Asset) RETURN m.id
ancestors, err := repo.Reachable(ctx, assetID, "CHILD_OF", 1, 5)
minHops must be >= 1 and maxHops within [minHops, 16]. Ids are deduped (multiple paths may reach the same node) before hydration. For the reverse direction or richer shapes, use raw Traverse.
func (*Repo[T]) Search ¶
Search runs a full-text query against the entity's declared search fields, then hydrates the matching rows from Postgres in one batched query — typed entities in relevance order, never N+1. It is the one-line form of SearchWith; reach for SearchWith when you need filters, sort or pagination. For raw hits (highlighting, scores) use f.Search() directly.
func (*Repo[T]) SearchWith ¶
func (r *Repo[T]) SearchWith(ctx context.Context, req SearchRequest) ([]*T, error)
SearchWith runs a structured full-text query — free text plus optional non-scoring Filter (the same Cond vocabulary as List), Sort and pagination — and hydrates the matches from Postgres in one batched query, typed:
hits, _ := repo.SearchWith(ctx, query.SearchRequest{
Query: "centrifugal",
Filter: query.Where{query.Eq("kind", "pump"), query.Gte("version", 3)},
Sort: "name",
Limit: 20,
})
Filter and Sort may reference only indexed fields (the declared search fields plus id/tenant_id/version). There is no raw engine-DSL form by design — full-text search has no portable raw language, so the structured query is the whole surface, which keeps the port swappable.
func (*Repo[T]) Similar ¶
Similar runs a vector nearest-neighbour search and hydrates the matched rows from Postgres in one batched query — typed entities in similarity order, never N+1. For the relevance scores use f.Vector() directly.
func (*Repo[T]) Traverse ¶
Traverse runs a graph traversal that RETURNs ids and hydrates the full rows from Postgres in one batched query — typed, never N+1. The Cypher stays raw (the graph's swappability rests on common-subset openCypher, not a builder); only the result is typed:
assets, err := repo.Traverse(ctx,
`MATCH (a:Asset)-[:LOCATED_AT]->(:Site {id:$s}) RETURN a.id`,
map[string]any{"s": siteID})
func (*Repo[T]) WithGraph ¶
func (r *Repo[T]) WithGraph(g GraphQuerier) *Repo[T]
WithGraph attaches the graph querier (enables Traverse).
func (*Repo[T]) WithResultCache ¶
WithResultCache enables result-set (id-list) caching for this repo, keyed in ks (an entity-keyed Versioned keyspace). Wired by fabriq.For for opted-in entities; a repo without it behaves exactly as before.
func (*Repo[T]) WithSearch ¶
func (r *Repo[T]) WithSearch(s SearchQuerier) *Repo[T]
WithSearch attaches the search querier (enables Search).
func (*Repo[T]) WithVector ¶
func (r *Repo[T]) WithVector(v VectorQuerier) *Repo[T]
WithVector attaches the vector querier (enables Similar).
type SearchQuerier ¶
type SearchQuerier interface {
// Search runs a query against an entity's declared search fields.
Search(ctx context.Context, q SearchQuery, into any) error
// ApplyMutations applies DocIndex/DocDeindex mutations to the named
// target index (projection consumers and rebuilds only).
ApplyMutations(ctx context.Context, target string, muts []projection.Mutation) error
}
SearchQuerier queries the full-text projection.
type SearchQuery ¶
type SearchQuery struct {
Entity string
Query string
// Filter narrows results with the same Cond vocabulary as relational
// List (Eq/In/Gt/…/Or), applied by engines in non-scoring filter
// context. Columns must be indexed fields.
Filter Where
// Sort is an indexed column, optionally suffixed " DESC". Empty sorts
// by relevance score.
Sort string
// Limit caps the page size (adapter default when <= 0); Offset skips
// that many leading hits.
Limit int
Offset int
}
SearchQuery is a full-text query over an entity's declared fields, optionally narrowed by structured non-scoring filters, ordered and paginated. Filter and Sort are validated against the INDEXED fields (the declared search fields plus id/tenant_id/version) — you can only filter or sort on what the index holds.
There is deliberately no raw engine-DSL field. Unlike relational (raw SQL) and graph (raw openCypher common subset), full-text search has no portable raw language — an Elasticsearch query body could not be honored by a Postgres-FTS or Typesense adapter — so a raw DSL would break the swappable-port contract. Everything expressible stays in this neutral struct; genuinely engine-specific needs belong on a dedicated adapter method, outside the port.
type SearchRequest ¶
SearchRequest is the call-site form of a structured search over a typed Repo: SearchQuery without Entity, which the Repo supplies from T. Build Filter with the same constructors as relational filters (query.Eq, …).
type SpatialMatch ¶
SpatialMatch is one nearest-neighbour hit, nearest first.
type SpatialQuerier ¶
type SpatialQuerier interface {
// Upsert stores or replaces the geometry for (tenant, entity, id).
Upsert(ctx context.Context, entity, id string, geom Geometry, meta map[string]any) error
// Within returns entities whose geometry lies within q.RadiusM of q.Center,
// nearest-first, scanned into *[]SpatialMatch.
Within(ctx context.Context, q SpatialQuery, into any) error
// Delete removes the geometry for (tenant, entity, id).
Delete(ctx context.Context, entity, id string) error
}
SpatialQuerier is the geometry port (PostGIS). Geometry is exchanged as WKT plus an SRID — engine-neutral, covering point/line/polygon. Consumers holding GeoJSON convert to WKT at the boundary. Like Vector, it is direct-write.
type SpatialQuery ¶
type SpatialQuery struct {
Entity string
Center Geometry
RadiusM float64 // radius in metres
K int // cap; <=0 → adapter default
}
SpatialQuery is a radius search around a center point.
type SubscribeScope ¶
type SubscribeScope struct {
// Entity is the registry entity name.
Entity string `json:"entity"`
// Scope is a scope name declared in the entity's spec ("id", "site",
// "tenant", ...).
Scope string `json:"scope"`
// ID is the scope id (aggregate id for "id" scopes, container id for
// field scopes). Ignored for tenant scope — the tenant always comes
// from the authenticated context.
ID string `json:"id,omitempty"`
}
SubscribeScope is a subscription request. The channel is always resolved server-side from the validated scope plus the context tenant — client input never names a channel or tenant directly.
type TSQuerier ¶
type TSQuerier interface {
BulkWrite(ctx context.Context, series string, points []Point) error
Range(ctx context.Context, q RangeQuery, into any) error
}
TSQuerier is the telemetry port (TimescaleDB hypertables). BulkWrite is the event-bypass ingest path: per-row events would melt the outbox, so bulk telemetry skips it and the relay publishes conflated deltas instead.
type VectorMatch ¶
type VectorMatch struct {
ID string
Score float64 // cosine similarity, higher is closer
Meta map[string]any
}
VectorMatch is one nearest-neighbour hit, best first.
type VectorQuerier ¶
type VectorQuerier interface {
Upsert(ctx context.Context, entity, id string, embedding []float32, meta map[string]any) error
Similar(ctx context.Context, q VectorQuery, into any) error
// Delete removes the embedding for (tenant, entity, id). Deleting a missing
// id is a no-op. Mirrors SpatialQuerier.Delete.
Delete(ctx context.Context, entity, id string) error
// Get returns the stored embedding for (entity, id), or a *fabriqerr.NotFoundError
// when absent. Mirrors RelationalQuerier.Get's get-by-id + NotFound convention.
Get(ctx context.Context, entity, id string) ([]float32, error)
// DeleteByMeta removes every embedding for (tenant, entity) whose meta
// contains all key/value pairs in filter (AND-of-equals). An empty filter
// deletes ALL embeddings for (tenant, entity) — scope intentionally.
DeleteByMeta(ctx context.Context, entity string, filter map[string]string) error
}
VectorQuerier is the embedding port (pgvector).
type VectorQuery ¶
type VectorQuery struct {
Entity string
Embedding []float32
K int
// Filter restricts matches to embeddings whose meta contains all of these
// key/value pairs (exact-match, AND-of-equals). Empty/nil = no filter.
Filter map[string]string
}
VectorQuery is a nearest-neighbour search.
type Where ¶
type Where []Cond
Where is a conjunction (AND) of conditions — the assignable filter type carried by ListQuery and accepted across the query API. Build it as a literal (query.Where{Eq(...), Like(...)}), from Eqs(map), or by append.