sse

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package sse provides Server-Sent Events (SSE) middleware for celeris.

SSE enables unidirectional server-to-client streaming over HTTP. This middleware manages event formatting, heartbeat keep-alives, reconnection via Last-Event-ID, and client disconnect detection.

Basic Usage

server.GET("/events", sse.New(sse.Config{
    Handler: func(client *sse.Client) {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-client.Context().Done():
                return
            case t := <-ticker.C:
                if err := client.Send(sse.Event{
                    Event: "time",
                    Data:  t.Format(time.RFC3339),
                }); err != nil {
                    return
                }
            }
        }
    },
}))

Reconnection

When a client reconnects, browsers send a Last-Event-ID header. Use Client.LastEventID to resume from where the client left off:

Handler: func(client *sse.Client) {
    lastID := client.LastEventID()
    events := fetchEventsSince(lastID)
    for _, e := range events {
        client.Send(e)
    }
},

Heartbeat

By default, a heartbeat comment is sent every 15 seconds to detect disconnected clients. Configure via [Config.HeartbeatInterval] or disable with a negative value.

Engine Compatibility

SSE works with all celeris engines (std, epoll, io_uring). The middleware handles celeris.Context.Detach internally — callers do not need to manage the event loop lifecycle.

CORS

For cross-origin EventSource connections, configure CORS middleware on the SSE endpoint to allow the appropriate origin.

Index

Examples

Constants

View Source
const (
	// DefaultHeartbeatInterval is the default interval between heartbeat
	// comments sent to detect client disconnects.
	DefaultHeartbeatInterval = 15 * time.Second
)

Variables

View Source
var ErrClientClosed = errors.New("sse: client closed")

ErrClientClosed is returned when Send is called on a closed Client.

Functions

func FormatEvent

func FormatEvent(buf []byte, e Event) []byte

FormatEvent formats an SSE event into buf, reusing its capacity. Exported for benchmarking; most users should use Client.Send instead.

func New

func New(config ...Config) celeris.HandlerFunc

New creates an SSE handler with the given config.

Usage:

server.GET("/events", sse.New(sse.Config{
    Handler: func(client *sse.Client) {
        for msg := range messages {
            if err := client.Send(sse.Event{Data: msg}); err != nil {
                return
            }
        }
    },
}))
Example

Stream a counter to the client every second; client disconnects close the context, which ends the loop cleanly.

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/goceleris/celeris"
	"github.com/goceleris/celeris/middleware/sse"
)

func main() {
	s := celeris.New(celeris.Config{})

	s.GET("/events", sse.New(sse.Config{
		HeartbeatInterval: 30 * time.Second,
		Handler: func(c *sse.Client) {
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			i := 0
			for {
				select {
				case <-c.Context().Done():
					return
				case <-ticker.C:
					if err := c.Send(sse.Event{
						Event: "tick",
						ID:    strconv.Itoa(i),
						Data:  "tick " + strconv.Itoa(i),
					}); err != nil {
						return
					}
					i++
				}
			}
		},
	}))

	fmt.Println("SSE handler installed at /events")
}
Output:
SSE handler installed at /events

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client provides the API for sending SSE events to a connected client. It wraps a celeris.StreamWriter and manages event formatting, heartbeat, and disconnect detection.

func (*Client) Close

func (c *Client) Close() error

Close closes the SSE stream. Safe to call multiple times.

func (*Client) Context

func (c *Client) Context() context.Context

Context returns a context.Context that is cancelled when the client disconnects or the stream is closed.

func (*Client) LastEventID

func (c *Client) LastEventID() string

LastEventID returns the value of the Last-Event-ID header sent by the client on reconnection. Returns empty string for initial connections.

Example

Resume from a Last-Event-ID supplied by the browser's reconnect logic.

package main

import (
	"fmt"
	"strconv"

	"github.com/goceleris/celeris/middleware/sse"
)

func main() {
	handler := sse.New(sse.Config{
		Handler: func(c *sse.Client) {
			start := 0
			if id := c.LastEventID(); id != "" {
				if n, err := strconv.Atoi(id); err == nil {
					start = n + 1
				}
			}
			_ = c.Send(sse.Event{
				ID:   strconv.Itoa(start),
				Data: "resumed at " + strconv.Itoa(start),
			})
		},
	})

	_ = handler
	fmt.Println("ok")
}
Output:
ok

func (*Client) Send

func (c *Client) Send(e Event) error

Send sends a complete SSE event. Thread-safe (serialized with heartbeat writes). Returns an error if the client has disconnected or the stream is closed.

func (*Client) SendComment

func (c *Client) SendComment(text string) error

SendComment sends a comment line. Useful for custom keep-alives.

func (*Client) SendData

func (c *Client) SendData(data string) error

SendData is a convenience for sending a data-only event. Equivalent to Send(Event{Data: data}).

type Config

type Config struct {
	// Handler is the SSE handler function called for each connected client.
	// Required; panics at init if nil.
	Handler Handler

	// HeartbeatInterval is the interval between heartbeat comments sent to
	// detect client disconnects. Set to a negative value to disable.
	// Default: 15s.
	HeartbeatInterval time.Duration

	// RetryInterval is the reconnection time (in milliseconds) sent to the
	// client in the initial "retry:" field. Zero means no retry field is sent
	// (client uses its default, typically ~3s).
	RetryInterval int

	// OnConnect is called when a new SSE client connects, before Handler.
	// The celeris.Context is available for extracting request metadata.
	// Return a non-nil error to reject the connection.
	OnConnect func(c *celeris.Context, client *Client) error

	// OnDisconnect is called after the SSE stream closes.
	OnDisconnect func(c *celeris.Context, client *Client)

	// Skip defines a function to skip this middleware for certain requests.
	Skip func(c *celeris.Context) bool

	// SkipPaths lists paths to skip from SSE handling (exact match).
	SkipPaths []string
}

Config defines the SSE middleware configuration.

type Event

type Event struct {
	// ID is the event ID. If non-empty, sent as "id: <ID>\n".
	// The client stores this and sends it back as Last-Event-ID on reconnect.
	ID string

	// Event is the event type. If non-empty, sent as "event: <Event>\n".
	// Defaults to "message" on the client side when omitted.
	Event string

	// Data is the event payload. Sent as "data: <line>\n" for each line.
	// Multi-line data is split on \n and each line gets its own "data:" prefix.
	Data string

	// Retry is the reconnection time in milliseconds. If > 0, sent as
	// "retry: <Retry>\n". Per the SSE specification, this value is in
	// milliseconds (not time.Duration) to match the wire format directly.
	Retry int
}

Event represents a single Server-Sent Event.

type Handler

type Handler func(client *Client)

Handler is the function signature for SSE endpoint handlers. The Client is valid for the duration of the call. When Handler returns, the SSE stream is closed automatically.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL