api

package
v1.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2025 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package api implements the Warren gRPC API server and Protocol Buffer conversions.

The api package provides the primary interface for external clients (CLI, workers, and other managers) to interact with the Warren cluster. It implements 30+ gRPC methods for cluster operations, service management, and health monitoring, with built-in mTLS support for secure communication.

Architecture

The API server is the gateway to Warren's control plane:

┌──────────────────── CLIENT (CLI/Worker) ───────────────────┐
│                                                              │
│  ┌──────────────────────────────────────────────┐          │
│  │         gRPC Client (mTLS)                    │          │
│  │  - Certificate-based authentication           │          │
│  │  - TLS 1.3 encryption                         │          │
│  └──────────────────┬───────────────────────────┘          │
└─────────────────────┼────────────────────────────────────┘
                      │ gRPC (port 8080)
                      │
┌─────────────────────▼──── MANAGER NODE ────────────────────┐
│                                                              │
│  ┌──────────────────────────────────────────────┐          │
│  │          gRPC API Server (pkg/api)            │          │
│  │  - 30+ RPC methods                            │          │
│  │  - mTLS authentication                        │          │
│  │  - Request validation                         │          │
│  │  - Metrics instrumentation                    │          │
│  └──────────────────┬───────────────────────────┘          │
│                     │                                        │
│  ┌──────────────────▼───────────────────────────┐          │
│  │              Manager                          │          │
│  │  - Processes API requests                     │          │
│  │  - Proposes Raft commands                     │          │
│  │  - Returns results                            │          │
│  └────────────────────────────────────────────────┘         │
└──────────────────────────────────────────────────────────┘

gRPC Methods

The API exposes 30+ methods organized by functional area:

Cluster Operations:

  • InitCluster: Initialize new cluster
  • GetClusterInfo: Get cluster metadata
  • JoinCluster: Join existing cluster
  • GenerateWorkerToken: Create worker join token
  • GenerateManagerToken: Create manager join token

Service Operations:

  • CreateService: Deploy new service
  • ListServices: Get all services
  • GetService: Get service details
  • UpdateService: Modify service (triggers rolling update)
  • DeleteService: Remove service and tasks
  • ScaleService: Change replica count

Task Operations:

  • ListTasks: Get all tasks
  • GetTask: Get task details
  • UpdateTask: Update task state (worker only)
  • DeleteTask: Remove task

Node Operations:

  • RegisterNode: Register new worker
  • ListNodes: Get all nodes
  • GetNode: Get node details
  • UpdateNode: Update node status (heartbeat)
  • RemoveNode: Decommission node

Secret Operations:

  • CreateSecret: Store encrypted secret
  • ListSecrets: Get all secrets
  • GetSecret: Get secret metadata
  • GetSecretData: Get encrypted secret data (worker only)
  • DeleteSecret: Remove secret

Volume Operations:

  • CreateVolume: Create persistent volume
  • ListVolumes: Get all volumes
  • GetVolume: Get volume details
  • DeleteVolume: Remove volume

Certificate Operations:

  • RequestCertificate: Request mTLS certificate (worker/CLI)
  • CreateCertificate: Upload TLS certificate (ingress)
  • ListCertificates: Get all certificates
  • GetCertificate: Get certificate details
  • UpdateCertificate: Renew certificate (Let's Encrypt)
  • DeleteCertificate: Remove certificate

Ingress Operations:

  • CreateIngress: Create HTTP/HTTPS routing rule
  • ListIngresses: Get all ingress rules
  • GetIngress: Get ingress details
  • UpdateIngress: Modify routing rules
  • DeleteIngress: Remove ingress

Health Operations:

  • ReportTaskHealth: Report health check result (worker only)
  • GetHealthStatus: Get cluster health

Protocol Buffers

The API uses Protocol Buffers for efficient serialization:

Message Types:

  • Request messages: *Request (e.g., CreateServiceRequest)
  • Response messages: *Response (e.g., CreateServiceResponse)
  • Resource messages: Service, Task, Node, Secret, Volume, etc.

Conversion Functions:

  • To protobuf: serviceToProto, taskToProto, etc.
  • From protobuf: protoToService, protoToTask, etc.

Proto Definition:

  • Located in api/proto/warren.proto
  • Generated code: api/proto/warren.pb.go
  • Generate: protoc --go_out=. --go-grpc_out=. warren.proto

Usage

Creating and Starting the Server:

import (
	"github.com/cuemby/warren/pkg/api"
	"github.com/cuemby/warren/pkg/manager"
)

// Create manager
mgr, err := manager.NewManager(cfg)
if err != nil {
	log.Fatal(err)
}

// Create API server with mTLS
srv, err := api.NewServer(mgr)
if err != nil {
	log.Fatal(err)
}

// Start server (blocks until stopped)
err = srv.Start("0.0.0.0:8080")
if err != nil {
	log.Fatal(err)
}

Handling RPC Requests:

func (s *Server) CreateService(ctx context.Context, req *proto.CreateServiceRequest) (*proto.CreateServiceResponse, error) {
	// 1. Ensure this node is the leader
	if err := s.ensureLeader(); err != nil {
		return nil, err
	}

	// 2. Validate request
	if req.Name == "" {
		return nil, fmt.Errorf("service name required")
	}

	// 3. Convert proto to types
	service := protoToService(req)

	// 4. Process via manager (Raft)
	err := s.manager.CreateService(service)
	if err != nil {
		return nil, err
	}

	// 5. Convert types to proto and return
	return &proto.CreateServiceResponse{
		Service: serviceToProto(service),
	}, nil
}

mTLS Authentication

The API server requires mTLS for secure communication:

Certificate Types:

  • Manager certificates: Issued by cluster CA
  • Worker certificates: Issued by cluster CA
  • CLI certificates: Issued by cluster CA (optional)
  • Ingress certificates: User-provided or Let's Encrypt

Certificate Request Flow:

  1. Client connects without certificate
  2. Calls RequestCertificate with join token
  3. Manager validates token
  4. CA issues certificate (signed by cluster CA)
  5. Client saves certificate to disk
  6. Client reconnects with mTLS

Authentication Modes:

  • RequireAndVerifyClientCert: For established workers (not used)
  • RequestClientCert: For initial connections (current)
  • Per-RPC verification: Some RPCs check certificates explicitly

Leader Forwarding

Write operations require the Raft leader:

Leader Check:

// All write operations check leadership
func (s *Server) CreateService(...) error {
	if err := s.ensureLeader(); err != nil {
		return err  // "not the leader, current leader is at: X"
	}
	// Process request...
}

Client Behavior:

  • Client receives "not the leader" error
  • Error includes leader address
  • Client can retry on leader
  • CLI handles automatically

Read Operations:

  • Can be served by any manager
  • Leader serves linearizable reads
  • Followers serve eventually consistent reads

Request Validation

All requests are validated before processing:

Common Validations:

  • Required fields non-empty
  • IDs in valid UUID format
  • Names follow naming rules (alphanumeric, dashes)
  • Replicas > 0 for replicated services
  • Resource limits reasonable
  • References exist (service, volume, secret)

Error Responses:

  • Invalid input: gRPC status InvalidArgument
  • Not found: gRPC status NotFound
  • Already exists: gRPC status AlreadyExists
  • Not leader: gRPC status FailedPrecondition
  • Internal error: gRPC status Internal

Metrics Instrumentation

All RPC methods are instrumented:

Request Metrics:

  • api_requests_total{method, status}: Request count
  • api_request_duration_seconds{method}: Request latency
  • api_requests_in_flight{method}: Concurrent requests

Example Metrics:

api_requests_total{method="CreateService",status="success"} 100
api_requests_total{method="CreateService",status="error"} 5
api_request_duration_seconds{method="CreateService",quantile="0.5"} 0.010
api_request_duration_seconds{method="CreateService",quantile="0.99"} 0.050

Performance Characteristics

Request Latency:

  • Read operations: 1-5ms (local state)
  • Write operations: 10-50ms (Raft consensus)
  • List operations: 5-20ms (depends on count)

Throughput:

  • Max requests/sec: ~1000 (single manager)
  • Max writes/sec: ~100 (Raft limited)
  • Max reads/sec: ~10,000 (parallelizable)

Resource Usage:

  • Memory per connection: ~100KB
  • CPU per request: <1ms
  • Max concurrent connections: 10,000+

Integration Points

This package integrates with:

  • pkg/manager: Processes all API requests
  • pkg/security: Provides mTLS certificates and CA
  • pkg/metrics: Instruments all RPC methods
  • pkg/types: Core data model
  • api/proto: Protocol Buffer definitions
  • pkg/client: Go client implementation

Design Patterns

Gateway Pattern:

  • API server is gateway to cluster
  • All external communication via gRPC
  • Internal communication can bypass API

Conversion Pattern:

  • Protobuf ↔ Internal types separation
  • Conversion functions in both directions
  • Keeps internal types independent

Validation Pattern:

  • Validate at API boundary
  • Fail fast with clear errors
  • Prevent invalid state in cluster

Security

mTLS Configuration:

  • TLS 1.3 only (rejects older versions)
  • Client certificates verified against CA
  • Server certificate from cluster CA
  • Certificates rotated periodically

Authorization:

  • Certificate-based identity (CN = node ID)
  • Per-RPC authorization checks (future)
  • Admin operations require manager certificate
  • Worker operations require worker certificate

Join Token Security:

  • Tokens validated on RequestCertificate
  • Tokens expire after use or timeout
  • Tokens grant appropriate certificate type
  • Token generation requires leader

Error Handling

Error Strategy:

  • Use gRPC status codes
  • Include helpful error messages
  • Log errors server-side
  • Return structured errors

Common Errors:

  • "not the leader, current leader is at: X"
  • "service already exists: nginx"
  • "node not found: worker-1"
  • "insufficient resources on node: worker-2"

Backward Compatibility

Proto Evolution:

  • Add new fields with defaults
  • Never remove fields (deprecate instead)
  • Use optional fields for new features
  • Version check in InitCluster

API Versioning:

  • Currently single version (v1 implicit)
  • Future: Multiple service versions
  • Deprecation: Grace period + warnings

Troubleshooting

Common Issues:

Connection Refused:

  • Manager not running or wrong address
  • Firewall blocking port 8080
  • TLS handshake failure (certificate issue)

Certificate Errors:

  • Certificate not found: Run RequestCertificate
  • Certificate expired: Re-request certificate
  • CA mismatch: Rejoin cluster

"Not the leader" Errors:

  • Normal during leader election
  • Retry automatically or manually
  • Check leader with GetClusterInfo

Timeout Errors:

  • Network latency too high
  • Manager overloaded (check metrics)
  • Raft quorum lost (check cluster health)

Monitoring

Key metrics to monitor:

API Health:

  • api_requests_total: Request rate
  • api_request_duration: Latency
  • api_errors_total: Error rate

Connection Health:

  • grpc_server_started_total: Connection rate
  • grpc_server_handled_total: Completion rate
  • grpc_server_handling_seconds: RPC duration

TLS Health:

  • tls_handshake_failures: Certificate issues
  • certificate_expiry_seconds: Time until renewal

See Also

  • pkg/manager for request processing
  • pkg/client for Go client implementation
  • api/proto/warren.proto for API definition
  • pkg/security for mTLS setup
  • docs/api-reference.md for complete API docs
  • .agent/System/api-documentation.md for detailed reference

Index

Constants

View Source
const (
	// DefaultUnixSocket is the default path for the Unix socket
	DefaultUnixSocket = "/var/run/warren.sock"
	// UnixSocketPermissions are the file permissions for the Unix socket (rw-rw----)
	UnixSocketPermissions = 0660
)

Variables

This section is empty.

Functions

func ReadOnlyInterceptor added in v1.4.0

func ReadOnlyInterceptor() grpc.UnaryServerInterceptor

ReadOnlyInterceptor creates a gRPC unary interceptor that only allows read-only operations. This is used for the Unix socket listener to prevent write operations from local CLI.

Types

type HealthResponse added in v1.4.0

type HealthResponse struct {
	Status    string    `json:"status"`
	Timestamp time.Time `json:"timestamp"`
	Version   string    `json:"version,omitempty"`
}

HealthResponse represents the health check response

type HealthServer added in v1.4.0

type HealthServer struct {
	// contains filtered or unexported fields
}

HealthServer provides HTTP health check endpoints

func NewHealthServer added in v1.4.0

func NewHealthServer(mgr *manager.Manager) *HealthServer

NewHealthServer creates a new health check HTTP server

func (*HealthServer) GetHandler added in v1.4.0

func (hs *HealthServer) GetHandler() http.Handler

GetHandler returns the HTTP handler for embedding in other servers

func (*HealthServer) Start added in v1.4.0

func (hs *HealthServer) Start(addr string) error

Start starts the health check HTTP server

type ReadyResponse added in v1.4.0

type ReadyResponse struct {
	Status    string            `json:"status"`
	Timestamp time.Time         `json:"timestamp"`
	Checks    map[string]string `json:"checks"`
	Message   string            `json:"message,omitempty"`
}

ReadyResponse represents the readiness check response

type Server

type Server struct {
	proto.UnimplementedWarrenAPIServer
	// contains filtered or unexported fields
}

Server implements the WarrenAPI gRPC service

func NewServer

func NewServer(mgr *manager.Manager) (*Server, error)

NewServer creates a new API server with mTLS

func (*Server) CreateIngress

CreateIngress creates a new ingress

func (*Server) CreateSecret

CreateSecret creates a new secret

func (*Server) CreateService

CreateService creates a new service

func (*Server) CreateTLSCertificate

CreateTLSCertificate creates a new TLS certificate

func (*Server) CreateVolume

CreateVolume creates a new volume

func (*Server) DeleteIngress

DeleteIngress deletes an ingress

func (*Server) DeleteSecret

DeleteSecret deletes a secret

func (*Server) DeleteService

DeleteService deletes a service

func (*Server) DeleteTLSCertificate

DeleteTLSCertificate deletes a TLS certificate

func (*Server) DeleteVolume

DeleteVolume deletes a volume

func (*Server) GenerateJoinToken

GenerateJoinToken generates a join token for adding nodes

func (*Server) GetClusterInfo

GetClusterInfo returns information about the Raft cluster

func (*Server) GetContainer added in v1.1.1

GetContainer returns a specific container

func (*Server) GetIngress

GetIngress retrieves an ingress

func (*Server) GetNode

GetNode returns a specific node by ID

func (*Server) GetSecretByName

GetSecretByName retrieves a secret by name (includes encrypted data for workers)

func (*Server) GetService

GetService returns a specific service

func (*Server) GetTLSCertificate

GetTLSCertificate retrieves a TLS certificate

func (*Server) GetVolumeByName

GetVolumeByName retrieves a volume by name

func (*Server) Heartbeat

Heartbeat processes heartbeat from a worker node

func (*Server) JoinCluster

JoinCluster handles a manager join request

func (*Server) ListContainers added in v1.1.1

ListContainers returns containers, optionally filtered by service or node

func (*Server) ListIngresses

ListIngresses lists all ingresses

func (*Server) ListNodes

ListNodes returns all nodes in the cluster

func (*Server) ListSecrets

ListSecrets returns all secrets (without data)

func (*Server) ListServices

ListServices returns all services

func (*Server) ListTLSCertificates

ListTLSCertificates lists all TLS certificates

func (*Server) ListVolumes

ListVolumes returns all volumes

func (*Server) RegisterNode

RegisterNode registers a new node (worker or manager) with the cluster

func (*Server) RemoveNode

RemoveNode removes a node from the cluster

func (*Server) ReportContainerHealth added in v1.1.1

ReportContainerHealth reports the health status of a container

func (*Server) RequestCertificate

RequestCertificate issues a certificate for a node joining the cluster

func (*Server) RollbackService added in v1.3.0

RollbackService rolls back a service to the previous version

func (*Server) Start

func (s *Server) Start(addr string) error

Start starts the TCP gRPC server with mTLS

func (*Server) StartUnix added in v1.4.0

func (s *Server) StartUnix() error

StartUnix starts the Unix socket listener (read-only, no mTLS)

func (*Server) Stop

func (s *Server) Stop()

Stop gracefully stops both gRPC servers

func (*Server) StreamEvents

StreamEvents streams cluster events to the client TODO: Complete implementation

func (*Server) UpdateContainerStatus added in v1.1.1

UpdateTaskStatus updates the status of a task

func (*Server) UpdateIngress

UpdateIngress updates an existing ingress

func (*Server) UpdateService

UpdateService updates an existing service

func (*Server) UpdateServiceImage added in v1.3.0

UpdateServiceImage updates a service with a new image using specified deployment strategy

func (*Server) WatchContainers added in v1.1.1

WatchContainers streams container events to a worker node

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL