websocket

package
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package websocket provides Server-Sent Events (SSE) support for real-time dashboard streaming using only Go's standard library.

Server-Sent Events (SSE) is an HTTP-based standard for one-way server-to-client real-time streaming. Unlike WebSockets, SSE:

  • Uses standard HTTP connections (no protocol upgrade)
  • Works through proxies, load balancers, and CDNs
  • Supports automatic reconnection with event IDs
  • Requires no external dependencies
  • Integrates seamlessly with HTTP middleware

Quick Start:

server := websocket.NewDefaultSSEServer()
http.HandleFunc("/events", server.HandleSSE)
http.HandleFunc("/config", server.HandleConfig)
log.Fatal(http.ListenAndServe(":8080", nil))

Client-side JavaScript:

const eventSource = new EventSource('http://localhost:8080/events');
eventSource.addEventListener('metrics', (e) => {
    const data = JSON.parse(e.data);
    console.log('Metrics:', data);
});

Key Types

  • Config: Server configuration (ping interval, rate limiting, CORS, etc.)
  • SSEServer: Central server managing all client connections
  • SSEClient: Individual client connection
  • Event: SSE event structure (id, event, data, retry)
  • Message: Internal message format
  • RateLimiter: Token bucket rate limiter per client
  • MessageType: Event type constants (ping, metrics, alert, etc.)

Broadcasting

Send events to connected clients:

server.Broadcast(websocket.Event{
    Event: "metrics",
    Data:  map[string]interface{}{"cpu": 45.2, "memory": 78.5},
})

server.BroadcastMetrics(metricsData)
server.BroadcastAlert("security", "Suspicious activity detected", "warning")

Graceful Shutdown

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
    log.Fatal(err)
}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	PingInterval       time.Duration
	RateLimit          int
	ClientBufferSize   int
	EnableRateLimiting bool
	EnablePing         bool
	AllowedOrigins     []string
	MetricsCallback    func(*Event)
	AlertCallback      func(*Event)
}

Config holds SSE server configuration

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible default configuration

type Event

type Event struct {
	ID    string      `json:"id,omitempty"`
	Event string      `json:"event"`
	Data  interface{} `json:"data"`
	Retry int         `json:"retry,omitempty"`
}

Event represents a Server-Sent Event

type Message

type Message struct {
	Type      MessageType            `json:"type"`
	Data      map[string]interface{} `json:"data,omitempty"`
	Timestamp time.Time              `json:"timestamp"`
}

Message represents a structured message

func NewMessage

func NewMessage(msgType MessageType, data map[string]interface{}) Message

NewMessage creates a new message with the current timestamp

type MessageType

type MessageType string

MessageType represents the type of SSE event

const (
	MessageTypePing         MessageType = "ping"
	MessageTypePong         MessageType = "pong"
	MessageTypeSubscribe    MessageType = "subscribe"
	MessageTypeUnsubscribe  MessageType = "unsubscribe"
	MessageTypeMetrics      MessageType = "metrics"
	MessageTypeAlert        MessageType = "alert"
	MessageTypeError        MessageType = "error"
	MessageTypeConnected    MessageType = "connected"
	MessageTypeDisconnected MessageType = "disconnected"
	MessageTypeConfig       MessageType = "config"
	SSEContentType                      = "text/event-stream"
	DefaultPingInterval                 = 30 * time.Second
	DefaultRateLimit                    = 10
	DefaultBufferSize                   = 256
)

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements token bucket rate limiting

func NewRateLimiter

func NewRateLimiter(rate int, per time.Duration) *RateLimiter

NewRateLimiter creates a new rate limiter

func (*RateLimiter) Allow

func (rl *RateLimiter) Allow() bool

Allow checks if an event can be sent (token bucket algorithm)

type SSEClient

type SSEClient struct {
	// contains filtered or unexported fields
}

SSEClient represents a connected SSE client

func (*SSEClient) ID

func (c *SSEClient) ID() string

Client accessors

func (*SSEClient) IsAlive

func (c *SSEClient) IsAlive() bool

IsAlive checks if client is still connected

func (*SSEClient) IsSubscribed

func (c *SSEClient) IsSubscribed(eventType string) bool

IsSubscribed checks if client is subscribed to an event type

func (*SSEClient) LastActivity

func (c *SSEClient) LastActivity() time.Time

func (*SSEClient) String

func (c *SSEClient) String() string

func (*SSEClient) Subscribe

func (c *SSEClient) Subscribe(eventType string)

Subscribe adds an event type to subscription list

func (*SSEClient) Unsubscribe

func (c *SSEClient) Unsubscribe(eventType string)

Unsubscribe removes an event type from subscription list

func (*SSEClient) UpdateActivity

func (c *SSEClient) UpdateActivity()

type SSEServer

type SSEServer struct {
	// contains filtered or unexported fields
}

SSEServer manages all SSE clients

func NewDefaultSSEServer

func NewDefaultSSEServer() *SSEServer

NewDefaultSSEServer creates a server with default configuration

Example
package main

import (
	"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)

func main() {
	server := websocket.NewDefaultSSEServer()
	_ = server
}

func NewSSEServer

func NewSSEServer(config Config) *SSEServer

NewSSEServer creates a new SSE server with given configuration

Example
package main

import (
	"time"

	"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)

func main() {
	config := websocket.Config{
		PingInterval:       30 * time.Second,
		RateLimit:          10,
		ClientBufferSize:   256,
		EnableRateLimiting: true,
		EnablePing:         true,
		AllowedOrigins:     []string{"http://localhost:3000"},
	}
	server := websocket.NewSSEServer(config)
	_ = server
}

func (*SSEServer) Broadcast

func (s *SSEServer) Broadcast(event Event)

Broadcast sends an event to all connected clients

Example
package main

import (
	"fmt"

	"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)

func main() {
	server := websocket.NewDefaultSSEServer()
	server.Broadcast(websocket.Event{
		Event: "metrics",
		Data: map[string]interface{}{
			"cpu":    45.2,
			"memory": 78.5,
			"disk":   23.1,
		},
	})
	fmt.Println("Event broadcast to all subscribed clients")
}
Output:

Event broadcast to all subscribed clients

func (*SSEServer) BroadcastAlert

func (s *SSEServer) BroadcastAlert(alertType, message, severity string)

BroadcastAlert sends an alert to all subscribed clients

Example
package main

import (
	"fmt"

	"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)

func main() {
	server := websocket.NewDefaultSSEServer()
	server.BroadcastAlert(
		"security",
		"Suspicious activity detected on port 22",
		"warning",
	)
	fmt.Println("Alert broadcast to all clients")
}
Output:

Alert broadcast to all clients

func (*SSEServer) BroadcastEvent

func (s *SSEServer) BroadcastEvent(ctx context.Context, event Event)

BroadcastEvent sends an event to all clients (with context for cancellation)

func (*SSEServer) BroadcastMetrics

func (s *SSEServer) BroadcastMetrics(metricsData interface{})

BroadcastMetrics sends metrics data to all subscribed clients

Example
package main

import (
	"fmt"
	"time"

	"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)

func main() {
	server := websocket.NewDefaultSSEServer()
	metrics := map[string]interface{}{
		"scan_count":      42,
		"last_scan":       time.Now().Unix(),
		"vulnerabilities": 5,
	}
	server.BroadcastMetrics(metrics)
	fmt.Println("Metrics broadcast to all clients")
}
Output:

Metrics broadcast to all clients

func (*SSEServer) BroadcastTo

func (s *SSEServer) BroadcastTo(clientID string, event Event) bool

BroadcastTo sends an event to a specific client

func (*SSEServer) GetClientCount

func (s *SSEServer) GetClientCount() int

GetClientCount returns the number of active clients

func (*SSEServer) GetClients

func (s *SSEServer) GetClients() []string

GetClients returns list of active client IDs

func (*SSEServer) HandleCommand

func (s *SSEServer) HandleCommand(w http.ResponseWriter, r *http.Request)

HandleCommand handles client subscription commands

func (*SSEServer) HandleConfig

func (s *SSEServer) HandleConfig(w http.ResponseWriter, r *http.Request)

HandleConfig returns server configuration (HTTP endpoint)

func (*SSEServer) HandleSSE

func (s *SSEServer) HandleSSE(w http.ResponseWriter, r *http.Request)

HandleSSE is the HTTP handler for SSE connections

func (*SSEServer) HealthCheck

func (s *SSEServer) HealthCheck() map[string]interface{}

HealthCheck returns server health status

Example
package main

import (
	"fmt"

	"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)

func main() {
	server := websocket.NewDefaultSSEServer()
	health := server.HealthCheck()
	_ = health
	fmt.Println(`{"status": "healthy", "active_clients": 0}`)
}
Output:

{"status": "healthy", "active_clients": 0}

func (*SSEServer) RegisterClient

func (s *SSEServer) RegisterClient(w http.ResponseWriter, r *http.Request) (*SSEClient, error)

RegisterClient registers a new SSE client from an HTTP request

func (*SSEServer) RemoveClient

func (s *SSEServer) RemoveClient(clientID string)

RemoveClient removes a client from the server

func (*SSEServer) Shutdown

func (s *SSEServer) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the server

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)

func main() {
	server := websocket.NewDefaultSSEServer()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	err := server.Shutdown(ctx)
	if err != nil {
		fmt.Println("Shutdown error:", err)
	} else {
		fmt.Println("All clients disconnected gracefully")
	}
}
Output:

All clients disconnected gracefully

func (*SSEServer) SubscribeAlerts

func (s *SSEServer) SubscribeAlerts(callback func(*Event))

SubscribeAlerts sets a callback for alert events

func (*SSEServer) SubscribeMetrics

func (s *SSEServer) SubscribeMetrics(callback func(*Event))

SubscribeMetrics sets a callback for metrics events

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL