Documentation ¶
Overview ¶
Package websocket provides a WebSocket server output component for streaming messages to WebSocket clients.
The WebSocket output component runs a WebSocket server that streams incoming NATS messages to connected clients in real time. It supports multiple concurrent clients, automatic reconnection handling, and per-client write timeouts. It implements the StreamKit component interfaces for lifecycle management and observability.
Quick Start ¶
Start a WebSocket server on port 8080:
config := websocket.Config{
Ports: &component.PortConfig{
Inputs: []component.PortDefinition{
{Name: "input", Type: "nats", Subject: "stream.>", Required: true},
},
},
Port: 8080,
Path: "/ws",
}
rawConfig, err := json.Marshal(config)
if err != nil {
return err
}
output, err := websocket.CreateOutput(rawConfig, deps)
Configuration ¶
The Config struct controls WebSocket server behavior:
- Port: TCP port to listen on (1024-65535)
- Path: WebSocket endpoint path (default: "/ws")
- Subjects: NATS subjects to subscribe to (from Ports config)
- WriteTimeout: Per-client write timeout (default: 5s)
- ReadTimeout: Client read timeout (default: 60s)
- PingInterval: WebSocket ping interval (default: 30s)
Client Management ¶
The server handles multiple concurrent clients with per-client goroutines:
// Each client gets:
// 1. Read goroutine (handle pings/pongs/close)
// 2. Write goroutine (forward NATS messages)
// 3. Dedicated mutex (prevent concurrent writes)
// 4. Message queue (buffer messages if client is slow)
Client lifecycle:
- Client connects via WebSocket handshake
- Server registers client and starts goroutines
- Messages streamed from NATS to client
- Client disconnect or error triggers cleanup
- Goroutines terminated, resources released
Message Flow ¶
NATS Subject → Message Handler → Fan-Out to All Clients → WebSocket Write
↓
Per-Client Queue (if slow)
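The fan-out step in the diagram above can be sketched with one bounded channel per client and a non-blocking send. The `client` type and `broadcast` function are illustrative names, and what happens to a client whose queue is full (drop the message, disconnect) is left to the caller because the documentation does not specify it.

```go
package main

import "fmt"

// client is a hypothetical per-connection record with a bounded send queue.
type client struct {
	id   string
	send chan []byte
}

// broadcast offers msg to every client without blocking. It returns the
// clients whose queues were full so the caller can decide how to handle them.
func broadcast(clients []*client, msg []byte) (slow []*client) {
	for _, c := range clients {
		select {
		case c.send <- msg:
			// Queued for this client's write goroutine.
		default:
			// Queue full: do not block the other clients.
			slow = append(slow, c)
		}
	}
	return slow
}

func main() {
	fast := &client{id: "fast", send: make(chan []byte, 2)}
	full := &client{id: "full", send: make(chan []byte, 1)}
	full.send <- []byte("backlog") // queue already at capacity

	slow := broadcast([]*client{fast, full}, []byte(`{"n":1}`))
	fmt.Println(len(fast.send), len(slow))
}
```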
Write Timeouts ¶
Each client write has a configurable timeout to prevent slow clients from blocking:
WriteTimeout: 5 * time.Second
// If client can't receive within timeout:
// 1. Write fails with deadline exceeded
// 2. Client disconnected
// 3. Error logged and counted
// 4. Resources cleaned up
This prevents one slow client from affecting others.
Ping/Pong Keepalive ¶
WebSocket keepalive ensures connection health:
PingInterval: 30 * time.Second
// Server sends ping every 30s
// Client must respond with pong
// If no pong received, connection closed
Lifecycle Management ¶
The server supports a start/stop lifecycle with graceful shutdown:
// Start WebSocket server
output.Start(ctx)
// Graceful shutdown
output.Stop(5 * time.Second)
During shutdown:
- Stop accepting new client connections
- Close all existing client connections
- Unsubscribe from NATS subjects
- Wait for all client goroutines to complete
- Close HTTP server
Observability ¶
The component implements component.Discoverable for monitoring:
meta := output.Meta()
// Name: websocket-output
// Type: output
// Description: WebSocket server output
health := output.Health()
// Healthy: true if server accepting connections
// ErrorCount: Write errors across all clients
// Uptime: Time since Start()
dataFlow := output.DataFlow()
// MessagesPerSecond: Broadcast rate
// BytesPerSecond: Total byte throughput
// ErrorRate: Client error percentage
Additional metrics via Prometheus:
- websocket_clients_total: Total client connections
- websocket_clients_active: Current active clients
- websocket_messages_sent_total: Messages sent to clients
- websocket_errors_total: Error counter
Performance Characteristics ¶
- Throughput: 1,000+ messages/second to 100+ clients
- Memory: O(clients) + O(queued messages per client)
- Latency: Sub-millisecond for local clients
- Concurrency: One goroutine pair per client
Error Handling ¶
The component uses streamkit/errors for consistent error classification:
- Invalid config: errs.WrapInvalid (bad port, invalid path)
- Network errors: errs.WrapTransient (connection failures)
- Write timeouts: errs.WrapTransient (slow client)
- Client errors: Logged but don't stop server
Per-client errors don't affect other clients or server health.
Common Use Cases ¶
**Real-Time Dashboard:**
Port: 8080
Path: "/dashboard/stream"
Subjects: ["metrics.>", "events.>"]
WriteTimeout: 5 * time.Second
**Live Monitoring:**
Port: 9090
Path: "/monitor"
Subjects: ["logs.>", "alerts.>"]
PingInterval: 15 * time.Second
**Event Broadcasting:**
Port: 8000
Path: "/events"
Subjects: ["notifications.>"]
Client Example ¶
JavaScript WebSocket client:
const ws = new WebSocket('ws://localhost:8080/ws');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Received:', data);
};
ws.onopen = () => console.log('Connected');
ws.onclose = () => console.log('Disconnected');
ws.onerror = (error) => console.error('Error:', error);
Thread Safety ¶
The component is fully thread-safe:
- Per-client write mutex prevents concurrent writes (gorilla/websocket requirement)
- Client map protected by sync.RWMutex
- Atomic operations for metrics
- sync.Once for cleanup operations
Concurrency Patterns ¶
The component relies on the following concurrency patterns:
// Client cleanup with sync.Once
client.closeOnce.Do(func() {
close(client.send)
delete(ws.clients, client)
})
// WaitGroup for goroutine tracking
ws.wg.Add(2) // Read + Write goroutines
defer ws.wg.Done()
// Atomic metrics updates
atomic.AddInt64(&ws.messagesSent, 1)
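The fragments above can be combined into a small self-contained program. This is a sketch, not the component's code: the `registry` and `client` types are hypothetical, and it shows only why `sync.Once` makes cleanup safe when both the read and the write goroutine try to remove the same client.

```go
package main

import (
	"fmt"
	"sync"
)

// client holds the per-connection send queue and a guard for cleanup.
type client struct {
	send      chan []byte
	closeOnce sync.Once
}

// registry is a client set protected by a sync.RWMutex.
type registry struct {
	mu      sync.RWMutex
	clients map[*client]struct{}
}

func (r *registry) add(c *client) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.clients[c] = struct{}{}
}

// remove may be called from several goroutines: the map entry is deleted
// and the channel closed exactly once, so a second call cannot panic.
func (r *registry) remove(c *client) {
	c.closeOnce.Do(func() {
		r.mu.Lock()
		delete(r.clients, c)
		r.mu.Unlock()
		close(c.send)
	})
}

func (r *registry) count() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.clients)
}

func main() {
	r := &registry{clients: make(map[*client]struct{})}
	c := &client{send: make(chan []byte, 8)}
	r.add(c)

	// Both goroutines notice the disconnect and attempt cleanup.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.remove(c)
		}()
	}
	wg.Wait()
	fmt.Println(r.count())
}
```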
Testing ¶
The package includes comprehensive test coverage:
- Unit tests: Config validation, client management
- Integration tests: Real WebSocket connections
- Race tests: 100 concurrent clients stress test
- Leak tests: Goroutine cleanup verification
- Panic tests: Double-close protection
Run tests:
go test ./output/websocket -v
go test ./output/websocket -race # Race detector
Limitations ¶
Current version limitations:
- No authentication/authorization (add reverse proxy)
- No client message handling (server → client only)
- No per-client subject filtering
- No message compression (gzip)
- No SSL/TLS (use reverse proxy or add to component)
Security Considerations ¶
- Use reverse proxy (nginx, Caddy) for TLS termination
- Implement authentication at reverse proxy level
- Rate limit client connections (use firewall or proxy)
- Validate client origin headers (CORS)
- Monitor client connection patterns
Production Deployment ¶
Recommended production setup:
┌─────────┐ HTTPS ┌───────┐ WS ┌──────────┐
│ Clients │ ─────────→ │ nginx │ ─────────→ │ StreamKit│
└─────────┘ (TLS) └───────┘ (Local) └──────────┘
│
Auth, TLS,
Rate Limiting
nginx configuration example:
location /ws {
proxy_pass http://localhost:8080;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
}
Example: Complete Configuration ¶
{
"ports": {
"inputs": [
{"name": "stream", "type": "nats", "subject": "events.>", "required": true}
]
},
"port": 8080,
"path": "/ws",
"write_timeout": "5s",
"read_timeout": "60s",
"ping_interval": "30s"
}
Comparison with HTTP POST Output ¶
**WebSocket Output:**
- Server push (real-time streaming)
- Persistent connections
- Multiple clients
- Lower latency
- Server must run continuously
**HTTP POST Output:**
- Client pull (request/response)
- One request per message
- Single endpoint
- Higher latency
- Stateless
Use WebSocket for real-time dashboards; use HTTP POST for webhooks.
Package websocket provides a WebSocket output component for sending data to external systems.
Index ¶
- func CreateOutput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type Config
- type ConstructorConfig
- type DeliveryMode
- type MessageEnvelope
- type Metrics
- type Output
- func (w *Output) ConfigSchema() component.ConfigSchema
- func (w *Output) DataFlow() component.FlowMetrics
- func (w *Output) Health() component.HealthStatus
- func (w *Output) Initialize() error
- func (w *Output) InputPorts() []component.Port
- func (w *Output) Meta() component.Metadata
- func (w *Output) OutputPorts() []component.Port
- func (w *Output) Start(ctx context.Context) error
- func (w *Output) Stop(timeout time.Duration) error
- type PendingMessage
Functions ¶
func CreateOutput ¶
func CreateOutput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateOutput creates a WebSocket output component following the service pattern.
Types ¶
type Config ¶
type Config struct {
// Port configuration for inputs and outputs
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
// DeliveryMode specifies reliability semantics (at-most-once or at-least-once)
DeliveryMode DeliveryMode `json:"delivery_mode,omitempty" schema:"type:string,description:Delivery reliability mode,category:advanced"`
// AckTimeout specifies how long to wait for ack before considering message lost
AckTimeout string `json:"ack_timeout,omitempty" schema:"type:string,description:Acknowledgment timeout (e.g. 5s),category:advanced"`
}
Config holds configuration for WebSocket output component
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns the default configuration for WebSocket output
type ConstructorConfig ¶
type ConstructorConfig struct {
Name string // Component name (empty = auto-generate)
Port int // WebSocket server port
Path string // WebSocket endpoint path
Subjects []string // NATS subjects to subscribe to
NATSClient *natsclient.Client // NATS client for messaging
MetricsRegistry *metric.MetricsRegistry // Optional Prometheus metrics registry
Logger *slog.Logger // Optional logger (nil = use default)
Security security.Config // Security configuration
DeliveryMode DeliveryMode // Reliability semantics
AckTimeout time.Duration // Acknowledgment timeout for at-least-once
}
ConstructorConfig holds all configuration needed to construct an Output instance
func DefaultConstructorConfig ¶
func DefaultConstructorConfig() ConstructorConfig
DefaultConstructorConfig returns sensible defaults for Output construction
type DeliveryMode ¶
type DeliveryMode string
DeliveryMode defines the reliability semantics for message delivery
const (
// DeliveryAtMostOnce sends messages without waiting for ack (fire-and-forget)
DeliveryAtMostOnce DeliveryMode = "at-most-once"
// DeliveryAtLeastOnce waits for ack and retries on failure
DeliveryAtLeastOnce DeliveryMode = "at-least-once"
)
type MessageEnvelope ¶
type MessageEnvelope struct {
Type string `json:"type"` // Message type
ID string `json:"id"` // Unique message ID (for correlation)
Timestamp int64 `json:"timestamp"` // Unix milliseconds
Payload json.RawMessage `json:"payload,omitempty"` // Optional payload
}
MessageEnvelope wraps all WebSocket messages with type discrimination. This matches the protocol defined in input/websocket_input. Supported types:
- "data": Application data from NATS
- "ack": Acknowledge successful receipt/processing of data message
- "nack": Negative acknowledgment (processing failed, may retry)
- "slow": Backpressure signal indicating receiver is overloaded
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics holds Prometheus metrics for Output component
type Output ¶
type Output struct {
// contains filtered or unexported fields
}
Output implements a WebSocket server that broadcasts NATS messages to connected clients. It is designed for real-time visualization of graph updates and entity state changes.
func NewOutput ¶
NewOutput creates a new WebSocket output component with minimal configuration. For more control over configuration, use NewOutputFromConfig().
func NewOutputFromConfig ¶
func NewOutputFromConfig(cfg ConstructorConfig) *Output
NewOutputFromConfig creates a new WebSocket output component from ConstructorConfig. This is the recommended way to create Output instances with full configuration control.
func (*Output) ConfigSchema ¶
func (w *Output) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema for this component. It references the package-level websocketSchema variable for efficient retrieval.
func (*Output) DataFlow ¶
func (w *Output) DataFlow() component.FlowMetrics
DataFlow returns the current data flow metrics
func (*Output) Health ¶
func (w *Output) Health() component.HealthStatus
Health returns the current health status of the component
func (*Output) Initialize ¶
Initialize prepares the WebSocket output component but does not start the server
func (*Output) InputPorts ¶
InputPorts returns the input ports for this component
func (*Output) OutputPorts ¶
OutputPorts returns the output ports for this component
type PendingMessage ¶
type PendingMessage struct {
ID string // Unique message ID for correlation
Data []byte // JSON message data (with envelope)
Subject string // NATS subject
SentAt time.Time // When message was sent
Retries int // Number of retry attempts
AckChan chan bool // Signal channel for ack/nack (true=ack, false=nack)
}
PendingMessage represents a message awaiting acknowledgment