Documentation
¶
Overview ¶
Package events owns the in-process Server-Sent Events hub for the mock's GET /events endpoint. The hub fans out events posted via POST /admin0/events to every connected subscriber whose event_type filter matches, backed by a bounded ring buffer that lets dropped subscribers resume via Last-Event-ID / ?from / ?from_timestamp.
The package wraps github.com/tmaxmax/go-sse: sse.Server is the HTTP handler, sse.Joe is the in-memory pub/sub provider, and recordingReplayer (this package) adds the (id, timestamp) index needed to translate ?from_timestamp into an event ID for the library's Last-Event-ID-driven Replay path.
Index ¶
Constants ¶
const DefaultKeepAliveInterval = 15 * time.Second
DefaultKeepAliveInterval is the cadence at which a `:keep-alive` comment is broadcast to every connected subscriber. 15s matches what most SSE deployments use; the library doesn't auto-emit.
Variables ¶
This section is empty.
Functions ¶
func SetKeepAliveIntervalForTest ¶
SetKeepAliveIntervalForTest changes the keep-alive cadence for the duration of a single test. Registers t.Cleanup to restore the original value, so tests can't accidentally bleed configuration between cases. Intended for use only from _test.go files.
The new cadence only affects Hub instances constructed AFTER the call: runKeepAlive captures the duration in its time.Ticker at hub startup, so changing it later doesn't retro-actively shorten an already-running ticker. Build the hub inside the test (or right after SetKeepAliveIntervalForTest) to apply the override.
Types ¶
type Event ¶
type Event struct {
Type string
ID string
Payload json.RawMessage
}
Event is the wire shape the control plane pushes into the hub. Type is the CloudEvent discriminator (e.g. "user.created") and drives the SSE `event:` field. ID is the SSE `id:` field; the hub auto-generates one when this is empty. Payload is the raw JSON body streamed in the SSE `data:` field — it MUST include the same `type` value (the OpenAPI schema requires it as the oneOf discriminator), but this type doesn't enforce that; the /admin0/events handler validates against the schema before calling Hub.Publish.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub is the SSE fan-out the mock owns. One Hub per process; the HTTP handler at GET /events is hub.Handler(), and POST /admin0/events pushes via hub.Publish. Hub is safe for concurrent use; every underlying primitive (sse.Server, sse.Joe, recordingReplayer) is.
Lifecycle:
- NewHub starts a keep-alive goroutine.
- Reset drains current subscribers and rebuilds the underlying server + replay buffer, so /admin0/reset between tests is non-destructive to the hub itself.
- Shutdown drains every subscriber permanently and stops the keep-alive goroutine; intended for process shutdown.
func NewHub ¶
NewHub constructs a Hub. BufferSize is the cap of the replay buffer (used for resume via Last-Event-ID / ?from / ?from_timestamp); values <= 0 disable replay entirely (sse.Joe accepts a nil Replayer); values of 1 are clamped to 2 because the library requires a count of at least 2. Now is the clock the replayer's timestamp index uses; nil falls back to time.Now. The caller should wire this to internal/clock.Clock.Now when a controllable clock is present so ?from_timestamp behaves deterministically in clock-controlled tests.
func (*Hub) ActiveSubscribers ¶ added in v0.229.0
ActiveSubscribers reports how many subscribers are connected to GET /events right now. A subscriber leaves the active set only when the server's read loop notices its connection closed, so a reading taken immediately after a client disconnects may briefly lag.
func (*Hub) Handler ¶
Handler returns the HTTP handler for GET /events. Wire it under bearer middleware at mount time.
The handler:
- Disables the http.Server WriteTimeout for this connection (SSE is long-lived; the server default would tear down healthy subscribers after the configured timeout).
- Promotes Auth0's ?from and ?from_timestamp query parameters to the SSE-spec Last-Event-ID header so the library's native replay path picks them up. ?from wins over ?from_timestamp. ?from_timestamp accepts RFC 3339; clients that send the timezone `+` unencoded (which URL-decodes to space) are tolerated by retrying with the space restored.
- Surfaces aged-out resume requests as 410 Gone (matching the OpenAPI declaration). Unparseable ?from_timestamp returns 400 with the standard mgmt error envelope.
- Pre-flushes the SSE response headers so http.Client.Do returns immediately rather than waiting for the first event.
- Tracks the request context in the active set so Reset / Shutdown can drain in-flight subscribers cleanly.
- Delegates to the underlying *sse.Server, which uses an OnSession callback to parse `?event_type=...` into the subscriber's topic list.
func (*Hub) Publish ¶
Publish broadcasts evt to every subscriber whose topic set intersects. The message is sent to broadcastTopic (reaches every filterless subscriber) and to evt.Type (reaches every filtered subscriber that listed this type). Keep-alives use a separate topic and never go through this path.
The RLock is held across server.Publish so a concurrent Reset can't swap h.server underneath an in-flight publish and produce a spurious "provider is closed" error.
func (*Hub) Reset ¶
Reset swaps in a fresh server + replay buffer (so any concurrent Publish atomically moves to the new instance), drains the subscribers that were attached to the old server, then shuts the old server down — all while the new server is already serving new subscribers and publishes. The swap-before-shutdown ordering is what closes the Publish/Reset race: a publisher that grabbed the RLock immediately before Reset's mu.Lock acquired sees the OLD server (and Reset's Lock blocks until the publish completes because Publish holds RLock across the call); every publisher after that sees the NEW server. The OLD server is then shut down with no concurrent publish in flight.
Intended for the /admin0/reset control-plane hook between tests. Idempotent under concurrent callers (serialised via lifecycleMu).
func (*Hub) Shutdown ¶
Shutdown drains every subscriber, stops the keep-alive goroutine, and marks the hub closed permanently. Intended for process shutdown. Idempotent — extra calls are no-ops.
Uses the same swap-before-shutdown ordering as Reset to keep in-flight publishers race-free: the swap to a nil server happens atomically under mu.Lock, then the old server is shut down with no lock held.
func (*Hub) TotalSubscribers ¶ added in v0.229.0
TotalSubscribers reports how many subscriptions have connected since the hub was created or last Reset. It increments on every connect and never decrements within a window; Reset zeroes it.