gateway

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package gateway is the transport shell that turns the in-process sharded live-query Gateway into a deployable edge tier: it terminates client SSE and WebSocket connections and forwards the maintained/streamed delta stream over them. It carries NO live-query logic — it speaks to a Backend seam (satisfied by core/livequery/cluster.Gateway) and writes to two tiny transport interfaces (SSESink, WSConn) that forge's Stream/Connection — and test fakes — satisfy. This keeps the package free of any web framework or Redis dependency and fully unit-testable. The forge-native binding lives in forgeext.

Index

Constants

View Source
const (
	ActionSubscribe   = "subscribe"
	ActionReanchor    = "reanchor"
	ActionUnsubscribe = "unsubscribe"
)

Client→server command actions (WebSocket only; SSE is delivery-only).

Variables

This section is empty.

Functions

func ServeSSE

func ServeSSE(ctx context.Context, sink SSESink, sub *Sub, opts SSEOptions) error

ServeSSE forwards a subscription's snapshot+delta stream onto an SSE sink until the stream closes or ctx is cancelled. The snapshot arrives folded into the stream as OpReset + OpEnter rows, so there is one uniform path: every delta is written as an event named for its op, with id = StreamID.

It returns nil on a clean end (client disconnect via ctx, or the backend closing the stream) and the write error on a failed/stalled write — which the caller turns into a connection teardown.

func ServeWS

func ServeWS(ctx context.Context, conn WSConn, backend Backend, opts WSOptions) error

ServeWS runs one WebSocket subscription lifecycle: it reads the initial subscribe command, opens the backend subscription, then pumps deltas to the client (write pump) while serving reanchor/unsubscribe commands (read pump) until either side ends. A clean unsubscribe or backend stream-close returns nil; a transport error is returned for logging. The subscription is always torn down on exit.

Types

type Backend

type Backend interface {
	// Subscribe registers a live query and returns a handle whose Deltas
	// channel carries the snapshot (as OpReset + OpEnter rows) followed by live
	// deltas. The returned Sub MUST be Closed when the client disconnects.
	Subscribe(ctx context.Context, q livequery.LiveQuery) (*Sub, error)
}

Backend opens live-query subscriptions the gateway terminates. It is the only seam between the transport shell and the live-query tier; cluster.Gateway satisfies it through a small adapter (see forgeext), and tests substitute an in-memory fake.

type ClientCommand

type ClientCommand struct {
	Action string               `json:"action"`
	Query  *livequery.LiveQuery `json:"query,omitempty"`  // subscribe
	Cursor *livequery.Cursor    `json:"cursor,omitempty"` // reanchor
	Limit  int                  `json:"limit,omitempty"`  // reanchor
}

ClientCommand is a control frame a WebSocket client sends upstream. The first command on a connection must be a subscribe carrying the query; reanchor and unsubscribe then operate on the established subscription.

type Frame

type Frame struct {
	Op       string           `json:"op"`
	AggID    string           `json:"agg_id,omitempty"`
	Version  int64            `json:"version,omitempty"`
	Row      json.RawMessage  `json:"row,omitempty"`
	OldIndex int              `json:"old_index"`
	NewIndex int              `json:"new_index"`
	Cursor   livequery.Cursor `json:"cursor,omitempty"`
	StreamID string           `json:"stream_id,omitempty"`
}

Frame is one delta as the client sees it: livequery.LiveDelta minus the server-only timestamp, with the op rendered as a stable string ("enter", "leave", "move", "update", "reset") so SSE *and* WebSocket clients get the same friendly wire shape (the raw LiveDelta serialises op as an integer).

type SSEOptions

type SSEOptions struct {
	// HeartbeatInterval is how often a keep-alive comment is sent on an idle
	// stream. Defaults to 15s.
	HeartbeatInterval time.Duration
	// WriteTimeout bounds a single event write; on a stalled client the write
	// fails and the connection is torn down (the client reconnects to a fresh
	// snapshot). Zero disables the deadline.
	WriteTimeout time.Duration
}

SSEOptions tune the SSE delivery loop.

type SSESink

type SSESink interface {
	// WriteEvent emits one event: id = SSE "id:" (resume position),
	// event = the delta op name, v = the JSON payload. It flushes.
	WriteEvent(id, event string, v any) error
	// SetWriteDeadline bounds how long one write may block (backpressure).
	SetWriteDeadline(time.Time) error
	// Heartbeat writes a keep-alive comment.
	Heartbeat() error
}

SSESink is the minimal Server-Sent Events transport ServeSSE writes to. core/subscribe.SSEWriter satisfies it directly; tests substitute a fake.

type Sub

type Sub struct {
	// ID is the backend's subscription id (logging / client correlation).
	ID string
	// Deltas is the merged snapshot+live stream. It is closed by the backend
	// when the subscription ends.
	Deltas <-chan livequery.LiveDelta
	// contains filtered or unexported fields
}

Sub is one terminated live subscription: a delta stream plus its control verbs. It mirrors livequery.Handle — captured state exposed as methods rather than a multi-value tuple — so callers manipulate the subscription, not a bag of loose values.

func NewSub

func NewSub(id string, deltas <-chan livequery.LiveDelta, reanchor func(context.Context, *livequery.Cursor, int) error, closeFn func()) *Sub

NewSub builds a Sub from its stream and control closures. The adapter in forgeext uses it to wrap a cluster.Gateway subscription; tests use it to wrap a hand-fed channel.

func (*Sub) Close

func (s *Sub) Close()

Close tears the subscription down (unsubscribe control to the owning shard). Idempotent-safe if the backend supplied no close function.

func (*Sub) Reanchor

func (s *Sub) Reanchor(ctx context.Context, cursor *livequery.Cursor, limit int) error

Reanchor slides the maintained window to a new keyset anchor (deep/infinite scroll) in place, at O(window) server cost. No-op-safe if the backend did not supply a reanchor function.

type WSConn

type WSConn interface {
	// ReadJSON decodes the next client command frame; it errors when the client
	// disconnects or the connection is closed.
	ReadJSON(v any) error
	// WriteJSON sends one server frame to the client.
	WriteJSON(v any) error
	// Close tears the socket down (also used to abort a stalled write).
	Close() error
	// Context is cancelled when the connection ends.
	Context() context.Context
}

WSConn is the minimal WebSocket transport ServeWS uses. forge.Connection satisfies it directly (it is a subset of that interface); tests substitute an in-memory fake.

type WSOptions

type WSOptions struct {
	// WriteTimeout bounds a single frame write. forge.Connection exposes no
	// write deadline, so the bound is enforced with a watchdog: a write that
	// stalls past WriteTimeout closes the connection (the client reconnects to a
	// fresh snapshot). Zero disables the watchdog (the established idiom: rely on
	// the bounded delta buffer and tear down on any write error).
	WriteTimeout time.Duration
}

WSOptions tune the WebSocket loop.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL