websocket

package
v1.0.0-alpha.27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2026 License: MIT Imports: 21 Imported by: 0

README

WebSocket Output Component

Real-time streaming of NATS messages to WebSocket clients with configurable delivery guarantees.

Purpose

The WebSocket output component runs a WebSocket server that broadcasts incoming NATS messages to connected clients in real-time. It supports multiple concurrent clients with automatic reconnection handling, per-client write timeouts, and configurable delivery modes (at-most-once or at-least-once). Designed for real-time dashboards, monitoring systems, and event visualization.

Configuration

Basic Configuration
type: websocket
ports:
  inputs:
    - name: nats_input
      type: nats
      subject: semantic.>
      required: true
  outputs:
    - name: websocket_server
      type: network
      subject: http://0.0.0.0:8081/ws
Advanced Configuration
type: websocket
delivery_mode: at-least-once
ack_timeout: 5s
ports:
  inputs:
    - name: events
      type: nats
      subject: events.>
    - name: metrics
      type: nats
      subject: metrics.>
  outputs:
    - name: websocket_server
      type: network
      subject: http://0.0.0.0:8081/ws
Configuration Options
Field Type Default Description
delivery_mode string at-most-once Delivery reliability (at-most-once, at-least-once)
ack_timeout duration 5s Timeout for client acknowledgments
ports.inputs[].subject string semantic.> NATS subject patterns to subscribe
ports.outputs[].subject string http://0.0.0.0:8081/ws WebSocket server endpoint

Input/Output Ports

Input Ports

Type: nats

Subscribes to NATS subjects and receives messages for broadcast to WebSocket clients. Multiple input ports are supported for subscribing to different subject patterns.

Example:

inputs:
  - name: graph_updates
    type: nats
    subject: graph.entities.>
  - name: alerts
    type: nats
    subject: alerts.critical
Output Ports

Type: network

Exposes a WebSocket server endpoint that clients connect to for receiving real-time messages. The endpoint is encoded as a URL in the subject field.

Example:

outputs:
  - name: websocket_server
    type: network
    subject: http://0.0.0.0:8081/ws

URL Format: http://<host>:<port><path>

  • host: Binding address (use 0.0.0.0 for all interfaces)
  • port: TCP port (1024-65535)
  • path: WebSocket endpoint path (e.g., /ws, /stream)

Connection Management

Client Lifecycle
flowchart TD
    A[Client Connects] --> B[WebSocket Handshake]
    B --> C[Register Client]
    C --> D[Start Read/Write Goroutines]
    D --> E[Stream Messages]
    E --> F{Client Active?}
    F -->|Yes| E
    F -->|No| G[Client Disconnect]
    G --> H[Cleanup Resources]
    H --> I[Remove from Registry]
Concurrent Client Handling

Each connected client receives:

  1. Read goroutine: Handles ping/pong keepalive and acknowledgment messages
  2. Write goroutine: Forwards NATS messages to the client
  3. Dedicated mutex: Prevents concurrent writes (gorilla/websocket requirement)
  4. Message buffer: Circular buffer for pending acknowledgments (at-least-once mode)
Reconnection Handling

Clients are automatically removed and cleaned up when:

  • Write timeout exceeded (default: 10s)
  • Read timeout exceeded (default: 60s)
  • Ping/pong keepalive fails (interval: 30s)
  • Network errors or connection closes

Reconnection is the client's responsibility. The server accepts new connections at any time.

Delivery Modes

At-Most-Once Delivery

Default mode. Messages are sent without waiting for acknowledgment.

Characteristics:

  • Fire-and-forget semantics
  • Lowest latency
  • No message buffering
  • Client may miss messages if slow or disconnected

Use when: Real-time visualization where occasional message loss is acceptable.

At-Least-Once Delivery

Reliable mode. Messages require client acknowledgment before considered delivered.

Characteristics:

  • Client must send ack or nack messages
  • Pending messages tracked in circular buffer
  • Timeout-based failure detection
  • Future support for retries

Use when: Critical events, alerts, or state updates that must not be lost.

Message Protocol

All messages are wrapped in a MessageEnvelope:

{
  "type": "data",
  "id": "msg-1707654321000-42",
  "timestamp": 1707654321000,
  "payload": { /* your data */ }
}

Client acknowledgment (at-least-once mode):

{
  "type": "ack",
  "id": "msg-1707654321000-42",
  "timestamp": 1707654321100
}

Negative acknowledgment:

{
  "type": "nack",
  "id": "msg-1707654321000-42",
  "timestamp": 1707654321100
}

Backpressure signal:

{
  "type": "slow",
  "timestamp": 1707654321100
}

Example Use Cases

Real-Time Dashboard

Stream knowledge graph updates to a web dashboard for live visualization.

type: websocket
ports:
  inputs:
    - name: graph_updates
      type: nats
      subject: graph.entities.>
    - name: query_results
      type: nats
      subject: graph.queries.>
  outputs:
    - name: dashboard
      type: network
      subject: http://0.0.0.0:8080/dashboard
Live Monitoring

Broadcast system metrics and logs to monitoring clients.

type: websocket
delivery_mode: at-most-once
ports:
  inputs:
    - name: metrics
      type: nats
      subject: metrics.>
    - name: logs
      type: nats
      subject: logs.>
  outputs:
    - name: monitor
      type: network
      subject: http://0.0.0.0:9090/monitor
Critical Alerts

Ensure critical alerts are reliably delivered to connected clients.

type: websocket
delivery_mode: at-least-once
ack_timeout: 10s
ports:
  inputs:
    - name: alerts
      type: nats
      subject: alerts.critical
  outputs:
    - name: alert_stream
      type: network
      subject: http://0.0.0.0:8000/alerts
Event Broadcasting

Broadcast application events to multiple client types.

type: websocket
ports:
  inputs:
    - name: events
      type: nats
      subject: events.>
  outputs:
    - name: event_stream
      type: network
      subject: http://0.0.0.0:8081/events

Client Example

JavaScript WebSocket Client
const ws = new WebSocket('ws://localhost:8081/ws');

ws.onopen = () => {
  console.log('Connected to WebSocket server');
};

ws.onmessage = (event) => {
  const envelope = JSON.parse(event.data);

  if (envelope.type === 'data') {
    console.log('Received:', envelope.payload);

    // Send acknowledgment (at-least-once mode)
    ws.send(JSON.stringify({
      type: 'ack',
      id: envelope.id,
      timestamp: Date.now()
    }));
  }
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = () => {
  console.log('Disconnected, reconnecting...');
  setTimeout(() => {
    // Reconnect logic
  }, 1000);
};
Go WebSocket Client
import (
    "encoding/json"
    "log"
    "github.com/gorilla/websocket"
)

type MessageEnvelope struct {
    Type      string          `json:"type"`
    ID        string          `json:"id"`
    Timestamp int64           `json:"timestamp"`
    Payload   json.RawMessage `json:"payload"`
}

func connect() {
    conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8081/ws", nil)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    for {
        _, data, err := conn.ReadMessage()
        if err != nil {
            log.Printf("Read error: %v", err)
            return
        }

        var envelope MessageEnvelope
        if err := json.Unmarshal(data, &envelope); err != nil {
            log.Printf("Parse error: %v", err)
            continue
        }

        if envelope.Type == "data" {
            log.Printf("Received: %s", envelope.Payload)

            // Send acknowledgment
            ack := map[string]any{
                "type":      "ack",
                "id":        envelope.ID,
                "timestamp": time.Now().UnixMilli(),
            }
            if err := conn.WriteJSON(ack); err != nil {
                log.Printf("Ack error: %v", err)
            }
        }
    }
}

Observability

Health Status
health := output.Health()
// Healthy: true if server accepting connections
// ErrorCount: Write errors across all clients
// Uptime: Time since Start()
Data Flow Metrics
dataFlow := output.DataFlow()
// MessagesPerSecond: Broadcast rate
// BytesPerSecond: Total byte throughput
// ErrorRate: Client error percentage
// LastActivity: Last message timestamp
Prometheus Metrics
Metric Type Description
semstreams_websocket_messages_received_total Counter Messages received from NATS
semstreams_websocket_messages_sent_total Counter Messages sent to clients
semstreams_websocket_bytes_sent_total Counter Total bytes sent
semstreams_websocket_clients_connected Gauge Current active clients
semstreams_websocket_client_connections_total Counter Total connections
semstreams_websocket_client_disconnections_total Counter Total disconnections
semstreams_websocket_broadcast_duration_seconds Histogram Broadcast latency
semstreams_websocket_message_size_bytes Histogram Message size distribution
semstreams_websocket_errors_total Counter Error counts by type
semstreams_websocket_server_uptime_seconds Gauge Server uptime

Performance Characteristics

  • Throughput: 1,000+ messages/second to 100+ concurrent clients
  • Memory: O(clients) + O(pending messages per client)
  • Latency: Sub-millisecond for local clients, ~10ms for network clients
  • Concurrency: One goroutine pair (read/write) per client
Capacity Planning

Message rate: With 100 clients and 1KB messages at 100 msg/s broadcast rate:

  • Network bandwidth: ~10 MB/s
  • Memory overhead: ~100 MB (1MB buffer per client)
  • Goroutines: ~200 (2 per client)

Security Considerations

Production Deployment

Use a reverse proxy (nginx, Caddy) for:

  • TLS/SSL termination
  • Authentication/authorization
  • Rate limiting
  • CORS policy enforcement

Example nginx configuration:

location /ws {
    proxy_pass http://localhost:8081;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
}
TLS Support

Enable TLS at the platform level via security configuration:

security:
  tls:
    server:
      enabled: true
      mode: manual
      cert_file: /path/to/cert.pem
      key_file: /path/to/key.pem

Or use ACME for automatic certificate management:

security:
  tls:
    server:
      enabled: true
      mode: acme
      acme:
        enabled: true
        email: admin@example.com
        domains:
          - websocket.example.com

Limitations

Current version limitations:

  • No per-client subject filtering (all clients receive all messages)
  • No client-to-server message handling (server push only)
  • No message compression (gzip/deflate)
  • No built-in authentication (use reverse proxy)
  • Retry logic not yet implemented for at-least-once mode

Thread Safety

The component is fully thread-safe:

  • Per-client write mutex prevents concurrent writes (gorilla/websocket requirement)
  • Client map protected by sync.RWMutex
  • Atomic operations for metrics counters
  • sync.Once for cleanup operations
  • sync.WaitGroup for goroutine lifecycle tracking

Documentation

Overview

Package websocket provides a WebSocket server output component for streaming messages to WebSocket clients.

Overview

The WebSocket output component runs a WebSocket server that streams incoming NATS messages to connected clients in real-time. It supports multiple concurrent clients, automatic reconnection handling, and per-client write timeouts. It implements the StreamKit component interfaces for lifecycle management and observability.

Quick Start

Start a WebSocket server on port 8080:

config := websocket.Config{
    Ports: &component.PortConfig{
        Inputs: []component.PortDefinition{
            {Name: "input", Type: "nats", Subject: "stream.>", Required: true},
        },
    },
    Port: 8080,
    Path: "/ws",
}

rawConfig, _ := json.Marshal(config)
output, err := websocket.NewOutput(rawConfig, deps)

Configuration

The Config struct controls WebSocket server behavior:

  • Port: TCP port to listen on (1024-65535)
  • Path: WebSocket endpoint path (default: "/ws")
  • Subjects: NATS subjects to subscribe to (from Ports config)
  • WriteTimeout: Per-client write timeout (default: 5s)
  • ReadTimeout: Client read timeout (default: 60s)
  • PingInterval: WebSocket ping interval (default: 30s)

Client Management

The server handles multiple concurrent clients with per-client goroutines:

// Each client gets:
// 1. Read goroutine (handle pings/pongs/close)
// 2. Write goroutine (forward NATS messages)
// 3. Dedicated mutex (prevent concurrent writes)
// 4. Message queue (buffer messages if client is slow)

Client lifecycle:

  1. Client connects via WebSocket handshake
  2. Server registers client and starts goroutines
  3. Messages streamed from NATS to client
  4. Client disconnect or error triggers cleanup
  5. Goroutines terminated, resources released

Message Flow

NATS Subject → Message Handler → Fan-Out to All Clients → WebSocket Write
                                          ↓
                                  Per-Client Queue (if slow)

Write Timeouts

Each client write has a configurable timeout to prevent slow clients from blocking:

WriteTimeout: 5 * time.Second

// If client can't receive within timeout:
// 1. Write fails with deadline exceeded
// 2. Client disconnected
// 3. Error logged and counted
// 4. Resources cleaned up

This prevents one slow client from affecting others.

Ping/Pong Keepalive

WebSocket keepalive ensures connection health:

PingInterval: 30 * time.Second

// Server sends ping every 30s
// Client must respond with pong
// If no pong received, connection closed

Lifecycle Management

Proper server lifecycle with graceful shutdown:

// Start WebSocket server
output.Start(ctx)

// Graceful shutdown
output.Stop(5 * time.Second)

During shutdown:

  1. Stop accepting new client connections
  2. Close all existing client connections
  3. Unsubscribe from NATS subjects
  4. Wait for all client goroutines to complete
  5. Close HTTP server

Observability

The component implements component.Discoverable for monitoring:

meta := output.Meta()
// Name: websocket-output
// Type: output
// Description: WebSocket server output

health := output.Health()
// Healthy: true if server accepting connections
// ErrorCount: Write errors across all clients
// Uptime: Time since Start()

dataFlow := output.DataFlow()
// MessagesPerSecond: Broadcast rate
// BytesPerSecond: Total byte throughput
// ErrorRate: Client error percentage

Additional metrics via Prometheus:

  • websocket_clients_total: Total client connections
  • websocket_clients_active: Current active clients
  • websocket_messages_sent_total: Messages sent to clients
  • websocket_errors_total: Error counter

Performance Characteristics

  • Throughput: 1,000+ messages/second to 100+ clients
  • Memory: O(clients) + O(queued messages per client)
  • Latency: Sub-millisecond for local clients
  • Concurrency: One goroutine pair per client

Error Handling

The component uses streamkit/errors for consistent error classification:

  • Invalid config: errs.WrapInvalid (bad port, invalid path)
  • Network errors: errs.WrapTransient (connection failures)
  • Write timeouts: errs.WrapTransient (slow client)
  • Client errors: Logged but don't stop server

Per-client errors don't affect other clients or server health.

Common Use Cases

**Real-Time Dashboard:**

Port: 8080
Path: "/dashboard/stream"
Subjects: ["metrics.>", "events.>"]
WriteTimeout: 5 * time.Second

**Live Monitoring:**

Port: 9090
Path: "/monitor"
Subjects: ["logs.>", "alerts.>"]
PingInterval: 15 * time.Second

**Event Broadcasting:**

Port: 8000
Path: "/events"
Subjects: ["notifications.>"]

Client Example

JavaScript WebSocket client:

const ws = new WebSocket('ws://localhost:8080/ws');

ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log('Received:', data);
};

ws.onopen = () => console.log('Connected');
ws.onclose = () => console.log('Disconnected');
ws.onerror = (error) => console.error('Error:', error);

Thread Safety

The component is fully thread-safe:

  • Per-client write mutex prevents concurrent writes (gorilla/websocket requirement)
  • Client map protected by sync.RWMutex
  • Atomic operations for metrics
  • sync.Once for cleanup operations

Concurrency Patterns

Excellent concurrency management demonstrated:

// Client cleanup with sync.Once
client.closeOnce.Do(func() {
    close(client.send)
    delete(ws.clients, client)
})

// WaitGroup for goroutine tracking
ws.wg.Add(2)  // Read + Write goroutines
defer ws.wg.Done()

// Atomic metrics updates
atomic.AddInt64(&ws.messagesSent, 1)

Testing

The package includes comprehensive test coverage:

  • Unit tests: Config validation, client management
  • Integration tests: Real WebSocket connections
  • Race tests: 100 concurrent clients stress test
  • Leak tests: Goroutine cleanup verification
  • Panic tests: Double-close protection

Run tests:

go test ./output/websocket -v
go test ./output/websocket -race  # Race detector

Limitations

Current version limitations:

  • No authentication/authorization (add reverse proxy)
  • No client message handling (server → client only)
  • No per-client subject filtering
  • No message compression (gzip)
  • No SSL/TLS (use reverse proxy or add to component)

Security Considerations

  • Use reverse proxy (nginx, Caddy) for TLS termination
  • Implement authentication at reverse proxy level
  • Rate limit client connections (use firewall or proxy)
  • Validate client origin headers (CORS)
  • Monitor client connection patterns

Production Deployment

Recommended production setup:

┌─────────┐    HTTPS    ┌───────┐    WS     ┌──────────┐
│ Clients │ ─────────→ │ nginx │ ─────────→ │ StreamKit│
└─────────┘   (TLS)     └───────┘  (Local)   └──────────┘
                          │
                    Auth, TLS,
                    Rate Limiting

nginx configuration example:

location /ws {
    proxy_pass http://localhost:8080;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
}

Example: Complete Configuration

{
  "ports": {
    "inputs": [
      {"name": "stream", "type": "nats", "subject": "events.>", "required": true}
    ]
  },
  "port": 8080,
  "path": "/ws",
  "write_timeout": "5s",
  "read_timeout": "60s",
  "ping_interval": "30s"
}

Comparison with HTTP POST Output

**WebSocket Output:**

  • Server push (real-time streaming)
  • Persistent connections
  • Multiple clients
  • Lower latency
  • Server must run continuously

**HTTP POST Output:**

  • Client pull (request/response)
  • One request per message
  • Single endpoint
  • Higher latency
  • Stateless

Use WebSocket for real-time dashboards, use HTTP POST for webhooks.

Package websocket provides WebSocket output component for sending data to external systems

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateOutput

func CreateOutput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateOutput creates a WebSocket output component following service pattern

func Register

func Register(registry *component.Registry) error

Register registers the WebSocket output component with the given registry

Types

type Config

type Config struct {
	// Port configuration for inputs and outputs
	Ports *component.PortConfig `json:"ports"                   schema:"type:ports,description:Port configuration,category:basic"`
	// DeliveryMode specifies reliability semantics (at-most-once or at-least-once)
	DeliveryMode DeliveryMode `json:"delivery_mode,omitempty" schema:"type:string,description:Delivery reliability mode,category:advanced"`
	// AckTimeout specifies how long to wait for ack before considering message lost
	AckTimeout string `json:"ack_timeout,omitempty"   schema:"type:string,description:Acknowledgment timeout (e.g. 5s),category:advanced"`
}

Config holds configuration for WebSocket output component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration for WebSocket output

type ConstructorConfig

type ConstructorConfig struct {
	Name            string                     // Component name (empty = auto-generate)
	Port            int                        // WebSocket server port
	Path            string                     // WebSocket endpoint path
	Subjects        []string                   // NATS subjects to subscribe to
	InputPorts      []component.PortDefinition // Full port definitions (type, stream, consumer config)
	NATSClient      *natsclient.Client         // NATS client for messaging
	MetricsRegistry *metric.MetricsRegistry    // Optional Prometheus metrics registry
	Logger          *slog.Logger               // Optional logger (nil = use default)
	Security        security.Config            // Security configuration
	DeliveryMode    DeliveryMode               // Reliability semantics
	AckTimeout      time.Duration              // Acknowledgment timeout for at-least-once
}

ConstructorConfig holds all configuration needed to construct an Output instance

func DefaultConstructorConfig

func DefaultConstructorConfig() ConstructorConfig

DefaultConstructorConfig returns sensible defaults for Output construction

type DeliveryMode

type DeliveryMode string

DeliveryMode defines the reliability semantics for message delivery

const (
	// DeliveryAtMostOnce sends messages without waiting for ack (fire-and-forget)
	DeliveryAtMostOnce DeliveryMode = "at-most-once"
	// DeliveryAtLeastOnce waits for ack and retries on failure
	DeliveryAtLeastOnce DeliveryMode = "at-least-once"
)

type MessageEnvelope

type MessageEnvelope struct {
	Type      string          `json:"type"`              // Message type
	ID        string          `json:"id"`                // Unique message ID (for correlation)
	Timestamp int64           `json:"timestamp"`         // Unix milliseconds
	Payload   json.RawMessage `json:"payload,omitempty"` // Optional payload
}

MessageEnvelope wraps all WebSocket messages with type discrimination This matches the protocol defined in input/websocket_input Supported types:

  • "data": Application data from NATS
  • "ack": Acknowledge successful receipt/processing of data message
  • "nack": Negative acknowledgment (processing failed, may retry)
  • "slow": Backpressure signal indicating receiver is overloaded

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds Prometheus metrics for Output component

type Output

type Output struct {
	// contains filtered or unexported fields
}

Output implements a WebSocket server that broadcasts NATS messages to connected clients This is designed for real-time visualization of graph updates and entity state changes

func NewOutput

func NewOutput(port int, path string, subjects []string, natsClient *natsclient.Client) *Output

NewOutput creates a new WebSocket output component with minimal configuration. For more control over configuration, use NewOutputFromConfig().

func NewOutputFromConfig

func NewOutputFromConfig(cfg ConstructorConfig) *Output

NewOutputFromConfig creates a new WebSocket output component from ConstructorConfig. This is the recommended way to create Output instances with full configuration control.

func (*Output) ConfigSchema

func (w *Output) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema for this component References the package-level websocketSchema variable for efficient retrieval

func (*Output) DataFlow

func (w *Output) DataFlow() component.FlowMetrics

DataFlow returns the current data flow metrics

func (*Output) Health

func (w *Output) Health() component.HealthStatus

Health returns the current health status of the component

func (*Output) Initialize

func (w *Output) Initialize() error

Initialize prepares the WebSocket output component but does not start the server

func (*Output) InputPorts

func (w *Output) InputPorts() []component.Port

InputPorts returns the input ports for this component

func (*Output) Meta

func (w *Output) Meta() component.Metadata

Meta returns the component metadata

func (*Output) OutputPorts

func (w *Output) OutputPorts() []component.Port

OutputPorts returns the output ports for this component

func (*Output) Start

func (w *Output) Start(ctx context.Context) error

Start begins the WebSocket server and NATS subscription

func (*Output) Stop

func (w *Output) Stop(timeout time.Duration) error

Stop gracefully stops the WebSocket server and closes all connections

type PendingMessage

type PendingMessage struct {
	ID      string    // Unique message ID for correlation
	Data    []byte    // JSON message data (with envelope)
	Subject string    // NATS subject
	SentAt  time.Time // When message was sent
	Retries int       // Number of retry attempts
	AckChan chan bool // Signal channel for ack/nack (true=ack, false=nack)
}

PendingMessage represents a message awaiting acknowledgment

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL