Documentation ¶
Overview ¶
Package sse provides Server-Sent Events helpers for rux handlers.
SSE is one of the simplest server-push protocols and is supported by every modern browser. This package wraps the wire format and the connection lifecycle so a handler only has to produce events:
	r.GET("/events", func(c *rux.Context) {
		_ = sse.Stream(c, nil, func(send sse.SendFunc, done <-chan struct{}) error {
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-done:
					return nil
				case t := <-ticker.C:
					if err := send(sse.Event{Data: t.Format(time.RFC3339)}); err != nil {
						return err
					}
				}
			}
		})
	})
Lifecycle hooks (auth, logging, metrics) plug in via the Hooks struct.
NOTE: server.Server.WriteTimeout defaults to 30 s, which will kill a long-running SSE connection. For dedicated SSE servers either set WriteTimeout = 0, or route SSE handlers through a server instance with a longer budget.
Index ¶
- Variables
- func Stream(c *rux.Context, hooks *Hooks, producer Producer) error
- func StreamWith(c *rux.Context, opts *Options, producer Producer) (retErr error)
- type Client
- type Event
- type Hooks
- type Hub
- func (h *Hub) Broadcast(e Event) (delivered, dropped int)
- func (h *Hub) Count() (clients, ids int)
- func (h *Hub) Has(id string) bool
- func (h *Hub) IDs() []string
- func (h *Hub) Register(id string) *Client
- func (h *Hub) Send(id string, e Event) (delivered, dropped int)
- func (h *Hub) SetOnDrop(fn func(c *Client, e Event))
- func (h *Hub) Unregister(c *Client)
- type Options
- type Producer
- type SendFunc
Constants ¶
This section is empty.
Variables ¶
var ErrEventSkipped = errors.New("sse: event skipped by OnSend hook")
ErrEventSkipped is the sentinel returned by SendFunc when an OnSend hook chose to skip the event. Callers usually treat this as non-fatal and keep streaming.
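One way a producer might treat a skipped event as non-fatal is sketched below. It uses only the standard library: errEventSkipped is a local stand-in for sse.ErrEventSkipped, and sendAll and its plain string payload are hypothetical names chosen for illustration, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errEventSkipped is a local stand-in for sse.ErrEventSkipped (assumption:
// the real sentinel is matched the same way, via errors.Is).
var errEventSkipped = errors.New("sse: event skipped by OnSend hook")

// sendAll pushes every item through send. A skipped event is counted and
// the loop continues; any other error stops the loop and is returned.
func sendAll(send func(data string) error, items []string) (sent, skipped int, err error) {
	for _, it := range items {
		sendErr := send(it)
		switch {
		case sendErr == nil:
			sent++
		case errors.Is(sendErr, errEventSkipped):
			skipped++ // non-fatal: keep streaming
		default:
			return sent, skipped, fmt.Errorf("send %q: %w", it, sendErr)
		}
	}
	return sent, skipped, nil
}
```

Matching with errors.Is rather than == keeps the check working if the sentinel is ever wrapped with extra context.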
var ErrFlushNotSupported = errors.New("sse: ResponseWriter does not support http.Flusher")
ErrFlushNotSupported is returned by Stream when the underlying http.ResponseWriter does not implement http.Flusher (e.g. a buggy middleware wrapped it without preserving the interface).
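The middleware pitfall mentioned above can be reproduced with the standard library alone. In this sketch, plainWrapper, flushWrapper, supportsFlush and demoFlushCheck are hypothetical names; the only assumption about this package is that it detects flushing support with an ordinary type assertion to http.Flusher.

```go
package main

import (
	"net/http"
	"net/http/httptest"
)

// plainWrapper embeds only the http.ResponseWriter interface, so the
// Flush method of the wrapped writer is no longer visible.
type plainWrapper struct{ http.ResponseWriter }

// flushWrapper forwards Flush to the wrapped writer when it is available,
// which keeps the http.Flusher interface intact.
type flushWrapper struct{ http.ResponseWriter }

func (w flushWrapper) Flush() {
	if f, ok := w.ResponseWriter.(http.Flusher); ok {
		f.Flush()
	}
}

// supportsFlush reports whether w can be used for streaming responses.
func supportsFlush(w http.ResponseWriter) bool {
	_, ok := w.(http.Flusher)
	return ok
}

// demoFlushCheck wraps a recorder (which does implement http.Flusher)
// both ways and reports what each wrapper exposes.
func demoFlushCheck() (plain, preserved bool) {
	rec := httptest.NewRecorder()
	return supportsFlush(plainWrapper{rec}), supportsFlush(flushWrapper{rec})
}
```

A middleware that wraps the writer like plainWrapper would be the kind of case that leads to ErrFlushNotSupported; adding a forwarding Flush method is one common fix.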
Functions ¶
func Stream ¶
Stream is the common-case entry point: SendConnected=true, no keepalive. For keepalive or other tuning, use StreamWith.
Passing nil hooks is fine — equivalent to &Hooks{}.
func StreamWith ¶
StreamWith upgrades c to an SSE connection and drives producer with the supplied Options.
Sequence:
- Verify the writer supports Flusher (return ErrFlushNotSupported if not)
- Call Hooks.OnConnect; if it returns an error, abort BEFORE any SSE headers are sent — the hook is free to write its own error response (e.g. http.Error(c.Resp, ..., 401)) using c.Resp directly.
- Write SSE response headers and flush them so the client transitions out of "connecting" state.
- If opts.SendConnected, emit ": connected\n\n" comment frame.
- If opts.KeepaliveInterval > 0, start a background ticker that emits ": keepalive\n\n" comment frames; the heartbeat and the producer share a writer mutex so frames never interleave.
- Run producer until it returns or the client disconnects; the heartbeat goroutine stops when producer returns or done fires.
- Call Hooks.OnDisconnect with the final error (nil on clean exit).
Passing nil opts is fine — equivalent to &Options{SendConnected: true}.
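The sequence above can be approximated with net/http alone. The following is a rough sketch, not this package's implementation: serveSSE, errNoFlusher and demoStream are hypothetical names, events are reduced to a single data string, the header set is an assumption, and the hook steps (OnConnect, OnDisconnect) are left out.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

var errNoFlusher = errors.New("writer does not support http.Flusher")

// serveSSE mirrors the documented order: flusher check, headers + flush,
// connected comment, optional keepalive goroutine, then the producer.
func serveSSE(w http.ResponseWriter, r *http.Request, keepalive time.Duration,
	produce func(send func(data string) error, done <-chan struct{}) error) error {
	fl, ok := w.(http.Flusher)
	if !ok {
		return errNoFlusher
	}
	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	w.WriteHeader(http.StatusOK)
	fl.Flush() // lets the client leave the "connecting" state

	var mu sync.Mutex // shared by heartbeat and producer so frames never interleave
	write := func(frame string) error {
		mu.Lock()
		defer mu.Unlock()
		if _, err := fmt.Fprint(w, frame); err != nil {
			return err
		}
		fl.Flush()
		return nil
	}
	if err := write(": connected\n\n"); err != nil {
		return err
	}

	stop := make(chan struct{})
	var wg sync.WaitGroup
	if keepalive > 0 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t := time.NewTicker(keepalive)
			defer t.Stop()
			for {
				select {
				case <-stop:
					return
				case <-r.Context().Done():
					return
				case <-t.C:
					if write(": keepalive\n\n") != nil {
						return
					}
				}
			}
		}()
	}

	err := produce(func(data string) error {
		return write("data: " + data + "\n\n")
	}, r.Context().Done())
	close(stop) // heartbeat stops once the producer has returned
	wg.Wait()
	return err
}

// demoStream runs serveSSE against an in-memory recorder with keepalive
// disabled and returns the bytes a client would have received.
func demoStream() string {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/events", nil)
	_ = serveSSE(rec, req, 0, func(send func(string) error, _ <-chan struct{}) error {
		return send("hello")
	})
	return rec.Body.String()
}
```

Waiting on the WaitGroup before returning ensures the heartbeat goroutine never writes after the handler has handed the writer back to the server.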
Types ¶
type Client ¶
type Client struct {
// ID is the lookup key the Client was registered under.
ID string
// contains filtered or unexported fields
}
Client is a registered, active SSE connection.
Create one via Hub.Register; drain it via HubProducer (typical) or manually via Recv; remove it via Hub.Unregister or by returning from the producer.
Client is safe for concurrent use by the Hub (push) and the producer goroutine (recv).
type Event ¶
type Event struct {
// ID populates the "id:" field. Browsers echo the last seen ID back
// in the Last-Event-ID header on reconnect.
ID string
// Name populates the "event:" field — used by EventSource.addEventListener.
Name string
// Data is the event body. Multi-line values are split into one
// "data:" line per segment as required by the SSE spec.
Data string
// Retry, when > 0, populates the "retry:" field (reconnection delay
// in milliseconds). Typically only set on the first event.
Retry int
}
Event is a single SSE message frame. Empty fields are omitted from the wire output, so the zero value Event{} encodes to a heartbeat (an empty data: line followed by the terminating blank line).
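To make the wire format concrete, here is a minimal encoder sketch that follows the field rules described above. The event type and encode function are hypothetical stand-ins; the field order and the exact spacing after each colon are assumptions and may differ from what this package emits.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a local stand-in mirroring the documented Event fields.
type event struct {
	ID    string
	Name  string
	Data  string
	Retry int
}

// encode renders one SSE frame: optional id/event/retry fields, one
// "data:" line per line of Data, then the terminating blank line.
func encode(e event) string {
	var b strings.Builder
	if e.ID != "" {
		fmt.Fprintf(&b, "id: %s\n", e.ID)
	}
	if e.Name != "" {
		fmt.Fprintf(&b, "event: %s\n", e.Name)
	}
	if e.Retry > 0 {
		fmt.Fprintf(&b, "retry: %d\n", e.Retry)
	}
	// strings.Split returns a single empty element for empty Data, so the
	// zero value still produces one empty data line.
	for _, line := range strings.Split(e.Data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	b.WriteString("\n")
	return b.String()
}
```

For example, encode(event{Name: "tick", Data: "a\nb"}) yields an "event: tick" line, two data lines, and a blank line. The sketch splits on "\n" only; input containing "\r\n" would need normalizing first.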
type Hooks ¶
type Hooks struct {
// OnConnect runs before the SSE headers are written and before any
// event is emitted. Return a non-nil error to abort the stream;
// OnDisconnect still fires with that error, but the producer is
// never invoked. Typical uses: authentication, rate-limit checks,
// channel subscription bookkeeping.
OnConnect func(c *rux.Context) error
// OnDisconnect runs exactly once after the stream ends, whether
// from a clean producer return, OnConnect rejection, write error,
// or client disconnect. reason is nil only on a clean producer
// return. Typical uses: subscription cleanup, audit logging.
OnDisconnect func(c *rux.Context, reason error)
// OnSend runs before each event is written. Modify *e in place to
// adjust the outgoing event; return a non-nil error to skip it
// (the error is reported via OnError; the stream continues).
// Typical uses: tagging events with a request ID, filtering, metrics.
OnSend func(c *rux.Context, e *Event) error
// OnError reports per-event encode/skip errors that do not terminate
// the stream. Fatal errors (write failures, client gone) bypass this
// and go directly to OnDisconnect.
OnError func(c *rux.Context, err error)
}
Hooks are optional callbacks fired at well-defined points in the stream lifecycle. Any field may be nil. Hooks run on the handler goroutine, so they must not block indefinitely.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub is an in-memory registry of active SSE Clients keyed by a user-supplied ID (typically a user ID or session ID).
A single ID may host multiple Clients — same user across browser tabs or devices is a common case — and Send fans out to all of them. For per-process broadcast use Broadcast.
Hub is safe for concurrent use.
func NewHub ¶
NewHub creates an empty Hub. bufSize is the per-Client channel buffer — Send drops events for that Client when the buffer is full. 32-64 is a reasonable starting point; larger values trade memory for tolerance to bursty producers and slow consumers.
func (*Hub) Broadcast ¶
Broadcast delivers e to every active Client across all IDs. Returns (delivered, dropped) totals.
func (*Hub) Count ¶
Count returns the totals (active Clients, distinct IDs) — handy for /metrics or admin endpoints.
func (*Hub) Register ¶
Register creates and indexes a new Client under id. The returned Client must eventually be Unregistered (HubProducer does this for you).
func (*Hub) Send ¶
Send delivers e to every Client registered under id. Returns the number of Clients the event reached (delivered) and the number whose buffer was full at the time of send (dropped).
If id is unknown both counters are zero.
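The delivered/dropped accounting described here maps naturally onto a non-blocking channel send. The sketch below is a simplified model under that assumption: miniHub and its methods are hypothetical, events are plain strings, and unregistration is omitted.

```go
package main

import "sync"

// miniHub is a hypothetical, simplified registry: each ID maps to the
// buffered channels of the clients registered under it.
type miniHub struct {
	mu      sync.Mutex
	bufSize int
	clients map[string][]chan string
}

func newMiniHub(bufSize int) *miniHub {
	return &miniHub{bufSize: bufSize, clients: make(map[string][]chan string)}
}

// register adds one more client channel under id.
func (h *miniHub) register(id string) chan string {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan string, h.bufSize)
	h.clients[id] = append(h.clients[id], ch)
	return ch
}

// send fans data out to every client under id. A full buffer counts as a
// drop instead of blocking the sender.
func (h *miniHub) send(id, data string) (delivered, dropped int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.clients[id] {
		select {
		case ch <- data:
			delivered++
		default:
			dropped++
		}
	}
	return delivered, dropped
}
```

The select with a default branch is what keeps one slow consumer from stalling delivery to everyone else; the trade-off is that the slow consumer silently misses events unless a drop callback records them.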
func (*Hub) SetOnDrop ¶
SetOnDrop installs a callback fired exactly once per dropped event. Pass nil to remove. Useful for metrics or for unregistering clients that fall persistently behind. The callback runs on the Send/Broadcast goroutine — keep it short.
func (*Hub) Unregister ¶
Unregister removes c from the Hub. Safe to call multiple times. Closing c.done signals waiting consumers (HubProducer) to exit; the event channel itself is never closed and is left for the GC, so concurrent push() calls cannot panic on a send to a closed channel.
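The pattern described here (close a separate done channel once, never close the data channel) can be sketched as follows. The client type, its fields, and the stop and push methods are hypothetical stand-ins; the use of sync.Once is an assumption about how idempotence might be achieved.

```go
package main

import "sync"

// client is a hypothetical stand-in with a buffered event channel and a
// separate done channel used purely as a shutdown signal.
type client struct {
	events chan string
	done   chan struct{}
	once   sync.Once
}

func newClient(buf int) *client {
	return &client{events: make(chan string, buf), done: make(chan struct{})}
}

// stop signals shutdown. sync.Once makes repeated calls harmless.
func (c *client) stop() {
	c.once.Do(func() { close(c.done) })
}

// push attempts a non-blocking delivery. It cannot panic because events
// is never closed; after stop it simply reports false.
func (c *client) push(data string) bool {
	select {
	case <-c.done:
		return false
	default:
	}
	select {
	case c.events <- data:
		return true
	default:
		return false // buffer full
	}
}
```

A push that races with stop may still land in the buffer, which is harmless: the unread channel is reclaimed by the garbage collector along with the client.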
type Options ¶
type Options struct {
// Hooks may be nil — equivalent to &Hooks{}.
Hooks *Hooks
// SendConnected, when true, emits a ": connected\n\n" comment frame
// immediately after the SSE headers so the client can confirm the
// stream opened end-to-end before any real event arrives. Defaults
// to true via Stream(); set false on StreamWith if not desired.
SendConnected bool
// KeepaliveInterval, when > 0, makes Stream emit ": keepalive\n\n"
// comment frames at this period in a background goroutine. The SSE
// spec says lines starting with ":" are ignored by clients, so
// these frames keep idle proxies (nginx, ALB) from closing the
// connection without polluting the event stream.
//
// IMPORTANT: heartbeats do NOT defeat http.Server.WriteTimeout —
// that timer bounds the entire response lifetime, regardless of
// how often you flush. SSE servers must still set WriteTimeout = 0
// (or a value larger than any expected stream duration).
KeepaliveInterval time.Duration
}
Options bundles configuration knobs and lifecycle Hooks for one Stream. Use StreamWith to consume; Stream is a thin wrapper around it for the common case.
type Producer ¶
Producer is the user callback driving an SSE stream. It receives a send function and a done channel — when done fires (typically because the client disconnected), the producer should return promptly.
A non-nil error returned by the producer is reported via Hooks.OnDisconnect.
func HubProducer ¶
HubProducer returns a Producer that registers a Client under id, forwards events from its channel via send, and unregisters on exit. Use with Stream / StreamWith:
	r.GET("/events", func(c *rux.Context) {
		uid := authUserID(c)
		_ = sse.Stream(c, hooks, sse.HubProducer(hub, uid))
	})
The producer exits cleanly when:
- the client disconnects (done from the request context)
- hub.Unregister(client) is called from elsewhere
- send returns an error (write failure)
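The three exit conditions above correspond to a single select loop. This is a sketch of that shape only: drain, its parameters and errWrite are hypothetical, events are plain strings, and registration with the Hub is left out.

```go
package main

import "errors"

// errWrite is a sample failure used to exercise the error path.
var errWrite = errors.New("write failed")

// drain forwards events to send until one of three things happens:
// done fires (client gone), closed fires (unregistered elsewhere),
// or send fails (write error, returned to the caller).
func drain(events <-chan string, closed, done <-chan struct{}, send func(string) error) error {
	for {
		select {
		case <-done:
			return nil
		case <-closed:
			return nil
		case ev := <-events:
			if err := send(ev); err != nil {
				return err
			}
		}
	}
}
```

Only the send failure surfaces as an error; the two signal channels end the loop with nil, matching the "exits cleanly" wording for disconnect and unregistration.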