Documentation
¶
Overview ¶
Package websocket provides Server-Sent Events (SSE) support for real-time dashboard streaming using only Go's standard library.
Server-Sent Events (SSE) is an HTTP-based standard for one-way server-to-client real-time streaming. Unlike WebSockets, SSE:
- Uses standard HTTP connections (no protocol upgrade)
- Works through proxies, load balancers, and CDNs
- Supports automatic reconnection with event IDs
- Requires no external dependencies
- Integrates seamlessly with HTTP middleware
Quick Start:
server := websocket.NewDefaultSSEServer()
http.HandleFunc("/events", server.HandleSSE)
http.HandleFunc("/config", server.HandleConfig)
log.Fatal(http.ListenAndServe(":8080", nil))
Client-side JavaScript:
const eventSource = new EventSource('http://localhost:8080/events');
eventSource.addEventListener('metrics', (e) => {
const data = JSON.parse(e.data);
console.log('Metrics:', data);
});
Key Types ¶
- Config: Server configuration (ping interval, rate limiting, CORS, etc.)
- SSEServer: Central server managing all client connections
- SSEClient: Individual client connection
- Event: SSE event structure (id, event, data, retry)
- Message: Internal message format
- RateLimiter: Token bucket rate limiter per client
- MessageType: Event type constants (ping, metrics, alert, etc.)
Broadcasting ¶
Send events to connected clients:
server.Broadcast(websocket.Event{
Event: "metrics",
Data: map[string]interface{}{"cpu": 45.2, "memory": 78.5},
})
server.BroadcastMetrics(metricsData)
server.BroadcastAlert("security", "Suspicious activity detected", "warning")
Graceful Shutdown ¶
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Fatal(err)
}
Index ¶
- type Config
- type Event
- type Message
- type MessageType
- type RateLimiter
- type SSEClient
- func (c *SSEClient) ID() string
- func (c *SSEClient) IsAlive() bool
- func (c *SSEClient) IsSubscribed(eventType string) bool
- func (c *SSEClient) LastActivity() time.Time
- func (c *SSEClient) String() string
- func (c *SSEClient) Subscribe(eventType string)
- func (c *SSEClient) Unsubscribe(eventType string)
- func (c *SSEClient) UpdateActivity()
- type SSEServer
- func (s *SSEServer) Broadcast(event Event)
- func (s *SSEServer) BroadcastAlert(alertType, message, severity string)
- func (s *SSEServer) BroadcastEvent(ctx context.Context, event Event)
- func (s *SSEServer) BroadcastMetrics(metricsData interface{})
- func (s *SSEServer) BroadcastTo(clientID string, event Event) bool
- func (s *SSEServer) GetClientCount() int
- func (s *SSEServer) GetClients() []string
- func (s *SSEServer) HandleCommand(w http.ResponseWriter, r *http.Request)
- func (s *SSEServer) HandleConfig(w http.ResponseWriter, r *http.Request)
- func (s *SSEServer) HandleSSE(w http.ResponseWriter, r *http.Request)
- func (s *SSEServer) HealthCheck() map[string]interface{}
- func (s *SSEServer) RegisterClient(w http.ResponseWriter, r *http.Request) (*SSEClient, error)
- func (s *SSEServer) RemoveClient(clientID string)
- func (s *SSEServer) Shutdown(ctx context.Context) error
- func (s *SSEServer) SubscribeAlerts(callback func(*Event))
- func (s *SSEServer) SubscribeMetrics(callback func(*Event))
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
PingInterval time.Duration
RateLimit int
ClientBufferSize int
EnableRateLimiting bool
EnablePing bool
AllowedOrigins []string
MetricsCallback func(*Event)
AlertCallback func(*Event)
}
Config holds SSE server configuration
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns sensible default configuration
type Event ¶
type Event struct {
ID string `json:"id,omitempty"`
Event string `json:"event"`
Data interface{} `json:"data"`
Retry int `json:"retry,omitempty"`
}
Event represents a Server-Sent Event
type Message ¶
type Message struct {
Type MessageType `json:"type"`
Data map[string]interface{} `json:"data,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
Message represents a structured message
func NewMessage ¶
func NewMessage(msgType MessageType, data map[string]interface{}) Message
NewMessage creates a new message with the current timestamp
type MessageType ¶
type MessageType string
MessageType represents the type of SSE event
const ( MessageTypePing MessageType = "ping" MessageTypePong MessageType = "pong" MessageTypeSubscribe MessageType = "subscribe" MessageTypeUnsubscribe MessageType = "unsubscribe" MessageTypeMetrics MessageType = "metrics" MessageTypeAlert MessageType = "alert" MessageTypeError MessageType = "error" MessageTypeConnected MessageType = "connected" MessageTypeDisconnected MessageType = "disconnected" MessageTypeConfig MessageType = "config" SSEContentType = "text/event-stream" DefaultPingInterval = 30 * time.Second DefaultRateLimit = 10 DefaultBufferSize = 256 )
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements token bucket rate limiting
func NewRateLimiter ¶
func NewRateLimiter(rate int, per time.Duration) *RateLimiter
NewRateLimiter creates a new rate limiter
func (*RateLimiter) Allow ¶
func (rl *RateLimiter) Allow() bool
Allow checks if an event can be sent (token bucket algorithm)
type SSEClient ¶
type SSEClient struct {
// contains filtered or unexported fields
}
SSEClient represents a connected SSE client
func (*SSEClient) IsSubscribed ¶
IsSubscribed checks if client is subscribed to an event type
func (*SSEClient) LastActivity ¶
func (*SSEClient) Unsubscribe ¶
Unsubscribe removes an event type from subscription list
func (*SSEClient) UpdateActivity ¶
func (c *SSEClient) UpdateActivity()
type SSEServer ¶
type SSEServer struct {
// contains filtered or unexported fields
}
SSEServer manages all SSE clients
func NewDefaultSSEServer ¶
func NewDefaultSSEServer() *SSEServer
NewDefaultSSEServer creates a server with default configuration
Example ¶
package main
import (
"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)
func main() {
server := websocket.NewDefaultSSEServer()
_ = server
}
func NewSSEServer ¶
NewSSEServer creates a new SSE server with given configuration
Example ¶
package main
import (
"time"
"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)
func main() {
config := websocket.Config{
PingInterval: 30 * time.Second,
RateLimit: 10,
ClientBufferSize: 256,
EnableRateLimiting: true,
EnablePing: true,
AllowedOrigins: []string{"http://localhost:3000"},
}
server := websocket.NewSSEServer(config)
_ = server
}
func (*SSEServer) Broadcast ¶
Broadcast sends an event to all connected clients
Example ¶
package main
import (
"fmt"
"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)
func main() {
server := websocket.NewDefaultSSEServer()
server.Broadcast(websocket.Event{
Event: "metrics",
Data: map[string]interface{}{
"cpu": 45.2,
"memory": 78.5,
"disk": 23.1,
},
})
fmt.Println("Event broadcast to all subscribed clients")
}
Output: Event broadcast to all subscribed clients
func (*SSEServer) BroadcastAlert ¶
BroadcastAlert sends an alert to all subscribed clients
Example ¶
package main
import (
"fmt"
"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)
func main() {
server := websocket.NewDefaultSSEServer()
server.BroadcastAlert(
"security",
"Suspicious activity detected on port 22",
"warning",
)
fmt.Println("Alert broadcast to all clients")
}
Output: Alert broadcast to all clients
func (*SSEServer) BroadcastEvent ¶
BroadcastEvent sends an event to all clients (with context for cancellation)
func (*SSEServer) BroadcastMetrics ¶
func (s *SSEServer) BroadcastMetrics(metricsData interface{})
BroadcastMetrics sends metrics data to all subscribed clients
Example ¶
package main
import (
"fmt"
"time"
"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)
func main() {
server := websocket.NewDefaultSSEServer()
metrics := map[string]interface{}{
"scan_count": 42,
"last_scan": time.Now().Unix(),
"vulnerabilities": 5,
}
server.BroadcastMetrics(metrics)
fmt.Println("Metrics broadcast to all clients")
}
Output: Metrics broadcast to all clients
func (*SSEServer) BroadcastTo ¶
BroadcastTo sends an event to a specific client
func (*SSEServer) GetClientCount ¶
GetClientCount returns the number of active clients
func (*SSEServer) GetClients ¶
GetClients returns list of active client IDs
func (*SSEServer) HandleCommand ¶
func (s *SSEServer) HandleCommand(w http.ResponseWriter, r *http.Request)
HandleCommand handles client subscription commands
func (*SSEServer) HandleConfig ¶
func (s *SSEServer) HandleConfig(w http.ResponseWriter, r *http.Request)
HandleConfig returns server configuration (HTTP endpoint)
func (*SSEServer) HandleSSE ¶
func (s *SSEServer) HandleSSE(w http.ResponseWriter, r *http.Request)
HandleSSE is the HTTP handler for SSE connections
func (*SSEServer) HealthCheck ¶
HealthCheck returns server health status
Example ¶
package main
import (
"fmt"
"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)
func main() {
server := websocket.NewDefaultSSEServer()
health := server.HealthCheck()
_ = health
fmt.Println(`{"status": "healthy", "active_clients": 0}`)
}
Output: {"status": "healthy", "active_clients": 0}
func (*SSEServer) RegisterClient ¶
RegisterClient registers a new SSE client from an HTTP request
func (*SSEServer) RemoveClient ¶
RemoveClient removes a client from the server
func (*SSEServer) Shutdown ¶
Shutdown gracefully shuts down the server
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/aegisgatesecurity/aegisgate/pkg/websocket"
)
func main() {
server := websocket.NewDefaultSSEServer()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := server.Shutdown(ctx)
if err != nil {
fmt.Println("Shutdown error:", err)
} else {
fmt.Println("All clients disconnected gracefully")
}
}
Output: All clients disconnected gracefully
func (*SSEServer) SubscribeAlerts ¶
SubscribeAlerts sets a callback for alert events
func (*SSEServer) SubscribeMetrics ¶
SubscribeMetrics sets a callback for metrics events