Documentation
¶
Overview ¶
Package grpc provides the gRPC server implementation for the control plane.
Package grpc provides the gRPC server implementation for the control plane.
Index ¶
- func NodeIDFromContext(ctx context.Context) (string, bool)
- type AuthService
- type Config
- type HealthChecker
- type NodeConnection
- func (nc *NodeConnection) AddDeployment(deploymentID string)
- func (nc *NodeConnection) GetLastHeartbeat() time.Time
- func (nc *NodeConnection) GetStatus() NodeStatus
- func (nc *NodeConnection) HasDeployment(deploymentID string) bool
- func (nc *NodeConnection) IsDraining() bool
- func (nc *NodeConnection) RemoveDeployment(deploymentID string)
- func (nc *NodeConnection) SetDraining(draining bool)
- func (nc *NodeConnection) SetStatus(status NodeStatus)
- func (nc *NodeConnection) UpdateHeartbeat(info *pb.NodeInfo)
- type NodeManager
- func (m *NodeManager) BroadcastCommand(ctx context.Context, cmd *pb.DeploymentCommand) error
- func (m *NodeManager) CalculateNodeStatus(timeSinceHeartbeat time.Duration) NodeStatus
- func (m *NodeManager) ConnectionCount() int
- func (m *NodeManager) GetAvailableNodes() []*NodeConnection
- func (m *NodeManager) GetConnectedNodeIDs() []string
- func (m *NodeManager) GetConnection(nodeID string) (*NodeConnection, bool)
- func (m *NodeManager) GetHealthyNodes() []*NodeConnection
- func (m *NodeManager) GetNodeStatus(nodeID string) NodeStatus
- func (m *NodeManager) IsNodeDraining(nodeID string) bool
- func (m *NodeManager) MarkDeploymentComplete(nodeID, deploymentID string)
- func (m *NodeManager) RegisterConnection(nodeID string, stream pb.ControlPlaneService_WatchCommandsServer) error
- func (m *NodeManager) SendCommand(ctx context.Context, nodeID string, cmd *pb.DeploymentCommand) error
- func (m *NodeManager) SetNodeDraining(nodeID string, draining bool)
- func (m *NodeManager) StartHealthChecker(ctx context.Context)
- func (m *NodeManager) Stop()
- func (m *NodeManager) UnregisterConnection(nodeID string)
- func (m *NodeManager) UpdateHeartbeat(nodeID string, info *pb.NodeInfo)
- type NodeManagerConfig
- type NodeStatus
- type Server
- func (s *Server) Check(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error)
- func (s *Server) Heartbeat(ctx context.Context, req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
- func (s *Server) IsServing() bool
- func (s *Server) NodeManager() *NodeManager
- func (s *Server) PushLogs(stream pb.ControlPlaneService_PushLogsServer) error
- func (s *Server) Register(ctx context.Context, req *pb.RegisterRequest) (*pb.RegisterResponse, error)
- func (s *Server) ReportStatus(ctx context.Context, req *pb.StatusReport) (*pb.StatusResponse, error)
- func (s *Server) SetHealthChecker(hc HealthChecker)
- func (s *Server) SetNodeManager(nm *NodeManager)
- func (s *Server) Start(ctx context.Context) error
- func (s *Server) Stop(ctx context.Context) error
- func (s *Server) Watch(req *pb.HealthCheckRequest, stream pb.Health_WatchServer) error
- func (s *Server) WatchCommands(req *pb.WatchCommandsRequest, ...) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AuthService ¶
AuthService defines the interface for authentication operations.
type Config ¶
type Config struct {
Port int
TLSCertFile string
TLSKeyFile string
MaxConcurrentStreams uint32
KeepaliveTime time.Duration
KeepaliveTimeout time.Duration
MaxRecvMsgSize int
}
Config holds the gRPC server configuration.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with sensible defaults.
type HealthChecker ¶
HealthChecker defines the interface for checking service health.
type NodeConnection ¶
type NodeConnection struct {
NodeID string
Info *pb.NodeInfo
CommandStream pb.ControlPlaneService_WatchCommandsServer
LastHeartbeat time.Time
Status NodeStatus
// contains filtered or unexported fields
}
NodeConnection represents an active connection to a node agent.
func (*NodeConnection) AddDeployment ¶
func (nc *NodeConnection) AddDeployment(deploymentID string)
AddDeployment marks a deployment as active on this node.
func (*NodeConnection) GetLastHeartbeat ¶
func (nc *NodeConnection) GetLastHeartbeat() time.Time
GetLastHeartbeat returns the last heartbeat time.
func (*NodeConnection) GetStatus ¶
func (nc *NodeConnection) GetStatus() NodeStatus
GetStatus returns the current node status.
func (*NodeConnection) HasDeployment ¶
func (nc *NodeConnection) HasDeployment(deploymentID string) bool
HasDeployment checks if a deployment is active on this node.
func (*NodeConnection) IsDraining ¶
func (nc *NodeConnection) IsDraining() bool
IsDraining returns whether the node is draining.
func (*NodeConnection) RemoveDeployment ¶
func (nc *NodeConnection) RemoveDeployment(deploymentID string)
RemoveDeployment removes a deployment from the active set.
func (*NodeConnection) SetDraining ¶
func (nc *NodeConnection) SetDraining(draining bool)
SetDraining sets the draining flag for the node.
func (*NodeConnection) SetStatus ¶
func (nc *NodeConnection) SetStatus(status NodeStatus)
SetStatus sets the node status.
func (*NodeConnection) UpdateHeartbeat ¶
func (nc *NodeConnection) UpdateHeartbeat(info *pb.NodeInfo)
UpdateHeartbeat updates the last heartbeat time and node info.
type NodeManager ¶
type NodeManager struct {
// contains filtered or unexported fields
}
NodeManager manages active node connections and their command streams.
func NewNodeManager ¶
func NewNodeManager(st store.Store, logger *slog.Logger) *NodeManager
NewNodeManager creates a new NodeManager instance.
func NewNodeManagerWithConfig ¶
func NewNodeManagerWithConfig(st store.Store, logger *slog.Logger, cfg *NodeManagerConfig) *NodeManager
NewNodeManagerWithConfig creates a new NodeManager with custom configuration.
func (*NodeManager) BroadcastCommand ¶
func (m *NodeManager) BroadcastCommand(ctx context.Context, cmd *pb.DeploymentCommand) error
BroadcastCommand sends a command to all connected nodes.
func (*NodeManager) CalculateNodeStatus ¶
func (m *NodeManager) CalculateNodeStatus(timeSinceHeartbeat time.Duration) NodeStatus
CalculateNodeStatus determines the node status based on time since last heartbeat. This is exported for testing purposes.
func (*NodeManager) ConnectionCount ¶
func (m *NodeManager) ConnectionCount() int
ConnectionCount returns the number of active connections.
func (*NodeManager) GetAvailableNodes ¶
func (m *NodeManager) GetAvailableNodes() []*NodeConnection
GetAvailableNodes returns all nodes that are healthy and not draining. These are the nodes that can accept new deployments.
func (*NodeManager) GetConnectedNodeIDs ¶
func (m *NodeManager) GetConnectedNodeIDs() []string
GetConnectedNodeIDs returns a list of all connected node IDs.
func (*NodeManager) GetConnection ¶
func (m *NodeManager) GetConnection(nodeID string) (*NodeConnection, bool)
GetConnection returns a node's connection if it exists.
func (*NodeManager) GetHealthyNodes ¶
func (m *NodeManager) GetHealthyNodes() []*NodeConnection
GetHealthyNodes returns all nodes that are in healthy status.
func (*NodeManager) GetNodeStatus ¶
func (m *NodeManager) GetNodeStatus(nodeID string) NodeStatus
GetNodeStatus returns the status of a specific node.
func (*NodeManager) IsNodeDraining ¶
func (m *NodeManager) IsNodeDraining(nodeID string) bool
IsNodeDraining returns whether a node is draining.
func (*NodeManager) MarkDeploymentComplete ¶
func (m *NodeManager) MarkDeploymentComplete(nodeID, deploymentID string)
MarkDeploymentComplete marks a deployment as no longer active on a node.
func (*NodeManager) RegisterConnection ¶
func (m *NodeManager) RegisterConnection(nodeID string, stream pb.ControlPlaneService_WatchCommandsServer) error
RegisterConnection registers a node's command stream.
func (*NodeManager) SendCommand ¶
func (m *NodeManager) SendCommand(ctx context.Context, nodeID string, cmd *pb.DeploymentCommand) error
SendCommand sends a deployment command to a specific node. Returns ALREADY_EXISTS if trying to deploy a deployment that's already running. Returns UNAVAILABLE if the node is draining or down.
func (*NodeManager) SetNodeDraining ¶
func (m *NodeManager) SetNodeDraining(nodeID string, draining bool)
SetNodeDraining sets the draining flag for a node. When a node is draining, it will not receive new deployment commands. Requirement: 15.5
func (*NodeManager) StartHealthChecker ¶
func (m *NodeManager) StartHealthChecker(ctx context.Context)
StartHealthChecker starts the background health checker goroutine.
func (*NodeManager) Stop ¶
func (m *NodeManager) Stop()
Stop stops the NodeManager and waits for goroutines to finish.
func (*NodeManager) UnregisterConnection ¶
func (m *NodeManager) UnregisterConnection(nodeID string)
UnregisterConnection removes a node's connection.
func (*NodeManager) UpdateHeartbeat ¶
func (m *NodeManager) UpdateHeartbeat(nodeID string, info *pb.NodeInfo)
UpdateHeartbeat updates a node's heartbeat timestamp and info.
type NodeManagerConfig ¶
type NodeManagerConfig struct {
HealthCheckInterval time.Duration
DegradedThreshold time.Duration
DownThreshold time.Duration
}
NodeManagerConfig holds configuration for the NodeManager.
func DefaultNodeManagerConfig ¶
func DefaultNodeManagerConfig() *NodeManagerConfig
DefaultNodeManagerConfig returns default configuration values.
type NodeStatus ¶
type NodeStatus int
NodeStatus represents the health status of a node.
const ( // NodeStatusHealthy indicates the node is responding normally. NodeStatusHealthy NodeStatus = iota // NodeStatusDegraded indicates the node has missed some heartbeats. NodeStatusDegraded // NodeStatusDown indicates the node is unresponsive. NodeStatusDown )
func (NodeStatus) String ¶
func (s NodeStatus) String() string
String returns the string representation of NodeStatus.
type Server ¶
type Server struct {
pb.UnimplementedControlPlaneServiceServer
pb.UnimplementedHealthServer
// contains filtered or unexported fields
}
Server implements the gRPC server for the control plane.
func NewServer ¶
func NewServer(cfg *Config, st store.Store, authSvc AuthService, logger *slog.Logger) (*Server, error)
NewServer creates a new gRPC server instance.
func (*Server) Check ¶
func (s *Server) Check(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error)
Check implements the gRPC Health Check protocol. Requirements: 13.1, 13.2, 13.3, 13.4, 13.5
func (*Server) Heartbeat ¶
func (s *Server) Heartbeat(ctx context.Context, req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
Heartbeat handles heartbeat requests from nodes. Requirements: 2.1, 2.2, 2.3, 15.5
func (*Server) NodeManager ¶
func (s *Server) NodeManager() *NodeManager
NodeManager returns the server's NodeManager instance.
func (*Server) PushLogs ¶
func (s *Server) PushLogs(stream pb.ControlPlaneService_PushLogsServer) error
PushLogs handles log streaming from nodes. Requirements: 4.2, 4.4
func (*Server) Register ¶
func (s *Server) Register(ctx context.Context, req *pb.RegisterRequest) (*pb.RegisterResponse, error)
Register handles node registration requests. Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6
func (*Server) ReportStatus ¶
func (s *Server) ReportStatus(ctx context.Context, req *pb.StatusReport) (*pb.StatusResponse, error)
ReportStatus handles deployment status reports from nodes. Requirements: 5.1, 5.2, 5.3, 5.4
func (*Server) SetHealthChecker ¶
func (s *Server) SetHealthChecker(hc HealthChecker)
SetHealthChecker sets the health checker for the server.
func (*Server) SetNodeManager ¶
func (s *Server) SetNodeManager(nm *NodeManager)
SetNodeManager sets a custom NodeManager (useful for testing).
func (*Server) Watch ¶
func (s *Server) Watch(req *pb.HealthCheckRequest, stream pb.Health_WatchServer) error
Watch implements the streaming health check protocol. Requirements: 13.1
func (*Server) WatchCommands ¶
func (s *Server) WatchCommands(req *pb.WatchCommandsRequest, stream pb.ControlPlaneService_WatchCommandsServer) error
WatchCommands handles the command streaming to nodes. Requirements: 3.1, 3.2, 3.3, 3.4