websocket

package
v1.4.10

Published: May 24, 2026 | License: Apache-2.0 | Imports: 21 | Imported by: 0

Documentation

Overview

Package websocket provides a zero-dependency native WebSocket middleware for celeris, implementing RFC 6455.

Basic Echo Server

server.GET("/ws", websocket.New(websocket.Config{
    Handler: func(c *websocket.Conn) {
        for {
            mt, msg, err := c.ReadMessage()
            if err != nil {
                break
            }
            if err := c.WriteMessage(mt, msg); err != nil {
                break
            }
        }
    },
}))

Origin Checking

By default, a same-origin check is enforced (the Origin header must match the Host header). To allow all origins:

websocket.New(websocket.Config{
    CheckOrigin: func(c *celeris.Context) bool { return true },
    Handler:     myHandler,
})

To restrict to specific origins:

websocket.New(websocket.Config{
    CheckOrigin: func(c *celeris.Context) bool {
        return c.Header("origin") == "https://example.com"
    },
    Handler: myHandler,
})
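
The default check's implementation is not shown here, so the following is only a minimal sketch of what a same-origin comparison and an allow list might look like, using the standard library alone. The helper names sameOrigin and allowList are hypothetical, and accepting an empty Origin header is an assumption of this sketch, not documented behaviour of the package.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// sameOrigin reports whether the host part of the Origin header matches the
// Host header. An empty Origin is accepted here because non-browser clients
// usually omit it; that choice is an assumption of this sketch.
func sameOrigin(origin, host string) bool {
	if origin == "" {
		return true
	}
	u, err := url.Parse(origin)
	if err != nil || u.Host == "" {
		return false
	}
	return strings.EqualFold(u.Host, host)
}

// allowList builds a checker that accepts only the listed origins,
// compared case-insensitively.
func allowList(origins ...string) func(origin string) bool {
	set := make(map[string]struct{}, len(origins))
	for _, o := range origins {
		set[strings.ToLower(o)] = struct{}{}
	}
	return func(origin string) bool {
		_, ok := set[strings.ToLower(origin)]
		return ok
	}
}

func main() {
	fmt.Println(sameOrigin("https://example.com", "example.com"))
	fmt.Println(sameOrigin("https://evil.test", "example.com"))

	check := allowList("https://example.com", "https://app.example.com")
	fmt.Println(check("https://app.example.com"))
}
```

Inside a CheckOrigin callback the two string arguments would come from the request's Origin and Host headers.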

JSON Messaging

websocket.New(websocket.Config{
    Handler: func(c *websocket.Conn) {
        var msg MyType
        if err := c.ReadJSON(&msg); err != nil {
            return
        }
        if err := c.WriteJSON(msg); err != nil {
            return
        }
    },
})

Subprotocol Negotiation

websocket.New(websocket.Config{
    Subprotocols: []string{"graphql-transport-ws"},
    Handler: func(c *websocket.Conn) {
        switch c.Subprotocol() {
        case "graphql-transport-ws":
            // handle based on negotiated protocol
        }
    },
})
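
As a rough illustration of server-preference negotiation, the sketch below picks the first entry of the server's list that the client also offered in its Sec-WebSocket-Protocol header. The function name negotiate is hypothetical, and the package's actual matching rules (case sensitivity, whitespace handling) are assumed rather than known.

```go
package main

import (
	"fmt"
	"strings"
)

// negotiate returns the first server-preferred protocol that also appears in
// the client's comma-separated Sec-WebSocket-Protocol header, or "" if the
// two lists do not intersect.
func negotiate(serverPrefs []string, clientHeader string) string {
	offered := make(map[string]struct{})
	for _, p := range strings.Split(clientHeader, ",") {
		if p = strings.TrimSpace(p); p != "" {
			offered[p] = struct{}{}
		}
	}
	for _, p := range serverPrefs {
		if _, ok := offered[p]; ok {
			return p
		}
	}
	return ""
}

func main() {
	prefs := []string{"graphql-transport-ws", "graphql-ws"}
	// The client lists graphql-ws first, but the server's order wins.
	fmt.Println(negotiate(prefs, "graphql-ws, graphql-transport-ws"))
	fmt.Printf("%q\n", negotiate(prefs, "mqtt"))
}
```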

HTTP/2 Limitation

WebSocket requires HTTP/1.1 connection hijacking. HTTP/2 multiplexes streams over a single TCP connection, making hijack impossible. This middleware returns 426 Upgrade Required for HTTP/2 requests.
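
The protocol gate can be pictured with plain net/http. This is a sketch under the assumption that the check is a simple test on the request's major protocol version; requireHTTP1 and statusFor are hypothetical names, and the middleware's real logic may differ.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// requireHTTP1 wraps next and answers 426 Upgrade Required for any request
// that did not arrive over HTTP/1.x.
func requireHTTP1(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.ProtoMajor != 1 {
			http.Error(w, "websocket requires HTTP/1.1", http.StatusUpgradeRequired)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// statusFor runs a synthetic request with the given major version through
// the gate and returns the response status code.
func statusFor(protoMajor int) int {
	req := httptest.NewRequest(http.MethodGet, "/ws", nil)
	req.ProtoMajor = protoMajor
	rec := httptest.NewRecorder()
	ok := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	requireHTTP1(ok).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(1), statusFor(2))
}
```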

Concurrency

All write methods (Conn.WriteMessage, Conn.WriteText, Conn.WriteBinary, Conn.WriteJSON, Conn.WritePing) are internally serialized and safe for concurrent use from multiple goroutines. A single goroutine may call Conn.ReadMessage while others write concurrently.

Conn.SetPingHandler, Conn.SetPongHandler, and Conn.SetCloseHandler must be called before starting the read loop.

Keepalive (Ping/Pong)

Detect dead connections with periodic pings:

Handler: func(c *websocket.Conn) {
    c.SetPongHandler(func(data []byte) error {
        c.SetReadDeadline(time.Now().Add(60 * time.Second))
        return nil
    })
    c.SetReadDeadline(time.Now().Add(60 * time.Second))

    go func() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            if err := c.WritePing(nil); err != nil {
                return
            }
        }
    }()

    for {
        mt, msg, err := c.ReadMessage()
        if err != nil {
            break
        }
        if err := c.WriteMessage(mt, msg); err != nil {
            break
        }
    }
},
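
In the handler above the ping goroutine exits only when a ping write fails. One possible variation, sketched here against a fake connection so it runs without a peer, stops the pinger as soon as the handler returns by closing a done channel. The pinger interface, fakeConn, and keepalive are hypothetical names; only the WritePing method shape is taken from this package.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// pinger is the subset of the connection this sketch needs. WritePing has
// the same shape as Conn.WritePing; the interface itself is hypothetical.
type pinger interface {
	WritePing(data []byte) error
}

// keepalive sends a ping every interval until done is closed or a ping fails.
func keepalive(p pinger, interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			if err := p.WritePing(nil); err != nil {
				return
			}
		}
	}
}

// fakeConn counts pings so the sketch runs without a network peer.
type fakeConn struct{ pings atomic.Int64 }

func (f *fakeConn) WritePing([]byte) error { f.pings.Add(1); return nil }

// demo starts the pinger, waits for a couple of ticks, then stops it and
// returns the number of pings sent.
func demo() int64 {
	c := &fakeConn{}
	done := make(chan struct{})
	stopped := make(chan struct{})
	go func() {
		keepalive(c, time.Millisecond, done)
		close(stopped)
	}()
	for c.pings.Load() < 2 {
		time.Sleep(time.Millisecond)
	}
	close(done) // in a real Handler this would be deferred
	<-stopped   // keepalive has returned, so no goroutine is leaked
	return c.pings.Load()
}

func main() {
	fmt.Println("pings sent before shutdown:", demo())
}
```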

Compression (permessage-deflate)

Enable RFC 7692 permessage-deflate compression:

websocket.New(websocket.Config{
    EnableCompression: true,
    Handler:           myHandler,
})

Compression is negotiated during the upgrade handshake. Messages above the compression threshold (default 128 bytes) are compressed transparently.
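
The threshold decision can be sketched with compress/flate alone. This is an illustration, not the package's code: maybeCompress and inflate are hypothetical helpers, and the permessage-deflate framing details (the RSV1 bit and trailing-byte handling from RFC 7692) are deliberately left out.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
)

// maybeCompress deflates payload only when it is at least threshold bytes
// long; smaller payloads are returned untouched with compressed == false.
func maybeCompress(payload []byte, threshold, level int) (out []byte, compressed bool, err error) {
	if len(payload) < threshold {
		return payload, false, nil
	}
	var buf bytes.Buffer
	fw, err := flate.NewWriter(&buf, level)
	if err != nil {
		return nil, false, err
	}
	if _, err := fw.Write(payload); err != nil {
		return nil, false, err
	}
	if err := fw.Close(); err != nil {
		return nil, false, err
	}
	return buf.Bytes(), true, nil
}

// inflate reverses maybeCompress for a compressed payload.
func inflate(data []byte) ([]byte, error) {
	fr := flate.NewReader(bytes.NewReader(data))
	defer fr.Close()
	return io.ReadAll(fr)
}

func main() {
	small := []byte("ping")
	large := bytes.Repeat([]byte("celeris "), 64) // 512 bytes, highly repetitive
	for _, p := range [][]byte{small, large} {
		out, compressed, err := maybeCompress(p, 128, flate.BestSpeed)
		if err != nil {
			panic(err)
		}
		fmt.Printf("in=%d out=%d compressed=%v\n", len(p), len(out), compressed)
	}
}
```

A threshold exists because deflate adds fixed overhead, so very short payloads can grow rather than shrink when compressed.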

Streaming Large Messages

For large messages, use Conn.NextReader and Conn.NextWriter to avoid buffering the entire message in memory:

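_, reader, err := c.NextReader() // first value is the message type
if err != nil {
    return
}
io.Copy(dst, reader) // streams the payload instead of buffering it whole

writer, err := c.NextWriter(websocket.TextMessage)
if err != nil {
    return
}
writer.Write(chunk1)
writer.Write(chunk2)
writer.Close() // completes the message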

ReadMessage vs ReadMessageReuse

Conn.ReadMessage returns an owned copy (safe to retain). Conn.ReadMessageReuse returns a reused buffer (zero-alloc, only valid until the next read call). Use ReadMessageReuse for echo servers and message-forwarding proxies where the data is processed immediately.
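
The hazard of retaining a reused buffer is easy to reproduce with a stand-in reader. The reuseReader type below is hypothetical and only mimics the "same backing buffer on every call" behaviour described above; it is not the package's implementation.

```go
package main

import "fmt"

// reuseReader hands out the same backing buffer on every call, the way a
// zero-allocation read path would.
type reuseReader struct {
	buf  []byte
	msgs []string
}

// next copies the next message into the shared buffer and returns it;
// ok is false once all messages have been consumed.
func (r *reuseReader) next() (b []byte, ok bool) {
	if len(r.msgs) == 0 {
		return nil, false
	}
	r.buf = append(r.buf[:0], r.msgs[0]...)
	r.msgs = r.msgs[1:]
	return r.buf, true
}

// collect retains every message. With clone == false it keeps the aliased
// slice, which is the mistake to avoid with a reused buffer.
func collect(msgs []string, clone bool) []string {
	r := &reuseReader{buf: make([]byte, 0, 64), msgs: msgs}
	var kept [][]byte
	for {
		b, ok := r.next()
		if !ok {
			break
		}
		if clone {
			b = append([]byte(nil), b...) // take an owned copy
		}
		kept = append(kept, b)
	}
	out := make([]string, len(kept))
	for i, b := range kept {
		out[i] = string(b)
	}
	return out
}

func main() {
	fmt.Println(collect([]string{"aaa", "bbb"}, false)) // [bbb bbb]: first message overwritten
	fmt.Println(collect([]string{"aaa", "bbb"}, true))  // [aaa bbb]
}
```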

Access Request Data

Route params, query params, and headers are captured at upgrade time:

// Route: /ws/:room
c.Param("room")      // route parameter
c.Query("token")     // query parameter
c.Header("origin")   // request header

Engine-integrated mode vs hijack mode

On the std engine, WebSocket connections are upgraded via Go's standard connection hijacking — the handler runs on a goroutine and reads/writes directly on a *net.TCPConn. On the native engines (epoll, io_uring), the connection stays in the event loop after upgrade: inbound chunks are delivered to the handler goroutine via an internal chanReader (which applies TCP-level backpressure on overflow), and outbound writes go through the engine's per-connection write buffer (cs.writeBuf).

The same Handler signature works for both modes — the middleware picks the engine-integrated path automatically when the engine supports it and falls back to hijack on the std engine.

Backpressure semantics

On the engine path, the WebSocket conn maintains a bounded chanReader between the event loop and the handler goroutine. When the buffer fills past the high-water mark (75% of Config.MaxBackpressureBuffer), the engine pauses inbound delivery for that connection (epoll: drops EPOLLIN; io_uring: cancels the in-flight RECV). The kernel then closes the TCP receive window, slowing the peer at the network level. When the buffer drains below 25%, the engine resumes inbound delivery.

On the std (hijack) path, no middleware-level buffering happens: the handler reads straight from the net.Conn, so the kernel's TCP stack applies backpressure on its own whenever the handler stops reading.

In healthy operation Conn.BackpressureDropped returns 0. A non-zero value indicates the engine pause/resume mechanism is malfunctioning.
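
The pause/resume behaviour described above is a hysteresis gate. A minimal sketch follows, assuming the pause triggers at or above the high mark and the resume strictly below the low mark; the exact comparisons, and the names gate, newGate, and observe, are assumptions of this sketch.

```go
package main

import "fmt"

// gate tracks whether inbound delivery is paused, using two watermarks
// expressed as absolute chunk counts.
type gate struct {
	high, low int
	paused    bool
}

// newGate converts percentages into chunk counts. Out-of-range values fall
// back to 75 and 25, mirroring the documented defaults.
func newGate(capacity, highPct, lowPct int) *gate {
	if highPct <= 0 || highPct > 100 {
		highPct = 75
	}
	if lowPct <= 0 || lowPct >= highPct {
		lowPct = 25
	}
	return &gate{high: capacity * highPct / 100, low: capacity * lowPct / 100}
}

// observe records the current fill level and reports whether inbound
// delivery should be paused after this observation.
func (g *gate) observe(fill int) bool {
	switch {
	case !g.paused && fill >= g.high:
		g.paused = true
	case g.paused && fill < g.low:
		g.paused = false
	}
	return g.paused
}

func main() {
	g := newGate(256, 75, 25) // pause at 192 chunks, resume below 64
	for _, fill := range []int{100, 192, 150, 64, 63, 100} {
		fmt.Printf("fill=%3d paused=%v\n", fill, g.observe(fill))
	}
}
```

The gap between the two marks keeps the engine from toggling delivery on every chunk when the fill level hovers around a single threshold.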

IdleTimeout semantics

On the std (hijack) path, Config.IdleTimeout is enforced via net.Conn.SetReadDeadline, which is reset before each blocking read. On the engine path, the WS middleware extends an absolute deadline via [Context.SetWSIdleDeadline] after each successful frame read, and the engine's idle sweep closes connections whose deadline has expired. Both paths converge to the same observable behavior.

WriteControl deadline semantics

Conn.WriteControl applies the supplied deadline to the channel-based write semaphore (so a stalled large NextWriter cannot indefinitely block pings/pongs). On the std path, the deadline is also pinned to the underlying net.Conn via net.Conn.SetWriteDeadline so a peer that has stopped reading cannot stall the actual flush. On the engine path, writes go into the engine's write buffer and never block at the syscall level — only the lock-acquisition deadline applies.
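
A channel-based semaphore with a deadline can be sketched in a few lines. The type writeSem and the error value below are hypothetical stand-ins; the sketch shows only the acquire-with-deadline idea, not the package's write path.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errWriteTimeout = errors.New("write deadline exceeded")

// writeSem is a one-slot channel semaphore guarding the write path.
type writeSem chan struct{}

func newWriteSem() writeSem { return make(writeSem, 1) }

// acquire takes the slot or gives up at the deadline.
// A zero deadline waits indefinitely.
func (s writeSem) acquire(deadline time.Time) error {
	select {
	case s <- struct{}{}: // fast path: slot is free
		return nil
	default:
	}
	if deadline.IsZero() {
		s <- struct{}{}
		return nil
	}
	timer := time.NewTimer(time.Until(deadline))
	defer timer.Stop()
	select {
	case s <- struct{}{}:
		return nil
	case <-timer.C:
		return errWriteTimeout
	}
}

func (s writeSem) release() { <-s }

// demo holds the slot, as a stalled data writer would, and attempts a
// control write before and after the slot is released.
func demo() (blocked, afterRelease error) {
	sem := newWriteSem()
	_ = sem.acquire(time.Time{})
	blocked = sem.acquire(time.Now().Add(10 * time.Millisecond))
	sem.release()
	afterRelease = sem.acquire(time.Now().Add(10 * time.Millisecond))
	return blocked, afterRelease
}

func main() {
	blocked, after := demo()
	fmt.Println(blocked)
	fmt.Println(after)
}
```

A channel is used rather than a sync.Mutex because a mutex offers no way to abandon a lock attempt when a deadline passes.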

Fan-out (Hub)

For broadcasting a single message to N connections, use Hub with PreparedMessage. The frame is encoded once per uncompressed / compressed variant and reused across every Conn.WritePreparedMessage dispatch — so per-message wire-encoding cost is O(1) regardless of subscriber count, while per-Conn write throughput remains the engine's normal write path.

hub := websocket.NewHub(websocket.HubConfig{
    OnSlowConn: func(c *websocket.Conn, err error) websocket.HubPolicy {
        return websocket.HubPolicyClose // boot misbehaving peers
    },
})
server.GET("/ws", websocket.New(websocket.Config{
    Handler: func(c *websocket.Conn) {
        unregister := hub.Register(c)
        defer unregister()
        // your read loop here
    },
}))
// Publishers anywhere in the app:
hub.Broadcast(websocket.TextMessage, []byte(`{"type":"tick"}`))

Hub.Close drains every in-flight Broadcast (via an internal inflight WaitGroup) before tearing down conns, so a shutdown that synchronises on Close cannot race a still-fanning-out message.

Authorization MUST happen before Hub.Register. Hub broadcasts go to every registered connection unfiltered; if a per-conn ACL is required, use Hub.BroadcastFilter with a pure predicate.

PreparedMessage rejects control opcodes (Ping/Pong/Close) at construction time — control frames have RFC 6455 §5.5 size and fragmentation constraints that the cache-and-reuse model can't satisfy. Use Conn.WriteControl per-conn for those.
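
To make the encode-once idea concrete, the sketch below builds a single unmasked server frame following the RFC 6455 §5.2 layout and writes the same bytes to several destinations. It is an illustration only: encodeFrame and broadcast are hypothetical, fragmentation and compression variants are ignored, and plain io.Writer values stand in for connections.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

var errControlOpcode = errors.New("control opcodes cannot be prepared")

// encodeFrame builds one unmasked, unfragmented server-to-client frame.
// Opcodes 0x8 and above are control frames and are rejected.
func encodeFrame(opcode byte, payload []byte) ([]byte, error) {
	if opcode >= 0x8 {
		return nil, errControlOpcode
	}
	frame := []byte{0x80 | opcode} // FIN=1, RSV bits clear
	switch n := len(payload); {
	case n <= 125:
		frame = append(frame, byte(n))
	case n <= 0xFFFF:
		frame = append(frame, 126)
		frame = binary.BigEndian.AppendUint16(frame, uint16(n))
	default:
		frame = append(frame, 127)
		frame = binary.BigEndian.AppendUint64(frame, uint64(n))
	}
	return append(frame, payload...), nil
}

// broadcast writes the same pre-encoded bytes to every subscriber and
// returns how many writes succeeded.
func broadcast(frame []byte, subs []io.Writer) (delivered int) {
	for _, w := range subs {
		if _, err := w.Write(frame); err == nil {
			delivered++
		}
	}
	return delivered
}

func main() {
	frame, err := encodeFrame(0x1, []byte(`{"type":"tick"}`)) // 0x1 = text
	if err != nil {
		panic(err)
	}
	var a, b bytes.Buffer
	fmt.Println("delivered:", broadcast(frame, []io.Writer{&a, &b}))

	_, err = encodeFrame(0x9, nil) // 0x9 = ping
	fmt.Println(err)
}
```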

See also: middleware/sse for the equivalent on Server-Sent Events, where the same broker pattern lives as middleware/sse.Broker.

Constants

const (
	CompressionLevelDefault   = flate.DefaultCompression // -1
	CompressionLevelBestSpeed = flate.BestSpeed          // 1
	CompressionLevelBestSize  = flate.BestCompression    // 9
	CompressionLevelHuffman   = flate.HuffmanOnly        // -2

)

Compression levels matching compress/flate.

const (
	// TextMessage denotes a UTF-8 text message.
	TextMessage = OpText
	// BinaryMessage denotes a binary message.
	BinaryMessage = OpBinary
)
const (
	CloseNormalClosure    = 1000
	CloseGoingAway        = 1001
	CloseProtocolError    = 1002
	CloseUnsupportedData  = 1003
	CloseNoStatusReceived = 1005
	CloseAbnormalClosure  = 1006
	CloseInvalidPayload   = 1007
	ClosePolicyViolation  = 1008
	CloseMessageTooBig    = 1009
	CloseMandatoryExt     = 1010
	CloseInternalError    = 1011
	CloseServiceRestart   = 1012
	CloseTryAgainLater    = 1013
)

Close status codes (RFC 6455 Section 7.4.1).

Variables

var (
	ErrProtocol          = errors.New("websocket: protocol error")
	ErrFrameTooLarge     = errors.New("websocket: frame payload too large")
	ErrReservedBits      = errors.New("websocket: reserved bits set")
	ErrFragmentedControl = errors.New("websocket: fragmented control frame")
	ErrControlTooLarge   = errors.New("websocket: control frame payload > 125")
	ErrInvalidCloseData  = errors.New("websocket: invalid close frame data")
	ErrInvalidUTF8       = errors.New("websocket: invalid UTF-8 in text frame")
	ErrReadLimit         = errors.New("websocket: message exceeds read limit")
	ErrClosed            = errors.New("websocket: connection closed")
	ErrWriteClosed       = errors.New("websocket: write on closed connection")
	ErrWriteTimeout      = errors.New("websocket: write deadline exceeded")
)

Sentinel errors returned by this package.

var ErrInvalidPreparedOpcode = errors.New("websocket: PreparedMessage rejects control opcodes (RFC 6455 §5.5)")

ErrInvalidPreparedOpcode is returned by NewPreparedMessage when the caller passes an opcode that cannot be safely cached for fan-out.

Permitted opcodes are OpText, OpBinary, and OpContinuation (data frames). Control opcodes (OpClose, OpPing, OpPong) are rejected because RFC 6455 §5.5 requires control frames to be ≤125 bytes and non-fragmented; PreparedMessage's whole purpose is to share the frame across many Conn.WritePreparedMessage calls, but a >125-byte control frame would be a per-connection protocol violation. Callers who genuinely want to broadcast a control frame should use Conn.WriteControl per-connection instead, which validates length.

Functions

func DefaultHubConcurrency added in v1.4.2

func DefaultHubConcurrency() int

DefaultHubConcurrency is used when HubConfig.MaxConcurrency is zero. Sized at GOMAXPROCS*4 — enough headroom that fast-cohort dispatches (where a slow conn is rare) never queue, while bounding peak goroutine count under burst load to a small multiple of CPU cores.

func FormatCloseMessage

func FormatCloseMessage(code int, text string) []byte

FormatCloseMessage creates a close frame payload with the given code and text.
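
RFC 6455 §5.5.1 defines the close payload as a 2-byte big-endian status code followed by a UTF-8 reason, with the whole payload limited to 125 bytes. The sketch below shows that layout; formatClose and parseClose are hypothetical helpers, and whether FormatCloseMessage validates or truncates its input is not stated here.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// formatClose lays out a close payload: a 2-byte big-endian status code
// followed by the reason text. It does not enforce the 125-byte limit.
func formatClose(code int, reason string) []byte {
	buf := make([]byte, 2+len(reason))
	binary.BigEndian.PutUint16(buf, uint16(code))
	copy(buf[2:], reason)
	return buf
}

// parseClose is the inverse; ok is false when the payload is too short to
// carry a status code.
func parseClose(p []byte) (code int, reason string, ok bool) {
	if len(p) < 2 {
		return 0, "", false
	}
	return int(binary.BigEndian.Uint16(p)), string(p[2:]), true
}

func main() {
	p := formatClose(1001, "going away")
	fmt.Printf("code bytes: % x\n", p[:2])
	code, reason, _ := parseClose(p)
	fmt.Println(code, reason)
}
```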

func IsCloseError

func IsCloseError(err error, codes ...int) bool

IsCloseError returns true if err is a CloseError with one of the given codes.

func IsUnexpectedCloseError

func IsUnexpectedCloseError(err error, expectedCodes ...int) bool

IsUnexpectedCloseError returns true if err is a CloseError whose code is NOT in the given list.
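
The difference between the two helpers is easiest to see side by side. The sketch uses a local closeError type with the same fields as CloseError and matches it with errors.As; whether the package itself unwraps wrapped errors is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// closeError is a local stand-in with the same fields as CloseError.
type closeError struct {
	Code int
	Text string
}

func (e *closeError) Error() string { return fmt.Sprintf("close %d: %s", e.Code, e.Text) }

func hasCode(code int, codes []int) bool {
	for _, c := range codes {
		if c == code {
			return true
		}
	}
	return false
}

// isClose reports whether err is a close error carrying one of codes.
func isClose(err error, codes ...int) bool {
	var ce *closeError
	return errors.As(err, &ce) && hasCode(ce.Code, codes)
}

// isUnexpectedClose reports whether err is a close error whose code is
// not in expected. Errors that are not close errors yield false.
func isUnexpectedClose(err error, expected ...int) bool {
	var ce *closeError
	return errors.As(err, &ce) && !hasCode(ce.Code, expected)
}

func main() {
	normal := &closeError{Code: 1000, Text: "bye"}
	abnormal := fmt.Errorf("read: %w", &closeError{Code: 1006})
	ioErr := errors.New("connection reset")

	fmt.Println(isUnexpectedClose(normal, 1000, 1001))   // false: an expected code
	fmt.Println(isUnexpectedClose(abnormal, 1000, 1001)) // true: worth logging
	fmt.Println(isUnexpectedClose(ioErr, 1000, 1001))    // false: not a close error
}
```

A read loop would typically log only when the unexpected-close check returns true and treat listed codes such as 1000 and 1001 as routine disconnects.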

func New

func New(config ...Config) celeris.HandlerFunc

New creates a WebSocket middleware that upgrades matching requests.

Non-WebSocket requests are passed through to the next handler. HTTP/2 requests receive a 426 Upgrade Required response because connection hijacking is not possible over multiplexed streams.

On native engines (epoll, io_uring), the connection remains in the event loop after upgrade — reads are delivered by the engine, writes go through the engine's write buffer with backpressure. On the std engine, the connection is hijacked for direct I/O.

This is a zero-dependency native WebSocket implementation (RFC 6455).
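
For orientation, the two handshake steps that RFC 6455 itself defines, recognising an upgrade request and deriving Sec-WebSocket-Accept, can be sketched with the standard library. The function names are hypothetical, and the header checks are simplified compared with what a full implementation would verify.

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
	"strings"
)

// wsGUID is the fixed GUID from RFC 6455 §1.3.
const wsGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// isWebSocketUpgrade reports whether the Upgrade and Connection header
// values ask for a WebSocket upgrade. Connection may hold several tokens.
func isWebSocketUpgrade(upgrade, connection string) bool {
	if !strings.EqualFold(strings.TrimSpace(upgrade), "websocket") {
		return false
	}
	for _, tok := range strings.Split(connection, ",") {
		if strings.EqualFold(strings.TrimSpace(tok), "upgrade") {
			return true
		}
	}
	return false
}

// acceptKey derives Sec-WebSocket-Accept from the client's
// Sec-WebSocket-Key: base64(SHA-1(key + GUID)).
func acceptKey(clientKey string) string {
	sum := sha1.Sum([]byte(clientKey + wsGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	fmt.Println(isWebSocketUpgrade("websocket", "keep-alive, Upgrade"))
	// Sample key from RFC 6455 §1.3.
	fmt.Println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="))
}
```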

Usage:

server.GET("/ws", websocket.New(websocket.Config{
    Handler: func(c *websocket.Conn) {
        for {
            mt, msg, err := c.ReadMessage()
            if err != nil {
                break
            }
            c.WriteMessage(mt, msg)
        }
    },
}))

Example

Echo each message back to the client. The middleware automatically uses the engine-integrated path on epoll/io_uring (with TCP-level backpressure) and falls back to net/http Hijack on the std engine.

package main

import (
	"fmt"
	"time"

	"github.com/goceleris/celeris"
	"github.com/goceleris/celeris/middleware/websocket"
)

func main() {
	s := celeris.New(celeris.Config{})

	s.GET("/ws", websocket.New(websocket.Config{
		EnableCompression: true,
		IdleTimeout:       60 * time.Second,
		Handler: func(c *websocket.Conn) {
			for {
				mt, data, err := c.ReadMessageReuse()
				if err != nil {
					return
				}
				if err := c.WriteMessage(mt, data); err != nil {
					return
				}
			}
		},
	}))

	fmt.Println("WebSocket echo handler installed at /ws")
}

Output:
WebSocket echo handler installed at /ws

Types

type BufferPool

type BufferPool interface {
	// Get returns a bufio.Writer reset to write into dst. The pool
	// should Reset(dst) on borrow so the returned writer has no stale
	// buffered bytes. If the pool is empty, it must allocate a new
	// [bufio.Writer] (typically with [bufio.NewWriterSize]).
	Get(dst io.Writer) *bufio.Writer
	// Put returns a bufio.Writer to the pool. The caller has already
	// called Flush on the writer. Implementations may discard the
	// writer if e.g. its buffer grew beyond an acceptable size.
	Put(bw *bufio.Writer)
}

BufferPool is an interface for borrowing and returning bufio.Writer instances that the WebSocket writer uses on the hijack (std engine) path. A typical production implementation wraps a sync.Pool:

type wsPool struct{ p sync.Pool }
func (w *wsPool) Get(dst io.Writer) *bufio.Writer {
    if v := w.p.Get(); v != nil {
        bw := v.(*bufio.Writer)
        bw.Reset(dst)
        return bw
    }
    return bufio.NewWriterSize(dst, 4096)
}
func (w *wsPool) Put(bw *bufio.Writer) { w.p.Put(bw) }

BufferPool is not consulted on the native-engine path (epoll/io_uring), which uses the engine's per-connection write buffer internally.

type CloseError

type CloseError struct {
	Code int
	Text string
}

CloseError is returned when a close frame is received.

func (*CloseError) Error

func (e *CloseError) Error() string

type Config

type Config struct {
	// Handler is called after a successful WebSocket upgrade.
	// Required. Panics if nil.
	Handler Handler

	// Skip defines a function to skip this middleware for certain requests.
	Skip func(c *celeris.Context) bool

	// SkipPaths lists paths to skip (exact match on c.Path()).
	SkipPaths []string

	// CheckOrigin returns true if the request origin is acceptable.
	// If nil, the default same-origin check is used (Origin header must
	// match the Host header). Set to func(*celeris.Context) bool { return true }
	// to allow all origins.
	CheckOrigin func(c *celeris.Context) bool

	// Subprotocols specifies the server's supported protocols in preference order.
	Subprotocols []string

	// ReadBufferSize specifies the I/O read buffer size in bytes.
	// Default: 4096.
	ReadBufferSize int

	// WriteBufferSize specifies the I/O write buffer size in bytes.
	// Default: 4096.
	WriteBufferSize int

	// ReadLimit is the maximum message size in bytes.
	// Default: 64MB.
	ReadLimit int64

	// HandshakeTimeout specifies the duration for the handshake to complete.
	// Default: 0 (no timeout).
	HandshakeTimeout time.Duration

	// WriteBufferPool is an optional pool for write buffers. When set,
	// write buffers are obtained from the pool before each write and
	// returned after flush, reducing memory for idle connections.
	// If nil, each connection allocates its own permanent write buffer.
	WriteBufferPool BufferPool

	// EnableCompression enables permessage-deflate compression (RFC 7692).
	// When enabled, the server negotiates compression during the upgrade
	// handshake. Messages are compressed transparently.
	EnableCompression bool

	// CompressionLevel controls the deflate compression level.
	// Valid range: -2 (Huffman only) to 9 (best compression).
	// Default: 1 (best speed). Use [CompressionLevelDefault] for the
	// flate library default (-1).
	CompressionLevel int

	// CompressionThreshold is the minimum payload size in bytes for
	// compression. Messages smaller than this are sent uncompressed.
	// Default: 128.
	CompressionThreshold int

	// IdleTimeout is the maximum time between messages before the connection
	// is closed. When set, the next read deadline is extended after each
	// successful frame read. On the std (hijack) path this is enforced via
	// net.Conn.SetReadDeadline; on native engines (epoll/io_uring) it is
	// enforced via the engine's idle sweep using SetWSIdleDeadline.
	// Zero means no idle timeout.
	IdleTimeout time.Duration

	// MaxBackpressureBuffer is the maximum number of inbound chunks
	// buffered between the engine event loop and the WebSocket handler
	// goroutine on the engine-integrated path. When the buffer fills past
	// BackpressureHighPct, the engine pauses inbound delivery for this
	// connection (TCP-level backpressure); when it drains below
	// BackpressureLowPct, delivery is resumed.
	// Default: 256. Ignored on the std (hijack) engine path.
	MaxBackpressureBuffer int

	// BackpressureHighPct is the buffer fill percentage (0-100) at which
	// the engine is asked to pause inbound delivery. Default: 75.
	BackpressureHighPct int

	// BackpressureLowPct is the buffer fill percentage (0-100) at which
	// the engine is asked to resume inbound delivery after a pause. Must
	// be lower than BackpressureHighPct or it falls back to the default
	// (25). Default: 25.
	BackpressureLowPct int

	// OnConnect is called after upgrade succeeds, before Handler.
	// If it returns a non-nil error, the connection is closed.
	OnConnect func(*Conn) error

	// OnDisconnect is called after the Handler returns.
	OnDisconnect func(*Conn)
}

Config defines the WebSocket middleware configuration.

Example (Backpressure)

Tune the engine-integrated backpressure watermarks. Default is 75% pause / 25% resume on a 256-chunk buffer; raise the pause point when the handler is bursty but generally fast.

package main

import (
	"fmt"

	"github.com/goceleris/celeris/middleware/websocket"
)

func main() {
	cfg := websocket.Config{
		MaxBackpressureBuffer: 1024,
		BackpressureHighPct:   90,
		BackpressureLowPct:    50,
		Handler: func(c *websocket.Conn) {
			_ = c
		},
	}
	_ = websocket.New(cfg)
	fmt.Println("ok")
}

Output:
ok

Example (LifecycleHooks)

Use OnConnect to authenticate and OnDisconnect to release resources.

package main

import (
	"fmt"

	"github.com/goceleris/celeris/middleware/websocket"
)

func main() {
	cfg := websocket.Config{
		OnConnect: func(c *websocket.Conn) error {
			// Reject unauthenticated peers — return non-nil to close the
			// connection before the Handler runs.
			if c.Subprotocol() != "v1.echo" {
				return fmt.Errorf("subprotocol required")
			}
			return nil
		},
		OnDisconnect: func(c *websocket.Conn) {
			// Release per-connection resources (DB handles, channels…).
			_ = c
		},
		Handler: func(c *websocket.Conn) {
			_ = c
		},
	}
	_ = websocket.New(cfg)
	fmt.Println("ok")
}

Output:
ok

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents a WebSocket connection. Only one goroutine may read at a time. The write methods listed under Concurrency are serialized internally and may be called from multiple goroutines, but only one NextWriter may be active at a time.

func (*Conn) BackpressureDropped

func (c *Conn) BackpressureDropped() uint64

BackpressureDropped returns the number of inbound chunks dropped because the engine-path read buffer overflowed despite TCP-level backpressure. Should be 0 in healthy operation. Returns 0 on the std (hijack) path, where backpressure is handled directly by the kernel TCP stack.

func (*Conn) Close

func (c *Conn) Close() error

Close closes the underlying connection.

On the engine path (net.Conn unavailable), Close asks the engine to drop the FD on its next idle sweep by setting the WS idle deadline to the past. This is the server-side TCP close that RFC 6455 §7.1.1 and Autobahn 7.7.x (requireClean=True) require after the close handshake — without it the peer waits indefinitely for the server FIN. The underlying drain order (drainDetachQueue → flush writes → checkTimeouts → closeConn) guarantees any buffered close-frame echo reaches the wire before the FD is shut down.

func (*Conn) CloseHandler

func (c *Conn) CloseHandler() func(code int, text string) error

CloseHandler returns the current close handler.

func (*Conn) Context

func (c *Conn) Context() context.Context

Context returns the connection's context, cancelled when closed.

func (*Conn) EnableWriteCompression

func (c *Conn) EnableWriteCompression(enable bool)

EnableWriteCompression enables or disables write compression for this connection. Compression must have been negotiated during the upgrade handshake; this only controls whether subsequent writes actually compress.

func (*Conn) GracefulClose

func (c *Conn) GracefulClose(code int, text string) error

GracefulClose sends a close frame and waits for the peer's response.

On the hijack path the deadline is enforced via net.Conn.SetReadDeadline. On the engine path (chanReader-backed reads) net.Conn is nil, so a time.AfterFunc closes the engineReader to unblock ReadMessageReuse if the peer never sends its close frame.

func (*Conn) Header

func (c *Conn) Header(key string) string

Header returns a request header captured at upgrade time.

func (*Conn) IP

func (c *Conn) IP() string

IP returns the remote IP address (without port). The result is cached after the first call so per-message log loops don't re-parse RemoteAddr().String() on every iteration.

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

LocalAddr returns the local network address, if known.

func (*Conn) Locals

func (c *Conn) Locals(key string) any

Locals returns a per-connection value. Safe for concurrent use.

func (*Conn) NetConn

func (c *Conn) NetConn() net.Conn

NetConn returns the underlying net.Conn.

func (*Conn) NextReader

func (c *Conn) NextReader() (MessageType, io.Reader, error)

NextReader returns the next data message received. The io.Reader returned reads the message payload across fragmented frames. The reader is valid until the next call to NextReader, ReadMessage, or Close.

Control frames (ping, pong, close) are handled transparently.

For compressed messages, NextReader decompresses the entire message before returning. Use Conn.ReadMessage for the same behavior with a simpler API.

func (*Conn) NextWriter

func (c *Conn) NextWriter(messageType MessageType) (io.WriteCloser, error)

NextWriter returns an io.WriteCloser for sending a message of the given type. The writer sends the message as one or more WebSocket frames. The caller must call Close to complete the message.

Only one writer may be active at a time. Starting a new writer while the previous one is open is not supported.

Control frames (ping/pong/close) can still be sent concurrently while a NextWriter is active — they are not blocked.

If compression is negotiated and enabled, the writer buffers all data and compresses on Close (since permessage-deflate context spans the entire message).

func (*Conn) Param

func (c *Conn) Param(key string) string

Param returns a URL route parameter captured at upgrade time.

func (*Conn) PingHandler

func (c *Conn) PingHandler() func(data []byte) error

PingHandler returns the current ping handler.

func (*Conn) PongHandler

func (c *Conn) PongHandler() func(data []byte) error

PongHandler returns the current pong handler.

func (*Conn) Query

func (c *Conn) Query(key string) string

Query returns a query parameter captured at upgrade time.

func (*Conn) ReadJSON

func (c *Conn) ReadJSON(v any) error

ReadJSON reads the next message and unmarshals it from JSON.

func (*Conn) ReadMessage

func (c *Conn) ReadMessage() (MessageType, []byte, error)

ReadMessage reads the next complete message from the connection. Returns the message type and an owned copy of the payload. The returned slice is safe to retain, pass to other goroutines, or store.

For zero-allocation reads (advanced usage), use Conn.ReadMessageReuse.

func (*Conn) ReadMessageReuse

func (c *Conn) ReadMessageReuse() (MessageType, []byte, error)

ReadMessageReuse reads the next complete message from the connection. The returned byte slice is reused across calls and is only valid until the next call to ReadMessageReuse or ReadMessage.

Use this for zero-allocation reads when you process each message immediately without retaining the slice.

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr returns the peer's network address.

func (*Conn) SetCloseHandler

func (c *Conn) SetCloseHandler(h func(code int, text string) error)

SetCloseHandler sets the handler for close frames. The default handler echoes the close frame back and returns a CloseError.

func (*Conn) SetCompressionLevel

func (c *Conn) SetCompressionLevel(level int) error

SetCompressionLevel sets the flate compression level for subsequent writes. Valid range: -2 (HuffmanOnly) to 9 (BestCompression); -1 selects DefaultCompression.

func (*Conn) SetLocals

func (c *Conn) SetLocals(key string, val any)

SetLocals stores a per-connection value. Safe for concurrent use.

func (*Conn) SetPingHandler

func (c *Conn) SetPingHandler(h func(data []byte) error)

SetPingHandler sets the handler for ping frames. The default handler replies with a pong containing the same payload.

func (*Conn) SetPongHandler

func (c *Conn) SetPongHandler(h func(data []byte) error)

SetPongHandler sets the handler for pong frames. The default is a no-op.

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future reads. Returns nil on engine-integrated connections where deadlines are not supported.

func (*Conn) SetReadLimit

func (c *Conn) SetReadLimit(limit int64)

SetReadLimit sets the maximum message size in bytes. The default is 64MB.

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future writes. Returns nil on engine-integrated connections where deadlines are not supported.

func (*Conn) Subprotocol

func (c *Conn) Subprotocol() string

Subprotocol returns the negotiated subprotocol, or "" if none.

func (*Conn) WriteBinary

func (c *Conn) WriteBinary(data []byte) error

WriteBinary writes a binary message.

func (*Conn) WriteControl

func (c *Conn) WriteControl(messageType int, data []byte, deadline time.Time) error

WriteControl sends a control frame with a per-frame deadline. It can be called concurrently with Conn.NextWriter — the channel-based write semaphore is acquired with the deadline, returning ErrWriteTimeout if it cannot be obtained in time.

On the std (hijack) path the deadline is also applied to the underlying net.Conn via SetWriteDeadline so that a slow peer cannot indefinitely block the actual flush. On the engine path the write goes into the engine's per-connection write buffer (cs.writeBuf) and never blocks at the syscall level, so only the lock-acquisition deadline applies.

messageType must be a control opcode (OpClose, OpPing, OpPong). data must be <= 125 bytes.

func (*Conn) WriteJSON

func (c *Conn) WriteJSON(v any) error

WriteJSON marshals v as JSON and writes it as a text message.

func (*Conn) WriteMessage

func (c *Conn) WriteMessage(messageType MessageType, data []byte) error

WriteMessage writes a complete message to the connection. If compression is negotiated and the payload exceeds the compression threshold, the message is compressed transparently. Compression runs before the write lock is taken so that ping/pong control frames can interleave with large compressed data writes.

func (*Conn) WritePing

func (c *Conn) WritePing(data []byte) error

WritePing sends a ping control frame. The payload must be <= 125 bytes. Use this with Conn.SetPongHandler to implement keepalive.

func (*Conn) WritePreparedMessage

func (c *Conn) WritePreparedMessage(pm *PreparedMessage) error

WritePreparedMessage sends a pre-encoded message. This is efficient for broadcasting the same message to many connections — the frame is encoded once and reused.

func (*Conn) WriteText

func (c *Conn) WriteText(data []byte) error

WriteText writes a text message.

type Handler

type Handler func(*Conn)

Handler is called with the upgraded WebSocket connection. The function should block until the connection is done. When Handler returns, the connection is closed automatically.

type Hub added in v1.4.2

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the connection-set abstraction for WebSocket fan-out: register connections, broadcast to all (or a filtered subset), unregister on disconnect. Uses PreparedMessage under the hood so the wire frame is built once per Broadcast regardless of the Conn count.

The internal mutex is a sync.RWMutex. Register / unregister / Close take the write lock; broadcasts only take the read lock for the snapshot, so register-while-broadcast does not serialise. Per-Conn writes happen outside the lock.

Safe for concurrent use from any number of publishers and from any number of Register / unregister call sites.
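
The locking scheme described above can be sketched with a small registry: the write lock for membership changes, the read lock only for taking a snapshot, and all per-member work done with no lock held. The types registry and member are hypothetical stand-ins; the sync.Once in the unregister function is just one way to make a second call harmless.

```go
package main

import (
	"fmt"
	"sync"
)

// member stands in for a connection; room plays the role of a value that
// would be read through Conn.Locals.
type member struct{ room string }

type registry struct {
	mu      sync.RWMutex
	members map[*member]struct{}
}

func newRegistry() *registry {
	return &registry{members: make(map[*member]struct{})}
}

// register adds m under the write lock and returns an idempotent unregister.
func (r *registry) register(m *member) (unregister func()) {
	r.mu.Lock()
	r.members[m] = struct{}{}
	r.mu.Unlock()
	var once sync.Once
	return func() {
		once.Do(func() {
			r.mu.Lock()
			delete(r.members, m)
			r.mu.Unlock()
		})
	}
}

// snapshot copies the member set while holding only the read lock.
func (r *registry) snapshot() []*member {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]*member, 0, len(r.members))
	for m := range r.members {
		out = append(out, m)
	}
	return out
}

// countMatching evaluates pred against the snapshot with no lock held,
// which is where a real fan-out would perform its writes.
func (r *registry) countMatching(pred func(*member) bool) int {
	n := 0
	for _, m := range r.snapshot() {
		if pred(m) {
			n++
		}
	}
	return n
}

// demo registers three members, then removes one lobby member twice.
func demo() (before, after int) {
	r := newRegistry()
	inLobby := func(m *member) bool { return m.room == "lobby" }
	leave := r.register(&member{room: "lobby"})
	r.register(&member{room: "lobby"})
	r.register(&member{room: "dev"})
	before = r.countMatching(inLobby)
	leave()
	leave() // second call is a no-op
	after = r.countMatching(inLobby)
	return before, after
}

func main() {
	fmt.Println(demo()) // 2 1
}
```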

Example

ExampleHub is the classic chat-room handler: every message a client sends is broadcast to every connected client, the sender included. The Hub takes care of registration, fan-out, slow-conn handling, and close.

package main

import (
	"fmt"

	"github.com/goceleris/celeris"
	"github.com/goceleris/celeris/middleware/websocket"
)

func main() {
	hub := websocket.NewHub(websocket.HubConfig{})

	s := celeris.New(celeris.Config{})
	s.GET("/chat", websocket.New(websocket.Config{
		Handler: func(c *websocket.Conn) {
			unregister := hub.Register(c)
			defer unregister()
			for {
				mt, msg, err := c.ReadMessage()
				if err != nil {
					return
				}
				_, _ = hub.Broadcast(mt, msg)
			}
		},
	}))

	fmt.Println("chat hub installed at /chat")
}

Output:
chat hub installed at /chat

func NewHub added in v1.4.2

func NewHub(cfg HubConfig) *Hub

NewHub constructs a Hub with the given config.

func (*Hub) Broadcast added in v1.4.2

func (h *Hub) Broadcast(messageType MessageType, data []byte) (delivered int, err error)

Broadcast builds a PreparedMessage from messageType+data and dispatches it to every registered Conn. Returns the count of Conns the message reached and the first per-Conn error encountered (if any — failures are routed through HubConfig.OnSlowConn).

Ordering: each individual Conn's wire writes are serialised by Conn's own write semaphore, so a single Broadcast's frame arrives at every Conn intact. Across calls to Broadcast there is no cross-Conn ordering guarantee — two parallel publishers may interleave on different Conns.

func (*Hub) BroadcastFilter added in v1.4.2

func (h *Hub) BroadcastFilter(messageType MessageType, data []byte, pred func(*Conn) bool) (int, error)

BroadcastFilter sends only to Conns where pred returns true. Common for room / channel routing — filter on Conn.Locals without building a second Hub.

The membership snapshot happens under the Hub's read lock; pred is invoked LOCK-FREE against that snapshot. Parallel Register / unregister calls during dispatch are not observed by this broadcast.

func (*Hub) BroadcastPrepared added in v1.4.2

func (h *Hub) BroadcastPrepared(pm *PreparedMessage) (int, error)

BroadcastPrepared dispatches an already-prepared message — useful in dispatch loops where the same payload is published repeatedly.

func (*Hub) Close added in v1.4.2

func (h *Hub) Close()

Close unregisters every Conn and force-closes the underlying connection (via Conn.Close, NOT via HubConfig.OnSlowConn; shutdown is unconditional). After Close, Register calls become no-ops. Close is idempotent.

Ordering guarantee: any in-flight Broadcast / BroadcastPrepared / BroadcastFilter that already snapshotted the conn set runs to completion before Close returns. Subsequent broadcasts return (delivered=0, err=nil) without dispatching. This makes Close safe to call from a shutdown path that needs to know "no more wire writes will happen after this returns".

Concurrency: per-Conn Close calls fan out under the same HubConfig.MaxConcurrency cap that gates Broadcast (default DefaultHubConcurrency, i.e. GOMAXPROCS*4; a negative value opts out). The semaphore is acquired INSIDE each goroutine, so the fan-out always spawns one goroutine per Conn, and those goroutines then queue on the semaphore. Acquiring outside the goroutine would instead gate spawning on the slowest Close: the wall-clock cost is the same in the common case where every Close returns, but a single hung Conn.Close could stall the entire spawn loop. Acquiring inside trades a goroutine burst (up to len(conns) parked on the semaphore) for resilience to a hung Close. Hub.Close still returns only once every per-Conn Close has returned, but a hung Close does not serialise the rest of the fan-out behind it.
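A minimal sketch of the inside-goroutine acquire pattern described above, using only the standard library. The function closeAll and its parameters are hypothetical; the sketch assumes a positive limit and does not model the negative (unbounded) setting.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// closeAll runs closeFn for n items with at most limit calls in flight.
// Every goroutine is spawned immediately; the semaphore is acquired inside
// the goroutine, so the spawn loop itself never blocks on a slow closeFn.
// It returns the highest number of closeFn calls observed in flight.
// limit must be positive in this sketch.
func closeAll(n, limit int, closeFn func(i int)) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var inFlight, peak atomic.Int64

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire inside the goroutine
			defer func() { <-sem }() // release when this close is done

			// Track peak concurrency for demonstration purposes only.
			cur := inFlight.Add(1)
			for {
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}
			closeFn(i)
			inFlight.Add(-1)
		}(i)
	}
	wg.Wait() // returns only after every closeFn has returned
	return peak.Load()
}

func main() {
	closed := make([]bool, 20)
	peak := closeAll(len(closed), 4, func(i int) { closed[i] = true })
	fmt.Println("peak within cap:", peak >= 1 && peak <= 4) // prints "peak within cap: true"
}
```

Moving the `sem <- struct{}{}` line above the `go` statement would give the outside-acquire variant, where a hung closeFn holding a slot can stop further goroutines from being spawned.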

func (*Hub) Len added in v1.4.2

func (h *Hub) Len() int

Len reports the current number of registered Conns.

func (*Hub) Register added in v1.4.2

func (h *Hub) Register(c *Conn) func()

Register adds c to the Hub. Returns an unregister function the caller MUST defer; calling it twice is safe. Registering a Conn on a Hub that has been Close()'d is a no-op — the returned unregister is also a no-op.
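One common way to get an unregister function that is safe to call twice, and a no-op after shutdown, is sync.Once plus a closed flag. The sketch below is an assumption about how such a contract can be met, not the Hub's implementation; set, register, and close are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// set is a hypothetical stand-in for the Hub's membership bookkeeping.
type set struct {
	mu     sync.Mutex
	closed bool
	items  map[int]struct{}
}

func newSet() *set { return &set{items: make(map[int]struct{})} }

// register adds id and returns an idempotent unregister function.
// On a closed set it does nothing and returns a no-op.
func (s *set) register(id int) func() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return func() {}
	}
	s.items[id] = struct{}{}
	var once sync.Once
	return func() {
		once.Do(func() {
			s.mu.Lock()
			delete(s.items, id)
			s.mu.Unlock()
		})
	}
}

// close empties the set and rejects later registrations.
func (s *set) close() {
	s.mu.Lock()
	s.closed = true
	s.items = make(map[int]struct{})
	s.mu.Unlock()
}

func (s *set) len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.items)
}

func main() {
	s := newSet()
	unregister := s.register(1)
	unregister()
	unregister() // second call is harmless
	s.close()
	s.register(2)() // no-op on a closed set
	fmt.Println(s.len()) // prints "0"
}
```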

type HubConfig added in v1.4.2

type HubConfig struct {
	// OnSlowConn is consulted whenever Broadcast/BroadcastPrepared/
	// BroadcastFilter fails to deliver a message to a specific Conn.
	// The error is the underlying [Conn.WritePreparedMessage] error
	// (commonly [ErrWriteClosed], [ErrWriteTimeout], or a wrapped I/O
	// error). When nil, [HubPolicyClose] is used.
	OnSlowConn func(c *Conn, err error) HubPolicy

	// MaxConcurrency caps the number of in-flight per-Conn writes
	// during a Broadcast. Zero means [DefaultHubConcurrency] —
	// runtime.GOMAXPROCS(0)*4 — which keeps goroutine pressure
	// bounded on very-large fan-outs while still leaving slow conns
	// non-blocking for the rest. Set to a negative value to opt OUT
	// (true unbounded) for benchmarks; set to a positive integer to
	// override the default.
	MaxConcurrency int
}

HubConfig tunes a Hub. All fields are optional.
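The MaxConcurrency rules in the field comment (zero means the default, negative means unbounded, positive overrides) can be written out as a small helper. resolveLimit is a hypothetical function for illustration; how the Hub resolves the value internally is not shown in this documentation.

```go
package main

import (
	"fmt"
	"runtime"
)

// resolveLimit maps a MaxConcurrency-style setting to an effective cap.
// bounded is false when the caller opted out with a negative value.
func resolveLimit(maxConcurrency int) (limit int, bounded bool) {
	switch {
	case maxConcurrency < 0:
		return 0, false // opt out: no cap
	case maxConcurrency == 0:
		return runtime.GOMAXPROCS(0) * 4, true // documented default
	default:
		return maxConcurrency, true // explicit override
	}
}

func main() {
	for _, v := range []int{-1, 0, 8} {
		limit, bounded := resolveLimit(v)
		fmt.Printf("setting=%d limit=%d bounded=%v\n", v, limit, bounded)
	}
}
```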

type HubPolicy added in v1.4.2

type HubPolicy uint8

HubPolicy controls what the Hub does with a Conn whose Conn.WritePreparedMessage failed during a Broadcast.

const (
	// HubPolicyDrop skips the Conn for this message but keeps it
	// registered. Use when transient errors are expected (slow networks
	// where retries from a higher layer make sense).
	HubPolicyDrop HubPolicy = iota

	// HubPolicyRemove unregisters the Conn from the Hub without closing
	// it. The connection's lifecycle stays with whoever owns the Conn.
	HubPolicyRemove

	// HubPolicyClose unregisters the Conn AND closes the underlying
	// connection. Default — matches the implicit behavior of the
	// hand-rolled hub patterns this type replaces.
	HubPolicyClose
)
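An OnSlowConn callback typically inspects the write error and picks a policy. The sketch below shows one possible mapping using local stand-ins so that it runs without the package: policy, errWriteTimeout, and errWriteClosed are hypothetical mirrors of HubPolicy, ErrWriteTimeout, and ErrWriteClosed, and the mapping itself is an example choice, not a recommendation from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// policy mirrors the three documented HubPolicy values (stand-in type).
type policy uint8

const (
	policyDrop policy = iota
	policyRemove
	policyClose
)

// Stand-in sentinel errors; the real package exports its own.
var (
	errWriteTimeout = errors.New("write timeout")
	errWriteClosed  = errors.New("write closed")
)

// choosePolicy is an example decision function: tolerate timeouts,
// stop tracking conns that are already closed, close everything else.
func choosePolicy(err error) policy {
	switch {
	case errors.Is(err, errWriteTimeout):
		return policyDrop
	case errors.Is(err, errWriteClosed):
		return policyRemove
	default:
		return policyClose
	}
}

func main() {
	wrapped := fmt.Errorf("broadcast: %w", errWriteTimeout)
	fmt.Println(
		choosePolicy(wrapped),
		choosePolicy(errWriteClosed),
		choosePolicy(errors.New("connection reset")),
	) // prints "0 1 2"
}
```

Using errors.Is rather than == keeps the mapping correct when the error arrives wrapped, which the OnSlowConn field comment says can happen for I/O errors.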

type MessageType

type MessageType = Opcode

MessageType is the type of a WebSocket message.

type Opcode

type Opcode byte

Opcode represents a WebSocket frame opcode (RFC 6455 Section 5.2).

const (
	// OpContinuation is the continuation frame opcode.
	OpContinuation Opcode = 0x0
	// OpText is the text data frame opcode.
	OpText Opcode = 0x1
	// OpBinary is the binary data frame opcode.
	OpBinary Opcode = 0x2

	// OpClose is the connection close control frame opcode.
	OpClose Opcode = 0x8
	// OpPing is the ping control frame opcode.
	OpPing Opcode = 0x9
	// OpPong is the pong control frame opcode.
	OpPong Opcode = 0xA
)

func (Opcode) IsControl

func (o Opcode) IsControl() bool

IsControl returns true for close, ping, and pong frames.

func (Opcode) IsData

func (o Opcode) IsData() bool

IsData returns true for text, binary, and continuation frames.
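The classification documented for IsControl and IsData can be sketched as below. The opcode type and method names are local stand-ins, and the treatment of reserved opcodes (neither control nor data here) is an assumption; the constant values are those listed above.

```go
package main

import "fmt"

// opcode is a local stand-in for the package's Opcode type.
type opcode byte

const (
	opContinuation opcode = 0x0
	opText         opcode = 0x1
	opBinary       opcode = 0x2
	opClose        opcode = 0x8
	opPing         opcode = 0x9
	opPong         opcode = 0xA
)

// isControl reports whether o is close, ping, or pong.
func (o opcode) isControl() bool {
	switch o {
	case opClose, opPing, opPong:
		return true
	}
	return false
}

// isData reports whether o is text, binary, or continuation.
func (o opcode) isData() bool {
	switch o {
	case opContinuation, opText, opBinary:
		return true
	}
	return false
}

func main() {
	for _, o := range []opcode{opText, opPing, 0x3} {
		fmt.Printf("0x%X control=%v data=%v\n", byte(o), o.isControl(), o.isData())
	}
}
```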

type PreparedMessage

type PreparedMessage struct {
	// contains filtered or unexported fields
}

PreparedMessage caches the wire-format encoding of a message for efficient broadcast to multiple connections. Create one with NewPreparedMessage and send it via Conn.WritePreparedMessage.

func NewPreparedMessage

func NewPreparedMessage(messageType MessageType, data []byte) (*PreparedMessage, error)

NewPreparedMessage creates a PreparedMessage from the given payload. The data is copied; the caller retains ownership of the original slice.

messageType MUST be a data opcode (OpText or OpBinary). Passing a control opcode returns ErrInvalidPreparedOpcode — see RFC 6455 §5.5.
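To show what "caches the wire-format encoding" can mean, here is a sketch that encodes a single unmasked, unfragmented server-to-client frame following the RFC 6455 base framing rules. encodeFrame and errControlOpcode are hypothetical names; the sketch ignores extensions such as compression, and the real PreparedMessage may cache more than this.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errControlOpcode is a stand-in for the package's own error value.
var errControlOpcode = errors.New("prepared message: opcode must be text or binary")

// encodeFrame builds one FIN frame with no mask, as a server would send it.
// The payload is copied into the returned slice.
func encodeFrame(op byte, payload []byte) ([]byte, error) {
	if op != 0x1 && op != 0x2 { // only text and binary are accepted
		return nil, errControlOpcode
	}
	n := len(payload)
	var hdr []byte
	switch {
	case n <= 125: // length fits in the 7-bit field
		hdr = []byte{0x80 | op, byte(n)}
	case n <= 0xFFFF: // 126 marks a 16-bit big-endian length
		hdr = make([]byte, 4)
		hdr[0], hdr[1] = 0x80|op, 126
		binary.BigEndian.PutUint16(hdr[2:], uint16(n))
	default: // 127 marks a 64-bit big-endian length
		hdr = make([]byte, 10)
		hdr[0], hdr[1] = 0x80|op, 127
		binary.BigEndian.PutUint64(hdr[2:], uint64(n))
	}
	return append(hdr, payload...), nil
}

func main() {
	frame, err := encodeFrame(0x1, []byte("hi"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", frame) // prints "81 02 68 69"

	_, err = encodeFrame(0x9, nil) // ping is a control opcode
	fmt.Println(err != nil)        // prints "true"
}
```

Encoding once and reusing the bytes is what makes a prepared message cheaper than per-connection encoding when the same payload goes to many connections.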
