Documentation ¶
Overview ¶
Package worker implements the Warren worker node that executes containerized tasks.
The worker package is the data plane of Warren, responsible for running containers, reporting health status, and maintaining connectivity with the manager cluster. Workers are stateless agents that receive task assignments from managers and execute them using containerd.
Architecture ¶
A Warren worker is a single-purpose agent that bridges managers and containers:
┌──────────────────────── WORKER NODE ───────────────────────┐
│                                                            │
│   ┌──────────────────────────────────────────────┐         │
│   │ Worker Agent                                 │         │
│   │  - gRPC client to manager                    │         │
│   │  - Heartbeat loop (5s)                       │         │
│   │  - Task sync loop (3s)                       │         │
│   │  - Status reporting                          │         │
│   └──────┬──────────────────────────┬────────────┘         │
│          │                          │                      │
│   ┌──────▼───────┐          ┌───────▼───────────┐          │
│   │ Handlers     │          │ Local Cache       │          │
│   │  - Secrets   │          │  - Task map       │          │
│   │  - Volumes   │          │  - Container IDs  │          │
│   │  - DNS       │          │  - Status         │          │
│   │  - Health    │          └───────────────────┘          │
│   │  - Ports     │                                         │
│   └──────┬───────┘                                         │
│          │                                                 │
│   ┌──────▼──────────────────────────────────────┐          │
│   │ Containerd Runtime                           │         │
│   │  - Pull images                               │         │
│   │  - Create containers                         │         │
│   │  - Start/stop containers                     │         │
│   │  - Monitor container status                  │         │
│   │  - Apply resource limits                     │         │
│   └──────────────────────────────────────────────┘         │
└────────────────────────────────────────────────────────────┘
Core Components ¶
Worker:
- Main worker agent
- Maintains gRPC connection to manager
- Executes heartbeat and sync loops
- Coordinates all handlers
SecretsHandler:
- Fetches encrypted secrets from manager
- Decrypts using cluster encryption key
- Mounts secrets as tmpfs in containers
- Cleans up on task removal
VolumesHandler:
- Manages volume lifecycle
- Mounts volumes into containers
- Ensures volume affinity (local volumes)
- Tracks volume usage
HealthMonitor:
- Executes health checks (HTTP/TCP/Exec)
- Reports health status to manager
- Triggers task replacement on failure
- Integrates with reconciler
DNSHandler:
- Configures container DNS
- Points containers to manager DNS server
- Enables service discovery
HostPortPublisher:
- Publishes container ports on host
- Manages iptables rules (Linux)
- Handles port conflicts
- Cleans up on task removal
Worker Lifecycle ¶
Registration:
- Worker starts with join token
- Connects to manager via gRPC
- Registers with node resources (CPU, memory)
- Receives unique node ID
- Begins heartbeat loop
Heartbeat Loop (5 seconds):
- Send heartbeat to manager
- Report node resources and status
- Receive acknowledgment
- Update last heartbeat timestamp
Task Sync Loop (3 seconds):
- Fetch assigned tasks from manager
- Compare with local task cache
- Start new tasks
- Stop removed tasks
- Report task status updates
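A minimal sketch of both loops, assuming hypothetical sendHeartbeat and syncTasks callbacks (the real loops run inside Worker against its gRPC client):

func runLoops(ctx context.Context, sendHeartbeat, syncTasks func(context.Context) error) {
    heartbeat := time.NewTicker(5 * time.Second)
    taskSync := time.NewTicker(3 * time.Second)
    defer heartbeat.Stop()
    defer taskSync.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-heartbeat.C:
            if err := sendHeartbeat(ctx); err != nil {
                log.Printf("heartbeat failed: %v", err) // retried on the next tick
            }
        case <-taskSync.C:
            if err := syncTasks(ctx); err != nil {
                log.Printf("task sync failed: %v", err)
            }
        }
    }
}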
Task Execution:
- Receive task assignment
- Prepare: Mount secrets and volumes
- Pull container image (if not cached)
- Create container with runtime
- Configure DNS, network, resources
- Start container
- Monitor health checks
- Report running status
Task Removal:
- Receive stop command
- Stop container (SIGTERM, grace period)
- Force kill if timeout exceeded
- Unmount secrets and volumes
- Remove iptables rules
- Clean up container
- Report complete status
Usage ¶
Creating a Worker:
cfg := &worker.Config{
NodeID: "worker-1",
ManagerAddr: "192.168.1.10:8080",
DataDir: "/var/lib/warren/worker-1",
JoinToken: "worker-join-token-xyz789",
EncryptionKey: clusterKey,
ContainerdSocket: "", // Auto-detect
Resources: &types.NodeResources{
CPUCores: 4,
MemoryBytes: 8 * 1024 * 1024 * 1024, // 8GB
DiskBytes: 100 * 1024 * 1024 * 1024, // 100GB
},
}
w, err := worker.NewWorker(cfg)
if err != nil {
log.Fatal(err)
}
Starting the Worker:
// Connects to manager and begins loops
err := w.Start()
if err != nil {
log.Fatal(err)
}
Stopping the Worker:
// Graceful shutdown with task cleanup
err := w.Stop()
if err != nil {
log.Fatal(err)
}
Task Execution ¶
The worker executes tasks through multiple phases:
Preparing Phase:
- Fetch and decrypt secrets from manager
- Mount secrets as tmpfs at /run/secrets/<name>
- Ensure volumes exist (create if local driver)
- Prepare volume mount points
Starting Phase:
- Pull container image if not present
- Create container with:
- Environment variables
- Secret mounts (tmpfs)
- Volume mounts (bind or named)
- DNS configuration (manager IP)
- Resource limits (CPU, memory)
- Health check configuration
- Configure host port publishing (iptables)
- Start container process
Running Phase:
- Monitor container status
- Execute health checks periodically
- Report status updates to manager
- Handle container restarts (restart policy)
Stopping Phase:
- Send SIGTERM to container
- Wait for grace period (default 10s)
- Send SIGKILL if timeout exceeded
- Unmount secrets (tmpfs)
- Remove iptables rules
- Clean up container
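A minimal sketch of the stop sequence, using a hypothetical taskHandle interface in place of the containerd task API wrapped by pkg/runtime:

type taskHandle interface {
    Signal(sig syscall.Signal) error
    Exited() <-chan struct{} // closed when the process exits
}

func stopTask(t taskHandle, grace time.Duration) error {
    // Ask the main process to exit cleanly.
    if err := t.Signal(syscall.SIGTERM); err != nil {
        return err
    }
    select {
    case <-t.Exited():
        return nil // clean exit within the grace period
    case <-time.After(grace): // grace period, default 10s
        return t.Signal(syscall.SIGKILL) // force kill
    }
}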
Secrets Handling ¶
Workers handle secrets securely:
Fetch and Decrypt:
- Fetch encrypted secret data from manager
- Decrypt using cluster encryption key
- Store decrypted data in memory only
Mount as tmpfs:
- Create tmpfs mount at /run/secrets/<name>
- Write secret data to tmpfs
- Set permissions (0400, container user)
- tmpfs is memory-only (never touches disk)
Container Access:
- Container mounts /run/secrets/<name>
- Application reads secret as regular file
- Secret data never written to disk
- tmpfs cleared on unmount
Cleanup:
- Unmount tmpfs when task stops
- Memory automatically cleared
- No disk cleanup required
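A minimal sketch of the mount-and-write step on Linux, assuming golang.org/x/sys/unix and given the task ID, secret name, and decrypted plaintext (the size cap is illustrative):

func mountSecret(taskID, name string, plaintext []byte) error {
    dir := filepath.Join("/run/secrets", taskID) // under SecretsBasePath
    if err := os.MkdirAll(dir, 0o700); err != nil {
        return err
    }
    // tmpfs keeps the secret bytes in memory; they never touch disk.
    if err := unix.Mount("tmpfs", dir, "tmpfs",
        unix.MS_NOSUID|unix.MS_NODEV, "size=1m,mode=0700"); err != nil {
        return err
    }
    // 0400: readable only by the owning (container) user.
    return os.WriteFile(filepath.Join(dir, name), plaintext, 0o400)
}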
Volume Handling ¶
Workers manage volume lifecycle:
Local Volumes:
- Created at /var/lib/warren/volumes/<volume-name>
- Mounted as bind mount into container
- Persists across task restarts
- Affinity ensures same node (local storage)
Volume Mounts:
- Source: Volume name (e.g., "db-data")
- Target: Container path (e.g., "/var/lib/postgresql")
- ReadOnly: Optional read-only mount
- UID/GID mapping handled by runtime
Volume Cleanup:
- Volumes persist after task stops
- Manual deletion via "warren volume delete"
- Prevents accidental data loss
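A minimal sketch of local-volume preparation, with a hypothetical MountSpec type standing in for the mount specs produced by pkg/volume:

type MountSpec struct {
    Source   string // host path
    Target   string // container path
    Type     string // "bind" for local volumes
    ReadOnly bool
}

func prepareLocalVolume(name, target string, readOnly bool) (MountSpec, error) {
    src := filepath.Join("/var/lib/warren/volumes", name)
    // MkdirAll is idempotent, so task restarts reuse the existing data.
    if err := os.MkdirAll(src, 0o755); err != nil {
        return MountSpec{}, err
    }
    return MountSpec{Source: src, Target: target, Type: "bind", ReadOnly: readOnly}, nil
}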
Health Monitoring ¶
Workers execute health checks and report results:
HTTP Health Checks:
- Send HTTP GET to specified endpoint
- Expected status code: 200-399
- Timeout and retry configuration
- Reports healthy/unhealthy to manager
TCP Health Checks:
- Attempt TCP connection to port
- Connection success = healthy
- Connection failure = unhealthy
- Useful for databases, caches
Exec Health Checks:
- Run command inside container
- Exit code 0 = healthy
- Non-zero exit = unhealthy
- Useful for custom health logic
Health Failure:
- After N failed checks, mark unhealthy
- Report to manager
- Reconciler replaces unhealthy task
- Old task stops, new task starts
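A minimal sketch of the HTTP probe, treating any status in 200-399 as healthy:

func httpHealthy(url string, timeout time.Duration) bool {
    client := &http.Client{Timeout: timeout}
    resp, err := client.Get(url)
    if err != nil {
        return false // connection error or timeout counts as a failed check
    }
    defer resp.Body.Close()
    return resp.StatusCode >= 200 && resp.StatusCode < 400
}

The monitor counts consecutive failures and reports unhealthy once the configured threshold is reached.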
Port Publishing ¶
Workers publish container ports to host:
Host Mode (PublishModeHost):
- Maps container port to host port
- Creates iptables rules:
- PREROUTING: DNAT to container IP
- POSTROUTING: MASQUERADE for responses
- Port available only on hosting node
- Used for health checks, ingress backends
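A minimal sketch of those two rules, assuming the worker shells out to the iptables binary (the actual rule management lives in pkg/network):

func publishPort(containerIP string, containerPort, hostPort int) error {
    rules := [][]string{
        // PREROUTING: DNAT traffic on the host port to the container.
        {"-t", "nat", "-A", "PREROUTING", "-p", "tcp",
            "--dport", strconv.Itoa(hostPort), "-j", "DNAT",
            "--to-destination", fmt.Sprintf("%s:%d", containerIP, containerPort)},
        // POSTROUTING: MASQUERADE so response packets route back correctly.
        {"-t", "nat", "-A", "POSTROUTING", "-p", "tcp",
            "-d", containerIP, "--dport", strconv.Itoa(containerPort),
            "-j", "MASQUERADE"},
    }
    for _, args := range rules {
        if out, err := exec.Command("iptables", args...).CombinedOutput(); err != nil {
            return fmt.Errorf("iptables %v: %v: %s", args, err, out)
        }
    }
    return nil
}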
Ingress Mode (PublishModeIngress):
- Future: Routing mesh (not yet implemented)
- Will route to any task replica
- Load balancing across tasks
Port Conflicts:
- Worker detects port conflicts
- Reports error to manager
- Scheduler avoids conflicting placements
Failure Scenarios ¶
Manager Disconnection:
- Worker continues running tasks
- Heartbeat loop retries connection
- Exponential backoff (up to 30s)
- Tasks keep running (autonomy)
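A minimal sketch of the reconnect backoff, with a hypothetical reconnect helper and the 30s cap noted above:

backoff := time.Second
for {
    if err := reconnect(); err == nil {
        break // connected; resume normal heartbeats
    }
    time.Sleep(backoff)
    backoff *= 2
    if backoff > 30*time.Second {
        backoff = 30 * time.Second // cap per the policy above
    }
}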
Container Failure:
- Worker detects exit via containerd
- Restarts based on RestartPolicy
- Reports failure to manager
- Reconciler may reschedule
Containerd Failure:
- Worker cannot execute new tasks
- Reports error to manager
- Existing containers may continue (containerd recovery)
- Worker marked unhealthy
Worker Crash:
- Containers keep running (containerd daemon)
- Worker restart re-syncs state
- Orphaned containers detected and cleaned
Performance Characteristics ¶
Resource Usage:
- Base worker: 20MB memory
- Per task: ~5MB memory
- Typical worker (10 tasks): ~70MB total
Loop Frequencies:
- Heartbeat: Every 5 seconds
- Task sync: Every 3 seconds
- Health checks: Per service config (30s typical)
Task Operations:
- Task start time: 2-5s (image cached)
- Task start time: 10-60s (image pull)
- Task stop time: <10s (grace period)
- Task cleanup: <1s
Integration Points ¶
This package integrates with:
- pkg/runtime: Executes containers via containerd
- pkg/security: Decrypts secrets and handles certificates
- pkg/volume: Manages volume mounts
- pkg/health: Executes health check probes
- pkg/network: Publishes ports via iptables
- pkg/dns: Configures container DNS
- api/proto: Communicates with manager via gRPC
Design Patterns ¶
Agent Pattern:
- Stateless agent design
- All state stored in manager
- Worker restarts are transparent
- Task cache for performance only
Handler Pattern:
- Separate handlers for concerns
- Secrets, volumes, DNS, health, ports
- Each handler has specific lifecycle
- Coordinated by main Worker
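Illustratively, the composition looks roughly like this (hypothetical shape; the real Worker fields are unexported, see the Types section):

type workerShape struct {
    secrets *SecretsHandler
    volumes *VolumesHandler
    dns     *DNSHandler
    health  *HealthMonitor
    ports   *HostPortPublisher
    tasks   map[string]*types.Container // local cache only
}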
Reconciliation Pattern:
- Desired state from manager
- Current state from containerd
- Reconcile: Start new, stop removed
- Eventually consistent
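A minimal sketch of one reconcile pass, with hypothetical startTask and stopTask helpers:

func reconcile(desired, current map[string]*types.Container) {
    // Start tasks the manager assigned that are not yet running.
    for id, task := range desired {
        if _, ok := current[id]; !ok {
            startTask(task)
        }
    }
    // Stop tasks still running that are no longer assigned.
    for id := range current {
        if _, ok := desired[id]; !ok {
            stopTask(id)
        }
    }
}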
Security ¶
Join Token Authentication:
- Worker authenticates with join token
- Token validated by manager
- Token single-use (optional)
- Connection uses gRPC (TLS ready)
Secrets Encryption:
- Secrets encrypted at rest in manager
- Decrypted in worker memory only
- Mounted as tmpfs (no disk write)
- Cleared on unmount
Container Isolation:
- Containers run as non-root (when specified)
- Linux namespaces (PID, network, mount)
- Cgroups for resource limits
- Seccomp profiles (future)
Troubleshooting ¶
Common Issues:
Worker Won't Connect:
- Check manager address reachable
- Verify join token is valid
- Check firewall allows port 8080
- Review worker logs
Tasks Not Starting:
- Check containerd is running
- Verify image can be pulled
- Check disk space for volumes
- Review task logs in containerd
Health Checks Failing:
- Verify container is running
- Test endpoint manually (HTTP)
- Check network connectivity
- Adjust timeout/retries
Ports Not Accessible:
- Verify iptables rules created
- Check container listening on port
- Test from host machine first
- Review firewall rules
Monitoring ¶
Key metrics to monitor:
Worker Health:
- worker_heartbeat_failures: Connection issues
- worker_tasks_running: Active task count
- worker_task_start_duration: Performance
- worker_task_failures: Task reliability
Resource Usage:
- node_cpu_used: CPU utilization
- node_memory_used: Memory utilization
- node_disk_used: Disk utilization
Container Health:
- container_restarts: Restart frequency
- health_check_failures: Health check issues
- container_oom_kills: Memory limit hits
See Also ¶
- pkg/runtime for containerd integration
- pkg/security for secrets handling
- pkg/health for health check execution
- docs/concepts/services.md for service concepts
- docs/troubleshooting.md for common issues
Index ¶
Constants ¶
const (
    // DefaultDNSDir is the directory where DNS config files are stored
    DefaultDNSDir = "/var/lib/warren/dns"

    // DefaultResolvConf is the default resolv.conf template filename
    DefaultResolvConf = "resolv.conf"
)
const (
// SecretsBasePath is the base directory for secret tmpfs mounts
SecretsBasePath = "/run/secrets"
)
Variables ¶
This section is empty.
Functions ¶
func EnsureSecretsBaseDir ¶
func EnsureSecretsBaseDir() error
EnsureSecretsBaseDir ensures the base secrets directory exists. This should be called during worker initialization.
func ExtractManagerIP ¶
func ExtractManagerIP(managerAddr string) string
ExtractManagerIP extracts the IP address from a manager address. Examples:
"192.168.1.100:8080" -> "192.168.1.100" "localhost:8080" -> "127.0.0.1" "manager-1:8080" -> "manager-1" (hostname, DNS will resolve)
Types ¶
type Config ¶
type Config struct {
NodeID string
ManagerAddr string
DataDir string
Resources *types.NodeResources
EncryptionKey []byte // Cluster-wide encryption key for secrets
ContainerdSocket string // Containerd socket path (empty = auto-detect)
JoinToken string // Join token for initial authentication
}
Config holds worker configuration
type DNSHandler ¶
type DNSHandler struct {
// contains filtered or unexported fields
}
DNSHandler manages DNS configuration for containers
func NewDNSHandler ¶
func NewDNSHandler(w *Worker, managerAddr string) (*DNSHandler, error)
NewDNSHandler creates a new DNS handler
func (*DNSHandler) Cleanup ¶
func (h *DNSHandler) Cleanup() error
Cleanup removes the DNS configuration directory
func (*DNSHandler) GenerateResolvConf ¶
func (h *DNSHandler) GenerateResolvConf() (string, error)
GenerateResolvConf generates a resolv.conf file for containers. This configures containers to use the Warren DNS server on the manager.
Format:
nameserver <manager-ip>  # Warren DNS server
nameserver 8.8.8.8       # Google DNS fallback
nameserver 1.1.1.1       # Cloudflare DNS fallback
search warren            # Allow "nginx" instead of "nginx.warren"
options ndots:0          # Try search domains immediately
func (*DNSHandler) GetResolvConfPath ¶
func (h *DNSHandler) GetResolvConfPath() (string, error)
GetResolvConfPath returns the path to the generated resolv.conf file. If the file doesn't exist, it generates it first.
type HealthMonitor ¶
type HealthMonitor struct {
// contains filtered or unexported fields
}
HealthMonitor manages health checks for containers
func NewHealthMonitor ¶
func NewHealthMonitor(w *Worker) *HealthMonitor
NewHealthMonitor creates a new health monitor
type SecretsHandler ¶
type SecretsHandler struct {
// contains filtered or unexported fields
}
SecretsHandler manages secret mounting for tasks
func NewSecretsHandler ¶
func NewSecretsHandler(worker *Worker, encryptionKey []byte) (*SecretsHandler, error)
NewSecretsHandler creates a new secrets handler
func (*SecretsHandler) CleanupSecretsForTask ¶
func (sh *SecretsHandler) CleanupSecretsForTask(taskID string) error
CleanupSecretsForTask removes all secrets for a task from tmpfs
func (*SecretsHandler) GetSecretPath ¶
func (sh *SecretsHandler) GetSecretPath(taskID, secretName string) string
GetSecretPath returns the path to a specific secret for a task
func (*SecretsHandler) MountSecretsForTask ¶
func (sh *SecretsHandler) MountSecretsForTask(task *types.Container) (string, error)
MountSecretsForTask fetches secrets from the manager and mounts them to tmpfs. Returns the tmpfs mount path for the container.
type VolumesHandler ¶
type VolumesHandler struct {
// contains filtered or unexported fields
}
VolumesHandler manages volume mounting for tasks
func NewVolumesHandler ¶
func NewVolumesHandler(worker *Worker) (*VolumesHandler, error)
NewVolumesHandler creates a new volumes handler
func (*VolumesHandler) CleanupVolumesForTask ¶
func (vh *VolumesHandler) CleanupVolumesForTask(task *types.Container) error
CleanupVolumesForTask unmounts volumes for a task (no-op for local driver)
func (*VolumesHandler) PrepareVolumesForTask ¶
PrepareVolumesForTask prepares all volumes for a task and returns mount specs
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a Warren worker node
func NewEmbeddedWorker ¶ added in v1.6.0
func NewEmbeddedWorker(cfg *Config) (*Worker, error)
NewEmbeddedWorker creates a worker optimized for in-process embedding with a manager (hybrid mode). This is identical to NewWorker but documents the intended use case for embedded workers.