controlplane

package
v0.0.0-...-80ce94d
Warning

This package is not in the latest version of its module.

Published: Jun 23, 2026 License: MIT Imports: 42 Imported by: 0

Documentation


Constants

const (
	SNIRoutingOff         = "off"         // ignore SNI entirely; identity can no longer be resolved
	SNIRoutingPassthrough = "passthrough" // require managed SNI but warn on legacy hostnames
	SNIRoutingEnforce     = "enforce"     // default: require a managed SNI hostname that resolves to an org
)

SNI routing modes (values for ControlPlaneConfig.SNIRoutingMode).

const (
	RetireReasonNormal            = "normal"
	RetireReasonActivationFailure = "activation_failure"
	RetireReasonOrphaned          = "orphaned"
	RetireReasonCrash             = "crash"
	RetireReasonShutdown          = "shutdown"
	RetireReasonIdleTimeout       = "idle_timeout"
	RetireReasonStuckActivating   = "stuck_activating"
	RetireReasonMismatchedVersion = "mismatched_version"
)

Retirement reason constants. They are passed as the `reason` argument to WorkerLifecycle.* methods and surfaced on the duckgres_worker_lifecycle_transitions_total metric as part of the operation context; they are also fed into lifecycleOriginForRetireReason for retire_local observations. They are defined here (no build tag) rather than in warm_pool_metrics.go (kubernetes-tagged) so that the untagged lifecycle observation helpers can reference them.

const DefaultK8sWorkerServiceAccount = "duckgres-worker"
const DefaultWorkerSpawnRetryAfter = 45 * time.Second

Variables

var ErrSessionManagerDraining = errors.New("session manager is draining")
var ErrTooManyConnections = errors.New("too many connections")

Functions

func NewRuntimeOrgConnectionLimiter

func NewRuntimeOrgConnectionLimiter(store runtimeOrgConnectionStore, orgID, cpInstanceID string, queueTTL time.Duration) connectionLimiter

func NewWorkerCapacityExhaustedError

func NewWorkerCapacityExhaustedError(retryAfter time.Duration) error

func NewWorkerCapacityExhaustedErrorForReason

func NewWorkerCapacityExhaustedErrorForReason(reason configstore.WorkerClaimMissReason, retryAfter time.Duration) error

func RegisterK8sPoolFactory

func RegisterK8sPoolFactory(f K8sPoolFactory)

RegisterK8sPoolFactory is called from an init() in k8s_factory.go (build tag: kubernetes).

func RunControlPlane

func RunControlPlane(cfg ControlPlaneConfig)

RunControlPlane is the entry point for the control plane process.

func SetupMultiTenant

func SetupMultiTenant(
	cfg ControlPlaneConfig,
	srv *server.Server,
	memBudget uint64,
	maxWorkers int,
	isHealthy func() bool,
) (ConfigStoreInterface, OrgRouterInterface, *http.Server, *ControlPlaneRuntimeTracker, *JanitorLeaderManager, error)

SetupMultiTenant is not available without the kubernetes build tag.

Types

type CPUserSecretManager

type CPUserSecretManager struct {
	// contains filtered or unexported fields
}

CPUserSecretManager implements server.UserSecretManager on top of the config store: it seals user CREATE PERSISTENT SECRET statements with AES-GCM and persists them per (org, user, name) for replay at session creation. With no encryption key configured, writes are disabled (Ready errors, so the client gets a clear message) but deletes still work, so stale rows remain removable.

func NewCPUserSecretManager

func NewCPUserSecretManager(store *configstore.ConfigStore, encodedKey string) (*CPUserSecretManager, error)

NewCPUserSecretManager builds the manager. encodedKey is the value of DUCKGRES_USER_SECRET_KEY; empty disables persistence (not an error), a malformed key is a startup error (fail fast on operator mistakes rather than surfacing them to customers one statement at a time).

func (*CPUserSecretManager) DeleteSecret

func (m *CPUserSecretManager) DeleteSecret(_ context.Context, orgID, username, secretName string) (bool, error)

DeleteSecret implements server.UserSecretManager. Works without a cipher so rows written under a lost key can still be removed.

func (*CPUserSecretManager) LoadStatements

func (m *CPUserSecretManager) LoadStatements(_ context.Context, orgID, username string) ([]string, error)

LoadStatements returns the user's decrypted secret statements for replay at session creation. Rows that fail to decrypt (e.g. written under a rotated key) are skipped with a loud log instead of failing the whole session.

func (*CPUserSecretManager) PutSecret

func (m *CPUserSecretManager) PutSecret(_ context.Context, orgID, username, secretName, statement string, ifNotExists bool) error

PutSecret implements server.UserSecretManager. With ifNotExists set, an already-stored name is left untouched (DuckDB no-ops the live session for IF NOT EXISTS, so replacing the stored statement would diverge the two).

func (*CPUserSecretManager) Ready

func (m *CPUserSecretManager) Ready() error

Ready implements server.UserSecretManager.

func (*CPUserSecretManager) SessionSecretLoader

func (m *CPUserSecretManager) SessionSecretLoader(orgID string) func(ctx context.Context, username string) ([]string, error)

SessionSecretLoader binds an org onto LoadStatements in the shape SessionManager.SetUserSecretLoader expects.

type ConfigStoreInterface

type ConfigStoreInterface interface {
	ResolveDatabase(database string) (orgID string)
	DatabaseNameForSNIPrefix(prefix string) string // translates SNI hostname prefix → canonical database_name (alias-aware)
	// ResolveSNIPrefix maps a managed hostname prefix to its org and database.
	// It accepts hostname_alias, database_name, and DNS-safe org names.
	ResolveSNIPrefix(prefix string) (orgID, databaseName string)
	ResolvePostgresConnection(startupDatabase, sniPrefix string, useManagedSNI bool, username, password string) configstore.PostgresConnectionResolution
	ValidateOrgUser(orgID, username, password string) bool
	// ValidateOrgUserAndGetPassthrough does both lookups against the same
	// snapshot — the auth path needs both, and a single read closes the
	// window where the snapshot could swap between two separate calls.
	// passthrough is always false when valid is false.
	ValidateOrgUserAndGetPassthrough(orgID, username, password string) (valid, passthrough bool)
	// OrgWarehouseStatus reports an org's current warehouse provisioning state so
	// connection-time errors can distinguish "no such org" from "warehouse not
	// ready yet". Returns (state, orgExists). state is "" when the org has no
	// warehouse row (legacy single-tenant orgs); otherwise it is the lifecycle
	// string (pending/provisioning/ready/failed/deleting/deleted).
	OrgWarehouseStatus(orgID string) (state string, orgExists bool)
	// OrgDefaultWorkerProfile returns the org's operator-set default worker
	// profile (config-store columns default_worker_cpu/memory/ttl): cpu and
	// memory as k8s resource-quantity strings, ttl as a Go duration string.
	// Empty strings mean "not set" (including unknown orgs). Raw stored
	// values — validation happens in resolveWorkerProfile.
	OrgDefaultWorkerProfile(orgID string) (cpu, memory, ttl string)
	UpsertFlightSessionRecord(record *configstore.FlightSessionRecord) error
	GetFlightSessionRecord(sessionToken string) (*configstore.FlightSessionRecord, error)
	TouchFlightSessionRecord(sessionToken string, lastSeenAt time.Time) error
	CloseFlightSessionRecord(sessionToken string, closedAt time.Time) error
}

ConfigStoreInterface abstracts the config store for the control plane. Defined here to avoid circular imports with the configstore package.
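
The single-snapshot point made for ValidateOrgUserAndGetPassthrough can be sketched with an atomically swapped pointer. Everything here is an assumption for illustration: the snapshot layout, the store type, and the plaintext password comparison (a real store would almost certainly compare hashes).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// user and snapshot are hypothetical; the real snapshot layout is not shown
// in the documentation.
type user struct {
	password    string
	passthrough bool
}

type snapshot struct {
	users map[string]map[string]user // orgID -> username -> user
}

type store struct {
	snap atomic.Pointer[snapshot] // replaced wholesale by a poller
}

// validateAndPassthrough loads the snapshot once and answers both questions
// from that one value, so a concurrent swap cannot split the two lookups.
func (s *store) validateAndPassthrough(orgID, username, password string) (valid, passthrough bool) {
	snap := s.snap.Load()
	if snap == nil {
		return false, false
	}
	u, ok := snap.users[orgID][username]
	if !ok || u.password != password {
		return false, false // passthrough is always false when invalid
	}
	return true, u.passthrough
}

func main() {
	s := &store{}
	s.snap.Store(&snapshot{users: map[string]map[string]user{
		"org-1": {"alice": {password: "pw", passthrough: true}},
	}})
	fmt.Println(s.validateAndPassthrough("org-1", "alice", "pw"))
	fmt.Println(s.validateAndPassthrough("org-1", "alice", "wrong"))
}
```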

type ControlPlane

type ControlPlane struct {
	// contains filtered or unexported fields
}

ControlPlane manages the TCP listener and routes connections to Flight SQL workers. The control plane owns client connections end-to-end: TLS, authentication, PostgreSQL wire protocol, and SQL transpilation all happen here. Workers are thin DuckDB execution engines reachable via Arrow Flight SQL over Unix sockets.

type ControlPlaneConfig

type ControlPlaneConfig struct {
	server.Config

	Process ProcessConfig

	SocketDir            string
	ConfigPath           string // Path to config file, passed to workers
	HealthCheckInterval  time.Duration
	WorkerQueueTimeout   time.Duration // How long to wait for an available worker/org connection slot (default: 60s)
	WorkerIdleTimeout    time.Duration // How long to keep an idle worker alive (default: 5m)
	RetireOnSessionEnd   bool          // When true, process workers are retired immediately after their last session ends.
	HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade. 0 = unbounded (wait until k8s SIGKILL via terminationGracePeriodSeconds). Default: 0 in remote mode (so a CP rolling out doesn't kill in-flight customer queries at a self-imposed wall — see drainAndShutdown), 24h in process mode.
	MetricsServer        *http.Server  // Optional metrics server to shut down during upgrade

	// WorkerBackend selects the worker management backend.
	// "process" (default): workers are local child processes communicating over Unix sockets.
	// "remote": Kubernetes-backed multitenant workers communicating over TCP.
	//           Requires ConfigStoreConn and a binary built with -tags kubernetes.
	WorkerBackend string

	// K8s contains Kubernetes-specific configuration. Only used for remote
	// multitenant mode.
	K8s K8sConfig

	// ConfigStoreConn is the PostgreSQL connection string for the config store.
	// Required when WorkerBackend == "remote".
	ConfigStoreConn string

	// ConfigPollInterval is how often to poll the config store for changes.
	// Default: 30s.
	ConfigPollInterval time.Duration

	// InternalSecret is the shared secret for API authentication.
	// When empty, a random secret is generated and logged at startup.
	InternalSecret string

	// InternalSecretFallbacks are previous internal secrets still accepted
	// for API authentication during a rotation (newest first). Clients always
	// send the primary InternalSecret; the server accepts any of
	// {primary ∪ fallbacks}. Mirrors posthog's SECRET_KEY_FALLBACKS.
	InternalSecretFallbacks []string

	// UserSecretKey is the base64-encoded 32-byte AES key for encrypting
	// user persistent secrets in the config store (env-only:
	// DUCKGRES_USER_SECRET_KEY). Empty disables the persistent secret
	// manager: CREATE PERSISTENT SECRET is rejected with a clear error.
	// Only used in multitenant remote mode.
	UserSecretKey string

	// SNIRoutingMode controls hostname-based org routing. Values:
	//   "" or "off"   - SNI is ignored; legacy database-param routing only
	//                   (default).
	//   "passthrough" - Managed Postgres SNI must resolve to the same org as
	//                   the requested startup database; when startup database is
	//                   empty, use the SNI-derived database fallback. Legacy
	//                   hostnames still fall back with a warn log.
	//   "enforce"     - Reject connections whose SNI doesn't match a managed
	//                   suffix. Explicit startup database still takes
	//                   priority when SNI is present, but must resolve to the
	//                   same org as the managed hostname.
	// Unknown values behave like "off" in the connection path.
	// Only consulted in multi-tenant control-plane builds (kubernetes tag);
	// other builds always behave as "off".
	SNIRoutingMode string

	// ManagedHostnameSuffixes lists DNS suffixes (each starting with a dot) for
	// managed tenant hostnames. When SNI matches one of these suffixes, the
	// single-label prefix is resolved as hostname_alias, database_name, or org
	// name. Postgres still honors an explicit startup database, but only when it
	// resolves to the same org as the managed hostname.
	ManagedHostnameSuffixes []string

	// DucklingBucketSuffix is the env suffix the control plane uses to name a
	// type=s3bucket Duckling's per-org S3 bucket
	// (posthog-duckling-<compact-org>-<suffix>). It MUST equal the
	// crossplane-config chart's envSuffix for this environment so the name the
	// CP writes onto the Duckling CR's spec.dataStore.bucketName is exactly what
	// the composition provisions. Empty ⇒ the CP does not name buckets and the
	// composition derives the name (legacy behavior). See
	// configstore.DucklingBucketName.
	DucklingBucketSuffix string

	// DuckLakeDefaultSpecVersion is the global default DuckLake spec version
	// used for migration checks when an org doesn't specify an override.
	DuckLakeDefaultSpecVersion string
}

ControlPlaneConfig extends server.Config with control-plane-specific settings.
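
To make the ManagedHostnameSuffixes rule concrete, here is a minimal sketch of extracting the single-label prefix from an SNI hostname. The managedPrefix helper is hypothetical, and the case folding and trailing-dot trimming are assumptions rather than documented behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// managedPrefix returns the single-label prefix of host when host ends with
// one of the managed suffixes (each suffix starts with a dot).
func managedPrefix(host string, suffixes []string) (string, bool) {
	host = strings.ToLower(strings.TrimSuffix(host, "."))
	for _, suffix := range suffixes {
		suffix = strings.ToLower(suffix)
		if !strings.HasSuffix(host, suffix) {
			continue
		}
		prefix := strings.TrimSuffix(host, suffix)
		// Require exactly one label in front of the suffix.
		if prefix == "" || strings.Contains(prefix, ".") {
			continue
		}
		return prefix, true
	}
	return "", false
}

func main() {
	suffixes := []string{".db.example.com"}
	fmt.Println(managedPrefix("acme.db.example.com", suffixes))
	fmt.Println(managedPrefix("a.b.db.example.com", suffixes))
}
```

The returned prefix is what a resolver such as ResolveSNIPrefix would then map to an org and database.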

type ControlPlaneJanitor

type ControlPlaneJanitor struct {
	// contains filtered or unexported fields
}

func NewControlPlaneJanitor

func NewControlPlaneJanitor(store controlPlaneExpiryStore, interval, expiryTimeout time.Duration) *ControlPlaneJanitor

func (*ControlPlaneJanitor) Run

func (j *ControlPlaneJanitor) Run(ctx context.Context)

type ControlPlaneRuntimeTracker

type ControlPlaneRuntimeTracker struct {
	// contains filtered or unexported fields
}

func NewControlPlaneRuntimeTracker

func NewControlPlaneRuntimeTracker(store runtimeInstanceStore, id, podName, podUID, bootID string, heartbeatInterval time.Duration) *ControlPlaneRuntimeTracker

func (*ControlPlaneRuntimeTracker) Draining

func (t *ControlPlaneRuntimeTracker) Draining() bool

func (*ControlPlaneRuntimeTracker) MarkDraining

func (t *ControlPlaneRuntimeTracker) MarkDraining() error

func (*ControlPlaneRuntimeTracker) Start

type FlightIngress

type FlightIngress = flightsqlingress.FlightIngress

func NewFlightIngress

func NewFlightIngress(host string, port int, tlsConfig *tls.Config, validator flightsqlingress.CredentialValidator, provider flightsqlingress.SessionProvider, rateLimiter *server.RateLimiter, cfg FlightIngressConfig) (*FlightIngress, error)

NewFlightIngress creates a control-plane Flight SQL ingress listener.

func NewFlightIngressFromListener

func NewFlightIngressFromListener(listener net.Listener, tlsConfig *tls.Config, validator flightsqlingress.CredentialValidator, provider flightsqlingress.SessionProvider, rateLimiter *server.RateLimiter, cfg FlightIngressConfig) (*FlightIngress, error)

type FlightIngressConfig

type FlightIngressConfig = flightsqlingress.Config

type FlightWorkerPool

type FlightWorkerPool struct {
	// contains filtered or unexported fields
}

func NewFlightWorkerPool

func NewFlightWorkerPool(socketDir, configPath string, minWorkers, maxWorkers int) *FlightWorkerPool

NewFlightWorkerPool creates a new worker pool.

func (*FlightWorkerPool) AcquireWorker

func (p *FlightWorkerPool) AcquireWorker(ctx context.Context, _ *WorkerProfile) (*ManagedWorker, error)

AcquireWorker returns a worker for a new session.

Strategy:

  1. Reuse an idle worker (0 active sessions) if available.
  2. If the pool has fewer live workers than maxWorkers (or maxWorkers is 0), spawn a new worker process.
  3. If the pool is at capacity, assign to the least-loaded live worker.

This ensures the number of worker processes never exceeds maxWorkers while allowing unlimited concurrent sessions across the fixed pool.
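
The three-step strategy can be sketched as a pure selection function. This is an illustration only: the worker type and choose helper are hypothetical, and the real method also handles locking, spawning, and liveness, none of which is modelled here.

```go
package main

import "fmt"

// worker is a hypothetical view of a live worker and its session count.
type worker struct {
	id       int
	sessions int
}

// choose returns the worker to reuse, or spawn=true when a new worker
// process should be started instead.
func choose(workers []worker, maxWorkers int) (pick *worker, spawn bool) {
	var least *worker
	for i := range workers {
		w := &workers[i]
		if w.sessions == 0 {
			return w, false // 1. reuse an idle worker
		}
		if least == nil || w.sessions < least.sessions {
			least = w
		}
	}
	if maxWorkers == 0 || len(workers) < maxWorkers {
		return nil, true // 2. below the cap (or uncapped): spawn
	}
	return least, false // 3. at capacity: share the least-loaded worker
}

func main() {
	busy := []worker{{id: 1, sessions: 3}, {id: 2, sessions: 1}}
	w, spawn := choose(busy, 2)
	fmt.Println(w.id, spawn)
}
```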

func (*FlightWorkerPool) HealthCheckLoop

func (p *FlightWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Duration, onCrash WorkerCrashHandler, onProgress ProgressHandler)

HealthCheckLoop periodically checks worker health and handles crashed workers. In the elastic 1:1 model, crashed workers with active sessions trigger crash notification (so sessions see errors), and the dead worker is cleaned up. Workers without sessions are simply retired. Workers that fail maxConsecutiveHealthFailures health checks in a row are force-killed and their sessions notified.
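
The consecutive-failure rule can be sketched as a small counter. The threshold value of 3 is an assumption (the documentation names maxConsecutiveHealthFailures but not its value), and the healthTracker type is hypothetical.

```go
package main

import "fmt"

// Assumed threshold; the real constant's value is not documented.
const maxConsecutiveHealthFailures = 3

// healthTracker counts health-check failures in a row for one worker.
type healthTracker struct {
	consecutiveFailures int
}

// observe records one probe outcome and reports whether the worker has now
// failed enough checks in a row to be force-killed. A success resets the run.
func (h *healthTracker) observe(ok bool) (forceKill bool) {
	if ok {
		h.consecutiveFailures = 0
		return false
	}
	h.consecutiveFailures++
	return h.consecutiveFailures >= maxConsecutiveHealthFailures
}

func main() {
	var h healthTracker
	for _, ok := range []bool{false, false, true, false, false, false} {
		fmt.Println(h.observe(ok))
	}
}
```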

func (*FlightWorkerPool) PreBindSockets

func (p *FlightWorkerPool) PreBindSockets(count int) error

PreBindSockets eagerly binds count Unix sockets at startup while the socket directory is verified writable. Under systemd's ProtectSystem=strict, the RuntimeDirectory bind mount can go read-only after the service finishes starting (e.g., after an upgrade or namespace event). Pre-binding ensures sockets are available regardless of later filesystem state.

func (*FlightWorkerPool) ReleaseWorker

func (p *FlightWorkerPool) ReleaseWorker(id int)

ReleaseWorker decrements the active session count for a worker and updates its lastUsed time.

func (*FlightWorkerPool) RetireWorker

func (p *FlightWorkerPool) RetireWorker(id int)

RetireWorker stops a worker process and cleans up its resources. Sends SIGINT, waits up to 3s, then SIGKILL. Runs asynchronously to avoid blocking the calling goroutine (e.g., connection handler).

func (*FlightWorkerPool) RetireWorkerIfNoSessions

func (p *FlightWorkerPool) RetireWorkerIfNoSessions(id int) bool

RetireWorkerIfNoSessions retires a worker only if it has no active sessions after releasing our claim. Used to clean up on session creation failure without retiring shared workers that have other active sessions.

func (*FlightWorkerPool) SetMaxWorkers

func (p *FlightWorkerPool) SetMaxWorkers(n int)

SetMaxWorkers updates the maximum number of workers. 0 means unlimited.

func (*FlightWorkerPool) ShutdownAll

func (p *FlightWorkerPool) ShutdownAll()

ShutdownAll stops all workers gracefully.

func (*FlightWorkerPool) SpawnAll

func (p *FlightWorkerPool) SpawnAll(count int) error

SpawnAll spawns the specified number of workers in parallel.

func (*FlightWorkerPool) SpawnMinWorkers

func (p *FlightWorkerPool) SpawnMinWorkers(count int) error

SpawnMinWorkers pre-warms the pool with the given number of workers. This is used at startup for the elastic 1:1 model.

func (*FlightWorkerPool) SpawnWorker

func (p *FlightWorkerPool) SpawnWorker(id int) error

SpawnWorker starts a new duckdb-service worker process. It uses a pre-bound socket from the pool if available, falling back to binding a new socket (which may fail with EROFS under systemd's ProtectSystem=strict after startup).

func (*FlightWorkerPool) Worker

func (p *FlightWorkerPool) Worker(id int) (*ManagedWorker, bool)

Worker returns a worker by ID.

type HealthCheckResult

type HealthCheckResult string

HealthCheckResult records the outcome of a single worker health check probe. "pass" and "fail" are the only first-class outcomes; recoverWorkerPanic-caught panics surface as "fail" because the caller sees them as an error return.

const (
	HealthCheckResultPass HealthCheckResult = "pass"
	HealthCheckResultFail HealthCheckResult = "fail"
)

type JanitorLeaderManager

type JanitorLeaderManager struct{}

func NewJanitorLeaderManager

func NewJanitorLeaderManager(namespace, identity string, janitor *ControlPlaneJanitor) (*JanitorLeaderManager, error)

func (*JanitorLeaderManager) Start

func (m *JanitorLeaderManager) Start(ctx context.Context) error

func (*JanitorLeaderManager) Stop

func (m *JanitorLeaderManager) Stop()

type K8sConfig

type K8sConfig struct {
	WorkerImage             string // Container image for worker pods (required)
	WorkerNamespace         string // K8s namespace (default: auto-detect from service account)
	ControlPlaneID          string // Unique CP identifier for labeling worker pods (default: os.Hostname())
	WorkerPort              int    // gRPC port on worker pods (default: 8816)
	WorkerSecret            string // Base name for per-worker K8s Secrets containing RPC bearer token and TLS material
	WorkerConfigMap         string // ConfigMap name for duckgres.yaml
	ImagePullPolicy         string // Image pull policy for worker pods (e.g., "Never", "IfNotPresent", "Always")
	ServiceAccount          string // ServiceAccount name for worker pods (default: "duckgres-worker")
	MaxWorkers              int    // Global cap for the shared K8s worker pool (0 = unbounded; cluster autoscaler is the natural ceiling)
	WorkerCPURequest        string // CPU request for worker pods (e.g., "500m")
	WorkerMemoryRequest     string // Memory request for worker pods (e.g., "1Gi")
	WorkerNodeSelector      string // JSON map for worker pod nodeSelector (e.g., '{"posthog.com/nodepool":"workers"}')
	WorkerTolerationKey     string // Taint key for worker pod NoSchedule toleration
	WorkerTolerationValue   string // Taint value for worker pod NoSchedule toleration
	WorkerPriorityClassName string // PriorityClass for worker pods, so they preempt overprovision headroom pause pods (empty = none)
	AWSRegion               string // AWS region for STS client

	// Node-headroom controller: keep HeadroomPercent% of the worker nodepool's
	// allocatable CPU+memory free via low-priority placeholder pods, so a worker
	// spawn schedules immediately (preempting placeholders) rather than waiting
	// on a fresh Karpenter node. 0 = disabled.
	HeadroomPercent              int    // % of worker-nodepool allocatable to hold free (0 = disabled)
	PlaceholderImage             string // Image for placeholder pods (a pause image)
	PlaceholderPriorityClassName string // PriorityClass for placeholder pods — MUST rank below WorkerPriorityClassName

	// Connection-string worker sizing (duckgres.worker_cpu / worker_memory /
	// worker_ttl). All default to the off/empty state, so absent config = the
	// default worker shape. See docs/design/connection-string-worker-profile.md.
	AllowClientWorkerProfile bool          // Master gate: honor duckgres.* startup options at all
	WorkerProfileMinCPU      string        // Clamp floor for a client-supplied cpu (e.g. "1")
	WorkerProfileMaxCPU      string        // Clamp ceiling for a client-supplied cpu (e.g. "16")
	WorkerProfileMinMemory   string        // Clamp floor for a client-supplied memory (e.g. "4Gi")
	WorkerProfileMaxMemory   string        // Clamp ceiling for a client-supplied memory (e.g. "64Gi")
	WorkerMaxTTL             time.Duration // Clamp ceiling for a client-supplied duckgres.worker_ttl (0 = unbounded)

	// WorkerDefaultTTL is the hot-idle TTL applied when a request does not
	// specify duckgres.worker_ttl (and no org default does either) — the
	// "default TTL", pairing with WorkerMaxTTL (the clamp). It governs both
	// no-ttl paths the same way: default-shape workers (reaped by the janitor)
	// and sized-but-no-ttl workers (stamped at profile resolution). Per-request
	// precedence: client GUC > org default > this > built-in (20m,
	// defaultWorkerTTL). Raise it above a tenant's job cadence (e.g. 70m for
	// hourly jobs) so scheduled workloads reuse hot-idle workers instead of
	// cold-spawning every run — at the cost of idle worker nodes.
	WorkerDefaultTTL time.Duration
}

K8sConfig holds Kubernetes worker backend configuration.

type K8sPoolFactory

type K8sPoolFactory func(cfg K8sWorkerPoolConfig) (WorkerPool, error)

K8sPoolFactory creates a K8sWorkerPool. Registered at init time by the kubernetes build tag. Nil when the binary is built without -tags kubernetes.
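
The registration pattern behind a nil-unless-tagged factory can be sketched like this. All names below are hypothetical stand-ins; in the real package the registration call lives in an init() inside a file guarded by the kubernetes build tag, which a single-file example cannot reproduce, so main performs the registration by hand.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for WorkerPool, K8sWorkerPoolConfig, K8sPoolFactory.
type workerPool interface{ Name() string }

type poolConfig struct{ Namespace string }

type poolFactory func(cfg poolConfig) (workerPool, error)

// registered stays nil unless some file registers a factory.
var registered poolFactory

func registerFactory(f poolFactory) { registered = f }

// newPool fails with a clear message when no factory was compiled in.
func newPool(cfg poolConfig) (workerPool, error) {
	if registered == nil {
		return nil, errors.New("remote backend requires a binary built with -tags kubernetes")
	}
	return registered(cfg)
}

type fakePool struct{ ns string }

func (p fakePool) Name() string { return "k8s:" + p.ns }

func main() {
	if _, err := newPool(poolConfig{Namespace: "duckgres"}); err != nil {
		fmt.Println("before registration:", err)
	}
	registerFactory(func(cfg poolConfig) (workerPool, error) {
		return fakePool{ns: cfg.Namespace}, nil
	})
	pool, err := newPool(poolConfig{Namespace: "duckgres"})
	if err != nil {
		panic(err)
	}
	fmt.Println("after registration:", pool.Name())
}
```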

type K8sWorkerPoolConfig

type K8sWorkerPoolConfig struct {
	Namespace                    string
	CPID                         string // Control plane pod name, used in labels
	CPInstanceID                 string // Durable control-plane instance ID (<pod_uid>:<boot_id>)
	WorkerImage                  string
	WorkerPort                   int
	SecretName                   string // Base name for per-worker K8s Secrets containing RPC bearer token and TLS material
	ConfigMap                    string // ConfigMap name for duckgres.yaml
	MaxWorkers                   int
	IdleTimeout                  time.Duration
	ConfigPath                   string                                       // Path inside worker pod where config is mounted
	ImagePullPolicy              string                                       // Image pull policy for worker pods (e.g., "Never", "IfNotPresent", "Always")
	ServiceAccount               string                                       // ServiceAccount name for worker pods (default: "duckgres-worker")
	WorkerCPURequest             string                                       // CPU request for worker pods (e.g., "500m"). Empty = BestEffort.
	WorkerMemoryRequest          string                                       // Memory request for worker pods (e.g., "1Gi"). Empty = BestEffort.
	WorkerNodeSelector           map[string]string                            // Node selector for worker pods. Nil = no selector.
	WorkerTolerationKey          string                                       // Taint key for worker pod NoSchedule toleration. Empty = no toleration.
	WorkerTolerationValue        string                                       // Taint value for worker pod NoSchedule toleration.
	WorkerPriorityClassName      string                                       // PriorityClass for worker pods (so they preempt overprovision pause pods). Empty = none.
	HeadroomPercent              int                                          // Keep this % of worker-nodepool allocatable CPU+mem free via low-priority placeholder pods (0 = disabled).
	PlaceholderImage             string                                       // Image for headroom placeholder pods (a pause image).
	PlaceholderPriorityClassName string                                       // PriorityClass for placeholder pods — MUST be below WorkerPriorityClassName so workers preempt them.
	OrgID                        string                                       // Org ID for pod labels (multi-tenant mode)
	WorkerIDGenerator            func() int                                   // Shared ID generator across orgs (nil = internal counter)
	ResolveOrgConfig             func(string) (*configstore.OrgConfig, error) // Optional: resolve org config for version-aware reaping
	RuntimeStore                 RuntimeWorkerStore
}

K8sWorkerPoolConfig holds the configuration for creating a K8sWorkerPool.

type LifecycleOperation

type LifecycleOperation string

LifecycleOperation is the stable, low-cardinality label that identifies a public WorkerLifecycle method on lifecycle-transition metrics. New values must be added here (not invented at the call site) so the metric's label set stays bounded.

const (
	LifecycleOpRetireFromSnapshot            LifecycleOperation = "retire_from_snapshot"
	LifecycleOpRetireOrphanFromSnapshot      LifecycleOperation = "retire_orphan_from_snapshot"
	LifecycleOpRetireIdleVariantFromSnapshot LifecycleOperation = "retire_idle_variant_from_snapshot"
	LifecycleOpMarkLostFromLease             LifecycleOperation = "mark_lost_from_lease"
	LifecycleOpDrain                         LifecycleOperation = "drain"
	LifecycleOpRetireDrained                 LifecycleOperation = "retire_drained"
	LifecycleOpRefreshLease                  LifecycleOperation = "refresh_lease"
	// LifecycleOpRetireLocal covers the in-memory retirement chain
	// (markWorkerRetiredLocked → persistWorkerRecord via UpsertWorkerRecord)
	// which doesn't go through the lifecycle CAS service. Used by
	// public RetireWorker, idle-timeout reaping, stuck-activating
	// reaping, and activation-failure / liveness-recheck fallbacks.
	// outcome is normally transitioned, but UpsertWorkerRecord has a
	// fence-miss path (ErrWorkerRecordUpsertFenceMiss) — markWorkerRetiredLocked
	// gates the emission on the upsert result, so retire_local
	// samples reflect transitions that actually landed durably.
	LifecycleOpRetireLocal LifecycleOperation = "retire_local"
)

type LifecycleOrigin

type LifecycleOrigin string

LifecycleOrigin identifies the caller that asked WorkerLifecycle for a transition. The same operation (e.g. retire_from_snapshot) is invoked from many sites with very different triggering conditions, and a CAS miss from the janitor's hot-idle reaper tells a different story than one from cred-refresh — so we record both axes. Like LifecycleOperation, new values must be added here.

const (
	LifecycleOriginJanitorOrphan           LifecycleOrigin = "janitor_orphan"
	LifecycleOriginJanitorHotIdleTTL       LifecycleOrigin = "janitor_hot_idle_ttl"
	LifecycleOriginJanitorStuckActivating  LifecycleOrigin = "janitor_stuck_activating"
	LifecycleOriginMismatchedVersionReaper LifecycleOrigin = "mismatched_version_reaper"
	LifecycleOriginShutdownAll             LifecycleOrigin = "shutdown_all"
	LifecycleOriginHealthCheckCrash        LifecycleOrigin = "health_check_crash"
	LifecycleOriginWorkerDrain             LifecycleOrigin = "worker_drain"
	LifecycleOriginSpawnFailure            LifecycleOrigin = "spawn_failure"
	LifecycleOriginReserveImageMismatch    LifecycleOrigin = "reserve_image_mismatch"
	LifecycleOriginCredRefresh             LifecycleOrigin = "cred_refresh"
	// LifecycleOriginReserveFailure marks retire paths that fire when
	// ReserveSharedWorker observes a claim that cannot be activated
	// (stale-claim retries excepted) and falls back to retire-and-retry.
	LifecycleOriginReserveFailure LifecycleOrigin = "reserve_failure"
	// LifecycleOriginIdleTimeout marks retire paths from the idle-worker
	// reaper (reapIdleWorkers).
	LifecycleOriginIdleTimeout LifecycleOrigin = "idle_timeout"
	// LifecycleOriginPublicAPI marks retire paths invoked through the
	// public RetireWorker / RetireWorkerIfNoSessions / RetireIfDrainingAndEmpty
	// surface (admin tooling, orchestration callbacks).
	LifecycleOriginPublicAPI LifecycleOrigin = "public_api"
	// LifecycleOriginActivationFailure marks retire paths fired after a
	// worker activation returned an error (org pool's
	// activateWorkerForOrg / ReconnectFlightWorker).
	LifecycleOriginActivationFailure LifecycleOrigin = "activation_failure"
	// LifecycleOriginCrashGeneric marks retire paths driven by a generic
	// "worker died" signal that isn't covered by a more specific origin
	// (no current callers after the explicit-origin refactor; retained as
	// a safe fallback bucket for future generic-crash sites).
	LifecycleOriginCrashGeneric LifecycleOrigin = "crash_generic"
	// LifecycleOriginInformerCrash marks the cleanDeadWorkersLocked path
	// driven by the K8s pod-informer firing w.done. Distinct from
	// LifecycleOriginHealthCheckCrash (which is the periodic
	// HealthCheckLoop probe) so dashboards can separate cluster-driven
	// pod terminations (eviction, OOM, manual delete, node drain) from
	// our own health-check decisions.
	LifecycleOriginInformerCrash LifecycleOrigin = "informer_crash"
	// LifecycleOriginPoolStuckActivating marks the pool-local
	// reapStuckActivatingWorkers loop, which runs every minute on every
	// CP. Distinct from LifecycleOriginJanitorStuckActivating (which is
	// the leader-only janitor reaper) so operators can tell whether
	// pool-side or janitor-side reaping is doing the work.
	LifecycleOriginPoolStuckActivating LifecycleOrigin = "pool_stuck_activating"
	// LifecycleOriginOrgShutdown marks per-org ShutdownAll on
	// OrgReservedPool. Distinct from LifecycleOriginShutdownAll (which is
	// the pool-wide K8sWorkerPool.ShutdownAll) so operators can tell
	// org-offboarding events from CP rollouts on dashboards.
	LifecycleOriginOrgShutdown LifecycleOrigin = "org_shutdown"
	// LifecycleOriginUnknown is the fallback label applied when an empty
	// origin reaches the observer. We always emit a sample rather than
	// silently drop it; an "unknown" bucket showing up on dashboards is
	// the signal that a new call site forgot to thread the origin.
	LifecycleOriginUnknown LifecycleOrigin = "unknown"
)
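
The "always emit, never drop" rule for empty origins can be sketched with a normalizing helper in front of a counter. The type and the unknown constant are redeclared locally; normalizeOrigin, transitionCounts, and observe are hypothetical, and the map stands in for a real metrics counter.

```go
package main

import "fmt"

// Local copies of the documented type and fallback value.
type LifecycleOrigin string

const LifecycleOriginUnknown LifecycleOrigin = "unknown"

// normalizeOrigin substitutes the fallback label for an empty origin.
func normalizeOrigin(o LifecycleOrigin) LifecycleOrigin {
	if o == "" {
		return LifecycleOriginUnknown
	}
	return o
}

// transitionCounts is keyed by (operation, origin).
type transitionCounts map[[2]string]int

// observe always records a sample, even when the caller forgot the origin.
func (c transitionCounts) observe(operation string, origin LifecycleOrigin) {
	c[[2]string{operation, string(normalizeOrigin(origin))}]++
}

func main() {
	counts := transitionCounts{}
	counts.observe("retire_local", "idle_timeout")
	counts.observe("retire_local", "") // a call site that forgot the origin
	fmt.Println(counts[[2]string{"retire_local", "unknown"}])
}
```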

type ManagedSession

type ManagedSession struct {
	PID          int32
	WorkerID     int
	Protocol     string // "postgres" or "flight"
	SessionToken string
	Executor     *flightclient.FlightExecutor
	// contains filtered or unexported fields
}

ManagedSession tracks a client session bound to a worker.

type ManagedWorker

type ManagedWorker struct {
	ID int
	// contains filtered or unexported fields
}

ManagedWorker represents a duckdb-service worker process.

func (*ManagedWorker) ActivateTenant

func (w *ManagedWorker) ActivateTenant(ctx context.Context, payload server.WorkerActivationPayload) (err error)

ActivateTenant delivers tenant runtime to a shared warm worker before it may serve sessions.

func (*ManagedWorker) CreateSession

func (w *ManagedWorker) CreateSession(ctx context.Context, username, memoryLimit string, threads int, secretStatements []string) (token string, secretWarnings []string, err error)

CreateSession creates a new session on the given worker. secretStatements are the user's persistent secrets to replay on the worker before the session serves queries; secretWarnings reports any that failed to apply.

func (*ManagedWorker) DestroySession

func (w *ManagedWorker) DestroySession(ctx context.Context, sessionToken string) (err error)

DestroySession destroys a session on the worker.

func (*ManagedWorker) IncrementOwnerEpoch

func (w *ManagedWorker) IncrementOwnerEpoch() int64

func (*ManagedWorker) OwnerCPInstanceID

func (w *ManagedWorker) OwnerCPInstanceID() string

func (*ManagedWorker) OwnerEpoch

func (w *ManagedWorker) OwnerEpoch() int64

func (*ManagedWorker) PodName

func (w *ManagedWorker) PodName() string

func (*ManagedWorker) RefreshOwnerEpochAtomic

func (w *ManagedWorker) RefreshOwnerEpochAtomic(fn func(current int64) (int64, error)) error

RefreshOwnerEpochAtomic holds the per-worker epoch lock across the callback so the durable bump and the in-memory update appear atomic to concurrent readers. Used by the credential-refresh path: the callback performs the lifecycle.RefreshLease CAS and returns the new epoch; readers blocked on OwnerEpoch() during the callback see the new value the moment it lands.

This closes the race where ShutdownAll could read the in-memory epoch between the durable bump and the in-memory set, build a stale lease, and CAS-miss against the now-advanced row.

Latency note: the callback runs under the lock, and the credential-refresh callback does a single DB round-trip (BumpWorkerEpoch). Worst-case stall for any concurrent OwnerEpoch() reader on the same worker is one round-trip (~10ms in practice).

Because the lock is per-worker, refreshing worker A does not block reads on worker B. However, readers that hold the pool-wide K8sWorkerPool.mu (e.g. iterations in cleanDeadWorkersLocked) will serialize behind any in-flight refresh on the worker they're inspecting.

If this latency budget ever becomes a problem, the path forward is to bound cred-refresh concurrency or move to a "refresh-in-progress" sentinel rather than a held mutex. Keep callback work tightly scoped to the durable round-trip. If the callback returns an error, the in-memory epoch is left unchanged.
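The locking pattern described above can be sketched in isolation. This is a minimal illustration, not the package's implementation: epochHolder, RefreshAtomic, and errCASMiss are hypothetical names, and the callback here stands in for the durable round-trip.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errCASMiss is a hypothetical error standing in for a failed durable bump.
var errCASMiss = errors.New("cas miss")

// epochHolder is a hypothetical stand-in for the per-worker epoch state.
type epochHolder struct {
	mu    sync.Mutex
	epoch int64
}

// Epoch returns the in-memory epoch. It blocks while a refresh is in flight,
// so a reader never sees a value between the durable bump and the in-memory set.
func (h *epochHolder) Epoch() int64 {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.epoch
}

// RefreshAtomic runs fn under the lock and stores its result only on success.
func (h *epochHolder) RefreshAtomic(fn func(current int64) (int64, error)) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	next, err := fn(h.epoch)
	if err != nil {
		return err // the in-memory epoch is left unchanged
	}
	h.epoch = next
	return nil
}

func main() {
	h := &epochHolder{epoch: 7}
	_ = h.RefreshAtomic(func(cur int64) (int64, error) { return cur + 1, nil })
	fmt.Println(h.Epoch())

	err := h.RefreshAtomic(func(int64) (int64, error) { return 0, errCASMiss })
	fmt.Println(err, h.Epoch())
}
```

Because the callback runs with the mutex held, any work added to it directly extends the stall seen by concurrent readers of the same worker.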

func (*ManagedWorker) SetOwnerCPInstanceID

func (w *ManagedWorker) SetOwnerCPInstanceID(cpInstanceID string)

func (*ManagedWorker) SetOwnerEpoch

func (w *ManagedWorker) SetOwnerEpoch(epoch int64)

func (*ManagedWorker) SetSharedState

func (w *ManagedWorker) SetSharedState(state SharedWorkerState) error

SetSharedState updates the additive shared worker lifecycle metadata without changing existing session scheduling behavior.

func (*ManagedWorker) SharedState

func (w *ManagedWorker) SharedState() SharedWorkerState

SharedState returns the additive shared worker lifecycle metadata for this worker. The zero value normalizes to an idle, unassigned worker.

type MemoryRebalancer

type MemoryRebalancer struct {
	// contains filtered or unexported fields
}

MemoryRebalancer sets memory_limit and threads on DuckDB sessions. Every session gets the full memory budget — DuckDB will spill to disk/swap if aggregate usage exceeds physical RAM. Threads are not subdivided.

When enabled is true, SET commands are re-sent on every session create/destroy (useful if the budget or threads change at runtime). When enabled is false (default), each session gets its limits at creation time and is never adjusted afterward.

Lock ordering invariant: r.mu → SessionManager.mu(RLock). Never acquire r.mu while holding SessionManager.mu to avoid deadlock.

func NewMemoryRebalancer

func NewMemoryRebalancer(memoryBudget uint64, threadBudget int, sessions SessionLister, enabled bool) *MemoryRebalancer

NewMemoryRebalancer creates a rebalancer with the given budgets. If memoryBudget is 0, it defaults to 75% of system RAM. If threadBudget is 0, it defaults to runtime.NumCPU(). If enabled is false, RequestRebalance is a no-op and sessions only get their limits set once at creation time.

func (*MemoryRebalancer) DefaultMaxWorkers

func (r *MemoryRebalancer) DefaultMaxWorkers() int

DefaultMaxWorkers returns a reasonable default for max_workers. It is derived from the memory budget (budget / 256MB).
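As a rough illustration of the budget / 256MB arithmetic, the sketch below re-derives the figure in a standalone function. The name defaultMaxWorkers, the reading of "256MB" as 256 MiB, and the clamp to a minimum of 1 are all assumptions of this sketch; the documentation does not state how the real method rounds or bounds the result.

```go
package main

import "fmt"

// perWorkerBytes is the divisor named in the doc comment, read here as 256 MiB
// (an assumption; the source only says "256MB").
const perWorkerBytes = 256 << 20

// defaultMaxWorkers is a hypothetical re-derivation of budget / 256MB.
// Clamping to at least 1 is an assumption of this sketch.
func defaultMaxWorkers(memoryBudget uint64) int {
	n := int(memoryBudget / perWorkerBytes)
	if n < 1 {
		return 1
	}
	return n
}

func main() {
	// An 8 GiB budget divided into 256 MiB slices.
	fmt.Println(defaultMaxWorkers(8 << 30))
}
```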

func (*MemoryRebalancer) MemoryLimit

func (r *MemoryRebalancer) MemoryLimit() string

MemoryLimit returns the memory limit string applied to every session.

func (*MemoryRebalancer) PerSessionThreads

func (r *MemoryRebalancer) PerSessionThreads() int

PerSessionThreads returns the thread count for each session. Threads are not subdivided — every session gets the full budget.

func (*MemoryRebalancer) RequestRebalance

func (r *MemoryRebalancer) RequestRebalance()

RequestRebalance signals that a rebalance is needed. Multiple rapid calls are coalesced — the actual rebalance runs at most once per debounce interval. When rebalancing is disabled, this is a no-op.
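One common way to coalesce rapid signals in Go is a non-blocking send on a channel with capacity 1. The sketch below shows only that coalescing step; rebalanceSignal and its methods are hypothetical names, and the debounce timer and the rebalance itself are omitted. It is not a claim about how MemoryRebalancer is implemented.

```go
package main

import "fmt"

// rebalanceSignal is a hypothetical coalescing trigger.
type rebalanceSignal struct {
	pending chan struct{}
	enabled bool
}

func newRebalanceSignal(enabled bool) *rebalanceSignal {
	return &rebalanceSignal{pending: make(chan struct{}, 1), enabled: enabled}
}

// Request records that a rebalance is needed without blocking the caller.
// If a signal is already pending, the new request is absorbed into it.
func (s *rebalanceSignal) Request() {
	if !s.enabled {
		return // disabled: no-op
	}
	select {
	case s.pending <- struct{}{}:
	default: // a signal is already queued
	}
}

// Pending reports how many signals are waiting (0 or 1).
func (s *rebalanceSignal) Pending() int { return len(s.pending) }

func main() {
	s := newRebalanceSignal(true)
	for i := 0; i < 5; i++ {
		s.Request()
	}
	fmt.Println(s.Pending())
}
```

A background goroutine would typically drain the channel and run the work at most once per interval; that part is left out here to keep the example deterministic.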

func (*MemoryRebalancer) SetInitialLimits

func (r *MemoryRebalancer) SetInitialLimits(ctx context.Context, session *ManagedSession)

SetInitialLimits sets memory_limit and threads on a single session synchronously. Called during CreateSession so the new session never runs with unlimited resources.

func (*MemoryRebalancer) SetSessionLister

func (r *MemoryRebalancer) SetSessionLister(sl SessionLister)

SetSessionLister sets the session lister after construction. Must be called before any Rebalance calls (i.e., before accepting connections).

func (*MemoryRebalancer) Stop

func (r *MemoryRebalancer) Stop()

Stop stops the background debounce goroutine. Must be called on shutdown.

type OrgRouterInterface

type OrgRouterInterface interface {
	StackForOrg(orgID string) (pool WorkerPool, sessions *SessionManager, rebalancer *MemoryRebalancer, ok bool)
	IcebergConfigForOrg(orgID string) (server.IcebergConfig, bool)
	IsMigratingForOrg(orgID string) bool
	ShutdownAll()
}

OrgRouterInterface abstracts the org router for the control plane.

type ProcessConfig

type ProcessConfig struct {
	MinWorkers int
	MaxWorkers int
}

type ProgressHandler

type ProgressHandler func(workerID int, progress map[string]*SessionProgress)

ProgressHandler is called after a successful health check with per-session progress data parsed from the worker's health check response.

type RuntimeWorkerStore

type RuntimeWorkerStore interface {
	UpsertWorkerRecord(record *configstore.WorkerRecord) error
	ClaimHotIdleWorker(ownerCPInstanceID, orgID, image string, profileCPU, profileMemory string, maxOrgWorkers int) (*configstore.WorkerRecord, configstore.WorkerClaimMissReason, error)
	CreateSpawningWorkerSlot(ownerCPInstanceID, orgID, image string, profileCPU, profileMemory string, ownerEpoch int64, podNamePrefix string, maxOrgWorkers, maxGlobalWorkers int) (*configstore.WorkerRecord, error)
	CountHotIdleWorkers(orgID, image, profileCPU, profileMemory string) (int, error)
	GetWorkerRecord(workerID int) (*configstore.WorkerRecord, error)
	ObserveWorker(workerID int) (*configstore.WorkerSnapshot, error)
	TakeOverWorker(workerID int, ownerCPInstanceID, orgID string, expectedOwnerEpoch int64) (*configstore.WorkerRecord, error)
}

RuntimeWorkerStore is the durable-store surface exposed to the K8s worker pool. The lifecycle CAS methods are deliberately absent from this interface: they are reachable only through WorkerLifecycle, so callers cannot bypass the typed snapshot/lease seam.

The lifecycle service uses workerLifecycleStore (defined in worker_lifecycle.go), which is the larger interface that includes the CAS methods. Production wires it via a type assertion in newK8sWorkerPool / ensureLifecycle, because *configstore.ConfigStore satisfies both interfaces.

type SessionLister

type SessionLister interface {
	AllSessions() []*ManagedSession
}

SessionLister provides a snapshot of all active sessions for rebalancing.

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager tracks all active sessions and their worker assignments.

func NewOrgSessionManager

func NewOrgSessionManager(pool WorkerPool, rebalancer *MemoryRebalancer, orgID string) *SessionManager

NewOrgSessionManager builds a SessionManager whose log lines all carry the owning org (multi-tenant remote backend: one manager per org stack).

func NewSessionManager

func NewSessionManager(pool WorkerPool, rebalancer *MemoryRebalancer) *SessionManager

NewSessionManager creates a new session manager.

func (*SessionManager) AllSessions

func (sm *SessionManager) AllSessions() []*ManagedSession

AllSessions returns a snapshot of all active sessions. The returned slice is safe to iterate without holding the lock.
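A snapshot that is safe to iterate without the lock is usually produced by copying the pointers out under a read lock. The following is a minimal sketch of that idea; registry, session, and the method names are hypothetical and do not mirror SessionManager's unexported fields.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical stand-in for ManagedSession.
type session struct {
	PID int32
}

// registry is a hypothetical stand-in for the manager's session map.
type registry struct {
	mu       sync.RWMutex
	sessions map[int32]*session
}

func newRegistry() *registry {
	return &registry{sessions: make(map[int32]*session)}
}

func (r *registry) add(s *session) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.sessions[s.PID] = s
}

func (r *registry) remove(pid int32) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.sessions, pid)
}

// all copies the current session pointers into a fresh slice under the read
// lock, so later map mutations do not affect the returned slice.
func (r *registry) all() []*session {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]*session, 0, len(r.sessions))
	for _, s := range r.sessions {
		out = append(out, s)
	}
	return out
}

func main() {
	r := newRegistry()
	r.add(&session{PID: 1})
	r.add(&session{PID: 2})
	snap := r.all()
	r.remove(1)
	fmt.Println(len(snap), len(r.all()))
}
```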

func (*SessionManager) CreateSession

func (sm *SessionManager) CreateSession(ctx context.Context, username string, pid int32, memoryLimit string, threads int, profile *WorkerProfile) (int32, *flightclient.FlightExecutor, error)

CreateSession acquires a worker from the configured pool, creates a session on it, and rebalances memory/thread limits across all active sessions. If pid is 0, a new one is generated.

func (*SessionManager) CreateSessionWithProtocol

func (sm *SessionManager) CreateSessionWithProtocol(ctx context.Context, username string, pid int32, memoryLimit string, threads int, protocol string, profile *WorkerProfile) (int32, *flightclient.FlightExecutor, error)

func (*SessionManager) DestroyAllSessions

func (sm *SessionManager) DestroyAllSessions()

DestroyAllSessions destroys every active session without holding the manager lock while running per-session cleanup.

func (*SessionManager) DestroySession

func (sm *SessionManager) DestroySession(pid int32)

DestroySession destroys a session, retires its dedicated worker, and rebalances memory/thread limits across remaining sessions.

func (*SessionManager) GetProgress

func (sm *SessionManager) GetProgress(pid int32) *SessionProgress

GetProgress returns the cached query progress for a session, or nil.

func (*SessionManager) OnWorkerCrash

func (sm *SessionManager) OnWorkerCrash(workerID int, errorFn func(pid int32))

OnWorkerCrash handles a worker crash by marking all affected executors as dead and notifying sessions. Executors are marked dead BEFORE the shared gRPC client is closed to prevent nil-pointer panics from concurrent RPCs. errorFn is called for each affected session to send an error to the client.

func (*SessionManager) ReconnectFlightSession

func (sm *SessionManager) ReconnectFlightSession(ctx context.Context, username string, workerID int, ownerEpoch int64) (int32, *flightclient.FlightExecutor, error)

func (*SessionManager) ReservePID

func (sm *SessionManager) ReservePID() int32

ReservePID generates a new unique PID for a session.

func (*SessionManager) SessionCount

func (sm *SessionManager) SessionCount() int

SessionCount returns the number of active sessions.

func (*SessionManager) SessionCountForWorker

func (sm *SessionManager) SessionCountForWorker(workerID int) int

SessionCountForWorker returns the number of sessions on a specific worker.

func (*SessionManager) SetConnCloser

func (sm *SessionManager) SetConnCloser(pid int32, closer io.Closer)

SetConnCloser registers the client's TCP connection so it can be closed when the backing worker crashes. This unblocks the message loop's read, causing it to exit cleanly instead of looping on ErrWorkerDead.

func (*SessionManager) SetConnectionLimiter

func (sm *SessionManager) SetConnectionLimiter(limiter connectionLimiter)

SetConnectionLimiter replaces the local process limiter with a cluster-wide admission limiter. Local session maps still track only this control-plane's live sessions.

func (*SessionManager) SetMaxConnections

func (sm *SessionManager) SetMaxConnections(n int)

SetMaxConnections sets the maximum connections for this SessionManager.

func (*SessionManager) SetProtocol

func (sm *SessionManager) SetProtocol(pid int32, protocol string)

SetProtocol updates the protocol label for an active session.

func (*SessionManager) SetUserSecretLoader

func (sm *SessionManager) SetUserSecretLoader(loader func(ctx context.Context, username string) ([]string, error))

SetUserSecretLoader installs the per-user persistent-secret loader used at session creation (multitenant/remote backend only).

func (*SessionManager) UpdateProgress

func (sm *SessionManager) UpdateProgress(workerID int, progress map[string]*SessionProgress)

UpdateProgress caches query progress data for sessions on the given worker. Called from the health check loop after parsing the worker's health check response. Progress keys are truncated session tokens (first 16 chars) to avoid leaking full bearer tokens in health check JSON.
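The key truncation mentioned above might look like the helper below. progressKey and progressKeyLen are hypothetical names, and the sketch assumes tokens are ASCII so that the first 16 bytes equal the first 16 characters.

```go
package main

import "fmt"

// progressKeyLen is the prefix length named in the doc comment.
const progressKeyLen = 16

// progressKey is a hypothetical helper that returns the truncated form of a
// session token, so the full bearer token never appears as a map key.
// It slices bytes, which assumes ASCII tokens.
func progressKey(sessionToken string) string {
	if len(sessionToken) <= progressKeyLen {
		return sessionToken
	}
	return sessionToken[:progressKeyLen]
}

func main() {
	fmt.Println(progressKey("0123456789abcdefSECRET-TAIL"))
}
```

A lookup by full token would apply the same helper before indexing the progress map, so both sides agree on the key.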

func (*SessionManager) WorkerIDForPID

func (sm *SessionManager) WorkerIDForPID(pid int32) int

WorkerIDForPID returns the worker ID for a session, or -1 if not found.

func (*SessionManager) WorkerPodNameForPID

func (sm *SessionManager) WorkerPodNameForPID(pid int32) string

WorkerPodNameForPID returns the K8s pod name of the worker hosting the session, or "" if not found or not running on K8s.

type SessionProgress

type SessionProgress struct {
	Percentage float64
	Rows       uint64
	TotalRows  uint64
	Stalled    bool
}

SessionProgress holds cached query progress from a worker health check.

type SharedWorkerState

type SharedWorkerState struct {
	Lifecycle  WorkerLifecycleState
	Assignment *WorkerAssignment
}

SharedWorkerState holds the additive lifecycle/assignment model for shared workers.

func (SharedWorkerState) NormalizedLifecycle

func (s SharedWorkerState) NormalizedLifecycle() WorkerLifecycleState

NormalizedLifecycle treats the zero value as an idle, unassigned worker so existing worker structs can adopt this model without extra initialization.
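The zero-value convention can be illustrated with a small standalone type. The names below (lifecycleState, sharedState, normalizedLifecycle) are local stand-ins for the package's types, and the body is a guess at the simplest behavior consistent with the description.

```go
package main

import "fmt"

// lifecycleState is a local stand-in for WorkerLifecycleState.
type lifecycleState string

const (
	lifecycleIdle lifecycleState = "idle"
	lifecycleHot  lifecycleState = "hot"
)

// sharedState is a local stand-in for SharedWorkerState.
type sharedState struct {
	Lifecycle lifecycleState
}

// normalizedLifecycle maps the empty (zero) lifecycle to idle and returns
// any other value unchanged.
func (s sharedState) normalizedLifecycle() lifecycleState {
	if s.Lifecycle == "" {
		return lifecycleIdle
	}
	return s.Lifecycle
}

func main() {
	var zero sharedState
	fmt.Println(zero.normalizedLifecycle())
	fmt.Println(sharedState{Lifecycle: lifecycleHot}.normalizedLifecycle())
}
```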

func (SharedWorkerState) Transition

Transition validates a lifecycle change and returns the next worker state. When assignment metadata is omitted, the current assignment is carried forward for states that remain tenant-bound.

func (SharedWorkerState) Validate

func (s SharedWorkerState) Validate() error

Validate checks that the lifecycle and assignment metadata are internally consistent.

type SpawnFailureReason

type SpawnFailureReason string

SpawnFailureReason categorizes why a worker spawn returned an error. Buckets follow the spawnWorker control-flow stages so dashboards can localize regressions: an uptick in "pod_ready" points at the scheduler / image-pull, an uptick in "secret_create" points at Kubernetes API auth, and so on.

const (
	SpawnFailureReasonRuntimeStore    SpawnFailureReason = "runtime_store"
	SpawnFailureReasonConfigMap       SpawnFailureReason = "config_map"
	SpawnFailureReasonSecretCreate    SpawnFailureReason = "secret_create"
	SpawnFailureReasonPodCreate       SpawnFailureReason = "pod_create"
	SpawnFailureReasonPodReady        SpawnFailureReason = "pod_ready"
	SpawnFailureReasonSecretRead      SpawnFailureReason = "secret_read"
	SpawnFailureReasonGRPCConnect     SpawnFailureReason = "grpc_connect"
	SpawnFailureReasonContextCanceled SpawnFailureReason = "context_canceled"
	SpawnFailureReasonOther           SpawnFailureReason = "other"
)

type StrandedOutcome

type StrandedOutcome string

StrandedOutcome categorizes what the janitor recovery sweep did with each stranded pod it observed. "kept" means the artifact was claimed by a current runtime row (i.e. it wasn't actually stranded); "deleted" means the API delete succeeded; "delete_failed" means the delete returned an error and the artifact is still around.

const (
	StrandedOutcomeDeleted            StrandedOutcome = "deleted"
	StrandedOutcomeDeletedMissingRow  StrandedOutcome = "deleted_missing_row"
	StrandedOutcomeDeletedTerminalRow StrandedOutcome = "deleted_terminal_row"
	StrandedOutcomeKept               StrandedOutcome = "kept"
	StrandedOutcomeDeleteFailed       StrandedOutcome = "delete_failed"
)

type WorkerAssignment

type WorkerAssignment struct {
	OrgID      string
	MaxWorkers int
	Image      string
	// Profile is the requested worker shape. nil => the default exclusive
	// profile (today's behavior). It is immutable for a reserved worker's life;
	// enforcement is wired in alongside the scheduling changes (see design doc).
	Profile *WorkerProfile
}

WorkerAssignment carries tenant-specific metadata once a shared worker has been reserved for an org.

type WorkerCapacityExhaustedError

type WorkerCapacityExhaustedError struct {
	// RetryAfter is the client-facing retry hint for protocol-specific error responses.
	RetryAfter time.Duration
	// Reason classifies why the worker could not be acquired.
	Reason configstore.WorkerClaimMissReason
}

WorkerCapacityExhaustedError is returned when a worker can't be acquired because the org or the cluster is at its worker cap (or the pool is shutting down). The caller fails fast with a retryable capacity response.
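A caller that wants the retry hint would typically unwrap the error with errors.As. The sketch below uses a local stand-in type rather than the real one; capacityExhaustedError, acquire, and retryAfter are hypothetical, and the error message text is invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// capacityExhaustedError is a local stand-in for WorkerCapacityExhaustedError.
type capacityExhaustedError struct {
	RetryAfter time.Duration
}

// Error's message text is an assumption of this sketch.
func (e *capacityExhaustedError) Error() string {
	return fmt.Sprintf("worker capacity exhausted; retry after %s", e.RetryAfter)
}

// acquire is a hypothetical call site that fails with a wrapped capacity error.
func acquire(retryAfterSeconds int) error {
	inner := &capacityExhaustedError{RetryAfter: time.Duration(retryAfterSeconds) * time.Second}
	return fmt.Errorf("acquire worker: %w", inner)
}

// retryAfter extracts the hint if err is, or wraps, a capacity error.
func retryAfter(err error) (time.Duration, bool) {
	var capErr *capacityExhaustedError
	if errors.As(err, &capErr) {
		return capErr.RetryAfter, true
	}
	return 0, false
}

func main() {
	if d, ok := retryAfter(acquire(45)); ok {
		fmt.Printf("retry in %.0f seconds\n", d.Seconds())
	}
}
```

A protocol layer could use the extracted duration to populate its own retryable response instead of parsing the error string.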

func (*WorkerCapacityExhaustedError) Error

type WorkerCrashHandler

type WorkerCrashHandler func(workerID int)

WorkerCrashHandler is called when a worker crash is detected, before respawning.

type WorkerLifecycle

type WorkerLifecycle struct {
	// contains filtered or unexported fields
}

WorkerLifecycle is the central typed lifecycle-transition API. It is the seam where post-#615 hand-rolled "observe → fence → CAS → cleanup" patterns are collapsed into one place.

All transitions take either a WorkerSnapshot (for observed-row paths like the janitor reapers) or a WorkerLease (for owned-row paths like ShutdownAll and the health checker). The two types cannot be interchanged at the call site, which is the load-bearing safety property — callers without a real lease physically cannot invoke the lease-only methods, and a snapshot's frozen fields are exactly what the underlying CAS fences against.

On a successful terminal CAS the service kicks the configured WorkerPhysicalCleanup. On a CAS miss the cleanup is skipped — by construction we have no proof of ownership over the pod, so deleting it is unsafe.

Every public transition method observes duckgres_worker_lifecycle_transitions_total (keyed by operation, outcome, image, and origin) plus a per-operation latency histogram. origin is a caller-supplied LifecycleOrigin so dashboards can see, for example, that "fence_miss_owner on retire_from_snapshot from janitor_orphan" is a different signal from the same outcome coming from cred_refresh.
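The rule that cleanup runs only after a successful terminal CAS can be sketched with an in-memory model. Everything here is hypothetical: row, snapshot, and lifecycle are stand-ins, the fence compares only an owner epoch, and the real service performs the compare-and-set in the durable store rather than in a map.

```go
package main

import "fmt"

// row is a hypothetical stand-in for the durable runtime row.
type row struct {
	ownerEpoch int64
	state      string
}

// snapshot is a hypothetical stand-in for WorkerSnapshot: frozen observed fields.
type snapshot struct {
	workerID   int
	ownerEpoch int64
}

type lifecycle struct {
	rows    map[int]*row
	cleanup func(workerID int) // may be nil
}

// retireFromSnapshot retires the row only if it still matches the snapshot.
func (l *lifecycle) retireFromSnapshot(s snapshot) bool {
	r, ok := l.rows[s.workerID]
	if !ok || r.ownerEpoch != s.ownerEpoch || r.state == "retired" {
		return false // CAS miss: no proof of ownership, so cleanup is skipped
	}
	r.state = "retired"
	if l.cleanup != nil {
		l.cleanup(s.workerID)
	}
	return true
}

func main() {
	cleaned := 0
	l := &lifecycle{
		rows:    map[int]*row{1: {ownerEpoch: 3, state: "hot_idle"}},
		cleanup: func(int) { cleaned++ },
	}
	// A stale snapshot misses the fence; a current one lands and triggers cleanup.
	fmt.Println(l.retireFromSnapshot(snapshot{workerID: 1, ownerEpoch: 2}), cleaned)
	fmt.Println(l.retireFromSnapshot(snapshot{workerID: 1, ownerEpoch: 3}), cleaned)
}
```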

func NewWorkerLifecycle

func NewWorkerLifecycle(store workerLifecycleStore, cleanup WorkerPhysicalCleanup) *WorkerLifecycle

NewWorkerLifecycle wires the service. cleanup may be nil if the caller only needs durable CAS transitions and no physical cleanup (e.g. tests that assert on store calls in isolation).

func (*WorkerLifecycle) Drain

Drain transitions a lease-owned worker into the draining state. This is the first step of ShutdownAll's 3-step chain; it does NOT trigger physical cleanup (the caller orchestrates the pod delete between Drain and RetireDrained). Returns Transitioned=false on a CAS miss so the caller can skip the remaining steps.

func (*WorkerLifecycle) MarkLostFromLease

func (l *WorkerLifecycle) MarkLostFromLease(lease configstore.WorkerLease, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)

MarkLostFromLease performs the lease-fenced CAS that transitions a worker row to lost. Used by the health-checker after it has confirmed the worker is unresponsive. Does NOT schedule pod cleanup: consistent with the other lease-based transitions (Drain, RetireDrained, RefreshLease), the caller orchestrates physical cleanup so it can interleave replenishment decisions, in-memory pool removal, and pod delete in the right order. (The snapshot-based variants RetireFromSnapshot/RetireOrphanFromSnapshot/RetireIdleVariantFromSnapshot do bundle cleanup because their callers don't have post-CAS choreography.)

func (*WorkerLifecycle) RefreshLease

RefreshLease bumps the durable owner_epoch under the current lease and returns a fresh lease at the new epoch. It is equivalent to calling BumpWorkerEpoch directly; its value lies in passing a typed lease, not in race avoidance. The caller still has to mirror the new epoch onto the in-memory worker, and the window between BumpWorkerEpoch returning and that mirror update is unchanged. PR 5 is where the in-memory-epoch race actually gets closed. Returns ErrWorkerOwnerEpochMismatch (via the error return) when the lease no longer matches the durable row.

func (*WorkerLifecycle) RetireDrained

RetireDrained is the third step of ShutdownAll's chain: it transitions a draining row to retired, fenced by the lease. The caller is responsible for the pod delete between Drain and RetireDrained; if that delete failed the row should be left in draining (don't call this method) so the orphan sweep can reconcile.
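The three-step chain (Drain, pod delete, RetireDrained) can be sketched as caller-side orchestration. The steps struct and shutdownOne are hypothetical; the callbacks stand in for the real methods, whose signatures are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// errDeleteFailed is a hypothetical error for a failed pod delete.
var errDeleteFailed = errors.New("pod delete failed")

// steps bundles hypothetical callbacks standing in for Drain, the pod delete,
// and RetireDrained.
type steps struct {
	drain     func() (transitioned bool, err error)
	deletePod func() error
	retire    func() error
}

// shutdownOne runs the chain and reports where the row was left.
func shutdownOne(s steps) (string, error) {
	ok, err := s.drain()
	if err != nil {
		return "unchanged", err
	}
	if !ok {
		return "unchanged", nil // CAS miss: skip the remaining steps
	}
	if err := s.deletePod(); err != nil {
		// Leave the row in draining so the orphan sweep can reconcile it.
		return "draining", err
	}
	if err := s.retire(); err != nil {
		return "draining", err
	}
	return "retired", nil
}

func main() {
	retired := false
	state, err := shutdownOne(steps{
		drain:     func() (bool, error) { return true, nil },
		deletePod: func() error { return errDeleteFailed },
		retire:    func() error { retired = true; return nil },
	})
	fmt.Println(state, err, retired)
}
```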

func (*WorkerLifecycle) RetireFromSnapshot

RetireFromSnapshot moves the observed worker row to a terminal state (retired or lost) fenced by the snapshot. Used by paths that already own the worker but observed a separate snapshot (e.g. the janitor's hot-idle TTL reaper) — the broad MarkWorkerTerminalIfCurrent fence ensures we don't trample a row that has been taken over since the snapshot was captured.

func (*WorkerLifecycle) RetireIdleVariantFromSnapshot

func (l *WorkerLifecycle) RetireIdleVariantFromSnapshot(snap configstore.WorkerSnapshot, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)

RetireIdleVariantFromSnapshot retires a worker observed as idle or hot_idle, fenced by the snapshot. The state restriction maps onto the legacy RetireIdleOrHotIdleWorker store CAS. Today's only caller is the mismatched-version reaper, which deliberately reaps only idle/hot_idle pods so a busy worker isn't yanked mid-session. The hot-idle TTL janitor previously used this helper but was promoted to the broader RetireFromSnapshot once the snapshot already narrowed the candidate set to state=hot_idle by virtue of ListExpiredHotIdleSnapshots.

func (*WorkerLifecycle) RetireOrphanFromSnapshot

func (l *WorkerLifecycle) RetireOrphanFromSnapshot(snap configstore.WorkerSnapshot, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)

RetireOrphanFromSnapshot is the orphan-specific retire: it permits any active observed state (spawning through draining) and additionally enforces that the worker's owner control-plane is still expired (or absent). Used by the janitor's orphan-cleanup loop where the listing step already filtered by owner-expired-or-missing, so the snapshot's CP must remain so for the retire to be safe.

type WorkerLifecycleState

type WorkerLifecycleState string

WorkerLifecycleState models the shared worker lifecycle used for late tenant binding (spawn → reserve → activate → hot → hot-idle → drain/retire).

const (
	WorkerLifecycleIdle       WorkerLifecycleState = "idle"
	WorkerLifecycleReserved   WorkerLifecycleState = "reserved"
	WorkerLifecycleActivating WorkerLifecycleState = "activating"
	WorkerLifecycleHot        WorkerLifecycleState = "hot"
	WorkerLifecycleHotIdle    WorkerLifecycleState = "hot_idle"
	WorkerLifecycleDraining   WorkerLifecycleState = "draining"
	WorkerLifecycleRetired    WorkerLifecycleState = "retired"
)

type WorkerPhysicalCleanup

type WorkerPhysicalCleanup interface {
	// DeleteWorkerArtifacts schedules deletion of the pod and RPC secret
	// for the named worker. Called only after the durable runtime row
	// has transitioned to terminal. Implementations may also clean up
	// in-memory pool state keyed by workerID.
	DeleteWorkerArtifacts(workerID int, podName string, reason string)
}

WorkerPhysicalCleanup performs the post-CAS K8s/in-memory side of a worker retirement: pod delete, RPC secret delete, removal from any in-process pool maps. WorkerLifecycle calls it after a durable CAS to terminal has landed.

Cleanup is fire-and-forget: the method returns synchronously after scheduling the goroutine that performs the delete. The lifecycle service does not depend on or report cleanup completion; the orphan reconciler and the cleanupOrphanedWorkerPods janitor are the safety net for failed deletes.

type WorkerPool

type WorkerPool interface {
	// AcquireWorker returns a worker for a new session. The multi-tenant
	// OrgReservedPool (k8s/remote) reuses a hot-idle worker of the requested shape
	// for the org or spawns one on demand; the process FlightWorkerPool reuses an
	// idle worker / assigns least-loaded. profile is the requested pod shape (nil
	// => the default shape) and is honored only by OrgReservedPool.
	AcquireWorker(ctx context.Context, profile *WorkerProfile) (*ManagedWorker, error)

	// ReleaseWorker decrements the active session count for a worker.
	ReleaseWorker(id int)

	// RetireWorker removes a worker from the pool and shuts it down.
	RetireWorker(id int)

	// RetireWorkerIfNoSessions retires a worker only if it has zero active
	// sessions after releasing the caller's claim. Returns true if retired.
	RetireWorkerIfNoSessions(id int) bool

	// Worker returns a worker by ID, or false if not found.
	Worker(id int) (*ManagedWorker, bool)

	// SpawnMinWorkers pre-warms the pool with count workers at startup. Only the
	// process FlightWorkerPool implements this (--process-min-workers); the K8s
	// pool spawns on demand, so its implementation is a no-op.
	SpawnMinWorkers(count int) error

	// HealthCheckLoop runs periodic health checks on all workers.
	// onCrash is called when a worker crash is detected.
	// onProgress is called with per-session progress data after each successful check.
	// Either callback may be nil.
	HealthCheckLoop(ctx context.Context, interval time.Duration, onCrash WorkerCrashHandler, onProgress ProgressHandler)

	// SetMaxWorkers updates the maximum number of workers. 0 means unlimited.
	SetMaxWorkers(n int)

	// ShutdownAll stops all workers gracefully.
	ShutdownAll()
}

WorkerPool abstracts the lifecycle and scheduling of Flight SQL workers. Two implementations exist:

  • FlightWorkerPool: spawns workers as local child processes (default)
  • K8sWorkerPool: creates workers as Kubernetes pods (build tag: kubernetes)

func CreateK8sPool

func CreateK8sPool(cfg K8sWorkerPoolConfig) (WorkerPool, error)

CreateK8sPool creates a Kubernetes worker pool if the kubernetes build tag is enabled.

type WorkerProfile

type WorkerProfile struct {
	CPU    string        // normalized k8s quantity (e.g. "8"); the worker's CPU size
	Memory string        // normalized k8s quantity (e.g. "16Gi"); the worker's memory size
	TTL    time.Duration // how long the worker stays hot-idle after its last query (reset per query); not part of MatchKey
}

WorkerProfile describes the pod shape a session asked for via connection-string startup options (duckgres.worker_cpu / worker_memory / worker_ttl). It is a match dimension on WorkerAssignment, ORTHOGONAL to Image: a reserved or hot-idle worker may only be handed to a request whose profile Equal()s it.

The nil/zero profile is the DEFAULT profile: empty CPU/Memory (the pool-global request applies). Normalizing the default to empty strings (rather than the literal pool values) is what keeps legacy worker records claimable without a data migration.

Only CPU/Memory are persisted (WorkerRecord) and matched; TTL rides along on the record but is not part of the match identity.

func (*WorkerProfile) Equal

func (wp *WorkerProfile) Equal(other *WorkerProfile) bool

Equal reports whether two profiles match (nil == zero == default).

func (*WorkerProfile) MatchKey

func (wp *WorkerProfile) MatchKey() string

MatchKey is the identity used to decide whether an existing worker can serve a request. A nil profile shares the key of the zero/default profile, so legacy and default requests match the same workers.

func (*WorkerProfile) Parts

func (wp *WorkerProfile) Parts() (cpu, memory string)

Parts returns the persisted/match primitives for the profile, decomposing nil to the default profile. Used to cross the controlplane→configstore package boundary (which cannot reference the WorkerProfile type) without losing the nil==default convention.
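The nil == zero == default convention relies on Go allowing method calls on nil pointer receivers. The sketch below shows one way the three methods could fit together; profile is a local stand-in for WorkerProfile, and the "|" separator in matchKey is an assumption, since the real key format is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// profile is a local stand-in for WorkerProfile.
type profile struct {
	CPU    string
	Memory string
	TTL    time.Duration // not part of the match identity
}

// parts decomposes the profile, treating nil as the default (empty) profile.
func (p *profile) parts() (cpu, memory string) {
	if p == nil {
		return "", ""
	}
	return p.CPU, p.Memory
}

// matchKey builds the match identity from CPU and Memory only.
// The "|" separator is an assumption of this sketch.
func (p *profile) matchKey() string {
	cpu, memory := p.parts()
	return cpu + "|" + memory
}

// equal compares match identities, so nil, zero, and default all match.
func (p *profile) equal(other *profile) bool {
	return p.matchKey() == other.matchKey()
}

func main() {
	var none *profile
	big := &profile{CPU: "8", Memory: "16Gi", TTL: 5 * time.Minute}
	bigNoTTL := &profile{CPU: "8", Memory: "16Gi"}

	fmt.Println(none.equal(&profile{})) // nil vs zero
	fmt.Println(big.equal(bigNoTTL))    // TTL ignored
	fmt.Println(none.equal(big))        // default vs sized
}
```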
