Documentation ¶
Overview ¶
Package fleet implements FleetSandboxControl, a SandboxControl adapter that manages a fleet of sandbox-host instances with auto-scaling, health monitoring, warm pool maintenance, and capacity-aware routing.
Index ¶
- Constants
- Variables
- func ParseSandboxID(sandboxID string) (instanceID, sessionID string, err error)
- type CapacityRouter
- type Clock
- type DrainReason
- type FleetConfig
- type FleetNodeClient
- type FleetOption
- type FleetSandboxControl
- func (f *FleetSandboxControl) Capabilities() control.SandboxCapabilities
- func (f *FleetSandboxControl) Close() error
- func (f *FleetSandboxControl) CreateSandbox(ctx context.Context, req control.CreateSandboxRequest) (*control.CreateSandboxResponse, error)
- func (f *FleetSandboxControl) DestroySandbox(ctx context.Context, sandboxID string) error
- func (f *FleetSandboxControl) FleetStatus() FleetStatus
- func (f *FleetSandboxControl) GetProcessStatus(ctx context.Context, req control.GetProcessStatusRequest) (*control.GetProcessStatusResponse, error)
- func (f *FleetSandboxControl) KillProcess(ctx context.Context, req control.KillProcessRequest) error
- func (f *FleetSandboxControl) LaunchProcess(ctx context.Context, req control.LaunchProcessRequest) (*control.LaunchProcessResponse, error)
- func (f *FleetSandboxControl) PauseSandbox(ctx context.Context, sandboxID string) error
- func (f *FleetSandboxControl) RecoverInstances(ctx context.Context) error
- func (f *FleetSandboxControl) ResumeSandbox(ctx context.Context, sandboxID string) error
- func (f *FleetSandboxControl) StartControlLoop()
- type FleetStatus
- type HealthCheckResult
- type InstanceState
- type ManagedInstance
- type SandboxClientFactory
Constants ¶
const SandboxIDPrefix = sandboxIDPrefix
SandboxIDPrefix is the prefix for fleet-managed sandbox IDs. Exported so the orchestrator can detect fleet-prefixed IDs.
Variables ¶
var (
	// ErrNoCapacity indicates no instance has available session slots. The
	// fleet may be in the process of scaling up. Callers should wait briefly
	// (with backoff) and retry. The singleflight provision path handles
	// deduplication of concurrent scale-up requests automatically.
	ErrNoCapacity = errors.New("fleet: no capacity available")

	// ErrFleetClosed indicates the fleet manager is shutting down and cannot
	// accept new CreateSandbox calls. This error is permanent for the
	// lifetime of this FleetSandboxControl instance.
	ErrFleetClosed = errors.New("fleet: adapter is closed")

	// ErrInstanceNotFound indicates the instance ID parsed from a SandboxID
	// does not correspond to a known managed instance in the fleet.
	ErrInstanceNotFound = errors.New("fleet: instance not found")

	// ErrInvalidStateTransition indicates an attempted instance state
	// transition that is not permitted by the state machine.
	ErrInvalidStateTransition = errors.New("fleet: invalid state transition")

	// ErrInstanceDraining indicates an operation was attempted on an instance
	// that is in the draining state and not accepting new sessions.
	ErrInstanceDraining = errors.New("fleet: instance is draining")
)
Sentinel errors returned by FleetSandboxControl. Error classification:
| Error              | Retryable? | Description |
|--------------------|------------|-------------|
| ErrNoCapacity      | Yes        | No instance has available session slots; fleet may be scaling up. Caller should wait briefly and retry. |
| ErrFleetClosed     | No         | Fleet manager is shutting down. No new sandboxes. |
| parseSandboxID err | No         | SandboxID is malformed. Programming error. |
| Node RPC errors    | Depends    | Passed through from NodeSandboxControl. |
| Provisioner errors | Depends    | Cloud API errors may be transient or permanent. |
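The classification above suggests that callers branch on the sentinel with errors.Is and retry only ErrNoCapacity. The following is a minimal sketch of that pattern, not code from this package: createWithRetry and runExample are hypothetical helpers, the sentinel is redeclared locally so the example compiles on its own, and the doubling backoff is an assumed policy.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local stand-in for the fleet sentinel so the sketch compiles on its own.
var ErrNoCapacity = errors.New("fleet: no capacity available")

// createWithRetry (hypothetical) retries create only while it fails with
// ErrNoCapacity, doubling the delay between attempts. Any other error is
// treated as non-retryable and returned immediately.
func createWithRetry(ctx context.Context, create func(context.Context) (string, error), attempts int, delay time.Duration) (string, error) {
	if attempts < 1 {
		attempts = 1
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		id, err := create(ctx)
		if err == nil {
			return id, nil
		}
		if !errors.Is(err, ErrNoCapacity) {
			return "", err
		}
		lastErr = err
		if i == attempts-1 {
			break // out of attempts: skip the final sleep
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
	}
	return "", lastErr
}

// runExample simulates a fleet that gains capacity on the third call.
func runExample() (id string, calls int, err error) {
	create := func(context.Context) (string, error) {
		calls++
		if calls < 3 {
			return "", ErrNoCapacity
		}
		return "sandbox-1", nil
	}
	id, err = createWithRetry(context.Background(), create, 5, time.Millisecond)
	return id, calls, err
}

func main() {
	id, calls, err := runExample()
	fmt.Println(id, calls, err)
}
```

In real use the create argument would wrap a CreateSandbox call. Node RPC and provisioner errors would need their own classification, since the table marks them as "Depends".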
Functions ¶
func ParseSandboxID ¶
ParseSandboxID extracts the instance ID and session ID from a fleet SandboxID. Returns an error if the ID is malformed (wrong prefix, missing separator, or empty components).
This is the exported entry point; internal callers use parseSandboxID.
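The three failure modes listed above (wrong prefix, missing separator, empty components) can be illustrated as follows. This is a sketch only: the actual prefix and separator are not documented here, so the "fleet:" prefix, the "/" separator, and the parseExampleID name are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// ASSUMED wire format for illustration: "fleet:<instanceID>/<sessionID>".
// The real prefix and separator used by the package may differ.
const (
	examplePrefix    = "fleet:"
	exampleSeparator = "/"
)

var errMalformed = errors.New("malformed sandbox ID")

// parseExampleID (hypothetical) mirrors the documented checks of
// ParseSandboxID: wrong prefix, missing separator, empty components.
func parseExampleID(sandboxID string) (instanceID, sessionID string, err error) {
	if !strings.HasPrefix(sandboxID, examplePrefix) {
		return "", "", fmt.Errorf("%w: wrong prefix", errMalformed)
	}
	rest := strings.TrimPrefix(sandboxID, examplePrefix)
	instanceID, sessionID, found := strings.Cut(rest, exampleSeparator)
	if !found {
		return "", "", fmt.Errorf("%w: missing separator", errMalformed)
	}
	if instanceID == "" || sessionID == "" {
		return "", "", fmt.Errorf("%w: empty component", errMalformed)
	}
	return instanceID, sessionID, nil
}

func main() {
	fmt.Println(parseExampleID("fleet:i-abc/sess-1"))
	fmt.Println(parseExampleID("i-abc/sess-1"))
}
```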
Types ¶
type CapacityRouter ¶
type CapacityRouter struct {
// contains filtered or unexported fields
}
CapacityRouter selects the best instance for a new sandbox using a spread strategy that prefers the most available capacity. This maximizes per-instance headroom so burst traffic is absorbed without immediate scale-up.
func NewCapacityRouter ¶
func NewCapacityRouter(config FleetConfig) *CapacityRouter
NewCapacityRouter creates a router configured from the given FleetConfig.
func (*CapacityRouter) SelectInstance ¶
func (r *CapacityRouter) SelectInstance(instances []*ManagedInstance) (*ManagedInstance, error)
SelectInstance picks the best instance for a new sandbox. Returns ErrNoCapacity if no instance can accept the sandbox.
Strategy: spread (prefer most available capacity). When multiple instances have equal headroom, use IdleSince as a tiebreaker (prefer the instance that has been idle longest, promoting even distribution). If IdleSince is also equal (e.g., all instances at the same session count), use an atomic counter for round-robin to prevent map iteration order bias.
Caller must ensure instances are not concurrently mutated (hold fleet read lock or pass a snapshot).
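The selection order described above (most free capacity, then longest idle, then round-robin among exact ties) can be sketched as follows. This is an illustration under assumptions, not the router's implementation: the instance type is a simplified stand-in for ManagedInstance, capacity is taken to be Max minus Sessions, and state checks and CapacityHeadroom are ignored.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

var errNoCapacity = errors.New("no capacity available")

// instance is a simplified stand-in for ManagedInstance.
type instance struct {
	ID        string
	Sessions  int
	Max       int
	IdleSince time.Time
}

func (in instance) free() int { return in.Max - in.Sessions }

var rrCounter uint64 // round-robin counter used only for exact ties

// selectSpread keeps the set of best candidates seen so far: more free
// slots wins, then the earlier IdleSince; exact ties are collected and
// resolved by the round-robin counter.
func selectSpread(instances []instance) (instance, error) {
	var best []instance
	for _, in := range instances {
		if in.free() <= 0 {
			continue // full: cannot accept a sandbox
		}
		if len(best) == 0 {
			best = []instance{in}
			continue
		}
		top := best[0]
		switch {
		case in.free() > top.free():
			best = []instance{in}
		case in.free() < top.free():
			// less headroom than the current best: skip
		case in.IdleSince.Before(top.IdleSince):
			best = []instance{in}
		case in.IdleSince.Equal(top.IdleSince):
			best = append(best, in)
		}
	}
	if len(best) == 0 {
		return instance{}, errNoCapacity
	}
	n := atomic.AddUint64(&rrCounter, 1)
	return best[n%uint64(len(best))], nil
}

var t0 = time.Unix(1700000000, 0)

// exampleFleet: "b" and "c" have equal headroom, "c" has been idle longer.
var exampleFleet = []instance{
	{ID: "a", Sessions: 3, Max: 10, IdleSince: t0},
	{ID: "b", Sessions: 0, Max: 10, IdleSince: t0},
	{ID: "c", Sessions: 0, Max: 10, IdleSince: t0.Add(-time.Minute)},
}

func main() {
	picked, err := selectSpread(exampleFleet)
	fmt.Println(picked.ID, err)
}
```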
type DrainReason ¶
type DrainReason string
DrainReason records why an instance entered the Draining state. This determines whether Draining -> Ready recovery is permitted.
const (
	// DrainHealth indicates the instance was drained because it failed
	// consecutive health checks. Recovery to Ready is permitted if the
	// instance subsequently passes HealthyThreshold consecutive checks.
	DrainHealth DrainReason = "health"

	// DrainScaleDown indicates the instance was drained as part of scale-down
	// (idle too long, fleet can shrink). Recovery to Ready is NOT permitted.
	DrainScaleDown DrainReason = "scale-down"

	// DrainShutdown indicates the instance was drained as part of fleet
	// shutdown (Close was called). Recovery to Ready is NOT permitted.
	DrainShutdown DrainReason = "shutdown"
)
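The interaction between health checks and DrainHealth can be sketched with two consecutive-result counters. This is a simplified model, not the control loop's code: the inst type and observe method are hypothetical, and both thresholds are fixed at 3, the documented defaults for UnhealthyThreshold and HealthyThreshold.

```go
package main

import "fmt"

type state string
type reason string

const (
	ready    state = "ready"
	active   state = "active"
	draining state = "draining"

	drainHealth    reason = "health"
	drainScaleDown reason = "scale-down"
)

// Documented defaults for UnhealthyThreshold and HealthyThreshold.
const unhealthyThreshold, healthyThreshold = 3, 3

// inst is a simplified stand-in for ManagedInstance.
type inst struct {
	State     state
	Reason    reason
	Successes int
	Failures  int
}

// observe (hypothetical) records one health check result. Each result
// resets the opposite counter, so only consecutive runs count.
func (in *inst) observe(healthy bool) {
	if healthy {
		in.Failures = 0
		in.Successes++
		// Only a health-caused drain may recover to Ready.
		if in.State == draining && in.Reason == drainHealth && in.Successes >= healthyThreshold {
			in.State, in.Reason = ready, ""
		}
		return
	}
	in.Successes = 0
	in.Failures++
	if (in.State == ready || in.State == active) && in.Failures >= unhealthyThreshold {
		in.State, in.Reason = draining, drainHealth
	}
}

func main() {
	in := &inst{State: ready}
	for _, healthy := range []bool{false, false, false, true, true, true} {
		in.observe(healthy)
		fmt.Println(healthy, in.State, in.Reason)
	}
}
```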
type FleetConfig ¶
type FleetConfig struct {
// Instance provisioning
InstanceConfig instance.InstanceConfig // Template for launching new instances (from shared package)
// Fleet sizing
MinInstances int // Floor -- never scale below this (default: 1)
MaxInstances int // Ceiling -- never scale above this (default: 50)
WarmPoolTarget int // Number of idle instances to keep ready (default: 2)
// Capacity thresholds
MaxSessionsPerInstance int // Max sandboxes per instance (default: 10)
CapacityHeadroom float64 // Don't route above this fraction (default: 0.8)
// Health checking
HealthCheckInterval time.Duration // How often to poll (default: 15s)
UnhealthyThreshold int // Consecutive failures before marking unhealthy (default: 3)
HealthyThreshold int // Consecutive successes before marking healthy again during drain recovery (default: 3)
// Scaling
IdleCooldown time.Duration // How long an instance must be idle before scale-down (default: 10m)
DrainTimeout time.Duration // Max time to wait for active sandboxes during drain (default: 30m)
ProvisionTimeout time.Duration // Max time to wait for instance to become healthy (default: 5m)
MaxConcurrentProvisions int // Max simultaneous LaunchInstance calls (default: 3)
// Termination
MaxTerminateRetries int // Max consecutive TerminateInstance failures before alerting (default: 5)
// Shutdown behavior
LeaveInstancesOnClose bool // If true, skip instance termination on Close for re-adoption on restart
// Sandbox-host port
SandboxHostPort int // Port sandbox-host listens on (default: 9100)
}
FleetConfig controls fleet behavior. All durations with zero values use the defaults from DefaultFleetConfig().
func DefaultFleetConfig ¶
func DefaultFleetConfig() FleetConfig
DefaultFleetConfig returns a FleetConfig with sensible production defaults.
func (FleetConfig) Validate ¶
func (c FleetConfig) Validate() error
Validate checks config invariants and returns an error describing all violations found. Returns nil if the config is valid.
type FleetNodeClient ¶
type FleetNodeClient interface {
control.SandboxControl
HealthCheck(ctx context.Context) (*HealthCheckResult, error)
}
FleetNodeClient combines SandboxControl with health checking. The fleet control loop needs both sandbox delegation and health polling from each instance. SandboxControl alone has no HealthCheck method, so this composite interface bridges the gap.
type FleetOption ¶
type FleetOption func(*fleetOptions)
FleetOption configures optional behavior on FleetSandboxControl.
func WithLeaveInstancesOnClose ¶
func WithLeaveInstancesOnClose(leave bool) FleetOption
WithLeaveInstancesOnClose configures the fleet to skip instance termination on Close. This is used when the fleet manager is restarting and instances should be re-adopted by the new manager via crash recovery.
func WithLogger ¶
func WithLogger(logger *slog.Logger) FleetOption
WithLogger sets the structured logger for fleet operations.
func WithMetricsRegisterer ¶
func WithMetricsRegisterer(registerer prometheus.Registerer) FleetOption
WithMetricsRegisterer sets the Prometheus registerer used for fleet metrics. If nil, the default registerer is used.
type FleetSandboxControl ¶
type FleetSandboxControl struct {
// contains filtered or unexported fields
}
FleetSandboxControl implements SandboxControl by managing a fleet of sandbox-host instances. It provisions instances via InstanceProvisioner, monitors health, maintains a warm pool, and routes sandbox operations to per-instance NodeSandboxControl clients.
Lock ordering discipline (MUST be followed throughout):
- FleetSandboxControl.mu (fleet-level lock)
- ManagedInstance.mu (per-instance lock)
The fleet lock is ALWAYS acquired before any instance lock. No code path may acquire FleetSandboxControl.mu while holding any ManagedInstance.mu. This prevents deadlocks between the control loop and concurrent callers.
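A minimal sketch of the ordering rule, using stand-in types rather than the package's own structs: the fleet and node types, and both methods, are hypothetical. The first method nests the instance lock inside the fleet lock; the second releases the fleet lock before taking the instance lock. Neither ever takes the fleet lock while holding an instance lock.

```go
package main

import (
	"fmt"
	"sync"
)

// node stands in for ManagedInstance, fleet for FleetSandboxControl.
type node struct {
	mu       sync.RWMutex
	sessions int
}

type fleet struct {
	mu    sync.RWMutex
	nodes map[string]*node
}

// totalSessions nests the locks in the documented order:
// fleet lock first, then each instance lock.
func (f *fleet) totalSessions() int {
	f.mu.RLock()
	defer f.mu.RUnlock()
	total := 0
	for _, n := range f.nodes {
		n.mu.RLock()
		total += n.sessions
		n.mu.RUnlock()
	}
	return total
}

// addSession looks the instance up under the fleet lock, releases it,
// and only then takes the instance lock, so the two are never held in
// the forbidden order.
func (f *fleet) addSession(id string) bool {
	f.mu.RLock()
	n, ok := f.nodes[id]
	f.mu.RUnlock()
	if !ok {
		return false
	}
	n.mu.Lock()
	n.sessions++
	n.mu.Unlock()
	return true
}

func main() {
	f := &fleet{nodes: map[string]*node{"a": {}, "b": {}}}
	f.addSession("a")
	f.addSession("b")
	f.addSession("a")
	fmt.Println(f.totalSessions())
}
```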
func NewFleetSandboxControl ¶
func NewFleetSandboxControl(
	provisioner instance.InstanceProvisioner,
	clientFactory SandboxClientFactory,
	cfg FleetConfig,
	opts ...FleetOption,
) (*FleetSandboxControl, error)
NewFleetSandboxControl creates a new fleet manager. The control loop (health checks, scaling) is not started here; start it separately with StartControlLoop (bead aiag-1xd.4).
func (*FleetSandboxControl) Capabilities ¶
func (f *FleetSandboxControl) Capabilities() control.SandboxCapabilities
Capabilities returns the fleet's capability set. ConcurrentSandboxes is the theoretical ceiling (MaxInstances * MaxSessionsPerInstance); actual available capacity depends on fleet state.
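As a worked example of the ceiling, the documented defaults (MaxInstances 50, MaxSessionsPerInstance 10) give 500 concurrent sandboxes. The sizing type below is a hypothetical stand-in holding just the two relevant FleetConfig fields.

```go
package main

import "fmt"

// sizing holds the two FleetConfig fields that determine the ceiling.
type sizing struct {
	MaxInstances           int
	MaxSessionsPerInstance int
}

// concurrentSandboxCeiling is the theoretical maximum, reached only if
// every instance exists and is completely full.
func (s sizing) concurrentSandboxCeiling() int {
	return s.MaxInstances * s.MaxSessionsPerInstance
}

func main() {
	defaults := sizing{MaxInstances: 50, MaxSessionsPerInstance: 10}
	fmt.Println(defaults.concurrentSandboxCeiling()) // prints 500
}
```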
func (*FleetSandboxControl) Close ¶
func (f *FleetSandboxControl) Close() error
Close initiates graceful shutdown: cancels the control loop, drains and terminates instances (unless LeaveInstancesOnClose is set).
func (*FleetSandboxControl) CreateSandbox ¶
func (f *FleetSandboxControl) CreateSandbox(ctx context.Context, req control.CreateSandboxRequest) (*control.CreateSandboxResponse, error)
CreateSandbox picks the best instance via capacity-aware routing, atomically claims a session slot, creates a sandbox on it, and returns a fleet-prefixed SandboxID. If no capacity exists, attempts to provision a new instance.
func (*FleetSandboxControl) DestroySandbox ¶
func (f *FleetSandboxControl) DestroySandbox(ctx context.Context, sandboxID string) error
DestroySandbox parses the fleet SandboxID, decrements the session count eagerly, then delegates to the per-instance client. On RPC failure the count is rolled back; the reconciliation loop corrects any remaining drift.
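The eager-decrement-with-rollback pattern can be sketched as below. The slot type and destroy function are hypothetical stand-ins; the real method also parses the SandboxID and may update instance state, which is omitted here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errRPC        = errors.New("rpc failed")
	errNoSessions = errors.New("no sessions to release")
)

// slot stands in for the per-instance session counter.
type slot struct {
	mu       sync.Mutex
	sessions int64
}

// destroy frees the slot before the RPC so capacity is visible to the
// router immediately, then restores it if the RPC fails.
func destroy(s *slot, rpc func() error) error {
	s.mu.Lock()
	if s.sessions == 0 {
		s.mu.Unlock()
		return errNoSessions
	}
	s.sessions-- // eager decrement
	s.mu.Unlock()

	if err := rpc(); err != nil {
		s.mu.Lock()
		s.sessions++ // roll back: the sandbox still exists on the node
		s.mu.Unlock()
		return err
	}
	return nil
}

func main() {
	s := &slot{sessions: 2}
	fmt.Println(destroy(s, func() error { return errRPC }), s.sessions)
	fmt.Println(destroy(s, func() error { return nil }), s.sessions)
}
```

The lock is not held across the RPC, so a slow node does not block other callers on the same instance.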
func (*FleetSandboxControl) FleetStatus ¶
func (f *FleetSandboxControl) FleetStatus() FleetStatus
FleetStatus returns a snapshot of the fleet's current state.
func (*FleetSandboxControl) GetProcessStatus ¶
func (f *FleetSandboxControl) GetProcessStatus(ctx context.Context, req control.GetProcessStatusRequest) (*control.GetProcessStatusResponse, error)
GetProcessStatus delegates to the per-instance client after rewriting SandboxID.
func (*FleetSandboxControl) KillProcess ¶
func (f *FleetSandboxControl) KillProcess(ctx context.Context, req control.KillProcessRequest) error
KillProcess delegates to the per-instance client after rewriting SandboxID.
func (*FleetSandboxControl) LaunchProcess ¶
func (f *FleetSandboxControl) LaunchProcess(ctx context.Context, req control.LaunchProcessRequest) (*control.LaunchProcessResponse, error)
LaunchProcess delegates to the per-instance client after rewriting SandboxID to the session-local ID.
func (*FleetSandboxControl) PauseSandbox ¶
func (f *FleetSandboxControl) PauseSandbox(ctx context.Context, sandboxID string) error
PauseSandbox delegates to the per-instance client after parsing the SandboxID.
func (*FleetSandboxControl) RecoverInstances ¶
func (f *FleetSandboxControl) RecoverInstances(ctx context.Context) error
RecoverInstances rediscovers managed instances via cloud API tags and re-adds them to the fleet. Call before StartControlLoop after a restart.
func (*FleetSandboxControl) ResumeSandbox ¶
func (f *FleetSandboxControl) ResumeSandbox(ctx context.Context, sandboxID string) error
ResumeSandbox delegates to the per-instance client after parsing the SandboxID.
func (*FleetSandboxControl) StartControlLoop ¶
func (f *FleetSandboxControl) StartControlLoop()
StartControlLoop begins the background control loop goroutine. Must be called after NewFleetSandboxControl. The loop runs every HealthCheckInterval.
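Combined with the note on RecoverInstances, the startup order after a restart is: recover, start the loop, serve, close. The sketch below shows that order against a small hypothetical interface (fleetLifecycle) and a recording fake; runFleet, recorder, and exampleOrder are illustrative names, not part of this package.

```go
package main

import (
	"context"
	"fmt"
)

// fleetLifecycle lists only the lifecycle methods used in this sketch.
type fleetLifecycle interface {
	RecoverInstances(ctx context.Context) error
	StartControlLoop()
	Close() error
}

// runFleet (hypothetical) re-adopts instances, starts the control loop,
// runs serve, and closes the fleet when serve returns.
func runFleet(ctx context.Context, f fleetLifecycle, serve func(context.Context) error) (err error) {
	if rerr := f.RecoverInstances(ctx); rerr != nil {
		return fmt.Errorf("recover instances: %w", rerr)
	}
	f.StartControlLoop()
	defer func() {
		// Report a Close error only if serve itself succeeded.
		if cerr := f.Close(); err == nil {
			err = cerr
		}
	}()
	return serve(ctx)
}

// recorder is a fake that records the order of lifecycle calls.
type recorder struct{ calls []string }

func (r *recorder) RecoverInstances(context.Context) error {
	r.calls = append(r.calls, "RecoverInstances")
	return nil
}
func (r *recorder) StartControlLoop() { r.calls = append(r.calls, "StartControlLoop") }
func (r *recorder) Close() error {
	r.calls = append(r.calls, "Close")
	return nil
}

func exampleOrder() []string {
	r := &recorder{}
	_ = runFleet(context.Background(), r, func(context.Context) error {
		r.calls = append(r.calls, "serve")
		return nil
	})
	return r.calls
}

func main() {
	fmt.Println(exampleOrder())
}
```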
type FleetStatus ¶
type FleetStatus struct {
TotalInstances int
InstancesByState map[InstanceState]int
TotalSessions int64
WarmPoolSize int
PendingProvisions int32
Healthy bool
Closed bool
}
FleetStatus contains a point-in-time snapshot of fleet state.
type HealthCheckResult ¶
type HealthCheckResult struct {
Status string
SessionCount int
ActiveTools int
Uptime time.Duration
Errors []string
}
HealthCheckResult contains the health status reported by a sandbox-host instance. This is the fleet-local representation; the RPC layer converts to/from the wire format in internal/rpc/api.HealthCheckResponse.
type InstanceState ¶
type InstanceState string
InstanceState represents the lifecycle state of a managed instance in the fleet.
const (
	// InstanceProvisioning indicates the cloud instance has been launched but
	// the sandbox-host has not yet passed health checks.
	InstanceProvisioning InstanceState = "provisioning"

	// InstanceReady indicates the sandbox-host is healthy with 0 sessions,
	// available for routing new sandboxes.
	InstanceReady InstanceState = "ready"

	// InstanceActive indicates the sandbox-host is healthy with >= 1 active
	// session.
	InstanceActive InstanceState = "active"

	// InstanceDraining indicates no new sessions should be assigned. The
	// instance is waiting for existing sessions to complete before
	// termination (or recovering health if DrainReason is DrainHealth).
	InstanceDraining InstanceState = "draining"

	// InstanceTerminating indicates TerminateInstance has been called and
	// we are waiting for cloud confirmation.
	InstanceTerminating InstanceState = "terminating"
)
type ManagedInstance ¶
type ManagedInstance struct {
// InstanceID is the cloud provider's unique identifier.
InstanceID string
// State is the current lifecycle state.
State InstanceState
// Client is the combined SandboxControl + HealthCheck client for this
// instance. Set after the instance is provisioned and the client factory
// creates a connection.
Client FleetNodeClient
// SessionCount tracks the number of active sandbox sessions on this
// instance. Updated via the atomic claim-slot protocol (section 4.1.1)
// and reconciled against HealthCheck-reported counts.
SessionCount int64
// MaxSessions is the maximum number of sessions this instance can host,
// typically set from FleetConfig.MaxSessionsPerInstance.
MaxSessions int
// IP is the instance's private IP address used for RPC connections.
IP string
// ConsecutiveHealthSuccesses counts consecutive successful health checks.
// Used for Draining -> Ready recovery when DrainReason is DrainHealth.
// Reset to 0 on health check failure.
ConsecutiveHealthSuccesses int
// ConsecutiveHealthFailures counts consecutive failed health checks.
// Used to trigger Draining transition when >= UnhealthyThreshold.
// Reset to 0 on health check success.
ConsecutiveHealthFailures int
// DrainReason records why this instance entered the Draining state.
// Only meaningful when State == InstanceDraining.
DrainReason DrainReason
// IdleSince records when the session count last reached 0 (in Ready state).
// Used by the scale-down logic to determine if the instance has been idle
// longer than IdleCooldown.
IdleSince time.Time
// ProvisionStarted records when provisioning began. Used by the control
// loop to detect provision timeouts.
ProvisionStarted time.Time
// DrainStarted records when draining began. Used by the control loop
// to detect drain timeouts and force-terminate.
DrainStarted time.Time
// TerminateRetries counts consecutive TerminateInstance failures for
// this instance. Used to trigger an alert after MaxTerminateRetries.
TerminateRetries int
// contains filtered or unexported fields
}
ManagedInstance tracks the state of a single instance in the fleet.
Lock ordering discipline: FleetSandboxControl.mu (fleet-level lock) must ALWAYS be acquired BEFORE ManagedInstance.mu (per-instance lock). No code path may acquire FleetSandboxControl.mu while holding any ManagedInstance.mu. This prevents deadlocks between the control loop and concurrent callers. See plan 20, section 4.1.2.
func (*ManagedInstance) ClaimSession ¶
func (mi *ManagedInstance) ClaimSession() error
ClaimSession atomically increments the session count and transitions Ready -> Active if this is the first session. Returns an error if the instance is not in a routable state (Ready or Active) or is at capacity.
Caller must hold mi.mu for writing.
func (*ManagedInstance) ReleaseSession ¶
func (mi *ManagedInstance) ReleaseSession() error
ReleaseSession decrements the session count and transitions Active -> Ready if this was the last session. Returns an error if the session count would go negative.
Caller must hold mi.mu for writing.
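The claim and release rules above can be sketched with a stand-in type. This is an illustration under assumptions: inst, claim, and release are hypothetical, the error values are local, and the caller takes the lock around each call as the contract requires.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type state string

const (
	ready    state = "ready"
	active   state = "active"
	draining state = "draining"
)

var (
	errNotRoutable = errors.New("instance not routable")
	errAtCapacity  = errors.New("instance at capacity")
	errUnderflow   = errors.New("session count would go negative")
)

// inst is a simplified stand-in for ManagedInstance.
type inst struct {
	mu       sync.RWMutex
	State    state
	Sessions int64
	Max      int
}

// claim takes one slot; Ready becomes Active on the first session.
// Caller must hold in.mu for writing.
func (in *inst) claim() error {
	if in.State != ready && in.State != active {
		return errNotRoutable
	}
	if in.Sessions >= int64(in.Max) {
		return errAtCapacity
	}
	in.Sessions++
	in.State = active
	return nil
}

// release frees one slot; Active becomes Ready when the last session
// ends. Caller must hold in.mu for writing.
func (in *inst) release() error {
	if in.Sessions == 0 {
		return errUnderflow
	}
	in.Sessions--
	if in.Sessions == 0 && in.State == active {
		in.State = ready
	}
	return nil
}

func main() {
	in := &inst{State: ready, Max: 2}
	in.mu.Lock()
	err := in.claim()
	in.mu.Unlock()
	fmt.Println(err, in.State, in.Sessions)
}
```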
func (*ManagedInstance) TransitionTo ¶
func (mi *ManagedInstance) TransitionTo(newState InstanceState) error
TransitionTo attempts to change the instance state. Returns an error if the transition is not valid according to the state machine.
Caller must hold mi.mu for writing.
Special rules:
- Draining -> Ready is only permitted when DrainReason is DrainHealth. Scale-down and shutdown drains are irrecoverable.
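A table-driven sketch of this check is shown below. Only the Draining -> Ready rule comes from the documentation; the rest of the transition table is an assumption inferred from the state descriptions, and the inst type and transitionTo method are hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

type state string
type drainReason string

const (
	provisioning state = "provisioning"
	ready        state = "ready"
	active       state = "active"
	draining     state = "draining"
	terminating  state = "terminating"

	drainHealth    drainReason = "health"
	drainScaleDown drainReason = "scale-down"
)

var errInvalidTransition = errors.New("invalid state transition")

// allowed is an ASSUMED transition table inferred from the state
// descriptions; the package's real table may differ.
var allowed = map[state][]state{
	provisioning: {ready, terminating},
	ready:        {active, draining},
	active:       {ready, draining},
	draining:     {ready, terminating},
}

type inst struct {
	State  state
	Reason drainReason
}

// transitionTo applies the table, then the documented special rule:
// Draining -> Ready requires a health-caused drain.
func (in *inst) transitionTo(next state) error {
	ok := false
	for _, s := range allowed[in.State] {
		if s == next {
			ok = true
			break
		}
	}
	if ok && in.State == draining && next == ready && in.Reason != drainHealth {
		ok = false
	}
	if !ok {
		return fmt.Errorf("%w: %s -> %s", errInvalidTransition, in.State, next)
	}
	in.State = next
	return nil
}

func main() {
	h := &inst{State: draining, Reason: drainHealth}
	fmt.Println(h.transitionTo(ready))
	s := &inst{State: draining, Reason: drainScaleDown}
	fmt.Println(s.transitionTo(ready))
}
```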
type SandboxClientFactory ¶
type SandboxClientFactory func(addr string) (FleetNodeClient, error)
SandboxClientFactory creates a FleetNodeClient for a given sandbox-host address. It is injected for testability: unit tests provide a mock factory, while production code provides the real RPC client constructor, which wraps the connection in a NodeSandboxControl plus a health client.
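A test double for the factory might look like the sketch below. It is deliberately trimmed: nodeClient stands in for FleetNodeClient with only HealthCheck (the real interface also embeds control.SandboxControl), and newFakeFactory, fakeClient, healthResult, and the "ok" status string are hypothetical names and values chosen for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errUnreachable = errors.New("address unreachable")

// healthResult is a trimmed stand-in for HealthCheckResult.
type healthResult struct {
	Status       string
	SessionCount int
}

// nodeClient is a trimmed stand-in for FleetNodeClient.
type nodeClient interface {
	HealthCheck(ctx context.Context) (*healthResult, error)
}

// clientFactory has the same shape as SandboxClientFactory.
type clientFactory func(addr string) (nodeClient, error)

type fakeClient struct{ addr string }

func (c *fakeClient) HealthCheck(context.Context) (*healthResult, error) {
	return &healthResult{Status: "ok"}, nil // "ok" is an assumed value
}

// newFakeFactory returns a factory that records every address it is
// asked to dial and fails for the addresses listed in unreachable.
func newFakeFactory(unreachable ...string) (clientFactory, *[]string) {
	dialed := &[]string{}
	factory := func(addr string) (nodeClient, error) {
		*dialed = append(*dialed, addr)
		for _, bad := range unreachable {
			if addr == bad {
				return nil, errUnreachable
			}
		}
		return &fakeClient{addr: addr}, nil
	}
	return factory, dialed
}

func main() {
	factory, dialed := newFakeFactory("10.0.0.9:9100")
	client, err := factory("10.0.0.1:9100")
	if err == nil {
		res, _ := client.HealthCheck(context.Background())
		fmt.Println(res.Status)
	}
	_, err = factory("10.0.0.9:9100")
	fmt.Println(err, *dialed)
}
```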