sse

package
v2.0.21 Latest
Published: May 18, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

README

sse — Server-Sent Events with topic-based fan-out

In-process pub/sub for HTTP SSE. One Manager per service holds the topic registry; subscribers connect via sse.Handler; publishers push via Manager.Publish or a PollingService.

Overview

graph LR
    P1[Manager.Publish] -- push --> M[Manager]
    P2[PollingService] -- push --> M
    M -- owns --> B[broadcaster<br/>internal]
    B -- events --> H[sse.Handler]
    H --> Sub[HTTP client]

Data flows left to right. Manager.Publish and PollingService are the two publisher entries; sse.Handler registers itself with Manager and streams events out to the connected HTTP client. broadcaster is package-internal — consumers never hold a reference.

Two ways to produce events

| Source | API | Use when |
| --- | --- | --- |
| Inside the pkg | WithPollingService(...) | Data is fetched on a timer (DB row, hardware state, ...). |
| Outside the pkg | Manager.Publish(topic, data) | Data arrives from a message bus / callback / scheduled job. |

Quick start

Event-driven (external publisher)

sseManager := sse.NewManager(serviceCtx, lc, 30*time.Second)

e.GET("/api/v1/alarms/sse", sse.Handler(
    sseManager,
    sse.WithCustomTopic("alarms"),
))

// Wherever an alarm fires:
sseManager.Publish("alarms", alarm)

Polling-driven

func devicesHandler(c echo.Context) error {
    polling := sse.NewPolling(lc,
        func(ctx context.Context) (any, error) { return fetchDevices(ctx) },
        sse.WithCustomPollingInterval(5*time.Second),
    )
    return sse.Handler(sseManager, sse.WithPollingService(polling))(c)
}

e.GET("/api/v1/devices/sse", devicesHandler)

URL-derived topic (multiple views of the same endpoint)

// Topic = request path + query, so different params get different broadcasters.
e.GET("/api/v1/device/sse", sse.Handler(sseManager))

// Publisher computes the same topic from the request context, or builds the
// string by hand.
sseManager.Publish(sse.ConstructSSETopic(c), event)

Public API

| Symbol | Role |
| --- | --- |
| NewManager | Construct a Manager bound to a parent context. |
| Manager.Publish(topic, data) | Forward data to current subscribers of topic. |
| Manager.Shutdown() | Cancel context; active handlers drop their streams. |
| Handler(m, opts...) | Echo HandlerFunc that opens an SSE stream. |
| WithCustomTopic(topic) | Override the URL-derived topic. |
| WithPollingService(s) | Attach a polling service to drive events for the topic. |
| ConstructSSETopic(c) | Build the default topic from a request URL. |
| NewPolling(lc, fn, opts...) | Construct a polling service. |
| WithCustomPollingInterval(d) | Interval between polls (default 5 s). |
| WithCustomApiVersion(v) | API version used in error payloads. |
| WithStopCondition(fn) | Polling self-terminates when fn(payload) returns true. |
| WithStopCallback(fn) | Invoked when polling stops, for any reason. |
| Publisher interface | Publish(data any). |
| PollingService interface | Start(Publisher) / Stop() error. |

Lifecycle

sequenceDiagram
    actor S1 as Subscriber 1
    actor S2 as Subscriber 2
    participant M as Manager
    participant P as Polling

    S1->>M: subscribe
    Note right of M: first subscribe →<br/>new broadcaster<br/>+ Polling.Start
    M->>P: Start

    S2->>M: subscribe
    Note right of M: reuse broadcaster

    P-->>S1: event
    P-->>S2: event

    S1->>M: unsubscribe
    S2->>M: unsubscribe
    Note right of M: last unsubscribe →<br/>remove broadcaster<br/>+ async Polling.Stop
    M--)P: Stop

Map removal is synchronous so a concurrent Publish or subscribe sees the fresh state immediately. The slow part (polling teardown) runs in a goroutine to keep unsubscribe non-blocking.
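The lifecycle above can be sketched with the standard library alone. This is a minimal illustration, not the package's code: `registry`, `topicState`, `poller`, and `countingPoller` are hypothetical names, and the channel returned by `unsubscribe` exists only so the demo can wait for the background teardown.

```go
package main

import (
	"fmt"
	"sync"
)

// poller is a hypothetical stand-in for a polling service.
type poller interface {
	Start()
	Stop()
}

// countingPoller records how often it was started and stopped.
type countingPoller struct{ starts, stops int }

func (p *countingPoller) Start() { p.starts++ }
func (p *countingPoller) Stop()  { p.stops++ }

// topicState is the per-topic bookkeeping in this sketch.
type topicState struct {
	subscribers int
	poller      poller
}

// registry is a hypothetical stand-in for the Manager's topic map.
type registry struct {
	mu     sync.Mutex
	topics map[string]*topicState
}

func newRegistry() *registry {
	return &registry{topics: make(map[string]*topicState)}
}

// subscribe installs the topic and starts its poller on the first subscriber;
// later subscribers reuse the existing entry.
func (r *registry) subscribe(topic string, p poller) {
	r.mu.Lock()
	defer r.mu.Unlock()
	st, ok := r.topics[topic]
	if !ok {
		st = &topicState{poller: p}
		r.topics[topic] = st
		p.Start()
	}
	st.subscribers++
}

// unsubscribe removes the topic under the lock when the last subscriber
// leaves, then stops the poller in a goroutine. The returned channel is
// closed once teardown has finished.
func (r *registry) unsubscribe(topic string) <-chan struct{} {
	done := make(chan struct{})
	r.mu.Lock()
	st, ok := r.topics[topic]
	if ok {
		st.subscribers--
	}
	last := ok && st.subscribers == 0
	if last {
		delete(r.topics, topic) // synchronous: visible to the next lock holder
	}
	r.mu.Unlock()
	if !last {
		close(done)
		return done
	}
	go func() {
		defer close(done)
		st.poller.Stop() // the slow part runs off the caller's goroutine
	}()
	return done
}

// has reports whether the topic is currently registered.
func (r *registry) has(topic string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.topics[topic]
	return ok
}

func main() {
	r, p := newRegistry(), &countingPoller{}
	r.subscribe("devices", p)
	r.subscribe("devices", p)
	<-r.unsubscribe("devices")
	fmt.Println(r.has("devices"), p.starts, p.stops) // true 1 0
	<-r.unsubscribe("devices")
	fmt.Println(r.has("devices"), p.starts, p.stops) // false 1 1
}
```

Because the map entry is deleted before the lock is released, a subscriber arriving during teardown would install a fresh entry rather than attach to the one being stopped.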

Design choices

| Decision | Rationale |
| --- | --- |
| Manager.Publish returns no value | Fire-and-forget, like Redis pub/sub, NATS, in-memory event buses. SSE has no acknowledgement to return. |
| broadcaster type is package-internal | An exported broadcaster reference would escape Manager's lifecycle control, which was historically the source of every race here. |
| Topic strings are caller-defined (no validation) | Use ConstructSSETopic for URL-derived topics, or shared constants between publisher and subscriber. |
| Polling lifecycle bound to first subscribe / last unsubscribe | Polling does no work while nobody is connected. New Polling instance per Handler invocation (see quick start). |
| Heartbeat sent on the configured interval | Keeps idle connections alive through proxies; surfaces broken connections as a write error. |

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConstructSSETopic added in v2.0.3

func ConstructSSETopic(c echo.Context) string

ConstructSSETopic derives a topic from the request URL — path on its own, or path with the query string appended when one is present.

e.g. "/api/v3/device/all/sse?offset=10&labels=label1,label2"
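A rough sketch of the rule described above, using only `net/url`. `topicFromURL` and `topicFromRawURL` are hypothetical helpers written for illustration; whether the package uses the raw query string verbatim is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
)

// topicFromURL returns the path alone, or path + "?" + raw query when a query
// string is present. Assumption: the query is used verbatim, not normalized.
func topicFromURL(u *url.URL) string {
	if u.RawQuery == "" {
		return u.Path
	}
	return u.Path + "?" + u.RawQuery
}

// topicFromRawURL parses a request target and derives its topic.
func topicFromRawURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	return topicFromURL(u), nil
}

func main() {
	for _, raw := range []string{
		"/api/v3/device/all/sse",
		"/api/v3/device/all/sse?offset=10&labels=label1,label2",
	} {
		topic, err := topicFromRawURL(raw)
		if err != nil {
			panic(err)
		}
		fmt.Println(topic)
	}
}
```

In this sketch two requests with the same parameters in a different order yield different topics, so a publisher that builds the topic string by hand has to match the subscriber's URL exactly.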

func Handler

func Handler(m *Manager, opts ...HandlerOption) echo.HandlerFunc

Handler returns an echo.HandlerFunc that opens an SSE stream for the caller. It atomically subscribes to the topic (creating the topic's broadcaster on demand) and forwards every published event to the response.

Options:

  • WithCustomTopic: override the auto-derived topic (defaults to the request URL path + query).
  • WithPollingService: attach a polling service that produces events for the topic. The service starts on the first subscribe and stops when the last subscriber leaves.

func WriteHeartbeat added in v2.0.21

func WriteHeartbeat(c echo.Context, writeDeadline time.Duration, lc log.Logger) error

WriteHeartbeat writes an SSE heartbeat frame (`:\n\n`) and flushes. Same writeDeadline semantics as WriteSSEEvent.

func WriteSSEEvent added in v2.0.21

func WriteSSEEvent(c echo.Context, payload any, writeDeadline time.Duration, lc log.Logger) error

WriteSSEEvent JSON-encodes payload, writes a single SSE data frame, and flushes. A marshal failure is logged and the event is skipped without killing the stream. The returned error signals an underlying write failure (broken pipe, deadline expired, or a missing write-deadline capability — see applyWriteDeadline).

writeDeadline bounds the time the underlying connection has to accept the write; pass the same value as the heartbeat interval. The caller's ResponseWriter MUST support http.ResponseController write deadlines (stock Echo + raw net/http does; test fixtures must implement SetWriteDeadline as a no-op).
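The frame-writing behaviour described for WriteSSEEvent and WriteHeartbeat can be approximated on plain `net/http`, as in the sketch below. `writeFrame`, `writeEvent`, `writeHeartbeat`, and `deadlineRecorder` are hypothetical names; the real functions take an echo.Context and a logger, which are left out here. The fixture shows the no-op SetWriteDeadline that the paragraph above asks test doubles to provide.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

const demoDeadline = 30 * time.Second

// deadlineRecorder is a test fixture: a ResponseRecorder plus the no-op
// SetWriteDeadline method that http.ResponseController looks for.
type deadlineRecorder struct{ *httptest.ResponseRecorder }

func (deadlineRecorder) SetWriteDeadline(time.Time) error { return nil }

func newDeadlineRecorder() deadlineRecorder {
	return deadlineRecorder{httptest.NewRecorder()}
}

// writeFrame arms the write deadline, writes one frame, and flushes it.
func writeFrame(w http.ResponseWriter, frame string, writeDeadline time.Duration) error {
	rc := http.NewResponseController(w)
	if err := rc.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil {
		return err
	}
	if _, err := w.Write([]byte(frame)); err != nil {
		return err
	}
	return rc.Flush()
}

// writeEvent JSON-encodes payload into a single "data:" frame. A marshal
// failure skips the event and returns nil so the stream stays open (a real
// implementation would log it).
func writeEvent(w http.ResponseWriter, payload any, writeDeadline time.Duration) error {
	body, err := json.Marshal(payload)
	if err != nil {
		return nil
	}
	return writeFrame(w, "data: "+string(body)+"\n\n", writeDeadline)
}

// writeHeartbeat writes an SSE comment frame, which clients ignore.
func writeHeartbeat(w http.ResponseWriter, writeDeadline time.Duration) error {
	return writeFrame(w, ":\n\n", writeDeadline)
}

func main() {
	rec := newDeadlineRecorder()
	_ = writeEvent(rec, map[string]int{"id": 1}, demoDeadline)
	_ = writeEvent(rec, make(chan int), demoDeadline) // not JSON-encodable: skipped
	_ = writeHeartbeat(rec, demoDeadline)
	fmt.Printf("%q\n", rec.Body.String()) // "data: {\"id\":1}\n\n:\n\n"
}
```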

func WriteSSEHeaders added in v2.0.21

func WriteSSEHeaders(c echo.Context, writeDeadline time.Duration, lc log.Logger) error

WriteSSEHeaders applies a write deadline, sets the three SSE response headers on c, and flushes them immediately so the client sees the text/event-stream content type before any payload arrives.

The deadline is applied BEFORE the initial flush so the goroutine can't hang on a slow/dead client while writing headers. writeDeadline should match the per-event/heartbeat value (typically the heartbeat interval). A failure to set the deadline is fatal and returned to the caller before any bytes are committed — the caller can then surface a proper 5xx rather than committing 200 and dying on the first event. ErrNotSupported here means the writer chain (middleware/wrappers) doesn't expose SetWriteDeadline; fix the writer, not this code.
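A sketch of the deadline-before-headers ordering on plain `net/http` follows. `writeHeaders` is a hypothetical stand-in, and the three header names used here (Content-Type, Cache-Control, Connection) are an assumption based on common SSE practice; the documentation above does not list them.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

const demoDeadline = 30 * time.Second

// deadlineRecorder adds a no-op SetWriteDeadline to a ResponseRecorder.
type deadlineRecorder struct{ *httptest.ResponseRecorder }

func (deadlineRecorder) SetWriteDeadline(time.Time) error { return nil }

func newDeadlineRecorder() deadlineRecorder {
	return deadlineRecorder{httptest.NewRecorder()}
}

// newPlainRecorder returns a writer without write-deadline support.
func newPlainRecorder() *httptest.ResponseRecorder { return httptest.NewRecorder() }

// writeHeaders sets the deadline first, so a failure is returned before any
// header or status byte has been committed.
func writeHeaders(w http.ResponseWriter, writeDeadline time.Duration) error {
	rc := http.NewResponseController(w)
	if err := rc.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil {
		return fmt.Errorf("set write deadline: %w", err)
	}
	h := w.Header()
	h.Set("Content-Type", "text/event-stream") // assumed header set
	h.Set("Cache-Control", "no-cache")
	h.Set("Connection", "keep-alive")
	w.WriteHeader(http.StatusOK)
	return rc.Flush()
}

// isNotSupported reports whether the writer chain lacks SetWriteDeadline.
func isNotSupported(err error) bool {
	return errors.Is(err, http.ErrNotSupported)
}

func main() {
	plain := newPlainRecorder()
	err := writeHeaders(plain, demoDeadline)
	fmt.Println(isNotSupported(err), plain.Header().Get("Content-Type") == "") // true true

	ok := newDeadlineRecorder()
	fmt.Println(writeHeaders(ok, demoDeadline), ok.Header().Get("Content-Type")) // <nil> text/event-stream
}
```

With the plain recorder nothing has been written when the error comes back, so the caller is still free to answer with a 5xx.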

Types

type HandlerConfig

type HandlerConfig struct {
	PollingService PollingService
	CustomTopic    string
}

HandlerConfig holds the configuration for the SSE handler.

type HandlerOption

type HandlerOption func(*HandlerConfig)

HandlerOption is a function that modifies the HandlerConfig.

func WithCustomTopic added in v2.0.5

func WithCustomTopic(topic string) HandlerOption

WithCustomTopic returns a HandlerOption that sets a custom topic in the HandlerConfig.

func WithPollingService

func WithPollingService(service PollingService) HandlerOption

WithPollingService returns a HandlerOption that sets the PollingService in the HandlerConfig.
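HandlerOption follows the functional-options pattern. The sketch below mirrors it with local, lower-case types to show how a handler might fold options into its config and fall back to the URL-derived topic. `resolveTopic` is hypothetical, and treating an empty custom topic as "use the default" is an assumption.

```go
package main

import "fmt"

// handlerConfig is a local mirror of the CustomTopic field of HandlerConfig.
type handlerConfig struct {
	customTopic string
}

// handlerOption mutates a handlerConfig, like HandlerOption above.
type handlerOption func(*handlerConfig)

// withCustomTopic returns an option that overrides the topic.
func withCustomTopic(topic string) handlerOption {
	return func(c *handlerConfig) { c.customTopic = topic }
}

// resolveTopic applies the options in order and falls back to defaultTopic
// when no custom topic was set (assumed behaviour).
func resolveTopic(defaultTopic string, opts ...handlerOption) string {
	cfg := handlerConfig{}
	for _, opt := range opts {
		opt(&cfg)
	}
	if cfg.customTopic != "" {
		return cfg.customTopic
	}
	return defaultTopic
}

func main() {
	fmt.Println(resolveTopic("/api/v1/alarms/sse"))                            // /api/v1/alarms/sse
	fmt.Println(resolveTopic("/api/v1/alarms/sse", withCustomTopic("alarms"))) // alarms
}
```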

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns one fan-out broadcaster per topic. It is the only public surface that external code interacts with for SSE pubsub — subscribers connect through sse.Handler and publishers push via Manager.Publish.

m.mu guards the broadcasters map: lookup, install, and removal are serialized so a Publish can never see a half-torn-down topic. Fan-out to individual subscribers is guarded by the broadcaster's own b.mu, which Publish acquires after releasing m.mu.

func NewManager

func NewManager(ctx context.Context, lc log.Logger, heartbeatInterval time.Duration) *Manager

NewManager creates an SSE Manager that lives for the supplied parent context. The heartbeatInterval is applied to every handler the Manager serves; pass zero or a negative value to fall back to the package default.

func (*Manager) Publish added in v2.0.19

func (m *Manager) Publish(topic string, data any)

Publish forwards data to every subscriber currently registered on topic. When no broadcaster exists for the topic (no subscriber has connected, or the last one left and cleanup has run), the call returns without doing anything — matching the standard fire-and-forget semantics of in-memory pubsub (Redis pubsub, NATS, in-memory event buses). Callers ship the event on a best-effort basis; SSE does not guarantee delivery.

Safe to call at high frequency: the no-subscriber path is a single map lookup under a read lock.
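The two-level locking described for Manager and Publish might look roughly like the sketch below. `manager` and `broadcaster` are local stand-ins; the buffered channels, the drop-on-full policy, and the returned delivery count are assumptions added for demonstration (the real Publish returns nothing).

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster fans out to the subscribers of one topic.
type broadcaster struct {
	mu   sync.Mutex
	subs map[chan any]struct{}
}

// manager guards the topic map with its own lock.
type manager struct {
	mu           sync.RWMutex
	broadcasters map[string]*broadcaster
}

func newManager() *manager {
	return &manager{broadcasters: make(map[string]*broadcaster)}
}

// subscribe installs the broadcaster on demand and registers a channel in one
// critical section, so a concurrent publish sees either nothing or both.
func (m *manager) subscribe(topic string) chan any {
	ch := make(chan any, 8)
	m.mu.Lock()
	defer m.mu.Unlock()
	b, ok := m.broadcasters[topic]
	if !ok {
		b = &broadcaster{subs: make(map[chan any]struct{})}
		m.broadcasters[topic] = b
	}
	b.mu.Lock()
	b.subs[ch] = struct{}{}
	b.mu.Unlock()
	return ch
}

// publish looks the topic up under the read lock, releases it, then fans out
// under the broadcaster's lock. With no subscribers it is one map lookup.
func (m *manager) publish(topic string, data any) int {
	m.mu.RLock()
	b := m.broadcasters[topic]
	m.mu.RUnlock()
	if b == nil {
		return 0
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	delivered := 0
	for ch := range b.subs {
		select {
		case ch <- data:
			delivered++
		default: // subscriber buffer full: drop (best effort)
		}
	}
	return delivered
}

func main() {
	m := newManager()
	fmt.Println(m.publish("alarms", "ignored")) // 0
	a, b := m.subscribe("alarms"), m.subscribe("alarms")
	fmt.Println(m.publish("alarms", "fire")) // 2
	fmt.Println(<-a, <-b)                    // fire fire
}
```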

func (*Manager) Shutdown added in v2.0.4

func (m *Manager) Shutdown()

Shutdown cancels the Manager's context. Any handler currently servicing an SSE stream observes the cancellation and returns.
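How a handler could observe that cancellation is sketched below as a select loop over the context, an event channel, and a heartbeat ticker. `streamLoop` and `demoShutdown` are hypothetical; the package's actual handler loop is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// streamLoop forwards events and heartbeats through write until ctx is
// cancelled or a write fails.
func streamLoop(ctx context.Context, events <-chan any, heartbeat time.Duration, write func(frame string) error) error {
	ticker := time.NewTicker(heartbeat)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // shutdown: drop the stream
		case ev := <-events:
			if err := write(fmt.Sprintf("data: %v\n\n", ev)); err != nil {
				return err
			}
		case <-ticker.C:
			if err := write(":\n\n"); err != nil {
				return err
			}
		}
	}
}

// demoShutdown streams one event, cancels the context, and returns what was
// written together with the loop's exit error.
func demoShutdown() ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	events := make(chan any, 1)
	events <- "hello"

	var frames []string
	written := make(chan struct{}, 1)
	result := make(chan error, 1)
	go func() {
		result <- streamLoop(ctx, events, time.Hour, func(frame string) error {
			frames = append(frames, frame)
			written <- struct{}{}
			return nil
		})
	}()

	<-written // the event has been written
	cancel()  // stands in for Manager.Shutdown
	err := <-result
	return frames, err
}

func main() {
	frames, err := demoShutdown()
	fmt.Printf("%q %v\n", frames, err) // ["data: hello\n\n"] context canceled
}
```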

type Polling

type Polling struct {
	// contains filtered or unexported fields
}

Polling is the bundled PollingService implementation: it calls a fetch function at a regular interval and publishes each result. It is designed to be started once and can be stopped gracefully.

func NewPolling

func NewPolling(lc log.Logger, pollingFunc func(context.Context) (any, error), opts ...PollingOption) *Polling

NewPolling builds a Polling that calls pollingFunc every interval and publishes the result to subscribers.

pollingFunc receives a ctx that is cancelled when Stop is called. Watch for cancellation, otherwise Stop will block until pollingFunc returns on its own.

Good — passes ctx down to the network call:

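func fetch(ctx context.Context) (any, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return nil, err
    }
    // Cancelling ctx aborts the in-flight request, so Stop returns promptly.
    return http.DefaultClient.Do(req)
}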

Bad — ignores ctx, so a slow server makes Stop block:

func fetch(ctx context.Context) (any, error) {
    return http.Get(url) // no ctx
}

func (*Polling) Start

func (p *Polling) Start(publisher Publisher)

Start creates an internal cancellable context and launches the polling goroutine.

func (*Polling) Stop

func (p *Polling) Stop() error

Stop gracefully stops the polling mechanism. It cancels the context and waits for the polling goroutine to finish.
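The cancel-then-wait shape of Start and Stop can be sketched as follows. `pollHandle`, `startPolling`, and `demoStop` are hypothetical names. The sketch polls only on ticker ticks and silently skips fetch errors, whereas the package is documented to send error payloads; both simplifications are assumptions of the example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pollHandle holds what stop needs: the cancel func and a completion signal.
type pollHandle struct {
	cancel context.CancelFunc
	done   chan struct{}
}

// startPolling launches a goroutine that calls fetch on every tick and
// publishes successful results.
func startPolling(interval time.Duration, fetch func(context.Context) (any, error), publish func(any)) *pollHandle {
	ctx, cancel := context.WithCancel(context.Background())
	h := &pollHandle{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(h.done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				data, err := fetch(ctx)
				if err != nil {
					continue // simplification: errors are skipped
				}
				publish(data)
			}
		}
	}()
	return h
}

// stop cancels the context and blocks until the goroutine has exited, so it
// returns only as fast as fetch reacts to cancellation.
func (h *pollHandle) stop() {
	h.cancel()
	<-h.done
}

// demoStop stops a poller whose fetch is blocked in a ctx-aware wait and
// returns how many results were published.
func demoStop() int {
	entered := make(chan struct{})
	var once sync.Once
	published := 0
	fetch := func(ctx context.Context) (any, error) {
		once.Do(func() { close(entered) })
		select {
		case <-ctx.Done():
			return nil, ctx.Err() // honours cancellation
		case <-time.After(time.Hour):
			return "late", nil
		}
	}
	h := startPolling(time.Millisecond, fetch, func(any) { published++ })
	<-entered // fetch is now in flight
	h.stop()  // returns promptly because fetch watches ctx
	return published
}

func main() {
	fmt.Println(demoStop()) // 0
}
```

If the fetch ignored ctx, the same `stop` call would block for the full hour, which is the failure mode the NewPolling documentation warns about.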

type PollingConfig added in v2.0.8

type PollingConfig struct {
	ApiVersion    string
	StopCondition func(data any) bool
	StopCallback  func()
	// contains filtered or unexported fields
}

type PollingOption added in v2.0.8

type PollingOption func(*PollingConfig)

PollingOption is a function that modifies the PollingConfig.

func WithCustomApiVersion added in v2.0.8

func WithCustomApiVersion(apiVersion string) PollingOption

WithCustomApiVersion returns a PollingOption that sets the API version reported in the error response when polling fails. If not set, it defaults to common.ApiVersion from go-mod-edge-utils.

func WithCustomPollingInterval added in v2.0.8

func WithCustomPollingInterval(interval time.Duration) PollingOption

WithCustomPollingInterval returns a PollingOption that sets a custom polling interval in the PollingConfig. Default is 5 seconds if not set.

func WithStopCallback added in v2.0.16

func WithStopCallback(fn func()) PollingOption

WithStopCallback returns a PollingOption that sets a callback to be invoked when polling stops. The callback is called regardless of the reason polling stopped (stop condition met, explicit Stop(), or context cancellation).

func WithStopCondition added in v2.0.16

func WithStopCondition(fn func(any) bool) PollingOption

WithStopCondition returns a PollingOption that sets a stop condition function in the PollingConfig. Polling will stop after publishing data if the function returns true.
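The interplay of WithStopCondition and WithStopCallback can be illustrated with a small synchronous loop. `pollUntil` and `demoStopCondition` are hypothetical, and the order shown (publish first, then evaluate the condition, with the callback deferred so it runs on every exit path) is taken from the descriptions above rather than from the package source.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntil polls on every tick until ctx is cancelled or stopCondition
// returns true for a published payload. onStop runs on every exit path.
func pollUntil(
	ctx context.Context,
	interval time.Duration,
	fetch func(context.Context) (any, error),
	publish func(any),
	stopCondition func(any) bool,
	onStop func(),
) {
	if onStop != nil {
		defer onStop()
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			data, err := fetch(ctx)
			if err != nil {
				continue // simplification: errors are skipped
			}
			publish(data) // the final payload is still delivered
			if stopCondition != nil && stopCondition(data) {
				return
			}
		}
	}
}

// demoStopCondition polls a counter and stops once it reaches 3.
func demoStopCondition() ([]int, int) {
	n := 0
	fetch := func(context.Context) (any, error) { n++; return n, nil }
	var published []int
	callbacks := 0
	pollUntil(context.Background(), time.Millisecond, fetch,
		func(v any) { published = append(published, v.(int)) },
		func(v any) bool { return v.(int) >= 3 },
		func() { callbacks++ },
	)
	return published, callbacks
}

func main() {
	fmt.Println(demoStopCondition()) // [1 2 3] 1
}
```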

type PollingService

type PollingService interface {
	// Start begins fetching. Called once per broadcaster, on the first
	// subscribe.
	Start(publisher Publisher)

	// Stop ends fetching. Called in a background goroutine when the last
	// subscriber leaves; Manager does not wait for it. If Stop never
	// returns, that goroutine leaks.
	//
	// Stop may also be called when Start was never called, so guard
	// against nil fields.
	//
	// See NewPolling for the bundled implementation and an example.
	Stop() error
}

PollingService periodically fetches data and publishes it to subscribers. One PollingService is attached to a broadcaster when the first subscriber connects (via WithPollingService).
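A custom implementation mainly has to get Stop right: it may run without a prior Start, and it should return once its goroutine has exited. The sketch below redeclares the two interfaces locally so it compiles on its own. `channelSource`, `collector`, and `demoChannelSource` are hypothetical names for illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local copies of the documented interfaces.
type Publisher interface{ Publish(data any) }

type PollingService interface {
	Start(publisher Publisher)
	Stop() error
}

// channelSource forwards values from a channel to the Publisher.
type channelSource struct {
	in     <-chan any
	mu     sync.Mutex
	cancel context.CancelFunc
	done   chan struct{}
}

var _ PollingService = (*channelSource)(nil) // compile-time interface check

func (s *channelSource) Start(p Publisher) {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	s.mu.Lock()
	s.cancel, s.done = cancel, done
	s.mu.Unlock()
	go func() {
		defer close(done)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-s.in:
				if !ok {
					return
				}
				p.Publish(v)
			}
		}
	}()
}

// Stop is safe before Start: with nothing to cancel it returns nil.
func (s *channelSource) Stop() error {
	s.mu.Lock()
	cancel, done := s.cancel, s.done
	s.cancel, s.done = nil, nil
	s.mu.Unlock()
	if cancel == nil {
		return nil // Start was never called
	}
	cancel()
	<-done
	return nil
}

// collector is a Publisher that records what it receives.
type collector struct{ got []any }

func (c *collector) Publish(data any) { c.got = append(c.got, data) }

// demoChannelSource calls Stop before Start, then runs a full cycle.
func demoChannelSource() (error, int) {
	in := make(chan any)
	src := &channelSource{in: in}
	early := src.Stop() // must not panic or block

	sink := &collector{}
	src.Start(sink)
	in <- "a"
	in <- "b"
	_ = src.Stop() // waits for the goroutine, so both values are published
	return early, len(sink.got)
}

func main() {
	fmt.Println(demoChannelSource()) // <nil> 2
}
```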

type Publisher

type Publisher interface {
	Publish(data any)
}

Publisher is an interface for publishing data to subscribers.
