services

package
v0.0.0-...-e5d423c Latest
Warning

This package is not in the latest version of its module.

Published: Apr 26, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package services provides business logic services for the StreamSpace API.

The AgentSelector service routes session creation requests to appropriate agents in multi-agent deployments.

The package also implements the CommandDispatcher, which queues commands and dispatches them to agents.

COMMAND DISPATCHER: The CommandDispatcher is responsible for:

  • Queuing commands for dispatch to agents
  • Managing a worker pool to process commands concurrently
  • Sending commands to agents via the AgentHub
  • Updating command status in the database
  • Handling command lifecycle (pending → sent → ack → completed/failed)

COMMAND LIFECYCLE:

  1. Command created in database with status="pending"
  2. DispatchCommand() queues the command
  3. Worker picks up command from queue
  4. Worker checks if agent is connected
  5. Worker sends command to agent via hub
  6. Worker updates status="sent" and sent_at timestamp
  7. Agent acknowledges (WebSocket handler updates status="ack")
  8. Agent completes/fails (WebSocket handler updates status="completed"/"failed")
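The lifecycle above can be read as a small state machine. The following is a minimal sketch under that reading, not the package's own code: the status strings come from the list above, while the Status type, the transition table, and CanTransition are hypothetical names introduced for illustration. The real handlers may permit other transitions (for example, failing a command that was never acknowledged).

```go
package main

import "fmt"

// Status holds a command status string. The values are taken from the
// documented lifecycle; the type itself is an illustrative assumption.
type Status string

const (
	StatusPending   Status = "pending"
	StatusSent      Status = "sent"
	StatusAck       Status = "ack"
	StatusCompleted Status = "completed"
	StatusFailed    Status = "failed"
)

// allowed maps each status to the statuses that may follow it,
// following a strict reading of the documented order (assumption).
var allowed = map[Status][]Status{
	StatusPending: {StatusSent},
	StatusSent:    {StatusAck},
	StatusAck:     {StatusCompleted, StatusFailed},
}

// CanTransition reports whether moving from one status to another
// follows the documented lifecycle order.
func CanTransition(from, to Status) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(StatusPending, StatusSent))   // true
	fmt.Println(CanTransition(StatusPending, StatusAck))    // false
	fmt.Println(CanTransition(StatusAck, StatusFailed))     // true
	fmt.Println(CanTransition(StatusCompleted, StatusSent)) // false
}
```

Terminal statuses ("completed", "failed") have no entry in the table, so every transition out of them is rejected.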

WORKER POOL PATTERN: The dispatcher uses a worker pool to process commands concurrently. Each worker is a goroutine that continuously reads from the queue channel.

Example:

dispatcher := NewCommandDispatcher(database, hub)
go dispatcher.Start()

command := &models.AgentCommand{
    CommandID: "cmd-123",
    AgentID:   "k8s-prod-us-east-1",
    Action:    "start_session",
    Payload:   &models.CommandPayload{"sessionId": "sess-456"},
    Status:    "pending",
}
// DispatchCommand returns an error if the queue is full.
if err := dispatcher.DispatchCommand(command); err != nil {
    log.Printf("dispatch failed: %v", err)
}
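The worker pool pattern described above can be sketched independently of the dispatcher. This is a generic illustration, not the package's implementation: runPool, the string job type, and the queue size are all assumptions chosen to keep the example self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool starts n workers that drain the jobs channel. It returns the
// handled jobs once the channel is closed and every worker has exited.
func runPool(n int, jobs <-chan string) []string {
	var (
		mu      sync.Mutex
		handled []string
		wg      sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker reads from the shared channel until it is closed.
			for job := range jobs {
				mu.Lock()
				handled = append(handled, job)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return handled
}

func main() {
	jobs := make(chan string, 8) // buffered queue; the size is arbitrary
	for i := 0; i < 5; i++ {
		jobs <- fmt.Sprintf("cmd-%d", i)
	}
	close(jobs)
	fmt.Println(len(runPool(3, jobs))) // 5
}
```

Because every worker receives from the same channel, each job is handled by exactly one worker, and the order in which jobs complete is not deterministic.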

The package also provides background services for the StreamSpace API: the Session Reconciliation Loop, implemented by SessionReconciler, handles stuck sessions whose recorded state is out of sync with their actual platform state.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentInfo

type AgentInfo struct {
	AgentID      string                 `json:"agent_id"`
	ClusterID    string                 `json:"cluster_id"`
	ClusterName  string                 `json:"cluster_name"`
	Platform     string                 `json:"platform"`
	Region       string                 `json:"region"`
	Status       string                 `json:"status"`
	SessionCount int                    `json:"session_count"` // Current session load
	Capacity     map[string]interface{} `json:"capacity"`      // Resource capacity
	IsConnected  bool                   `json:"is_connected"`  // WebSocket connected
}

AgentInfo represents agent metadata for selection decisions.
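The struct tags determine the JSON field names an API client would see. The sketch below copies the AgentInfo definition so that it compiles on its own; encodeAgent and decodeAgent are hypothetical helpers, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// AgentInfo mirrors the documented struct so the example is self-contained.
type AgentInfo struct {
	AgentID      string                 `json:"agent_id"`
	ClusterID    string                 `json:"cluster_id"`
	ClusterName  string                 `json:"cluster_name"`
	Platform     string                 `json:"platform"`
	Region       string                 `json:"region"`
	Status       string                 `json:"status"`
	SessionCount int                    `json:"session_count"`
	Capacity     map[string]interface{} `json:"capacity"`
	IsConnected  bool                   `json:"is_connected"`
}

// encodeAgent returns the JSON form of an AgentInfo (hypothetical helper).
func encodeAgent(a AgentInfo) (string, error) {
	b, err := json.Marshal(a)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeAgent parses JSON back into an AgentInfo (hypothetical helper).
func decodeAgent(s string) (AgentInfo, error) {
	var a AgentInfo
	err := json.Unmarshal([]byte(s), &a)
	return a, err
}

func main() {
	s, err := encodeAgent(AgentInfo{
		AgentID:      "k8s-prod-us-east-1",
		Platform:     "kubernetes",
		Region:       "us-east-1",
		Status:       "online",
		SessionCount: 3,
		IsConnected:  true,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s) // keys appear as agent_id, cluster_id, session_count, ...
}
```

None of the tags use omitempty, so empty strings and a nil Capacity map are still emitted in the output.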

type AgentSelector

type AgentSelector struct {
	// contains filtered or unexported fields
}

AgentSelector handles selection of appropriate agents for session creation.

The selector implements multiple strategies:

  • Load balancing: Distribute sessions evenly across healthy agents
  • Cluster affinity: Route to specific clusters when requested
  • Region preference: Prefer agents in specific regions
  • Capacity-based: Consider agent resource capacity
  • Health filtering: Only select online agents with recent heartbeats

Thread Safety: Safe for concurrent use from multiple goroutines.

func NewAgentSelector

func NewAgentSelector(db *sql.DB, agentHub *websocket.AgentHub) *AgentSelector

NewAgentSelector creates a new AgentSelector instance.

Parameters:

  • db: Database connection for querying agent metadata
  • agentHub: AgentHub for checking WebSocket connection status

Example:

selector := services.NewAgentSelector(database.DB(), agentHub)
agent, err := selector.SelectAgent(ctx, &services.SelectionCriteria{})

func (*AgentSelector) GetAgentInfo

func (s *AgentSelector) GetAgentInfo(ctx context.Context, agentID string) (*AgentInfo, error)

GetAgentInfo retrieves information about a specific agent.

This is useful for displaying agent details or validating agent availability.

Example:

info, err := selector.GetAgentInfo(ctx, "k8s-prod-us-east-1")

func (*AgentSelector) SelectAgent

func (s *AgentSelector) SelectAgent(ctx context.Context, criteria *SelectionCriteria) (*AgentInfo, error)

SelectAgent selects the best available agent based on criteria.

Selection Algorithm:

  1. Filter agents by status (only 'online')
  2. Filter by WebSocket connection (if RequireConnected)
  3. Apply criteria filters (cluster, region, platform)
  4. Calculate session load for each candidate
  5. Select agent with lowest load (if PreferLowLoad)
  6. Return selected agent or error if none available

Returns:

  • AgentInfo: Selected agent metadata
  • error: If no suitable agent found or database error

Example:

criteria := &SelectionCriteria{
    Region: "us-east-1",
    PreferLowLoad: true,
    RequireConnected: true,
}
agent, err := selector.SelectAgent(ctx, criteria)
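The selection steps listed above can be approximated in memory as follows. This is a sketch under stated assumptions rather than the actual SelectAgent: the Agent and Criteria types, pick, and ErrNoAgent are hypothetical, the real method reads agents from the database and the AgentHub, and returning the first match when PreferLowLoad is false is a guess.

```go
package main

import (
	"errors"
	"fmt"
)

// Agent and Criteria are trimmed-down, hypothetical stand-ins for
// AgentInfo and SelectionCriteria.
type Agent struct {
	ID, ClusterID, Region, Platform, Status string
	SessionCount                            int
	IsConnected                             bool
}

type Criteria struct {
	ClusterID, Region, Platform     string
	PreferLowLoad, RequireConnected bool
}

// ErrNoAgent is returned when every agent is filtered out.
var ErrNoAgent = errors.New("no suitable agent available")

// pick filters agents by status, connection, and criteria, then returns
// the least-loaded match (or the first match if PreferLowLoad is false).
func pick(agents []Agent, c Criteria) (Agent, error) {
	var best *Agent
	for i := range agents {
		a := &agents[i]
		if a.Status != "online" {
			continue
		}
		if c.RequireConnected && !a.IsConnected {
			continue
		}
		// Empty criteria fields are treated as "no restriction".
		if c.ClusterID != "" && a.ClusterID != c.ClusterID {
			continue
		}
		if c.Region != "" && a.Region != c.Region {
			continue
		}
		if c.Platform != "" && a.Platform != c.Platform {
			continue
		}
		if best == nil || (c.PreferLowLoad && a.SessionCount < best.SessionCount) {
			best = a
		}
	}
	if best == nil {
		return Agent{}, ErrNoAgent
	}
	return *best, nil
}

func main() {
	agents := []Agent{
		{ID: "a", Region: "us-east-1", Status: "online", SessionCount: 5, IsConnected: true},
		{ID: "b", Region: "us-east-1", Status: "online", SessionCount: 2, IsConnected: true},
		{ID: "c", Region: "us-east-1", Status: "online", SessionCount: 0, IsConnected: false},
	}
	got, err := pick(agents, Criteria{Region: "us-east-1", PreferLowLoad: true, RequireConnected: true})
	fmt.Println(got.ID, err) // b <nil>
}
```

Agent "c" has the lowest load but is skipped because it is not connected, which is why the health and connection filters run before the load comparison.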

type CommandDispatcher

type CommandDispatcher struct {
	// contains filtered or unexported fields
}

CommandDispatcher manages the queuing and dispatch of commands to agents.

The dispatcher maintains a worker pool that continuously processes commands from the queue channel. Each worker checks if the target agent is connected and sends the command via the AgentHub.

func NewCommandDispatcher

func NewCommandDispatcher(database *db.Database, hub *websocket.AgentHub) *CommandDispatcher

NewCommandDispatcher creates a new CommandDispatcher.

The dispatcher is initialized with a buffered queue channel and a configurable number of workers (default: 10).

Example:

dispatcher := NewCommandDispatcher(database, hub)
go dispatcher.Start()

func (*CommandDispatcher) DispatchCommand

func (d *CommandDispatcher) DispatchCommand(command *models.AgentCommand) error

DispatchCommand queues a command for dispatch to an agent.

The command should already be created in the database with status="pending". This function adds the command to the queue for processing by a worker.

Returns an error if the queue is full.

Example:

command := &models.AgentCommand{
    CommandID: "cmd-123",
    AgentID:   "k8s-prod-us-east-1",
    Action:    "start_session",
    Payload:   &models.CommandPayload{"sessionId": "sess-456"},
    Status:    "pending",
}
if err := dispatcher.DispatchCommand(command); err != nil {
    log.Printf("dispatch failed: %v", err)
}
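A method that returns an error instead of blocking when a buffered channel is full is commonly written as a non-blocking send. The sketch below shows that idiom; it is an assumption about how such a check could be written, not a copy of DispatchCommand, and enqueue and ErrQueueFull are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrQueueFull is a hypothetical sentinel error for a full queue.
var ErrQueueFull = errors.New("command queue is full")

// enqueue attempts a non-blocking send. It returns ErrQueueFull when the
// channel's buffer has no free slot and no receiver is ready.
func enqueue(queue chan<- string, commandID string) error {
	select {
	case queue <- commandID:
		return nil
	default:
		return ErrQueueFull
	}
}

func main() {
	queue := make(chan string, 2)
	fmt.Println(enqueue(queue, "cmd-1")) // <nil>
	fmt.Println(enqueue(queue, "cmd-2")) // <nil>
	fmt.Println(enqueue(queue, "cmd-3")) // command queue is full
}
```

With this approach the caller decides what to do with a rejected command, for example leaving it "pending" so that a later DispatchPendingCommands call can pick it up.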

func (*CommandDispatcher) DispatchPendingCommands

func (d *CommandDispatcher) DispatchPendingCommands() error

DispatchPendingCommands retrieves all pending commands from the database and queues them for dispatch.

This is useful for recovering from a Control Plane restart: any commands that were pending when the server stopped are re-queued.

Example:

// On server startup:
dispatcher := NewCommandDispatcher(database, hub)
go dispatcher.Start()
if err := dispatcher.DispatchPendingCommands(); err != nil {
    log.Printf("failed to re-queue pending commands: %v", err)
}

func (*CommandDispatcher) GetQueueCapacity

func (d *CommandDispatcher) GetQueueCapacity() int

GetQueueCapacity returns the maximum capacity of the command queue.

Useful for monitoring and debugging.

Example:

capacity := dispatcher.GetQueueCapacity()
fmt.Printf("Queue capacity: %d\n", capacity)

func (*CommandDispatcher) GetQueueLength

func (d *CommandDispatcher) GetQueueLength() int

GetQueueLength returns the current number of commands in the queue.

Useful for monitoring and debugging.

Example:

length := dispatcher.GetQueueLength()
fmt.Printf("Commands in queue: %d\n", length)
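For a buffered channel, Go's built-in len and cap report the number of queued items and the buffer size, which is presumably what GetQueueLength and GetQueueCapacity expose. The sketch below combines the two into a utilization figure; utilization is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// utilization returns the fraction of a buffered channel's capacity that
// is currently in use. An unbuffered channel reports 0.
func utilization(queue chan string) float64 {
	if cap(queue) == 0 {
		return 0
	}
	return float64(len(queue)) / float64(cap(queue))
}

func main() {
	queue := make(chan string, 4)
	queue <- "cmd-1"
	fmt.Printf("length=%d capacity=%d utilization=%.2f\n",
		len(queue), cap(queue), utilization(queue))
	// prints "length=1 capacity=4 utilization=0.25"
}
```

The length is only a snapshot: workers may drain the queue between the call and the moment the value is used, so it suits monitoring better than control flow.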

func (*CommandDispatcher) SetWorkers

func (d *CommandDispatcher) SetWorkers(count int)

SetWorkers configures the number of worker goroutines.

This should be called before Start().

Example:

dispatcher.SetWorkers(20)
go dispatcher.Start()

func (*CommandDispatcher) Start

func (d *CommandDispatcher) Start()

Start starts the worker pool.

This function starts the configured number of worker goroutines. Each worker continuously processes commands from the queue channel.

This function blocks until Stop() is called.

Example:

dispatcher := NewCommandDispatcher(database, hub)
go dispatcher.Start()

func (*CommandDispatcher) Stop

func (d *CommandDispatcher) Stop()

Stop signals the dispatcher to stop.

This closes the stopChan, causing Start() to exit. Workers will finish processing their current commands before exiting.
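The Start and Stop behavior described above (Start blocks, Stop closes stopChan, workers finish their current command) can be sketched as follows. Only the stopChan name comes from the documentation; the pool type, its other fields, and the use of sync.Once to make Stop safe to call twice are assumptions added for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for the dispatcher's internals.
type pool struct {
	queue    chan string
	stopChan chan struct{}
	stopOnce sync.Once
	workers  int
}

func newPool(workers, queueSize int) *pool {
	return &pool{
		queue:    make(chan string, queueSize),
		stopChan: make(chan struct{}),
		workers:  workers,
	}
}

// Start launches the workers and blocks until Stop has been called and
// every worker has returned.
func (p *pool) Start() {
	var wg sync.WaitGroup
	for i := 0; i < p.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-p.stopChan:
					return
				case cmd := <-p.queue:
					// A real worker would send the command to its agent
					// here; it finishes before stopChan is checked again.
					_ = cmd
				}
			}
		}()
	}
	wg.Wait()
}

// Stop closes stopChan exactly once, so repeated calls do not panic.
func (p *pool) Stop() {
	p.stopOnce.Do(func() { close(p.stopChan) })
}

func main() {
	p := newPool(2, 4)
	done := make(chan struct{})
	go func() {
		p.Start()
		close(done)
	}()
	p.Stop()
	<-done
	fmt.Println("dispatcher stopped")
}
```

Closing a channel wakes every receiver at once, which is why a single close is enough to stop all workers. Commands still sitting in the queue when Stop is called are not processed in this sketch.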

type SelectionCriteria

type SelectionCriteria struct {
	// ClusterID restricts selection to a specific cluster (optional)
	ClusterID string

	// Region restricts selection to a specific region (optional)
	Region string

	// Platform restricts selection to a specific platform (kubernetes, docker, etc.)
	Platform string

	// PreferLowLoad prefers agents with fewer active sessions (default: true)
	PreferLowLoad bool

	// RequireConnected only selects agents with active WebSocket connections (default: true)
	RequireConnected bool
}

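SelectionCriteria defines criteria for selecting an agent.

The field comments give a default of true for PreferLowLoad and RequireConnected, but the zero value of a Go bool is false, so a literal such as SelectionCriteria{} leaves both false. Setting them explicitly avoids depending on how SelectAgent treats unset fields.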

type SessionReconciler

type SessionReconciler struct {
	// contains filtered or unexported fields
}

SessionReconciler handles stuck sessions in "terminating" or "pending" states.

It runs a background loop that:

  1. Detects sessions stuck in "terminating" for >5 minutes
  2. Detects sessions stuck in "pending" for >5 minutes
  3. Retries commands if agent is available
  4. Force-updates database if agent is gone for >10 minutes

This addresses issues #235 and #236 (a partial fix until agent pools are implemented).
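The four rules above amount to a small decision function. The sketch below encodes them with the thresholds taken from the list; everything else (the Action type, decide, minutes, and the choice to wait when the agent has been gone for less than 10 minutes) is an assumption and may differ from the reconciler's actual behavior.

```go
package main

import (
	"fmt"
	"time"
)

// Thresholds from the documented rules.
const (
	stuckAfter = 5 * time.Minute
	forceAfter = 10 * time.Minute
)

// Action names are hypothetical labels for what the loop would do next.
type Action string

const (
	ActionNone  Action = "none"
	ActionRetry Action = "retry_command"
	ActionForce Action = "force_update"
)

// minutes converts a whole number of minutes to a time.Duration.
func minutes(n int) time.Duration {
	return time.Duration(n) * time.Minute
}

// decide picks an action for one session based on its state, how long it
// has been in that state, and the availability of its agent.
func decide(state string, inState time.Duration, agentAvailable bool, agentGoneFor time.Duration) Action {
	if state != "terminating" && state != "pending" {
		return ActionNone
	}
	if inState <= stuckAfter {
		return ActionNone // not stuck yet
	}
	if agentAvailable {
		return ActionRetry
	}
	if agentGoneFor > forceAfter {
		return ActionForce
	}
	return ActionNone // agent is away but may still reconnect (assumption)
}

func main() {
	fmt.Println(decide("pending", minutes(6), true, 0))               // retry_command
	fmt.Println(decide("terminating", minutes(20), false, minutes(12))) // force_update
	fmt.Println(decide("pending", minutes(2), true, 0))               // none
}
```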

func NewSessionReconciler

func NewSessionReconciler(
	database *db.Database,
	agentHub *websocket.AgentHub,
	dispatcher *CommandDispatcher,
) *SessionReconciler

NewSessionReconciler creates a new session reconciler.

Example:

reconciler := NewSessionReconciler(database, agentHub, dispatcher)
go reconciler.Start()

func (*SessionReconciler) GetStats

func (r *SessionReconciler) GetStats() (map[string]int, error)

GetStats returns reconciliation statistics.

Returns the number of sessions in each stuck state.

func (*SessionReconciler) Start

func (r *SessionReconciler) Start()

Start begins the reconciliation loop.

This should be called in a goroutine:

go reconciler.Start()

func (*SessionReconciler) Stop

func (r *SessionReconciler) Stop()

Stop gracefully stops the reconciliation loop.
