Documentation
¶
Overview ¶
Package stream is part of the GoFastr framework. See https://github.com/DonaldMurillo/gofastr for documentation.
Index ¶
- Variables
- func Encode(e Event) string
- func LastEventID(r *http.Request) string
- type ChunkedWriter
- type Event
- type EventType
- type Hub
- type SSEBroker
- type SSEBrokerConfig
- type SSEWriter
- func (s *SSEWriter) Flush()
- func (s *SSEWriter) SetID(id string)
- func (s *SSEWriter) SetRetry(seconds int)
- func (s *SSEWriter) WriteComment(comment string) error
- func (s *SSEWriter) WriteData(data string) error
- func (s *SSEWriter) WriteDone() error
- func (s *SSEWriter) WriteError(message string) error
- func (s *SSEWriter) WriteEvent(event, data string) error
- func (s *SSEWriter) WriteMessage(data string) error
- type WSConfig
- type WebSocketConn
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("stream: connection closed")
ErrClosed is returned when writing to a closed connection. It is a plain errors.New sentinel so callers can compare with errors.Is.
Functions ¶
func Encode ¶
Encode formats an Event as a W3C-compliant SSE frame.
The output uses the following fields:
- "id:" when Event.ID is non-empty
- "event:" for Error, Done, and Custom types
- "data:" for the payload (multi-line data splits on \n)
- terminated by a blank line ("\n\n")
ID and custom event names are truncated at the first CR/LF/NUL — those bytes terminate an SSE field and would let a caller-supplied value inject forged directives ("event: forged", "data: pwned"…) below it. Multi-line data is split on '\n' and each line is re-prefixed with "data: " so an injected blank line ("\n\n") still appears as a single event frame to the client. CRs and NULs in data are stripped for the same reason.
func LastEventID ¶
LastEventID returns the Last-Event-ID from the request headers or the "last_event_id" query parameter. The value is truncated at the first CR/LF/NUL so a malicious resume token can't inject forged SSE fields when later echoed back to clients.
Types ¶
type ChunkedWriter ¶
type ChunkedWriter struct {
// contains filtered or unexported fields
}
ChunkedWriter writes raw chunks to an http.ResponseWriter with flush.
func NewChunkedWriter ¶
func NewChunkedWriter(w http.ResponseWriter) *ChunkedWriter
NewChunkedWriter creates a ChunkedWriter wrapping w. It panics if w does not implement http.Flusher.
func (*ChunkedWriter) Close ¶
func (c *ChunkedWriter) Close() error
Close performs a final flush. It is safe to call multiple times.
func (*ChunkedWriter) WriteChunk ¶
func (c *ChunkedWriter) WriteChunk(data []byte) error
WriteChunk writes data to the underlying response writer and flushes.
type Event ¶
type Event struct {
Type EventType
Name string // used when Type == Custom
Data string
ID string // optional Last-Event-ID value
}
Event represents a single Server-Sent Event.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub manages a set of WebSocket connections and broadcasts messages to all of them. Multiple hubs can coexist (one per room, channel, etc.).
Usage:
hub := stream.NewHub()
go hub.Run()
// On connect:
hub.Register(conn)
// Broadcast:
hub.Broadcast([]byte("hello everyone"))
// On disconnect:
hub.Unregister(conn)
func (*Hub) Broadcast ¶
Broadcast sends a message to all registered connections. Non-blocking — if the hub's broadcast channel is full, the message is dropped.
func (*Hub) BroadcastWait ¶
BroadcastWait sends a message to all connections, blocking if the broadcast channel is full.
func (*Hub) Register ¶
func (h *Hub) Register(conn *WebSocketConn)
Register adds a connection to the hub. Non-blocking. Returns immediately if the hub has been stopped.
func (*Hub) Run ¶
func (h *Hub) Run()
Run starts the hub's event loop. Block until Stop is called. Must be called in a goroutine:
go hub.Run()
Register/Unregister are now mutex-only ops on the connections map, so Run is only responsible for draining the broadcast channel and shutting down on Stop.
func (*Hub) Unregister ¶
func (h *Hub) Unregister(conn *WebSocketConn)
Unregister removes a connection from the hub. Non-blocking.
type SSEBroker ¶
type SSEBroker struct {
// contains filtered or unexported fields
}
SSEBroker fans out SSE events to multiple HTTP subscribers. Each subscriber gets a buffered channel. Default subscribers drop the oldest queued event when the buffer is full; clients that opt into ?slow=block or X-SSE-Slow: block instead backpressure Publish until buffer space is available.
Buffer size is configurable per-subscriber via query param (?buffer=128) or header (X-SSE-Buffer), with a default fallback bounded by MaxBuf.
func NewSSEBroker ¶
func NewSSEBroker(cfg SSEBrokerConfig) *SSEBroker
NewSSEBroker creates a new broker for fan-out SSE delivery.
func (*SSEBroker) Publish ¶
Publish sends an event to all subscribers. If a default subscriber's buffer is full, the oldest event is dropped. A subscriber that opted into slow=block backpressures this call until buffer space opens or that subscriber is closed. Subscribers are snapshotted under the read lock; sends happen outside the lock to keep fan-out from holding the broker lock during slow per-channel writes.
func (*SSEBroker) Subscribe ¶
func (b *SSEBroker) Subscribe(w http.ResponseWriter, r *http.Request)
Subscribe adds a subscriber and blocks, writing events to the response. The subscriber ID is taken from ?subscriber_id or X-Subscriber-ID header. Buffer size from ?buffer= or X-SSE-Buffer header, clamped to MaxBuf. Subscribe returns when the request context is canceled or the client disconnects.
func (*SSEBroker) SubscriberCount ¶
SubscriberCount returns the number of active subscribers.
type SSEBrokerConfig ¶
type SSEBrokerConfig struct {
Topic string // logical topic name (for logging/debugging)
DefaultBuf int // default subscriber buffer size (0 = 64)
MaxBuf int // maximum allowed subscriber buffer (0 = 1024)
HeartbeatInterval time.Duration // 0 = 30s; emits a comment frame to keep idle connections open
}
SSEBrokerConfig configures the broker.
type SSEWriter ¶
type SSEWriter struct {
// contains filtered or unexported fields
}
SSEWriter writes Server-Sent Events to an http.ResponseWriter.
It automatically sets the required headers (Content-Type, Cache-Control, Connection) on first write and flushes after every event.
func NewSSEWriter ¶
func NewSSEWriter(w http.ResponseWriter) *SSEWriter
NewSSEWriter creates an SSEWriter wrapping w. It panics if w does not implement http.Flusher.
func (*SSEWriter) Flush ¶
func (s *SSEWriter) Flush()
Flush sends any buffered data to the client immediately.
func (*SSEWriter) SetRetry ¶
SetRetry writes the "retry:" field, telling the client how many milliseconds to wait before reconnecting. Non-positive values are dropped: `retry: 0` tells the client to reconnect with zero delay, which spins into a reconnect storm — accidental DoS amplifier.
func (*SSEWriter) WriteComment ¶
WriteComment writes an SSE comment (keepalive):
: <comment>
followed by a blank line and a flush. The comment is truncated at the first CR/LF so a caller can't terminate the comment line and inject arbitrary SSE fields ("event: …", "data: …", …) below it.
func (*SSEWriter) WriteData ¶
WriteData writes an anonymous SSE event (type defaults to "message"):
data: <data>
followed by a blank line and a flush.
func (*SSEWriter) WriteError ¶
WriteError is a convenience for writing an Error event.
func (*SSEWriter) WriteEvent ¶
WriteEvent writes a named SSE event:
event: <name> data: <data>
followed by a blank line and a flush.
CR/LF characters in the event name are stripped — an event name may only occupy a single SSE field line. A caller-supplied newline would otherwise terminate the field and let following bytes appear as arbitrary SSE directives.
func (*SSEWriter) WriteMessage ¶
WriteMessage is a convenience for writing a Message event.
type WSConfig ¶
type WSConfig struct {
// ReadLimit is the maximum message size in bytes. 0 = default 1MB.
ReadLimit int64
// SendBuffer is the number of messages that can be buffered before
// Write blocks. 0 = 32.
SendBuffer int
// WriteTimeout bounds each frame write. 0 means default 10s. Set
// negative to disable (not recommended): a peer with a full TCP send
// buffer otherwise pins the writePump and keepalive goroutines forever.
WriteTimeout time.Duration
// CheckOrigin returns true if the Origin header is acceptable.
// If nil, Upgrade enforces same-origin by comparing Origin host to
// the request Host. Use a custom CheckOrigin to allow cross-origin
// upgrades (e.g. for trusted third-party clients).
CheckOrigin func(*http.Request) bool
// ReadIdleTimeout bounds the longest period of read inactivity before
// the keepalive sends a Ping. 0 means default 60s. Set negative to
// disable keepalive entirely.
ReadIdleTimeout time.Duration
// PongTimeout bounds how long after a Ping we wait for the matching
// Pong. If exceeded, the connection is closed. 0 means default 10s.
// Set negative to disable the pong timeout check.
PongTimeout time.Duration
// CloseTimeout caps how long Close() waits for the peer's reciprocal
// Close frame after sending our own. 0 means default 1s.
CloseTimeout time.Duration
// Subprotocols is the server's preferred list of WebSocket subprotocols
// in priority order. During Upgrade, the first subprotocol that the
// client offered AND we support is echoed back via
// Sec-WebSocket-Protocol. If no match, no header is sent (RFC 6455).
Subprotocols []string
// OnClose is called when the connection closes.
OnClose func()
// contains filtered or unexported fields
}
WSConfig configures the WebSocket connection.
type WebSocketConn ¶
type WebSocketConn struct {
// contains filtered or unexported fields
}
WebSocketConn wraps a hijacked HTTP connection as a simple WebSocket client. It implements a minimal WebSocket frame parser sufficient for the framework's needs: text and binary messages, close frames.
For production use with full RFC 6455 compliance, use a dedicated WebSocket library (nhooyr.io/websocket, gorilla/websocket). This implementation avoids external dependencies so the core/stream package compiles without `go get` additions.
Backpressure: writes block when the send buffer is full. The caller controls the read loop.
func Upgrade ¶
func Upgrade(w http.ResponseWriter, r *http.Request, cfg WSConfig) (*WebSocketConn, error)
Upgrade upgrades an HTTP connection to a simple WebSocket. Performs the HTTP upgrade handshake and returns a managed connection.
func (*WebSocketConn) Close ¶
func (c *WebSocketConn) Close() error
Close closes the WebSocket connection. Safe to call multiple times. Performs the RFC 6455 closing handshake: sends a Close frame, then waits up to CloseTimeout for the peer's reciprocal Close before tearing down the underlying TCP connection. This avoids the abnormal 1006 close code on the peer side.
If the peer initiated the close, the echo Close frame preserves the peer's 2-byte status code per RFC 6455 §5.5.1. Otherwise we send an empty Close payload (status 1000 implied by absence).
func (*WebSocketConn) Closed ¶
func (c *WebSocketConn) Closed() <-chan struct{}
Closed returns a channel closed when the connection closes.
func (*WebSocketConn) OnClose ¶
func (c *WebSocketConn) OnClose(fn func())
OnClose registers a callback for when the connection closes.
func (*WebSocketConn) Read ¶
func (c *WebSocketConn) Read() ([]byte, error)
Read reads a message from the client. Blocks until a message arrives or the connection closes.
func (*WebSocketConn) Write ¶
func (c *WebSocketConn) Write(data []byte) error
Write sends a text message to the client. Blocks if the send buffer is full (backpressure). Returns an error if the connection is closed.
func (*WebSocketConn) WriteString ¶
func (c *WebSocketConn) WriteString(data string) error
WriteString is a convenience for sending a text message.