fleet

package
v0.2.3 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 3, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package fleet implements FleetSandboxControl, a SandboxControl adapter that manages a fleet of sandbox-host instances with auto-scaling, health monitoring, warm pool maintenance, and capacity-aware routing.


Constants

const SandboxIDPrefix = sandboxIDPrefix

SandboxIDPrefix is the prefix for fleet-managed sandbox IDs. Exported so the orchestrator can detect fleet-prefixed IDs.

Variables

var (
	// ErrNoCapacity indicates no instance has available session slots. The
	// fleet may be in the process of scaling up. Callers should wait briefly
	// (with backoff) and retry. The singleflight provision path handles
	// deduplication of concurrent scale-up requests automatically.
	ErrNoCapacity = errors.New("fleet: no capacity available")

	// ErrFleetClosed indicates the fleet manager is shutting down and cannot
	// accept new CreateSandbox calls. This error is permanent for the
	// lifetime of this FleetSandboxControl instance.
	ErrFleetClosed = errors.New("fleet: adapter is closed")

	// ErrInstanceNotFound indicates the instance ID parsed from a SandboxID
	// does not correspond to a known managed instance in the fleet.
	ErrInstanceNotFound = errors.New("fleet: instance not found")

	// ErrInvalidStateTransition indicates an attempted instance state
	// transition that is not permitted by the state machine.
	ErrInvalidStateTransition = errors.New("fleet: invalid state transition")

	// ErrInstanceDraining indicates an operation was attempted on an instance
	// that is in the draining state and not accepting new sessions.
	ErrInstanceDraining = errors.New("fleet: instance is draining")
)

Sentinel errors returned by FleetSandboxControl. Error classification:

| Error              | Retryable? | Description                                                                                  |
|--------------------|------------|----------------------------------------------------------------------------------------------|
| ErrNoCapacity      | Yes        | No instance has available session slots; fleet may be scaling up. Caller should wait briefly and retry. |
| ErrFleetClosed     | No         | Fleet manager is shutting down. No new sandboxes.                                            |
| parseSandboxID err | No         | SandboxID is malformed. Programming error.                                                   |
| Node RPC errors    | Depends    | Passed through from NodeSandboxControl.                                                      |
| Provisioner errors | Depends    | Cloud API errors may be transient or permanent.                                              |

Functions

func ParseSandboxID

func ParseSandboxID(sandboxID string) (instanceID, sessionID string, err error)

ParseSandboxID extracts the instance ID and session ID from a fleet SandboxID. Returns an error if the ID is malformed (wrong prefix, missing separator, or empty components).

This is the exported entry point; internal callers use parseSandboxID.

Types

type CapacityRouter

type CapacityRouter struct {
	// contains filtered or unexported fields
}

CapacityRouter selects the best instance for a new sandbox using a spread strategy that prefers the most available capacity. This maximizes per-instance headroom so burst traffic is absorbed without immediate scale-up.

func NewCapacityRouter

func NewCapacityRouter(config FleetConfig) *CapacityRouter

NewCapacityRouter creates a router configured from the given FleetConfig.

func (*CapacityRouter) SelectInstance

func (r *CapacityRouter) SelectInstance(instances []*ManagedInstance) (*ManagedInstance, error)

SelectInstance picks the best instance for a new sandbox. Returns ErrNoCapacity if no instance can accept the sandbox.

Strategy: spread (prefer most available capacity). When multiple instances have equal headroom, use IdleSince as a tiebreaker (prefer the instance that has been idle longest, promoting even distribution). If IdleSince is also equal (e.g., all instances at the same session count), use an atomic counter for round-robin to prevent map iteration order bias.

Caller must ensure instances are not concurrently mutated (hold fleet read lock or pass a snapshot).

type Clock

type Clock interface {
	Now() time.Time
}

Clock abstracts time for deterministic testing.

type DrainReason

type DrainReason string

DrainReason records why an instance entered the Draining state. This determines whether Draining -> Ready recovery is permitted.

const (
	// DrainHealth indicates the instance was drained because it failed
	// consecutive health checks. Recovery to Ready is permitted if the
	// instance subsequently passes HealthyThreshold consecutive checks.
	DrainHealth DrainReason = "health"

	// DrainScaleDown indicates the instance was drained as part of scale-down
	// (idle too long, fleet can shrink). Recovery to Ready is NOT permitted.
	DrainScaleDown DrainReason = "scale-down"

	// DrainShutdown indicates the instance was drained as part of fleet
	// shutdown (Close was called). Recovery to Ready is NOT permitted.
	DrainShutdown DrainReason = "shutdown"
)

type FleetConfig

type FleetConfig struct {
	// Instance provisioning
	InstanceConfig instance.InstanceConfig // Template for launching new instances (from shared package)

	// Fleet sizing
	MinInstances   int // Floor -- never scale below this (default: 1)
	MaxInstances   int // Ceiling -- never scale above this (default: 50)
	WarmPoolTarget int // Number of idle instances to keep ready (default: 2)

	// Capacity thresholds
	MaxSessionsPerInstance int     // Max sandboxes per instance (default: 10)
	CapacityHeadroom       float64 // Don't route above this fraction (default: 0.8)

	// Health checking
	HealthCheckInterval time.Duration // How often to poll (default: 15s)
	UnhealthyThreshold  int           // Consecutive failures before marking unhealthy (default: 3)
	HealthyThreshold    int           // Consecutive successes before marking healthy again during drain recovery (default: 3)

	// Scaling
	IdleCooldown            time.Duration // How long an instance must be idle before scale-down (default: 10m)
	DrainTimeout            time.Duration // Max time to wait for active sandboxes during drain (default: 30m)
	ProvisionTimeout        time.Duration // Max time to wait for instance to become healthy (default: 5m)
	MaxConcurrentProvisions int           // Max simultaneous LaunchInstance calls (default: 3)

	// Termination
	MaxTerminateRetries int // Max consecutive TerminateInstance failures before alerting (default: 5)

	// Shutdown behavior
	LeaveInstancesOnClose bool // If true, skip instance termination on Close for re-adoption on restart

	// Sandbox-host port
	SandboxHostPort int // Port sandbox-host listens on (default: 9100)
}

FleetConfig controls fleet behavior. All durations with zero values use the defaults from DefaultFleetConfig().

func DefaultFleetConfig

func DefaultFleetConfig() FleetConfig

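DefaultFleetConfig returns a FleetConfig populated with the production defaults noted in the FleetConfig field comments (for example MinInstances 1, MaxInstances 50, MaxSessionsPerInstance 10, HealthCheckInterval 15s).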

func (FleetConfig) Validate

func (c FleetConfig) Validate() error

Validate checks config invariants and returns an error describing all violations found. Returns nil if the config is valid.
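Reporting all violations at once maps naturally onto `errors.Join` (Go 1.20+). The sketch below uses a trimmed, hypothetical mirror of FleetConfig with the documented defaults; the specific invariants it checks are assumptions about what Validate enforces, and it assumes defaults have already been filled in before validation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// fleetConfig is a trimmed, hypothetical mirror of FleetConfig.
type fleetConfig struct {
	MinInstances, MaxInstances, WarmPoolTarget int
	MaxSessionsPerInstance                    int
	CapacityHeadroom                          float64
	HealthCheckInterval                       time.Duration
}

// defaultFleetConfig uses the defaults from the FleetConfig field comments.
func defaultFleetConfig() fleetConfig {
	return fleetConfig{
		MinInstances: 1, MaxInstances: 50, WarmPoolTarget: 2,
		MaxSessionsPerInstance: 10, CapacityHeadroom: 0.8,
		HealthCheckInterval: 15 * time.Second,
	}
}

// validate collects every violation instead of stopping at the first.
func (c fleetConfig) validate() error {
	var errs []error
	if c.MinInstances < 0 {
		errs = append(errs, fmt.Errorf("MinInstances must be >= 0, got %d", c.MinInstances))
	}
	if c.MaxInstances < c.MinInstances {
		errs = append(errs, fmt.Errorf("MaxInstances (%d) < MinInstances (%d)", c.MaxInstances, c.MinInstances))
	}
	if c.WarmPoolTarget > c.MaxInstances {
		errs = append(errs, fmt.Errorf("WarmPoolTarget (%d) > MaxInstances (%d)", c.WarmPoolTarget, c.MaxInstances))
	}
	if c.MaxSessionsPerInstance <= 0 {
		errs = append(errs, fmt.Errorf("MaxSessionsPerInstance must be > 0, got %d", c.MaxSessionsPerInstance))
	}
	if c.CapacityHeadroom <= 0 || c.CapacityHeadroom > 1 {
		errs = append(errs, fmt.Errorf("CapacityHeadroom must be in (0, 1], got %g", c.CapacityHeadroom))
	}
	if c.HealthCheckInterval <= 0 {
		errs = append(errs, fmt.Errorf("HealthCheckInterval must be > 0, got %s", c.HealthCheckInterval))
	}
	return errors.Join(errs...) // nil when there are no violations
}
```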

type FleetNodeClient

type FleetNodeClient interface {
	control.SandboxControl
	HealthCheck(ctx context.Context) (*HealthCheckResult, error)
}

FleetNodeClient combines SandboxControl with health checking. The fleet control loop needs both sandbox delegation and health polling from each instance. SandboxControl alone has no HealthCheck method, so this composite interface bridges the gap.

type FleetOption

type FleetOption func(*fleetOptions)

FleetOption configures optional behavior on FleetSandboxControl.

func WithLeaveInstancesOnClose

func WithLeaveInstancesOnClose(leave bool) FleetOption

WithLeaveInstancesOnClose configures the fleet to skip instance termination on Close. This is used when the fleet manager is restarting and instances should be re-adopted by the new manager via crash recovery.

func WithLogger

func WithLogger(logger *slog.Logger) FleetOption

WithLogger sets the structured logger for fleet operations.

func WithMetricsRegisterer

func WithMetricsRegisterer(registerer prometheus.Registerer) FleetOption

WithMetricsRegisterer sets the Prometheus registerer used for fleet metrics. If nil, the default registerer is used.

type FleetSandboxControl

type FleetSandboxControl struct {
	// contains filtered or unexported fields
}

FleetSandboxControl implements SandboxControl by managing a fleet of sandbox-host instances. It provisions instances via InstanceProvisioner, monitors health, maintains a warm pool, and routes sandbox operations to per-instance NodeSandboxControl clients.

Lock ordering discipline (MUST be followed throughout):

  1. FleetSandboxControl.mu (fleet-level lock)
  2. ManagedInstance.mu (per-instance lock)

The fleet lock is ALWAYS acquired before any instance lock. No code path may acquire FleetSandboxControl.mu while holding any ManagedInstance.mu. This prevents deadlocks between the control loop and concurrent callers.
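A minimal illustration of the documented ordering, using hypothetical `fleet` and `managedInstance` types rather than the real ones: both helpers take the fleet lock before any instance lock, and neither ever reaches for the fleet lock while an instance lock is held.

```go
package main

import "sync"

type managedInstance struct {
	mu           sync.Mutex
	sessionCount int64
}

type fleet struct {
	mu        sync.RWMutex
	instances map[string]*managedInstance
}

// totalSessions: fleet lock (read) first, then each instance lock in turn.
func (f *fleet) totalSessions() int64 {
	f.mu.RLock()
	defer f.mu.RUnlock()
	var total int64
	for _, mi := range f.instances {
		mi.mu.Lock()
		total += mi.sessionCount
		mi.mu.Unlock()
	}
	return total
}

// removeIfIdle needs to mutate the fleet map based on instance state. It
// takes f.mu up front instead of discovering it needs f.mu while already
// holding mi.mu, which is the deadlock-prone order.
func (f *fleet) removeIfIdle(id string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	mi, ok := f.instances[id]
	if !ok {
		return false
	}
	mi.mu.Lock()
	idle := mi.sessionCount == 0
	mi.mu.Unlock()
	if idle {
		delete(f.instances, id)
	}
	return idle
}
```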

func NewFleetSandboxControl

func NewFleetSandboxControl(
	provisioner instance.InstanceProvisioner,
	clientFactory SandboxClientFactory,
	cfg FleetConfig,
	opts ...FleetOption,
) (*FleetSandboxControl, error)

NewFleetSandboxControl creates a new fleet manager. It does not start the control loop (health checks, scaling); call StartControlLoop to start it (added in bead aiag-1xd.4).

func (*FleetSandboxControl) Capabilities

Capabilities returns the fleet's capability set. ConcurrentSandboxes reports the theoretical ceiling, MaxInstances * MaxSessionsPerInstance (50 * 10 = 500 with the defaults); the capacity actually available at any moment depends on fleet state.

func (*FleetSandboxControl) Close

func (f *FleetSandboxControl) Close() error

Close initiates graceful shutdown: it cancels the control loop, then drains and terminates instances unless LeaveInstancesOnClose is set. After Close, CreateSandbox returns ErrFleetClosed.

func (*FleetSandboxControl) CreateSandbox

CreateSandbox picks the best instance via capacity-aware routing, atomically claims a session slot, creates a sandbox on it, and returns a fleet-prefixed SandboxID. If no capacity exists, attempts to provision a new instance.

func (*FleetSandboxControl) DestroySandbox

func (f *FleetSandboxControl) DestroySandbox(ctx context.Context, sandboxID string) error

DestroySandbox parses the fleet SandboxID, decrements the session count eagerly, then delegates to the per-instance client. On RPC failure the count is rolled back; the reconciliation loop corrects any remaining drift.

func (*FleetSandboxControl) FleetStatus

func (f *FleetSandboxControl) FleetStatus() FleetStatus

FleetStatus returns a snapshot of the fleet's current state.

func (*FleetSandboxControl) GetProcessStatus

GetProcessStatus delegates to the per-instance client after rewriting SandboxID.

func (*FleetSandboxControl) KillProcess

KillProcess delegates to the per-instance client after rewriting SandboxID.

func (*FleetSandboxControl) LaunchProcess

LaunchProcess delegates to the per-instance client after rewriting SandboxID to the session-local ID.

func (*FleetSandboxControl) PauseSandbox

func (f *FleetSandboxControl) PauseSandbox(ctx context.Context, sandboxID string) error

PauseSandbox delegates to the per-instance client after parsing the SandboxID.

func (*FleetSandboxControl) RecoverInstances

func (f *FleetSandboxControl) RecoverInstances(ctx context.Context) error

RecoverInstances rediscovers managed instances via cloud API tags and re-adds them to the fleet. Call before StartControlLoop after a restart.

func (*FleetSandboxControl) ResumeSandbox

func (f *FleetSandboxControl) ResumeSandbox(ctx context.Context, sandboxID string) error

ResumeSandbox delegates to the per-instance client after parsing the SandboxID.

func (*FleetSandboxControl) StartControlLoop

func (f *FleetSandboxControl) StartControlLoop()

StartControlLoop begins the background control loop goroutine. Must be called after NewFleetSandboxControl. The loop runs every HealthCheckInterval.

type FleetStatus

type FleetStatus struct {
	TotalInstances    int
	InstancesByState  map[InstanceState]int
	TotalSessions     int64
	WarmPoolSize      int
	PendingProvisions int32
	Healthy           bool
	Closed            bool
}

FleetStatus contains a point-in-time snapshot of fleet state.

type HealthCheckResult

type HealthCheckResult struct {
	Status       string
	SessionCount int
	ActiveTools  int
	Uptime       time.Duration
	Errors       []string
}

HealthCheckResult contains the health status reported by a sandbox-host instance. This is the fleet-local representation; the RPC layer converts to/from the wire format in internal/rpc/api.HealthCheckResponse.

type InstanceState

type InstanceState string

InstanceState represents the lifecycle state of a managed instance in the fleet.

const (
	// InstanceProvisioning indicates the cloud instance has been launched but
	// the sandbox-host has not yet passed health checks.
	InstanceProvisioning InstanceState = "provisioning"

	// InstanceReady indicates the sandbox-host is healthy with 0 sessions,
	// available for routing new sandboxes.
	InstanceReady InstanceState = "ready"

	// InstanceActive indicates the sandbox-host is healthy with >= 1 active
	// session.
	InstanceActive InstanceState = "active"

	// InstanceDraining indicates no new sessions should be assigned. The
	// instance is waiting for existing sessions to complete before
	// termination (or recovering health if DrainReason is DrainHealth).
	InstanceDraining InstanceState = "draining"

	// InstanceTerminating indicates TerminateInstance has been called and
	// we are waiting for cloud confirmation.
	InstanceTerminating InstanceState = "terminating"
)

type ManagedInstance

type ManagedInstance struct {

	// InstanceID is the cloud provider's unique identifier.
	InstanceID string

	// State is the current lifecycle state.
	State InstanceState

	// Client is the combined SandboxControl + HealthCheck client for this
	// instance. Set after the instance is provisioned and the client factory
	// creates a connection.
	Client FleetNodeClient

	// SessionCount tracks the number of active sandbox sessions on this
	// instance. Updated via the atomic claim-slot protocol (section 4.1.1)
	// and reconciled against HealthCheck-reported counts.
	SessionCount int64

	// MaxSessions is the maximum number of sessions this instance can host,
	// typically set from FleetConfig.MaxSessionsPerInstance.
	MaxSessions int

	// IP is the instance's private IP address used for RPC connections.
	IP string

	// ConsecutiveHealthSuccesses counts consecutive successful health checks.
	// Used for Draining -> Ready recovery when DrainReason is DrainHealth.
	// Reset to 0 on health check failure.
	ConsecutiveHealthSuccesses int

	// ConsecutiveHealthFailures counts consecutive failed health checks.
	// Used to trigger Draining transition when >= UnhealthyThreshold.
	// Reset to 0 on health check success.
	ConsecutiveHealthFailures int

	// DrainReason records why this instance entered the Draining state.
	// Only meaningful when State == InstanceDraining.
	DrainReason DrainReason

	// IdleSince records when the session count last reached 0 (in Ready state).
	// Used by the scale-down logic to determine if the instance has been idle
	// longer than IdleCooldown.
	IdleSince time.Time

	// ProvisionStarted records when provisioning began. Used by the control
	// loop to detect provision timeouts.
	ProvisionStarted time.Time

	// DrainStarted records when draining began. Used by the control loop
	// to detect drain timeouts and force-terminate.
	DrainStarted time.Time

	// TerminateRetries counts consecutive TerminateInstance failures for
	// this instance. Used to trigger an alert after MaxTerminateRetries.
	TerminateRetries int
	// contains filtered or unexported fields
}

ManagedInstance tracks the state of a single instance in the fleet.

Lock ordering discipline: FleetSandboxControl.mu (fleet-level lock) must ALWAYS be acquired BEFORE ManagedInstance.mu (per-instance lock). No code path may acquire FleetSandboxControl.mu while holding any ManagedInstance.mu. This prevents deadlocks between the control loop and concurrent callers. See plan 20, section 4.1.2.

func (*ManagedInstance) ClaimSession

func (mi *ManagedInstance) ClaimSession() error

ClaimSession atomically increments the session count and transitions Ready -> Active if this is the first session. Returns an error if the instance is not in a routable state (Ready or Active) or is at capacity.

Caller must hold mi.mu for writing.

func (*ManagedInstance) ReleaseSession

func (mi *ManagedInstance) ReleaseSession() error

ReleaseSession decrements the session count and transitions Active -> Ready if this was the last session. Returns an error if the session count would go negative.

Caller must hold mi.mu for writing.

func (*ManagedInstance) TransitionTo

func (mi *ManagedInstance) TransitionTo(newState InstanceState) error

TransitionTo attempts to change the instance state. Returns an error if the transition is not valid according to the state machine.

Caller must hold mi.mu for writing.

Special rules:

  • Draining -> Ready is only permitted when DrainReason is DrainHealth. Scale-down and shutdown drains are irrecoverable.

type SandboxClientFactory

type SandboxClientFactory func(addr string) (FleetNodeClient, error)

SandboxClientFactory creates a FleetNodeClient for a given sandbox-host address. This is injected for testability — unit tests provide a mock factory, production code provides the real RPC client constructor wrapping the connection in a NodeSandboxControl + health client.

Directories

Path Synopsis
Package ec2 implements the instance.InstanceProvisioner interface using the AWS EC2 API (SDK v2).
