websocketserver

package
v0.41.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

README

Simple WebSocket Server

This package provides a raw WebSocket server implementation that is much simpler than the full VWS server. It's designed for implementing other WebSocket based protocols.

Features

  • VMS Server compatible interface
  • Text/Binary Support: Supports both text and binary frames and a configurable default for sending
  • Configurable Queue Size: Supports configurable message queue size per connection
  • Logger Support: Configurable logger for debugging and monitoring
  • Graceful Shutdown: Proper connection cleanup and graceful shutdown

Architecture

The simple WebSocket server consists of:

  • Listener: Manages WebSocket connections and integrates with EventBus
  • Connection: Handles individual WebSocket connections with async message processing
  • Config: Builder pattern for server configuration
  • Metrics: Optional performance monitoring
Message Processing Architecture
EventBus → AsyncQueueingSubscriber → TransformingSubscriber → Connection → WebSocket Client
                    ↓
            [Async Queue Processing]
            - Configurable queue size
            - Background goroutine
            - Transform pipeline

Each WebSocket connection uses a layered subscriber architecture:

  1. AsyncQueueingSubscriber: Provides async message processing with configurable queue size
  2. TransformingSubscriber: Applies message transforms (if configured)
  3. Connection: Handles WebSocket-specific message formatting and sending

This architecture ensures:

  • Non-blocking: EventBus publishing never blocks due to slow WebSocket clients
  • Efficient: Messages are processed asynchronously in background goroutines
  • Flexible: Transform pipeline allows message modification, filtering, and enrichment
  • Scalable: Each connection has its own queue and processing goroutine

Usage

Basic Setup
package main

import (
    "context"
    "log"
    "net/http"
    
    "github.com/tsarna/vinculum-bus"
    "github.com/tsarna/vinculum/websockets/server"
    "go.uber.org/zap"
)

func main() {
    logger, _ := zap.NewProduction()
    
    // Create EventBus
    eventBus, err := bus.NewEventBus().
        WithLogger(logger).
        Build()
    if err != nil {
        log.Fatal(err)
    }
    eventBus.Start()
    defer eventBus.Stop()
    
    // Create the simple WebSocket listener
    wsListener, err := server.NewServer().
        WithEventBus(eventBus).
        WithLogger(logger).
        WithQueueSize(512).
        WithInitialSubscriptions("system/alerts", "server/status").
        WithSendBinary(false).
        WithReceivedTextTopic("messages/text").
        WithReceivedBinaryTopic("messages/binary").
        WithMeterProvider(meterProvider).
        WithOutboundTransforms(transform.DropTopicPrefix("debug/")).
        WithInboundTransforms(transform.AddField("source", "websocket")).
        Build()
    if err != nil {
        log.Fatal(err)
    }
    
    // Set up HTTP handler
    http.Handle("/ws", wsListener)
    
    // Start HTTP server
    log.Println("Starting server on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}
Publishing Messages

Messages are published through the EventBus, and all connected WebSocket clients will receive them. The message type (text/binary frame) is determined by the fields map:

// Send as text frame using fields["format"] = "text"
err := eventBus.PublishWithFields(ctx, "notifications", "Hello, World!", map[string]string{
    "format": "text",
})

// Send as binary frame using fields["format"] = "binary"  
err := eventBus.PublishWithFields(ctx, "data", []byte{0x01, 0x02, 0x03}, map[string]string{
    "format": "binary",
})

// Send using configured default (no format specified)
err := eventBus.Publish(ctx, "system/alerts", map[string]interface{}{
    "level": "warning",
    "message": "System maintenance in 5 minutes",
})

// Send with unknown format - uses configured default
err := eventBus.PublishWithFields(ctx, "server/status", statusData, map[string]string{
    "format": "json", // Unknown format, uses default
})
Receiving Messages

Messages received from WebSocket clients are automatically published to the EventBus:

// Configure topics for received messages
server := server.NewServer().
    WithReceivedTextTopic("client/text").      // Text frames → "client/text" topic
    WithReceivedBinaryTopic("client/binary").  // Binary frames → "client/binary" topic
    Build()

// To disable receiving certain frame types, set topic to empty string
server := server.NewServer().
    WithReceivedTextTopic("client/messages").  // Text frames → "client/messages"
    WithReceivedBinaryTopic("").               // Binary frames discarded
    Build()

// Subscribe to received messages
eventBus.Subscribe(ctx, "client/text", mySubscriber)
eventBus.Subscribe(ctx, "client/binary", mySubscriber)
Message Handling
  • EventBus Integration: Each WebSocket connection is automatically subscribed to topics configured via WithInitialSubscriptions()

  • Frame Type Determination: Message type is determined by fields["format"]:

    • fields["format"] == "text": Sent as WebSocket text frame
    • fields["format"] == "binary": Sent as WebSocket binary frame
    • fields["format"] missing or other value: Uses configured default (WithSendBinary)
  • Message Types:

    • []byte: Sent as-is
    • string: Converted to []byte and sent
    • Other types: Marshalled to JSON and sent
  • Receiving: Messages from clients are published to the EventBus

    • Text frames published to configured text topic (default: "text")
    • Binary frames published to configured binary topic (default: "binary")
    • If topic is empty, messages are discarded
    • Raw frame bytes are published as-is (no JSON parsing)
Configuration Options
  • EventBus: Required for integration with the pub/sub system
  • Logger: Required for debugging and error reporting
  • Queue Size: Buffer size for outbound messages per connection (default: 256)
  • Initial Subscriptions: Topics to automatically subscribe connections to
  • Send Binary: Default frame type when fields["format"] is missing (default: false = text frames)
  • Received Text Topic: Topic for received text frames (default: "text", empty = discard)
  • Received Binary Topic: Topic for received binary frames (default: "binary", empty = discard)
  • Metrics Provider: Optional metrics collection (default: none)
  • Message Transforms: Optional message transformation functions (default: none)
Message Transforms

The server supports message transformation functions that are applied to outbound messages from the EventBus before sending to WebSocket clients. This uses the same transform system as the full VWS server.

Transform Functions

Transform functions can:

  • Modify message content - Change the payload or topic
  • Drop messages - Return nil to prevent sending
  • Stop processing - Return false to halt the transform pipeline
Built-in Transforms
import "github.com/tsarna/vinculum/transform"

// Drop messages from debug topics
transform.DropTopicPrefix("debug/")

// Drop messages matching MQTT patterns
transform.DropTopicPattern("internal/+/secrets")

// Add metadata to all messages
transform.ModifyPayload(func(ctx context.Context, payload any, fields map[string]string) any {
    return map[string]any{
        "data": payload,
        "timestamp": time.Now().Unix(),
        "server": "simple-websocket",
    }
})

// Transform messages matching specific patterns
transform.TransformOnPattern("sensor/+device/data", func(ctx context.Context, payload any, fields map[string]string) any {
    deviceName := fields["device"]
    return map[string]any{
        "device": deviceName,
        "reading": payload,
        "processed_at": time.Now().Unix(),
    }
})
Example Usage
// Configure multiple transforms
transforms := []transform.MessageTransformFunc{
    transform.DropTopicPrefix("debug/"),
    transform.DropTopicPattern("internal/+/secrets"),
    transform.ModifyPayload(func(ctx context.Context, payload any, fields map[string]string) any {
        return map[string]any{
            "original": payload,
            "timestamp": time.Now().Unix(),
        }
    }),
}

listener := server.NewServer().
    WithEventBus(eventBus).
    WithLogger(logger).
    WithOutboundTransforms(transforms...).
    WithInboundTransforms(inboundTransforms...).
    Build()
Metrics Support

When a MeterProvider is configured, the server automatically tracks:

Connection Metrics
  • websocket.active_connections - Current number of active connections (gauge)
  • websocket.connections - Total connections established (counter)
  • websocket.connection.duration - Connection duration in seconds (histogram)
  • websocket.connection.errors - Connection errors by error.type (counter)
Message Metrics
  • websocket.received.messages - Messages received from clients (counter)
  • websocket.sent.messages - Messages sent to clients (counter)
  • websocket.message.errors - Message processing errors (counter)
  • websocket.message.size - Message size distribution in bytes (histogram)
Example with Metrics
// Create server with OTel metrics
server := server.NewServer().
    WithEventBus(eventBus).
    WithLogger(logger).
    WithMeterProvider(meterProvider).
    Build()
Graceful Shutdown
// Graceful shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err := wsServer.Shutdown(ctx)
if err != nil {
    log.Printf("Shutdown error: %v", err)
}

Limitations

This is a simplified server that intentionally omits many features of the full VWS server:

  • No Subscriptions: Subscribe/Unsubscribe methods are no-ops
  • No Authentication: No event authorization or access control
  • No Metrics: No built-in metrics collection
  • No Message Transforms: No message transformation pipeline
  • No Subscription Controllers: No subscription management
  • No Initial Subscriptions: No automatic subscriptions on connect
  • Limited Topics: Only "text" and "binary" topics supported

When to Use

Use this simple server when you need:

  • Basic WebSocket broadcasting to all clients
  • Simple text/binary message sending
  • Minimal complexity and dependencies
  • No need for subscription management or advanced features

For more advanced use cases, consider using the full VWS server instead.

Documentation

Index

Constants

View Source
const (
	// DefaultQueueSize is the default size for the WebSocket message queue.
	DefaultQueueSize = 256
)

Variables

This section is empty.

Functions

func ProcessWebsocketsServerBlock

func ProcessWebsocketsServerBlock(config *cfg.Config, block *hcl.Block, remainingBody hcl.Body) (cfg.Listener, hcl.Diagnostics)

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config holds the configuration for creating a simple WebSocket server. Use NewServer() to create a new configuration and chain methods to set the required parameters before calling Build().

func NewServer

func NewServer() *Config

NewServer creates a new Config for building a simple WebSocket server. Use the fluent methods to set the required EventBus and Logger, then call Build().

Example:

server, err := websockets.NewServer().
    WithEventBus(eventBus).
    WithLogger(logger).
    WithQueueSize(512).
    WithInitialSubscriptions("system/alerts", "server/status").
    WithSendBinary(true).
    WithReceivedTextTopic("messages/text").
    WithReceivedBinaryTopic("messages/binary").
    WithMeterProvider(meterProvider).
    WithOutboundTransforms(transform.DropTopicPrefix("debug/")).
    Build()

func (*Config) Build

func (c *Config) Build() (*Listener, error)

Build creates a new simple WebSocket server from the configuration. Returns an error if the configuration is invalid (missing EventBus or Logger).

func (*Config) IsValid

func (c *Config) IsValid() error

IsValid checks if the configuration has all required parameters set. Returns nil if the configuration is valid, or an error describing what's missing.

func (*Config) WithEventBus

func (c *Config) WithEventBus(eventBus bus.EventBus) *Config

WithEventBus sets the EventBus for the WebSocket server. The EventBus is required for integrating WebSocket connections with the pub/sub system.

func (*Config) WithInboundTransforms

func (c *Config) WithInboundTransforms(transforms ...transform.MessageTransformFunc) *Config

WithInboundTransforms sets the message transformation functions that will be applied to inbound messages from WebSocket clients before publishing to the EventBus. These transforms use the transform.MessageTransformFunc type and operate on EventBusMessage created from the WebSocket message.

Transform functions are called in the order they are provided and can:

  • Modify message content (topic, payload, fields)
  • Drop messages (return nil message)
  • Stop the transform pipeline (return false)

Example:

transforms := []transform.MessageTransformFunc{
    transform.FilterByTopicPrefix("allowed/"),
    transform.AddField("source", "websocket"),
    transform.ValidatePayload(),
}
config.WithInboundTransforms(transforms...)

Default: No transforms (messages published as-is)

func (*Config) WithInitialSubscriptions

func (c *Config) WithInitialSubscriptions(topics ...string) *Config

WithInitialSubscriptions sets the topic patterns that new WebSocket connections should be automatically subscribed to when they connect. These subscriptions happen automatically without client request.

This is useful for:

  • Pushing server-side events to all clients
  • Providing default subscriptions for convenience
  • Broadcasting system notifications

Example:

config.WithInitialSubscriptions("system/alerts", "server/status")

Default: No initial subscriptions

func (*Config) WithLogger

func (c *Config) WithLogger(logger *zap.Logger) *Config

WithLogger sets the Logger for the WebSocket server. The Logger is required for connection events, errors, and debugging.

func (*Config) WithMeterProvider added in v0.24.0

func (c *Config) WithMeterProvider(provider metric.MeterProvider) *Config

WithMeterProvider sets the OTel MeterProvider for the WebSocket server. The MeterProvider is optional and enables collection of WebSocket server metrics such as connection counts, message rates, error rates, and connection durations.

If not provided, no metrics will be collected.

func (*Config) WithOutboundTransforms

func (c *Config) WithOutboundTransforms(transforms ...transform.MessageTransformFunc) *Config

WithOutboundTransforms sets the message transformation functions that will be applied to outbound messages from the EventBus before sending to WebSocket clients. These transforms use the transform.MessageTransformFunc type and operate on EventBusMessage.

Transform functions are called in the order they are provided and can:

  • Modify message content
  • Drop messages (return nil)
  • Stop the transform pipeline (return false)

Example:

transforms := []transform.MessageTransformFunc{
    transform.DropTopicPrefix("debug/"),
    transform.ModifyPayload(func(ctx context.Context, payload any, fields map[string]string) any {
        return map[string]any{
            "data": payload,
            "timestamp": time.Now().Unix(),
        }
    }),
}
config.WithOutboundTransforms(transforms...)

Default: No transforms (messages sent as-is)

func (*Config) WithQueueSize

func (c *Config) WithQueueSize(size int) *Config

WithQueueSize sets the message queue size for WebSocket connections. This controls how many messages can be buffered per connection before messages start getting dropped. Larger values handle bursts better but use more memory. Must be positive.

Default: 256 messages per connection

func (*Config) WithReceivedBinaryTopic

func (c *Config) WithReceivedBinaryTopic(topic string) *Config

WithReceivedBinaryTopic sets the topic that received WebSocket binary frames will be published to on the EventBus. If set to an empty string, binary frames will be discarded.

Default: "binary"

func (*Config) WithReceivedTextTopic

func (c *Config) WithReceivedTextTopic(topic string) *Config

WithReceivedTextTopic sets the topic that received WebSocket text frames will be published to on the EventBus. If set to an empty string, text frames will be discarded.

Default: "text"

func (*Config) WithSendBinary

func (c *Config) WithSendBinary(binary bool) *Config

WithSendBinary sets the default message type for WebSocket frames when the fields map doesn't contain a "format" key.

Message type is determined by:

  • fields["format"] == "text": Send as text frame
  • fields["format"] == "binary": Send as binary frame
  • fields["format"] missing or other value: Use this default

Default: false (send as text frames)

type Connection

type Connection struct {

	// Async subscriber wrapper for handling outbound messages
	AsyncSubscriber *subutils.AsyncQueueingSubscriber
	// contains filtered or unexported fields
}

Connection represents an individual WebSocket connection for the simple server. It handles reading messages from the client and sending messages to the client.

func (*Connection) Close

func (c *Connection) Close()

Close closes the connection.

func (*Connection) OnEvent

func (c *Connection) OnEvent(ctx context.Context, topic string, payload any, fields map[string]string) error

OnEvent is called when an event is published to a topic this connection is subscribed to. This method forwards the event to the WebSocket client. Message type is determined by fields["format"]:

  • "text": Send as text frame
  • "binary": Send as binary frame
  • missing/other: Use configured default (sendBinary)

func (*Connection) OnSubscribe

func (c *Connection) OnSubscribe(ctx context.Context, topic string) error

func (*Connection) OnUnsubscribe

func (c *Connection) OnUnsubscribe(ctx context.Context, topic string) error

func (*Connection) PassThrough

func (c *Connection) PassThrough(msg bus.EventBusMessage) error

func (*Connection) Start

func (c *Connection) Start(listener *Listener)

Start begins handling the WebSocket connection.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener implements a simplified WebSocket server similar to the VWS server interface. Unlike the full VWS server, this server just deals in raw messages, and has no protocol for subscribe/unsubscribe.

func (*Listener) ConnectionCount

func (s *Listener) ConnectionCount() int

ConnectionCount returns the current number of active WebSocket connections.

func (*Listener) ServeHTTP

func (s *Listener) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP handles incoming HTTP requests and upgrades them to WebSocket connections.

func (*Listener) Shutdown

func (s *Listener) Shutdown(ctx context.Context) error

Shutdown gracefully closes all active WebSocket connections and stops accepting new ones. This method should be called when the server is shutting down to ensure proper cleanup.

type WebSocketMetrics

type WebSocketMetrics struct {
	// contains filtered or unexported fields
}

WebSocketMetrics defines the metrics collected by the simple WebSocket server. This is a simplified version of the VWS server metrics, focusing on the most important connection and message metrics.

func NewWebSocketMetrics

func NewWebSocketMetrics(mp metric.MeterProvider) *WebSocketMetrics

NewWebSocketMetrics creates a new WebSocketMetrics instance using the provided MeterProvider. If the provider is nil, returns nil (no metrics will be collected).

func (*WebSocketMetrics) RecordConnectionActive

func (m *WebSocketMetrics) RecordConnectionActive(ctx context.Context, count int)

RecordConnectionActive updates the active connection count.

func (*WebSocketMetrics) RecordConnectionEnd

func (m *WebSocketMetrics) RecordConnectionEnd(ctx context.Context, duration time.Duration)

RecordConnectionEnd records when a WebSocket connection ends and its duration.

func (*WebSocketMetrics) RecordConnectionError

func (m *WebSocketMetrics) RecordConnectionError(ctx context.Context, errorType string)

RecordConnectionError records connection-level errors (upgrade failures, etc.).

func (*WebSocketMetrics) RecordConnectionStart

func (m *WebSocketMetrics) RecordConnectionStart(ctx context.Context)

RecordConnectionStart records when a new WebSocket connection is established.

func (*WebSocketMetrics) RecordMessageError

func (m *WebSocketMetrics) RecordMessageError(ctx context.Context, errorType string, messageType string)

RecordMessageError records message processing errors.

func (*WebSocketMetrics) RecordMessageReceived

func (m *WebSocketMetrics) RecordMessageReceived(ctx context.Context, sizeBytes int, messageType string)

RecordMessageReceived records when a message is received from a client.

func (*WebSocketMetrics) RecordMessageSent

func (m *WebSocketMetrics) RecordMessageSent(ctx context.Context, sizeBytes int, messageType string)

RecordMessageSent records when a message is sent to a client.

type WebsocketServer

type WebsocketServer struct {
	cfg.BaseServer
	Listener *Listener
}

func (*WebsocketServer) GetHandler

func (s *WebsocketServer) GetHandler() http.Handler

type WebsocketsServerDefinition

type WebsocketsServerDefinition struct {
	Bus                  hcl.Expression `hcl:"bus"`
	QueueSize            *int           `hcl:"queue_size,optional"`
	PingInterval         hcl.Expression `hcl:"ping_interval,optional"`
	WriteTimeout         hcl.Expression `hcl:"write_timeout,optional"`
	InitialSubscriptions []string       `hcl:"initial_subscriptions,optional"`
	OutboundTransforms   hcl.Expression `hcl:"outbound_transforms,optional"`
	InboundTransforms    hcl.Expression `hcl:"inbound_transforms,optional"`
	DefRange             hcl.Range      `hcl:",def_range"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL