stream

package
v0.0.39 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

templ: version: v0.3.977

Package stream provides a reactive SSE stream backed by pub/sub messaging.

Components register scopes during render via WatchEffect. The stream handler subscribes to pub/sub topics for those scopes and pushes stale signals when invalidations occur. Components watch the stale signal via data-effect and reload themselves.

Scopes use colon-separated naming: "invoice:42", "invoices:*", "workspace:1:*". Wildcards: * = single level, > = rest (NATS convention, supported by all adapters).

Index

Constants

View Source
const (
	// SignalNamespace is the Datastar signal namespace for stream stale flags.
	// The underscore prefix makes it local-only (never sent to backend).
	SignalNamespace = "_stream"

	// DataNamespace is the Datastar signal namespace for scope payload data.
	// When InvalidateWithData is used, entity data is pushed under this namespace.
	DataNamespace = "_streamData"
)

Variables

This section is empty.

Functions

func Attrs added in v0.0.34

func Attrs(ctx context.Context, scope string, reloadURL string) templ.Attributes

Attrs registers a scope and returns templ.Attributes that set up both data-signals (stale flag initialization) and data-effect (auto-reload on stale). Place these on the component's wrapper element.

<div { stream.Attrs(ctx, "invoice:42", wxctx.APIPath("/invoice/42"))... }>

func Connect

func Connect() templ.Component

Connect renders a hidden element that opens the persistent SSE stream. Place this AFTER { children... } in the page layout so all component scopes are accumulated before this component reads them.

If no scopes have been registered, nothing is rendered.

func InitScope

func InitScope(sse *datastar.ServerSentEventGenerator, scope string) error

InitScope pushes a PatchSignals to initialize a stale signal for a scope that wasn't known at initial render (e.g. new rows from infinite scroll).

func ScopeKey

func ScopeKey(scope string) string

ScopeKey converts a scope string to a safe signal property name. This is the key within the _stream signal namespace.

ScopeKey("invoice:42")  → "invoice_42"
ScopeKey("invoices:*")  → "invoices_WILD"
ScopeKey("workspace:1:*") → "workspace_1_WILD"

func ScopeSignals

func ScopeSignals(scopes ...string) string

ScopeSignals returns a data-signals value that initializes the stale flags for the given scopes. Place this on the component element so the signals exist before data-effect runs.

stream.ScopeSignals("counter:shared") → "_stream: {\"counter_shared\":false}"

func WatchEffect

func WatchEffect(ctx context.Context, scope string, reloadURL string) string

WatchEffect registers a scope on the context and returns a data-effect expression string that auto-reloads when the scope goes stale.

stream.WatchEffect(ctx, "invoice:42", "/showcase/api/invoice/42")
// registers scope, returns: "if($_stream.invoice_42) { $_stream.invoice_42 = false; @get('/showcase/api/invoice/42') }"

Multiple components can watch the same scope — each gets a unique signal key.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker wraps a PubSub backend and provides publish/subscribe for scope invalidation. One Broker per application.

func NewBroker

func NewBroker(ps pubsub.PubSub, opts ...Option) *Broker

NewBroker creates a Broker from any PubSub backend.

broker := stream.NewBroker(natspubsub.New(nc))
broker := stream.NewBroker(chanpubsub.New())

func (*Broker) AddScope added in v0.0.34

func (b *Broker) AddScope(sessionID, scope string) error

AddScope publishes a control message to add a NATS subscription to a live SSE connection identified by sessionID.

func (*Broker) Handler

func (b *Broker) Handler() http.HandlerFunc

Handler returns an http.HandlerFunc that serves the persistent SSE stream. It reads scopes from the query parameters (comma-separated or grouped by entity) and subscribes to pub/sub topics for each scope (supporting exact and wildcard patterns). It also listens on a per-session control channel for dynamic scope additions.

When a "keys" query param is present (JSON map of scope→[]signalKey), the handler pushes all signal keys for a matching scope. This supports multiple components watching the same scope with independent signals.

func (*Broker) Invalidate

func (b *Broker) Invalidate(scope string) error

Invalidate publishes an invalidation message for the given scope. Call this after mutations to notify all connected browsers.

broker.Invalidate("invoice:42")

func (*Broker) InvalidateMany

func (b *Broker) InvalidateMany(scopes ...string) error

InvalidateMany publishes invalidation for multiple scopes at once.

func (*Broker) InvalidateWithData added in v0.0.34

func (b *Broker) InvalidateWithData(scope string, data any) error

InvalidateWithData publishes an invalidation with an attached data payload. The data is JSON-encoded and pushed to clients under the DataNamespace alongside the stale flag.

broker.InvalidateWithData("invoice:42", invoice)

func (*Broker) SubscribeHandler added in v0.0.34

func (b *Broker) SubscribeHandler() http.HandlerFunc

SubscribeHandler returns an http.HandlerFunc that accepts POST requests to dynamically add scopes to a live SSE connection. The request must include "scope" form value and the session must have an active stream.

type Option

type Option func(*Broker)

Option configures the Broker.

func WithMaxConnectionDuration added in v0.0.35

func WithMaxConnectionDuration(d time.Duration) Option

WithMaxConnectionDuration sets a maximum lifetime for SSE connections. When the duration elapses the handler returns, causing Datastar to reconnect and re-run any auth middleware.

func WithSubjectPrefix

func WithSubjectPrefix(prefix string) Option

WithSubjectPrefix overrides the default topic prefix ("webx.scope").

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL