Documentation ¶
Overview ¶
Package gateway is the transport shell that turns the in-process sharded live-query Gateway into a deployable edge tier: it terminates client SSE and WebSocket connections and forwards the maintained/streamed delta stream over them. It carries no live-query logic. Instead it talks to a Backend seam (satisfied by core/livequery/cluster.Gateway) and writes to two small transport interfaces, SSESink and WSConn, which both forge's Stream/Connection and test fakes satisfy. This keeps the package free of any web-framework or Redis dependency and fully unit-testable. The forge-native binding lives in forgeext.
Constants ¶
const (
	ActionSubscribe   = "subscribe"
	ActionReanchor    = "reanchor"
	ActionUnsubscribe = "unsubscribe"
)
Client→server command actions (WebSocket only; SSE is delivery-only).
Variables ¶
This section is empty.
Functions ¶
func ServeSSE ¶
ServeSSE forwards a subscription's snapshot+delta stream onto an SSE sink until the stream closes or ctx is cancelled. The snapshot arrives folded into the stream as OpReset + OpEnter rows, so there is one uniform path: every delta is written as an event named for its op, with id = StreamID.
It returns nil on a clean end (the client disconnects and ctx is cancelled, or the backend closes the stream). On a failed or stalled write it returns the write error, which the caller turns into a connection teardown.
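The delivery loop described above can be sketched as follows. This is a minimal illustration, not the package's ServeSSE (whose signature is not shown on this page): the names delta, serve, recorder, run and errStalled are hypothetical, and delta stands in for livequery.LiveDelta. Only the three sink methods are taken from the SSESink interface documented below.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// delta is a hypothetical stand-in for livequery.LiveDelta.
type delta struct {
	Op       string
	StreamID string
}

// sink mirrors the three methods of the documented SSESink interface.
type sink interface {
	WriteEvent(id, event string, v any) error
	SetWriteDeadline(time.Time) error
	Heartbeat() error
}

// serve is an illustrative loop: one event per delta, named for its op,
// with id = StreamID, plus a keep-alive when the stream is idle.
func serve(ctx context.Context, s sink, deltas <-chan delta, heartbeat, writeTimeout time.Duration) error {
	tick := time.NewTicker(heartbeat)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil // client went away: clean end
		case d, ok := <-deltas:
			if !ok {
				return nil // backend closed the stream: clean end
			}
			if writeTimeout > 0 { // zero disables the deadline
				if err := s.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
					return err
				}
			}
			if err := s.WriteEvent(d.StreamID, d.Op, d); err != nil {
				return err // failed or stalled write: caller tears down
			}
			tick.Reset(heartbeat) // the stream was not idle
		case <-tick.C:
			if err := s.Heartbeat(); err != nil {
				return err
			}
		}
	}
}

// errStalled simulates a write that failed on a stalled client.
var errStalled = errors.New("stalled write")

// recorder is a test fake that records events and can fail on demand.
type recorder struct {
	events []string
	fail   error
}

func (r *recorder) WriteEvent(id, event string, v any) error {
	if r.fail != nil {
		return r.fail
	}
	r.events = append(r.events, event+"@"+id)
	return nil
}
func (r *recorder) SetWriteDeadline(time.Time) error { return nil }
func (r *recorder) Heartbeat() error                 { return nil }

// run feeds a reset+enter snapshot through serve and reports what was written.
func run(fail error) ([]string, error) {
	ch := make(chan delta, 2)
	ch <- delta{Op: "reset", StreamID: "1-0"}
	ch <- delta{Op: "enter", StreamID: "1-1"}
	close(ch)
	r := &recorder{fail: fail}
	err := serve(context.Background(), r, ch, time.Hour, time.Second)
	return r.events, err
}

func main() {
	events, err := run(nil)
	fmt.Println(events, err) // [reset@1-0 enter@1-1] <nil>
	_, err = run(errStalled)
	fmt.Println(err) // stalled write
}
```

Because the snapshot is folded into the stream, the loop needs no separate snapshot branch: the reset and enter rows travel through the same case as later live deltas.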
func ServeWS ¶
ServeWS runs one WebSocket subscription lifecycle: it reads the initial subscribe command, opens the backend subscription, then pumps deltas to the client (write pump) while serving reanchor/unsubscribe commands (read pump) until either side ends. A clean unsubscribe or backend stream-close returns nil; a transport error is returned for logging. The subscription is always torn down on exit.
Types ¶
type Backend ¶
type Backend interface {
	// Subscribe registers a live query and returns a handle whose Deltas
	// channel carries the snapshot (as OpReset + OpEnter rows) followed by live
	// deltas. The returned Sub MUST be Closed when the client disconnects.
	Subscribe(ctx context.Context, q livequery.LiveQuery) (*Sub, error)
}
Backend opens live-query subscriptions the gateway terminates. It is the only seam between the transport shell and the live-query tier; cluster.Gateway satisfies it through a small adapter (see forgeext), and tests substitute an in-memory fake.
type ClientCommand ¶
type ClientCommand struct {
	Action string               `json:"action"`
	Query  *livequery.LiveQuery `json:"query,omitempty"`  // subscribe
	Cursor *livequery.Cursor    `json:"cursor,omitempty"` // reanchor
	Limit  int                  `json:"limit,omitempty"`  // reanchor
}
ClientCommand is a control frame a WebSocket client sends upstream. The first command on a connection must be a subscribe carrying the query; reanchor and unsubscribe then operate on the established subscription.
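The ordering rule above (subscribe first, then reanchor or unsubscribe) can be checked with a small decoder like the one below. It is a sketch under stated assumptions: command is a local mirror of ClientCommand with json.RawMessage standing in for the livequery types, check is a hypothetical helper, and the query payload in the usage is invented for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Action names as documented in the Constants section.
const (
	ActionSubscribe   = "subscribe"
	ActionReanchor    = "reanchor"
	ActionUnsubscribe = "unsubscribe"
)

// command mirrors ClientCommand; json.RawMessage replaces the livequery
// types so the sketch stays self-contained.
type command struct {
	Action string          `json:"action"`
	Query  json.RawMessage `json:"query,omitempty"`
	Cursor json.RawMessage `json:"cursor,omitempty"`
	Limit  int             `json:"limit,omitempty"`
}

// check decodes one client frame and enforces the documented ordering.
// What a second subscribe does is not specified in the source, so it is
// not rejected here.
func check(raw string, subscribed bool) (string, error) {
	var c command
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return "", err
	}
	switch c.Action {
	case ActionSubscribe:
		if len(c.Query) == 0 {
			return "", errors.New("subscribe requires a query")
		}
	case ActionReanchor, ActionUnsubscribe:
		if !subscribed {
			return "", errors.New("first command must be subscribe")
		}
	default:
		return "", fmt.Errorf("unknown action %q", c.Action)
	}
	return c.Action, nil
}

func main() {
	// The query body is a made-up payload, not the real LiveQuery shape.
	fmt.Println(check(`{"action":"subscribe","query":{"table":"orders"}}`, false))
	fmt.Println(check(`{"action":"reanchor","limit":10}`, false))
	fmt.Println(check(`{"action":"reanchor","limit":10}`, true))
}
```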
type Frame ¶
type Frame struct {
	Op       string           `json:"op"`
	AggID    string           `json:"agg_id,omitempty"`
	Version  int64            `json:"version,omitempty"`
	Row      json.RawMessage  `json:"row,omitempty"`
	OldIndex int              `json:"old_index"`
	NewIndex int              `json:"new_index"`
	Cursor   livequery.Cursor `json:"cursor,omitempty"`
	StreamID string           `json:"stream_id,omitempty"`
}
Frame is one delta as the client sees it: livequery.LiveDelta minus the server-only timestamp, with the op rendered as a stable string ("enter", "leave", "move", "update", "reset") so that SSE and WebSocket clients get the same friendly wire shape (the raw LiveDelta serialises op as an integer).
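To illustrate the integer-to-string rendering, here is a minimal sketch. The op type, its constant values, opNames and encode are all hypothetical: the real livequery op constants and their numeric values are not shown on this page. The frame struct copies the documented Frame tags, minus the Cursor field.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// op is a hypothetical integer op code; the numeric values are assumptions.
type op int

const (
	opEnter op = iota
	opLeave
	opMove
	opUpdate
	opReset
)

// opNames holds the stable wire names listed in the Frame documentation.
var opNames = map[op]string{
	opEnter:  "enter",
	opLeave:  "leave",
	opMove:   "move",
	opUpdate: "update",
	opReset:  "reset",
}

// frame copies the documented Frame JSON tags (Cursor omitted for brevity).
type frame struct {
	Op       string          `json:"op"`
	AggID    string          `json:"agg_id,omitempty"`
	Version  int64           `json:"version,omitempty"`
	Row      json.RawMessage `json:"row,omitempty"`
	OldIndex int             `json:"old_index"`
	NewIndex int             `json:"new_index"`
	StreamID string          `json:"stream_id,omitempty"`
}

// encode renders an integer op as its wire name and marshals the frame.
func encode(o op, aggID string, newIndex int) (string, error) {
	name, ok := opNames[o]
	if !ok {
		return "", fmt.Errorf("unknown op %d", o)
	}
	b, err := json.Marshal(frame{Op: name, AggID: aggID, NewIndex: newIndex})
	return string(b), err
}

func main() {
	s, err := encode(opEnter, "order-1", 3)
	fmt.Println(s, err)
	// {"op":"enter","agg_id":"order-1","old_index":0,"new_index":3} <nil>
}
```

Note that old_index and new_index carry no omitempty tag, so a zero index is always written; a plausible reading is that index 0 is a meaningful position and must not be dropped.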
type SSEOptions ¶
type SSEOptions struct {
	// HeartbeatInterval is how often a keep-alive comment is sent on an idle
	// stream. Defaults to 15s.
	HeartbeatInterval time.Duration
	// WriteTimeout bounds a single event write; on a stalled client the write
	// fails and the connection is torn down (the client reconnects to a fresh
	// snapshot). Zero disables the deadline.
	WriteTimeout time.Duration
}
SSEOptions tune the SSE delivery loop.
type SSESink ¶
type SSESink interface {
	// WriteEvent emits one event: id = SSE "id:" (resume position),
	// event = the delta op name, v = the JSON payload. It flushes.
	WriteEvent(id, event string, v any) error
	// SetWriteDeadline bounds how long one write may block (backpressure).
	SetWriteDeadline(time.Time) error
	// Heartbeat writes a keep-alive comment.
	Heartbeat() error
}
SSESink is the minimal Server-Sent Events transport ServeSSE writes to. core/subscribe.SSEWriter satisfies it directly; tests substitute a fake.
type Sub ¶
type Sub struct {
	// ID is the backend's subscription id (logging / client correlation).
	ID string
	// Deltas is the merged snapshot+live stream. It is closed by the backend
	// when the subscription ends.
	Deltas <-chan livequery.LiveDelta
	// contains filtered or unexported fields
}
Sub is one terminated live subscription: a delta stream plus its control verbs. It mirrors livequery.Handle — captured state exposed as methods rather than a multi-value tuple — so callers manipulate the subscription, not a bag of loose values.
func NewSub ¶
func NewSub(id string, deltas <-chan livequery.LiveDelta, reanchor func(context.Context, *livequery.Cursor, int) error, closeFn func()) *Sub
NewSub builds a Sub from its stream and control closures. The adapter in forgeext uses it to wrap a cluster.Gateway subscription; tests use it to wrap a hand-fed channel.
type WSConn ¶
type WSConn interface {
	// ReadJSON decodes the next client command frame; it errors when the client
	// disconnects or the connection is closed.
	ReadJSON(v any) error
	// WriteJSON sends one server frame to the client.
	WriteJSON(v any) error
	// Close tears the socket down (also used to abort a stalled write).
	Close() error
	// Context is cancelled when the connection ends.
	Context() context.Context
}
WSConn is the minimal WebSocket transport ServeWS uses. forge.Connection satisfies it directly (it is a subset of that interface); tests substitute an in-memory fake.
type WSOptions ¶
type WSOptions struct {
	// WriteTimeout bounds a single frame write. forge.Connection exposes no
	// write deadline, so the bound is enforced with a watchdog: a write that
	// stalls past WriteTimeout closes the connection (the client reconnects to a
	// fresh snapshot). Zero disables the watchdog (the established idiom: rely on
	// the bounded delta buffer and tear down on any write error).
	WriteTimeout time.Duration
}
WSOptions tune the WebSocket loop.
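The watchdog idea behind WriteTimeout can be sketched with time.AfterFunc. This is one possible shape under stated assumptions, not the package's implementation: writeWithWatchdog, stalledConn, errClosed and demo are hypothetical names, and conn is just the WriteJSON/Close subset of WSConn.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// conn is the subset of the documented WSConn that the watchdog needs.
type conn interface {
	WriteJSON(v any) error
	Close() error
}

// writeWithWatchdog writes one frame. If the write has not returned within
// timeout, the watchdog closes the connection, which aborts the blocked
// write. A zero timeout disables the watchdog.
func writeWithWatchdog(c conn, v any, timeout time.Duration) error {
	if timeout <= 0 {
		return c.WriteJSON(v)
	}
	t := time.AfterFunc(timeout, func() { _ = c.Close() })
	defer t.Stop() // a write that finishes in time cancels the watchdog
	return c.WriteJSON(v)
}

// errClosed is what the fake returns once its connection has been closed.
var errClosed = errors.New("connection closed")

// stalledConn is a test fake whose WriteJSON blocks until Close is called,
// simulating a client that has stopped reading.
type stalledConn struct {
	once   sync.Once
	closed chan struct{}
}

func newStalledConn() *stalledConn {
	return &stalledConn{closed: make(chan struct{})}
}

func (c *stalledConn) WriteJSON(v any) error {
	<-c.closed
	return errClosed
}

func (c *stalledConn) Close() error {
	c.once.Do(func() { close(c.closed) })
	return nil
}

// demo runs one stalled write under a 20ms watchdog.
func demo() error {
	return writeWithWatchdog(newStalledConn(), map[string]string{"op": "enter"}, 20*time.Millisecond)
}

func main() {
	fmt.Println(demo()) // connection closed
}
```

The write error surfaced this way is the same signal as any other transport error, so the caller needs only one teardown path.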