clustergateway

package
v0.6.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentConnection

type AgentConnection struct {
	ID              string // Unique connection identifier
	Conn            *websocket.Conn
	PlaneIdentifier string // Composite identifier: {planeType}/{planeName}
	ConnectedAt     time.Time
	LastSeen        time.Time
	// contains filtered or unexported fields
}

AgentConnection represents an active agent connection. Multiple agent replicas for the same plane can share the same PlaneIdentifier for high availability (HA).

func (*AgentConnection) Close

func (ac *AgentConnection) Close() error

Close closes the agent connection

func (*AgentConnection) SendClusterAgentRequest

func (ac *AgentConnection) SendClusterAgentRequest(req *messaging.ClusterAgentRequest) error

SendClusterAgentRequest sends a ClusterAgentRequest through this connection

func (*AgentConnection) SendMessage

func (ac *AgentConnection) SendMessage(msg *messaging.Message) error

SendMessage sends a message through this connection

type Config

type Config struct {
	Port              int
	ServerCertPath    string
	ServerKeyPath     string
	ReadTimeout       time.Duration
	WriteTimeout      time.Duration
	IdleTimeout       time.Duration
	ShutdownTimeout   time.Duration
	HeartbeatInterval time.Duration
	HeartbeatTimeout  time.Duration
}

Config holds configuration for the agent server

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

ConnectionManager manages active agent connections. It supports multiple concurrent connections per plane identifier for high availability (HA).

func NewConnectionManager

func NewConnectionManager(logger *slog.Logger) *ConnectionManager

NewConnectionManager creates a new ConnectionManager

func (*ConnectionManager) BroadcastMessage

func (cm *ConnectionManager) BroadcastMessage(msg *messaging.Message) error

func (*ConnectionManager) Count

func (cm *ConnectionManager) Count() int

Count returns the total number of active connections across all planes

func (*ConnectionManager) Get

func (cm *ConnectionManager) Get(planeIdentifier string) (*AgentConnection, error)

Get retrieves an agent connection by plane identifier using round-robin selection. If multiple connections exist for the plane, it rotates between them.

func (*ConnectionManager) GetAll

func (cm *ConnectionManager) GetAll() []*AgentConnection

func (*ConnectionManager) Register

func (cm *ConnectionManager) Register(planeIdentifier string, conn *websocket.Conn) (string, error)

Register registers a new agent connection. Multiple agent replicas (for HA) for the same plane share the same planeIdentifier. It returns the connection ID, which should be used for unregistration.

func (*ConnectionManager) SendClusterAgentRequest

func (cm *ConnectionManager) SendClusterAgentRequest(planeIdentifier string, req *messaging.ClusterAgentRequest) error

func (*ConnectionManager) SendMessage

func (cm *ConnectionManager) SendMessage(planeIdentifier string, msg *messaging.Message) error

func (*ConnectionManager) Unregister

func (cm *ConnectionManager) Unregister(planeIdentifier, connID string)

func (*ConnectionManager) UpdateConnectionLastSeen

func (cm *ConnectionManager) UpdateConnectionLastSeen(planeIdentifier, connID string)

UpdateConnectionLastSeen updates the last seen time for a specific connection

type Dispatcher

type Dispatcher interface {
	SendClusterAgentRequest(
		planeName string,
		requestType messaging.RequestType,
		identifier string,
		payload map[string]interface{},
		timeout time.Duration,
	) (*messaging.ClusterAgentResponse, error)
}

Dispatcher is an interface that both Server and RemoteServerClient implement. It allows sending cluster agent requests to agents.

type RemoteServerClient

type RemoteServerClient struct {
	// contains filtered or unexported fields
}

RemoteServerClient is an HTTP client that connects to a remote agent server. This allows controllers to communicate with an agent server running in-cluster while the controller itself runs locally (useful for development).

func NewRemoteServerClient

func NewRemoteServerClient(serverURL string, insecureSkipVerify bool) *RemoteServerClient

NewRemoteServerClient creates a new client that connects to a remote agent server. For development use only - uses insecureSkipVerify.

func NewRemoteServerClientWithConfig

func NewRemoteServerClientWithConfig(config *RemoteServerClientConfig) (*RemoteServerClient, error)

NewRemoteServerClientWithConfig creates a new client with full TLS configuration. This is the recommended method for production deployments.

func (*RemoteServerClient) SendClusterAgentRequest

func (c *RemoteServerClient) SendClusterAgentRequest(
	planeName string,
	requestType messaging.RequestType,
	identifier string,
	payload map[string]interface{},
	timeout time.Duration,
) (*messaging.ClusterAgentResponse, error)

SendClusterAgentRequest sends a request to the remote agent server and waits for the response. This method signature matches Server.SendClusterAgentRequest, so the two can be used interchangeably.

type RemoteServerClientConfig

type RemoteServerClientConfig struct {
	// ServerURL is the URL of the agent server (e.g., https://cluster-agent-server:8443)
	ServerURL string

	// InsecureSkipVerify disables TLS certificate verification (development only)
	InsecureSkipVerify bool

	// ServerCAPath is the path to the CA certificate for verifying the server's certificate
	// If empty and InsecureSkipVerify is false, system CA pool will be used
	ServerCAPath string

	// ClientCertPath is the path to the client certificate for mTLS (optional)
	ClientCertPath string

	// ClientKeyPath is the path to the client private key for mTLS (optional)
	ClientKeyPath string

	// Timeout is the HTTP client timeout
	Timeout time.Duration
}

RemoteServerClientConfig holds configuration for RemoteServerClient

type Server

type Server struct {
	// contains filtered or unexported fields
}

func New

func New(config *Config, logger *slog.Logger) *Server

func (*Server) GetConnectionManager

func (s *Server) GetConnectionManager() *ConnectionManager

func (*Server) SendClusterAgentRequest

func (s *Server) SendClusterAgentRequest(planeName string, requestType messaging.RequestType, identifier string, payload map[string]interface{}, timeout time.Duration) (*messaging.ClusterAgentResponse, error)

func (*Server) Start

func (s *Server) Start() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL