Documentation ¶
Overview ¶
Package subscribe is fabriq's subscription plane: server-side channel resolution, the subscribe-time authorization gate, the conflating fan-out hub, and the SSE bridge.
Channel names are derived exclusively here (via core/registry): client input never names a channel or a tenant.
Index ¶
- func AllowAll(context.Context, query.SubscribeScope) error
- func ChannelsForEnvelope(reg *registry.Registry, env event.Envelope) ([]string, error)
- func LastEventID(r *http.Request) string
- func ResolveChannel(ctx context.Context, reg *registry.Registry, req query.SubscribeScope) (string, error)
- type AuthzFunc
- type Gate
- type Hub
- func (h *Hub) Close()
- func (h *Hub) ConflationDepth() int
- func (h *Hub) Flush()
- func (h *Hub) Publish(channel string, d query.Delta)
- func (h *Hub) PublishRaw(channel string, d query.Delta)
- func (h *Hub) Subscribe(ctx context.Context, channel string, buffer int) (deltas <-chan query.Delta, cancel func(), err error)
- func (h *Hub) SubscribeRaw(ctx context.Context, channel string, buffer int) (frames <-chan query.Delta, cancel func(), err error)
- type HubOption
- type SSEWriter
- type Tailer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AllowAll ¶
func AllowAll(context.Context, query.SubscribeScope) error
AllowAll is the default authorization hook: it permits every subscription request.
func ChannelsForEnvelope ¶
func ChannelsForEnvelope(reg *registry.Registry, env event.Envelope) ([]string, error)
ChannelsForEnvelope derives every channel an event must be published to: the entity channel plus each containing-scope channel whose field is present and non-empty in the payload, in spec declaration order. Events for unregistered entities derive no channels.
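The derivation rule can be sketched without the real registry. In this sketch, `entitySpec`, `scopeSpec`, `channelsFor`, the payload shape, and the `tenant:...` channel name format are all hypothetical stand-ins; only the rule itself (entity channel first, then each non-empty scope in declaration order, nothing for unregistered entities) comes from the description above.

```go
package main

import "fmt"

// scopeSpec and entitySpec are hypothetical stand-ins for registry metadata.
type scopeSpec struct {
	Name  string // scope name, e.g. "project"
	Field string // payload field that carries the scope's ID
}

type entitySpec struct {
	Entity string
	Scopes []scopeSpec // kept in declaration order
}

// channelsFor returns the entity channel followed by one channel per scope
// whose field is present and non-empty in the payload. The "tenant:..." name
// format is invented for this sketch.
func channelsFor(specs map[string]entitySpec, tenant, entity string, payload map[string]string) []string {
	spec, ok := specs[entity]
	if !ok {
		return nil // unregistered entities derive no channels
	}
	channels := []string{tenant + ":" + spec.Entity}
	for _, s := range spec.Scopes {
		if id := payload[s.Field]; id != "" {
			channels = append(channels, tenant+":"+s.Name+":"+id)
		}
	}
	return channels
}

func main() {
	specs := map[string]entitySpec{
		"task": {Entity: "task", Scopes: []scopeSpec{
			{Name: "project", Field: "project_id"},
			{Name: "sprint", Field: "sprint_id"},
		}},
	}
	// sprint_id is present but empty, so no sprint channel is derived.
	payload := map[string]string{"project_id": "p1", "sprint_id": ""}
	fmt.Println(channelsFor(specs, "acme", "task", payload))
	fmt.Println(channelsFor(specs, "acme", "unknown", payload))
}
```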
func LastEventID ¶
func LastEventID(r *http.Request) string
LastEventID extracts the SSE resume position from a request.
func ResolveChannel ¶
func ResolveChannel(ctx context.Context, reg *registry.Registry, req query.SubscribeScope) (string, error)
ResolveChannel maps a validated subscription request onto its channel name, taking the tenant from the authenticated context only.
Types ¶
type AuthzFunc ¶
type AuthzFunc func(ctx context.Context, req query.SubscribeScope) error
AuthzFunc authorizes a subscription request before any channel is resolved. Implementations get the validated scope and the (already tenant-stamped) context; returning an error denies the subscription.
type Gate ¶
type Gate struct {
// contains filtered or unexported fields
}
Gate combines the authz hook with server-side channel resolution; the facade's Subscribe goes through a Gate.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub fans deltas out to subscribers with per-channel last-write-wins conflation. It is transport-agnostic: in production a pump goroutine reads Redis change-channel streams and calls Publish; unit tests publish directly; the future CRDT sync sub-protocol shares the connection layer through PublishRaw, which bypasses conflation entirely.
Delivery policy: subscriber channels are buffered; when a buffer is full the delta is dropped for that subscriber. The fetch-then-subscribe contract makes this safe — clients that fall behind refetch and resume from Last-Event-ID.
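The drop-when-full policy is commonly written as a non-blocking send. The sketch below shows that idiom with a hypothetical `delta` type and `offer` helper; it is not the Hub's actual code.

```go
package main

import "fmt"

// delta is a hypothetical stand-in for query.Delta.
type delta struct{ ID string }

// offer attempts a non-blocking send and reports whether the delta was
// accepted. A full buffer means the delta is dropped for this subscriber
// instead of stalling the publisher.
func offer(ch chan<- delta, d delta) bool {
	select {
	case ch <- d:
		return true
	default:
		return false
	}
}

func main() {
	sub := make(chan delta, 1) // subscriber buffer of one
	fmt.Println(offer(sub, delta{ID: "a"}))
	fmt.Println(offer(sub, delta{ID: "b"})) // buffer is full, so this one is dropped
	fmt.Println((<-sub).ID)
}
```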
func (*Hub) ConflationDepth ¶
func (h *Hub) ConflationDepth() int
ConflationDepth reports the total buffered (not yet flushed) delta count — exported as a gauge by internal/metrics.
func (*Hub) Flush ¶
func (h *Hub) Flush()
Flush forces all pending conflation buffers out immediately. It is intended for shutdown drains and deterministic tests.
func (*Hub) Publish ¶
func (h *Hub) Publish(channel string, d query.Delta)
Publish offers a delta to a channel. Conflated channels buffer it (last-write-wins per aggregate, flushed once per conflation window); raw channels deliver it immediately and in order. The pump calls Publish for every transport entry, so the channel's mode decides the semantics.
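Last-write-wins conflation per aggregate can be sketched as a map keyed by aggregate ID that is drained on each flush. The `conflator` type, the `delta` fields, and the first-seen flush order are assumptions of this sketch; in a real hub a ticker firing once per window would presumably call `flush`.

```go
package main

import (
	"fmt"
	"sync"
)

// delta is a hypothetical stand-in for query.Delta.
type delta struct {
	AggregateID string
	Version     int
}

// conflator keeps only the newest pending delta per aggregate.
type conflator struct {
	mu      sync.Mutex
	pending map[string]delta
	order   []string // aggregate IDs in first-seen order
}

func newConflator() *conflator {
	return &conflator{pending: map[string]delta{}}
}

// add overwrites any pending delta for the same aggregate (last write wins).
func (c *conflator) add(d delta) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, seen := c.pending[d.AggregateID]; !seen {
		c.order = append(c.order, d.AggregateID)
	}
	c.pending[d.AggregateID] = d
}

// depth reports how many deltas are buffered and not yet flushed.
func (c *conflator) depth() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.pending)
}

// flush returns the pending deltas and resets the buffer.
func (c *conflator) flush() []delta {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make([]delta, 0, len(c.order))
	for _, id := range c.order {
		out = append(out, c.pending[id])
	}
	c.pending = map[string]delta{}
	c.order = nil
	return out
}

func main() {
	c := newConflator()
	c.add(delta{AggregateID: "a", Version: 1})
	c.add(delta{AggregateID: "b", Version: 1})
	c.add(delta{AggregateID: "a", Version: 2}) // replaces a's version 1
	fmt.Println("depth:", c.depth())
	fmt.Println("flushed:", c.flush())
	fmt.Println("depth after flush:", c.depth())
}
```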
func (*Hub) PublishRaw ¶
func (h *Hub) PublishRaw(channel string, d query.Delta)
PublishRaw bypasses conflation: ordered, complete delivery for protocols that cannot tolerate coalescing (CRDT sync).
func (*Hub) Subscribe ¶
func (h *Hub) Subscribe(ctx context.Context, channel string, buffer int) (deltas <-chan query.Delta, cancel func(), err error)
Subscribe attaches a buffered subscriber to a conflated delta channel. The returned cancel function detaches and closes the subscriber channel; cancelling the context does the same.
func (*Hub) SubscribeRaw ¶
func (h *Hub) SubscribeRaw(ctx context.Context, channel string, buffer int) (frames <-chan query.Delta, cancel func(), err error)
SubscribeRaw attaches to a raw channel: every frame is delivered immediately and in order, with no conflation and no coalescing, which is the document-sync sub-protocol's contract. A channel's mode is fixed by its first subscriber; mixing modes on one channel is an error.
type HubOption ¶
type HubOption func(*Hub)
HubOption configures a Hub.
func WithConflationWindow ¶
WithConflationWindow sets the flush window for delta conflation (default 150ms; the specified range is 100–250ms).
func WithTailer ¶
WithTailer wires a transport pump into the hub: each channel's pump starts with its first subscriber (tailing from "now") and stops with its last.
type SSEWriter ¶
type SSEWriter struct {
// contains filtered or unexported fields
}
SSEWriter bridges delta streams onto Server-Sent Events. It is deliberately stdlib-only and proxy-safe: it requires an http.Flusher and flushes after every event, sets X-Accel-Buffering: no, and maps the transport stream ID onto the SSE "id:" field so reconnecting clients resume via Last-Event-ID.
func NewSSEWriter ¶
func NewSSEWriter(w http.ResponseWriter) (*SSEWriter, error)
NewSSEWriter prepares w for event streaming. It fails if the ResponseWriter cannot flush — buffering proxies would otherwise hold events indefinitely.
func (*SSEWriter) Heartbeat ¶
Heartbeat writes an SSE comment to keep intermediaries from idling the connection out.
func (*SSEWriter) SetWriteDeadline ¶
SetWriteDeadline bounds how long a single event write may block, so a slow or stalled client cannot wedge the streaming goroutine indefinitely. A write that exceeds the deadline fails, which the caller turns into a connection teardown (the client then reconnects to a fresh snapshot). It is a no-op if the underlying writer does not support deadlines.
func (*SSEWriter) WriteDelta ¶
WriteDelta emits one delta as an SSE event: id = StreamID, event = delta type, data = the JSON delta.
type Tailer ¶
type Tailer interface {
Tail(ctx context.Context, channel, fromID string, deliver func(query.Delta)) error
ReadRange(ctx context.Context, channel, afterID string, limit int) ([]query.Delta, error)
}
Tailer is the transport port the hub pumps deltas from (the Redis adapter implements it over change-channel streams; fakes drive it in tests). Tail blocks, delivering every entry after fromID until ctx ends. ReadRange is the catch-up read used for Last-Event-ID resume.