livequery

package
v0.0.2 Latest
Warning

This package is not in the latest version of its module.
Published: Jun 21, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package livequery is fabriq's maintained-result-set live query engine: a client supplies a filter + sort + limit/cursor and receives a snapshot followed by a live stream of enter/leave/move/update deltas. Engine-neutral by construction — no pgx/grove/redis types appear here.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CompareCursors

func CompareCursors(a, b Cursor, sort []SortKey) int

CompareCursors returns -1/0/1 ordering a before/equal/after b under sort, honoring per-key Desc and using the trailing id value as the final ASC tiebreak. Cursors must come from SortKeyOf with the same sort spec.
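
The ordering rule described above can be sketched as follows. This is an illustrative re-implementation, not the package's code: the lowercase names (compareCursors, compareValues, cmpOrdered) are hypothetical, the types are local stand-ins, and the sketch assumes cursor values are only int64, float64, or string.

```go
package main

import "fmt"

// SortKey and Cursor are local stand-ins for the documented types.
type SortKey struct {
	Column string
	Desc   bool
}

type Cursor struct {
	Values []any
}

// cmpOrdered returns -1, 0, or 1 for x relative to y.
func cmpOrdered[T int64 | float64 | string](x, y T) int {
	switch {
	case x < y:
		return -1
	case x > y:
		return 1
	}
	return 0
}

// compareValues handles only int64, float64, and string (an assumption of
// this sketch); mismatched or unsupported types compare as equal.
func compareValues(a, b any) int {
	switch x := a.(type) {
	case int64:
		if y, ok := b.(int64); ok {
			return cmpOrdered(x, y)
		}
	case float64:
		if y, ok := b.(float64); ok {
			return cmpOrdered(x, y)
		}
	case string:
		if y, ok := b.(string); ok {
			return cmpOrdered(x, y)
		}
	}
	return 0
}

// compareCursors walks the sort keys in order, flips the result for Desc
// keys, and falls back to the trailing id value, which is always ascending.
// Both cursors are assumed to hold len(keys)+1 values.
func compareCursors(a, b Cursor, keys []SortKey) int {
	for i, k := range keys {
		c := compareValues(a.Values[i], b.Values[i])
		if k.Desc {
			c = -c
		}
		if c != 0 {
			return c
		}
	}
	return compareValues(a.Values[len(keys)], b.Values[len(keys)])
}

func main() {
	keys := []SortKey{{Column: "priority", Desc: true}}
	high := Cursor{Values: []any{int64(9), "task-a"}}
	low := Cursor{Values: []any{int64(1), "task-b"}}
	// Under a Desc key the higher priority orders first.
	fmt.Println(compareCursors(high, low, keys))
}
```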

Types

type AuthzFunc

type AuthzFunc func(ctx context.Context, q LiveQuery) error

AuthzFunc authorizes (and may later constrain) a live query before snapshot.

type Change

type Change struct {
	AggID    string
	Version  int64
	Deleted  bool
	Vals     map[string]any // nil when Deleted
	Raw      json.RawMessage
	StreamID string
	At       time.Time
}

Change is one event handed to the matcher (column-keyed, from event.Envelope).

type ChannelFor

type ChannelFor func(ctx context.Context, q LiveQuery) (channel string, err error)

ChannelFor resolves the coarse event channel a live query tails in P1 (the entity's by-tenant scope channel); P3 replaces this with partition streams. It takes ctx because the tenant comes from the authenticated context, never the client.

type Cursor

type Cursor struct {
	Values []any `json:"values"`
}

Cursor is a keyset anchor: one value per SortKey plus the trailing id.

func SortKeyOf

func SortKeyOf(row map[string]any, sort []SortKey, id string) Cursor

SortKeyOf builds the keyset cursor for a row: one value per sort key, then the id as the final unique tiebreak (so (Sort…, id) is a total order).
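
As a rough illustration of the cursor layout, the sketch below builds one value per sort key and appends the id last. The function sortKeyOf is a hypothetical stand-in written against local copies of the types; how the real SortKeyOf treats missing columns is not documented, so a missing column simply yields nil here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SortKey and Cursor are local stand-ins for the documented types.
type SortKey struct {
	Column string
	Desc   bool
}

type Cursor struct {
	Values []any `json:"values"`
}

// sortKeyOf copies the row's value for each sort column, in order, and
// appends the id as the final tiebreak value.
func sortKeyOf(row map[string]any, keys []SortKey, id string) Cursor {
	vals := make([]any, 0, len(keys)+1)
	for _, k := range keys {
		vals = append(vals, row[k.Column]) // nil if the column is absent
	}
	vals = append(vals, id)
	return Cursor{Values: vals}
}

func main() {
	row := map[string]any{"priority": 3, "title": "write docs"}
	keys := []SortKey{{Column: "priority", Desc: true}, {Column: "title"}}
	c := sortKeyOf(row, keys, "task-1")
	b, _ := json.Marshal(c)
	fmt.Println(string(b)) // {"values":[3,"write docs","task-1"]}
}
```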

type DeltaOp

type DeltaOp int

DeltaOp is the maintained-result-set delta vocabulary.

const (
	OpEnter   DeltaOp = iota // row entered the visible window
	OpLeave                  // row left the visible window
	OpMove                   // row stayed visible, position changed
	OpUpdate                 // row stayed at same position, payload changed
	OpReset                  // discard window and re-snapshot (reanchor/failover/overflow)
	OpMatch                  // ModeStreamed: row now matches (P2)
	OpUnmatch                // ModeStreamed: row no longer matches (P2)
)

func (DeltaOp) String

func (op DeltaOp) String() string

String returns the SSE event name for a delta op.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine runs Maintained and Streamed live queries. Subscriptions to the same (tenant, entity) share one partition; identical query shapes within it share one view (one window, one matcher) with deltas fanned out — so a saved view watched by N clients costs one window, not N.

func NewEngine

func NewEngine(snap Snapshotter, refill Refiller, feed Feed, opts EngineOptions) *Engine

NewEngine wires the engine; zero options fall back to sane defaults.

func (*Engine) Reconcile

func (e *Engine) Reconcile(ctx context.Context) (int, error)

Reconcile re-runs every ready Maintained view's query against the snapshot oracle (Postgres) and, where the truth diverges from the in-engine window, re-seeds the window and emits OpReset so clients re-snapshot. The low-cadence drift backstop. Returns the number of views repaired.

func (*Engine) Subscribe

func (e *Engine) Subscribe(ctx context.Context, q LiveQuery) (Snapshot, <-chan LiveDelta, *Handle, error)

Subscribe registers a subscription and returns its snapshot, live delta channel, and control handle.

type EngineOptions

type EngineOptions struct {
	Cushion int
	Buffer  int
	// Members optionally seeds Streamed subscriptions' membership from the full
	// matching id set; nil falls back to the snapshot page.
	Members MemberLister
}

EngineOptions tunes window cushion and the per-subscription delta buffer.

type Feed

type Feed interface {
	Changes(ctx context.Context, q LiveQuery, fromWatermark string) (<-chan Change, func(), error)
}

Feed is the change source for a live query partition (the in-process Redis tail bridged to Change; later: per-partition consumer groups).

type Handle

type Handle struct {
	// contains filtered or unexported fields
}

Handle controls one live subscription: tear it down, or slide its window to a new anchor for deep/infinite scroll (Reanchor).

func (*Handle) Close

func (h *Handle) Close()

Close tears the subscription down.

func (*Handle) Reanchor

func (h *Handle) Reanchor(ctx context.Context, cursor *Cursor, limit int) (Snapshot, error)

Reanchor slides a Maintained subscription's window to a new cursor anchor (and optionally a new size) for deep/infinite scroll. It re-keys the subscription onto the new shape's view — O(window), not O(result set) — and returns the fresh snapshot; the same delta channel then carries the new window. The gateway emits OpReset to the client before rendering the result.

type LiveDelta

type LiveDelta struct {
	Op       DeltaOp         `json:"op"`
	AggID    string          `json:"agg_id,omitempty"`
	Version  int64           `json:"version,omitempty"`
	Row      json.RawMessage `json:"row,omitempty"`
	OldIndex int             `json:"old_index"`
	NewIndex int             `json:"new_index"`
	Cursor   Cursor          `json:"cursor,omitempty"`
	StreamID string          `json:"stream_id,omitempty"`
	At       time.Time       `json:"at"`
}

LiveDelta is one change to a maintained window.
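
A client might fold deltas into its local copy of the window roughly as below. The index semantics are assumptions, since they are not spelled out here: OldIndex is taken as the position before the change, NewIndex as the position after it (for a move, after the row has been removed), and indices are assumed valid. The types are trimmed stand-ins and applyDelta is hypothetical.

```go
package main

import "fmt"

type DeltaOp int

const (
	OpEnter DeltaOp = iota
	OpLeave
	OpMove
	OpUpdate
	OpReset
)

// LiveDelta is a trimmed stand-in; the documented Row is json.RawMessage.
type LiveDelta struct {
	Op       DeltaOp
	AggID    string
	Row      string
	OldIndex int
	NewIndex int
}

type clientRow struct{ ID, Data string }

func insertAt(win []clientRow, i int, r clientRow) []clientRow {
	win = append(win, clientRow{})
	copy(win[i+1:], win[i:])
	win[i] = r
	return win
}

func removeAt(win []clientRow, i int) []clientRow {
	return append(win[:i], win[i+1:]...)
}

// applyDelta folds one delta into the client-side window.
func applyDelta(win []clientRow, d LiveDelta) []clientRow {
	switch d.Op {
	case OpEnter:
		win = insertAt(win, d.NewIndex, clientRow{ID: d.AggID, Data: d.Row})
	case OpLeave:
		win = removeAt(win, d.OldIndex)
	case OpMove:
		r := win[d.OldIndex]
		win = insertAt(removeAt(win, d.OldIndex), d.NewIndex, r)
	case OpUpdate:
		win[d.NewIndex].Data = d.Row
	case OpReset:
		win = win[:0] // discard; the caller re-snapshots
	}
	return win
}

func main() {
	win := []clientRow{{ID: "a"}, {ID: "b"}, {ID: "c"}}
	win = applyDelta(win, LiveDelta{Op: OpEnter, AggID: "d", NewIndex: 1})
	win = applyDelta(win, LiveDelta{Op: OpLeave, AggID: "a", OldIndex: 0})
	fmt.Println(win)
}
```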

type LiveQuery

type LiveQuery struct {
	Entity string
	Where  query.Where
	Sort   []SortKey
	Limit  int // window size N
	Cursor *Cursor
	Mode   Mode
}

LiveQuery is a registered, server-maintained window over an entity.

func (LiveQuery) Validate

func (q LiveQuery) Validate(hasColumn, isSortable func(string) bool) error

Validate checks the query against an entity: filter columns must exist (the injection guard, via query.ValidateConds), sort columns must be declared sortable, and Limit must be positive.

type MemberLister

type MemberLister interface {
	Members(ctx context.Context, q LiveQuery) ([]string, error)
}

MemberLister returns every aggregate id currently matching a query's filter (no ordering, no payloads). It seeds a Streamed subscription's membership set so +match/-unmatch transitions are exact even for sets too large to materialize. Optional: when absent, Streamed mode seeds from the snapshot page instead (exact only up to that page).
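
The role of the seeded membership set can be illustrated with a small sketch: a transition is reported only when a row's match state differs from what the set already records. The membership type and its observe method are hypothetical, and the returned strings merely echo the +match/-unmatch wording above.

```go
package main

import "fmt"

// membership tracks which aggregate ids currently match the filter.
type membership map[string]struct{}

// newMembership seeds the set, for example from a MemberLister result.
func newMembership(seed []string) membership {
	m := make(membership, len(seed))
	for _, id := range seed {
		m[id] = struct{}{}
	}
	return m
}

// observe records the latest match state for id and returns "match",
// "unmatch", or "" when membership did not change.
func (m membership) observe(id string, matches bool) string {
	_, was := m[id]
	switch {
	case matches && !was:
		m[id] = struct{}{}
		return "match"
	case !matches && was:
		delete(m, id)
		return "unmatch"
	}
	return ""
}

func main() {
	m := newMembership([]string{"a"})
	fmt.Printf("%q\n", m.observe("a", true))  // already a member
	fmt.Printf("%q\n", m.observe("b", true))  // newly matching
	fmt.Printf("%q\n", m.observe("a", false)) // stopped matching
}
```

If the set were seeded only from a snapshot page, a row outside that page that stops matching would produce no unmatch, which is the inexactness the paragraph above warns about.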

type Mode

type Mode int

Mode selects the per-subscription delivery policy over one matcher.

const (
	// ModeMaintained keeps an ordered window + cushion and emits
	// enter/leave/move/update; exact top-N via Postgres boundary refill.
	ModeMaintained Mode = iota
	// ModeStreamed forwards matched change events; the client orders. (P2.)
	ModeStreamed
)

type RedisFeed

type RedisFeed struct {
	// contains filtered or unexported fields
}

RedisFeed adapts the Redis Tailer into the engine Feed: it converts each query.Delta on the channel into a Change (decoding the column-keyed payload), dropping events for other entities that share the coarse by-tenant channel.

func NewRedisFeed

func NewRedisFeed(tail Tailer, channel ChannelFor) *RedisFeed

NewRedisFeed builds a feed over a Redis tailer and a channel resolver.

func (*RedisFeed) Changes

func (f *RedisFeed) Changes(ctx context.Context, q LiveQuery, from string) (stream <-chan Change, stop func(), retErr error)

Changes implements Feed.

type Refiller

type Refiller interface {
	After(ctx context.Context, q LiveQuery, after Cursor, limit int) ([]Row, error)
}

Refiller returns up to `limit` rows strictly after `after` in total order — the bounded keyset boundary refill that keeps the window an exact prefix.

type Registration

type Registration struct {
	SubID     string
	TenantID  string
	Entity    string
	Partition int // data partition (cluster.PartitionOf); set by the caller
	Mode      Mode
	Query     LiveQuery
	GatewayID string // the gateway terminating this subscription's connection
	Watermark string // last delivered stream position, for gapless resume
}

Registration is one durably-recorded live subscription: the descriptor a reassigned matcher shard rebuilds from after a node failure or rebalance.

type Row

type Row struct {
	AggID   string
	Version int64
	Cursor  Cursor
	Raw     json.RawMessage
	Vals    map[string]any
}

Row is a result row carried by the snapshot/refill ports.

type Snapshot

type Snapshot struct {
	SubID     string
	Rows      []Row
	Watermark string // event-stream id at snapshot time; live applied strictly after
}

Snapshot is the initial result of a live subscription.

type Snapshotter

type Snapshotter interface {
	Snapshot(ctx context.Context, q LiveQuery, limit int) ([]Row, error)
}

Snapshotter returns the first `limit` rows from a live query's anchor in total order (Sort…, id). Implemented by adapters/postgres (RLS-enforced).

type SortKey

type SortKey struct {
	Column string
	Desc   bool
}

SortKey is one ordering term. Fabriq always appends id ASC as the final unique tiebreak so (Sort…, id) is a TOTAL order — required for keyset.

type SubscriptionRegistry

type SubscriptionRegistry interface {
	// Put records or updates a subscription (idempotent on SubID).
	Put(ctx context.Context, r Registration) error
	// Delete removes a subscription on clean unsubscribe.
	Delete(ctx context.Context, subID string) error
	// ByPartition lists the subscriptions for a (tenant, entity) — convenience
	// for single-entity inspection.
	ByPartition(ctx context.Context, tenantID, entity string) ([]Registration, error)
	// ByPartitionNum lists every subscription in data partition p, across all
	// tenants/entities — the query a reassigned shard runs to rebuild what it
	// now owns after a failover or rebalance.
	ByPartitionNum(ctx context.Context, p int) ([]Registration, error)
	// ByGateway lists a gateway's subscriptions — for gateway recovery.
	ByGateway(ctx context.Context, gatewayID string) ([]Registration, error)
}

SubscriptionRegistry durably records live subscriptions so the sharded matcher tier can recover them. When a partition is (re)assigned to a shard, the shard loads its subscriptions with ByPartitionNum and re-snapshots them; this is the failover path that keeps a client's live query alive across a server restart. Single-node deployments do not need it.

type Tailer

type Tailer interface {
	Tail(ctx context.Context, channel, fromID string, deliver func(query.Delta)) error
}

Tailer is the subset of the redis adapter the feed needs.

type Window

type Window struct {
	// contains filtered or unexported fields
}

Window maintains an ordered prefix of a live query's result: the visible window [0,N) plus a cushion [N,N+C). Invariant: rows are exactly the first len(rows) of the Postgres-ordered result from the anchor — Postgres owns ordering, the Window only splices. complete=true means the result is exhausted at len(rows) (no deeper rows exist).
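
The visible-plus-cushion idea can be shown with a toy model that holds only ids. This is a simplified sketch, not the Window implementation: the window type below is hypothetical, it handles removals only, and it omits the Postgres refill that would top the cushion back up.

```go
package main

import "fmt"

// window is a toy stand-in: ids only, already in total order.
type window struct {
	rows []string // visible rows [0,n) followed by cushion rows
	n    int      // visible size N
}

// visible returns the first n rows, or all rows if fewer are held.
func (w *window) visible() []string {
	if len(w.rows) < w.n {
		return w.rows
	}
	return w.rows[:w.n]
}

// remove drops id from the buffer. left reports whether a visible row was
// removed; promoted is the cushion row that slid into the last visible
// slot, or "" if none did.
func (w *window) remove(id string) (left bool, promoted string) {
	for i, r := range w.rows {
		if r != id {
			continue
		}
		w.rows = append(w.rows[:i], w.rows[i+1:]...)
		if i < w.n {
			left = true
			if len(w.rows) >= w.n {
				promoted = w.rows[w.n-1]
			}
		}
		return left, promoted
	}
	return false, ""
}

func main() {
	// Visible window of 3 with a cushion of 2.
	w := &window{rows: []string{"a", "b", "c", "d", "e"}, n: 3}
	left, promoted := w.remove("b")
	fmt.Println(left, promoted, w.visible())
}
```

Because the cushion already holds the next rows in order, a leave from the visible range can be answered without a database round trip until the cushion drains.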

func NewWindow

func NewWindow(q LiveQuery, pred match.Predicate, seed []Row, complete bool, cushion int, refill Refiller) (*Window, error)

NewWindow seeds a window from a snapshot prefix (visible+cushion already ordered). complete reports whether the snapshot exhausted the result.

func (*Window) Apply

func (w *Window) Apply(ctx context.Context, ch Change) []LiveDelta

Apply folds one change into the window and returns the resulting deltas.

func (*Window) Contains

func (w *Window) Contains(aggID string) bool

Contains reports whether aggID is in the maintained buffer (visible or cushion). The dispatcher uses it to maintain its member reverse-index so a change to a row already held is routed to this window even when the row's new state no longer matches the predicate (a leave).

func (*Window) Visible

func (w *Window) Visible() []Row

Visible returns the current visible window rows (for tests / re-snapshot).

Directories

Path	Synopsis
cluster	Package cluster holds the engine-neutral coordination primitives for the sharded live query matcher tier: how data maps to partitions, and how live shards divide those partitions among themselves by rendezvous (HRW) hashing.
match	Package match compiles a query.Where (fabriq's engine-neutral filter AST) into a Go predicate evaluated against a column-keyed map.
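
Rendezvous (HRW) hashing, as named in the cluster synopsis, can be sketched generically as below. This is not the cluster package's code: the owner function, the FNV-1a hash, and the "shard/partition" key format are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// owner picks the shard with the highest hash score for a partition.
// Every node that sees the same shard list computes the same owner, and
// removing a shard only reassigns the partitions that shard owned.
func owner(partition int, shards []string) string {
	best, bestScore := "", uint64(0)
	for _, s := range shards {
		h := fnv.New64a()
		fmt.Fprintf(h, "%s/%d", s, partition)
		score := h.Sum64()
		// Ties are broken by shard name so the result ignores list order.
		if best == "" || score > bestScore || (score == bestScore && s < best) {
			best, bestScore = s, score
		}
	}
	return best
}

func main() {
	shards := []string{"shard-a", "shard-b", "shard-c"}
	for p := 0; p < 4; p++ {
		fmt.Println(p, owner(p, shards))
	}
}
```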