stream

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 8, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package stream is part of the GoFastr framework. See https://github.com/DonaldMurillo/gofastr for documentation.

Constants

This section is empty.

Variables

var ErrClosed = errors.New("stream: connection closed")

ErrClosed is returned when writing to a closed connection. It is a plain errors.New sentinel so callers can compare with errors.Is.

Functions

func Encode

func Encode(e Event) string

Encode formats an Event as a W3C-compliant SSE frame.

The output uses the following fields:

  • "id:" when Event.ID is non-empty
  • "event:" for Error, Done, and Custom types
  • "data:" for the payload (multi-line data splits on \n)
  • terminated by a blank line ("\n\n")

ID and custom event names are truncated at the first CR/LF/NUL — those bytes terminate an SSE field and would let a caller-supplied value inject forged directives ("event: forged", "data: pwned"…) below it. Multi-line data is split on '\n' and each line is re-prefixed with "data: " so an injected blank line ("\n\n") still appears as a single event frame to the client. CRs and NULs in data are stripped for the same reason.
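The sanitisation rules above can be illustrated with a minimal, standalone sketch. The helper names encodeFrame and cutAtControl are hypothetical and not part of this package, and the sketch assumes a space after each field's colon, which the SSE format permits but Encode may or may not emit.

```go
package main

import (
	"fmt"
	"strings"
)

// cutAtControl truncates s at the first CR, LF, or NUL byte.
func cutAtControl(s string) string {
	if i := strings.IndexAny(s, "\r\n\x00"); i >= 0 {
		return s[:i]
	}
	return s
}

// encodeFrame is a hypothetical stand-in for Encode, not the package's code.
// It emits optional id and event fields, then one "data: " line per line of
// payload, then the blank line that terminates the frame.
func encodeFrame(id, event, data string) string {
	var b strings.Builder
	if id = cutAtControl(id); id != "" {
		b.WriteString("id: " + id + "\n")
	}
	if event = cutAtControl(event); event != "" {
		b.WriteString("event: " + event + "\n")
	}
	// Strip CR and NUL, then re-prefix every line so an embedded blank line
	// cannot end the frame early.
	data = strings.NewReplacer("\r", "", "\x00", "").Replace(data)
	for _, line := range strings.Split(data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	b.WriteString("\n")
	return b.String()
}

func main() {
	// The forged directive after the newline in the ID is cut off.
	fmt.Printf("%q\n", encodeFrame("7\nevent: forged", "tick", "a\n\nb"))
	// prints "id: 7\nevent: tick\ndata: a\ndata: \ndata: b\n\n"
}
```

The blank line inside the payload becomes an empty "data: " line, so the client still sees one frame.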

func LastEventID

func LastEventID(r *http.Request) string

LastEventID returns the Last-Event-ID from the request headers or the "last_event_id" query parameter. The value is truncated at the first CR/LF/NUL so a malicious resume token can't inject forged SSE fields when later echoed back to clients.
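As a rough sketch of the lookup described above, the following standalone code reads the resume token from either location and truncates it. The names resumeToken and newGet are hypothetical, and the precedence (header first, then query parameter) is an assumption; the source does not state which one wins.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// resumeToken is a hypothetical stand-in for LastEventID. It assumes the
// header takes precedence over the "last_event_id" query parameter.
func resumeToken(r *http.Request) string {
	v := r.Header.Get("Last-Event-ID")
	if v == "" {
		v = r.URL.Query().Get("last_event_id")
	}
	// Truncate at the first CR/LF/NUL so the token cannot carry extra fields.
	if i := strings.IndexAny(v, "\r\n\x00"); i >= 0 {
		v = v[:i]
	}
	return v
}

// newGet builds an in-memory GET request for the demo; header may be empty.
func newGet(target, header string) *http.Request {
	r := httptest.NewRequest(http.MethodGet, target, nil)
	if header != "" {
		r.Header.Set("Last-Event-ID", header)
	}
	return r
}

func main() {
	// %0A decodes to a newline, so everything after "41" is discarded.
	fmt.Printf("%q\n", resumeToken(newGet("/events?last_event_id=41%0Adata:+pwned", ""))) // "41"
	fmt.Printf("%q\n", resumeToken(newGet("/events", "42")))                              // "42"
}
```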

Types

type ChunkedWriter

type ChunkedWriter struct {
	// contains filtered or unexported fields
}

ChunkedWriter writes raw chunks to an http.ResponseWriter with flush.

func NewChunkedWriter

func NewChunkedWriter(w http.ResponseWriter) *ChunkedWriter

NewChunkedWriter creates a ChunkedWriter wrapping w. It panics if w does not implement http.Flusher.

func (*ChunkedWriter) Close

func (c *ChunkedWriter) Close() error

Close performs a final flush. It is safe to call multiple times.

func (*ChunkedWriter) WriteChunk

func (c *ChunkedWriter) WriteChunk(data []byte) error

WriteChunk writes data to the underlying response writer and flushes.
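The write-then-flush pattern can be sketched with the standard library alone. The helpers writeChunk and demo are hypothetical; unlike NewChunkedWriter, which panics when the writer cannot flush, this sketch returns an error instead.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// writeChunk writes data and flushes it to the client right away.
// Hypothetical helper: it returns an error where NewChunkedWriter would panic.
func writeChunk(w http.ResponseWriter, data []byte) error {
	f, ok := w.(http.Flusher)
	if !ok {
		return errors.New("response writer does not support flushing")
	}
	if _, err := w.Write(data); err != nil {
		return err
	}
	f.Flush()
	return nil
}

// demo runs writeChunk against an in-memory recorder, which implements
// http.Flusher and records whether Flush was called.
func demo() (body string, flushed bool, err error) {
	rec := httptest.NewRecorder()
	err = writeChunk(rec, []byte("part 1\n"))
	return rec.Body.String(), rec.Flushed, err
}

func main() {
	body, flushed, err := demo()
	fmt.Printf("%q %v %v\n", body, flushed, err) // "part 1\n" true <nil>
}
```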

type Event

type Event struct {
	Type EventType
	Name string // used when Type == Custom
	Data string
	ID   string // optional Last-Event-ID value
}

Event represents a single Server-Sent Event.

type EventType

type EventType int

EventType represents the kind of SSE event.

const (
	// Message is a standard data event.
	Message EventType = iota
	// Error signals an error to the client.
	Error
	// Done is the terminal sentinel event.
	Done
	// Custom is a named event with an arbitrary event type string.
	Custom
)

func (EventType) String

func (t EventType) String() string

String returns a human-readable label for the EventType.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub manages a set of WebSocket connections and broadcasts messages to all of them. Multiple hubs can coexist (one per room, channel, etc.).

Usage:

hub := stream.NewHub()
go hub.Run()

// On connect:
hub.Register(conn)

// Broadcast:
hub.Broadcast([]byte("hello everyone"))

// On disconnect:
hub.Unregister(conn)

func NewHub

func NewHub() *Hub

NewHub creates a new Hub.

func (*Hub) Broadcast

func (h *Hub) Broadcast(msg []byte)

Broadcast sends a message to all registered connections. Non-blocking — if the hub's broadcast channel is full, the message is dropped.

func (*Hub) BroadcastWait

func (h *Hub) BroadcastWait(msg []byte)

BroadcastWait sends a message to all connections, blocking if the broadcast channel is full.
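The difference between Broadcast and BroadcastWait comes down to how a send on a full channel is handled. A minimal sketch of the non-blocking variant, using a hypothetical trySend helper rather than the hub's internals:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether msg was queued.
// A plain "ch <- msg" would instead block until there is room, which is the
// behaviour BroadcastWait is described as having.
func trySend(ch chan []byte, msg []byte) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false // channel full: the message is dropped
	}
}

func main() {
	ch := make(chan []byte, 1)
	fmt.Println(trySend(ch, []byte("first")))  // true: buffer had room
	fmt.Println(trySend(ch, []byte("second"))) // false: buffer full, dropped
}
```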

func (*Hub) Count

func (h *Hub) Count() int

Count returns the number of active connections.

func (*Hub) Register

func (h *Hub) Register(conn *WebSocketConn)

Register adds a connection to the hub. Non-blocking. Returns immediately if the hub has been stopped.

func (*Hub) Run

func (h *Hub) Run()

Run starts the hub's event loop. It blocks until Stop is called, so it must be called in a goroutine:

go hub.Run()

Register and Unregister only take a mutex on the connections map, so Run is responsible only for draining the broadcast channel and shutting down on Stop.

func (*Hub) Stop

func (h *Hub) Stop()

Stop stops the hub and closes all registered connections.

func (*Hub) Unregister

func (h *Hub) Unregister(conn *WebSocketConn)

Unregister removes a connection from the hub. Non-blocking.

type SSEBroker

type SSEBroker struct {
	// contains filtered or unexported fields
}

SSEBroker fans out SSE events to multiple HTTP subscribers. Each subscriber gets a buffered channel. Default subscribers drop the oldest queued event when the buffer is full; clients that opt into ?slow=block or X-SSE-Slow: block instead backpressure Publish until buffer space is available.

Buffer size is configurable per-subscriber via query param (?buffer=128) or header (X-SSE-Buffer), with a default fallback bounded by MaxBuf.

func NewSSEBroker

func NewSSEBroker(cfg SSEBrokerConfig) *SSEBroker

NewSSEBroker creates a new broker for fan-out SSE delivery.

func (*SSEBroker) Publish

func (b *SSEBroker) Publish(name, data string, id ...string)

Publish sends an event to all subscribers. If a default subscriber's buffer is full, the oldest event is dropped. A subscriber that opted into slow=block backpressures this call until buffer space opens or that subscriber is closed. Subscribers are snapshotted under the read lock; sends happen outside the lock to keep fan-out from holding the broker lock during slow per-channel writes.
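The drop-oldest policy for default subscribers might look roughly like the following. This is a sketch with a hypothetical offerDropOldest helper, not the broker's code; it assumes a single publisher per channel and a buffer capacity of at least one.

```go
package main

import "fmt"

// offerDropOldest enqueues ev on a buffered channel. When the buffer is
// full it discards the oldest queued event and retries. It reports whether
// anything was dropped. Assumes one producer and cap(ch) >= 1.
func offerDropOldest(ch chan string, ev string) (dropped bool) {
	for {
		select {
		case ch <- ev:
			return dropped
		default:
		}
		// Buffer full: make room by discarding the oldest event. If a
		// consumer drained the channel in the meantime, just retry.
		select {
		case <-ch:
			dropped = true
		default:
		}
	}
}

func main() {
	ch := make(chan string, 2)
	for _, ev := range []string{"a", "b", "c"} {
		if offerDropOldest(ch, ev) {
			fmt.Println("dropped oldest to make room for", ev)
		}
	}
	fmt.Println(<-ch, <-ch) // b c
}
```

A slow=block subscriber would replace the drop step with a blocking send, which is what pushes backpressure onto Publish.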

func (*SSEBroker) Subscribe

func (b *SSEBroker) Subscribe(w http.ResponseWriter, r *http.Request)

Subscribe adds a subscriber and blocks, writing events to the response. The subscriber ID is taken from ?subscriber_id or the X-Subscriber-ID header. The buffer size is taken from ?buffer= or the X-SSE-Buffer header and clamped to MaxBuf. Subscribe returns when the request context is canceled or the client disconnects.
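The buffer clamping can be sketched as a small pure function. The name clampBuffer is hypothetical, and treating unparsable or non-positive values as "use the default" is an assumption; the source only says the requested size is clamped to MaxBuf.

```go
package main

import (
	"fmt"
	"strconv"
)

// clampBuffer resolves a requested buffer size (the raw ?buffer= or
// X-SSE-Buffer value) against a default and a maximum. Assumption: invalid
// or non-positive input falls back to def.
func clampBuffer(raw string, def, max int) int {
	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		n = def
	}
	if n > max {
		n = max
	}
	return n
}

func main() {
	// 64 and 1024 are the documented zero-value defaults of SSEBrokerConfig.
	fmt.Println(clampBuffer("128", 64, 1024))    // 128
	fmt.Println(clampBuffer("", 64, 1024))       // 64
	fmt.Println(clampBuffer("999999", 64, 1024)) // 1024
}
```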

func (*SSEBroker) SubscriberCount

func (b *SSEBroker) SubscriberCount() int

SubscriberCount returns the number of active subscribers.

type SSEBrokerConfig

type SSEBrokerConfig struct {
	Topic             string        // logical topic name (for logging/debugging)
	DefaultBuf        int           // default subscriber buffer size (0 = 64)
	MaxBuf            int           // maximum allowed subscriber buffer (0 = 1024)
	HeartbeatInterval time.Duration // 0 = 30s; emits a comment frame to keep idle connections open
}

SSEBrokerConfig configures the broker.

type SSEWriter

type SSEWriter struct {
	// contains filtered or unexported fields
}

SSEWriter writes Server-Sent Events to an http.ResponseWriter.

It automatically sets the required headers (Content-Type, Cache-Control, Connection) on first write and flushes after every event.

func NewSSEWriter

func NewSSEWriter(w http.ResponseWriter) *SSEWriter

NewSSEWriter creates an SSEWriter wrapping w. It panics if w does not implement http.Flusher.

func (*SSEWriter) Flush

func (s *SSEWriter) Flush()

Flush sends any buffered data to the client immediately.

func (*SSEWriter) SetID

func (s *SSEWriter) SetID(id string)

SetID queues an "id:" field to be emitted before the next event.

func (*SSEWriter) SetRetry

func (s *SSEWriter) SetRetry(seconds int)

SetRetry writes the "retry:" field, which tells the client how long to wait before reconnecting. The SSE wire format expresses this delay in milliseconds, although the parameter is named seconds. Non-positive values are dropped: `retry: 0` tells the client to reconnect with zero delay, which can spin into a reconnect storm and act as an accidental DoS amplifier.
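A minimal sketch of the guard on non-positive values follows. The helper retryField is hypothetical and takes milliseconds directly, since that is the unit of the SSE "retry:" field; how SetRetry maps its argument onto the wire is not shown in the source.

```go
package main

import (
	"fmt"
	"strconv"
)

// retryField renders an SSE "retry:" line for a delay in milliseconds.
// Non-positive delays produce no field at all, so the client keeps its
// current reconnection delay instead of reconnecting immediately.
func retryField(ms int) string {
	if ms <= 0 {
		return ""
	}
	return "retry: " + strconv.Itoa(ms) + "\n"
}

func main() {
	fmt.Printf("%q\n", retryField(3000)) // "retry: 3000\n"
	fmt.Printf("%q\n", retryField(0))    // ""
}
```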

func (*SSEWriter) WriteComment

func (s *SSEWriter) WriteComment(comment string) error

WriteComment writes an SSE comment (keepalive):

: <comment>

followed by a blank line and a flush. The comment is truncated at the first CR/LF so a caller can't terminate the comment line and inject arbitrary SSE fields ("event: …", "data: …", …) below it.
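The comment frame and its truncation rule can be sketched as below. The helper commentFrame is hypothetical and only builds the bytes; it does not write or flush anything.

```go
package main

import (
	"fmt"
	"strings"
)

// commentFrame builds an SSE comment frame. The comment is cut at the first
// CR or LF so it cannot spill onto a new line and pose as a field.
func commentFrame(comment string) string {
	if i := strings.IndexAny(comment, "\r\n"); i >= 0 {
		comment = comment[:i]
	}
	return ": " + comment + "\n\n"
}

func main() {
	fmt.Printf("%q\n", commentFrame("ping"))              // ": ping\n\n"
	fmt.Printf("%q\n", commentFrame("ping\ndata: pwned")) // ": ping\n\n"
}
```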

func (*SSEWriter) WriteData

func (s *SSEWriter) WriteData(data string) error

WriteData writes an anonymous SSE event (type defaults to "message"):

data: <data>

followed by a blank line and a flush.

func (*SSEWriter) WriteDone

func (s *SSEWriter) WriteDone() error

WriteDone sends the terminal "[DONE]" sentinel.

func (*SSEWriter) WriteError

func (s *SSEWriter) WriteError(message string) error

WriteError is a convenience for writing an Error event.

func (*SSEWriter) WriteEvent

func (s *SSEWriter) WriteEvent(event, data string) error

WriteEvent writes a named SSE event:

event: <name>
data: <data>

followed by a blank line and a flush.

CR/LF characters in the event name are stripped — an event name may only occupy a single SSE field line. A caller-supplied newline would otherwise terminate the field and let following bytes appear as arbitrary SSE directives.

func (*SSEWriter) WriteMessage

func (s *SSEWriter) WriteMessage(data string) error

WriteMessage is a convenience for writing a Message event.

type WSConfig

type WSConfig struct {
	// ReadLimit is the maximum message size in bytes. 0 = default 1MB.
	ReadLimit int64

	// SendBuffer is the number of messages that can be buffered before
	// Write blocks. 0 = 32.
	SendBuffer int

	// WriteTimeout bounds each frame write. 0 means default 10s. Set
	// negative to disable (not recommended): a peer with a full TCP send
	// buffer otherwise pins the writePump and keepalive goroutines forever.
	WriteTimeout time.Duration

	// CheckOrigin returns true if the Origin header is acceptable.
	// If nil, Upgrade enforces same-origin by comparing Origin host to
	// the request Host. Use a custom CheckOrigin to allow cross-origin
	// upgrades (e.g. for trusted third-party clients).
	CheckOrigin func(*http.Request) bool

	// ReadIdleTimeout bounds the longest period of read inactivity before
	// the keepalive sends a Ping. 0 means default 60s. Set negative to
	// disable keepalive entirely.
	ReadIdleTimeout time.Duration

	// PongTimeout bounds how long after a Ping we wait for the matching
	// Pong. If exceeded, the connection is closed. 0 means default 10s.
	// Set negative to disable the pong timeout check.
	PongTimeout time.Duration

	// CloseTimeout caps how long Close() waits for the peer's reciprocal
	// Close frame after sending our own. 0 means default 1s.
	CloseTimeout time.Duration

	// Subprotocols is the server's preferred list of WebSocket subprotocols
	// in priority order. During Upgrade, the first subprotocol that the
	// client offered AND we support is echoed back via
	// Sec-WebSocket-Protocol. If no match, no header is sent (RFC 6455).
	Subprotocols []string

	// OnClose is called when the connection closes.
	OnClose func()
	// contains filtered or unexported fields
}

WSConfig configures the WebSocket connection.
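Two of the rules in the field comments above, subprotocol selection and the default same-origin check, can be sketched with the standard library. The helpers pickSubprotocol and sameOrigin are hypothetical; in particular, rejecting a missing or unparsable Origin is an assumption, since the source does not say how Upgrade treats that case.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// pickSubprotocol returns the first server-supported subprotocol (in server
// priority order) that also appears in the client's comma-separated
// Sec-WebSocket-Protocol header, or "" when there is no match.
func pickSubprotocol(supported []string, offeredHeader string) string {
	offered := make(map[string]bool)
	for _, p := range strings.Split(offeredHeader, ",") {
		if p = strings.TrimSpace(p); p != "" {
			offered[p] = true
		}
	}
	for _, p := range supported {
		if offered[p] {
			return p
		}
	}
	return ""
}

// sameOrigin compares the host of the Origin header with the request Host.
// Assumption: an empty or unparsable Origin is rejected.
func sameOrigin(origin, host string) bool {
	u, err := url.Parse(origin)
	if err != nil || u.Host == "" {
		return false
	}
	return strings.EqualFold(u.Host, host)
}

func main() {
	fmt.Printf("%q\n", pickSubprotocol([]string{"chat.v2", "chat.v1"}, "chat.v1, chat.v2")) // "chat.v2"
	fmt.Printf("%q\n", pickSubprotocol([]string{"chat.v2"}, "mqtt"))                        // ""
	fmt.Println(sameOrigin("https://app.example.com", "app.example.com"))                   // true
	fmt.Println(sameOrigin("https://evil.example.net", "app.example.com"))                  // false
}
```

Server priority wins in this sketch: the client listed chat.v1 first, but chat.v2 is selected because the server prefers it.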

type WebSocketConn

type WebSocketConn struct {
	// contains filtered or unexported fields
}

WebSocketConn wraps a hijacked HTTP connection as a simple WebSocket client. It implements a minimal WebSocket frame parser sufficient for the framework's needs: text and binary messages, close frames.

For production use with full RFC 6455 compliance, use a dedicated WebSocket library (nhooyr.io/websocket, gorilla/websocket). This implementation avoids external dependencies so the core/stream package compiles without `go get` additions.

Backpressure: writes block when the send buffer is full. The caller controls the read loop.

func Upgrade

func Upgrade(w http.ResponseWriter, r *http.Request, cfg WSConfig) (*WebSocketConn, error)

Upgrade upgrades an HTTP connection to a simple WebSocket. Performs the HTTP upgrade handshake and returns a managed connection.
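One fixed part of any RFC 6455 upgrade handshake is the Sec-WebSocket-Accept value, derived from the client's Sec-WebSocket-Key. The sketch below shows that derivation with a hypothetical acceptKey helper; it illustrates the standard, not necessarily how Upgrade is structured internally.

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// wsGUID is the fixed GUID defined by RFC 6455 for the accept computation.
const wsGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// acceptKey computes the Sec-WebSocket-Accept header value:
// base64(SHA-1(clientKey + GUID)).
func acceptKey(clientKey string) string {
	sum := sha1.Sum([]byte(clientKey + wsGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	// Sample key and expected result from RFC 6455, section 1.3.
	fmt.Println(acceptKey("dGhlIHNhbXBsZSBub25jZQ==")) // s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
}
```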

func (*WebSocketConn) Close

func (c *WebSocketConn) Close() error

Close closes the WebSocket connection. Safe to call multiple times. Performs the RFC 6455 closing handshake: sends a Close frame, then waits up to CloseTimeout for the peer's reciprocal Close before tearing down the underlying TCP connection. This avoids the abnormal 1006 close code on the peer side.

If the peer initiated the close, the echo Close frame preserves the peer's 2-byte status code per RFC 6455 §5.5.1. Otherwise we send an empty Close payload (status 1000 implied by absence).
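The status-code echo can be sketched as follows. A Close frame payload, when present, starts with a 2-byte big-endian status code optionally followed by a reason. The helper closeReply is hypothetical and only builds the reply payload.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// closeReply builds the payload for the reciprocal Close frame. If the peer
// sent a status code, only those two bytes are echoed; otherwise the reply
// payload is empty.
func closeReply(peerPayload []byte) []byte {
	if len(peerPayload) < 2 {
		return nil
	}
	reply := make([]byte, 2)
	copy(reply, peerPayload[:2])
	return reply
}

func main() {
	// 0x03E9 is status 1001 ("going away"), followed by a reason string.
	peer := append([]byte{0x03, 0xE9}, "bye"...)
	reply := closeReply(peer)
	fmt.Println(len(reply), binary.BigEndian.Uint16(reply)) // 2 1001
	fmt.Println(len(closeReply(nil)))                       // 0
}
```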

func (*WebSocketConn) Closed

func (c *WebSocketConn) Closed() <-chan struct{}

Closed returns a channel closed when the connection closes.

func (*WebSocketConn) OnClose

func (c *WebSocketConn) OnClose(fn func())

OnClose registers a callback for when the connection closes.

func (*WebSocketConn) Read

func (c *WebSocketConn) Read() ([]byte, error)

Read reads a message from the client. Blocks until a message arrives or the connection closes.

func (*WebSocketConn) Write

func (c *WebSocketConn) Write(data []byte) error

Write sends a text message to the client. Blocks if the send buffer is full (backpressure). Returns an error if the connection is closed.
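The blocking-with-backpressure behaviour, combined with an error on close, is commonly expressed as a two-way select. The sketch below uses hypothetical names (send, errClosed) and its own sentinel; with the real package a caller would compare against ErrClosed using errors.Is.

```go
package main

import (
	"errors"
	"fmt"
)

// errClosed is a local sentinel for this sketch, standing in for ErrClosed.
var errClosed = errors.New("sketch: connection closed")

// send blocks until msg fits in the queue or the connection is closed,
// whichever happens first.
func send(queue chan<- []byte, closed <-chan struct{}, msg []byte) error {
	select {
	case queue <- msg:
		return nil
	case <-closed:
		return errClosed
	}
}

func main() {
	queue := make(chan []byte, 1)
	closed := make(chan struct{})

	fmt.Println(send(queue, closed, []byte("one"))) // <nil>: buffer had room

	// The buffer is now full; closing the connection unblocks the writer.
	close(closed)
	err := send(queue, closed, []byte("two"))
	fmt.Println(errors.Is(err, errClosed)) // true
}
```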

func (*WebSocketConn) WriteString

func (c *WebSocketConn) WriteString(data string) error

WriteString is a convenience for sending a text message.
