Documentation ¶
Overview ¶
Package services provides business logic services for the StreamSpace API.
This file implements the AgentSelector service which handles intelligent routing of session creation requests to appropriate agents in multi-agent deployments.
Package services provides business logic services for the StreamSpace API. This file implements the CommandDispatcher for queuing and dispatching commands to agents.
COMMAND DISPATCHER: The CommandDispatcher is responsible for:
- Queuing commands for dispatch to agents
- Managing a worker pool to process commands concurrently
- Sending commands to agents via the AgentHub
- Updating command status in the database
- Handling command lifecycle (pending → sent → ack → completed/failed)
COMMAND LIFECYCLE:
- Command created in database with status="pending"
- DispatchCommand() queues the command
- Worker picks up command from queue
- Worker checks if agent is connected
- Worker sends command to agent via hub
- Worker updates status="sent" and sent_at timestamp
- Agent acknowledges (WebSocket handler updates status="ack")
- Agent completes/fails (WebSocket handler updates status="completed"/"failed")
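The lifecycle above can be read as a small state machine. The following is a minimal sketch of that reading, not the package's implementation: `allowedTransitions` and `canTransition` are hypothetical names, and the sketch assumes that "failed" is only reachable after "ack", which the documentation does not confirm.

```go
package main

import "fmt"

// allowedTransitions is a hypothetical table derived from the documented
// lifecycle: pending -> sent -> ack -> completed/failed.
var allowedTransitions = map[string][]string{
	"pending": {"sent"},
	"sent":    {"ack"},
	"ack":     {"completed", "failed"},
}

// canTransition reports whether moving from one status to another follows
// the documented lifecycle. Terminal states have no outgoing transitions.
func canTransition(from, to string) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println("pending -> sent allowed:", canTransition("pending", "sent"))
	fmt.Println("pending -> completed allowed:", canTransition("pending", "completed"))
}
```

A table like this makes it easy to reject out-of-order status updates before they reach the database.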
WORKER POOL PATTERN: The dispatcher uses a worker pool to process commands concurrently. Each worker is a goroutine that continuously reads from the queue channel.
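To illustrate the worker pool pattern described above, here is a minimal, self-contained sketch. The `command` and `pool` types are hypothetical stand-ins, not the package's types, and for brevity the sketch shuts down by closing the queue, whereas the real dispatcher is documented as using a stop channel.

```go
package main

import (
	"fmt"
	"sync"
)

// command is a hypothetical stand-in for models.AgentCommand.
type command struct {
	ID      string
	AgentID string
}

// pool is a hypothetical, simplified dispatcher: a buffered queue channel
// plus a fixed number of worker goroutines.
type pool struct {
	queue   chan command
	workers int
	wg      sync.WaitGroup

	mu        sync.Mutex
	processed []string // IDs of commands handled so far
}

func newPool(workers, queueSize int) *pool {
	return &pool{queue: make(chan command, queueSize), workers: workers}
}

// start launches the workers. Each worker reads from the queue channel
// until the channel is closed and empty.
func (p *pool) start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for cmd := range p.queue {
				// A real worker would send cmd to its agent here.
				p.mu.Lock()
				p.processed = append(p.processed, cmd.ID)
				p.mu.Unlock()
			}
		}()
	}
}

// drain closes the queue and waits for the workers to finish what is left.
func (p *pool) drain() {
	close(p.queue)
	p.wg.Wait()
}

func main() {
	p := newPool(3, 10)
	p.start()
	for i := 0; i < 5; i++ {
		p.queue <- command{ID: fmt.Sprintf("cmd-%d", i), AgentID: "agent-1"}
	}
	p.drain()
	fmt.Println("processed:", len(p.processed))
}
```

Because several workers read from the same channel, commands may be processed in a different order than they were queued.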
Example:
dispatcher := NewCommandDispatcher(database, hub)
go dispatcher.Start()
command := &models.AgentCommand{
	CommandID: "cmd-123",
	AgentID:   "k8s-prod-us-east-1",
	Action:    "start_session",
	Payload:   &models.CommandPayload{"sessionId": "sess-456"},
	Status:    "pending",
}
err := dispatcher.DispatchCommand(command)
Package services provides background services for the StreamSpace API.
This file implements the Session Reconciliation Loop, which handles stuck sessions that are out of sync with their actual platform state.
Index ¶
- type AgentInfo
- type AgentSelector
- type CommandDispatcher
- func (d *CommandDispatcher) DispatchCommand(command *models.AgentCommand) error
- func (d *CommandDispatcher) DispatchPendingCommands() error
- func (d *CommandDispatcher) GetQueueCapacity() int
- func (d *CommandDispatcher) GetQueueLength() int
- func (d *CommandDispatcher) SetWorkers(count int)
- func (d *CommandDispatcher) Start()
- func (d *CommandDispatcher) Stop()
- type SelectionCriteria
- type SessionReconciler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentInfo ¶
type AgentInfo struct {
	AgentID      string                 `json:"agent_id"`
	ClusterID    string                 `json:"cluster_id"`
	ClusterName  string                 `json:"cluster_name"`
	Platform     string                 `json:"platform"`
	Region       string                 `json:"region"`
	Status       string                 `json:"status"`
	SessionCount int                    `json:"session_count"` // Current session load
	Capacity     map[string]interface{} `json:"capacity"`      // Resource capacity
	IsConnected  bool                   `json:"is_connected"`  // WebSocket connected
}
AgentInfo represents agent metadata for selection decisions.
type AgentSelector ¶
type AgentSelector struct {
// contains filtered or unexported fields
}
AgentSelector handles selection of appropriate agents for session creation.
The selector implements multiple strategies:
- Load balancing: Distribute sessions evenly across healthy agents
- Cluster affinity: Route to specific clusters when requested
- Region preference: Prefer agents in specific regions
- Capacity-based: Consider agent resource capacity
- Health filtering: Only select online agents with recent heartbeats
Thread Safety: Safe for concurrent use from multiple goroutines.
func NewAgentSelector ¶
func NewAgentSelector(db *sql.DB, agentHub *websocket.AgentHub) *AgentSelector
NewAgentSelector creates a new AgentSelector instance.
Parameters:
- db: Database connection for querying agent metadata
- agentHub: AgentHub for checking WebSocket connection status
Example:
selector := services.NewAgentSelector(database.DB(), agentHub)
agent, err := selector.SelectAgent(ctx, &services.SelectionCriteria{})
func (*AgentSelector) GetAgentInfo ¶
GetAgentInfo retrieves information about a specific agent.
This is useful for displaying agent details or validating agent availability.
Example:
info, err := selector.GetAgentInfo(ctx, "k8s-prod-us-east-1")
func (*AgentSelector) SelectAgent ¶
func (s *AgentSelector) SelectAgent(ctx context.Context, criteria *SelectionCriteria) (*AgentInfo, error)
SelectAgent selects the best available agent based on criteria.
Selection Algorithm:
- Filter agents by status (only 'online')
- Filter by WebSocket connection (if RequireConnected)
- Apply criteria filters (cluster, region, platform)
- Calculate session load for each candidate
- Select agent with lowest load (if PreferLowLoad)
- Return selected agent or error if none available
Returns:
- AgentInfo: Selected agent metadata
- error: If no suitable agent found or database error
Example:
criteria := &SelectionCriteria{
	Region:           "us-east-1",
	PreferLowLoad:    true,
	RequireConnected: true,
}
agent, err := selector.SelectAgent(ctx, criteria)
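The selection algorithm described for SelectAgent can be sketched over an in-memory slice. This is an illustration under stated assumptions, not the package's code: the `agent`, `criteria`, `pick`, and `errNoAgent` names are hypothetical, the real selector queries the database and the AgentHub, and the behavior when PreferLowLoad is false (return the first match) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// agent and criteria are hypothetical, trimmed-down stand-ins for
// AgentInfo and SelectionCriteria.
type agent struct {
	ID, ClusterID, Region, Platform, Status string
	SessionCount                            int
	Connected                               bool
}

type criteria struct {
	ClusterID, Region, Platform     string
	PreferLowLoad, RequireConnected bool
}

var errNoAgent = errors.New("no suitable agent available")

// pick filters agents by status, connection, and criteria, then returns
// the lowest-load match (or the first match if PreferLowLoad is false).
func pick(agents []agent, c criteria) (agent, error) {
	var best *agent
	for i := range agents {
		a := &agents[i]
		if a.Status != "online" {
			continue
		}
		if c.RequireConnected && !a.Connected {
			continue
		}
		// An empty criteria field means "no restriction".
		if c.ClusterID != "" && a.ClusterID != c.ClusterID {
			continue
		}
		if c.Region != "" && a.Region != c.Region {
			continue
		}
		if c.Platform != "" && a.Platform != c.Platform {
			continue
		}
		if best == nil || (c.PreferLowLoad && a.SessionCount < best.SessionCount) {
			best = a
		}
	}
	if best == nil {
		return agent{}, errNoAgent
	}
	return *best, nil
}

func main() {
	agents := []agent{
		{ID: "a", Region: "us-east-1", Status: "online", SessionCount: 5, Connected: true},
		{ID: "b", Region: "us-east-1", Status: "online", SessionCount: 2, Connected: true},
		{ID: "c", Region: "us-east-1", Status: "offline", SessionCount: 0, Connected: false},
	}
	got, err := pick(agents, criteria{Region: "us-east-1", PreferLowLoad: true, RequireConnected: true})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("selected:", got.ID)
}
```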
type CommandDispatcher ¶
type CommandDispatcher struct {
// contains filtered or unexported fields
}
CommandDispatcher manages the queuing and dispatch of commands to agents.
The dispatcher maintains a worker pool that continuously processes commands from the queue channel. Each worker checks if the target agent is connected and sends the command via the AgentHub.
func NewCommandDispatcher ¶
func NewCommandDispatcher(database *db.Database, hub *websocket.AgentHub) *CommandDispatcher
NewCommandDispatcher creates a new CommandDispatcher.
The dispatcher is initialized with a buffered queue channel and a configurable number of workers (default: 10).
Example:
dispatcher := NewCommandDispatcher(database, hub)
go dispatcher.Start()
func (*CommandDispatcher) DispatchCommand ¶
func (d *CommandDispatcher) DispatchCommand(command *models.AgentCommand) error
DispatchCommand queues a command for dispatch to an agent.
The command should already be created in the database with status="pending". This function adds the command to the queue for processing by a worker.
Returns an error if the queue is full.
Example:
command := &models.AgentCommand{
	CommandID: "cmd-123",
	AgentID:   "k8s-prod-us-east-1",
	Action:    "start_session",
	Payload:   &models.CommandPayload{"sessionId": "sess-456"},
	Status:    "pending",
}
err := dispatcher.DispatchCommand(command)
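Returning an error when the queue is full, rather than blocking the caller, is commonly done in Go with a non-blocking channel send. The sketch below shows that technique in isolation; `enqueue` and `errQueueFull` are hypothetical names, and whether DispatchCommand is implemented this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

var errQueueFull = errors.New("command queue is full")

// enqueue tries to add id to the buffered queue without blocking.
// The default case runs only when the send cannot proceed immediately.
func enqueue(queue chan string, id string) error {
	select {
	case queue <- id:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	queue := make(chan string, 2) // capacity 2, no consumer in this sketch
	for _, id := range []string{"cmd-1", "cmd-2", "cmd-3"} {
		if err := enqueue(queue, id); err != nil {
			fmt.Printf("%s: %v\n", id, err)
			continue
		}
		// len and cap on a buffered channel give its current length and capacity.
		fmt.Printf("%s: queued (%d/%d)\n", id, len(queue), cap(queue))
	}
}
```

Callers that receive the queue-full error can retry later or surface the failure, since the command remains in the database with status="pending".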
func (*CommandDispatcher) DispatchPendingCommands ¶
func (d *CommandDispatcher) DispatchPendingCommands() error
DispatchPendingCommands retrieves all pending commands from the database and queues them for dispatch.
This is useful for recovering from a Control Plane restart: any commands that were still pending when the server stopped are re-queued.
Example:
// On server startup:
dispatcher := NewCommandDispatcher(database, hub)
go dispatcher.Start()
dispatcher.DispatchPendingCommands()
func (*CommandDispatcher) GetQueueCapacity ¶
func (d *CommandDispatcher) GetQueueCapacity() int
GetQueueCapacity returns the maximum capacity of the command queue.
Useful for monitoring and debugging.
Example:
capacity := dispatcher.GetQueueCapacity()
fmt.Printf("Queue capacity: %d\n", capacity)
func (*CommandDispatcher) GetQueueLength ¶
func (d *CommandDispatcher) GetQueueLength() int
GetQueueLength returns the current number of commands in the queue.
Useful for monitoring and debugging.
Example:
length := dispatcher.GetQueueLength()
fmt.Printf("Commands in queue: %d\n", length)
func (*CommandDispatcher) SetWorkers ¶
func (d *CommandDispatcher) SetWorkers(count int)
SetWorkers configures the number of worker goroutines.
This should be called before Start().
Example:
dispatcher.SetWorkers(20)
go dispatcher.Start()
func (*CommandDispatcher) Start ¶
func (d *CommandDispatcher) Start()
Start starts the worker pool.
This function starts the configured number of worker goroutines. Each worker continuously processes commands from the queue channel.
This function blocks until Stop() is called.
Example:
dispatcher := NewCommandDispatcher(database, hub)
go dispatcher.Start()
func (*CommandDispatcher) Stop ¶
func (d *CommandDispatcher) Stop()
Stop signals the dispatcher to stop.
This closes the stopChan, causing Start() to exit. Workers will finish processing their current commands before exiting.
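The interaction between a blocking Start() and a Stop() that closes a stop channel can be sketched as follows. This is a simplified illustration, not the package's code: the `dispatcher` type here and its `handled` counter are hypothetical, and the sketch makes no claim about what the real dispatcher does with commands still queued at shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher is a hypothetical, stripped-down stand-in for CommandDispatcher.
type dispatcher struct {
	queue    chan string
	stopChan chan struct{}
	workers  int

	mu      sync.Mutex
	handled int
}

func newDispatcher(workers, queueSize int) *dispatcher {
	return &dispatcher{
		queue:    make(chan string, queueSize),
		stopChan: make(chan struct{}),
		workers:  workers,
	}
}

// Start launches the workers and blocks until Stop has been called and
// every worker has returned.
func (d *dispatcher) Start() {
	var wg sync.WaitGroup
	for i := 0; i < d.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-d.stopChan:
					return // checked only between commands, never mid-command
				case <-d.queue:
					// A real worker would send the command to its agent here.
					d.mu.Lock()
					d.handled++
					d.mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
}

// Stop closes stopChan, which every worker observes. Closing a channel
// twice panics, so this sketch must be stopped only once.
func (d *dispatcher) Stop() { close(d.stopChan) }

func main() {
	d := newDispatcher(2, 8)
	done := make(chan struct{})
	go func() {
		d.Start()
		close(done)
	}()

	d.queue <- "cmd-1"
	d.queue <- "cmd-2"
	d.Stop()
	<-done // Start has returned, so all workers have exited

	// In this sketch, commands still queued at Stop may remain unprocessed.
	fmt.Printf("stopped; handled %d of 2 commands\n", d.handled)
}
```

If Stop might be called more than once, wrapping the close in a sync.Once is one way to make it safe.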
type SelectionCriteria ¶
type SelectionCriteria struct {
	// ClusterID restricts selection to a specific cluster (optional)
	ClusterID string
	// Region restricts selection to a specific region (optional)
	Region string
	// Platform restricts selection to a specific platform (kubernetes, docker, etc.)
	Platform string
	// PreferLowLoad prefers agents with fewer active sessions (default: true)
	PreferLowLoad bool
	// RequireConnected only selects agents with active WebSocket connections (default: true)
	RequireConnected bool
}
SelectionCriteria defines criteria for selecting an agent.
type SessionReconciler ¶
type SessionReconciler struct {
// contains filtered or unexported fields
}
SessionReconciler handles sessions stuck in the "terminating" or "pending" state.
It runs a background loop that:
- Detects sessions stuck in "terminating" for more than 5 minutes
- Detects sessions stuck in "pending" for more than 5 minutes
- Retries commands if the agent is available
- Force-updates the database if the agent has been gone for more than 10 minutes
This addresses issues #235 and #236 (a partial fix until agent pools are implemented).
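The decision the reconciliation loop makes for each session can be sketched as a pure function over the thresholds listed above. All names here (`decide`, the `action` values, `stuckAfter`, `agentGoneAfter`) are hypothetical, and the "wait" outcome for an agent that has been gone for 10 minutes or less is an assumption; the documentation does not say what happens in that case.

```go
package main

import (
	"fmt"
	"time"
)

// Thresholds taken from the description above.
const (
	stuckAfter     = 5 * time.Minute  // session counts as stuck after this long
	agentGoneAfter = 10 * time.Minute // agent counts as gone after this long
)

// action is a hypothetical label for what the reconciler would do next.
type action string

const (
	actionNone        action = "none"          // session is not stuck
	actionRetry       action = "retry_command" // agent available: resend the command
	actionForceUpdate action = "force_update"  // agent gone too long: fix the database
	actionWait        action = "wait"          // assumption: agent may still reconnect
)

// decide maps a session's state and its agent's availability to an action.
func decide(state string, inState time.Duration, agentConnected bool, agentGoneFor time.Duration) action {
	if state != "pending" && state != "terminating" {
		return actionNone
	}
	if inState <= stuckAfter {
		return actionNone
	}
	if agentConnected {
		return actionRetry
	}
	if agentGoneFor > agentGoneAfter {
		return actionForceUpdate
	}
	return actionWait
}

func main() {
	fmt.Println(decide("terminating", 7*time.Minute, true, 0))
	fmt.Println(decide("pending", 7*time.Minute, false, 15*time.Minute))
	fmt.Println(decide("pending", 2*time.Minute, true, 0))
}
```

Keeping the decision separate from the database and WebSocket calls makes the thresholds easy to test in isolation.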
func NewSessionReconciler ¶
func NewSessionReconciler(database *db.Database, agentHub *websocket.AgentHub, dispatcher *CommandDispatcher) *SessionReconciler
NewSessionReconciler creates a new session reconciler.
Example:
reconciler := NewSessionReconciler(database, agentHub, dispatcher)
go reconciler.Start()
func (*SessionReconciler) GetStats ¶
func (r *SessionReconciler) GetStats() (map[string]int, error)
GetStats returns reconciliation statistics.
Returns the number of sessions in each stuck state.
func (*SessionReconciler) Start ¶
func (r *SessionReconciler) Start()
Start begins the reconciliation loop.
This should be called in a goroutine:
go reconciler.Start()
func (*SessionReconciler) Stop ¶
func (r *SessionReconciler) Stop()
Stop gracefully stops the reconciliation loop.