remote

package
v0.0.3
Published: Jun 22, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package remote is the OPTIONAL server face of fabriq: it lets backend services talk to a central, connection-owning fabriq deployment over gRPC instead of embedding the library and owning their own datastore pools. It changes the DEPLOYMENT TOPOLOGY, not the engine — Fabric implements the same core/query.Fabric interface application code already holds, so the call sites are identical (ADR 0009).

Layering. The client (Fabric) and server (Handler) sit on a narrow, codec-neutral Transport seam so the request/response envelope can be exercised — and unit-tested via Loopback — before the gRPC+protobuf binding (proto/fabriq/v1/fabriq.proto) is generated and wired. The production Transport is gRPC over HTTP/2 with mTLS; tenant and principal travel in call metadata, authenticated at the server edge, never trusted from a client field.

Scope of this skeleton. The write plane (Exec/ExecBatch) is implemented end-to-end to pressure-test the envelope, including registry-typed payload decoding on the server and the typed-error taxonomy across the wire. The read, live, blob and interactive-transaction planes named in the proto are follow-ons; their accessors return ErrNotImplemented.

Constants

const (
	MethodExec               = "fabriq.v1.Fabriq/Exec"
	MethodExecBatch          = "fabriq.v1.Fabriq/ExecBatch"
	MethodGet                = "fabriq.v1.Fabriq/Get"
	MethodGetMany            = "fabriq.v1.Fabriq/GetMany"
	MethodList               = "fabriq.v1.Fabriq/List"
	MethodSubscribe          = "fabriq.v1.Fabriq/Subscribe"
	MethodLiveQuery          = "fabriq.v1.Fabriq/LiveQuery"
	MethodPutBlob            = "fabriq.v1.Fabriq/PutBlob"
	MethodGetBlob            = "fabriq.v1.Fabriq/GetBlob"
	MethodHeadBlob           = "fabriq.v1.Fabriq/HeadBlob"
	MethodDeleteBlob         = "fabriq.v1.Fabriq/DeleteBlob"
	MethodPresignBlob        = "fabriq.v1.Fabriq/PresignBlob"
	MethodVectorSimilar      = "fabriq.v1.Fabriq/VectorSimilar"
	MethodVectorUpsert       = "fabriq.v1.Fabriq/VectorUpsert"
	MethodVectorDelete       = "fabriq.v1.Fabriq/VectorDelete"
	MethodVectorDeleteByMeta = "fabriq.v1.Fabriq/VectorDeleteByMeta"
	MethodVectorGet          = "fabriq.v1.Fabriq/VectorGet"
	MethodSearch             = "fabriq.v1.Fabriq/Search"
	MethodGraphQuery         = "fabriq.v1.Fabriq/GraphQuery"
)

Fully-qualified RPC method names, mirroring the proto service.

Variables

var ErrNotImplemented = errors.New("remote: not implemented over the remote transport (see ADR 0009)")

ErrNotImplemented is returned by the methods the remote surface does not wire (raw-SQL Query, the timeseries/spatial ports, and projection-plane-internal methods such as ApplyMutations). It is deliberately distinct from ErrStoreNotConfigured: the store may well be configured server-side — the remote transport for that method just isn't built (ADR 0009).
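The distinction drawn above can be sketched as a caller-side check with errors.Is. This is a minimal illustration, not the package's own code: errNotImplemented and errStoreNotConfigured are local stand-ins for the real sentinels (the second one's message is invented), and classify is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotImplemented is a local stand-in for remote.ErrNotImplemented so the
// sketch compiles without the fabriq module.
var errNotImplemented = errors.New("remote: not implemented over the remote transport (see ADR 0009)")

// errStoreNotConfigured is a hypothetical stand-in for the embedded library's
// ErrStoreNotConfigured sentinel; its message is invented here.
var errStoreNotConfigured = errors.New("store not configured")

// classify maps an error to a short label a caller might log or branch on.
// errors.Is sees through wrapping, so callers can add context with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errNotImplemented):
		return "transport-gap" // the store may exist; the remote method is not built
	case errors.Is(err, errStoreNotConfigured):
		return "store-missing" // the server has no such store at all
	default:
		return "other"
	}
}

func main() {
	wrapped := fmt.Errorf("timeseries query: %w", errNotImplemented)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(errStoreNotConfigured))
	fmt.Println(classify(nil))
}
```

Keeping the two sentinels separate lets a caller tell "retry against an embedded deployment" apart from "this store is not provisioned".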

Functions

This section is empty.

Types

type ClientStreamConn

type ClientStreamConn interface {
	Send(frame []byte) error
	CloseAndRecv() (reply []byte, err error)
	Close() error
}

ClientStreamConn is the client view of a client-streaming call: send N frames, then CloseAndRecv for the single response.
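The send-N-then-CloseAndRecv shape might be used roughly as follows. This is a sketch under assumptions: the interface is copied locally, memConn is a hypothetical in-memory fake, upload is an invented helper, and the reply format is made up for the demo. It also assumes Close is safe to call after CloseAndRecv, which the documentation above does not state.

```go
package main

import (
	"errors"
	"fmt"
)

// ClientStreamConn mirrors the interface documented above.
type ClientStreamConn interface {
	Send(frame []byte) error
	CloseAndRecv() (reply []byte, err error)
	Close() error
}

// memConn is a hypothetical in-memory ClientStreamConn: it records frames and
// replies with a summary of what it received.
type memConn struct {
	frames [][]byte
	closed bool
}

func (c *memConn) Send(frame []byte) error {
	if c.closed {
		return errors.New("send on closed stream")
	}
	// Copy the frame so later caller-side mutation cannot change it.
	c.frames = append(c.frames, append([]byte(nil), frame...))
	return nil
}

func (c *memConn) CloseAndRecv() ([]byte, error) {
	c.closed = true
	total := 0
	for _, f := range c.frames {
		total += len(f)
	}
	return []byte(fmt.Sprintf("%d bytes in %d frames", total, len(c.frames))), nil
}

func (c *memConn) Close() error {
	c.closed = true
	return nil
}

// upload sends meta as the first frame, then data in chunks of at most
// chunkSize bytes, and returns the single reply from CloseAndRecv.
func upload(conn ClientStreamConn, meta, data []byte, chunkSize int) (string, error) {
	if chunkSize <= 0 {
		return "", errors.New("chunkSize must be positive")
	}
	// Assumption: Close is safe to call after CloseAndRecv.
	defer conn.Close()
	if err := conn.Send(meta); err != nil {
		return "", err
	}
	for len(data) > 0 {
		n := chunkSize
		if n > len(data) {
			n = len(data)
		}
		if err := conn.Send(data[:n]); err != nil {
			return "", err
		}
		data = data[n:]
	}
	reply, err := conn.CloseAndRecv()
	return string(reply), err
}

func main() {
	reply, err := upload(&memConn{}, []byte("m"), []byte("abcdefg"), 3)
	fmt.Println(reply, err)
}
```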

type Fabric

type Fabric struct {
	// contains filtered or unexported fields
}

Fabric is the client face: it implements core/query.Fabric by marshaling each call onto a Transport. Application code holds it exactly as it holds the embedded *fabriq.Fabriq — same interface, same call sites (ADR 0009). For typed repositories use query.For[T](reg, f.Relational()) rather than fabriq.For[T], which is bound to the concrete embedded facade.

func New

func New(t Transport) *Fabric

New builds a Fabric over a Transport (gRPC in production, Loopback in tests).

func (*Fabric) Blob

func (r *Fabric) Blob() blob.Store

func (*Fabric) Document

func (r *Fabric) Document() document.Store

Document returns nil until the document plane is wired. The Blob accessor, by contrast, streams bytes (Put/Get) and the presign bypass over the transport; its List/Copy operations are follow-ons.

func (*Fabric) Exec

func (r *Fabric) Exec(ctx context.Context, cmd command.Command) (command.Result, error)

Exec sends one command and reconstructs the result (or typed error).

func (*Fabric) ExecBatch

func (r *Fabric) ExecBatch(ctx context.Context, cmds []command.Command) ([]command.Result, error)

ExecBatch sends N commands to run in one server-side transaction.

func (*Fabric) Graph

func (r *Fabric) Graph() query.GraphQuerier

func (*Fabric) LiveQuery

LiveQuery registers a maintained-result-set subscription over the remote transport: it returns the initial ordered snapshot, a channel of enter/leave/move/update deltas, and a handle to tear it down. It mirrors *fabriq.Fabriq.LiveQuery, except the handle is a remote *LiveHandle (no Reanchor yet). Close the handle — or cancel ctx — to end the subscription.

func (*Fabric) Relational

func (r *Fabric) Relational() query.RelationalQuerier

func (*Fabric) Search

func (r *Fabric) Search() query.SearchQuerier

func (*Fabric) Spatial

func (r *Fabric) Spatial() query.SpatialQuerier

func (*Fabric) Subscribe

func (r *Fabric) Subscribe(ctx context.Context, scope query.SubscribeScope) (<-chan query.Delta, error)

Subscribe opens the conflated channel-delta stream. The first frame is a handshake: a setup error (authz / scope resolution) returns synchronously, mirroring the in-process contract; otherwise a goroutine drains delta frames into the returned channel until the stream ends or ctx is cancelled.
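The handshake-then-drain flow described above can be sketched on top of a Stream. The frame encoding is not specified here, so the "ok" / "err:<message>" handshake strings are invented for illustration, as are sliceStream, subscribe and collectDeltas; the real client decodes typed envelopes instead.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
)

// Stream mirrors the client view of a server-streaming response.
type Stream interface {
	Recv() ([]byte, error)
	Close() error
}

// sliceStream is a hypothetical Stream that replays canned frames, then io.EOF.
type sliceStream struct{ frames []string }

func (s *sliceStream) Recv() ([]byte, error) {
	if len(s.frames) == 0 {
		return nil, io.EOF
	}
	f := s.frames[0]
	s.frames = s.frames[1:]
	return []byte(f), nil
}

func (s *sliceStream) Close() error { return nil }

// subscribe reads the handshake frame synchronously, so a setup failure is
// returned to the caller directly; only then does it start the drain goroutine.
func subscribe(ctx context.Context, s Stream) (<-chan string, error) {
	first, err := s.Recv()
	if err != nil {
		s.Close()
		return nil, err
	}
	if h := string(first); strings.HasPrefix(h, "err:") {
		s.Close()
		return nil, errors.New(strings.TrimPrefix(h, "err:"))
	}
	out := make(chan string)
	go func() {
		defer close(out)
		defer s.Close()
		for {
			frame, err := s.Recv()
			if err != nil { // io.EOF or a transport error both end the stream
				return
			}
			select {
			case out <- string(frame):
			case <-ctx.Done(): // the caller went away; stop draining
				return
			}
		}
	}()
	return out, nil
}

// collectDeltas subscribes over canned frames and gathers every delta.
func collectDeltas(frames ...string) ([]string, error) {
	ch, err := subscribe(context.Background(), &sliceStream{frames: frames})
	if err != nil {
		return nil, err
	}
	var out []string
	for d := range ch {
		out = append(out, d)
	}
	return out, nil
}

func main() {
	fmt.Println(collectDeltas("ok", "d1", "d2"))
	fmt.Println(collectDeltas("err:forbidden"))
}
```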

func (*Fabric) Timeseries

func (r *Fabric) Timeseries() query.TSQuerier

func (*Fabric) Vector

func (r *Fabric) Vector() query.VectorQuerier

func (*Fabric) WaitForProjection

func (r *Fabric) WaitForProjection(_ context.Context, _, _, _ string, _ int64) error

WaitForProjection is a read-your-writes helper; it rides the read plane.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is the server face: it terminates the remote envelope and delegates to a real, embedded query.Fabric (an Open()ed *fabriq.Fabriq on the connection-owning tier). The gRPC service implementation calls Dispatch / DispatchStream (or the per-method handlers) from its generated stubs.

SECURITY: ctx MUST already carry the tenant and principal resolved from the call's transport metadata by an auth interceptor — never from a field in the decoded message. The embedded facade then enforces RLS and authz exactly as it does in-process (ADR 0009 §Security).
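As a rough illustration of that rule, the sketch below resolves the tenant from transport metadata into ctx and has the handler read it only from there. Everything in it is hypothetical: tenantKey, authenticate, the request type with its Tenant field, and the plain map standing in for authenticated gRPC metadata. It does not show how fabriq itself stores tenant or principal.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// tenantKey is an unexported context key type, so other packages cannot collide with it.
type tenantKey struct{}

// authenticate models the interceptor step: md stands in for call metadata the
// server edge has already verified (for example via mTLS).
func authenticate(ctx context.Context, md map[string]string) (context.Context, error) {
	tenant := md["tenant"]
	if tenant == "" {
		return nil, errors.New("unauthenticated: no tenant in call metadata")
	}
	return context.WithValue(ctx, tenantKey{}, tenant), nil
}

// request is a hypothetical envelope. Its Tenant field models a value a client
// could forge, which the handler must ignore.
type request struct {
	Tenant string `json:"tenant"`
	Op     string `json:"op"`
}

// handle trusts only the tenant carried by ctx, never req.Tenant.
func handle(ctx context.Context, in []byte) (string, error) {
	tenant, ok := ctx.Value(tenantKey{}).(string)
	if !ok {
		return "", errors.New("no tenant in context")
	}
	var req request
	if err := json.Unmarshal(in, &req); err != nil {
		return "", err
	}
	return tenant + ":" + req.Op, nil
}

// serve chains the two steps the way an interceptor plus handler would.
func serve(md map[string]string, body string) (string, error) {
	ctx, err := authenticate(context.Background(), md)
	if err != nil {
		return "", err
	}
	return handle(ctx, []byte(body))
}

func main() {
	fmt.Println(serve(map[string]string{"tenant": "acme"}, `{"tenant":"evil","op":"exec"}`))
	fmt.Println(serve(map[string]string{}, `{"op":"exec"}`))
}
```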

func NewHandler

func NewHandler(fab query.Fabric, reg *registry.Registry) *Handler

NewHandler builds a Handler over the embedded facade and its registry (the registry is the schema authority used to decode opaque payloads). If the facade also implements LiveQuerier (the concrete *fabriq.Fabriq does), the remote LiveQuery plane is enabled; otherwise remote LiveQuery calls return ErrNotImplemented.

func (*Handler) DeleteBlob

func (h *Handler) DeleteBlob(ctx context.Context, in []byte) ([]byte, error)

func (*Handler) Dispatch

func (h *Handler) Dispatch(ctx context.Context, method string, in []byte) ([]byte, error)

Dispatch routes a unary call by its method name — the server-side mirror of Transport.Unary, used by Loopback and by a thin gRPC unary shim.
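Routing by method name can be pictured as a lookup table keyed by the fully-qualified names from the constants block. The sketch below is an assumption about the shape only: router, unaryHandler, newDemoRouter and call are hypothetical, and how the real Dispatch reports an unknown method is not documented here.

```go
package main

import (
	"context"
	"fmt"
)

// unaryHandler is the shape shared by the unary Handler methods documented above.
type unaryHandler func(ctx context.Context, in []byte) ([]byte, error)

// router is a hypothetical stand-in for Handler: a method-name routing table.
type router struct{ routes map[string]unaryHandler }

// Dispatch looks the method up by its fully-qualified name and calls it.
// Returning a plain error for unknown methods is a choice made for this sketch.
func (r router) Dispatch(ctx context.Context, method string, in []byte) ([]byte, error) {
	h, ok := r.routes[method]
	if !ok {
		return nil, fmt.Errorf("unknown method %q", method)
	}
	return h(ctx, in)
}

// newDemoRouter wires a single echo-style handler under the Exec method name.
func newDemoRouter() router {
	return router{routes: map[string]unaryHandler{
		"fabriq.v1.Fabriq/Exec": func(_ context.Context, in []byte) ([]byte, error) {
			return append([]byte("exec:"), in...), nil
		},
	}}
}

// call is a convenience wrapper so the demo can work with strings.
func call(method, body string) (string, error) {
	out, err := newDemoRouter().Dispatch(context.Background(), method, []byte(body))
	return string(out), err
}

func main() {
	fmt.Println(call("fabriq.v1.Fabriq/Exec", "{}"))
	fmt.Println(call("fabriq.v1.Fabriq/Nope", "{}"))
}
```

Because Loopback and a gRPC shim would both call the same entry point, a table like this keeps the transport binding thin.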

func (*Handler) DispatchClientStream

func (h *Handler) DispatchClientStream(ctx context.Context, method string, recv func() ([]byte, error)) ([]byte, error)

DispatchClientStream routes a client-streaming call by method name. recv returns the next request frame, or io.EOF when the client is done; the handler returns the single response frame.

func (*Handler) DispatchStream

func (h *Handler) DispatchStream(ctx context.Context, method string, in []byte, send func([]byte) error) error

DispatchStream routes a server-streaming call by method name. send delivers one frame; the error it returns (e.g. client gone) aborts the stream.

func (*Handler) Exec

func (h *Handler) Exec(ctx context.Context, in []byte) ([]byte, error)

Exec decodes one command, rebuilds its registry-typed payload, runs it on the embedded facade, and encodes the result or typed error.

func (*Handler) ExecBatch

func (h *Handler) ExecBatch(ctx context.Context, in []byte) ([]byte, error)

ExecBatch decodes N commands and runs them in one server-side transaction.

func (*Handler) Get

func (h *Handler) Get(ctx context.Context, in []byte) ([]byte, error)

Get is the server side of MethodGet: build a registry-typed scan target, run the real relational read, and return the row as opaque JSON.

func (*Handler) GetBlob

func (h *Handler) GetBlob(ctx context.Context, in []byte, send func([]byte) error) error

GetBlob is the server side of MethodGetBlob: the first frame carries the object metadata (or a setup error), then one data frame per chunk.

func (*Handler) GetMany

func (h *Handler) GetMany(ctx context.Context, in []byte) ([]byte, error)

GetMany is the server side of MethodGetMany: the batched (no-N+1) read.

func (*Handler) GraphQuery

func (h *Handler) GraphQuery(ctx context.Context, in []byte) ([]byte, error)

func (*Handler) HeadBlob

func (h *Handler) HeadBlob(ctx context.Context, in []byte) ([]byte, error)

HeadBlob, DeleteBlob and PresignBlob are unary: one request envelope in, one response envelope out.

func (*Handler) List

func (h *Handler) List(ctx context.Context, in []byte) ([]byte, error)

List is the server side of MethodList: decode the structured filter (an opaque JSON body), run the real paged read into a registry-typed slice target, and return opaque-JSON rows.

func (*Handler) LiveQuery

func (h *Handler) LiveQuery(ctx context.Context, in []byte, send func([]byte) error) error

LiveQuery is the server side of MethodLiveQuery. It opens a maintained-window subscription on the facade, sends the snapshot as the first frame, then one delta per frame until the channel closes or the client disconnects; on exit it Closes the engine handle. A facade without LiveQuery answers ErrNotImplemented.

func (*Handler) PresignBlob

func (h *Handler) PresignBlob(ctx context.Context, in []byte) ([]byte, error)

func (*Handler) PutBlob

func (h *Handler) PutBlob(ctx context.Context, recv func() ([]byte, error)) ([]byte, error)

PutBlob is the server side of MethodPutBlob: the first frame carries metadata, the rest carry bytes, which it pipes into the byte store's streaming Put.
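The metadata-first framing can be sketched with the same recv contract that DispatchClientStream documents (next frame, or io.EOF when the client is done). This is an illustration under assumptions: putBlob, framesOf and demoPut are hypothetical, the metadata is treated as an opaque string, and an io.Writer stands in for the byte store's streaming Put.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// putBlob reads the metadata frame, then copies every later frame into dst
// until recv reports io.EOF. It returns the metadata and the bytes written.
func putBlob(recv func() ([]byte, error), dst io.Writer) (string, int, error) {
	meta, err := recv()
	if errors.Is(err, io.EOF) {
		return "", 0, errors.New("missing metadata frame")
	}
	if err != nil {
		return "", 0, err
	}
	written := 0
	for {
		frame, err := recv()
		if errors.Is(err, io.EOF) {
			return string(meta), written, nil // clean end of the upload
		}
		if err != nil {
			return "", written, err
		}
		n, err := dst.Write(frame)
		written += n
		if err != nil {
			return "", written, err
		}
	}
}

// framesOf returns a recv func that replays frames and then io.EOF.
func framesOf(frames ...string) func() ([]byte, error) {
	i := 0
	return func() ([]byte, error) {
		if i >= len(frames) {
			return nil, io.EOF
		}
		f := frames[i]
		i++
		return []byte(f), nil
	}
}

// demoPut runs putBlob over canned frames into an in-memory buffer.
func demoPut(frames ...string) (string, string, error) {
	var buf bytes.Buffer
	meta, _, err := putBlob(framesOf(frames...), &buf)
	return meta, buf.String(), err
}

func main() {
	fmt.Println(demoPut(`{"key":"a"}`, "he", "llo"))
	fmt.Println(demoPut())
}
```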

func (*Handler) Search

func (h *Handler) Search(ctx context.Context, in []byte) ([]byte, error)

func (*Handler) Subscribe

func (h *Handler) Subscribe(ctx context.Context, in []byte, send func([]byte) error) error

Subscribe is the server side of MethodSubscribe. The embedded facade resolves the scope (authz + channel resolution happen there); the first frame is a handshake reporting setup success or a typed error so the client can honor Subscribe's synchronous-error contract, then one Delta per frame follows.

func (*Handler) VectorDelete

func (h *Handler) VectorDelete(ctx context.Context, in []byte) ([]byte, error)

func (*Handler) VectorDeleteByMeta added in v0.0.3

func (h *Handler) VectorDeleteByMeta(ctx context.Context, in []byte) ([]byte, error)

func (*Handler) VectorGet added in v0.0.3

func (h *Handler) VectorGet(ctx context.Context, in []byte) ([]byte, error)

func (*Handler) VectorSimilar

func (h *Handler) VectorSimilar(ctx context.Context, in []byte) ([]byte, error)

func (*Handler) VectorUpsert

func (h *Handler) VectorUpsert(ctx context.Context, in []byte) ([]byte, error)

type LiveHandle

type LiveHandle struct {
	// contains filtered or unexported fields
}

LiveHandle controls a remote maintained subscription. Unlike the in-process *livequery.Handle it cannot carry engine state across the wire, so only Close (tear down) is functional; Reanchor (deep scroll) needs a bidirectional stream, is a follow-on, and returns ErrNotImplemented until then (ADR 0009).

func (*LiveHandle) Close

func (h *LiveHandle) Close()

Close tears the remote subscription down: it cancels the stream, which the server observes (ctx.Done) and uses to Close the underlying engine handle.

func (*LiveHandle) Reanchor

Reanchor slides a maintained window to a new cursor anchor. It needs the bidirectional Live stream (client→server control mid-stream), not yet wired, so it returns ErrNotImplemented.

type LiveQuerier

type LiveQuerier interface {
	LiveQuery(ctx context.Context, q livequery.LiveQuery) (livequery.Snapshot, <-chan livequery.LiveDelta, *livequery.Handle, error)
}

LiveQuerier is the maintained-result-set surface the remote Live plane needs. It is NOT part of query.Fabric — LiveQuery lives on the concrete *fabriq.Fabriq — so the Handler type-asserts it from the facade (NewHandler); a facade without it makes the remote LiveQuery return ErrNotImplemented. The signature matches *fabriq.Fabriq exactly, so the facade satisfies this by construction.
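The optional-capability pattern described above is a plain Go type assertion at construction time. The sketch below uses simplified stand-ins: fabric, liveQuerier (reduced to a string-based method), handler, plainFacade and liveFacade are all hypothetical, and errNotImplemented is a local copy of the sentinel.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotImplemented is a local stand-in for remote.ErrNotImplemented.
var errNotImplemented = errors.New("remote: not implemented over the remote transport (see ADR 0009)")

// fabric is a hypothetical stand-in for query.Fabric.
type fabric interface{ Name() string }

// liveQuerier is a simplified stand-in for LiveQuerier; the real method takes
// and returns livequery types.
type liveQuerier interface {
	LiveQuery(q string) (string, error)
}

// handler keeps the asserted capability; live is nil when the facade lacks it.
type handler struct {
	fab  fabric
	live liveQuerier
}

// newHandler type-asserts the optional interface once, at construction.
func newHandler(f fabric) *handler {
	h := &handler{fab: f}
	if lq, ok := f.(liveQuerier); ok {
		h.live = lq
	}
	return h
}

// LiveQuery delegates when the capability exists and otherwise reports the gap.
func (h *handler) LiveQuery(q string) (string, error) {
	if h.live == nil {
		return "", errNotImplemented
	}
	return h.live.LiveQuery(q)
}

// plainFacade satisfies fabric only.
type plainFacade struct{}

func (plainFacade) Name() string { return "plain" }

// liveFacade satisfies fabric (via embedding) and liveQuerier.
type liveFacade struct{ plainFacade }

func (liveFacade) LiveQuery(q string) (string, error) { return "snapshot for " + q, nil }

func main() {
	fmt.Println(newHandler(liveFacade{}).LiveQuery("orders"))
	fmt.Println(newHandler(plainFacade{}).LiveQuery("orders"))
}
```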

type Loopback

type Loopback struct{ Handler *Handler }

Loopback is an in-process Transport that dispatches straight to a Handler — no network, only the envelope (de)serialization itself. It exists so the client, the envelope and the server-side handlers can be round-tripped in a unit test before the gRPC binding exists.

func (Loopback) ClientStream

func (l Loopback) ClientStream(ctx context.Context, method string) (ClientStreamConn, error)

ClientStream implements Transport: it runs the client-streaming handler in a goroutine that pulls frames from a channel the caller Sends to and produces a single reply. The ctx is cancelled once the handler returns so a pending Send unblocks instead of leaking.

func (Loopback) ServerStream

func (l Loopback) ServerStream(ctx context.Context, method string, in []byte) (Stream, error)

ServerStream implements Transport: it runs the streaming handler in a goroutine that pushes frames into a buffered channel the returned Stream drains. The ctx is made cancellable so Close (or client disconnect) stops the server goroutine instead of leaking it — every send is guarded by ctx.Done.
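A goroutine-plus-buffered-channel stream with a ctx-guarded send could look roughly like this. It is a sketch of the technique, not Loopback's actual code: chanStream, serverStream, streamHandler, countTo, collectFrames and stopsOnClose are hypothetical names, and the buffer size of 8 is arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// Stream mirrors the client view of a server-streaming response.
type Stream interface {
	Recv() ([]byte, error)
	Close() error
}

// streamHandler is the shape of the server-streaming Handler methods.
type streamHandler func(ctx context.Context, in []byte, send func([]byte) error) error

type chanStream struct {
	frames chan []byte
	err    error // written by the handler goroutine before frames is closed
	cancel context.CancelFunc
}

func (s *chanStream) Recv() ([]byte, error) {
	if f, ok := <-s.frames; ok {
		return f, nil
	}
	if s.err != nil {
		return nil, s.err
	}
	return nil, io.EOF
}

func (s *chanStream) Close() error { s.cancel(); return nil }

// serverStream runs h in a goroutine; every send is guarded by ctx.Done so
// Close (which cancels ctx) stops the producer instead of leaking it.
func serverStream(ctx context.Context, h streamHandler, in []byte) Stream {
	ctx, cancel := context.WithCancel(ctx)
	s := &chanStream{frames: make(chan []byte, 8), cancel: cancel}
	go func() {
		defer close(s.frames)
		s.err = h(ctx, in, func(frame []byte) error {
			select {
			case s.frames <- frame:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	}()
	return s
}

// countTo emits "1".."n" and then returns cleanly.
func countTo(n int) streamHandler {
	return func(_ context.Context, _ []byte, send func([]byte) error) error {
		for i := 1; i <= n; i++ {
			if err := send([]byte(fmt.Sprint(i))); err != nil {
				return err
			}
		}
		return nil
	}
}

// collectFrames drains a countTo stream until Recv reports an error or io.EOF.
func collectFrames(n int) []string {
	s := serverStream(context.Background(), countTo(n), nil)
	defer s.Close()
	var out []string
	for {
		f, err := s.Recv()
		if err != nil {
			return out
		}
		out = append(out, string(f))
	}
}

// stopsOnClose returns only once an endless handler has exited after Close.
func stopsOnClose() bool {
	done := make(chan struct{})
	endless := func(_ context.Context, _ []byte, send func([]byte) error) error {
		defer close(done)
		for {
			if err := send([]byte("tick")); err != nil {
				return err
			}
		}
	}
	s := serverStream(context.Background(), endless, nil)
	s.Recv()
	s.Close()
	<-done
	return true
}

func main() {
	fmt.Println(collectFrames(3), stopsOnClose())
}
```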

func (Loopback) Unary

func (l Loopback) Unary(ctx context.Context, method string, in []byte) ([]byte, error)

Unary implements Transport.

type Stream

type Stream interface {
	// Recv returns the next frame, (nil, io.EOF) at a clean end, or (nil, err).
	Recv() ([]byte, error)
	// Close releases the stream and signals the server to stop producing.
	Close() error
}

Stream is the client view of a server-streaming response.

type Transport

type Transport interface {
	// Unary invokes a request/response method.
	Unary(ctx context.Context, method string, in []byte) (out []byte, err error)
	// ServerStream opens a server-streaming method: one request, a stream of
	// framed responses. Drain Recv until io.EOF (clean end) or a non-EOF error,
	// then Close to release it.
	ServerStream(ctx context.Context, method string, in []byte) (Stream, error)
	// ClientStream opens a client-streaming method: the client Sends N frames
	// then CloseAndRecv for the single response. Used by chunked blob upload.
	ClientStream(ctx context.Context, method string) (ClientStreamConn, error)
}

Transport is the codec- and wire-neutral seam the client and server halves sit on. The production binding is gRPC over HTTP/2 with protobuf framing and mTLS (see proto/fabriq/v1/fabriq.proto and ADR 0009); Loopback is the in-process binding used to exercise the envelope without a network.

The ctx passed here carries the tenant/principal; the gRPC Transport turns those into call metadata that the server edge authenticates. in/out are the marshaled envelope bytes — canonical JSON in this skeleton, protobuf once the stubs are generated; the planes above this seam do not care which.
