Documentation
¶
Index ¶
- Constants
- func ProcessWebsocketsServerBlock(config *cfg.Config, block *hcl.Block, remainingBody hcl.Body) (cfg.Listener, hcl.Diagnostics)
- type Config
- func (c *Config) Build() (*Listener, error)
- func (c *Config) IsValid() error
- func (c *Config) WithEventBus(eventBus bus.EventBus) *Config
- func (c *Config) WithInboundTransforms(transforms ...transform.MessageTransformFunc) *Config
- func (c *Config) WithInitialSubscriptions(topics ...string) *Config
- func (c *Config) WithLogger(logger *zap.Logger) *Config
- func (c *Config) WithMeterProvider(provider metric.MeterProvider) *Config
- func (c *Config) WithOutboundTransforms(transforms ...transform.MessageTransformFunc) *Config
- func (c *Config) WithQueueSize(size int) *Config
- func (c *Config) WithReceivedBinaryTopic(topic string) *Config
- func (c *Config) WithReceivedTextTopic(topic string) *Config
- func (c *Config) WithSendBinary(binary bool) *Config
- type Connection
- func (c *Connection) Close()
- func (c *Connection) OnEvent(ctx context.Context, topic string, payload any, fields map[string]string) error
- func (c *Connection) OnSubscribe(ctx context.Context, topic string) error
- func (c *Connection) OnUnsubscribe(ctx context.Context, topic string) error
- func (c *Connection) PassThrough(msg bus.EventBusMessage) error
- func (c *Connection) Start(listener *Listener)
- type Listener
- type WebSocketMetrics
- func (m *WebSocketMetrics) RecordConnectionActive(ctx context.Context, count int)
- func (m *WebSocketMetrics) RecordConnectionEnd(ctx context.Context, duration time.Duration)
- func (m *WebSocketMetrics) RecordConnectionError(ctx context.Context, errorType string)
- func (m *WebSocketMetrics) RecordConnectionStart(ctx context.Context)
- func (m *WebSocketMetrics) RecordMessageError(ctx context.Context, errorType string, messageType string)
- func (m *WebSocketMetrics) RecordMessageReceived(ctx context.Context, sizeBytes int, messageType string)
- func (m *WebSocketMetrics) RecordMessageSent(ctx context.Context, sizeBytes int, messageType string)
- type WebsocketServer
- type WebsocketsServerDefinition
Constants ¶
const (
// DefaultQueueSize is the default size for the WebSocket message queue.
DefaultQueueSize = 256
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config holds the configuration for creating a simple WebSocket server. Use NewServer() to create a new configuration and chain methods to set the required parameters before calling Build().
func NewServer ¶
func NewServer() *Config
NewServer creates a new Config for building a simple WebSocket server. Use the fluent methods to set the required EventBus and Logger, then call Build().
Example:
server, err := websockets.NewServer().
WithEventBus(eventBus).
WithLogger(logger).
WithQueueSize(512).
WithInitialSubscriptions("system/alerts", "server/status").
WithSendBinary(true).
WithReceivedTextTopic("messages/text").
WithReceivedBinaryTopic("messages/binary").
WithMeterProvider(meterProvider).
WithOutboundTransforms(transform.DropTopicPrefix("debug/")).
Build()
func (*Config) Build ¶
Build creates a new simple WebSocket server from the configuration. Returns an error if the configuration is invalid (missing EventBus or Logger).
func (*Config) IsValid ¶
IsValid checks if the configuration has all required parameters set. Returns nil if the configuration is valid, or an error describing what's missing.
func (*Config) WithEventBus ¶
WithEventBus sets the EventBus for the WebSocket server. The EventBus is required for integrating WebSocket connections with the pub/sub system.
func (*Config) WithInboundTransforms ¶
func (c *Config) WithInboundTransforms(transforms ...transform.MessageTransformFunc) *Config
WithInboundTransforms sets the message transformation functions that will be applied to inbound messages from WebSocket clients before publishing to the EventBus. These transforms use the transform.MessageTransformFunc type and operate on EventBusMessage created from the WebSocket message.
Transform functions are called in the order they are provided and can:
- Modify message content (topic, payload, fields)
- Drop messages (return nil message)
- Stop the transform pipeline (return false)
Example:
transforms := []transform.MessageTransformFunc{
transform.FilterByTopicPrefix("allowed/"),
transform.AddField("source", "websocket"),
transform.ValidatePayload(),
}
config.WithInboundTransforms(transforms...)
Default: No transforms (messages published as-is)
func (*Config) WithInitialSubscriptions ¶
WithInitialSubscriptions sets the topic patterns that new WebSocket connections should be automatically subscribed to when they connect. These subscriptions happen automatically without client request.
This is useful for:
- Pushing server-side events to all clients
- Providing default subscriptions for convenience
- Broadcasting system notifications
Example:
config.WithInitialSubscriptions("system/alerts", "server/status")
Default: No initial subscriptions
func (*Config) WithLogger ¶
WithLogger sets the Logger for the WebSocket server. The Logger is required for connection events, errors, and debugging.
func (*Config) WithMeterProvider ¶ added in v0.24.0
func (c *Config) WithMeterProvider(provider metric.MeterProvider) *Config
WithMeterProvider sets the OTel MeterProvider for the WebSocket server. The MeterProvider is optional and enables collection of WebSocket server metrics such as connection counts, message rates, error rates, and connection durations.
If not provided, no metrics will be collected.
func (*Config) WithOutboundTransforms ¶
func (c *Config) WithOutboundTransforms(transforms ...transform.MessageTransformFunc) *Config
WithOutboundTransforms sets the message transformation functions that will be applied to outbound messages from the EventBus before sending to WebSocket clients. These transforms use the transform.MessageTransformFunc type and operate on EventBusMessage.
Transform functions are called in the order they are provided and can:
- Modify message content
- Drop messages (return nil)
- Stop the transform pipeline (return false)
Example:
transforms := []transform.MessageTransformFunc{
transform.DropTopicPrefix("debug/"),
transform.ModifyPayload(func(ctx context.Context, payload any, fields map[string]string) any {
return map[string]any{
"data": payload,
"timestamp": time.Now().Unix(),
}
}),
}
config.WithOutboundTransforms(transforms...)
Default: No transforms (messages sent as-is)
func (*Config) WithQueueSize ¶
WithQueueSize sets the message queue size for WebSocket connections. This controls how many messages can be buffered per connection before messages start getting dropped. Larger values handle bursts better but use more memory. Must be positive.
Default: 256 messages per connection
func (*Config) WithReceivedBinaryTopic ¶
WithReceivedBinaryTopic sets the topic that received WebSocket binary frames will be published to on the EventBus. If set to an empty string, binary frames will be discarded.
Default: "binary"
func (*Config) WithReceivedTextTopic ¶
WithReceivedTextTopic sets the topic that received WebSocket text frames will be published to on the EventBus. If set to an empty string, text frames will be discarded.
Default: "text"
func (*Config) WithSendBinary ¶
WithSendBinary sets the default message type for WebSocket frames when the fields map doesn't contain a "format" key.
Message type is determined by:
- fields["format"] == "text": Send as text frame
- fields["format"] == "binary": Send as binary frame
- fields["format"] missing or other value: Use this default
Default: false (send as text frames)
type Connection ¶
type Connection struct {
// Async subscriber wrapper for handling outbound messages
AsyncSubscriber *subutils.AsyncQueueingSubscriber
// contains filtered or unexported fields
}
Connection represents an individual WebSocket connection for the simple server. It handles reading messages from the client and sending messages to the client.
func (*Connection) OnEvent ¶
func (c *Connection) OnEvent(ctx context.Context, topic string, payload any, fields map[string]string) error
OnEvent is called when an event is published to a topic this connection is subscribed to. This method forwards the event to the WebSocket client. Message type is determined by fields["format"]:
- "text": Send as text frame
- "binary": Send as binary frame
- missing/other: Use configured default (sendBinary)
func (*Connection) OnSubscribe ¶
func (c *Connection) OnSubscribe(ctx context.Context, topic string) error
func (*Connection) OnUnsubscribe ¶
func (c *Connection) OnUnsubscribe(ctx context.Context, topic string) error
func (*Connection) PassThrough ¶
func (c *Connection) PassThrough(msg bus.EventBusMessage) error
func (*Connection) Start ¶
func (c *Connection) Start(listener *Listener)
Start begins handling the WebSocket connection.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener implements a simplified WebSocket server similar to the VWS server interface. Unlike the full VWS server, this server just deals in raw messages, and has no protocol for subscribe/unsubscribe.
func (*Listener) ConnectionCount ¶
ConnectionCount returns the current number of active WebSocket connections.
type WebSocketMetrics ¶
type WebSocketMetrics struct {
// contains filtered or unexported fields
}
WebSocketMetrics defines the metrics collected by the simple WebSocket server. This is a simplified version of the VWS server metrics, focusing on the most important connection and message metrics.
func NewWebSocketMetrics ¶
func NewWebSocketMetrics(mp metric.MeterProvider) *WebSocketMetrics
NewWebSocketMetrics creates a new WebSocketMetrics instance using the provided MeterProvider. If the provider is nil, returns nil (no metrics will be collected).
func (*WebSocketMetrics) RecordConnectionActive ¶
func (m *WebSocketMetrics) RecordConnectionActive(ctx context.Context, count int)
RecordConnectionActive updates the active connection count.
func (*WebSocketMetrics) RecordConnectionEnd ¶
func (m *WebSocketMetrics) RecordConnectionEnd(ctx context.Context, duration time.Duration)
RecordConnectionEnd records when a WebSocket connection ends and its duration.
func (*WebSocketMetrics) RecordConnectionError ¶
func (m *WebSocketMetrics) RecordConnectionError(ctx context.Context, errorType string)
RecordConnectionError records connection-level errors (upgrade failures, etc.).
func (*WebSocketMetrics) RecordConnectionStart ¶
func (m *WebSocketMetrics) RecordConnectionStart(ctx context.Context)
RecordConnectionStart records when a new WebSocket connection is established.
func (*WebSocketMetrics) RecordMessageError ¶
func (m *WebSocketMetrics) RecordMessageError(ctx context.Context, errorType string, messageType string)
RecordMessageError records message processing errors.
func (*WebSocketMetrics) RecordMessageReceived ¶
func (m *WebSocketMetrics) RecordMessageReceived(ctx context.Context, sizeBytes int, messageType string)
RecordMessageReceived records when a message is received from a client.
func (*WebSocketMetrics) RecordMessageSent ¶
func (m *WebSocketMetrics) RecordMessageSent(ctx context.Context, sizeBytes int, messageType string)
RecordMessageSent records when a message is sent to a client.
type WebsocketServer ¶
type WebsocketServer struct {
cfg.BaseServer
Listener *Listener
}
func (*WebsocketServer) GetHandler ¶
func (s *WebsocketServer) GetHandler() http.Handler
type WebsocketsServerDefinition ¶
type WebsocketsServerDefinition struct {
Bus hcl.Expression `hcl:"bus"`
QueueSize *int `hcl:"queue_size,optional"`
PingInterval hcl.Expression `hcl:"ping_interval,optional"`
WriteTimeout hcl.Expression `hcl:"write_timeout,optional"`
InitialSubscriptions []string `hcl:"initial_subscriptions,optional"`
OutboundTransforms hcl.Expression `hcl:"outbound_transforms,optional"`
InboundTransforms hcl.Expression `hcl:"inbound_transforms,optional"`
DefRange hcl.Range `hcl:",def_range"`
}