sse

package
v1.4.5 Latest
Published: May 17, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package sse provides Server-Sent Events (SSE) middleware for celeris.

SSE enables unidirectional server-to-client streaming over HTTP. This middleware manages event formatting, heartbeat keep-alives, reconnection via Last-Event-ID, and client disconnect detection.

Basic Usage

server.GET("/events", sse.New(sse.Config{
    Handler: func(client *sse.Client) {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-client.Context().Done():
                return
            case t := <-ticker.C:
                if err := client.Send(sse.Event{
                    Event: "time",
                    Data:  t.Format(time.RFC3339),
                }); err != nil {
                    return
                }
            }
        }
    },
}))

Reconnection

When a client reconnects, browsers send a Last-Event-ID header. Use Client.LastEventID to resume from where the client left off:

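Handler: func(client *sse.Client) {
    // fetchEventsSince is assumed to be an application-defined helper.
    lastID := client.LastEventID() // empty on the initial connection
    for _, e := range fetchEventsSince(lastID) {
        if err := client.Send(e); err != nil {
            return // client went away or the write failed
        }
    }
},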

For automatic replay, see Config.ReplayStore together with NewRingBuffer or NewKVReplayStore: the middleware appends every event passed to Send to the store and replays missed events on the next reconnect, with no work from the handler.

Backpressure: two layers, two purposes

celeris exposes per-subscriber backpressure at TWO distinct layers:

  • Config.MaxQueueDepth / Config.OnSlowClient is the per-Client queue. When set, Client.Send enqueues onto a bounded channel that a per-Client drain goroutine writes to the wire. The user's Config.OnSlowClient policy fires when that queue overflows. Use this layer when a single subscriber drives the load — e.g. a long-lived per-user feed where Send is the only producer.

  • BrokerConfig.SubscriberBuffer / BrokerConfig.OnSlowSubscriber is the per-subscriber queue INSIDE the Broker. Broker.Publish fans out via these queues; per-subscriber drain goroutines call Client.WritePreparedEvent (which writes synchronously to the wire, bypassing Config.MaxQueueDepth). Use this layer for fan-out from a single publisher to N subscribers: the broker formats each event ONCE and never blocks on a single slow subscriber.

The two layers are orthogonal. A Broker.Subscribe'd client can also have a non-zero MaxQueueDepth — the broker uses WritePreparedEvent (which is always synchronous) so the Client.queue lies dormant unless something else calls Client.Send directly. In that case the Client.queue absorbs Send-bursts independently of the broker fan-out. There is no scenario where both queues fight each other.

Heartbeat

By default, a heartbeat comment is sent every 15 seconds to detect disconnected clients. Configure via Config.HeartbeatInterval or disable with a negative value.

Engine Compatibility

SSE works with all celeris engines (std, epoll, io_uring). The middleware handles celeris.Context.Detach internally — callers do not need to manage the event loop lifecycle.

CORS

For cross-origin EventSource connections, configure CORS middleware on the SSE endpoint to allow the appropriate origin.

Constants

const DefaultAsyncAppendConcurrency = 64

DefaultAsyncAppendConcurrency is the cap on in-flight async-append goroutines when KVReplayStoreConfig.AsyncAppend is on. A stalled KV without this cap could pile up unbounded goroutines under heavy publish load.

const DefaultBrokerSubscriberBuffer = 64

DefaultBrokerSubscriberBuffer is the queue capacity used when BrokerConfig.SubscriberBuffer is zero.

const (
	// DefaultHeartbeatInterval is the default interval between heartbeat
	// comments sent to detect client disconnects.
	DefaultHeartbeatInterval = 15 * time.Second
)

const MaxKVReplayIndex = 65536

MaxKVReplayIndex is the default soft cap on the number of entries in the in-memory ID index that NewKVReplayStore retains. Once the cap is reached, the oldest 25% of entries are dropped; the underlying KV blobs age out via their TTL independently. Set KVReplayStoreConfig.MaxIndex to override.
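The "drop the oldest 25%" rule can be sketched as follows. The helper trimOldestQuarter and the oldest-first slice layout are assumptions for illustration only; the store's real index structure is not shown in this documentation.

```go
package main

import "fmt"

// trimOldestQuarter (hypothetical) drops the oldest 25% of ids once
// len(ids) has reached maxIndex. ids is assumed to be ordered oldest-first.
func trimOldestQuarter(ids []string, maxIndex int) []string {
	if len(ids) < maxIndex {
		return ids // still under the soft cap
	}
	drop := len(ids) / 4
	if drop == 0 {
		drop = 1 // always make progress for very small caps
	}
	// Copy so the dropped prefix is no longer referenced by the result.
	return append([]string(nil), ids[drop:]...)
}

func main() {
	ids := []string{"1", "2", "3", "4", "5", "6", "7", "8"}
	ids = trimOldestQuarter(ids, 8)
	fmt.Println(ids) // [3 4 5 6 7 8]
}
```

Trimming a quarter at a time, instead of one entry per append, amortises the cost of the copy across many appends.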

Variables

var ErrClientClosed = errors.New("sse: client closed")

ErrClientClosed is returned when Send is called on a closed Client.

var ErrLastIDUnknown = errors.New("sse: last-event-id unknown to replay store")

ErrLastIDUnknown is returned by ReplayStore.Since when the supplied lastID is non-empty but the store cannot interpret it (malformed, aged out of retention, or never recorded). The middleware treats this as "fresh start": the user's Handler still runs, but no replay events are written. The original Last-Event-ID header value remains observable via Client.LastEventID.

Functions

func DefaultBrokerSlowConcurrency added in v1.4.2

func DefaultBrokerSlowConcurrency() int

DefaultBrokerSlowConcurrency is used when BrokerConfig.SlowSubscriberConcurrency is zero. It is sized at GOMAXPROCS*4, the same heuristic as middleware/websocket.DefaultHubConcurrency, so a publish with many slow subscribers parallelises up to a small multiple of CPU cores without spawning unbounded goroutines.

func FormatEvent

func FormatEvent(buf []byte, e Event) []byte

FormatEvent formats an SSE event into buf, reusing its capacity. Exported for benchmarking; most users should use Client.Send instead.

func New

func New(config ...Config) celeris.HandlerFunc

New creates an SSE handler with the given config.

Usage:

server.GET("/events", sse.New(sse.Config{
    Handler: func(client *sse.Client) {
        for msg := range messages {
            if err := client.Send(sse.Event{Data: msg}); err != nil {
                return
            }
        }
    },
}))

Example

Stream a counter to the client every second; client disconnects close the context, which ends the loop cleanly.

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/goceleris/celeris"
	"github.com/goceleris/celeris/middleware/sse"
)

func main() {
	s := celeris.New(celeris.Config{})

	s.GET("/events", sse.New(sse.Config{
		HeartbeatInterval: 30 * time.Second,
		Handler: func(c *sse.Client) {
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			i := 0
			for {
				select {
				case <-c.Context().Done():
					return
				case <-ticker.C:
					if err := c.Send(sse.Event{
						Event: "tick",
						ID:    strconv.Itoa(i),
						Data:  "tick " + strconv.Itoa(i),
					}); err != nil {
						return
					}
					i++
				}
			}
		},
	}))

	fmt.Println("SSE handler installed at /events")
}
Output:
SSE handler installed at /events

Types

type Broker added in v1.4.2

type Broker struct {
	// contains filtered or unexported fields
}

Broker fans out a single SSE event source to N subscribers without re-formatting. Each subscriber gets its own bounded outbound queue plus a dedicated drain goroutine; Broker.Publish does a single FormatEvent call and then non-blocking sends to every queue.

Safe for concurrent use from any number of publishers and from any number of Subscribe / unsubscribe call sites.

Example

Fan-out from a single publisher to N subscribers. Broker formats each event once into a PreparedEvent and dispatches the cached bytes to every subscriber's per-subscriber queue, so the FormatEvent cost stays constant as the subscriber count grows.

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/goceleris/celeris"
	"github.com/goceleris/celeris/middleware/sse"
)

func main() {
	broker := sse.NewBroker(sse.BrokerConfig{
		SubscriberBuffer: 64,
		OnSlowSubscriber: func(_ *sse.Client, _ *sse.PreparedEvent) sse.BrokerPolicy {
			return sse.BrokerPolicyDrop
		},
	})

	s := celeris.New(celeris.Config{})
	s.GET("/events", sse.New(sse.Config{
		Handler: func(c *sse.Client) {
			unsub := broker.Subscribe(c)
			defer unsub()
			<-c.Context().Done()
		},
	}))

	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for i := 0; ; i++ {
			<-ticker.C
			broker.Publish(sse.Event{
				ID:    strconv.Itoa(i),
				Event: "tick",
				Data:  "tick " + strconv.Itoa(i),
			})
		}
	}()

	fmt.Println("broker installed at /events")
}
Output:
broker installed at /events

func NewBroker added in v1.4.2

func NewBroker(cfg BrokerConfig) *Broker

NewBroker constructs a Broker with the provided config.

func (*Broker) CallbackPanics added in v1.4.2

func (b *Broker) CallbackPanics() uint64

CallbackPanics returns the cumulative count of user OnSlowSubscriber callback panics recovered by the broker's slow-path goroutines. A non-zero value points at a misbehaving callback: the broker keeps running (the panic was caught), but the affected subscriber's policy decision was lost, so it stays registered with whatever queue state it had. Surface this via your metrics pipeline to avoid silent callback breakage.

func (*Broker) Close added in v1.4.2

func (b *Broker) Close()

Close unsubscribes every current subscriber and blocks new Subscribe calls. Pending in-flight Publish calls complete in best-effort order. Idempotent.

func (*Broker) Publish added in v1.4.2

func (b *Broker) Publish(e Event) *PreparedEvent

Publish formats e once into a PreparedEvent, dispatches it to every current subscriber, and returns the PreparedEvent so the caller can reuse it (e.g. for replay-store appends).

Ordering: per subscriber, events are delivered in publish order (the per-subscriber drain goroutine pulls from a FIFO channel). Across subscribers there is no global ordering guarantee — fan-out is concurrent and a fast subscriber may observe event N before a slow subscriber observes event N-1.

func (*Broker) PublishPrepared added in v1.4.2

func (b *Broker) PublishPrepared(pe *PreparedEvent)

PublishPrepared dispatches an already-prepared event to every current subscriber. Each subscriber has its own bounded queue and drain goroutine, so wire I/O for a slow subscriber never gates a fast one. Slow subscribers (those whose queue is full at the non-blocking send attempt) have the configured BrokerPolicy applied.

Slow-path concurrency: when N subscribers are slow, the per-subscriber policy callback + cleanup runs in parallel across goroutines bounded by BrokerConfig.SlowSubscriberConcurrency (default GOMAXPROCS*4). Total slow-path latency is therefore approximately max(callback) + cleanup rather than the sum across subscribers. PublishPrepared still WAITS for every spawned slow-path goroutine before returning. The join is required so that a subsequent Broker.Subscribe cannot race a Client.Close from a prior policy firing (the sync.Pool of *Client could otherwise hand a still-closing pointer to a fresh connection).

Panic isolation: a panic inside a user OnSlowSubscriber callback is recovered inside the slow-path goroutine — other slow-path goroutines continue, and the publisher returns normally. The failing subscriber stays registered (the policy could not be honoured); the panic is otherwise swallowed because the publisher has no error channel to surface it on.

Default OnSlowSubscriber == nil short-circuits without spawning any slow-path goroutines: events were already dropped at the non-blocking-send branch and no cleanup is required.

func (*Broker) Subscribe added in v1.4.2

func (b *Broker) Subscribe(c *Client) (unsubscribe func())

Subscribe registers c to receive every subsequent Publish. Spawns a drain goroutine that calls Client.WritePreparedEvent for each event. Returns an unsubscribe function the handler MUST defer; calling it twice is safe.

Subscribing a Client to a Broker that has already been Close()'d is a no-op — the returned unsubscribe is also a no-op.

func (*Broker) SubscriberCount added in v1.4.2

func (b *Broker) SubscriberCount() int

SubscriberCount is a point-in-time gauge useful for observability. Reflects state at the moment of the call; concurrent Subscribe / unsubscribe calls may change the value before the caller observes it.

type BrokerConfig added in v1.4.2

type BrokerConfig struct {
	// SubscriberBuffer bounds each subscriber's outbound queue inside the
	// broker. Fast subscribers never see backpressure; slow ones get the
	// OnSlowSubscriber policy applied. Default 64.
	SubscriberBuffer int

	// OnSlowSubscriber is consulted when a subscriber's queue is full at
	// Publish time. When nil, [BrokerPolicyDrop] is used. The Drop
	// default is deliberate for SSE: dropping one event from one slow
	// subscriber leaves the stream coherent (the next event with a
	// fresh id replaces it, and replay via Last-Event-ID can recover).
	// websocket.HubConfig.OnSlowConn defaults to Close instead — a
	// dropped WS frame would corrupt the message-boundary contract,
	// so the safer default is to evict the bad peer.
	OnSlowSubscriber func(c *Client, pe *PreparedEvent) BrokerPolicy

	// SlowSubscriberConcurrency caps the number of in-flight per-
	// subscriber slow-path goroutines spawned by Publish/PublishPrepared
	// when one or more subscribers' queues are full. Zero means
	// [DefaultBrokerSlowConcurrency] — runtime.GOMAXPROCS(0)*4. Negative
	// opts out (true unbounded) — useful for benchmarks; production
	// callers should keep the cap on so a misbehaving OnSlowSubscriber
	// callback cannot fan out into thousands of goroutines.
	SlowSubscriberConcurrency int
}

BrokerConfig tunes a Broker. All fields are optional; zero values produce reasonable defaults.

type BrokerPolicy added in v1.4.2

type BrokerPolicy uint8

BrokerPolicy controls what happens to a slow subscriber whose per-subscriber queue is full at Broker.Publish time. Values mirror websocket.HubPolicy and ClientPolicy — same verbs, same semantics.

const (
	// BrokerPolicyDrop silently discards the PreparedEvent for the slow
	// subscriber. Other subscribers are unaffected.
	BrokerPolicyDrop BrokerPolicy = iota

	// BrokerPolicyRemove unregisters the subscriber from the Broker
	// without closing the underlying [Client]. Use when the caller owns
	// the connection lifecycle and only wants the broker to stop
	// fanning events to that subscriber.
	BrokerPolicyRemove

	// BrokerPolicyClose unregisters the subscriber AND closes the
	// underlying [Client]. Use when prolonged backpressure indicates
	// a stuck client that should be evicted entirely.
	BrokerPolicyClose
)

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client provides the API for sending SSE events to a connected client. It wraps a celeris.StreamWriter and manages event formatting, heartbeat, and disconnect detection.

func (*Client) Close

func (c *Client) Close() error

Close closes the SSE stream. Safe to call multiple times.

func (*Client) Context

func (c *Client) Context() context.Context

Context returns a context.Context that is cancelled when the client disconnects or the stream is closed.

func (*Client) DroppedEvents added in v1.4.2

func (c *Client) DroppedEvents() uint64

DroppedEvents returns the cumulative count of events dropped under ClientPolicyDrop (or ClientPolicyClose) since this client connected. Always zero when Config.MaxQueueDepth is zero.

func (*Client) LastEventID

func (c *Client) LastEventID() string

LastEventID returns the value of the Last-Event-ID header sent by the client on reconnection. Returns empty string for initial connections.

Example

Resume from a Last-Event-ID supplied by the browser's reconnect logic.

package main

import (
	"fmt"
	"strconv"

	"github.com/goceleris/celeris/middleware/sse"
)

func main() {
	handler := sse.New(sse.Config{
		Handler: func(c *sse.Client) {
			start := 0
			if id := c.LastEventID(); id != "" {
				if n, err := strconv.Atoi(id); err == nil {
					start = n + 1
				}
			}
			_ = c.Send(sse.Event{
				ID:   strconv.Itoa(start),
				Data: "resumed at " + strconv.Itoa(start),
			})
		},
	})

	_ = handler
	fmt.Println("ok")
}
Output:
ok

func (*Client) QueueDepth added in v1.4.2

func (c *Client) QueueDepth() int

QueueDepth returns the current outbound-queue length, or zero when the client is in legacy blocking mode.

func (*Client) Send

func (c *Client) Send(e Event) error

Send sends a complete SSE event. Thread-safe (serialized with heartbeat writes).

Return contract:

  • non-nil error: the event was NOT delivered (closed, context cancelled, or write failed); the caller may retry on a new connection.
  • nil error in blocking mode (MaxQueueDepth = 0): the event was written to the underlying stream and flushed.
  • nil error in queued mode (MaxQueueDepth > 0): the event was enqueued OR dropped silently per the Config.OnSlowClient policy; check Client.DroppedEvents to detect drops.

When Config.MaxQueueDepth is zero (default), Send writes directly to the wire — historical blocking semantics. When MaxQueueDepth is positive Send enqueues onto a bounded channel that a per-client goroutine drains; if the queue is full Send dispatches the configured Config.OnSlowClient policy.

Send must NOT be called after the user's Handler has returned. The handler defer closes the per-client queue; a Send racing that close observes the queueClosed flag and returns ErrClientClosed instead of panicking, but the contract still belongs to the caller — spawned goroutines that publish into a Client must be joined before the handler returns.
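One way to honour the "join before the handler returns" contract is a sync.WaitGroup. The sketch below uses hypothetical stand-ins (a sender interface and a recorder test double) rather than the real Client, so it runs without the middleware.

```go
package main

import (
	"fmt"
	"sync"
)

// sender (hypothetical) is the subset of a client this sketch needs.
type sender interface {
	Send(data string) error
}

// recorder is a test double that records sends; safe for concurrent use.
type recorder struct {
	mu   sync.Mutex
	sent []string
}

func (r *recorder) Send(data string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.sent = append(r.sent, data)
	return nil
}

// handle starts n producer goroutines and returns only after every one of
// them has exited, so no Send can happen after the handler has returned.
func handle(c sender, n int) {
	var wg sync.WaitGroup
	defer wg.Wait() // join all producers before returning
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := c.Send(fmt.Sprintf("producer %d", i)); err != nil {
				return // client gone: stop producing
			}
		}(i)
	}
}

func main() {
	rec := &recorder{}
	handle(rec, 4)
	fmt.Println(len(rec.sent)) // 4
}
```

In a long-lived handler the producers would typically loop until the client's context is cancelled; the join at the end stays the same.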

func (*Client) SendComment

func (c *Client) SendComment(text string) error

SendComment sends a comment line. Useful for custom keep-alives.

func (*Client) SendData

func (c *Client) SendData(data string) error

SendData is a convenience for sending a data-only event. Equivalent to Send(Event{Data: data}).

func (*Client) WritePreparedEvent added in v1.4.2

func (c *Client) WritePreparedEvent(pe *PreparedEvent) error

WritePreparedEvent writes a PreparedEvent directly to the underlying stream, skipping the per-call FormatEvent step that Client.Send performs. Useful when the same event is fanned out to many clients.

Thread-safe; serialises with concurrent Client.Send and the heartbeat writer through c.mu. Returns ErrClientClosed after the client has been closed and the underlying context error after disconnect.

Always synchronous (writes to the wire under the lock) — bypasses Config.MaxQueueDepth. The Broker is the layer above this method: it provides its own per-subscriber queue (sized by BrokerConfig.SubscriberBuffer) and a drain goroutine per subscriber that calls WritePreparedEvent — adding another layer of buffering at this level would be redundant.

type ClientPolicy added in v1.4.2

type ClientPolicy uint8

ClientPolicy selects what the middleware does when a per-client outbound queue is full at Send time. Used together with Config.MaxQueueDepth and Config.OnSlowClient. Naming mirrors BrokerPolicy and websocket.HubPolicy — same verbs, same semantics.

Block is specific to the sse Client (it gates Send, not a fan-out dispatch); Drop and Close are shared with the other policy types.

const (
	// ClientPolicyDrop silently discards the Event and increments
	// [Client.DroppedEvents]. Send returns nil.
	ClientPolicyDrop ClientPolicy = iota

	// ClientPolicyClose cancels the client's context, causing the
	// handler goroutine to exit. Send returns [ErrClientClosed].
	ClientPolicyClose

	// ClientPolicyBlock falls back to the legacy blocking semantics:
	// Send waits for queue space until the context is cancelled.
	// Provided so opt-in users who want backpressure on a single
	// subscriber without disabling the queue infrastructure for the
	// rest can do so.
	ClientPolicyBlock
)
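The three policies can be illustrated with a bounded channel and a callback. Everything below (policy, queuedClient, send, errClosed) is a hypothetical stand-in that mirrors the documented semantics; it is not the middleware's implementation, has no drain goroutine, and is meant for single-goroutine use.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type policy uint8

const (
	policyDrop policy = iota
	policyClose
	policyBlock
)

var errClosed = errors.New("client closed")

// queuedClient (hypothetical) models a client with a bounded outbound queue.
type queuedClient struct {
	ctx     context.Context
	cancel  context.CancelFunc
	queue   chan string
	dropped uint64
	onSlow  func(dropped uint64) policy
}

func newQueuedClient(depth int, onSlow func(uint64) policy) *queuedClient {
	ctx, cancel := context.WithCancel(context.Background())
	return &queuedClient{ctx: ctx, cancel: cancel, queue: make(chan string, depth), onSlow: onSlow}
}

// send enqueues data, consulting onSlow only when the queue is full.
func (c *queuedClient) send(data string) error {
	if c.ctx.Err() != nil {
		return errClosed
	}
	select {
	case c.queue <- data:
		return nil
	default: // queue full: fall through to the policy
	}
	switch c.onSlow(c.dropped) {
	case policyClose:
		c.dropped++
		c.cancel() // handler observes the cancelled context and exits
		return errClosed
	case policyBlock:
		select {
		case c.queue <- data: // wait for space
			return nil
		case <-c.ctx.Done():
			return errClosed
		}
	default: // policyDrop
		c.dropped++
		return nil
	}
}

func main() {
	// Escalating policy: tolerate two drops, then close the client.
	c := newQueuedClient(1, func(dropped uint64) policy {
		if dropped >= 2 {
			return policyClose
		}
		return policyDrop
	})
	for i := 0; i < 5; i++ {
		fmt.Println(i, c.send(fmt.Sprintf("event %d", i)))
	}
}
```

With a queue depth of 1 and nothing draining it, the first send is enqueued, the next two are dropped silently, and the fourth trips the close policy, after which every send reports the closed client.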

type Config

type Config struct {
	// Handler is the SSE handler function called for each connected client.
	// Required; panics at init if nil.
	Handler Handler

	// HeartbeatInterval is the interval between heartbeat comments sent to
	// detect client disconnects. Set to a negative value to disable.
	// Default: 15s.
	HeartbeatInterval time.Duration

	// RetryInterval is the reconnection time (in milliseconds) sent to the
	// client in the initial "retry:" field. Zero means no retry field is sent
	// (client uses its default, typically ~3s).
	RetryInterval int

	// MaxQueueDepth bounds the per-client outbound queue. Zero disables the
	// queue: the legacy blocking Send semantics are preserved exactly.
	// When set, Send enqueues and returns immediately; a per-client drain
	// goroutine writes to the wire. If the queue is full at Send time,
	// OnSlowClient is invoked; if OnSlowClient is nil, [ClientPolicyDrop]
	// is the default.
	MaxQueueDepth int

	// OnSlowClient decides what to do when [MaxQueueDepth] is exceeded.
	// Only consulted when MaxQueueDepth > 0. The hook may inspect c (via
	// DroppedEvents/QueueDepth) and the dropped Event to drive
	// observability or escalating policies. Default: [ClientPolicyDrop].
	OnSlowClient func(c *Client, e Event) ClientPolicy

	// ReplayStore persists events for Last-Event-ID resume. When nil
	// (default), Client.LastEventID() returns the header but replay is
	// the user's problem — matches today's behavior. When set, the
	// middleware:
	//   - on connect with a Last-Event-ID, reads Since(lastID) and
	//     writes the missed events to the wire BEFORE invoking Handler;
	//   - wraps Send so each call also Appends to the store, rewriting
	//     the wire id: field with the canonical store-assigned ID;
	//   - on Since returning [ErrLastIDUnknown], silently falls
	//     through and still invokes Handler — the original header
	//     value remains visible via [Client.LastEventID] so the
	//     user's resumption logic can react.
	//
	// Performance note: synchronous Append runs INSIDE the per-Client
	// write lock so wire order matches store order. A slow store
	// (e.g. Redis over a degraded network) directly stalls Send and
	// any concurrent heartbeat or SendComment. For low-latency
	// publishing against a remote KV, prefer
	// [NewKVReplayStore] with [KVReplayStoreConfig.AsyncAppend] = true
	// so Append returns immediately and the actual KV.Set fires in
	// the background.
	ReplayStore ReplayStore

	// OnConnect is called when a new SSE client connects, before Handler.
	// The celeris.Context is available for extracting request metadata.
	// Return a non-nil error to reject the connection.
	OnConnect func(c *celeris.Context, client *Client) error

	// OnDisconnect is called after the SSE stream closes.
	OnDisconnect func(c *celeris.Context, client *Client)

	// Skip defines a function to skip this middleware for certain requests.
	Skip func(c *celeris.Context) bool

	// SkipPaths lists paths to skip from SSE handling (exact match).
	SkipPaths []string
}

Config defines the SSE middleware configuration.

type Event

type Event struct {
	// ID is the event ID. If non-empty, sent as "id: <ID>\n".
	// The client stores this and sends it back as Last-Event-ID on reconnect.
	ID string

	// Event is the event type. If non-empty, sent as "event: <Event>\n".
	// Defaults to "message" on the client side when omitted.
	Event string

	// Data is the event payload. Sent as "data: <line>\n" for each line.
	// Multi-line data is split on \n and each line gets its own "data:" prefix.
	Data string

	// Retry is the reconnection time in milliseconds. If > 0, sent as
	// "retry: <Retry>\n". Per the SSE specification, this value is in
	// milliseconds (not time.Duration) to match the wire format directly.
	Retry int
}

Event represents a single Server-Sent Event.
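The field comments above map onto the SSE wire format roughly as in this sketch. It is an illustration written against the SSE format, not the package's FormatEvent: the local event type, the field order, and the handling of empty Data are assumptions, and no sanitising of embedded newlines in ID or Event is attempted.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// event is a local stand-in mirroring the documented Event fields.
type event struct {
	ID    string
	Event string
	Data  string
	Retry int // milliseconds
}

// formatEvent appends the wire form of e to buf and returns the extended
// slice, so callers can reuse one buffer's capacity across events.
func formatEvent(buf []byte, e event) []byte {
	if e.ID != "" {
		buf = append(buf, "id: "...)
		buf = append(buf, e.ID...)
		buf = append(buf, '\n')
	}
	if e.Event != "" {
		buf = append(buf, "event: "...)
		buf = append(buf, e.Event...)
		buf = append(buf, '\n')
	}
	if e.Retry > 0 {
		buf = append(buf, "retry: "...)
		buf = strconv.AppendInt(buf, int64(e.Retry), 10)
		buf = append(buf, '\n')
	}
	// Each line of Data gets its own "data:" prefix.
	for _, line := range strings.Split(e.Data, "\n") {
		buf = append(buf, "data: "...)
		buf = append(buf, line...)
		buf = append(buf, '\n')
	}
	return append(buf, '\n') // a blank line terminates the event
}

func main() {
	buf := make([]byte, 0, 128)
	buf = formatEvent(buf[:0], event{ID: "7", Event: "tick", Data: "line one\nline two"})
	fmt.Printf("%q\n", buf)
	// "id: 7\nevent: tick\ndata: line one\ndata: line two\n\n"
}
```

Passing buf[:0] on each call keeps the backing array and avoids a fresh allocation per event, which is the usual reason for an append-style signature.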

type Handler

type Handler func(client *Client)

Handler is the function signature for SSE endpoint handlers. The Client is valid for the duration of the call. When Handler returns, the SSE stream is closed automatically.

type KVReplayStoreConfig added in v1.4.2

type KVReplayStoreConfig struct {
	// KV is the backing store. Required.
	KV store.KV

	// Prefix is the namespace under which event blobs and the
	// shared counter live. Defaults to no prefix when empty.
	Prefix string

	// TTL bounds the lifetime of stored event blobs. Zero means no
	// expiry — same convention as [store.KV.Set].
	TTL time.Duration

	// MaxIndex bounds the in-memory ID index. Zero or negative ⇒
	// [MaxKVReplayIndex].
	MaxIndex int

	// AsyncAppend, when true, makes [ReplayStore.Append] return as
	// soon as the local ID has been allocated — the actual KV.Set
	// fires in a background goroutine. Trades durability for latency:
	//   - An Append() that returns successfully is NOT guaranteed to
	//     be in the KV when the next reconnect happens.
	//   - A Since() that runs concurrently with a still-in-flight Append
	//     may observe ErrNotFound on that id and skip it. Documented
	//     visibility window for the AsyncAppend mode.
	// Use only when wire-write latency dominates and replay can
	// tolerate eventual consistency.
	//
	// Backpressure: at the [KVReplayStoreConfig.AsyncAppendConcurrency]
	// cap, Append blocks the caller until a goroutine slot frees. This
	// is intentional — under heavy publish load against a stalled KV,
	// blocking the publisher is preferable to letting goroutine count
	// explode. Visible-latency backpressure beats unbounded memory
	// growth.
	AsyncAppend bool

	// AsyncAppendConcurrency caps the number of in-flight goroutines
	// spawned by AsyncAppend. Zero means [DefaultAsyncAppendConcurrency].
	// A slow / stalled KV (e.g. a Redis instance under degraded
	// network) without this cap could pile up unbounded goroutines.
	// At the cap, Append blocks (see [KVReplayStoreConfig.AsyncAppend]).
	AsyncAppendConcurrency int

	// CounterKey is the KV key under which the cross-instance counter
	// lives. Zero value ⇒ "<prefix>seq". Only consulted when the
	// supplied [store.KV] also implements [store.Counter]; otherwise
	// the constructor falls back to a per-process counter.
	CounterKey string
}

KVReplayStoreConfig tunes NewKVReplayStore. Only the KV field is required (the constructor errors when KV is nil); the rest, including Prefix and TTL, have sensible defaults.

type PreparedEvent added in v1.4.2

type PreparedEvent struct {
	// contains filtered or unexported fields
}

PreparedEvent caches the wire bytes of a single SSE event so the same event can be broadcast to N subscribers with one FormatEvent call. It mirrors the role of websocket.PreparedMessage in the broadcast fan-out path.

Once constructed a PreparedEvent is immutable and safe for concurrent reads from any number of Client.WritePreparedEvent callers.

func NewPreparedEvent added in v1.4.2

func NewPreparedEvent(e Event) *PreparedEvent

NewPreparedEvent formats e once into the SSE wire format and returns a PreparedEvent backed by the formatted bytes.

func (*PreparedEvent) Len added in v1.4.2

func (pe *PreparedEvent) Len() int

Len reports the byte length of the formatted wire payload — useful for accounting + flow-control without exposing the cached bytes (and matching the asymmetric API of websocket.PreparedMessage, which also keeps its frame bytes private).

type ReplayStore added in v1.4.2

type ReplayStore interface {
	// Append records the event and returns the canonical ID the
	// middleware will emit on the wire. Implementations SHOULD ignore
	// e.ID and assign their own monotonically increasing ID.
	Append(ctx context.Context, e Event) (id string, err error)

	// Since returns every event appended strictly after lastID, in
	// append order. lastID == "" means "no checkpoint" — implementations
	// may return the empty slice or a recent-N tail (default: empty).
	// Returns ErrLastIDUnknown if lastID is non-empty but cannot be
	// interpreted.
	Since(ctx context.Context, lastID string) ([]Event, error)
}

ReplayStore persists events and serves them back on reconnect.

Implementations:

  • NewRingBuffer: in-memory, fixed-size; bounded memory; lost on process restart.
  • NewKVReplayStore: backed by a store.KV (memory / Redis / Postgres / Memcached); durable across restarts when the underlying KV is durable; multi-instance users must take care to share an ID space (e.g. via a KV that also implements store.Counter, such as the Redis adapter; see NewKVReplayStore).

All methods must be safe for concurrent use.

func NewKVReplayStore added in v1.4.2

func NewKVReplayStore(cfg KVReplayStoreConfig) (ReplayStore, error)

NewKVReplayStore backs the replay log with a store.KV for durability across restarts and (with a shared backend) across processes. Events live at "<Prefix>events/<id>"; TTL bounds retention.

When the supplied store.KV also implements store.Counter (e.g. the Redis adapter via INCR), IDs are allocated atomically against that backend — multiple processes sharing the KV see a single monotonic ID space. When the KV does not implement Counter, the store falls back to a per-process counter; multi-instance setups will see ID collisions across instances.

Returns an error when KVReplayStoreConfig.KV is nil.

Example

Persist events through a KV-backed ReplayStore for durability across restarts. With a store.Counter-aware KV (e.g. Redis INCR), multiple processes share a single monotonic ID space so reconnects survive load-balancer reshuffles. With AsyncAppend, the wire-write path returns as soon as the local ID is allocated; the KV.Set fires in the background, bounded by AsyncAppendConcurrency.

package main

import (
	"fmt"
	"time"

	"github.com/goceleris/celeris"
	"github.com/goceleris/celeris/middleware/sse"
	"github.com/goceleris/celeris/middleware/store"
)

func main() {
	kv := store.NewMemoryKV()
	defer kv.Close()

	replay, err := sse.NewKVReplayStore(sse.KVReplayStoreConfig{
		KV:                     kv,
		Prefix:                 "myapp/sse/",
		TTL:                    10 * time.Minute,
		AsyncAppend:            true,
		AsyncAppendConcurrency: 32,
	})
	if err != nil {
		panic(err)
	}

	s := celeris.New(celeris.Config{})
	s.GET("/events", sse.New(sse.Config{
		ReplayStore: replay,
		Handler: func(c *sse.Client) {
			_ = c.Send(sse.Event{Data: "hello"})
		},
	}))

	fmt.Println("KV-backed replay SSE handler installed at /events")
}
Output:
KV-backed replay SSE handler installed at /events

func NewRingBuffer added in v1.4.2

func NewRingBuffer(size int) ReplayStore

NewRingBuffer constructs an in-memory ReplayStore retaining the last `size` appended events. Append is O(1); Since is O(retained-after-lastID). IDs are sequential decimal strings starting at 1, so a numerically larger ID is always more recent (within a single ring instance).

`size` is clamped to 1 if non-positive.
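A fixed-size replay ring with sequential IDs can be sketched as below. The ring type, its method names, and errUnknownID are hypothetical and simplified (payloads are plain strings, and there is no locking, whereas a real ReplayStore must be safe for concurrent use); treating an aged-out ID as unknown follows the ErrLastIDUnknown description.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

var errUnknownID = errors.New("last-event-id unknown")

type entry struct {
	id   uint64
	data string
}

// ring (hypothetical) retains the last len(buf) appended entries.
type ring struct {
	buf  []entry
	next uint64 // ID the next append will receive; IDs start at 1
}

func newRing(size int) *ring {
	if size < 1 {
		size = 1 // clamp non-positive sizes
	}
	return &ring{buf: make([]entry, size), next: 1}
}

// append stores data in O(1), overwriting the oldest slot when full.
func (r *ring) append(data string) string {
	id := r.next
	r.buf[(id-1)%uint64(len(r.buf))] = entry{id: id, data: data}
	r.next++
	return strconv.FormatUint(id, 10)
}

// since returns everything appended strictly after lastID, oldest first.
func (r *ring) since(lastID string) ([]string, error) {
	if lastID == "" {
		return nil, nil // no checkpoint: nothing to replay
	}
	last, err := strconv.ParseUint(lastID, 10, 64)
	if err != nil || last == 0 || last >= r.next {
		return nil, errUnknownID // malformed or never recorded
	}
	oldest := uint64(1)
	if n := uint64(len(r.buf)); r.next-1 > n {
		oldest = r.next - n
	}
	if last+1 < oldest {
		return nil, errUnknownID // some missed events were already evicted
	}
	var out []string
	for id := last + 1; id < r.next; id++ {
		out = append(out, r.buf[(id-1)%uint64(len(r.buf))].data)
	}
	return out, nil
}

func main() {
	r := newRing(3)
	for _, d := range []string{"a", "b", "c", "d", "e"} {
		r.append(d)
	}
	missed, err := r.since("3")
	fmt.Println(missed, err) // [d e] <nil>
	_, err = r.since("1")
	fmt.Println(err) // last-event-id unknown
}
```

Because IDs are sequential, the slot for any retained ID is a single modulo away, which is what keeps append constant-time and since proportional to the number of events returned.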

Example

Persist events through a ReplayStore so a reconnecting client can resume from its Last-Event-ID without losing in-flight events. The in-memory ring buffer keeps the last 256 events; swap to NewKVReplayStore for durability across restarts.

package main

import (
	"fmt"

	"github.com/goceleris/celeris"
	"github.com/goceleris/celeris/middleware/sse"
)

func main() {
	store := sse.NewRingBuffer(256)

	s := celeris.New(celeris.Config{})
	s.GET("/events", sse.New(sse.Config{
		ReplayStore: store,
		Handler: func(c *sse.Client) {
			_ = c.Send(sse.Event{Data: "hello"})
		},
	}))

	fmt.Println("replay-enabled SSE handler installed at /events")
}
Output:
replay-enabled SSE handler installed at /events
