Documentation
¶
Overview ¶
templ: version: v0.3.977
Package stream provides a reactive SSE stream backed by pub/sub messaging.
Components register scopes during render via WatchEffect. The stream handler subscribes to pub/sub topics for those scopes and pushes stale signals when invalidations occur. Components watch the stale signal via data-effect and reload themselves.
Scopes use colon-separated naming: "invoice:42", "invoices:*", "workspace:1:*". Wildcards: * = single level, > = rest (NATS convention, supported by all adapters).
Index ¶
- Constants
- func Attrs(ctx context.Context, scope string, reloadURL string) templ.Attributes
- func Connect() templ.Component
- func InitScope(sse *datastar.ServerSentEventGenerator, scope string) error
- func ScopeKey(scope string) string
- func ScopeSignals(scopes ...string) string
- func WatchEffect(ctx context.Context, scope string, reloadURL string) string
- type Broker
- func (b *Broker) AddScope(sessionID, scope string) error
- func (b *Broker) Handler() http.HandlerFunc
- func (b *Broker) Invalidate(scope string) error
- func (b *Broker) InvalidateMany(scopes ...string) error
- func (b *Broker) InvalidateWithData(scope string, data any) error
- func (b *Broker) SubscribeHandler() http.HandlerFunc
- type Option
Constants ¶
const ( // SignalNamespace is the Datastar signal namespace for stream stale flags. // The underscore prefix makes it local-only (never sent to backend). SignalNamespace = "_stream" // DataNamespace is the Datastar signal namespace for scope payload data. // When InvalidateWithData is used, entity data is pushed under this namespace. DataNamespace = "_streamData" )
Variables ¶
This section is empty.
Functions ¶
func Attrs ¶ added in v0.0.34
Attrs registers a scope and returns templ.Attributes that set up both data-signals (stale flag initialization) and data-effect (auto-reload on stale). Place these on the component's wrapper element.
<div { stream.Attrs(ctx, "invoice:42", wxctx.APIPath("/invoice/42"))... }>
func Connect ¶
Connect renders a hidden element that opens the persistent SSE stream. Place this AFTER { children... } in the page layout so all component scopes are accumulated before this component reads them.
If no scopes have been registered, nothing is rendered.
func InitScope ¶
func InitScope(sse *datastar.ServerSentEventGenerator, scope string) error
InitScope pushes a PatchSignals to initialize a stale signal for a scope that wasn't known at initial render (e.g. new rows from infinite scroll).
func ScopeKey ¶
ScopeKey converts a scope string to a safe signal property name. This is the key within the _stream signal namespace.
ScopeKey("invoice:42") → "invoice_42"
ScopeKey("invoices:*") → "invoices_WILD"
ScopeKey("workspace:1:*") → "workspace_1_WILD"
func ScopeSignals ¶
ScopeSignals returns a data-signals value that initializes the stale flags for the given scopes. Place this on the component element so the signals exist before data-effect runs.
stream.ScopeSignals("counter:shared") → "_stream: {\"counter_shared\":false}"
func WatchEffect ¶
WatchEffect registers a scope on the context and returns a data-effect expression string that auto-reloads when the scope goes stale.
stream.WatchEffect(ctx, "invoice:42", "/showcase/api/invoice/42")
// registers scope, returns: "if($_stream.invoice_42) { $_stream.invoice_42 = false; @get('/showcase/api/invoice/42') }"
Multiple components can watch the same scope — each gets a unique signal key.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker wraps a PubSub backend and provides publish/subscribe for scope invalidation. One Broker per application.
func NewBroker ¶
NewBroker creates a Broker from any PubSub backend.
broker := stream.NewBroker(natspubsub.New(nc)) broker := stream.NewBroker(chanpubsub.New())
func (*Broker) AddScope ¶ added in v0.0.34
AddScope publishes a control message to add a NATS subscription to a live SSE connection identified by sessionID.
func (*Broker) Handler ¶
func (b *Broker) Handler() http.HandlerFunc
Handler returns an http.HandlerFunc that serves the persistent SSE stream. It reads scopes from the query parameters (comma-separated or grouped by entity) and subscribes to pub/sub topics for each scope (supporting exact and wildcard patterns). It also listens on a per-session control channel for dynamic scope additions.
When a "keys" query param is present (JSON map of scope→[]signalKey), the handler pushes all signal keys for a matching scope. This supports multiple components watching the same scope with independent signals.
func (*Broker) Invalidate ¶
Invalidate publishes an invalidation message for the given scope. Call this after mutations to notify all connected browsers.
broker.Invalidate("invoice:42")
func (*Broker) InvalidateMany ¶
InvalidateMany publishes invalidation for multiple scopes at once.
func (*Broker) InvalidateWithData ¶ added in v0.0.34
InvalidateWithData publishes an invalidation with an attached data payload. The data is JSON-encoded and pushed to clients under the DataNamespace alongside the stale flag.
broker.InvalidateWithData("invoice:42", invoice)
func (*Broker) SubscribeHandler ¶ added in v0.0.34
func (b *Broker) SubscribeHandler() http.HandlerFunc
SubscribeHandler returns an http.HandlerFunc that accepts POST requests to dynamically add scopes to a live SSE connection. The request must include "scope" form value and the session must have an active stream.
type Option ¶
type Option func(*Broker)
Option configures the Broker.
func WithMaxConnectionDuration ¶ added in v0.0.35
WithMaxConnectionDuration sets a maximum lifetime for SSE connections. When the duration elapses the handler returns, causing Datastar to reconnect and re-run any auth middleware.
func WithSubjectPrefix ¶
WithSubjectPrefix overrides the default topic prefix ("webx.scope").