Documentation ¶
Overview ¶
Package storage provides BoltDB-backed state persistence for Warren's cluster data.
The storage package implements the Store interface using BoltDB as the underlying database, providing ACID transactions for cluster state including nodes, services, tasks, secrets, volumes, networks, certificates, and ingresses. Entities are serialized as JSON and stored in separate buckets, one per entity type, so each type is isolated and can be looked up directly by ID.
Architecture ¶
Warren uses BoltDB (bbolt) for embedded, transactional storage with zero external dependencies:
┌──────────────────── BOLTDB STORAGE ──────────────────────┐
│                                                          │
│  ┌────────────────────────────────────────────┐          │
│  │                 BoltStore                  │          │
│  │  - File: <dataDir>/warren.db               │          │
│  │  - Format: B+tree with MVCC                │          │
│  │  - Transactions: ACID with fsync           │          │
│  └──────────────────┬─────────────────────────┘          │
│                     │                                    │
│  ┌──────────────────▼─────────────────────────┐          │
│  │              Bucket Structure              │          │
│  │     ┌────────────────────────────────┐     │          │
│  │     │ nodes            (Node ID)     │     │          │
│  │     │ services         (Service ID)  │     │          │
│  │     │ tasks            (Task ID)     │     │          │
│  │     │ secrets          (Secret ID)   │     │          │
│  │     │ volumes          (Volume ID)   │     │          │
│  │     │ networks         (Network ID)  │     │          │
│  │     │ ca               (fixed key)   │     │          │
│  │     │ ingresses        (Ingress ID)  │     │          │
│  │     │ tls_certificates (Cert ID)     │     │          │
│  │     └────────────────────────────────┘     │          │
│  └──────────────────┬─────────────────────────┘          │
│                     │                                    │
│  ┌──────────────────▼─────────────────────────┐          │
│  │           Transaction Management           │          │
│  │  - Read: db.View() - Concurrent reads      │          │
│  │  - Write: db.Update() - Serialized writes  │          │
│  │  - Rollback: Automatic on error            │          │
│  │  - Commit: Automatic on success + fsync    │          │
│  └──────────────────┬─────────────────────────┘          │
│                     │                                    │
│  ┌──────────────────▼─────────────────────────┐          │
│  │             JSON Serialization             │          │
│  │  - Marshal: Go struct → JSON bytes         │          │
│  │  - Unmarshal: JSON bytes → Go struct       │          │
│  │  - Validation: Type safety via Go types    │          │
│  └────────────────────────────────────────────┘          │
│                                                          │
│  ┌────────────────────────────────────────────┐          │
│  │                BoltDB File                 │          │
│  │  - Copy-on-write B+tree                    │          │
│  │  - Page size: 4KB                          │          │
│  │  - mmap for reads                          │          │
│  │  - Atomic writes with fsync                │          │
│  └────────────────────────────────────────────┘          │
└──────────────────────────────────────────────────────────┘
Core Components ¶
BoltStore:
- Implements Store interface using BoltDB
- Single database file per manager node
- Automatic bucket creation on initialization
- Thread-safe via BoltDB's transaction model
Buckets:
- nodes: Worker and manager node registrations
- services: Service definitions and configurations
- tasks: Task instances and their current state
- secrets: Encrypted secret data
- volumes: Persistent volume metadata
- networks: Overlay network configurations
- ca: Certificate authority data (single entry)
- ingresses: HTTP/HTTPS ingress rules
- tls_certificates: TLS certificate data for ingress
Transaction Model:
- Read transactions: db.View() - Concurrent, consistent snapshots
- Write transactions: db.Update() - Serialized, atomic commits
- Isolation: Snapshot isolation (MVCC)
- Durability: fsync on commit ensures crash recovery
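The transaction model above can be illustrated without BoltDB at all. The following is a minimal sketch, not Warren's implementation: snapshotStore, view, update, count, and errRollback are hypothetical names, and an in-memory map stands in for the database file. It shows the three properties listed: readers see a consistent snapshot, writers are serialized, and a failed write leaves committed state untouched.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errRollback is a hypothetical error used to abort a write in the demo.
var errRollback = errors.New("rollback requested")

// snapshotStore is an in-memory stand-in for a BoltDB file (an assumption
// for illustration; BoltDB itself works on copy-on-write B+tree pages).
type snapshotStore struct {
	mu   sync.Mutex                        // serializes writers
	data atomic.Pointer[map[string]string] // last committed snapshot
}

func newSnapshotStore() *snapshotStore {
	s := &snapshotStore{}
	empty := map[string]string{}
	s.data.Store(&empty)
	return s
}

// view passes the last committed snapshot to fn. Snapshots are never
// mutated after publication, so readers need no lock.
func (s *snapshotStore) view(fn func(snap map[string]string)) {
	fn(*s.data.Load())
}

// update runs fn on a private copy and publishes it only if fn succeeds.
func (s *snapshotStore) update(fn func(tx map[string]string) error) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	old := *s.data.Load()
	next := make(map[string]string, len(old)+1)
	for k, v := range old {
		next[k] = v
	}
	if err := fn(next); err != nil {
		return err // rollback: the private copy is simply dropped
	}
	s.data.Store(&next) // commit: publish the new snapshot atomically
	return nil
}

// count returns the number of committed keys.
func (s *snapshotStore) count() int {
	n := 0
	s.view(func(snap map[string]string) { n = len(snap) })
	return n
}

func main() {
	s := newSnapshotStore()
	_ = s.update(func(tx map[string]string) error {
		tx["node-1"] = "ready"
		return nil
	})
	err := s.update(func(tx map[string]string) error {
		tx["node-2"] = "ready"
		return errRollback // aborts the whole transaction
	})
	fmt.Println("committed keys:", s.count(), "second write:", err)
}
```

The sketch omits durability entirely; in BoltDB the commit step also includes the fsync described above.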
CRUD Operations ¶
Node Operations:
Create Node:
- Insert node metadata with ID as key
- JSON serialization of Node struct
- Atomic commit via transaction
Get Node:
- Key lookup by node ID
- Unmarshal JSON to Node struct
- Returns error if not found
List Nodes:
- Cursor iteration over nodes bucket
- Deserialize all entries to []*Node
- Empty slice if no nodes
Update Node:
- Upsert operation (same as Create)
- Overwrites existing key with new value
- Atomic replacement
Delete Node:
- Remove key from bucket
- No error if key doesn't exist (idempotent)
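The node semantics described above (upsert on create and update, an error on a missing key, an empty slice for an empty bucket, idempotent delete) can be sketched as follows. This is an illustration under stated assumptions: memBucket is a hypothetical in-memory map standing in for the nodes bucket, and Node is a trimmed-down stand-in for types.Node, not its real definition.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// Node is a trimmed-down, hypothetical version of types.Node.
type Node struct {
	ID       string `json:"id"`
	Hostname string `json:"hostname"`
}

// memBucket stands in for the "nodes" bucket: node ID -> JSON bytes.
type memBucket map[string][]byte

// putNode serves both Create and Update: it overwrites any existing value.
func (b memBucket) putNode(n *Node) error {
	data, err := json.Marshal(n)
	if err != nil {
		return fmt.Errorf("marshal node %s: %w", n.ID, err)
	}
	b[n.ID] = data
	return nil
}

// getNode returns an error when the key is absent.
func (b memBucket) getNode(id string) (*Node, error) {
	data, ok := b[id]
	if !ok {
		return nil, fmt.Errorf("node not found: %s", id)
	}
	var n Node
	if err := json.Unmarshal(data, &n); err != nil {
		return nil, fmt.Errorf("unmarshal node %s: %w", id, err)
	}
	return &n, nil
}

// listNodes returns an empty, non-nil slice when the bucket is empty.
func (b memBucket) listNodes() ([]*Node, error) {
	ids := make([]string, 0, len(b))
	for id := range b {
		ids = append(ids, id)
	}
	sort.Strings(ids) // BoltDB cursors iterate in key order; mimic that here
	nodes := make([]*Node, 0, len(ids))
	for _, id := range ids {
		n, err := b.getNode(id)
		if err != nil {
			return nil, err
		}
		nodes = append(nodes, n)
	}
	return nodes, nil
}

// deleteNode is idempotent: deleting a missing key is not an error.
func (b memBucket) deleteNode(id string) {
	delete(b, id)
}

func main() {
	b := memBucket{}
	_ = b.putNode(&Node{ID: "node-abc123", Hostname: "worker-01"})
	_ = b.putNode(&Node{ID: "node-abc123", Hostname: "worker-01b"}) // upsert
	n, _ := b.getNode("node-abc123")
	fmt.Println(n.Hostname)

	b.deleteNode("node-abc123")
	b.deleteNode("node-abc123") // second delete is a no-op
	nodes, _ := b.listNodes()
	fmt.Println(len(nodes))
}
```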
Service Operations:
Create Service:
- Store service with ID as key
- Includes replicas, image, deploy strategy
- Links to secrets, volumes, networks
Get Service:
- Direct key lookup by service ID
- Returns complete service definition
Get Service By Name:
- Cursor scan to find matching name
- Returns first match (names should be unique)
- Error if not found
List Services:
- Full bucket scan and deserialization
- Used by scheduler and reconciler
- Typically < 1000 services per cluster
Update Service:
- Upsert with updated fields
- UpdatedAt timestamp managed by caller
- Triggers reconciliation loop
Delete Service:
- Remove service entry
- Tasks must be deleted separately
- Consider cascade delete (future)
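Because services are keyed by ID, a lookup by name has to walk the bucket. A minimal sketch of that scan follows, assuming a hypothetical entry type that mimics the key/value pairs a bucket cursor yields and a trimmed-down Service struct; neither is taken from the package source.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Service is a trimmed-down, hypothetical version of types.Service.
type Service struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// entry is one key/value pair as a bucket cursor would yield it.
type entry struct {
	key   string
	value []byte
}

// getServiceByName scans entries in order and returns the first service
// whose name matches, or an error if none does.
func getServiceByName(entries []entry, name string) (*Service, error) {
	for _, e := range entries {
		var s Service
		if err := json.Unmarshal(e.value, &s); err != nil {
			return nil, fmt.Errorf("unmarshal service %s: %w", e.key, err)
		}
		if s.Name == name {
			return &s, nil // first match wins; names are expected to be unique
		}
	}
	return nil, fmt.Errorf("service not found: %s", name)
}

// sampleEntries builds a small bucket image for the demo.
func sampleEntries() []entry {
	return []entry{
		{key: "service-aaa", value: []byte(`{"id":"service-aaa","name":"api"}`)},
		{key: "service-xyz789", value: []byte(`{"id":"service-xyz789","name":"web"}`)},
	}
}

func main() {
	svc, err := getServiceByName(sampleEntries(), "web")
	fmt.Println(svc.ID, err)

	_, err = getServiceByName(sampleEntries(), "missing")
	fmt.Println(err)
}
```

The cost is O(n) in the number of services, which is acceptable at the cluster sizes quoted above.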
Task Operations:
Create Task:
- Store task with ID as key
- Includes service ID, node ID, state
- Resource requirements and mounts
List Tasks:
- Full scan for all tasks
- Used by reconciler for global state
List Tasks By Service:
- Filter tasks by service ID
- Returns tasks for specific service
- Used during rolling updates
List Tasks By Node:
- Filter tasks by node ID
- Returns tasks assigned to node
- Used by worker agent on startup
Update Task:
- Update state, timestamps, health
- Called frequently (state transitions)
- High write throughput operation
Delete Task:
- Remove completed/failed tasks
- Cleanup during scaling down
- Idempotent operation
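The two filtered listings share one shape: load every task, then keep those that match a predicate. The sketch below illustrates that shape only; Task is a hypothetical three-field stand-in, and filterTasks, listTasksByService, and listTasksByNode are illustrative helpers operating on an already-loaded slice rather than on the database.

```go
package main

import "fmt"

// Task is a hypothetical stand-in carrying only the fields used for filtering.
type Task struct {
	ID        string
	ServiceID string
	NodeID    string
}

// filterTasks returns the tasks for which keep reports true.
// The result is empty but non-nil when nothing matches.
func filterTasks(all []*Task, keep func(*Task) bool) []*Task {
	matched := make([]*Task, 0)
	for _, t := range all {
		if keep(t) {
			matched = append(matched, t)
		}
	}
	return matched
}

// listTasksByService keeps tasks that belong to the given service.
func listTasksByService(all []*Task, serviceID string) []*Task {
	return filterTasks(all, func(t *Task) bool { return t.ServiceID == serviceID })
}

// listTasksByNode keeps tasks assigned to the given node.
func listTasksByNode(all []*Task, nodeID string) []*Task {
	return filterTasks(all, func(t *Task) bool { return t.NodeID == nodeID })
}

// sampleTasks returns a small fixed data set for the demo.
func sampleTasks() []*Task {
	return []*Task{
		{ID: "task-1", ServiceID: "service-web", NodeID: "node-a"},
		{ID: "task-2", ServiceID: "service-web", NodeID: "node-b"},
		{ID: "task-3", ServiceID: "service-api", NodeID: "node-a"},
	}
}

func main() {
	all := sampleTasks()
	fmt.Println("service-web:", len(listTasksByService(all, "service-web")))
	fmt.Println("node-a:", len(listTasksByNode(all, "node-a")))
	fmt.Println("node-z:", len(listTasksByNode(all, "node-z")))
}
```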
Usage ¶
Creating a Store:
store, err := storage.NewBoltStore("/var/lib/warren/manager-1")
if err != nil {
log.Fatal(err)
}
defer store.Close()
Node Operations:
// Create node
node := &types.Node{
ID: "node-abc123",
Hostname: "worker-01",
Role: types.RoleWorker,
Status: types.NodeStatusReady,
Resources: &types.NodeResources{
CPUCores: 4,
MemoryBytes: 8 * 1024 * 1024 * 1024, // 8GB
},
}
err := store.CreateNode(node)
// Get node
node, err = store.GetNode("node-abc123")
// List all nodes
nodes, err := store.ListNodes()
// Update node
node.Status = types.NodeStatusDown
err = store.UpdateNode(node)
// Delete node
err = store.DeleteNode("node-abc123")
Service Operations:
// Create service
service := &types.Service{
ID: "service-xyz789",
Name: "web",
Image: "nginx:latest",
Replicas: 3,
DeployStrategy: types.DeployStrategyRolling,
}
err := store.CreateService(service)
// Get by ID
service, err = store.GetService("service-xyz789")
// Get by name
service, err = store.GetServiceByName("web")
// List all services
services, err := store.ListServices()
// Update service
service.Replicas = 5
err = store.UpdateService(service)
// Delete service
err = store.DeleteService("service-xyz789")
Task Operations:
// Create task
task := &types.Task{
ID: "task-def456",
ServiceID: "service-xyz789",
NodeID: "node-abc123",
DesiredState: types.TaskStateRunning,
ActualState: types.TaskStatePending,
}
err := store.CreateTask(task)
// List tasks by service
tasks, err := store.ListTasksByService("service-xyz789")
// List tasks by node
tasks, err = store.ListTasksByNode("node-abc123")
// Update task state
task.ActualState = types.TaskStateRunning
err = store.UpdateTask(task)
Secret Operations:
// Create secret (already encrypted)
secret := &types.Secret{
ID: "secret-ghi789",
Name: "db-password",
Data: encryptedData, // AES-256-GCM encrypted
}
err := store.CreateSecret(secret)
// Get secret
secret, err = store.GetSecret("secret-ghi789")
// Get by name
secret, err = store.GetSecretByName("db-password")
// List secrets
secrets, err := store.ListSecrets()
// Delete secret
err = store.DeleteSecret("secret-ghi789")
Certificate Authority:
// Save CA certificate and key
caData := []byte("PEM-encoded CA cert and key")
err := store.SaveCA(caData)
// Get CA data
caData, err = store.GetCA()
Integration Points ¶
This package integrates with:
- pkg/manager: Raft FSM reads/writes cluster state
- pkg/scheduler: Reads nodes and services for placement
- pkg/reconciler: Reads tasks and services for reconciliation
- pkg/security: Stores encrypted secrets and CA data
- pkg/types: All entity definitions
Design Patterns ¶
Upsert Pattern:
- Create and Update share the same write path (Bucket.Put inside db.Update)
- No separate "exists" check needed
- Simplifies API and caller code
- Atomic replacement
Idempotent Deletes:
- Delete returns no error if key doesn't exist
- Safe to call multiple times
- Simplifies cleanup code
Cursor Iteration:
- ForEach pattern for full bucket scans
- Memory efficient (streaming)
- Consistent snapshot during iteration
Error Wrapping:
- All errors wrapped with context: fmt.Errorf("op failed: %w", err)
- Preserves original error for inspection
- Provides operation context in logs
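A short sketch of the wrapping pattern follows. The sentinel errNotFound and the helpers lookup, getNodeHostname, and isNotFound are hypothetical names introduced for illustration; the point is only that wrapping with %w adds operation context while errors.Is can still reach the original error.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel for a missing key.
var errNotFound = errors.New("not found")

// lookup is the low-level operation that produces the original error.
func lookup(bucket map[string]string, id string) (string, error) {
	v, ok := bucket[id]
	if !ok {
		return "", errNotFound
	}
	return v, nil
}

// getNodeHostname wraps the low-level error with operation context.
func getNodeHostname(bucket map[string]string, id string) (string, error) {
	host, err := lookup(bucket, id)
	if err != nil {
		return "", fmt.Errorf("get node %s failed: %w", id, err)
	}
	return host, nil
}

// isNotFound inspects the wrapped chain for the sentinel.
func isNotFound(err error) bool {
	return errors.Is(err, errNotFound)
}

func main() {
	bucket := map[string]string{"node-abc123": "worker-01"}

	_, err := getNodeHostname(bucket, "node-missing")
	fmt.Println(err)             // message carries the operation context
	fmt.Println(isNotFound(err)) // the original cause is still detectable
}
```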
Filter Pattern:
- List all, filter in memory (ListTasksByService)
- Simple implementation for small datasets
- Future: Secondary indexes for performance
Performance Characteristics ¶
Read Operations:
- Get by key: O(log n) via B+tree, typically < 1ms
- List all: O(n) full scan, ~1ms per 1000 entries
- Filter by field: O(n) scan with predicate, same as List
- Concurrent reads: Supported via MVCC snapshots
Write Operations:
- Insert/Update: O(log n) for key, ~1-5ms with fsync
- Delete: O(log n) for key, ~1-5ms with fsync
- Batch writes: Single transaction, amortized cost
- Serialized: Only one writer at a time (BoltDB limitation)
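The amortization behind batch writes is simple arithmetic: each committed transaction pays one fsync, so grouping writes divides the fsync count. The sketch below is a back-of-the-envelope model, not a benchmark; estimateWriteMillis is a hypothetical helper, it ignores the per-key B+tree cost, and the 2 ms fsync figure is just a value picked from the 1-5 ms range quoted above.

```go
package main

import "fmt"

// estimateWriteMillis models total write time as one fsync per committed
// transaction. It ignores per-key B+tree work, which is small by comparison.
func estimateWriteMillis(writes, batchSize, fsyncMillis int) int {
	if writes <= 0 || batchSize <= 0 {
		return 0
	}
	commits := (writes + batchSize - 1) / batchSize // ceiling division
	return commits * fsyncMillis
}

func main() {
	const fsyncMillis = 2 // assumed value within the 1-5 ms range

	// 1000 writes, each in its own transaction: 1000 fsyncs.
	fmt.Println("one write per transaction:", estimateWriteMillis(1000, 1, fsyncMillis), "ms")

	// The same 1000 writes grouped 100 per transaction: 10 fsyncs.
	fmt.Println("100 writes per transaction:", estimateWriteMillis(1000, 100, fsyncMillis), "ms")
}
```

Under this model the first case costs 2000 ms and the second 20 ms, which is why high-frequency updates benefit from being grouped into a single transaction where correctness allows it.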
Database File Size:
- Empty: 32KB (header + initial pages)
- Small cluster (10 nodes, 20 services): ~1MB
- Medium cluster (100 nodes, 200 services): ~10MB
- Large cluster (500 nodes, 1000 services): ~50MB
- Growth: Linear with entity count + history
Memory Usage:
- mmap: Database file mapped to memory
- Read-only pages: Shared across processes
- Write buffer: ~4MB per transaction
- Page cache: OS manages (warm frequently accessed pages)
Transaction Latency:
- Read transaction: < 100µs (memory access)
- Write transaction: 1-5ms (fsync to disk)
- Under load: May queue (single writer)
Troubleshooting ¶
Common Issues:
Database Locked:
- Symptom: "database is locked" error
- Cause: Another process has exclusive lock
- Solution: Ensure only one manager accesses database
- Check: No dangling processes holding file
Database Corruption:
- Symptom: "invalid database" or checksum errors
- Cause: Unclean shutdown, disk failure, bug
- Solution: Restore from Raft snapshot backup
- Prevention: Use fsync (enabled by default)
Slow Writes:
- Symptom: High latency on Create/Update operations
- Cause: Slow disk, large database, fragmentation
- Check: fsync latency, disk I/O wait
- Solution: Use SSD, compact database (future)
Memory Growth:
- Symptom: Manager memory usage grows over time
- Cause: mmap keeps pages in cache
- Check: OS page cache usage
- Solution: Normal behavior, OS manages eviction
Large Database File:
- Symptom: Database file grows large over time
- Cause: No compaction, deleted keys leave space
- Check: Compare file size to expected data size
- Solution: Manual compact (future) or backup/restore
Monitoring ¶
Key metrics to monitor:
Database Operations:
- storage_read_duration: Time for read transactions
- storage_write_duration: Time for write transactions
- storage_operations_total: Count by operation type
- storage_errors_total: Failed operations
Database Health:
- storage_db_size_bytes: Database file size
- storage_db_open: Database connection status (1=open)
- storage_tx_duration: Transaction latency (p50, p95, p99)
Entity Counts:
- storage_nodes_total: Number of nodes stored
- storage_services_total: Number of services
- storage_tasks_total: Number of tasks
- storage_secrets_total: Number of secrets
Data Integrity ¶
Transaction Guarantees:
- Atomicity: All-or-nothing commits
- Consistency: JSON validation before commit
- Isolation: Snapshot reads, serialized writes
- Durability: fsync ensures crash recovery
Backup and Restore:
- Database is single file (easy to copy)
- Backup: Copy the file while the database is closed, or take a consistent hot copy from inside db.View() using bbolt's Tx.WriteTo or Tx.CopyFile
- Restore: Replace file and restart manager
- Raft handles replication across managers
Data Migration:
- Schema changes handled via JSON flexibility
- New fields: Add with omitempty tag (backward compatible)
- Remove fields: Ignored during unmarshal
- Major changes: Implement migration in NewBoltStore
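The JSON-based compatibility rules above can be checked with the standard library alone. In this sketch ServiceV1 and ServiceV2 are hypothetical struct versions (the Labels field is invented for illustration): an old record decodes into the new struct with the new field left empty, a new record decodes into the old struct with the extra field ignored, and omitempty keeps an unset field out of the stored bytes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ServiceV1 is a hypothetical older schema.
type ServiceV1 struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// ServiceV2 adds an optional field; omitempty keeps it out of the
// encoded record when it is unset.
type ServiceV2 struct {
	ID     string            `json:"id"`
	Name   string            `json:"name"`
	Labels map[string]string `json:"labels,omitempty"`
}

// readAsV2 decodes a stored record with the newer schema.
func readAsV2(data []byte) (ServiceV2, error) {
	var s ServiceV2
	err := json.Unmarshal(data, &s)
	return s, err
}

// readAsV1 decodes a stored record with the older schema; unknown
// fields are ignored by encoding/json by default.
func readAsV1(data []byte) (ServiceV1, error) {
	var s ServiceV1
	err := json.Unmarshal(data, &s)
	return s, err
}

// encodeV2 returns the JSON that would be written to the bucket.
func encodeV2(s ServiceV2) (string, error) {
	data, err := json.Marshal(s)
	return string(data), err
}

func main() {
	oldRecord := []byte(`{"id":"s1","name":"web"}`)
	v2, _ := readAsV2(oldRecord)
	fmt.Println(v2.Name, v2.Labels == nil) // new field is simply empty

	newRecord := []byte(`{"id":"s1","name":"web","labels":{"tier":"frontend"}}`)
	v1, _ := readAsV1(newRecord)
	fmt.Println(v1.Name) // extra field was ignored

	out, _ := encodeV2(ServiceV2{ID: "s1", Name: "web"})
	fmt.Println(out) // no "labels" key in the output
}
```

Renaming a field or changing its type is not covered by these rules and would fall under the major-change case.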
Security ¶
Encryption at Rest:
- Database file not encrypted by default
- Recommendation: Use disk encryption (LUKS, dm-crypt)
- Secrets already encrypted before storage (AES-256-GCM)
- Future: Full database encryption option
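For readers unfamiliar with the scheme named above, this is roughly what AES-256-GCM encryption before storage looks like with Go's standard library. It is a sketch under assumptions, not Warren's code: encryptSecret and decryptSecret are hypothetical helpers, the nonce-prefixed layout is one common convention, and key generation and storage are out of scope.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// newGCM builds an AES-256-GCM AEAD from a 32-byte key.
func newGCM(key []byte) (cipher.AEAD, error) {
	if len(key) != 32 {
		return nil, errors.New("key must be 32 bytes for AES-256")
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// encryptSecret returns nonce || ciphertext || tag (assumed layout).
func encryptSecret(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, fmt.Errorf("generate nonce: %w", err)
	}
	// Seal appends the ciphertext and tag to its first argument (the nonce).
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// decryptSecret reverses encryptSecret and fails if the data was
// tampered with or the key is wrong.
func decryptSecret(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

func main() {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		fmt.Println("generate key:", err)
		return
	}
	sealed, err := encryptSecret(key, []byte("db-password-value"))
	if err != nil {
		fmt.Println("encrypt:", err)
		return
	}
	plain, err := decryptSecret(key, sealed)
	fmt.Println(string(plain), err)
}
```

The store only ever sees the sealed bytes, which is why a stolen database file does not directly expose secret values even without disk encryption.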
File Permissions:
- Database file: 0600 (owner read/write only)
- Directory: 0700 (owner full access only)
- Prevents unprivileged access to cluster state
- Root or warren user only
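The permission scheme above can be applied and verified with a few standard-library calls. This sketch assumes a POSIX system; prepareDataDir and checkOwnerOnly are hypothetical helpers, and the empty warren.db it creates is only a placeholder (with bbolt, the file mode would normally be passed as the second argument to its Open function).

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// prepareDataDir creates dir with mode 0700 and a placeholder database
// file with mode 0600, returning the file path.
func prepareDataDir(dir string) (string, error) {
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return "", fmt.Errorf("create data dir: %w", err)
	}
	// MkdirAll does not change the mode of an existing directory.
	if err := os.Chmod(dir, 0o700); err != nil {
		return "", fmt.Errorf("chmod data dir: %w", err)
	}
	path := filepath.Join(dir, "warren.db")
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600)
	if err != nil {
		return "", fmt.Errorf("create database file: %w", err)
	}
	if err := f.Close(); err != nil {
		return "", fmt.Errorf("close database file: %w", err)
	}
	// The creation mode only applies to new files; tighten existing ones too.
	if err := os.Chmod(path, 0o600); err != nil {
		return "", fmt.Errorf("chmod database file: %w", err)
	}
	return path, nil
}

// checkOwnerOnly fails if any group or other permission bit is set.
func checkOwnerOnly(path string) error {
	info, err := os.Stat(path)
	if err != nil {
		return fmt.Errorf("stat %s: %w", path, err)
	}
	if perm := info.Mode().Perm(); perm&0o077 != 0 {
		return fmt.Errorf("%s is accessible by group or others (mode %o)", path, perm)
	}
	return nil
}

// demo runs the whole flow inside a throwaway temporary directory.
func demo() error {
	base, err := os.MkdirTemp("", "warren-perms-")
	if err != nil {
		return err
	}
	defer os.RemoveAll(base)

	dir := filepath.Join(base, "manager-1")
	path, err := prepareDataDir(dir)
	if err != nil {
		return err
	}
	if err := checkOwnerOnly(dir); err != nil {
		return err
	}
	return checkOwnerOnly(path)
}

func main() {
	if err := demo(); err != nil {
		fmt.Println("permission check failed:", err)
		return
	}
	fmt.Println("data directory and database file are owner-only")
}
```

A check like checkOwnerOnly could run at manager startup to catch directories that were created with looser permissions by an earlier deployment.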
Access Control:
- No authentication within database
- Rely on OS file permissions
- Manager API provides authorization layer
- Direct database access only for recovery
See Also ¶
- pkg/manager for Raft FSM integration
- pkg/types for all entity definitions
- pkg/scheduler for read-heavy workloads
- pkg/reconciler for state reconciliation
- BoltDB documentation: https://github.com/etcd-io/bbolt
- ACID properties: https://en.wikipedia.org/wiki/ACID
Index ¶
- type BoltStore
- func (s *BoltStore) Close() error
- func (s *BoltStore) CreateContainer(container *types.Container) error
- func (s *BoltStore) CreateIngress(ingress *types.Ingress) error
- func (s *BoltStore) CreateNetwork(network *types.Network) error
- func (s *BoltStore) CreateNode(node *types.Node) error
- func (s *BoltStore) CreateSecret(secret *types.Secret) error
- func (s *BoltStore) CreateService(service *types.Service) error
- func (s *BoltStore) CreateTLSCertificate(cert *types.TLSCertificate) error
- func (s *BoltStore) CreateVolume(volume *types.Volume) error
- func (s *BoltStore) DeleteContainer(id string) error
- func (s *BoltStore) DeleteIngress(id string) error
- func (s *BoltStore) DeleteNetwork(id string) error
- func (s *BoltStore) DeleteNode(id string) error
- func (s *BoltStore) DeleteSecret(id string) error
- func (s *BoltStore) DeleteService(id string) error
- func (s *BoltStore) DeleteTLSCertificate(id string) error
- func (s *BoltStore) DeleteVolume(id string) error
- func (s *BoltStore) GetCA() ([]byte, error)
- func (s *BoltStore) GetContainer(id string) (*types.Container, error)
- func (s *BoltStore) GetIngress(id string) (*types.Ingress, error)
- func (s *BoltStore) GetIngressByName(name string) (*types.Ingress, error)
- func (s *BoltStore) GetNetwork(id string) (*types.Network, error)
- func (s *BoltStore) GetNode(id string) (*types.Node, error)
- func (s *BoltStore) GetSecret(id string) (*types.Secret, error)
- func (s *BoltStore) GetSecretByName(name string) (*types.Secret, error)
- func (s *BoltStore) GetService(id string) (*types.Service, error)
- func (s *BoltStore) GetServiceByName(name string) (*types.Service, error)
- func (s *BoltStore) GetTLSCertificate(id string) (*types.TLSCertificate, error)
- func (s *BoltStore) GetTLSCertificateByName(name string) (*types.TLSCertificate, error)
- func (s *BoltStore) GetTLSCertificatesByHost(host string) ([]*types.TLSCertificate, error)
- func (s *BoltStore) GetVolume(id string) (*types.Volume, error)
- func (s *BoltStore) GetVolumeByName(name string) (*types.Volume, error)
- func (s *BoltStore) ListContainers() ([]*types.Container, error)
- func (s *BoltStore) ListContainersByNode(nodeID string) ([]*types.Container, error)
- func (s *BoltStore) ListContainersByService(serviceID string) ([]*types.Container, error)
- func (s *BoltStore) ListIngresses() ([]*types.Ingress, error)
- func (s *BoltStore) ListNetworks() ([]*types.Network, error)
- func (s *BoltStore) ListNodes() ([]*types.Node, error)
- func (s *BoltStore) ListSecrets() ([]*types.Secret, error)
- func (s *BoltStore) ListServices() ([]*types.Service, error)
- func (s *BoltStore) ListTLSCertificates() ([]*types.TLSCertificate, error)
- func (s *BoltStore) ListVolumes() ([]*types.Volume, error)
- func (s *BoltStore) SaveCA(data []byte) error
- func (s *BoltStore) UpdateContainer(container *types.Container) error
- func (s *BoltStore) UpdateIngress(ingress *types.Ingress) error
- func (s *BoltStore) UpdateNode(node *types.Node) error
- func (s *BoltStore) UpdateService(service *types.Service) error
- func (s *BoltStore) UpdateTLSCertificate(cert *types.TLSCertificate) error
- type Store
Types ¶
type BoltStore ¶
type BoltStore struct {
// contains filtered or unexported fields
}
BoltStore implements the Store interface using BoltDB.
func NewBoltStore ¶
NewBoltStore creates a new BoltDB-backed store
func (*BoltStore) CreateContainer ¶ added in v1.1.1
Container operations
func (*BoltStore) CreateIngress ¶
CreateIngress creates a new ingress
func (*BoltStore) CreateNetwork ¶
Network operations
func (*BoltStore) CreateNode ¶
Node operations
func (*BoltStore) CreateSecret ¶
Secret operations
func (*BoltStore) CreateService ¶
Service operations
func (*BoltStore) CreateTLSCertificate ¶
func (s *BoltStore) CreateTLSCertificate(cert *types.TLSCertificate) error
CreateTLSCertificate creates a new TLS certificate
func (*BoltStore) CreateVolume ¶
Volume operations
func (*BoltStore) DeleteContainer ¶ added in v1.1.1
func (*BoltStore) DeleteIngress ¶
DeleteIngress deletes an ingress
func (*BoltStore) DeleteNetwork ¶
func (*BoltStore) DeleteNode ¶
func (*BoltStore) DeleteSecret ¶
func (*BoltStore) DeleteService ¶
func (*BoltStore) DeleteTLSCertificate ¶
DeleteTLSCertificate deletes a TLS certificate
func (*BoltStore) DeleteVolume ¶
func (*BoltStore) GetContainer ¶ added in v1.1.1
func (*BoltStore) GetIngress ¶
GetIngress retrieves an ingress by ID
func (*BoltStore) GetIngressByName ¶
GetIngressByName retrieves an ingress by name
func (*BoltStore) GetSecretByName ¶
func (*BoltStore) GetServiceByName ¶
func (*BoltStore) GetTLSCertificate ¶
func (s *BoltStore) GetTLSCertificate(id string) (*types.TLSCertificate, error)
GetTLSCertificate retrieves a TLS certificate by ID
func (*BoltStore) GetTLSCertificateByName ¶
func (s *BoltStore) GetTLSCertificateByName(name string) (*types.TLSCertificate, error)
GetTLSCertificateByName retrieves a TLS certificate by name
func (*BoltStore) GetTLSCertificatesByHost ¶
func (s *BoltStore) GetTLSCertificatesByHost(host string) ([]*types.TLSCertificate, error)
GetTLSCertificatesByHost retrieves all TLS certificates that cover a specific host
func (*BoltStore) GetVolumeByName ¶
func (*BoltStore) ListContainers ¶ added in v1.1.1
func (*BoltStore) ListContainersByNode ¶ added in v1.1.1
func (*BoltStore) ListContainersByService ¶ added in v1.1.1
func (*BoltStore) ListIngresses ¶
ListIngresses returns all ingresses
func (*BoltStore) ListTLSCertificates ¶
func (s *BoltStore) ListTLSCertificates() ([]*types.TLSCertificate, error)
ListTLSCertificates lists all TLS certificates
func (*BoltStore) UpdateContainer ¶ added in v1.1.1
func (*BoltStore) UpdateIngress ¶
UpdateIngress updates an existing ingress
func (*BoltStore) UpdateTLSCertificate ¶
func (s *BoltStore) UpdateTLSCertificate(cert *types.TLSCertificate) error
UpdateTLSCertificate updates an existing TLS certificate
type Store ¶
type Store interface {
// Nodes
CreateNode(node *types.Node) error
GetNode(id string) (*types.Node, error)
ListNodes() ([]*types.Node, error)
UpdateNode(node *types.Node) error
DeleteNode(id string) error
// Services
CreateService(service *types.Service) error
GetService(id string) (*types.Service, error)
GetServiceByName(name string) (*types.Service, error)
ListServices() ([]*types.Service, error)
UpdateService(service *types.Service) error
DeleteService(id string) error
// Containers
CreateContainer(container *types.Container) error
GetContainer(id string) (*types.Container, error)
ListContainers() ([]*types.Container, error)
ListContainersByService(serviceID string) ([]*types.Container, error)
ListContainersByNode(nodeID string) ([]*types.Container, error)
UpdateContainer(container *types.Container) error
DeleteContainer(id string) error
// Secrets
CreateSecret(secret *types.Secret) error
GetSecret(id string) (*types.Secret, error)
GetSecretByName(name string) (*types.Secret, error)
ListSecrets() ([]*types.Secret, error)
DeleteSecret(id string) error
// Volumes
CreateVolume(volume *types.Volume) error
GetVolume(id string) (*types.Volume, error)
GetVolumeByName(name string) (*types.Volume, error)
ListVolumes() ([]*types.Volume, error)
DeleteVolume(id string) error
// Networks
CreateNetwork(network *types.Network) error
GetNetwork(id string) (*types.Network, error)
ListNetworks() ([]*types.Network, error)
DeleteNetwork(id string) error
// Certificate Authority
SaveCA(data []byte) error
GetCA() ([]byte, error)
// Ingresses
CreateIngress(ingress *types.Ingress) error
GetIngress(id string) (*types.Ingress, error)
GetIngressByName(name string) (*types.Ingress, error)
ListIngresses() ([]*types.Ingress, error)
UpdateIngress(ingress *types.Ingress) error
DeleteIngress(id string) error
// TLS Certificates
CreateTLSCertificate(cert *types.TLSCertificate) error
GetTLSCertificate(id string) (*types.TLSCertificate, error)
GetTLSCertificateByName(name string) (*types.TLSCertificate, error)
GetTLSCertificatesByHost(host string) ([]*types.TLSCertificate, error)
ListTLSCertificates() ([]*types.TLSCertificate, error)
UpdateTLSCertificate(cert *types.TLSCertificate) error
DeleteTLSCertificate(id string) error
// Utility
Close() error
}
Store defines the interface for cluster state storage. It is implemented by the BoltDB-backed BoltStore.