grpc

package
v0.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package grpc provides the gRPC server implementation for the control plane.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NodeIDFromContext

func NodeIDFromContext(ctx context.Context) (string, bool)

NodeIDFromContext extracts the node ID from the context.

Types

type AuthService

type AuthService interface {
	ValidateToken(tokenString string) (*auth.Claims, error)
}

AuthService defines the interface for authentication operations.

type Config

type Config struct {
	Port                 int
	TLSCertFile          string
	TLSKeyFile           string
	MaxConcurrentStreams uint32
	KeepaliveTime        time.Duration
	KeepaliveTimeout     time.Duration
	MaxRecvMsgSize       int
}

Config holds the gRPC server configuration.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible defaults.

type HealthChecker

type HealthChecker interface {
	CheckPostgres(ctx context.Context) error
}

HealthChecker defines the interface for checking service health.

type NodeConnection

type NodeConnection struct {
	NodeID        string
	Info          *pb.NodeInfo
	CommandStream pb.ControlPlaneService_WatchCommandsServer
	LastHeartbeat time.Time
	Status        NodeStatus
	// contains filtered or unexported fields
}

NodeConnection represents an active connection to a node agent.

func (*NodeConnection) AddDeployment

func (nc *NodeConnection) AddDeployment(deploymentID string)

AddDeployment marks a deployment as active on this node.

func (*NodeConnection) GetLastHeartbeat

func (nc *NodeConnection) GetLastHeartbeat() time.Time

GetLastHeartbeat returns the last heartbeat time.

func (*NodeConnection) GetStatus

func (nc *NodeConnection) GetStatus() NodeStatus

GetStatus returns the current node status.

func (*NodeConnection) HasDeployment

func (nc *NodeConnection) HasDeployment(deploymentID string) bool

HasDeployment checks if a deployment is active on this node.

func (*NodeConnection) IsDraining

func (nc *NodeConnection) IsDraining() bool

IsDraining returns whether the node is draining.

func (*NodeConnection) RemoveDeployment

func (nc *NodeConnection) RemoveDeployment(deploymentID string)

RemoveDeployment removes a deployment from the active set.

func (*NodeConnection) SetDraining

func (nc *NodeConnection) SetDraining(draining bool)

SetDraining sets the draining flag for the node.

func (*NodeConnection) SetStatus

func (nc *NodeConnection) SetStatus(status NodeStatus)

SetStatus sets the node status.

func (*NodeConnection) UpdateHeartbeat

func (nc *NodeConnection) UpdateHeartbeat(info *pb.NodeInfo)

UpdateHeartbeat updates the last heartbeat time and node info.

type NodeManager

type NodeManager struct {
	// contains filtered or unexported fields
}

NodeManager manages active node connections and their command streams.

func NewNodeManager

func NewNodeManager(st store.Store, logger *slog.Logger) *NodeManager

NewNodeManager creates a new NodeManager instance.

func NewNodeManagerWithConfig

func NewNodeManagerWithConfig(st store.Store, logger *slog.Logger, cfg *NodeManagerConfig) *NodeManager

NewNodeManagerWithConfig creates a new NodeManager with custom configuration.

func (*NodeManager) BroadcastCommand

func (m *NodeManager) BroadcastCommand(ctx context.Context, cmd *pb.DeploymentCommand) error

BroadcastCommand sends a command to all connected nodes.

func (*NodeManager) CalculateNodeStatus

func (m *NodeManager) CalculateNodeStatus(timeSinceHeartbeat time.Duration) NodeStatus

CalculateNodeStatus determines the node status based on the time elapsed since the last heartbeat. It is exported for testing purposes.

func (*NodeManager) ConnectionCount

func (m *NodeManager) ConnectionCount() int

ConnectionCount returns the number of active connections.

func (*NodeManager) GetAvailableNodes

func (m *NodeManager) GetAvailableNodes() []*NodeConnection

GetAvailableNodes returns all nodes that are healthy and not draining. These are the nodes that can accept new deployments.

func (*NodeManager) GetConnectedNodeIDs

func (m *NodeManager) GetConnectedNodeIDs() []string

GetConnectedNodeIDs returns a list of all connected node IDs.

func (*NodeManager) GetConnection

func (m *NodeManager) GetConnection(nodeID string) (*NodeConnection, bool)

GetConnection returns a node's connection if it exists.

func (*NodeManager) GetHealthyNodes

func (m *NodeManager) GetHealthyNodes() []*NodeConnection

GetHealthyNodes returns all nodes that are in healthy status.

func (*NodeManager) GetNodeStatus

func (m *NodeManager) GetNodeStatus(nodeID string) NodeStatus

GetNodeStatus returns the status of a specific node.

func (*NodeManager) IsNodeDraining

func (m *NodeManager) IsNodeDraining(nodeID string) bool

IsNodeDraining returns whether a node is draining.

func (*NodeManager) MarkDeploymentComplete

func (m *NodeManager) MarkDeploymentComplete(nodeID, deploymentID string)

MarkDeploymentComplete marks a deployment as no longer active on a node.

func (*NodeManager) RegisterConnection

func (m *NodeManager) RegisterConnection(nodeID string, stream pb.ControlPlaneService_WatchCommandsServer) error

RegisterConnection registers a node's command stream.

func (*NodeManager) SendCommand

func (m *NodeManager) SendCommand(ctx context.Context, nodeID string, cmd *pb.DeploymentCommand) error

SendCommand sends a deployment command to a specific node. It returns ALREADY_EXISTS if the command would start a deployment that is already running, and UNAVAILABLE if the node is draining or down.

func (*NodeManager) SetNodeDraining

func (m *NodeManager) SetNodeDraining(nodeID string, draining bool)

SetNodeDraining sets the draining flag for a node. When a node is draining, it will not receive new deployment commands. Requirement: 15.5

func (*NodeManager) StartHealthChecker

func (m *NodeManager) StartHealthChecker(ctx context.Context)

StartHealthChecker starts the background health checker goroutine.

func (*NodeManager) Stop

func (m *NodeManager) Stop()

Stop stops the NodeManager and waits for goroutines to finish.

func (*NodeManager) UnregisterConnection

func (m *NodeManager) UnregisterConnection(nodeID string)

UnregisterConnection removes a node's connection.

func (*NodeManager) UpdateHeartbeat

func (m *NodeManager) UpdateHeartbeat(nodeID string, info *pb.NodeInfo)

UpdateHeartbeat updates a node's heartbeat timestamp and info.

type NodeManagerConfig

type NodeManagerConfig struct {
	HealthCheckInterval time.Duration
	DegradedThreshold   time.Duration
	DownThreshold       time.Duration
}

NodeManagerConfig holds configuration for the NodeManager.

func DefaultNodeManagerConfig

func DefaultNodeManagerConfig() *NodeManagerConfig

DefaultNodeManagerConfig returns default configuration values.

type NodeStatus

type NodeStatus int

NodeStatus represents the health status of a node.

const (
	// NodeStatusHealthy indicates the node is responding normally.
	NodeStatusHealthy NodeStatus = iota
	// NodeStatusDegraded indicates the node has missed some heartbeats.
	NodeStatusDegraded
	// NodeStatusDown indicates the node is unresponsive.
	NodeStatusDown
)

func (NodeStatus) String

func (s NodeStatus) String() string

String returns the string representation of NodeStatus.

type Server

type Server struct {
	pb.UnimplementedControlPlaneServiceServer
	pb.UnimplementedHealthServer
	// contains filtered or unexported fields
}

Server implements the gRPC server for the control plane.

func NewServer

func NewServer(cfg *Config, st store.Store, authSvc AuthService, logger *slog.Logger) (*Server, error)

NewServer creates a new gRPC server instance.

func (*Server) Check

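func (s *Server) Check(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error)

Check implements the gRPC Health Check protocol. Requirements: 13.1, 13.2, 13.3, 13.4, 13.5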

func (*Server) Heartbeat

func (s *Server) Heartbeat(ctx context.Context, req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)

Heartbeat handles heartbeat requests from nodes. Requirements: 2.1, 2.2, 2.3, 15.5

func (*Server) IsServing

func (s *Server) IsServing() bool

IsServing returns whether the server is currently serving requests.

func (*Server) NodeManager

func (s *Server) NodeManager() *NodeManager

NodeManager returns the server's NodeManager instance.

func (*Server) PushLogs

func (s *Server) PushLogs(stream pb.ControlPlaneService_PushLogsServer) error

PushLogs handles log streaming from nodes. Requirements: 4.2, 4.4

func (*Server) Register

func (s *Server) Register(ctx context.Context, req *pb.RegisterRequest) (*pb.RegisterResponse, error)

Register handles node registration requests. Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6

func (*Server) ReportStatus

func (s *Server) ReportStatus(ctx context.Context, req *pb.StatusReport) (*pb.StatusResponse, error)

ReportStatus handles deployment status reports from nodes. Requirements: 5.1, 5.2, 5.3, 5.4

func (*Server) SetHealthChecker

func (s *Server) SetHealthChecker(hc HealthChecker)

SetHealthChecker sets the health checker for the server.

func (*Server) SetNodeManager

func (s *Server) SetNodeManager(nm *NodeManager)

SetNodeManager sets a custom NodeManager (useful for testing).

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start starts the gRPC server.

func (*Server) Stop

func (s *Server) Stop(ctx context.Context) error

Stop gracefully stops the gRPC server.

func (*Server) Watch

func (s *Server) Watch(req *pb.HealthCheckRequest, stream pb.Health_WatchServer) error

Watch implements the streaming health check protocol. Requirements: 13.1

func (*Server) WatchCommands

WatchCommands handles command streaming to nodes. Requirements: 3.1, 3.2, 3.3, 3.4

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL