subscribe

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package subscribe is fabriq's subscription plane: server-side channel resolution, the subscribe-time authorization gate, the conflating fan-out hub, and the SSE bridge.

Channel names are derived exclusively here (via core/registry): client input never names a channel or a tenant.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AllowAll

AllowAll is the default authorization hook.

func ChannelsForEnvelope

func ChannelsForEnvelope(reg *registry.Registry, env event.Envelope) ([]string, error)

ChannelsForEnvelope derives every channel an event must be published to: the entity channel plus each containing-scope channel whose field is present and non-empty in the payload, in spec declaration order. Events for unregistered entities derive no channels.

func LastEventID

func LastEventID(r *http.Request) string

LastEventID extracts the SSE resume position from a request.

func ResolveChannel

func ResolveChannel(ctx context.Context, reg *registry.Registry, req query.SubscribeScope) (string, error)

ResolveChannel maps a validated subscription request onto its channel name, taking the tenant from the authenticated context only.

Types

type AuthzFunc

type AuthzFunc func(ctx context.Context, req query.SubscribeScope) error

AuthzFunc authorizes a subscription request before any channel is resolved. Implementations get the validated scope and the (already tenant-stamped) context; returning an error denies the subscription.

type Gate

type Gate struct {
	// contains filtered or unexported fields
}

Gate combines the authz hook with server-side channel resolution; the facade's Subscribe goes through a Gate.

func NewGate

func NewGate(reg *registry.Registry, authz AuthzFunc) *Gate

NewGate builds a Gate; a nil authz defaults to AllowAll.

func (*Gate) Resolve

func (g *Gate) Resolve(ctx context.Context, req query.SubscribeScope) (string, error)

Resolve authorizes the request and then resolves its channel.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub fans deltas out to subscribers with per-channel last-write-wins conflation. It is transport-agnostic: in production a pump goroutine reads Redis change-channel streams and calls Publish; unit tests publish directly; the future CRDT sync sub-protocol shares the connection layer through PublishRaw, which bypasses conflation entirely.

Delivery policy: subscriber channels are buffered; when a buffer is full the delta is dropped for that subscriber. The fetch-then-subscribe contract makes this safe — clients that fall behind refetch and resume from Last-Event-ID.

func NewHub

func NewHub(opts ...HubOption) *Hub

NewHub builds a Hub.

func (*Hub) Close

func (h *Hub) Close()

Close stops the hub and closes every subscriber channel.

func (*Hub) ConflationDepth

func (h *Hub) ConflationDepth() int

ConflationDepth reports the total buffered (not yet flushed) delta count — exported as a gauge by internal/metrics.

func (*Hub) Flush

func (h *Hub) Flush()

Flush forces all pending conflation buffers out immediately (shutdown drain and deterministic tests).

func (*Hub) Publish

func (h *Hub) Publish(channel string, d query.Delta)

Publish offers a delta to a channel. Conflated channels buffer it (LWW per aggregate, window flush); raw channels deliver immediately in order. The pump calls this for every transport entry, so the channel's mode decides the semantics.

func (*Hub) PublishRaw

func (h *Hub) PublishRaw(channel string, d query.Delta)

PublishRaw bypasses conflation: ordered, complete delivery for protocols that cannot tolerate coalescing (CRDT sync).

func (*Hub) Subscribe

func (h *Hub) Subscribe(ctx context.Context, channel string, buffer int) (deltas <-chan query.Delta, cancel func(), err error)

Subscribe attaches a buffered subscriber to a conflated delta channel. The returned cancel function detaches and closes the subscriber channel; cancelling the context does the same.

func (*Hub) SubscribeRaw

func (h *Hub) SubscribeRaw(ctx context.Context, channel string, buffer int) (frames <-chan query.Delta, cancel func(), err error)

SubscribeRaw attaches to a RAW channel: every frame is delivered immediately and in order, with no conflation and no coalescing — the document-sync sub-protocol's contract. A channel's mode is fixed by its first subscriber; mixing modes on one channel is an error.

type HubOption

type HubOption func(*Hub)

HubOption configures a Hub.

func WithConflationWindow

func WithConflationWindow(d time.Duration) HubOption

WithConflationWindow sets the flush window for delta conflation (default 150ms; the spec'd range is 100–250ms).

func WithTailer

func WithTailer(t Tailer) HubOption

WithTailer wires a transport pump into the hub: each channel's pump starts with its first subscriber (tailing from "now") and stops with its last.

type SSEWriter

type SSEWriter struct {
	// contains filtered or unexported fields
}

SSEWriter bridges delta streams onto Server-Sent Events. It is deliberately stdlib-only and proxy-safe: it requires an http.Flusher and flushes after every event, sets X-Accel-Buffering: no, and maps the transport stream ID onto the SSE "id:" field so reconnecting clients resume via Last-Event-ID.

func NewSSEWriter

func NewSSEWriter(w http.ResponseWriter) (*SSEWriter, error)

NewSSEWriter prepares w for event streaming. It fails if the ResponseWriter cannot flush — buffering proxies would otherwise hold events indefinitely.

func (*SSEWriter) Heartbeat

func (s *SSEWriter) Heartbeat() error

Heartbeat writes an SSE comment to keep intermediaries from idling the connection out.

func (*SSEWriter) SetWriteDeadline

func (s *SSEWriter) SetWriteDeadline(t time.Time) error

SetWriteDeadline bounds how long a single event write may block, so a slow or stalled client cannot wedge the streaming goroutine indefinitely. A write that exceeds the deadline fails, which the caller turns into a connection teardown (the client then reconnects to a fresh snapshot). It is a no-op if the underlying writer does not support deadlines.

func (*SSEWriter) WriteDelta

func (s *SSEWriter) WriteDelta(d query.Delta) error

WriteDelta emits one delta as an SSE event: id = StreamID, event = delta type, data = the JSON delta.

func (*SSEWriter) WriteEvent

func (s *SSEWriter) WriteEvent(id, eventName string, data any) error

WriteEvent emits an arbitrary SSE event and flushes.

type Tailer

type Tailer interface {
	Tail(ctx context.Context, channel, fromID string, deliver func(query.Delta)) error
	ReadRange(ctx context.Context, channel, afterID string, limit int) ([]query.Delta, error)
}

Tailer is the transport port the hub pumps deltas from (the Redis adapter implements it over change-channel streams; fakes drive it in tests). Tail blocks, delivering every entry after fromID until ctx ends. ReadRange is the catch-up read used for Last-Event-ID resume.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL