Documentation ¶
Overview ¶
Package websocket provides WebSocket connection management for agents.
This file implements the AgentHub, which is the central hub managing all agent WebSocket connections in the v2.0 multi-platform architecture.
The AgentHub:
- Maintains a registry of all connected agents
- Routes messages between Control Plane and agents
- Monitors agent health via heartbeats
- Detects and cleans up stale connections
- Updates agent status in the database
Connection Lifecycle:
- Agent connects via WebSocket (/api/v1/agents/connect)
- Hub registers the connection (updates DB status to "online")
- Agent sends heartbeats every 30 seconds (default, configurable)
- Hub monitors LastPing timestamp
- If no heartbeat for >45 seconds, connection is considered stale
- On disconnect, hub unregisters connection (updates DB status to "offline")
Thread Safety:
- All hub operations use channels for synchronization
- Connection map is protected by RWMutex
- Safe for concurrent use from multiple goroutines
Package websocket provides real-time WebSocket communication for StreamSpace.
This file implements WebSocket managers and broadcasting for real-time updates.
Purpose:
- Manage multiple WebSocket hubs (sessions, metrics, logs)
- Periodically broadcast session and metric updates to connected clients
- Stream pod logs in real-time via WebSocket
- Integrate database and Kubernetes for live data
Features:
- Multi-hub architecture (sessions, metrics separate channels)
- Periodic broadcast intervals (sessions: 3s, metrics: 5s)
- Database-enriched session data (active connections, activity status)
- Real-time pod log streaming
- Event-driven notifications via Notifier integration
- Graceful shutdown with connection cleanup
Architecture:
- Manager: Coordinates all hubs and data sources
- Hub: Manages WebSocket connections and message delivery
- Notifier: Routes targeted notifications to subscribed clients
- Broadcast goroutines: Fetch and push updates periodically
Broadcast Strategy:
- Sessions: Every 3 seconds, fetch all sessions from Kubernetes
- Metrics: Every 5 seconds, aggregate counts from database
- Skip broadcasts when no clients connected (performance)
- Enriched data includes database fields (active_connections)
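The broadcast strategy above can be sketched as a ticker loop that skips the fetch when nobody is listening. This is only an illustration: tick, runBroadcast, and the callback parameters are hypothetical names, and the real broadcast goroutines read from Kubernetes and the database rather than from a stub.

```go
package main

import (
	"fmt"
	"time"
)

// tick performs one broadcast cycle. It skips the fetch entirely when no
// clients are connected and reports whether a message was sent.
func tick(clientCount func() int, fetch func() ([]byte, error), send func([]byte)) bool {
	if clientCount() == 0 {
		return false // nothing to do: avoid the fetch cost
	}
	msg, err := fetch()
	if err != nil {
		return false // skip this cycle; the next tick retries
	}
	send(msg)
	return true
}

// runBroadcast calls tick once per interval until stop is closed.
func runBroadcast(interval time.Duration, stop <-chan struct{},
	clientCount func() int, fetch func() ([]byte, error), send func([]byte)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			tick(clientCount, fetch, send)
		case <-stop:
			return
		}
	}
}

func main() {
	fetch := func() ([]byte, error) { return []byte(`{"type":"sessions"}`), nil }
	send := func(b []byte) { fmt.Println(string(b)) }
	fmt.Println(tick(func() int { return 0 }, fetch, send)) // no clients: skipped
	fmt.Println(tick(func() int { return 2 }, fetch, send)) // two clients: sent
}
```

Under these assumptions, the sessions loop would call runBroadcast with a 3-second interval and the metrics loop with a 5-second interval.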
Implementation Details:
- Uses gorilla/websocket for WebSocket protocol
- Kubernetes client for session data (k8s.Client)
- Database for enrichment (active connections, metrics)
- JSON-encoded messages for all broadcasts
Thread Safety:
- Manager is thread-safe
- Hub operations use mutex protection
- Broadcast goroutines run concurrently
Dependencies:
- github.com/gorilla/websocket for WebSocket protocol
- internal/db for database access
- internal/k8s for Kubernetes API
Example Usage:
// Create manager with database and K8s client
manager := websocket.NewManager(database, k8sClient)
manager.Start()
// Handle WebSocket connections
router.GET("/ws/sessions", func(c *gin.Context) {
	userID := c.Query("user_id")
	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		// Upgrade has already written an HTTP error response.
		return
	}
	manager.HandleSessionsWebSocket(conn, userID, "")
})
// Shutdown cleanly
defer manager.CloseAll()
Package websocket provides real-time WebSocket communication for StreamSpace.
The WebSocket system enables:
- Real-time session status updates to UI
- Session event notifications (created, updated, deleted, state changes)
- Connection tracking (connect, disconnect, heartbeat)
- Resource usage updates
- Sharing and collaboration notifications
Architecture:
- Hub: Manages all WebSocket connections and broadcasts
- Client: Represents individual WebSocket connection
- Notifier: Handles event subscriptions and targeted notifications
- Manager: Coordinates hubs and notifiers
Message flow:
- Browser establishes WebSocket connection
- Client registers with Hub
- Client subscribes to user/session events via Notifier
- Backend emits events (session created, state changed, etc.)
- Notifier routes events to subscribed clients
- Hub broadcasts messages to clients
- Client writePump sends messages to browser
Concurrency:
- Hub.Run() runs in goroutine, handles all channel operations
- Each Client has readPump and writePump goroutines
- Thread-safe with sync.RWMutex for shared state
Example usage:
hub := NewHub()
go hub.Run()
// On WebSocket connection
hub.ServeClient(conn, clientID)
// Broadcast message to all clients
hub.Broadcast([]byte(`{"type":"session.created","sessionId":"abc"}`))
Index ¶
- type AgentConnection
- type AgentHub
- func (h *AgentHub) BroadcastToAllAgents(message []byte, excludeAgentID string)
- func (h *AgentHub) GetConnectedAgents() []string
- func (h *AgentHub) GetConnection(agentID string) *AgentConnection
- func (h *AgentHub) IsAgentConnected(agentID string) bool
- func (h *AgentHub) RegisterAgent(conn *AgentConnection) error
- func (h *AgentHub) Run()
- func (h *AgentHub) SendCommandToAgent(agentID string, command *models.AgentCommand) error
- func (h *AgentHub) Stop()
- func (h *AgentHub) UnregisterAgent(agentID string)
- func (h *AgentHub) UpdateAgentHeartbeat(agentID string) error
- type BroadcastMessage
- type Client
- type EventType
- type Hub
- func (h *Hub) Broadcast(message []byte)
- func (h *Hub) BroadcastToOrg(orgID string, message []byte)
- func (h *Hub) ClientCount() int
- func (h *Hub) GetClientsByOrg(orgID string) []*Client
- func (h *Hub) GetK8sNamespaceForOrg(orgID string) string
- func (h *Hub) GetUniqueOrgs() []string
- func (h *Hub) Run()
- func (h *Hub) ServeClient(conn *websocket.Conn, clientID string)
- func (h *Hub) ServeClientWithOrg(conn *websocket.Conn, clientID, orgID, k8sNamespace, userID string)
- type Manager
- func (m *Manager) CloseAll()
- func (m *Manager) GetNotifier() *Notifier
- func (m *Manager) HandleLogsWebSocket(conn *websocket.Conn, namespace, podName string)
- func (m *Manager) HandleLogsWebSocketWithOrg(conn *websocket.Conn, podName string, orgCtx *OrgContext)
- func (m *Manager) HandleMetricsWebSocket(conn *websocket.Conn)
- func (m *Manager) HandleMetricsWebSocketWithOrg(conn *websocket.Conn, orgCtx *OrgContext)
- func (m *Manager) HandleSessionsWebSocket(conn *websocket.Conn, userID, sessionID string)
- func (m *Manager) HandleSessionsWebSocketWithOrg(conn *websocket.Conn, userID, sessionID string, orgCtx *OrgContext)
- func (m *Manager) Start()
- type Notifier
- func (n *Notifier) CloseAll()
- func (n *Notifier) NotifySessionActive(sessionID, userID string)
- func (n *Notifier) NotifySessionConnected(sessionID, userID string, connectionID string)
- func (n *Notifier) NotifySessionCreated(sessionID, userID string, data map[string]interface{})
- func (n *Notifier) NotifySessionDeleted(sessionID, userID string)
- func (n *Notifier) NotifySessionDisconnected(sessionID, userID string, connectionID string)
- func (n *Notifier) NotifySessionError(sessionID, userID string, errorMsg string)
- func (n *Notifier) NotifySessionEvent(event SessionEvent)
- func (n *Notifier) NotifySessionIdle(sessionID, userID string, idleDuration int64)
- func (n *Notifier) NotifySessionResourcesUpdated(sessionID, userID string, resources map[string]interface{})
- func (n *Notifier) NotifySessionShared(sessionID, ownerUserID, sharedWithUserID string, permissions []string)
- func (n *Notifier) NotifySessionStateChange(sessionID, userID, oldState, newState string)
- func (n *Notifier) NotifySessionTagsUpdated(sessionID, userID string, tags []string)
- func (n *Notifier) NotifySessionUpdated(sessionID, userID string, data map[string]interface{})
- func (n *Notifier) SubscribeSession(clientID, sessionID string)
- func (n *Notifier) SubscribeUser(clientID, userID string)
- func (n *Notifier) UnsubscribeClient(clientID string)
- type OrgContext
- type SessionEvent
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentConnection ¶
type AgentConnection struct {
	// AgentID is the unique identifier for this agent
	AgentID string
	// Conn is the underlying WebSocket connection
	Conn *websocket.Conn
	// Platform identifies the agent type (kubernetes, docker, vm, cloud)
	Platform string
	// LastPing is the timestamp of the last heartbeat received
	LastPing time.Time
	// Send is a buffered channel for outbound messages to the agent
	Send chan []byte
	// Receive is a buffered channel for inbound messages from the agent
	Receive chan []byte
	// Mutex protects concurrent access to connection fields
	Mutex sync.RWMutex
}
AgentConnection represents a single agent's WebSocket connection.
Each connected agent has one AgentConnection containing:
- Conn: The underlying WebSocket connection
- Send: Channel for outbound messages to agent
- Receive: Channel for inbound messages from agent
- LastPing: Timestamp of last heartbeat (for stale detection)
Thread Safety: Mutex protects concurrent access to connection fields
type AgentHub ¶
type AgentHub struct {
// contains filtered or unexported fields
}
AgentHub is the central manager for all agent WebSocket connections.
The hub runs a main event loop that processes:
- register: New agent connections
- unregister: Agent disconnections
- broadcast: Messages to all agents
- staleCheck: Periodic cleanup of stale connections
Multi-Pod Support (P1-MULTI-POD-001):
- Uses Redis to share agent connection state across API replicas
- Redis pub/sub for cross-pod command routing
- Local connections map for direct WebSocket access
Thread Safety: All operations use channels for synchronization.
func NewAgentHub ¶
NewAgentHub creates a new AgentHub instance without Redis support.
The hub is initialized with empty connection map and buffered channels. Call Run() to start the hub's event loop.
For multi-pod deployments, use NewAgentHubWithRedis instead.
Example:
hub := websocket.NewAgentHub(database)
go hub.Run()
func NewAgentHubWithRedis ¶
NewAgentHubWithRedis creates a new AgentHub instance with Redis support.
This enables multi-pod deployments by sharing agent connection state across API replicas via Redis.
Parameters:
- database: Database connection for persisting agent status
- redisClient: Redis client for shared state (pass nil to disable multi-pod support)
Example:
redisClient := redis.NewClient(&redis.Options{Addr: "streamspace-redis:6379"})
hub := websocket.NewAgentHubWithRedis(database, redisClient)
go hub.Run()
func (*AgentHub) BroadcastToAllAgents ¶
BroadcastToAllAgents sends a message to all connected agents.
Optionally excludes a specific agent from the broadcast.
Example:
message := []byte(`{"type":"shutdown","payload":{}}`)
hub.BroadcastToAllAgents(message, "")
func (*AgentHub) GetConnectedAgents ¶
GetConnectedAgents returns a list of all currently connected agent IDs.
Example:
agents := hub.GetConnectedAgents()
fmt.Printf("Connected agents: %v\n", agents)
func (*AgentHub) GetConnection ¶
func (h *AgentHub) GetConnection(agentID string) *AgentConnection
GetConnection returns the AgentConnection for a specific agent.
Returns nil if the agent is not connected. Use IsAgentConnected to check before calling this.
Thread Safety: The returned connection should not be modified directly.
Example:
if conn := hub.GetConnection("k8s-prod-us-east-1"); conn != nil {
	fmt.Printf("Agent platform: %s\n", conn.Platform)
}
func (*AgentHub) IsAgentConnected ¶
IsAgentConnected checks if a specific agent is currently connected.
For multi-pod deployments with Redis, this checks both local connections and Redis state to find agents connected to other API pods.
Example:
if hub.IsAgentConnected("k8s-prod-us-east-1") {
	fmt.Println("Agent is online")
}
func (*AgentHub) RegisterAgent ¶
func (h *AgentHub) RegisterAgent(conn *AgentConnection) error
RegisterAgent adds a new agent connection to the hub.
This should be called when a new WebSocket connection is established. The connection will be processed by the hub's event loop.
Example:
conn := &AgentConnection{
	AgentID:  "k8s-prod-us-east-1",
	Conn:     wsConn,
	Platform: "kubernetes",
	LastPing: time.Now(),
	Send:     make(chan []byte, 256),
	Receive:  make(chan []byte, 256),
}
err := hub.RegisterAgent(conn)
func (*AgentHub) Run ¶
func (h *AgentHub) Run()
Run starts the hub's main event loop.
This function blocks and should be run in a goroutine. It processes registration, unregistration, broadcasts, and stale connection checks.
The loop runs until Stop() is called.
Example:
hub := websocket.NewAgentHub(database)
go hub.Run()
func (*AgentHub) SendCommandToAgent ¶
func (h *AgentHub) SendCommandToAgent(agentID string, command *models.AgentCommand) error
SendCommandToAgent sends a command to a specific agent over WebSocket.
The command is wrapped in an AgentMessage with type="command" and sent to the agent's Send channel.
Returns an error if the agent is not connected.
Example:
command := &models.AgentCommand{
	CommandID: "cmd-123",
	Action:    "start_session",
	Payload:   map[string]interface{}{"sessionId": "sess-456"},
}
err := hub.SendCommandToAgent("k8s-prod-us-east-1", command)
func (*AgentHub) Stop ¶
func (h *AgentHub) Stop()
Stop signals the hub to stop running.
This closes the stopChan, causing Run() to exit.
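Closing a channel to end an event loop is a common Go idiom. The sketch below shows the pattern with a hypothetical loop type. The sync.Once guard is an addition for this example that makes repeated stop calls safe; the source does not say whether AgentHub.Stop does the same.

```go
package main

import (
	"fmt"
	"sync"
)

// loop is a hypothetical stand-in for a hub with a stop channel.
type loop struct {
	stopChan chan struct{}
	done     chan struct{}
	stopOnce sync.Once
}

func newLoop() *loop {
	return &loop{stopChan: make(chan struct{}), done: make(chan struct{})}
}

// run blocks until stopChan is closed. A real event loop would select over
// its other channels (register, unregister, broadcast) as well.
func (l *loop) run() {
	defer close(l.done)
	<-l.stopChan
}

// stop closes stopChan exactly once; closing a closed channel would panic.
func (l *loop) stop() {
	l.stopOnce.Do(func() { close(l.stopChan) })
}

func main() {
	l := newLoop()
	go l.run()
	l.stop()
	l.stop() // safe: sync.Once prevents a double close
	<-l.done
	fmt.Println("loop exited")
}
```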
func (*AgentHub) UnregisterAgent ¶
UnregisterAgent removes an agent connection from the hub.
This should be called when a WebSocket connection is closed. The disconnection will be processed by the hub's event loop.
Example:
hub.UnregisterAgent("k8s-prod-us-east-1")
func (*AgentHub) UpdateAgentHeartbeat ¶
UpdateAgentHeartbeat updates the LastPing timestamp for an agent.
This should be called when a heartbeat message is received from the agent.
Example:
if err := hub.UpdateAgentHeartbeat("k8s-prod-us-east-1"); err != nil {
	log.Printf("heartbeat update failed: %v", err)
}
type BroadcastMessage ¶
type BroadcastMessage struct {
	// Message is the raw JSON bytes to send
	Message []byte
	// ExcludeAgentID optionally excludes a specific agent from the broadcast
	ExcludeAgentID string
}
BroadcastMessage represents a message to be sent to multiple agents.
Used by the hub's broadcast channel to send messages to all connected agents (e.g., shutdown notifications, global announcements).
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents an individual WebSocket connection.
Each client has:
- Unique ID for identification
- Organization ID for multi-tenancy scoping
- K8s namespace for resource filtering
- WebSocket connection for bidirectional communication
- Buffered send channel for outbound messages
- Reference to hub for registration/unregistration
Client lifecycle:
- Created when browser establishes WebSocket
- Registered with Hub
- readPump goroutine reads messages from browser
- writePump goroutine writes messages to browser
- Unregistered when connection closes
- Send channel closed to signal writePump to stop
Message buffering:
- send channel has buffer of 256 messages
- If buffer fills, client is slow and gets disconnected
- Prevents slow clients from blocking the Hub
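The buffering rule above is usually implemented with a non-blocking send. A minimal sketch, assuming a hypothetical trySend helper (not a function in this package):

```go
package main

import "fmt"

// trySend attempts a non-blocking send. It returns false when the buffer is
// full, which is the signal that the client is too slow and should be
// disconnected instead of stalling the sender.
func trySend(send chan []byte, msg []byte) bool {
	select {
	case send <- msg:
		return true
	default:
		return false
	}
}

func main() {
	// The documented buffer holds 256 messages; 2 keeps the demo short.
	send := make(chan []byte, 2)
	for i := 0; i < 3; i++ {
		fmt.Println(trySend(send, []byte("update"))) // true, true, false
	}
}
```

Because the default case fires as soon as the buffer is full, the sender never waits on a single client.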
MULTI-TENANCY: The OrgID and K8sNamespace fields are CRITICAL for tenant isolation. Broadcasts MUST filter data by orgID to prevent cross-tenant data leakage.
Example:
client := &Client{
	hub:          hub,
	conn:         websocketConn,
	send:         make(chan []byte, 256),
	id:           "user1-session123",
	orgID:        "org-acme",
	k8sNamespace: "streamspace-acme",
}
type EventType ¶
type EventType string
EventType represents the type of session event for real-time notifications.
Event types are organized by category:
- Lifecycle: created, updated, deleted, state changes
- Activity: connected, disconnected, idle, active
- Resources: CPU/memory updates, tag changes
- Sharing: shared, unshared
- Errors: error notifications
Events are sent to subscribed WebSocket clients in real-time, enabling the UI to update without polling.
const (
	// EventSessionCreated is emitted when a new session is created.
	// Data: session details (template, resources, state)
	EventSessionCreated EventType = "session.created"
	// EventSessionUpdated is emitted when session metadata is modified.
	// Data: changed fields (tags, description, etc.)
	EventSessionUpdated EventType = "session.updated"
	// EventSessionDeleted is emitted when a session is deleted.
	// Data: none (session no longer exists)
	EventSessionDeleted EventType = "session.deleted"
	// EventSessionStateChange is emitted when session state transitions.
	// Data: oldState, newState (running→hibernated, etc.)
	EventSessionStateChange EventType = "session.state.changed"
	// EventSessionConnected is emitted when a user connects to a session.
	// Data: connectionId, clientIP, userAgent
	EventSessionConnected EventType = "session.connected"
	// EventSessionDisconnected is emitted when a user disconnects.
	// Data: connectionId, duration
	EventSessionDisconnected EventType = "session.disconnected"
	// EventSessionHeartbeat is emitted on periodic heartbeat (optional).
	// Data: timestamp, active connections count
	EventSessionHeartbeat EventType = "session.heartbeat"
	// EventSessionIdle is emitted when session becomes idle.
	// Data: idleDuration (seconds)
	EventSessionIdle EventType = "session.idle"
	// EventSessionActive is emitted when idle session becomes active again.
	// Data: none
	EventSessionActive EventType = "session.active"
	// EventSessionResourcesUpdated is emitted when CPU/memory limits change.
	// Data: resources (cpu, memory, storage)
	EventSessionResourcesUpdated EventType = "session.resources.updated"
	// EventSessionTagsUpdated is emitted when session tags are modified.
	// Data: tags array
	EventSessionTagsUpdated EventType = "session.tags.updated"
	// Data: sharedWith (userID), permissions
	EventSessionShared EventType = "session.shared"
	// Data: unsharedFrom (userID)
	EventSessionUnshared EventType = "session.unshared"
	// EventSessionError is emitted when an error occurs.
	// Data: error (error message), code (error code)
	EventSessionError EventType = "session.error"
)
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub maintains active WebSocket connections and implements message broadcasting.
The Hub pattern:
- Centralizes connection management
- Provides thread-safe registration/unregistration
- Broadcasts messages to all clients efficiently
- Handles slow/disconnected clients gracefully
Channel-based design:
- register: New clients connect
- unregister: Clients disconnect
- broadcast: Messages to send to all clients
- All operations go through channels to avoid race conditions
Hub lifecycle:
- Create with NewHub()
- Start with go hub.Run()
- Clients connect via ServeClient()
- Send messages via Broadcast()
- Clients disconnect automatically on connection close
Thread safety:
- All client map access protected by sync.RWMutex
- Channel operations are inherently thread-safe
- Safe to call Broadcast() from multiple goroutines
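The channel-based design can be sketched as a single goroutine that owns the client map and serves the register, unregister, and broadcast channels. This is a simplified illustration with hypothetical lowercase types. It omits the sync.RWMutex the real Hub uses for reads from other goroutines, and the stop channel is an addition so the example can exit.

```go
package main

import "fmt"

// client and hub are reduced, hypothetical versions of Client and Hub.
type client struct {
	id   string
	send chan []byte
}

type hub struct {
	clients    map[*client]bool
	register   chan *client
	unregister chan *client
	broadcast  chan []byte
	stop       chan struct{}
}

func newHub() *hub {
	return &hub{
		clients:    make(map[*client]bool),
		register:   make(chan *client),
		unregister: make(chan *client),
		broadcast:  make(chan []byte),
		stop:       make(chan struct{}),
	}
}

// run is the only goroutine that touches h.clients, so the map needs no
// lock inside this loop.
func (h *hub) run() {
	for {
		select {
		case c := <-h.register:
			h.clients[c] = true
		case c := <-h.unregister:
			if h.clients[c] {
				delete(h.clients, c)
				close(c.send) // tells the client's writer to stop
			}
		case msg := <-h.broadcast:
			for c := range h.clients {
				select {
				case c.send <- msg:
				default: // buffer full: drop the slow client
					delete(h.clients, c)
					close(c.send)
				}
			}
		case <-h.stop:
			return
		}
	}
}

func main() {
	h := newHub()
	go h.run()
	c := &client{id: "user1-session123", send: make(chan []byte, 256)}
	h.register <- c
	h.broadcast <- []byte(`{"type":"session.created","sessionId":"abc"}`)
	fmt.Println(string(<-c.send))
	close(h.stop)
}
```

Because the channels are unbuffered and served by one loop, the registration is fully processed before the broadcast is accepted.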
func (*Hub) BroadcastToOrg ¶
BroadcastToOrg sends a message only to clients in a specific organization.
SECURITY: This is the preferred broadcast method for org-scoped data.
func (*Hub) ClientCount ¶
ClientCount returns the number of connected clients
func (*Hub) GetClientsByOrg ¶
GetClientsByOrg returns all clients belonging to a specific organization.
SECURITY: Used for org-scoped broadcasts to prevent cross-tenant data leakage.
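As an illustration of org-scoped filtering, the sketch below selects clients by organization ID before anything is sent. The client type and the clientsByOrg function are hypothetical simplifications; the real Hub reads its own client map under a lock.

```go
package main

import "fmt"

// client is a reduced, hypothetical version of Client.
type client struct {
	id    string
	orgID string
}

// clientsByOrg returns only the clients whose orgID matches, so a caller
// that sends to the result cannot reach another tenant's connections.
func clientsByOrg(clients []*client, orgID string) []*client {
	var out []*client
	for _, c := range clients {
		if c.orgID == orgID {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	all := []*client{
		{id: "a", orgID: "org-acme"},
		{id: "b", orgID: "org-other"},
		{id: "c", orgID: "org-acme"},
	}
	for _, c := range clientsByOrg(all, "org-acme") {
		fmt.Println(c.id) // prints a, then c
	}
}
```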
func (*Hub) GetK8sNamespaceForOrg ¶
GetK8sNamespaceForOrg returns the K8s namespace for an org.
Returns first client's namespace found for the org.
func (*Hub) GetUniqueOrgs ¶
GetUniqueOrgs returns a list of unique org IDs with connected clients.
Used by broadcast goroutines to iterate over active orgs.
func (*Hub) ServeClient ¶
ServeClient handles a new WebSocket connection.
DEPRECATED: This function does not support org scoping. Use ServeClientWithOrg instead.
func (*Hub) ServeClientWithOrg ¶
func (h *Hub) ServeClientWithOrg(conn *websocket.Conn, clientID, orgID, k8sNamespace, userID string)
ServeClientWithOrg handles a new WebSocket connection with org context.
SECURITY: This function requires org context for multi-tenant isolation. All broadcasts will be filtered by orgID to prevent cross-tenant data leakage.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages all WebSocket hubs
func NewManager ¶
NewManager creates a new WebSocket manager
func (*Manager) CloseAll ¶
func (m *Manager) CloseAll()
CloseAll closes all WebSocket connections and subscriptions
func (*Manager) GetNotifier ¶
GetNotifier returns the notifier for event-driven notifications
func (*Manager) HandleLogsWebSocket ¶
HandleLogsWebSocket handles WebSocket connections for pod logs streaming.
DEPRECATED: Use HandleLogsWebSocketWithOrg for multi-tenant deployments.
func (*Manager) HandleLogsWebSocketWithOrg ¶
func (m *Manager) HandleLogsWebSocketWithOrg(conn *websocket.Conn, podName string, orgCtx *OrgContext)
HandleLogsWebSocketWithOrg handles WebSocket connections for pod logs streaming with org context.
SECURITY: This function requires org context for multi-tenant isolation. Pod logs will only be accessible within the org's K8s namespace.
func (*Manager) HandleMetricsWebSocket ¶
HandleMetricsWebSocket handles WebSocket connections for metrics updates.
DEPRECATED: Use HandleMetricsWebSocketWithOrg for multi-tenant deployments.
func (*Manager) HandleMetricsWebSocketWithOrg ¶
func (m *Manager) HandleMetricsWebSocketWithOrg(conn *websocket.Conn, orgCtx *OrgContext)
HandleMetricsWebSocketWithOrg handles WebSocket connections for metrics updates with org context.
SECURITY: This function requires org context for multi-tenant isolation. All metrics will be scoped to the specified organization.
func (*Manager) HandleSessionsWebSocket ¶
HandleSessionsWebSocket handles WebSocket connections for session updates.
DEPRECATED: Use HandleSessionsWebSocketWithOrg for multi-tenant deployments.
Supports subscribing to user-specific or session-specific events via query params:
- ?user_id=<userID> - Subscribe to all events for a specific user
- ?session_id=<sessionID> - Subscribe to events for a specific session
func (*Manager) HandleSessionsWebSocketWithOrg ¶
func (m *Manager) HandleSessionsWebSocketWithOrg(conn *websocket.Conn, userID, sessionID string, orgCtx *OrgContext)
HandleSessionsWebSocketWithOrg handles WebSocket connections for session updates with org context.
SECURITY: This function requires org context for multi-tenant isolation. All session updates will be scoped to the specified organization.
Parameters:
- conn: WebSocket connection
- userID: User ID to subscribe to user-specific events
- sessionID: Session ID to subscribe to session-specific events
- orgCtx: Organization context (REQUIRED for multi-tenancy)
type Notifier ¶
type Notifier struct {
// contains filtered or unexported fields
}
Notifier handles event subscriptions and targeted real-time notifications.
The Notifier implements a pub/sub pattern:
- Clients subscribe to user events (all sessions for a user)
- Clients subscribe to session events (specific session)
- Backend emits events via NotifySessionEvent()
- Notifier routes events to subscribed clients
- Hub delivers messages over WebSocket
Subscription model:
- User subscriptions: Get all events for a user's sessions
- Session subscriptions: Get events for a specific session
- Clients can have both types of subscriptions simultaneously
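The subscription model can be pictured as two indexes, one keyed by user ID and one keyed by session ID, whose matches are merged when an event is routed. The sketch below uses a hypothetical subscriptions type and is not the Notifier's actual internals.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// subscriptions is a hypothetical stand-in for the Notifier's state.
type subscriptions struct {
	mu        sync.RWMutex
	byUser    map[string]map[string]bool // userID -> set of client IDs
	bySession map[string]map[string]bool // sessionID -> set of client IDs
}

func newSubscriptions() *subscriptions {
	return &subscriptions{
		byUser:    make(map[string]map[string]bool),
		bySession: make(map[string]map[string]bool),
	}
}

// add records clientID under key, creating the inner set on first use.
func add(index map[string]map[string]bool, key, clientID string) {
	if index[key] == nil {
		index[key] = make(map[string]bool)
	}
	index[key][clientID] = true
}

func (s *subscriptions) subscribeUser(clientID, userID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	add(s.byUser, userID, clientID)
}

func (s *subscriptions) subscribeSession(clientID, sessionID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	add(s.bySession, sessionID, clientID)
}

// recipients returns the sorted, de-duplicated client IDs that should
// receive an event for the given user and session.
func (s *subscriptions) recipients(userID, sessionID string) []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	seen := make(map[string]bool)
	for id := range s.byUser[userID] {
		seen[id] = true
	}
	for id := range s.bySession[sessionID] {
		seen[id] = true
	}
	out := make([]string, 0, len(seen))
	for id := range seen {
		out = append(out, id)
	}
	sort.Strings(out)
	return out
}

func main() {
	s := newSubscriptions()
	s.subscribeUser("client-a", "user1")
	s.subscribeSession("client-a", "user1-firefox") // both subscription types at once
	s.subscribeSession("client-b", "user1-firefox")
	fmt.Println(s.recipients("user1", "user1-firefox"))
}
```

A client holding both subscription types receives each event once because the two result sets are merged before delivery.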
Thread safety:
- All map access protected by sync.RWMutex
- Safe for concurrent subscriptions and notifications
Example usage:
notifier := NewNotifier(manager)
// Client subscribes to user events
notifier.SubscribeUser(clientID, userID)
// Backend emits event
notifier.NotifySessionCreated(sessionID, userID, data)
// Event is routed to subscribed clients via WebSocket
func NewNotifier ¶
NewNotifier creates a new event notifier
func (*Notifier) CloseAll ¶
func (n *Notifier) CloseAll()
CloseAll closes all subscriptions (used during shutdown)
func (*Notifier) NotifySessionActive ¶
NotifySessionActive notifies clients when a session becomes active again
func (*Notifier) NotifySessionConnected ¶
NotifySessionConnected notifies clients when someone connects to a session
func (*Notifier) NotifySessionCreated ¶
NotifySessionCreated notifies clients when a session is created
func (*Notifier) NotifySessionDeleted ¶
NotifySessionDeleted notifies clients when a session is deleted
func (*Notifier) NotifySessionDisconnected ¶
NotifySessionDisconnected notifies clients when someone disconnects from a session
func (*Notifier) NotifySessionError ¶
NotifySessionError notifies clients about session errors
func (*Notifier) NotifySessionEvent ¶
func (n *Notifier) NotifySessionEvent(event SessionEvent)
NotifySessionEvent sends a session event to subscribed clients
func (*Notifier) NotifySessionIdle ¶
NotifySessionIdle notifies clients when a session becomes idle
func (*Notifier) NotifySessionResourcesUpdated ¶
func (n *Notifier) NotifySessionResourcesUpdated(sessionID, userID string, resources map[string]interface{})
NotifySessionResourcesUpdated notifies clients when session resources are updated
func (*Notifier) NotifySessionShared ¶
func (n *Notifier) NotifySessionShared(sessionID, ownerUserID, sharedWithUserID string, permissions []string)
NotifySessionShared notifies clients when a session is shared with someone
func (*Notifier) NotifySessionStateChange ¶
NotifySessionStateChange notifies clients when a session changes state
func (*Notifier) NotifySessionTagsUpdated ¶
NotifySessionTagsUpdated notifies clients when session tags are updated
func (*Notifier) NotifySessionUpdated ¶
NotifySessionUpdated notifies clients when a session is updated
func (*Notifier) SubscribeSession ¶
SubscribeSession subscribes a client to receive events for a specific session
func (*Notifier) SubscribeUser ¶
SubscribeUser subscribes a client to receive events for a specific user
func (*Notifier) UnsubscribeClient ¶
UnsubscribeClient removes all subscriptions for a client
type OrgContext ¶
type OrgContext struct {
	// OrgID is the organization this connection belongs to.
	OrgID string
	// K8sNamespace is the Kubernetes namespace for this org.
	K8sNamespace string
	// UserID is the authenticated user's ID.
	UserID string
}
OrgContext contains the organization context for WebSocket connections.
SECURITY: This is REQUIRED for all WebSocket connections to ensure org isolation.
type SessionEvent ¶
type SessionEvent struct {
	// Type identifies the event type (e.g., "session.created").
	Type EventType `json:"type"`
	// SessionID is the ID of the session this event relates to.
	SessionID string `json:"sessionId"`
	// UserID is the owner of the session.
	// Events are routed to all clients subscribed to this user.
	UserID string `json:"userId"`
	// Timestamp is when the event occurred (server time).
	Timestamp time.Time `json:"timestamp"`
	// Data contains event-specific payload (optional).
	// Structure depends on event type.
	Data map[string]interface{} `json:"data,omitempty"`
}
SessionEvent represents a session-related event sent to WebSocket clients.
Events are JSON-encoded and sent over WebSocket connections to subscribed clients. The UI can listen for specific event types and update in real-time.
Event routing:
- Events are routed to clients subscribed to the userID
- Events are also routed to clients subscribed to the sessionID
- A client can be subscribed to multiple users and sessions
Example event:
{
	"type": "session.created",
	"sessionId": "user1-firefox",
	"userId": "user1",
	"timestamp": "2025-01-15T10:30:00Z",
	"data": {
		"template": "firefox-browser",
		"state": "running",
		"resources": {"cpu": "2000m", "memory": "4096Mi"}
	}
}