sse

package
v2.0.1 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 2, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package sse provides Server-Sent Events helpers for rux handlers.

SSE is one of the simplest server-push protocols and is supported by every modern browser. This package wraps the wire format and the stream lifecycle so a handler only has to focus on producing events:

r.GET("/events", func(c *rux.Context) {
    _ = sse.Stream(c, nil, func(send sse.SendFunc, done <-chan struct{}) error {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-done:
                return nil
            case t := <-ticker.C:
                if err := send(sse.Event{Data: t.Format(time.RFC3339)}); err != nil {
                    return err
                }
            }
        }
    })
})

Lifecycle hooks (auth, logging, metrics) plug in via the Hooks struct.

NOTE: server.Server.WriteTimeout defaults to 30 s, which will kill a long-running SSE connection. For dedicated SSE servers either set WriteTimeout = 0, or route SSE handlers through a server instance with a longer budget.

Constants

This section is empty.

Variables

var ErrEventSkipped = errors.New("sse: event skipped by OnSend hook")

ErrEventSkipped is the sentinel returned by SendFunc when an OnSend hook chose to skip the event. Callers usually treat this as non-fatal.

var ErrFlushNotSupported = errors.New("sse: ResponseWriter does not support http.Flusher")

ErrFlushNotSupported is returned by Stream when the underlying http.ResponseWriter does not implement http.Flusher (e.g. a buggy middleware wrapped it without preserving the interface).

Functions

func Stream

func Stream(c *rux.Context, hooks *Hooks, producer Producer) error

Stream is the common-case entry point: SendConnected=true, no keepalive. For keepalive or other tuning, use StreamWith.

Passing nil hooks is fine — equivalent to &Hooks{}.

func StreamWith

func StreamWith(c *rux.Context, opts *Options, producer Producer) (retErr error)

StreamWith upgrades c to an SSE connection and drives producer with the supplied Options.

Sequence:

  1. Verify the writer supports Flusher (return ErrFlushNotSupported if not)
  2. Call Hooks.OnConnect; if it returns an error, abort BEFORE any SSE headers are sent — the hook is free to write its own error response (e.g. http.Error(c.Resp, ..., 401)) using c.Resp directly.
  3. Write SSE response headers and flush them so the client transitions out of "connecting" state.
  4. If opts.SendConnected, emit ": connected\n\n" comment frame.
  5. If opts.KeepaliveInterval > 0, start a background ticker that emits ": keepalive\n\n" comment frames; both heartbeat + producer share a writer mutex so frames never interleave.
  6. Run producer until it returns or the client disconnects; the heartbeat goroutine stops when producer returns or done fires.
  7. Call Hooks.OnDisconnect with the final error (nil on clean exit).

Passing nil opts is fine — equivalent to &Options{SendConnected: true}.

Types

type Client

type Client struct {
	// ID is the lookup key the Client was registered under.
	ID string
	// contains filtered or unexported fields
}

Client is a registered, active SSE connection.

Create one via Hub.Register, drain it via HubProducer (typical) or manually via Recv, and remove it via Hub.Unregister or by returning from the producer.

Client is safe for concurrent use by the Hub (push) and the producer goroutine (recv).

func (*Client) Dropped

func (c *Client) Dropped() int64

Dropped is the running total of events the Hub had to drop for this Client. Read it for per-client backpressure monitoring.

type Event

type Event struct {
	// ID populates the "id:" field. Browsers echo the last seen ID back
	// in the Last-Event-ID header on reconnect.
	ID string
	// Name populates the "event:" field — used by EventSource.addEventListener.
	Name string
	// Data is the event body. Multi-line values are split into one
	// "data:" line per segment as required by the SSE spec.
	Data string
	// Retry, when > 0, populates the "retry:" field (reconnection delay
	// in milliseconds). Typically only set on the first event.
	Retry int
}

Event is a single SSE message frame. Empty fields are omitted from the wire output, so the zero value Event{} encodes to a heartbeat (an empty data: line followed by the terminating blank line).
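A minimal sketch of how such a frame could be encoded, assuming a field order of id, event, retry, data and a single space after each colon (both are assumptions; the package's actual encoder may differ). The Event type is redeclared locally so the example stands alone, and encode is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Event mirrors the fields documented above, redeclared for a stand-alone example.
type Event struct {
	ID    string
	Name  string
	Data  string
	Retry int
}

// encode renders e as one SSE frame, omitting empty fields.
func encode(e Event) string {
	var b strings.Builder
	if e.ID != "" {
		b.WriteString("id: " + e.ID + "\n")
	}
	if e.Name != "" {
		b.WriteString("event: " + e.Name + "\n")
	}
	if e.Retry > 0 {
		b.WriteString("retry: " + strconv.Itoa(e.Retry) + "\n")
	}
	// strings.Split always returns at least one element, so an empty
	// Data still produces a single empty data line.
	for _, line := range strings.Split(e.Data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	b.WriteString("\n") // blank line terminates the frame
	return b.String()
}

func main() {
	fmt.Printf("%q\n", encode(Event{ID: "7", Name: "tick", Data: "a\nb"}))
	fmt.Printf("%q\n", encode(Event{}))
}
```

On the browser side, the two data lines of the first frame are rejoined with a newline, so the listener sees the original multi-line value.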

type Hooks

type Hooks struct {
	// OnConnect runs before the SSE headers are written, and so before any
	// event is emitted. Return a non-nil error to abort the stream —
	// OnDisconnect still fires with that error, but the producer is
	// never invoked. Typical uses: authentication, rate-limit checks,
	// channel subscription bookkeeping.
	OnConnect func(c *rux.Context) error

	// OnDisconnect runs exactly once after the stream ends, whether
	// from a clean producer return, OnConnect rejection, write error,
	// or client disconnect. reason is nil only on a clean producer
	// return. Typical uses: subscription cleanup, audit logging.
	OnDisconnect func(c *rux.Context, reason error)

	// OnSend runs before each event is written. Modify *e in place to
	// adjust the outgoing event; return a non-nil error to skip it
	// (the error is reported via OnError; the stream continues).
	// Typical uses: tagging events with a request ID, filtering, metrics.
	OnSend func(c *rux.Context, e *Event) error

	// OnError reports per-event encode/skip errors that do not terminate
	// the stream. Fatal errors (write failures, client gone) bypass this
	// and go directly to OnDisconnect.
	OnError func(c *rux.Context, err error)
}

Hooks are optional callbacks fired at well-defined points in the stream lifecycle. Any field may be nil. Hooks run on the handler goroutine, so they must not block indefinitely.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is an in-memory registry of active SSE Clients keyed by a user-supplied ID (typically a user ID or session ID).

A single ID may host multiple Clients — same user across browser tabs or devices is a common case — and Send fans out to all of them. For per-process broadcast use Broadcast.

Hub is safe for concurrent use.

func NewHub

func NewHub(bufSize int) *Hub

NewHub creates an empty Hub. bufSize is the per-Client channel buffer — Send drops events for that Client when the buffer is full. 32-64 is a reasonable starting point; larger values trade memory for tolerance to bursty producers and slow consumers.
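The drop-on-full behavior described here is commonly built from a buffered channel and a non-blocking send. The sketch below shows that general Go technique; it is an assumption about the approach, not a copy of the Hub's code, and push is a hypothetical helper.

```go
package main

import "fmt"

// push tries to enqueue v without blocking; false means the buffer was full.
func push(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 2) // plays the role of bufSize = 2
	delivered, dropped := 0, 0
	for _, v := range []string{"a", "b", "c"} {
		if push(ch, v) {
			delivered++
		} else {
			dropped++
		}
	}
	fmt.Println(delivered, dropped)
}
```

With nothing draining the channel, the third event is dropped rather than blocking the sender, which is the trade-off bufSize tunes.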

func (*Hub) Broadcast

func (h *Hub) Broadcast(e Event) (delivered, dropped int)

Broadcast delivers e to every active Client across all IDs. Returns (delivered, dropped) totals.

func (*Hub) Count

func (h *Hub) Count() (clients, ids int)

Count returns the totals (active Clients, distinct IDs) — handy for /metrics or admin endpoints.

func (*Hub) Has

func (h *Hub) Has(id string) bool

Has reports whether at least one Client is registered under id.

func (*Hub) IDs

func (h *Hub) IDs() []string

IDs returns a snapshot of currently-registered IDs.

func (*Hub) Register

func (h *Hub) Register(id string) *Client

Register creates and indexes a new Client under id. The returned Client must eventually be Unregistered (HubProducer does this for you).

func (*Hub) Send

func (h *Hub) Send(id string, e Event) (delivered, dropped int)

Send delivers e to every Client registered under id. Returns the number of Clients the event reached (delivered) and the number whose buffer was full at the time of send (dropped).

If id is unknown both counters are zero.

func (*Hub) SetOnDrop

func (h *Hub) SetOnDrop(fn func(c *Client, e Event))

SetOnDrop installs a callback fired exactly once per dropped event. Pass nil to remove. Useful for metrics or for unregistering clients that fall persistently behind. The callback runs on the Send/Broadcast goroutine — keep it short.

func (*Hub) Unregister

func (h *Hub) Unregister(c *Client)

Unregister removes c from the Hub. Safe to call multiple times. Closing c.done signals waiting consumers (HubProducer) to exit; the channel itself is left for GC so concurrent push() calls can't panic on send-to-closed-chan.
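The design described above, closing a separate done channel instead of the event channel, can be sketched as follows. The client type, newClient, stop and push are hypothetical stand-ins, and the use of sync.Once to make repeated calls safe is an assumption about one way to achieve the documented idempotence.

```go
package main

import (
	"fmt"
	"sync"
)

// client is a reduced stand-in for a registered SSE client.
type client struct {
	events chan string
	done   chan struct{}
	once   sync.Once
}

func newClient(buf int) *client {
	return &client{events: make(chan string, buf), done: make(chan struct{})}
}

// stop closes done at most once, so calling it repeatedly is safe.
func (c *client) stop() { c.once.Do(func() { close(c.done) }) }

// push never blocks and never panics, because events is never closed.
// It reports false when the client is stopped or its buffer is full.
func (c *client) push(v string) bool {
	select {
	case <-c.done:
		return false
	default:
	}
	select {
	case c.events <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := newClient(1)
	fmt.Println(c.push("a"))
	c.stop()
	c.stop() // second call is a no-op
	fmt.Println(c.push("b"))
}
```

A push that races with stop may still land in the buffer, but it can never hit a closed channel; the unread buffer is simply garbage collected with the client.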

type Options

type Options struct {
	// Hooks may be nil — equivalent to &Hooks{}.
	Hooks *Hooks

	// SendConnected, when true, emits a ": connected\n\n" comment frame
	// immediately after the SSE headers so the client can confirm the
	// stream opened end-to-end before any real event arrives. Defaults
	// to true via Stream(); set false on StreamWith if not desired.
	SendConnected bool

	// KeepaliveInterval, when > 0, makes StreamWith emit ": keepalive\n\n"
	// comment frames at this period in a background goroutine. SSE
	// spec says lines starting with ":" are ignored by clients, so
	// these frames keep idle proxies (nginx, ALB) from closing the
	// connection without polluting the event stream.
	//
	// IMPORTANT: heartbeats do NOT defeat http.Server.WriteTimeout —
	// that timer bounds the entire response lifetime, regardless of
	// how often you flush. SSE servers must still set WriteTimeout = 0
	// (or a value larger than any expected stream duration).
	KeepaliveInterval time.Duration
}

Options bundles configuration knobs and lifecycle Hooks for one Stream. Use StreamWith to consume; Stream is a thin wrapper around it for the common case.

type Producer

type Producer func(send SendFunc, done <-chan struct{}) error

Producer is the user callback driving an SSE stream. It receives a send function and a done channel — when done fires (typically because the client disconnected), the producer should return promptly.

Returning a non-nil error is reported via Hooks.OnDisconnect.

func HubProducer

func HubProducer(h *Hub, id string) Producer

HubProducer returns a Producer that registers a Client under id, forwards events from its channel via send, and unregisters on exit. Use with Stream / StreamWith:

r.GET("/events", func(c *rux.Context) {
    uid := authUserID(c)
    _ = sse.Stream(c, hooks, sse.HubProducer(hub, uid))
})

The producer exits cleanly when:

  • the client disconnects (done fires via the request context)
  • hub.Unregister(client) is called from elsewhere
  • send returns an error (write failure)

type SendFunc

type SendFunc func(Event) error

SendFunc writes an event to the active stream. It returns an error if the underlying write fails (usually because the client disconnected), or ErrEventSkipped if an OnSend hook chose to skip the event.
