Documentation ¶
Overview ¶
Package manager implements the Warren cluster manager node with Raft consensus.
The manager package is the control plane of Warren, responsible for cluster coordination, state management, and orchestration decisions. Managers form a highly-available quorum using the Raft consensus protocol, ensuring consistent cluster state even during network partitions or node failures.
Architecture ¶
A Warren cluster consists of 1-7 manager nodes that form a Raft quorum:
┌─────────────────────── MANAGER NODE ───────────────────────┐
│                                                            │
│      ┌──────────────────────────────────────────────┐      │
│      │  gRPC API Server (port 8080)                 │      │
│      │  - 30+ methods for cluster operations        │      │
│      └──────────────────┬───────────────────────────┘      │
│                         │                                  │
│      ┌──────────────────▼───────────────────────────┐      │
│      │  Manager                                     │      │
│      │  - Handles API requests                      │      │
│      │  - Proposes Raft commands                    │      │
│      │  - Coordinates scheduler & reconciler        │      │
│      │  - Manages tokens, secrets, DNS, ingress     │      │
│      └──────────────────┬───────────────────────────┘      │
│                         │                                  │
│      ┌──────────────────▼───────────────────────────┐      │
│      │  Raft Consensus Layer                        │      │
│      │  - Leader election (2-3s failover)           │      │
│      │  - Log replication across managers           │      │
│      │  - FSM applies committed commands            │      │
│      └──────────────────┬───────────────────────────┘      │
│                         │                                  │
│      ┌──────────────────▼───────────────────────────┐      │
│      │  WarrenFSM (Finite State Machine)            │      │
│      │  - Apply(): Process committed commands       │      │
│      │  - Snapshot(): Create state snapshots        │      │
│      │  - Restore(): Recover from snapshots         │      │
│      └──────────────────┬───────────────────────────┘      │
│                         │                                  │
│      ┌──────────────────▼───────────────────────────┐      │
│      │  BoltDB Store                                │      │
│      │  - Nodes, Services, Tasks                    │      │
│      │  - Secrets, Volumes, Certificates            │      │
│      │  - Raft log and snapshots                    │      │
│      └──────────────────────────────────────────────┘      │
│                                                            │
└────────────────────────────────────────────────────────────┘
Core Components ¶
Manager:
- Main orchestration coordinator
- Handles gRPC API requests
- Proposes Raft commands for state changes
- Manages scheduler and reconciler lifecycle
- Coordinates DNS server and ingress proxy
WarrenFSM:
- Raft finite state machine implementation
- Applies committed log entries to cluster state
- Implements snapshot/restore for fast recovery
TokenManager:
- Generates and validates join tokens
- Separate tokens for workers and managers
- Time-limited tokens with rotation support
Command:
- Encapsulates state change operations
- Types: CreateService, UpdateContainer, AddNode, etc.
- Serialized as JSON in Raft log
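To make the JSON serialization concrete, here is a minimal sketch of how a command could be encoded into a log entry and decoded again. The Command struct mirrors the one documented under Types; servicePayload, encodeCommand, and decodeCommand are hypothetical names introduced only for this illustration and are not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Command mirrors the struct documented under Types in this package.
type Command struct {
	Op   string          `json:"op"`
	Data json.RawMessage `json:"data"`
}

// servicePayload is a hypothetical payload used only for this example.
type servicePayload struct {
	Name     string `json:"name"`
	Replicas int    `json:"replicas"`
}

// encodeCommand wraps a payload in a Command and returns the JSON bytes
// that would be stored as one log entry.
func encodeCommand(op string, payload interface{}) ([]byte, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("marshal payload: %w", err)
	}
	return json.Marshal(Command{Op: op, Data: data})
}

// decodeCommand parses a log entry back into a Command. Data stays raw
// until the caller knows which payload type the Op implies.
func decodeCommand(entry []byte) (Command, error) {
	var cmd Command
	err := json.Unmarshal(entry, &cmd)
	return cmd, err
}

func main() {
	entry, err := encodeCommand("create_service", servicePayload{Name: "web", Replicas: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(entry))
	// prints: {"op":"create_service","data":{"name":"web","replicas":3}}
}
```

Keeping Data as json.RawMessage defers payload decoding until the operation name has been inspected, which is why one envelope type can carry every command.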
Raft Consensus ¶
Warren uses HashiCorp's Raft library for distributed consensus.
Cluster Sizes:
- 1 manager: Development only (no HA)
- 3 managers: Production (tolerates 1 failure)
- 5 managers: High availability (tolerates 2 failures)
- 7 managers: Maximum recommended (tolerates 3 failures)
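The failure tolerances listed above follow from majority arithmetic: a cluster of n managers needs floor(n/2)+1 votes, so it survives floor((n-1)/2) failures. The sketch below reproduces the table; the function names are illustrative only.

```go
package main

import "fmt"

// quorumSize returns how many managers must agree before a write commits.
func quorumSize(managers int) int {
	return managers/2 + 1
}

// faultTolerance returns how many managers can fail while a majority remains.
func faultTolerance(managers int) int {
	return (managers - 1) / 2
}

func main() {
	for _, n := range []int{1, 3, 5, 7} {
		fmt.Printf("%d managers: quorum %d, tolerates %d failure(s)\n",
			n, quorumSize(n), faultTolerance(n))
	}
}
```

The same arithmetic shows why even cluster sizes are not listed: 4 managers need a quorum of 3 and still tolerate only 1 failure, the same as 3 managers.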
Quorum Requirements:
- Write operations require majority quorum
- Read operations served by the leader are linearizable; reads served by followers are eventually consistent
- Leader election typically completes in 2-3 seconds
- Network partition: Minority partition becomes read-only
Data Replication:
- All state changes replicated via Raft log
- Log entries applied to FSM in order
- Snapshots created periodically for compaction
- New managers sync via snapshot + log replay
Usage ¶
Creating a Manager:
cfg := &manager.Config{
	NodeID:   "manager-1",
	BindAddr: "192.168.1.10:8080",
	DataDir:  "/var/lib/warren/manager-1",
}
mgr, err := manager.NewManager(cfg)
if err != nil {
	log.Fatal(err)
}
Initializing a Cluster:
// First manager initializes the cluster
err := mgr.Bootstrap()
if err != nil {
	log.Fatal(err)
}
Joining Additional Managers:
// Additional managers join existing cluster
token := "manager-join-token-abc123"
err := mgr.Join("192.168.1.10:8080", token)
if err != nil {
	log.Fatal(err)
}
Generating Join Tokens:
// Leader generates tokens for new nodes
workerToken, err := mgr.GenerateJoinToken("worker")
if err != nil {
	log.Fatal(err)
}
managerToken, err := mgr.GenerateJoinToken("manager")
if err != nil {
	log.Fatal(err)
}
Proposing State Changes:
// All state changes go through Raft
cmd := manager.Command{
	Op:   "create_service",
	Data: serviceJSON,
}
err := mgr.Apply(cmd)
if err != nil {
	log.Fatal(err)
}
Leadership ¶
Only the Raft leader can:
- Accept write operations (state changes)
- Schedule new tasks
- Generate join tokens
- Coordinate cluster operations
Followers:
- Forward writes to leader automatically
- Serve read operations (eventually consistent)
- Participate in leader election
- Replicate log entries from leader
When the leader fails:
- New leader elected in 2-3 seconds
- Scheduler and reconciler start on new leader
- Workers reconnect to new leader automatically
- No service disruption (workers cache state)
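As a rough illustration of the leader/follower split, the sketch below decides where a write should go using only IsLeader and LeaderAddr, both of which appear in the Index. The leaderInfo interface, writeTarget, and fakeManager are hypothetical helpers, and treating an empty LeaderAddr as "no leader known" is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// leaderInfo lists the two Manager methods this sketch relies on.
type leaderInfo interface {
	IsLeader() bool
	LeaderAddr() string
}

// writeTarget returns the address a write should be forwarded to.
// An empty address with a nil error means "handle the write locally".
func writeTarget(m leaderInfo) (string, error) {
	if m.IsLeader() {
		return "", nil
	}
	addr := m.LeaderAddr()
	if addr == "" {
		// Assumption: an empty address means no leader is currently known.
		return "", errors.New("no leader known; an election may be in progress")
	}
	return addr, nil
}

// fakeManager is a test double standing in for *manager.Manager.
type fakeManager struct {
	leader     bool
	leaderAddr string
}

func (f fakeManager) IsLeader() bool     { return f.leader }
func (f fakeManager) LeaderAddr() string { return f.leaderAddr }

func main() {
	follower := fakeManager{leader: false, leaderAddr: "192.168.1.10:8080"}
	addr, err := writeTarget(follower)
	fmt.Println(addr, err) // prints: 192.168.1.10:8080 <nil>
}
```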
State Machine Commands ¶
The FSM processes these command types:
Node Operations:
- AddNode: Register new worker node
- UpdateNodeStatus: Update node health and resources
- RemoveNode: Decommission node
Service Operations:
- CreateService: Deploy new service
- UpdateService: Modify service (triggers rolling update)
- DeleteService: Remove service and all containers
Container Operations:
- CreateContainer: Create new container instance
- UpdateContainer: Update container state and metadata
- DeleteContainer: Remove completed/failed container
Secret Operations:
- CreateSecret: Store encrypted secret
- DeleteSecret: Remove secret (if not in use)
Volume Operations:
- CreateVolume: Create persistent volume
- DeleteVolume: Remove volume (if not in use)
Certificate Operations:
- CreateCertificate: Upload TLS certificate
- UpdateCertificate: Renew certificate (Let's Encrypt)
- DeleteCertificate: Remove certificate
Ingress Operations:
- CreateIngress: Create HTTP/HTTPS ingress rule
- UpdateIngress: Modify routing rules
- DeleteIngress: Remove ingress
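The dispatch that an FSM performs on these command types can be sketched with a toy in-memory state machine. This is not WarrenFSM: the service type, the toyFSM type, and the operation strings other than "create_service" (which appears in the Usage section) are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Command mirrors the struct documented under Types.
type Command struct {
	Op   string          `json:"op"`
	Data json.RawMessage `json:"data"`
}

// service is a hypothetical, trimmed-down stand-in for types.Service.
type service struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// toyFSM keeps services in memory only.
type toyFSM struct {
	services map[string]service
}

// apply decodes one committed log entry and mutates state according to Op.
func (f *toyFSM) apply(entry []byte) error {
	var cmd Command
	if err := json.Unmarshal(entry, &cmd); err != nil {
		return fmt.Errorf("decode command: %w", err)
	}
	switch cmd.Op {
	case "create_service", "update_service":
		var s service
		if err := json.Unmarshal(cmd.Data, &s); err != nil {
			return fmt.Errorf("decode service: %w", err)
		}
		f.services[s.ID] = s
	case "delete_service":
		var id string
		if err := json.Unmarshal(cmd.Data, &id); err != nil {
			return fmt.Errorf("decode service id: %w", err)
		}
		delete(f.services, id)
	default:
		return fmt.Errorf("unknown command %q", cmd.Op)
	}
	return nil
}

func main() {
	fsm := &toyFSM{services: map[string]service{}}
	entries := []string{
		`{"op":"create_service","data":{"id":"s1","name":"web"}}`,
		`{"op":"create_service","data":{"id":"s2","name":"db"}}`,
		`{"op":"delete_service","data":"s1"}`,
	}
	// Entries are applied strictly in log order, as on every manager.
	for _, e := range entries {
		if err := fsm.apply([]byte(e)); err != nil {
			panic(err)
		}
	}
	fmt.Println(len(fsm.services), fsm.services["s2"].Name) // prints: 1 db
}
```

Because every manager applies the same entries in the same order, each one arrives at the same state without any further coordination.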
Failure Scenarios ¶
Manager Failure:
- If a follower fails: No impact (quorum maintained)
- If the leader fails: New election (writes unavailable for 2-3s)
- Raft handles both cases without operator intervention
Network Partition:
- Majority partition: Continues operating (elects leader)
- Minority partition: Read-only mode (no writes accepted)
- Partition heals: Minority syncs from majority
Data Corruption:
- BoltDB checksums detect corruption
- Restore from latest snapshot
- Sync missing log entries from peers
Performance Characteristics ¶
Raft Operations:
- Write latency: ~10ms (local quorum)
- Snapshot interval: Every 10,000 log entries
- Max log size: Configurable (default 1GB)
- Heartbeat interval: 1 second
API Throughput:
- Service creation: 10/sec (linearizable writes)
- Container updates: 100/sec (batched FSM applies)
- Read operations: 1000/sec (leader serving)
Memory Usage:
- Base manager: 50MB
- Per service: ~10KB
- Per container: ~5KB
- Per node: ~2KB
- Typical 3-manager cluster: ~256MB total
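For capacity planning, the per-object figures above can be combined into a back-of-the-envelope estimate for a single manager. The sketch below simply adds them up; estimateBytes is a hypothetical helper, the inputs are made-up cluster sizes, and the result is only as accurate as the approximate figures it is built from.

```go
package main

import "fmt"

const (
	kb = 1024
	mb = 1024 * kb

	baseManager  = 50 * mb // base manager footprint
	perService   = 10 * kb // approximate
	perContainer = 5 * kb  // approximate
	perNode      = 2 * kb  // approximate
)

// estimateBytes sums the documented per-object costs for one manager.
func estimateBytes(services, containers, nodes int) int {
	return baseManager + services*perService + containers*perContainer + nodes*perNode
}

func main() {
	// Hypothetical cluster: 100 services, 1000 containers, 50 nodes.
	total := estimateBytes(100, 1000, 50)
	fmt.Printf("estimated state: %.1f MB\n", float64(total)/mb)
}
```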
Integration Points ¶
This package integrates with:
- pkg/api: Provides gRPC server implementation
- pkg/storage: Persists cluster state to BoltDB
- pkg/scheduler: Coordinates container scheduling
- pkg/reconciler: Coordinates failure detection
- pkg/security: Manages secrets encryption and CA
- pkg/dns: Provides DNS server for service discovery
- pkg/ingress: Provides HTTP/HTTPS ingress controller
- pkg/events: Publishes cluster events
Design Patterns ¶
Command Pattern:
- All state changes encapsulated as commands
- Commands serialized and replicated via Raft
- FSM applies commands to achieve state transitions
Leader Pattern:
- Single leader coordinates operations
- Followers forward writes to leader
- Automatic failover on leader failure
Token Pattern:
- Time-limited join tokens for authentication
- Separate tokens for workers and managers
- Tokens rotated periodically for security
Security ¶
Join Token Security:
- Tokens generated with cryptographic randomness
- Time-limited validity (default 1 hour)
- Separate tokens for workers and managers
- Tokens never logged or exposed in API
mTLS Support:
- Manager-to-manager: Raft over mTLS (future)
- Manager-to-worker: gRPC with TLS (future)
- Certificate rotation: Automated (future)
Secrets Encryption:
- AES-256-GCM for secret data
- Encryption key derived from cluster ID
- Keys never stored on disk unencrypted
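A minimal sketch of AES-256-GCM encryption with the standard library follows. How Warren actually derives the key from the cluster ID is not documented here, so the SHA-256 hash in deriveKey is purely a placeholder assumption, as are the function names and the nonce-prefixed ciphertext layout.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
)

// deriveKey produces a 32-byte key from the cluster ID.
// Assumption: plain SHA-256 is used only to keep the example short.
func deriveKey(clusterID string) []byte {
	sum := sha256.Sum256([]byte(clusterID))
	return sum[:]
}

// encrypt seals plaintext with AES-256-GCM and prepends the random nonce.
func encrypt(key, plaintext []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// decrypt splits off the nonce and authenticates and opens the ciphertext.
func decrypt(key, sealed []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ct := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ct, nil)
}

func main() {
	key := deriveKey("example-cluster-id") // hypothetical cluster ID
	sealed, err := encrypt(key, []byte("db-password"))
	if err != nil {
		panic(err)
	}
	plain, err := decrypt(key, sealed)
	fmt.Println(string(plain), err) // prints: db-password <nil>
}
```

GCM authenticates as well as encrypts, so decrypting with the wrong key or a tampered ciphertext returns an error instead of garbage.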
High Availability ¶
3-Manager Cluster (Production):
- Tolerates 1 manager failure
- Requires a quorum of 2 of 3 managers for writes
- Recommended for production workloads
5-Manager Cluster (High Availability):
- Tolerates 2 manager failures
- Requires a quorum of 3 of 5 managers for writes
- Recommended for critical workloads
Best Practices:
- Deploy managers across availability zones
- Use fast, reliable network between managers
- Monitor Raft metrics (leader elections, log size)
- Regular snapshot backups
Troubleshooting ¶
Common Issues:
Leader Election Storms:
- Symptom: Frequent leader changes
- Cause: Network latency or clock skew
- Solution: Check network and NTP sync
Split Brain:
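- Symptom: More than one manager reports itself as leader
- Cause: Network partition in which an isolated former leader has not yet stepped down
- Solution: Restore connectivity and run 3 or more managers so that a majority partition can keep quorum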
Slow Writes:
- Symptom: High write latency
- Cause: Slow follower or network
- Solution: Check follower health and network latency
Large Raft Log:
- Symptom: High memory usage
- Cause: Snapshot interval too high
- Solution: Reduce snapshot threshold
Monitoring ¶
Key metrics to monitor:
Raft Health:
- raft_leader_changes: Should be low (< 1/hour)
- raft_log_entries: Should be bounded (snapshots working)
- raft_commit_time: Should be low (< 10ms)
- raft_quorum: Should always be true
Manager Health:
- manager_proposals_total: Write throughput
- manager_proposal_duration: Write latency
- manager_fsm_apply_duration: State machine performance
Resource Usage:
- process_resident_memory: Memory usage
- go_goroutines: Goroutine count
- process_cpu_seconds: CPU usage
See Also ¶
- pkg/api for gRPC server implementation
- pkg/storage for state persistence
- pkg/scheduler for container scheduling logic
- pkg/reconciler for failure detection
- docs/concepts/high-availability.md for HA setup
- docs/raft-tuning.md for Raft configuration
Index ¶
- type Command
- type Config
- type JoinToken
- type Manager
- func (m *Manager) AddVoter(nodeID, address string) error
- func (m *Manager) Apply(cmd Command) error
- func (m *Manager) Bootstrap() error
- func (m *Manager) CertToPEM(cert *tls.Certificate) (certPEM, keyPEM []byte, err error)
- func (m *Manager) CreateContainer(container *types.Container) error
- func (m *Manager) CreateIngress(ingress *types.Ingress) error
- func (m *Manager) CreateNode(node *types.Node) error
- func (m *Manager) CreateSecret(secret *types.Secret) error
- func (m *Manager) CreateService(service *types.Service) error
- func (m *Manager) CreateTLSCertificate(cert *types.TLSCertificate) error
- func (m *Manager) CreateVolume(volume *types.Volume) error
- func (m *Manager) DeleteContainer(id string) error
- func (m *Manager) DeleteIngress(ingressID string) error
- func (m *Manager) DeleteNode(id string) error
- func (m *Manager) DeleteSecret(id string) error
- func (m *Manager) DeleteService(id string) error
- func (m *Manager) DeleteTLSCertificate(certID string) error
- func (m *Manager) DeleteVolume(id string) error
- func (m *Manager) EnableACME(email string) error
- func (m *Manager) EncryptSecret(plaintext []byte) ([]byte, error)
- func (m *Manager) GenerateJoinToken(role string) (*JoinToken, error)
- func (m *Manager) GetCACertPEM() []byte
- func (m *Manager) GetClusterServers() ([]raft.Server, error)
- func (m *Manager) GetContainer(id string) (*types.Container, error)
- func (m *Manager) GetDeployer() *deploy.Deployer
- func (m *Manager) GetEventBroker() *events.Broker
- func (m *Manager) GetIngress(id string) (*types.Ingress, error)
- func (m *Manager) GetIngressByName(name string) (*types.Ingress, error)
- func (m *Manager) GetNetwork(id string) (*types.Network, error)
- func (m *Manager) GetNode(id string) (*types.Node, error)
- func (m *Manager) GetRaftStats() map[string]interface{}
- func (m *Manager) GetSecret(id string) (*types.Secret, error)
- func (m *Manager) GetSecretByName(name string) (*types.Secret, error)
- func (m *Manager) GetService(id string) (*types.Service, error)
- func (m *Manager) GetServiceByName(name string) (*types.Service, error)
- func (m *Manager) GetTLSCertificate(id string) (*types.TLSCertificate, error)
- func (m *Manager) GetTLSCertificateByName(name string) (*types.TLSCertificate, error)
- func (m *Manager) GetVolume(id string) (*types.Volume, error)
- func (m *Manager) GetVolumeByName(name string) (*types.Volume, error)
- func (m *Manager) IsLeader() bool
- func (m *Manager) IssueACMECertificate(domains []string) error
- func (m *Manager) IssueCertificate(nodeID, role string) (*tls.Certificate, error)
- func (m *Manager) Join(leaderAddr string, token string) error
- func (m *Manager) LeaderAddr() string
- func (m *Manager) ListContainers() ([]*types.Container, error)
- func (m *Manager) ListContainersByNode(nodeID string) ([]*types.Container, error)
- func (m *Manager) ListContainersByService(serviceID string) ([]*types.Container, error)
- func (m *Manager) ListIngresses() ([]*types.Ingress, error)
- func (m *Manager) ListNetworks() ([]*types.Network, error)
- func (m *Manager) ListNodes() ([]*types.Node, error)
- func (m *Manager) ListSecrets() ([]*types.Secret, error)
- func (m *Manager) ListServices() ([]*types.Service, error)
- func (m *Manager) ListTLSCertificates() ([]*types.TLSCertificate, error)
- func (m *Manager) ListVolumes() ([]*types.Volume, error)
- func (m *Manager) NodeID() string
- func (m *Manager) PublishEvent(event *events.Event)
- func (m *Manager) ReloadIngress() error
- func (m *Manager) RemoveServer(nodeID string) error
- func (m *Manager) Shutdown() error
- func (m *Manager) StartEmbeddedWorker() error
- func (m *Manager) StartIngress() error
- func (m *Manager) StopEmbeddedWorker() error
- func (m *Manager) StopIngress()
- func (m *Manager) UpdateContainer(container *types.Container) error
- func (m *Manager) UpdateIngress(ingress *types.Ingress) error
- func (m *Manager) UpdateNode(node *types.Node) error
- func (m *Manager) UpdateNodeRole(nodeID string, role types.NodeRole) error
- func (m *Manager) UpdateService(service *types.Service) error
- func (m *Manager) ValidateJoinToken(token string) (string, error)
- func (m *Manager) ValidateToken(token string) (string, error)
- type MetricsCollector
- type TokenManager
- func (tm *TokenManager) CleanupExpiredTokens()
- func (tm *TokenManager) GenerateToken(role string, duration time.Duration) (*JoinToken, error)
- func (tm *TokenManager) ListTokens() []*JoinToken
- func (tm *TokenManager) RevokeToken(token string)
- func (tm *TokenManager) ValidateToken(token string) (string, error)
- type WarrenFSM
- type WarrenSnapshot
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Command ¶
type Command struct {
	Op   string          `json:"op"`
	Data json.RawMessage `json:"data"`
}
Command represents a state change operation in the Raft log
type JoinToken ¶
type JoinToken struct {
	Token     string
	Role      string // "manager" or "worker"
	CreatedAt time.Time
	ExpiresAt time.Time
}
JoinToken represents a token for joining the cluster
type Manager ¶
type Manager struct {
	// contains filtered or unexported fields
}
Manager represents a Warren cluster manager node
func NewManager ¶
NewManager creates a new Manager instance
func (*Manager) CertToPEM ¶
func (m *Manager) CertToPEM(cert *tls.Certificate) (certPEM, keyPEM []byte, err error)
CertToPEM converts a TLS certificate to PEM format
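func (*Manager) CreateContainer ¶ added in v1.1.1
func (m *Manager) CreateContainer(container *types.Container) error
CreateContainer creates a new container
func (*Manager) CreateIngress ¶
func (m *Manager) CreateIngress(ingress *types.Ingress) error
CreateIngress creates a new ingress via Raft
func (*Manager) CreateNode ¶
func (m *Manager) CreateNode(node *types.Node) error
CreateNode adds a node to the cluster
func (*Manager) CreateSecret ¶
func (m *Manager) CreateSecret(secret *types.Secret) error
CreateSecret creates a new secret (data should already be encrypted)
func (*Manager) CreateService ¶
func (m *Manager) CreateService(service *types.Service) error
CreateService creates a new service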
func (*Manager) CreateTLSCertificate ¶
func (m *Manager) CreateTLSCertificate(cert *types.TLSCertificate) error
CreateTLSCertificate creates a new TLS certificate via Raft
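func (*Manager) CreateVolume ¶
func (m *Manager) CreateVolume(volume *types.Volume) error
CreateVolume creates a new volume
func (*Manager) DeleteContainer ¶ added in v1.1.1
func (m *Manager) DeleteContainer(id string) error
DeleteContainer removes a container
func (*Manager) DeleteIngress ¶
func (m *Manager) DeleteIngress(ingressID string) error
DeleteIngress deletes an ingress via Raft
func (*Manager) DeleteNode ¶
func (m *Manager) DeleteNode(id string) error
DeleteNode removes a node from the cluster
func (*Manager) DeleteSecret ¶
func (m *Manager) DeleteSecret(id string) error
DeleteSecret removes a secret
func (*Manager) DeleteService ¶
func (m *Manager) DeleteService(id string) error
DeleteService removes a service
func (*Manager) DeleteTLSCertificate ¶
func (m *Manager) DeleteTLSCertificate(certID string) error
DeleteTLSCertificate deletes a TLS certificate via Raft
func (*Manager) DeleteVolume ¶
func (m *Manager) DeleteVolume(id string) error
DeleteVolume removes a volume
func (*Manager) EnableACME ¶
func (m *Manager) EnableACME(email string) error
EnableACME initializes the ACME client for Let's Encrypt
func (*Manager) EncryptSecret ¶
func (m *Manager) EncryptSecret(plaintext []byte) ([]byte, error)
EncryptSecret encrypts plaintext secret data
func (*Manager) GenerateJoinToken ¶
func (m *Manager) GenerateJoinToken(role string) (*JoinToken, error)
GenerateJoinToken generates a new join token for adding nodes
func (*Manager) GetCACertPEM ¶
func (m *Manager) GetCACertPEM() []byte
GetCACertPEM returns the CA certificate in PEM format
func (*Manager) GetClusterServers ¶
func (m *Manager) GetClusterServers() ([]raft.Server, error)
GetClusterServers returns information about all servers in the Raft cluster
func (*Manager) GetContainer ¶ added in v1.1.1
func (m *Manager) GetContainer(id string) (*types.Container, error)
GetContainer retrieves a container by ID (read from local store)
func (*Manager) GetDeployer ¶ added in v1.3.0
func (m *Manager) GetDeployer() *deploy.Deployer
GetDeployer returns the deployment manager
func (*Manager) GetEventBroker ¶
func (m *Manager) GetEventBroker() *events.Broker
GetEventBroker returns the event broker
func (*Manager) GetIngress ¶
func (m *Manager) GetIngress(id string) (*types.Ingress, error)
GetIngress retrieves an ingress by ID
func (*Manager) GetIngressByName ¶
func (m *Manager) GetIngressByName(name string) (*types.Ingress, error)
GetIngressByName retrieves an ingress by name
func (*Manager) GetNetwork ¶
func (m *Manager) GetNetwork(id string) (*types.Network, error)
GetNetwork retrieves a network by ID (read from local store)
func (*Manager) GetRaftStats ¶
func (m *Manager) GetRaftStats() map[string]interface{}
GetRaftStats returns Raft statistics
func (*Manager) GetSecretByName ¶
func (m *Manager) GetSecretByName(name string) (*types.Secret, error)
GetSecretByName retrieves a secret by name (read from local store)
func (*Manager) GetService ¶
func (m *Manager) GetService(id string) (*types.Service, error)
GetService retrieves a service by ID (read from local store)
func (*Manager) GetServiceByName ¶
func (m *Manager) GetServiceByName(name string) (*types.Service, error)
GetServiceByName retrieves a service by name (read from local store)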
func (*Manager) GetTLSCertificate ¶
func (m *Manager) GetTLSCertificate(id string) (*types.TLSCertificate, error)
GetTLSCertificate retrieves a TLS certificate by ID
func (*Manager) GetTLSCertificateByName ¶
func (m *Manager) GetTLSCertificateByName(name string) (*types.TLSCertificate, error)
GetTLSCertificateByName retrieves a TLS certificate by name
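func (*Manager) GetVolumeByName ¶
func (m *Manager) GetVolumeByName(name string) (*types.Volume, error)
GetVolumeByName retrieves a volume by name (read from local store)
func (*Manager) IssueACMECertificate ¶
func (m *Manager) IssueACMECertificate(domains []string) error
IssueACMECertificate requests a new certificate from Let's Encrypt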
func (*Manager) IssueCertificate ¶
func (m *Manager) IssueCertificate(nodeID, role string) (*tls.Certificate, error)
IssueCertificate issues a certificate for a node
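func (*Manager) LeaderAddr ¶
func (m *Manager) LeaderAddr() string
LeaderAddr returns the address of the current Raft leader
func (*Manager) ListContainers ¶ added in v1.1.1
func (m *Manager) ListContainers() ([]*types.Container, error)
ListContainers returns all containers (read from local store)
func (*Manager) ListContainersByNode ¶ added in v1.1.1
func (m *Manager) ListContainersByNode(nodeID string) ([]*types.Container, error)
ListContainersByNode returns all containers on a node (read from local store)
func (*Manager) ListContainersByService ¶ added in v1.1.1
func (m *Manager) ListContainersByService(serviceID string) ([]*types.Container, error)
ListContainersByService returns all containers for a service (read from local store)
func (*Manager) ListIngresses ¶
func (m *Manager) ListIngresses() ([]*types.Ingress, error)
ListIngresses lists all ingresses
func (*Manager) ListNetworks ¶
func (m *Manager) ListNetworks() ([]*types.Network, error)
ListNetworks returns all networks (read from local store)
func (*Manager) ListSecrets ¶
func (m *Manager) ListSecrets() ([]*types.Secret, error)
ListSecrets returns all secrets (read from local store)
func (*Manager) ListServices ¶
func (m *Manager) ListServices() ([]*types.Service, error)
ListServices returns all services (read from local store)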
func (*Manager) ListTLSCertificates ¶
func (m *Manager) ListTLSCertificates() ([]*types.TLSCertificate, error)
ListTLSCertificates lists all TLS certificates
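func (*Manager) ListVolumes ¶
func (m *Manager) ListVolumes() ([]*types.Volume, error)
ListVolumes returns all volumes (read from local store)
func (*Manager) PublishEvent ¶
func (m *Manager) PublishEvent(event *events.Event)
PublishEvent publishes an event to all subscribers
func (*Manager) ReloadIngress ¶
func (m *Manager) ReloadIngress() error
ReloadIngress reloads ingress rules from storage
func (*Manager) RemoveServer ¶
func (m *Manager) RemoveServer(nodeID string) error
RemoveServer removes a server from the Raft cluster
func (*Manager) StartEmbeddedWorker ¶ added in v1.6.0
func (m *Manager) StartEmbeddedWorker() error
StartEmbeddedWorker starts an embedded worker in the same process (hybrid mode). This allows the manager node to also run workloads, achieving Docker Swarm-like simplicity.
func (*Manager) StartIngress ¶
func (m *Manager) StartIngress() error
StartIngress starts the ingress HTTP proxy on port 80
func (*Manager) StopEmbeddedWorker ¶ added in v1.6.0
func (m *Manager) StopEmbeddedWorker() error
StopEmbeddedWorker stops the embedded worker (if running)
func (*Manager) UpdateContainer ¶ added in v1.1.1
func (m *Manager) UpdateContainer(container *types.Container) error
UpdateContainer updates a container
func (*Manager) UpdateIngress ¶
func (m *Manager) UpdateIngress(ingress *types.Ingress) error
UpdateIngress updates an ingress via Raft
func (*Manager) UpdateNode ¶
func (m *Manager) UpdateNode(node *types.Node) error
UpdateNode updates a node in the cluster
func (*Manager) UpdateNodeRole ¶ added in v1.6.0
func (m *Manager) UpdateNodeRole(nodeID string, role types.NodeRole) error
UpdateNodeRole updates a node's role in the cluster state
func (*Manager) UpdateService ¶
func (m *Manager) UpdateService(service *types.Service) error
UpdateService updates an existing service
func (*Manager) ValidateJoinToken ¶
func (m *Manager) ValidateJoinToken(token string) (string, error)
ValidateJoinToken validates a join token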
type MetricsCollector ¶ added in v1.3.0
type MetricsCollector struct {
	// contains filtered or unexported fields
}
MetricsCollector collects metrics from the manager
func NewMetricsCollector ¶ added in v1.3.0
func NewMetricsCollector(mgr *Manager) *MetricsCollector
NewMetricsCollector creates a new metrics collector
func (*MetricsCollector) Start ¶ added in v1.3.0
func (c *MetricsCollector) Start()
Start begins collecting metrics
func (*MetricsCollector) Stop ¶ added in v1.3.0
func (c *MetricsCollector) Stop()
Stop stops the collector
type TokenManager ¶
type TokenManager struct {
	// contains filtered or unexported fields
}
TokenManager manages join tokens for the cluster
func NewTokenManager ¶
func NewTokenManager() *TokenManager
NewTokenManager creates a new token manager
func (*TokenManager) CleanupExpiredTokens ¶
func (tm *TokenManager) CleanupExpiredTokens()
CleanupExpiredTokens removes expired tokens
func (*TokenManager) GenerateToken ¶
func (tm *TokenManager) GenerateToken(role string, duration time.Duration) (*JoinToken, error)
GenerateToken generates a new join token
func (*TokenManager) ListTokens ¶
func (tm *TokenManager) ListTokens() []*JoinToken
ListTokens returns all active tokens
func (*TokenManager) RevokeToken ¶
func (tm *TokenManager) RevokeToken(token string)
RevokeToken revokes a join token
func (*TokenManager) ValidateToken ¶
func (tm *TokenManager) ValidateToken(token string) (string, error)
ValidateToken validates a join token and returns its role
type WarrenFSM ¶
type WarrenFSM struct {
	// contains filtered or unexported fields
}
WarrenFSM implements the Raft Finite State Machine for Warren's cluster state. It applies log entries to the cluster state and handles snapshots.
func NewWarrenFSM ¶
NewWarrenFSM creates a new FSM instance
func (*WarrenFSM) Apply ¶
Apply applies a Raft log entry to the FSM. This is called by Raft when a log entry is committed.
type WarrenSnapshot ¶
type WarrenSnapshot struct {
	Nodes           []*types.Node
	Services        []*types.Service
	Containers      []*types.Container
	Secrets         []*types.Secret
	Volumes         []*types.Volume
	Networks        []*types.Network
	Ingresses       []*types.Ingress
	TLSCertificates []*types.TLSCertificate
}
WarrenSnapshot represents a point-in-time snapshot of cluster state
func (*WarrenSnapshot) Persist ¶
func (s *WarrenSnapshot) Persist(sink raft.SnapshotSink) error
Persist writes the snapshot to the given SnapshotSink
func (*WarrenSnapshot) Release ¶
func (s *WarrenSnapshot) Release()
Release releases the snapshot resources