Documentation
¶
Overview ¶
Package livequery is fabriq's maintained-result-set live query engine: a client supplies a filter + sort + limit/cursor and receives a snapshot followed by a live stream of enter/leave/move/update deltas. Engine-neutral by construction — no pgx/grove/redis types appear here.
Index ¶
- func CompareCursors(a, b Cursor, sort []SortKey) int
- type AuthzFunc
- type Change
- type ChannelFor
- type Cursor
- type DeltaOp
- type Engine
- type EngineOptions
- type Feed
- type Handle
- type LiveDelta
- type LiveQuery
- type MemberLister
- type Mode
- type RedisFeed
- type Refiller
- type Registration
- type Row
- type Snapshot
- type Snapshotter
- type SortKey
- type SubscriptionRegistry
- type Tailer
- type Window
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CompareCursors ¶
CompareCursors returns -1/0/1 ordering a before/equal/after b under sort, honoring per-key Desc and using the trailing id value as the final ASC tiebreak. Cursors must come from SortKeyOf with the same sort spec.
Types ¶
type Change ¶
type Change struct {
AggID string
Version int64
Deleted bool
Vals map[string]any // nil when Deleted
Raw json.RawMessage
StreamID string
At time.Time
}
Change is one event handed to the matcher (column-keyed, from event.Envelope).
type ChannelFor ¶
ChannelFor resolves the coarse event channel a live query tails in P1 (the entity's by-tenant scope channel); P3 replaces this with partition streams. It takes ctx because the tenant comes from the authenticated context, never the client.
type Cursor ¶
type Cursor struct {
Values []any `json:"values"`
}
Cursor is a keyset anchor: one value per SortKey plus the trailing id.
type DeltaOp ¶
type DeltaOp int
DeltaOp is the maintained-result-set delta vocabulary.
const ( OpEnter DeltaOp = iota // row entered the visible window OpLeave // row left the visible window OpMove // row stayed visible, position changed OpUpdate // row stayed at same position, payload changed OpReset // discard window and re-snapshot (reanchor/failover/overflow) OpMatch // ModeStreamed: row now matches (P2) OpUnmatch // ModeStreamed: row no longer matches (P2) )
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine runs Maintained and Streamed live queries. Subscriptions to the same (tenant, entity) share one partition; identical query shapes within it share one view (one window, one matcher) with deltas fanned out — so a saved view watched by N clients costs one window, not N.
func NewEngine ¶
func NewEngine(snap Snapshotter, refill Refiller, feed Feed, opts EngineOptions) *Engine
NewEngine wires the engine; zero options fall back to sane defaults.
func (*Engine) Reconcile ¶
Reconcile re-runs every ready Maintained view's query against the snapshot oracle (Postgres) and, where the truth diverges from the in-engine window, re-seeds the window and emits OpReset so clients re-snapshot. The low-cadence drift backstop. Returns the number of views repaired.
type EngineOptions ¶
type EngineOptions struct {
Cushion int
Buffer int
// Members optionally seeds Streamed subscriptions' membership from the full
// matching id set; nil falls back to the snapshot page.
Members MemberLister
}
EngineOptions tunes window cushion and the per-subscription delta buffer.
type Feed ¶
type Feed interface {
Changes(ctx context.Context, q LiveQuery, fromWatermark string) (<-chan Change, func(), error)
}
Feed is the change source for a live query partition (the in-process Redis tail bridged to Change; later: per-partition consumer groups).
type Handle ¶
type Handle struct {
// contains filtered or unexported fields
}
Handle controls one live subscription: tear it down, or slide its window to a new anchor for deep/infinite scroll (Reanchor).
func (*Handle) Reanchor ¶
Reanchor slides a Maintained subscription's window to a new cursor anchor (and optionally a new size) for deep/infinite scroll. It re-keys the subscription onto the new shape's view — O(window), not O(result set) — and returns the fresh snapshot; the same delta channel then carries the new window. The gateway emits OpReset to the client before rendering the result.
type LiveDelta ¶
type LiveDelta struct {
Op DeltaOp `json:"op"`
AggID string `json:"agg_id,omitempty"`
Version int64 `json:"version,omitempty"`
Row json.RawMessage `json:"row,omitempty"`
OldIndex int `json:"old_index"`
NewIndex int `json:"new_index"`
Cursor Cursor `json:"cursor,omitempty"`
StreamID string `json:"stream_id,omitempty"`
At time.Time `json:"at"`
}
LiveDelta is one change to a maintained window.
type LiveQuery ¶
type LiveQuery struct {
Entity string
Where query.Where
Sort []SortKey
Limit int // window size N
Cursor *Cursor
Mode Mode
}
LiveQuery is a registered, server-maintained window over an entity.
type MemberLister ¶
MemberLister returns every aggregate id currently matching a query's filter (no ordering, no payloads). It seeds a Streamed subscription's membership set so +match/-unmatch transitions are exact even for sets too large to materialize. Optional: when absent, Streamed mode seeds from the snapshot page instead (exact only up to that page).
type RedisFeed ¶
type RedisFeed struct {
// contains filtered or unexported fields
}
RedisFeed adapts the Redis Tailer into the engine Feed: it converts each query.Delta on the channel into a Change (decoding the column-keyed payload), dropping events for other entities that share the coarse by-tenant channel.
func NewRedisFeed ¶
func NewRedisFeed(tail Tailer, channel ChannelFor) *RedisFeed
NewRedisFeed builds a feed over a Redis tailer and a channel resolver.
type Refiller ¶
type Refiller interface {
After(ctx context.Context, q LiveQuery, after Cursor, limit int) ([]Row, error)
}
Refiller returns up to `limit` rows strictly after `after` in total order — the bounded keyset boundary refill that keeps the window an exact prefix.
type Registration ¶
type Registration struct {
SubID string
TenantID string
Entity string
Partition int // data partition (cluster.PartitionOf); set by the caller
Mode Mode
Query LiveQuery
GatewayID string // the gateway terminating this subscription's connection
Watermark string // last delivered stream position, for gapless resume
}
Registration is one durably-recorded live subscription: the descriptor a reassigned matcher shard rebuilds from after a node failure or rebalance.
type Row ¶
type Row struct {
AggID string
Version int64
Cursor Cursor
Raw json.RawMessage
Vals map[string]any
}
Row is a result row carried by the snapshot/refill ports.
type Snapshot ¶
type Snapshot struct {
SubID string
Rows []Row
Watermark string // event-stream id at snapshot time; live applied strictly after
}
Snapshot is the initial result of a live subscription.
type Snapshotter ¶
Snapshotter returns the first `limit` rows from a live query's anchor in total order (Sort…, id). Implemented by adapters/postgres (RLS-enforced).
type SortKey ¶
SortKey is one ordering term. Fabriq always appends id ASC as the final unique tiebreak so (Sort…, id) is a TOTAL order — required for keyset.
type SubscriptionRegistry ¶
type SubscriptionRegistry interface {
// Put records or updates a subscription (idempotent on SubID).
Put(ctx context.Context, r Registration) error
// Delete removes a subscription on clean unsubscribe.
Delete(ctx context.Context, subID string) error
// ByPartition lists the subscriptions for a (tenant, entity) — convenience
// for single-entity inspection.
ByPartition(ctx context.Context, tenantID, entity string) ([]Registration, error)
// ByPartitionNum lists every subscription in data partition p, across all
// tenants/entities — the query a reassigned shard runs to rebuild what it
// now owns after a failover or rebalance.
ByPartitionNum(ctx context.Context, p int) ([]Registration, error)
// ByGateway lists a gateway's subscriptions — for gateway recovery.
ByGateway(ctx context.Context, gatewayID string) ([]Registration, error)
}
SubscriptionRegistry durably records live subscriptions so the sharded matcher tier can recover them. When a partition is (re)assigned to a shard, the shard loads its subscriptions with ByPartition and re-snapshots them — the failover path that keeps a client's live query alive across a server restart. Single- node deployments do not need it.
type Tailer ¶
type Tailer interface {
Tail(ctx context.Context, channel, fromID string, deliver func(query.Delta)) error
}
Tailer is the subset of the redis adapter the feed needs.
type Window ¶
type Window struct {
// contains filtered or unexported fields
}
Window maintains an ordered prefix of a live query's result: the visible window [0,N) plus a cushion [N,N+C). Invariant: rows are exactly the first len(rows) of the Postgres-ordered result from the anchor — Postgres owns ordering, the Window only splices. complete=true means the result is exhausted at len(rows) (no deeper rows exist).
func NewWindow ¶
func NewWindow(q LiveQuery, pred match.Predicate, seed []Row, complete bool, cushion int, refill Refiller) (*Window, error)
NewWindow seeds a window from a snapshot prefix (visible+cushion already ordered). complete reports whether the snapshot exhausted the result.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package cluster holds the engine-neutral coordination primitives for the sharded live query matcher tier: how data maps to partitions, and how live shards divide those partitions among themselves by rendezvous (HRW) hashing.
|
Package cluster holds the engine-neutral coordination primitives for the sharded live query matcher tier: how data maps to partitions, and how live shards divide those partitions among themselves by rendezvous (HRW) hashing. |
|
Package match compiles a query.Where (fabriq's engine-neutral filter AST) into a Go predicate evaluated against a column-keyed map.
|
Package match compiles a query.Where (fabriq's engine-neutral filter AST) into a Go predicate evaluated against a column-keyed map. |