Documentation
¶
Index ¶
- Constants
- Variables
- func NewRuntimeOrgConnectionLimiter(store runtimeOrgConnectionStore, orgID, cpInstanceID string, ...) connectionLimiter
- func NewWorkerCapacityExhaustedError(retryAfter time.Duration) error
- func NewWorkerCapacityExhaustedErrorForReason(reason configstore.WorkerClaimMissReason, retryAfter time.Duration) error
- func RegisterK8sPoolFactory(f K8sPoolFactory)
- func RunControlPlane(cfg ControlPlaneConfig)
- func SetupMultiTenant(cfg ControlPlaneConfig, srv *server.Server, memBudget uint64, maxWorkers int, ...) (ConfigStoreInterface, OrgRouterInterface, *http.Server, ...)
- type CPUserSecretManager
- func (m *CPUserSecretManager) DeleteSecret(_ context.Context, orgID, username, secretName string) (bool, error)
- func (m *CPUserSecretManager) LoadStatements(_ context.Context, orgID, username string) ([]string, error)
- func (m *CPUserSecretManager) PutSecret(_ context.Context, orgID, username, secretName, statement string, ...) error
- func (m *CPUserSecretManager) Ready() error
- func (m *CPUserSecretManager) SessionSecretLoader(orgID string) func(ctx context.Context, username string) ([]string, error)
- type ConfigStoreInterface
- type ControlPlane
- type ControlPlaneConfig
- type ControlPlaneJanitor
- type ControlPlaneRuntimeTracker
- type FlightIngress
- type FlightIngressConfig
- type FlightWorkerPool
- func (p *FlightWorkerPool) AcquireWorker(ctx context.Context, _ *WorkerProfile) (*ManagedWorker, error)
- func (p *FlightWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Duration, onCrash WorkerCrashHandler, ...)
- func (p *FlightWorkerPool) PreBindSockets(count int) error
- func (p *FlightWorkerPool) ReleaseWorker(id int)
- func (p *FlightWorkerPool) RetireWorker(id int)
- func (p *FlightWorkerPool) RetireWorkerIfNoSessions(id int) bool
- func (p *FlightWorkerPool) SetMaxWorkers(n int)
- func (p *FlightWorkerPool) ShutdownAll()
- func (p *FlightWorkerPool) SpawnAll(count int) error
- func (p *FlightWorkerPool) SpawnMinWorkers(count int) error
- func (p *FlightWorkerPool) SpawnWorker(id int) error
- func (p *FlightWorkerPool) Worker(id int) (*ManagedWorker, bool)
- type HealthCheckResult
- type JanitorLeaderManager
- type K8sConfig
- type K8sPoolFactory
- type K8sWorkerPoolConfig
- type LifecycleOperation
- type LifecycleOrigin
- type ManagedSession
- type ManagedWorker
- func (w *ManagedWorker) ActivateTenant(ctx context.Context, payload server.WorkerActivationPayload) (err error)
- func (w *ManagedWorker) CreateSession(ctx context.Context, username, memoryLimit string, threads int, ...) (token string, secretWarnings []string, err error)
- func (w *ManagedWorker) DestroySession(ctx context.Context, sessionToken string) (err error)
- func (w *ManagedWorker) IncrementOwnerEpoch() int64
- func (w *ManagedWorker) OwnerCPInstanceID() string
- func (w *ManagedWorker) OwnerEpoch() int64
- func (w *ManagedWorker) PodName() string
- func (w *ManagedWorker) RefreshOwnerEpochAtomic(fn func(current int64) (int64, error)) error
- func (w *ManagedWorker) SetOwnerCPInstanceID(cpInstanceID string)
- func (w *ManagedWorker) SetOwnerEpoch(epoch int64)
- func (w *ManagedWorker) SetSharedState(state SharedWorkerState) error
- func (w *ManagedWorker) SharedState() SharedWorkerState
- type MemoryRebalancer
- func (r *MemoryRebalancer) DefaultMaxWorkers() int
- func (r *MemoryRebalancer) MemoryLimit() string
- func (r *MemoryRebalancer) PerSessionThreads() int
- func (r *MemoryRebalancer) RequestRebalance()
- func (r *MemoryRebalancer) SetInitialLimits(ctx context.Context, session *ManagedSession)
- func (r *MemoryRebalancer) SetSessionLister(sl SessionLister)
- func (r *MemoryRebalancer) Stop()
- type OrgRouterInterface
- type ProcessConfig
- type ProgressHandler
- type RuntimeWorkerStore
- type SessionLister
- type SessionManager
- func (sm *SessionManager) AllSessions() []*ManagedSession
- func (sm *SessionManager) CreateSession(ctx context.Context, username string, pid int32, memoryLimit string, ...) (int32, *flightclient.FlightExecutor, error)
- func (sm *SessionManager) CreateSessionWithProtocol(ctx context.Context, username string, pid int32, memoryLimit string, ...) (int32, *flightclient.FlightExecutor, error)
- func (sm *SessionManager) DestroyAllSessions()
- func (sm *SessionManager) DestroySession(pid int32)
- func (sm *SessionManager) GetProgress(pid int32) *SessionProgress
- func (sm *SessionManager) OnWorkerCrash(workerID int, errorFn func(pid int32))
- func (sm *SessionManager) ReconnectFlightSession(ctx context.Context, username string, workerID int, ownerEpoch int64) (int32, *flightclient.FlightExecutor, error)
- func (sm *SessionManager) ReservePID() int32
- func (sm *SessionManager) SessionCount() int
- func (sm *SessionManager) SessionCountForWorker(workerID int) int
- func (sm *SessionManager) SetConnCloser(pid int32, closer io.Closer)
- func (sm *SessionManager) SetConnectionLimiter(limiter connectionLimiter)
- func (sm *SessionManager) SetMaxConnections(n int)
- func (sm *SessionManager) SetProtocol(pid int32, protocol string)
- func (sm *SessionManager) SetUserSecretLoader(loader func(ctx context.Context, username string) ([]string, error))
- func (sm *SessionManager) UpdateProgress(workerID int, progress map[string]*SessionProgress)
- func (sm *SessionManager) WorkerIDForPID(pid int32) int
- func (sm *SessionManager) WorkerPodNameForPID(pid int32) string
- type SessionProgress
- type SharedWorkerState
- type SpawnFailureReason
- type StrandedOutcome
- type WorkerAssignment
- type WorkerCapacityExhaustedError
- type WorkerCrashHandler
- type WorkerLifecycle
- func (l *WorkerLifecycle) Drain(lease configstore.WorkerLease, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
- func (l *WorkerLifecycle) MarkLostFromLease(lease configstore.WorkerLease, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
- func (l *WorkerLifecycle) RefreshLease(lease configstore.WorkerLease, origin LifecycleOrigin) (configstore.WorkerLease, error)
- func (l *WorkerLifecycle) RetireDrained(lease configstore.WorkerLease, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
- func (l *WorkerLifecycle) RetireFromSnapshot(snap configstore.WorkerSnapshot, target configstore.WorkerState, reason string, ...) (configstore.TransitionOutcome, error)
- func (l *WorkerLifecycle) RetireIdleVariantFromSnapshot(snap configstore.WorkerSnapshot, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
- func (l *WorkerLifecycle) RetireOrphanFromSnapshot(snap configstore.WorkerSnapshot, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
- type WorkerLifecycleState
- type WorkerPhysicalCleanup
- type WorkerPool
- type WorkerProfile
Constants ¶
const ( SNIRoutingOff = "off" // ignore SNI entirely; identity can no longer be resolved SNIRoutingPassthrough = "passthrough" // require managed SNI but warn on legacy hostnames SNIRoutingEnforce = "enforce" // default: require a managed SNI hostname that resolves to an org )
SNI routing modes (values for ControlPlaneConfig.SNIRoutingMode).
const ( RetireReasonNormal = "normal" RetireReasonActivationFailure = "activation_failure" RetireReasonOrphaned = "orphaned" RetireReasonCrash = "crash" RetireReasonShutdown = "shutdown" RetireReasonIdleTimeout = "idle_timeout" RetireReasonStuckActivating = "stuck_activating" RetireReasonMismatchedVersion = "mismatched_version" )
Retirement reason constants. Passed as the `reason` argument to WorkerLifecycle.* methods and surfaced on duckgres_worker_lifecycle_transitions_total as part of the operation context (also fed into lifecycleOriginForRetireReason for retire_local observations). Defined here (no build tag) rather than in warm_pool_metrics.go (kubernetes-tagged) so the no-tag lifecycle observation helpers can reference them.
const DefaultK8sWorkerServiceAccount = "duckgres-worker"
const DefaultWorkerSpawnRetryAfter = 45 * time.Second
Variables ¶
var ErrSessionManagerDraining = errors.New("session manager is draining")
var ErrTooManyConnections = errors.New("too many connections")
Functions ¶
func NewWorkerCapacityExhaustedErrorForReason ¶
func NewWorkerCapacityExhaustedErrorForReason(reason configstore.WorkerClaimMissReason, retryAfter time.Duration) error
func RegisterK8sPoolFactory ¶
func RegisterK8sPoolFactory(f K8sPoolFactory)
RegisterK8sPoolFactory is called from an init() in k8s_factory.go (build tag: kubernetes).
func RunControlPlane ¶
func RunControlPlane(cfg ControlPlaneConfig)
RunControlPlane is the entry point for the control plane process.
func SetupMultiTenant ¶
func SetupMultiTenant( cfg ControlPlaneConfig, srv *server.Server, memBudget uint64, maxWorkers int, isHealthy func() bool, ) (ConfigStoreInterface, OrgRouterInterface, *http.Server, *ControlPlaneRuntimeTracker, *JanitorLeaderManager, error)
SetupMultiTenant is not available without the kubernetes build tag.
Types ¶
type CPUserSecretManager ¶
type CPUserSecretManager struct {
// contains filtered or unexported fields
}
CPUserSecretManager implements server.UserSecretManager on top of the config store: it seals user CREATE PERSISTENT SECRET statements with AES-GCM and persists them per (org, user, name) for replay at session creation. With no encryption key configured, writes are disabled (Ready errors, so the client gets a clear message) but deletes still work, so stale rows remain removable.
func NewCPUserSecretManager ¶
func NewCPUserSecretManager(store *configstore.ConfigStore, encodedKey string) (*CPUserSecretManager, error)
NewCPUserSecretManager builds the manager. encodedKey is the value of DUCKGRES_USER_SECRET_KEY; empty disables persistence (not an error), a malformed key is a startup error (fail fast on operator mistakes rather than surfacing them to customers one statement at a time).
func (*CPUserSecretManager) DeleteSecret ¶
func (m *CPUserSecretManager) DeleteSecret(_ context.Context, orgID, username, secretName string) (bool, error)
DeleteSecret implements server.UserSecretManager. Works without a cipher so rows written under a lost key can still be removed.
func (*CPUserSecretManager) LoadStatements ¶
func (m *CPUserSecretManager) LoadStatements(_ context.Context, orgID, username string) ([]string, error)
LoadStatements returns the user's decrypted secret statements for replay at session creation. Rows that fail to decrypt (e.g. written under a rotated key) are skipped with a loud log instead of failing the whole session.
func (*CPUserSecretManager) PutSecret ¶
func (m *CPUserSecretManager) PutSecret(_ context.Context, orgID, username, secretName, statement string, ifNotExists bool) error
PutSecret implements server.UserSecretManager. With ifNotExists set, an already-stored name is left untouched (DuckDB no-ops the live session for IF NOT EXISTS, so replacing the stored statement would diverge the two).
func (*CPUserSecretManager) Ready ¶
func (m *CPUserSecretManager) Ready() error
Ready implements server.UserSecretManager.
func (*CPUserSecretManager) SessionSecretLoader ¶
func (m *CPUserSecretManager) SessionSecretLoader(orgID string) func(ctx context.Context, username string) ([]string, error)
SessionSecretLoader binds an org onto LoadStatements in the shape SessionManager.SetUserSecretLoader expects.
type ConfigStoreInterface ¶
type ConfigStoreInterface interface {
ResolveDatabase(database string) (orgID string)
DatabaseNameForSNIPrefix(prefix string) string // translates SNI hostname prefix → canonical database_name (alias-aware)
// ResolveSNIPrefix maps a managed hostname prefix to its org and database.
// It accepts hostname_alias, database_name, and DNS-safe org names.
ResolveSNIPrefix(prefix string) (orgID, databaseName string)
ResolvePostgresConnection(startupDatabase, sniPrefix string, useManagedSNI bool, username, password string) configstore.PostgresConnectionResolution
ValidateOrgUser(orgID, username, password string) bool
// ValidateOrgUserAndGetPassthrough does both lookups against the same
// snapshot — the auth path needs both, and a single read closes the
// window where the snapshot could swap between two separate calls.
// passthrough is always false when valid is false.
ValidateOrgUserAndGetPassthrough(orgID, username, password string) (valid, passthrough bool)
// OrgWarehouseStatus reports an org's current warehouse provisioning state so
// connection-time errors can distinguish "no such org" from "warehouse not
// ready yet". Returns (state, orgExists). state is "" when the org has no
// warehouse row (legacy single-tenant orgs); otherwise it is the lifecycle
// string (pending/provisioning/ready/failed/deleting/deleted).
OrgWarehouseStatus(orgID string) (state string, orgExists bool)
// OrgDefaultWorkerProfile returns the org's operator-set default worker
// profile (config-store columns default_worker_cpu/memory/ttl): cpu and
// memory as k8s resource-quantity strings, ttl as a Go duration string.
// Empty strings mean "not set" (including unknown orgs). Raw stored
// values — validation happens in resolveWorkerProfile.
OrgDefaultWorkerProfile(orgID string) (cpu, memory, ttl string)
UpsertFlightSessionRecord(record *configstore.FlightSessionRecord) error
GetFlightSessionRecord(sessionToken string) (*configstore.FlightSessionRecord, error)
TouchFlightSessionRecord(sessionToken string, lastSeenAt time.Time) error
CloseFlightSessionRecord(sessionToken string, closedAt time.Time) error
}
ConfigStoreInterface abstracts the config store for the control plane. Defined here to avoid circular imports with the configstore package.
type ControlPlane ¶
type ControlPlane struct {
// contains filtered or unexported fields
}
ControlPlane manages the TCP listener and routes connections to Flight SQL workers. The control plane owns client connections end-to-end: TLS, authentication, PostgreSQL wire protocol, and SQL transpilation all happen here. Workers are thin DuckDB execution engines reachable via Arrow Flight SQL over Unix sockets.
type ControlPlaneConfig ¶
type ControlPlaneConfig struct {
server.Config
Process ProcessConfig
SocketDir string
ConfigPath string // Path to config file, passed to workers
HealthCheckInterval time.Duration
WorkerQueueTimeout time.Duration // How long to wait for an available worker/org connection slot (default: 60s)
WorkerIdleTimeout time.Duration // How long to keep an idle worker alive (default: 5m)
RetireOnSessionEnd bool // When true, process workers are retired immediately after their last session ends.
HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade. 0 = unbounded (wait until k8s SIGKILL via terminationGracePeriodSeconds). Default: 0 in remote mode (so a CP rolling out doesn't kill in-flight customer queries at a self-imposed wall — see drainAndShutdown), 24h in process mode.
MetricsServer *http.Server // Optional metrics server to shut down during upgrade
// WorkerBackend selects the worker management backend.
// "process" (default): workers are local child processes communicating over Unix sockets.
// "remote": Kubernetes-backed multitenant workers communicating over TCP.
// Requires ConfigStoreConn and a binary built with -tags kubernetes.
WorkerBackend string
// K8s contains Kubernetes-specific configuration. Only used for remote
// multitenant mode.
K8s K8sConfig
// ConfigStoreConn is the PostgreSQL connection string for the config store.
// Required when WorkerBackend == "remote".
ConfigStoreConn string
// ConfigPollInterval is how often to poll the config store for changes.
// Default: 30s.
ConfigPollInterval time.Duration
// InternalSecret is the shared secret for API authentication.
// When empty, a random secret is generated and logged at startup.
InternalSecret string
// InternalSecretFallbacks are previous internal secrets still accepted
// for API authentication during a rotation (newest first). Clients always
// send the primary InternalSecret; the server accepts any of
// {primary ∪ fallbacks}. Mirrors posthog's SECRET_KEY_FALLBACKS.
InternalSecretFallbacks []string
// UserSecretKey is the base64-encoded 32-byte AES key for encrypting
// user persistent secrets in the config store (env-only:
// DUCKGRES_USER_SECRET_KEY). Empty disables the persistent secret
// manager: CREATE PERSISTENT SECRET is rejected with a clear error.
// Only used in multitenant remote mode.
UserSecretKey string
// SNIRoutingMode controls hostname-based org routing. Values:
// "" or "off" - SNI is ignored; legacy database-param routing only
// (default).
// "passthrough" - Managed Postgres SNI must resolve to the same org as
// the requested startup database; when startup database is
// empty, use the SNI-derived database fallback. Legacy
// hostnames still fall back with a warn log.
// "enforce" - Reject connections whose SNI doesn't match a managed
// suffix. Explicit startup database still takes
// priority when SNI is present, but must resolve to the
// same org as the managed hostname.
// Unknown values behave like "off" in the connection path.
// Only consulted in multi-tenant control-plane builds (kubernetes tag);
// other builds always behave as "off".
SNIRoutingMode string
// ManagedHostnameSuffixes lists DNS suffixes (each starting with a dot) for
// managed tenant hostnames. When SNI matches one of these suffixes, the
// single-label prefix is resolved as hostname_alias, database_name, or org
// name. Postgres still honors an explicit startup database, but only when it
// resolves to the same org as the managed hostname.
ManagedHostnameSuffixes []string
// DucklingBucketSuffix is the env suffix the control plane uses to name a
// type=s3bucket Duckling's per-org S3 bucket
// (posthog-duckling-<compact-org>-<suffix>). It MUST equal the
// crossplane-config chart's envSuffix for this environment so the name the
// CP writes onto the Duckling CR's spec.dataStore.bucketName is exactly what
// the composition provisions. Empty ⇒ the CP does not name buckets and the
// composition derives the name (legacy behavior). See
// configstore.DucklingBucketName.
DucklingBucketSuffix string
// DuckLakeDefaultSpecVersion is the global default DuckLake spec version
// used for migration checks when an org doesn't specify an override.
DuckLakeDefaultSpecVersion string
}
ControlPlaneConfig extends server.Config with control-plane-specific settings.
type ControlPlaneJanitor ¶
type ControlPlaneJanitor struct {
// contains filtered or unexported fields
}
func NewControlPlaneJanitor ¶
func NewControlPlaneJanitor(store controlPlaneExpiryStore, interval, expiryTimeout time.Duration) *ControlPlaneJanitor
func (*ControlPlaneJanitor) Run ¶
func (j *ControlPlaneJanitor) Run(ctx context.Context)
type ControlPlaneRuntimeTracker ¶
type ControlPlaneRuntimeTracker struct {
// contains filtered or unexported fields
}
func NewControlPlaneRuntimeTracker ¶
func NewControlPlaneRuntimeTracker(store runtimeInstanceStore, id, podName, podUID, bootID string, heartbeatInterval time.Duration) *ControlPlaneRuntimeTracker
func (*ControlPlaneRuntimeTracker) Draining ¶
func (t *ControlPlaneRuntimeTracker) Draining() bool
func (*ControlPlaneRuntimeTracker) MarkDraining ¶
func (t *ControlPlaneRuntimeTracker) MarkDraining() error
type FlightIngress ¶
type FlightIngress = flightsqlingress.FlightIngress
func NewFlightIngress ¶
func NewFlightIngress(host string, port int, tlsConfig *tls.Config, validator flightsqlingress.CredentialValidator, provider flightsqlingress.SessionProvider, rateLimiter *server.RateLimiter, cfg FlightIngressConfig) (*FlightIngress, error)
NewFlightIngress creates a control-plane Flight SQL ingress listener.
func NewFlightIngressFromListener ¶
func NewFlightIngressFromListener(listener net.Listener, tlsConfig *tls.Config, validator flightsqlingress.CredentialValidator, provider flightsqlingress.SessionProvider, rateLimiter *server.RateLimiter, cfg FlightIngressConfig) (*FlightIngress, error)
type FlightIngressConfig ¶
type FlightIngressConfig = flightsqlingress.Config
type FlightWorkerPool ¶
type FlightWorkerPool struct {
// contains filtered or unexported fields
}
func NewFlightWorkerPool ¶
func NewFlightWorkerPool(socketDir, configPath string, minWorkers, maxWorkers int) *FlightWorkerPool
NewFlightWorkerPool creates a new worker pool.
func (*FlightWorkerPool) AcquireWorker ¶
func (p *FlightWorkerPool) AcquireWorker(ctx context.Context, _ *WorkerProfile) (*ManagedWorker, error)
AcquireWorker returns a worker for a new session.
Strategy:
- Reuse an idle worker (0 active sessions) if available.
- If the pool has fewer live workers than maxWorkers (or maxWorkers is 0), spawn a new worker process.
- If the pool is at capacity, assign to the least-loaded live worker.
This ensures the number of worker processes never exceeds maxWorkers while allowing unlimited concurrent sessions across the fixed pool.
func (*FlightWorkerPool) HealthCheckLoop ¶
func (p *FlightWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Duration, onCrash WorkerCrashHandler, onProgress ProgressHandler)
HealthCheckLoop periodically checks worker health and handles crashed workers. In the elastic 1:1 model, crashed workers with active sessions trigger crash notification (so sessions see errors), and the dead worker is cleaned up. Workers without sessions are simply retired. Workers that fail maxConsecutiveHealthFailures health checks in a row are force-killed and their sessions notified.
func (*FlightWorkerPool) PreBindSockets ¶
func (p *FlightWorkerPool) PreBindSockets(count int) error
PreBindSockets eagerly binds count Unix sockets at startup while the socket directory is verified writable. Under systemd's ProtectSystem=strict, the RuntimeDirectory bind mount can go read-only after the service finishes starting (e.g., after an upgrade or namespace event). Pre-binding ensures sockets are available regardless of later filesystem state.
func (*FlightWorkerPool) ReleaseWorker ¶
func (p *FlightWorkerPool) ReleaseWorker(id int)
ReleaseWorker decrements the active session count for a worker and updates its lastUsed time.
func (*FlightWorkerPool) RetireWorker ¶
func (p *FlightWorkerPool) RetireWorker(id int)
RetireWorker stops a worker process and cleans up its resources. Sends SIGINT, waits up to 3s, then SIGKILL. Runs asynchronously to avoid blocking the calling goroutine (e.g., connection handler).
func (*FlightWorkerPool) RetireWorkerIfNoSessions ¶
func (p *FlightWorkerPool) RetireWorkerIfNoSessions(id int) bool
RetireWorkerIfNoSessions retires a worker only if it has no active sessions after releasing our claim. Used to clean up on session creation failure without retiring shared workers that have other active sessions.
func (*FlightWorkerPool) SetMaxWorkers ¶
func (p *FlightWorkerPool) SetMaxWorkers(n int)
SetMaxWorkers updates the maximum number of workers. 0 means unlimited.
func (*FlightWorkerPool) ShutdownAll ¶
func (p *FlightWorkerPool) ShutdownAll()
ShutdownAll stops all workers gracefully.
func (*FlightWorkerPool) SpawnAll ¶
func (p *FlightWorkerPool) SpawnAll(count int) error
SpawnAll spawns the specified number of workers in parallel.
func (*FlightWorkerPool) SpawnMinWorkers ¶
func (p *FlightWorkerPool) SpawnMinWorkers(count int) error
SpawnMinWorkers pre-warms the pool with the given number of workers. This is used at startup for the elastic 1:1 model.
func (*FlightWorkerPool) SpawnWorker ¶
func (p *FlightWorkerPool) SpawnWorker(id int) error
SpawnWorker starts a new duckdb-service worker process. It uses a pre-bound socket from the pool if available, falling back to binding a new socket (which may fail with EROFS under systemd's ProtectSystem=strict after startup).
func (*FlightWorkerPool) Worker ¶
func (p *FlightWorkerPool) Worker(id int) (*ManagedWorker, bool)
Worker returns a worker by ID.
type HealthCheckResult ¶
type HealthCheckResult string
HealthCheckResult records the outcome of a single worker health check probe. "pass" and "fail" are the only first-class outcomes; recoverWorkerPanic-caught panics surface as "fail" because the caller sees them as an error return.
const ( HealthCheckResultPass HealthCheckResult = "pass" HealthCheckResultFail HealthCheckResult = "fail" )
type JanitorLeaderManager ¶
type JanitorLeaderManager struct{}
func NewJanitorLeaderManager ¶
func NewJanitorLeaderManager(namespace, identity string, janitor *ControlPlaneJanitor) (*JanitorLeaderManager, error)
func (*JanitorLeaderManager) Start ¶
func (m *JanitorLeaderManager) Start(ctx context.Context) error
func (*JanitorLeaderManager) Stop ¶
func (m *JanitorLeaderManager) Stop()
type K8sConfig ¶
type K8sConfig struct {
WorkerImage string // Container image for worker pods (required)
WorkerNamespace string // K8s namespace (default: auto-detect from service account)
ControlPlaneID string // Unique CP identifier for labeling worker pods (default: os.Hostname())
WorkerPort int // gRPC port on worker pods (default: 8816)
WorkerSecret string // Base name for per-worker K8s Secrets containing RPC bearer token and TLS material
WorkerConfigMap string // ConfigMap name for duckgres.yaml
ImagePullPolicy string // Image pull policy for worker pods (e.g., "Never", "IfNotPresent", "Always")
ServiceAccount string // ServiceAccount name for worker pods (default: "duckgres-worker")
MaxWorkers int // Global cap for the shared K8s worker pool (0 = unbounded; cluster autoscaler is the natural ceiling)
WorkerCPURequest string // CPU request for worker pods (e.g., "500m")
WorkerMemoryRequest string // Memory request for worker pods (e.g., "1Gi")
WorkerNodeSelector string // JSON map for worker pod nodeSelector (e.g., '{"posthog.com/nodepool":"workers"}')
WorkerTolerationKey string // Taint key for worker pod NoSchedule toleration
WorkerTolerationValue string // Taint value for worker pod NoSchedule toleration
WorkerPriorityClassName string // PriorityClass for worker pods, so they preempt overprovision headroom pause pods (empty = none)
AWSRegion string // AWS region for STS client
// Node-headroom controller: keep HeadroomPercent% of the worker nodepool's
// allocatable CPU+memory free via low-priority placeholder pods, so a worker
// spawn schedules immediately (preempting placeholders) rather than waiting
// on a fresh Karpenter node. 0 = disabled.
HeadroomPercent int // % of worker-nodepool allocatable to hold free (0 = disabled)
PlaceholderImage string // Image for placeholder pods (a pause image)
PlaceholderPriorityClassName string // PriorityClass for placeholder pods — MUST rank below WorkerPriorityClassName
// Connection-string worker sizing (duckgres.worker_cpu / worker_memory /
// worker_ttl). All default to the off/empty state, so absent config = the
// default worker shape. See docs/design/connection-string-worker-profile.md.
AllowClientWorkerProfile bool // Master gate: honor duckgres.* startup options at all
WorkerProfileMinCPU string // Clamp floor for a client-supplied cpu (e.g. "1")
WorkerProfileMaxCPU string // Clamp ceiling for a client-supplied cpu (e.g. "16")
WorkerProfileMinMemory string // Clamp floor for a client-supplied memory (e.g. "4Gi")
WorkerProfileMaxMemory string // Clamp ceiling for a client-supplied memory (e.g. "64Gi")
WorkerMaxTTL time.Duration // Clamp ceiling for a client-supplied duckgres.worker_ttl (0 = unbounded)
// WorkerDefaultTTL is the hot-idle TTL applied when a request does not
// specify duckgres.worker_ttl (and no org default does either) — the
// "default TTL", pairing with WorkerMaxTTL (the clamp). It governs both
// no-ttl paths the same way: default-shape workers (reaped by the janitor)
// and sized-but-no-ttl workers (stamped at profile resolution). Per-request
// precedence: client GUC > org default > this > built-in (20m,
// defaultWorkerTTL). Raise it above a tenant's job cadence (e.g. 70m for
// hourly jobs) so scheduled workloads reuse hot-idle workers instead of
// cold-spawning every run — at the cost of idle worker nodes.
WorkerDefaultTTL time.Duration
}
K8sConfig holds Kubernetes worker backend configuration.
type K8sPoolFactory ¶
type K8sPoolFactory func(cfg K8sWorkerPoolConfig) (WorkerPool, error)
K8sPoolFactory creates a K8sWorkerPool. Registered at init time by the kubernetes build tag. Nil when the binary is built without -tags kubernetes.
type K8sWorkerPoolConfig ¶
type K8sWorkerPoolConfig struct {
Namespace string
CPID string // Control plane pod name, used in labels
CPInstanceID string // Durable control-plane instance ID (<pod_uid>:<boot_id>)
WorkerImage string
WorkerPort int
SecretName string // Base name for per-worker K8s Secrets containing RPC bearer token and TLS material
ConfigMap string // ConfigMap name for duckgres.yaml
MaxWorkers int
IdleTimeout time.Duration
ConfigPath string // Path inside worker pod where config is mounted
ImagePullPolicy string // Image pull policy for worker pods (e.g., "Never", "IfNotPresent", "Always")
ServiceAccount string // ServiceAccount name for worker pods (default: "duckgres-worker")
WorkerCPURequest string // CPU request for worker pods (e.g., "500m"). Empty = BestEffort.
WorkerMemoryRequest string // Memory request for worker pods (e.g., "1Gi"). Empty = BestEffort.
WorkerNodeSelector map[string]string // Node selector for worker pods. Nil = no selector.
WorkerTolerationKey string // Taint key for worker pod NoSchedule toleration. Empty = no toleration.
WorkerTolerationValue string // Taint value for worker pod NoSchedule toleration.
WorkerPriorityClassName string // PriorityClass for worker pods (so they preempt overprovision pause pods). Empty = none.
HeadroomPercent int // Keep this % of worker-nodepool allocatable CPU+mem free via low-priority placeholder pods (0 = disabled).
PlaceholderImage string // Image for headroom placeholder pods (a pause image).
PlaceholderPriorityClassName string // PriorityClass for placeholder pods — MUST be below WorkerPriorityClassName so workers preempt them.
OrgID string // Org ID for pod labels (multi-tenant mode)
WorkerIDGenerator func() int // Shared ID generator across orgs (nil = internal counter)
ResolveOrgConfig func(string) (*configstore.OrgConfig, error) // Optional: resolve org config for version-aware reaping
RuntimeStore RuntimeWorkerStore
}
K8sWorkerPoolConfig holds the configuration for creating a K8sWorkerPool.
type LifecycleOperation ¶
type LifecycleOperation string
LifecycleOperation is the stable, low-cardinality label that identifies a public WorkerLifecycle method on lifecycle-transition metrics. New values must be added here (not invented at the call site) so the metric's label set stays bounded.
const ( LifecycleOpRetireFromSnapshot LifecycleOperation = "retire_from_snapshot" LifecycleOpRetireOrphanFromSnapshot LifecycleOperation = "retire_orphan_from_snapshot" LifecycleOpRetireIdleVariantFromSnapshot LifecycleOperation = "retire_idle_variant_from_snapshot" LifecycleOpMarkLostFromLease LifecycleOperation = "mark_lost_from_lease" LifecycleOpDrain LifecycleOperation = "drain" LifecycleOpRetireDrained LifecycleOperation = "retire_drained" LifecycleOpRefreshLease LifecycleOperation = "refresh_lease" // LifecycleOpRetireLocal covers the in-memory retirement chain // (markWorkerRetiredLocked → persistWorkerRecord via UpsertWorkerRecord) // which doesn't go through the lifecycle CAS service. Used by // public RetireWorker, idle-timeout reaping, stuck-activating // reaping, and activation-failure / liveness-recheck fallbacks. // outcome is normally transitioned, but UpsertWorkerRecord has a // fence-miss path (ErrWorkerRecordUpsertFenceMiss) — markWorkerRetiredLocked // gates the emission on the upsert result, so retire_local // samples reflect transitions that actually landed durably. LifecycleOpRetireLocal LifecycleOperation = "retire_local" )
type LifecycleOrigin ¶
type LifecycleOrigin string
LifecycleOrigin identifies the caller that asked WorkerLifecycle for a transition. The same operation (e.g. retire_from_snapshot) is invoked from many sites with very different triggering conditions, and a CAS miss from the janitor's hot-idle reaper tells a different story than one from cred-refresh — so we record both axes. Like LifecycleOperation, new values must be added here.
const ( LifecycleOriginJanitorOrphan LifecycleOrigin = "janitor_orphan" LifecycleOriginJanitorHotIdleTTL LifecycleOrigin = "janitor_hot_idle_ttl" LifecycleOriginJanitorStuckActivating LifecycleOrigin = "janitor_stuck_activating" LifecycleOriginMismatchedVersionReaper LifecycleOrigin = "mismatched_version_reaper" LifecycleOriginShutdownAll LifecycleOrigin = "shutdown_all" LifecycleOriginHealthCheckCrash LifecycleOrigin = "health_check_crash" LifecycleOriginWorkerDrain LifecycleOrigin = "worker_drain" LifecycleOriginSpawnFailure LifecycleOrigin = "spawn_failure" LifecycleOriginReserveImageMismatch LifecycleOrigin = "reserve_image_mismatch" LifecycleOriginCredRefresh LifecycleOrigin = "cred_refresh" // LifecycleOriginReserveFailure marks retire paths that fire when // ReserveSharedWorker observes a claim that cannot be activated // (stale-claim retries excepted) and falls back to retire-and-retry. LifecycleOriginReserveFailure LifecycleOrigin = "reserve_failure" // LifecycleOriginIdleTimeout marks retire paths from the idle-worker // reaper (reapIdleWorkers). LifecycleOriginIdleTimeout LifecycleOrigin = "idle_timeout" // LifecycleOriginPublicAPI marks retire paths invoked through the // public RetireWorker / RetireWorkerIfNoSessions / RetireIfDrainingAndEmpty // surface (admin tooling, orchestration callbacks). LifecycleOriginPublicAPI LifecycleOrigin = "public_api" // LifecycleOriginActivationFailure marks retire paths fired after a // worker activation returned an error (org pool's // activateWorkerForOrg / ReconnectFlightWorker). LifecycleOriginActivationFailure LifecycleOrigin = "activation_failure" // LifecycleOriginCrashGeneric marks retire paths driven by a generic // "worker died" signal that isn't covered by a more specific origin // (no current callers after the explicit-origin refactor; retained as // a safe fallback bucket for future generic-crash sites). LifecycleOriginCrashGeneric LifecycleOrigin = "crash_generic" // LifecycleOriginInformerCrash marks the cleanDeadWorkersLocked path // driven by the K8s pod-informer firing w.done. Distinct from // LifecycleOriginHealthCheckCrash (which is the periodic // HealthCheckLoop probe) so dashboards can separate cluster-driven // pod terminations (eviction, OOM, manual delete, node drain) from // our own health-check decisions. LifecycleOriginInformerCrash LifecycleOrigin = "informer_crash" // LifecycleOriginPoolStuckActivating marks the pool-local // reapStuckActivatingWorkers loop, which runs every minute on every // CP. Distinct from LifecycleOriginJanitorStuckActivating (which is // the leader-only janitor reaper) so operators can tell whether // pool-side or janitor-side reaping is doing the work. LifecycleOriginPoolStuckActivating LifecycleOrigin = "pool_stuck_activating" // LifecycleOriginOrgShutdown marks per-org ShutdownAll on // OrgReservedPool. Distinct from LifecycleOriginShutdownAll (which is // the pool-wide K8sWorkerPool.ShutdownAll) so operators can tell // org-offboarding events from CP rollouts on dashboards. LifecycleOriginOrgShutdown LifecycleOrigin = "org_shutdown" // LifecycleOriginUnknown is the fallback label applied when an empty // origin reaches the observer. We always emit a sample rather than // silently drop it; an "unknown" bucket showing up on dashboards is // the signal that a new call site forgot to thread the origin. LifecycleOriginUnknown LifecycleOrigin = "unknown" )
type ManagedSession ¶
type ManagedSession struct {
PID int32
WorkerID int
Protocol string // "postgres" or "flight"
SessionToken string
Executor *flightclient.FlightExecutor
// contains filtered or unexported fields
}
ManagedSession tracks a client session bound to a worker.
type ManagedWorker ¶
type ManagedWorker struct {
ID int
// contains filtered or unexported fields
}
ManagedWorker represents a duckdb-service worker process.
func (*ManagedWorker) ActivateTenant ¶
func (w *ManagedWorker) ActivateTenant(ctx context.Context, payload server.WorkerActivationPayload) (err error)
ActivateTenant delivers tenant runtime to a shared warm worker before it may serve sessions.
func (*ManagedWorker) CreateSession ¶
func (w *ManagedWorker) CreateSession(ctx context.Context, username, memoryLimit string, threads int, secretStatements []string) (token string, secretWarnings []string, err error)
CreateSession creates a new session on the given worker. secretStatements are the user's persistent secrets to replay on the worker before the session serves queries; secretWarnings reports any that failed to apply.
func (*ManagedWorker) DestroySession ¶
func (w *ManagedWorker) DestroySession(ctx context.Context, sessionToken string) (err error)
DestroySession destroys a session on the worker.
func (*ManagedWorker) IncrementOwnerEpoch ¶
func (w *ManagedWorker) IncrementOwnerEpoch() int64
func (*ManagedWorker) OwnerCPInstanceID ¶
func (w *ManagedWorker) OwnerCPInstanceID() string
func (*ManagedWorker) OwnerEpoch ¶
func (w *ManagedWorker) OwnerEpoch() int64
func (*ManagedWorker) PodName ¶
func (w *ManagedWorker) PodName() string
func (*ManagedWorker) RefreshOwnerEpochAtomic ¶
func (w *ManagedWorker) RefreshOwnerEpochAtomic(fn func(current int64) (int64, error)) error
RefreshOwnerEpochAtomic holds the per-worker epoch lock across the callback so the durable bump and the in-memory update appear atomic to concurrent readers. Used by the credential-refresh path: the callback performs the lifecycle.RefreshLease CAS and returns the new epoch; readers blocked on OwnerEpoch() during the callback see the new value the moment it lands.
This closes the race where ShutdownAll could read the in-memory epoch between the durable bump and the in-memory set, build a stale lease, and CAS-miss against the now-advanced row.
Latency note: the callback runs under the lock, and the credential-refresh callback does a single DB round-trip (BumpWorkerEpoch). Worst-case stall for any concurrent OwnerEpoch() reader on the same worker is one round-trip (~10ms in practice). Because the lock is per-worker, refreshing worker A does not block reads on worker B — but readers that hold the pool-wide K8sWorkerPool.mu (e.g. iterations in cleanDeadWorkersLocked) will serialize behind any in-flight refresh on the worker they're inspecting. If this latency budget ever becomes a problem, the path forward is to bound cred-refresh concurrency or move to a "refresh-in-progress" sentinel rather than a held mutex. Keep callback work tightly scoped to the durable round-trip. If the callback returns an error the in-memory epoch is left unchanged.
func (*ManagedWorker) SetOwnerCPInstanceID ¶
func (w *ManagedWorker) SetOwnerCPInstanceID(cpInstanceID string)
func (*ManagedWorker) SetOwnerEpoch ¶
func (w *ManagedWorker) SetOwnerEpoch(epoch int64)
func (*ManagedWorker) SetSharedState ¶
func (w *ManagedWorker) SetSharedState(state SharedWorkerState) error
SetSharedState updates the additive shared worker lifecycle metadata without changing existing session scheduling behavior.
func (*ManagedWorker) SharedState ¶
func (w *ManagedWorker) SharedState() SharedWorkerState
SharedState returns the additive shared worker lifecycle metadata for this worker. The zero value normalizes to an idle, unassigned worker.
type MemoryRebalancer ¶
type MemoryRebalancer struct {
// contains filtered or unexported fields
}
MemoryRebalancer sets memory_limit and threads on DuckDB sessions. Every session gets the full memory budget — DuckDB will spill to disk/swap if aggregate usage exceeds physical RAM. Threads are not subdivided.
When enabled is true, SET commands are re-sent on every session create/destroy (useful if the budget or threads change at runtime). When enabled is false (default), each session gets its limits at creation time and is never adjusted afterward.
Lock ordering invariant: r.mu → SessionManager.mu(RLock). Never acquire r.mu while holding SessionManager.mu to avoid deadlock.
func NewMemoryRebalancer ¶
func NewMemoryRebalancer(memoryBudget uint64, threadBudget int, sessions SessionLister, enabled bool) *MemoryRebalancer
NewMemoryRebalancer creates a rebalancer with the given budgets. If memoryBudget is 0, it defaults to 75% of system RAM. If threadBudget is 0, it defaults to runtime.NumCPU(). If enabled is false, RequestRebalance is a no-op and sessions only get their limits set once at creation time.
func (*MemoryRebalancer) DefaultMaxWorkers ¶
func (r *MemoryRebalancer) DefaultMaxWorkers() int
DefaultMaxWorkers returns a reasonable default for max_workers. Derived from the memory budget (budget / 256MB).
func (*MemoryRebalancer) MemoryLimit ¶
func (r *MemoryRebalancer) MemoryLimit() string
MemoryLimit returns the memory limit string applied to every session.
func (*MemoryRebalancer) PerSessionThreads ¶
func (r *MemoryRebalancer) PerSessionThreads() int
PerSessionThreads returns the thread count for each session. Threads are not subdivided — every session gets the full budget.
func (*MemoryRebalancer) RequestRebalance ¶
func (r *MemoryRebalancer) RequestRebalance()
RequestRebalance signals that a rebalance is needed. Multiple rapid calls are coalesced — the actual rebalance runs at most once per debounce interval. When rebalancing is disabled, this is a no-op.
func (*MemoryRebalancer) SetInitialLimits ¶
func (r *MemoryRebalancer) SetInitialLimits(ctx context.Context, session *ManagedSession)
SetInitialLimits sets memory_limit and threads on a single session synchronously. Called during CreateSession so the new session never runs with unlimited resources.
func (*MemoryRebalancer) SetSessionLister ¶
func (r *MemoryRebalancer) SetSessionLister(sl SessionLister)
SetSessionLister sets the session lister after construction. Must be called before any Rebalance calls (i.e., before accepting connections).
func (*MemoryRebalancer) Stop ¶
func (r *MemoryRebalancer) Stop()
Stop stops the background debounce goroutine. Must be called on shutdown.
type OrgRouterInterface ¶
type OrgRouterInterface interface {
StackForOrg(orgID string) (pool WorkerPool, sessions *SessionManager, rebalancer *MemoryRebalancer, ok bool)
IcebergConfigForOrg(orgID string) (server.IcebergConfig, bool)
IsMigratingForOrg(orgID string) bool
ShutdownAll()
}
OrgRouterInterface abstracts the org router for the control plane.
type ProcessConfig ¶
type ProgressHandler ¶
type ProgressHandler func(workerID int, progress map[string]*SessionProgress)
ProgressHandler is called after a successful health check with per-session progress data parsed from the worker's health check response.
type RuntimeWorkerStore ¶
type RuntimeWorkerStore interface {
UpsertWorkerRecord(record *configstore.WorkerRecord) error
ClaimHotIdleWorker(ownerCPInstanceID, orgID, image string, profileCPU, profileMemory string, maxOrgWorkers int) (*configstore.WorkerRecord, configstore.WorkerClaimMissReason, error)
CreateSpawningWorkerSlot(ownerCPInstanceID, orgID, image string, profileCPU, profileMemory string, ownerEpoch int64, podNamePrefix string, maxOrgWorkers, maxGlobalWorkers int) (*configstore.WorkerRecord, error)
CountHotIdleWorkers(orgID, image, profileCPU, profileMemory string) (int, error)
GetWorkerRecord(workerID int) (*configstore.WorkerRecord, error)
ObserveWorker(workerID int) (*configstore.WorkerSnapshot, error)
TakeOverWorker(workerID int, ownerCPInstanceID, orgID string, expectedOwnerEpoch int64) (*configstore.WorkerRecord, error)
}
RuntimeWorkerStore is the durable-store surface exposed to the K8s worker pool. Every lifecycle CAS method is now absent from this interface — they are reachable only through WorkerLifecycle so callers physically cannot bypass the typed snapshot/lease seam.
The lifecycle service uses workerLifecycleStore (defined in worker_lifecycle.go), which is the larger interface that includes the CAS methods. Production wires it via a type assertion in newK8sWorkerPool / ensureLifecycle, because *configstore.ConfigStore satisfies both interfaces.
type SessionLister ¶
type SessionLister interface {
AllSessions() []*ManagedSession
}
SessionLister provides a snapshot of all active sessions for rebalancing.
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager tracks all active sessions and their worker assignments.
func NewOrgSessionManager ¶
func NewOrgSessionManager(pool WorkerPool, rebalancer *MemoryRebalancer, orgID string) *SessionManager
NewOrgSessionManager builds a SessionManager whose log lines all carry the owning org (multi-tenant remote backend: one manager per org stack).
func NewSessionManager ¶
func NewSessionManager(pool WorkerPool, rebalancer *MemoryRebalancer) *SessionManager
NewSessionManager creates a new session manager.
func (*SessionManager) AllSessions ¶
func (sm *SessionManager) AllSessions() []*ManagedSession
AllSessions returns a snapshot of all active sessions. The returned slice is safe to iterate without holding the lock.
func (*SessionManager) CreateSession ¶
func (sm *SessionManager) CreateSession(ctx context.Context, username string, pid int32, memoryLimit string, threads int, profile *WorkerProfile) (int32, *flightclient.FlightExecutor, error)
CreateSession acquires a worker from the configured pool, creates a session on it, and rebalances memory/thread limits across all active sessions. If pid is 0, a new one is generated.
func (*SessionManager) CreateSessionWithProtocol ¶
func (sm *SessionManager) CreateSessionWithProtocol(ctx context.Context, username string, pid int32, memoryLimit string, threads int, protocol string, profile *WorkerProfile) (int32, *flightclient.FlightExecutor, error)
func (*SessionManager) DestroyAllSessions ¶
func (sm *SessionManager) DestroyAllSessions()
DestroyAllSessions destroys every active session without holding the manager lock while running per-session cleanup.
func (*SessionManager) DestroySession ¶
func (sm *SessionManager) DestroySession(pid int32)
DestroySession destroys a session, retires its dedicated worker, and rebalances memory/thread limits across remaining sessions.
func (*SessionManager) GetProgress ¶
func (sm *SessionManager) GetProgress(pid int32) *SessionProgress
GetProgress returns the cached query progress for a session, or nil.
func (*SessionManager) OnWorkerCrash ¶
func (sm *SessionManager) OnWorkerCrash(workerID int, errorFn func(pid int32))
OnWorkerCrash handles a worker crash by marking all affected executors as dead and notifying sessions. Executors are marked dead BEFORE the shared gRPC client is closed to prevent nil-pointer panics from concurrent RPCs. errorFn is called for each affected session to send an error to the client.
func (*SessionManager) ReconnectFlightSession ¶
func (sm *SessionManager) ReconnectFlightSession(ctx context.Context, username string, workerID int, ownerEpoch int64) (int32, *flightclient.FlightExecutor, error)
func (*SessionManager) ReservePID ¶
func (sm *SessionManager) ReservePID() int32
ReservePID generates a new unique PID for a session.
func (*SessionManager) SessionCount ¶
func (sm *SessionManager) SessionCount() int
SessionCount returns the number of active sessions.
func (*SessionManager) SessionCountForWorker ¶
func (sm *SessionManager) SessionCountForWorker(workerID int) int
SessionCountForWorker returns the number of sessions on a specific worker.
func (*SessionManager) SetConnCloser ¶
func (sm *SessionManager) SetConnCloser(pid int32, closer io.Closer)
SetConnCloser registers the client's TCP connection so it can be closed when the backing worker crashes. This unblocks the message loop's read, causing it to exit cleanly instead of looping on ErrWorkerDead.
func (*SessionManager) SetConnectionLimiter ¶
func (sm *SessionManager) SetConnectionLimiter(limiter connectionLimiter)
SetConnectionLimiter replaces the local process limiter with a cluster-wide admission limiter. Local session maps still track only this control-plane's live sessions.
func (*SessionManager) SetMaxConnections ¶
func (sm *SessionManager) SetMaxConnections(n int)
SetMaxConnections sets the maximum connections for this SessionManager.
func (*SessionManager) SetProtocol ¶
func (sm *SessionManager) SetProtocol(pid int32, protocol string)
SetProtocol updates the protocol label for an active session.
func (*SessionManager) SetUserSecretLoader ¶
func (sm *SessionManager) SetUserSecretLoader(loader func(ctx context.Context, username string) ([]string, error))
SetUserSecretLoader installs the per-user persistent-secret loader used at session creation (multitenant/remote backend only).
func (*SessionManager) UpdateProgress ¶
func (sm *SessionManager) UpdateProgress(workerID int, progress map[string]*SessionProgress)
UpdateProgress caches query progress data for sessions on the given worker. Called from the health check loop after parsing the worker's health check response. Progress keys are truncated session tokens (first 16 chars) to avoid leaking full bearer tokens in health check JSON.
func (*SessionManager) WorkerIDForPID ¶
func (sm *SessionManager) WorkerIDForPID(pid int32) int
WorkerIDForPID returns the worker ID for a session, or -1 if not found.
func (*SessionManager) WorkerPodNameForPID ¶
func (sm *SessionManager) WorkerPodNameForPID(pid int32) string
WorkerPodNameForPID returns the K8s pod name of the worker hosting the session, or "" if not found or not running on K8s.
type SessionProgress ¶
SessionProgress holds cached query progress from a worker health check.
type SharedWorkerState ¶
type SharedWorkerState struct {
}
SharedWorkerState holds the additive lifecycle/assignment model for shared workers.
func (SharedWorkerState) NormalizedLifecycle ¶
func (s SharedWorkerState) NormalizedLifecycle() WorkerLifecycleState
NormalizedLifecycle treats the zero value as an idle, unassigned worker so existing worker structs can adopt this model without extra initialization.
func (SharedWorkerState) Transition ¶
func (s SharedWorkerState) Transition(next WorkerLifecycleState, assignment *WorkerAssignment) (SharedWorkerState, error)
Transition validates a lifecycle change and returns the next worker state. When assignment metadata is omitted, the current assignment is carried forward for states that remain tenant-bound.
func (SharedWorkerState) Validate ¶
func (s SharedWorkerState) Validate() error
Validate checks that the lifecycle and assignment metadata are internally consistent.
type SpawnFailureReason ¶
type SpawnFailureReason string
SpawnFailureReason categorizes why a worker spawn returned an error. Buckets follow the spawnWorker control-flow stages so dashboards can localize regressions: an uptick in "pod_ready" points at the scheduler / image-pull, an uptick in "secret_create" points at Kubernetes API auth, and so on.
const ( SpawnFailureReasonRuntimeStore SpawnFailureReason = "runtime_store" SpawnFailureReasonConfigMap SpawnFailureReason = "config_map" SpawnFailureReasonSecretCreate SpawnFailureReason = "secret_create" SpawnFailureReasonPodCreate SpawnFailureReason = "pod_create" SpawnFailureReasonPodReady SpawnFailureReason = "pod_ready" SpawnFailureReasonSecretRead SpawnFailureReason = "secret_read" SpawnFailureReasonGRPCConnect SpawnFailureReason = "grpc_connect" SpawnFailureReasonContextCanceled SpawnFailureReason = "context_canceled" SpawnFailureReasonOther SpawnFailureReason = "other" )
type StrandedOutcome ¶
type StrandedOutcome string
StrandedOutcome categorizes what the janitor recovery sweep did with each stranded pod it observed. "kept" means the artifact was claimed by a current runtime row (i.e. it wasn't actually stranded); "deleted" means the API delete succeeded; "delete_failed" means the delete returned an error and the artifact is still around.
const ( StrandedOutcomeDeleted StrandedOutcome = "deleted" StrandedOutcomeDeletedMissingRow StrandedOutcome = "deleted_missing_row" StrandedOutcomeDeletedTerminalRow StrandedOutcome = "deleted_terminal_row" StrandedOutcomeKept StrandedOutcome = "kept" StrandedOutcomeDeleteFailed StrandedOutcome = "delete_failed" )
type WorkerAssignment ¶
type WorkerAssignment struct {
OrgID string
MaxWorkers int
Image string
// Profile is the requested worker shape. nil => the default exclusive
// profile (today's behavior). It is immutable for a reserved worker's life;
// enforcement is wired in alongside the scheduling changes (see design doc).
Profile *WorkerProfile
}
WorkerAssignment carries tenant-specific metadata once a shared worker has been reserved for an org.
type WorkerCapacityExhaustedError ¶
type WorkerCapacityExhaustedError struct {
// RetryAfter is the client-facing retry hint for protocol-specific error responses.
RetryAfter time.Duration
// Reason classifies why the worker could not be acquired.
Reason configstore.WorkerClaimMissReason
}
WorkerCapacityExhaustedError is returned when a worker can't be acquired because the org or the cluster is at its worker cap (or the pool is shutting down). The caller fails fast with a retryable capacity response.
func (*WorkerCapacityExhaustedError) Error ¶
func (e *WorkerCapacityExhaustedError) Error() string
type WorkerCrashHandler ¶
type WorkerCrashHandler func(workerID int)
WorkerCrashHandler is called when a worker crash is detected, before respawning.
type WorkerLifecycle ¶
type WorkerLifecycle struct {
// contains filtered or unexported fields
}
WorkerLifecycle is the central typed lifecycle-transition API. It is the seam where post-#615 hand-rolled "observe → fence → CAS → cleanup" patterns are collapsed into one place.
All transitions take either a WorkerSnapshot (for observed-row paths like the janitor reapers) or a WorkerLease (for owned-row paths like ShutdownAll and the health checker). The two types cannot be interchanged at the call site, which is the load-bearing safety property — callers without a real lease physically cannot invoke the lease-only methods, and a snapshot's frozen fields are exactly what the underlying CAS fences against.
On a successful terminal CAS the service kicks the configured WorkerPhysicalCleanup. On a CAS miss the cleanup is skipped — by construction we have no proof of ownership over the pod, so deleting it is unsafe.
Every public transition method observes duckgres_worker_lifecycle_transitions_total (keyed by operation, outcome, image, and origin) plus a per-operation latency histogram. origin is a caller-supplied LifecycleOrigin so dashboards can see, for example, that "fence_miss_owner on retire_from_snapshot from janitor_orphan" is a different signal from the same outcome coming from cred_refresh.
func NewWorkerLifecycle ¶
func NewWorkerLifecycle(store workerLifecycleStore, cleanup WorkerPhysicalCleanup) *WorkerLifecycle
NewWorkerLifecycle wires the service. cleanup may be nil if the caller only needs durable CAS transitions and no physical cleanup (e.g. tests that assert on store calls in isolation).
func (*WorkerLifecycle) Drain ¶
func (l *WorkerLifecycle) Drain(lease configstore.WorkerLease, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
Drain transitions a lease-owned worker into the draining state. This is the first step of ShutdownAll's 3-step chain; it does NOT trigger physical cleanup (the caller orchestrates the pod delete between Drain and RetireDrained). Returns Transitioned=false on a CAS miss so the caller can skip the remaining steps.
func (*WorkerLifecycle) MarkLostFromLease ¶
func (l *WorkerLifecycle) MarkLostFromLease(lease configstore.WorkerLease, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
MarkLostFromLease performs the lease-fenced CAS that transitions a worker row to lost. Used by the health-checker after it has confirmed the worker is unresponsive. Does NOT schedule pod cleanup — consistent with the other lease-based transitions (Drain, RetireDrained, RefreshLease), the caller orchestrates physical cleanup so it can interleave replenishment decisions, in-memory pool removal, and pod delete in the right order. (The snapshot-based variants RetireFromSnapshot/RetireOrphanFromSnapshot/ RetireIdleVariantFromSnapshot do bundle cleanup because their callers don't have post-CAS choreography.)
func (*WorkerLifecycle) RefreshLease ¶
func (l *WorkerLifecycle) RefreshLease(lease configstore.WorkerLease, origin LifecycleOrigin) (configstore.WorkerLease, error)
RefreshLease bumps the durable owner_epoch under the current lease and returns a fresh lease at the new epoch. Equivalent to calling BumpWorkerEpoch directly; the value is the typed lease passing, not race avoidance — the caller still has to mirror the new epoch onto the in-memory worker (and the window between BumpWorkerEpoch returning and that mirror update is unchanged). PR 5 is where the in-memory-epoch race actually gets closed. Returns ErrWorkerOwnerEpochMismatch (via the error return) when the lease no longer matches the durable row.
func (*WorkerLifecycle) RetireDrained ¶
func (l *WorkerLifecycle) RetireDrained(lease configstore.WorkerLease, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
RetireDrained is the third step of ShutdownAll's chain: it transitions a draining row to retired, fenced by the lease. The caller is responsible for the pod delete between Drain and RetireDrained; if that delete failed the row should be left in draining (don't call this method) so the orphan sweep can reconcile.
func (*WorkerLifecycle) RetireFromSnapshot ¶
func (l *WorkerLifecycle) RetireFromSnapshot(snap configstore.WorkerSnapshot, target configstore.WorkerState, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
RetireFromSnapshot moves the observed worker row to a terminal state (retired or lost) fenced by the snapshot. Used by paths that already own the worker but observed a separate snapshot (e.g. the janitor's hot-idle TTL reaper) — the broad MarkWorkerTerminalIfCurrent fence ensures we don't trample a row that has been taken over since the snapshot was captured.
func (*WorkerLifecycle) RetireIdleVariantFromSnapshot ¶
func (l *WorkerLifecycle) RetireIdleVariantFromSnapshot(snap configstore.WorkerSnapshot, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
RetireIdleVariantFromSnapshot retires a worker observed as idle or hot_idle, fenced by the snapshot. The state restriction maps onto the legacy RetireIdleOrHotIdleWorker store CAS. Today's only caller is the mismatched-version reaper, which deliberately reaps only idle/hot_idle pods so a busy worker isn't yanked mid-session. The hot-idle TTL janitor previously used this helper but was promoted to the broader RetireFromSnapshot once the snapshot already narrowed the candidate set to state=hot_idle by virtue of ListExpiredHotIdleSnapshots.
func (*WorkerLifecycle) RetireOrphanFromSnapshot ¶
func (l *WorkerLifecycle) RetireOrphanFromSnapshot(snap configstore.WorkerSnapshot, reason string, origin LifecycleOrigin) (configstore.TransitionOutcome, error)
RetireOrphanFromSnapshot is the orphan-specific retire: it permits any active observed state (spawning through draining) and additionally enforces that the worker's owner control-plane is still expired (or absent). Used by the janitor's orphan-cleanup loop where the listing step already filtered by owner-expired-or-missing, so the snapshot's CP must remain so for the retire to be safe.
type WorkerLifecycleState ¶
type WorkerLifecycleState string
WorkerLifecycleState models the shared worker lifecycle used for late tenant binding (spawn → reserve → activate → hot → hot-idle → drain/retire).
const ( WorkerLifecycleIdle WorkerLifecycleState = "idle" WorkerLifecycleReserved WorkerLifecycleState = "reserved" WorkerLifecycleActivating WorkerLifecycleState = "activating" WorkerLifecycleHot WorkerLifecycleState = "hot" WorkerLifecycleHotIdle WorkerLifecycleState = "hot_idle" WorkerLifecycleDraining WorkerLifecycleState = "draining" WorkerLifecycleRetired WorkerLifecycleState = "retired" )
type WorkerPhysicalCleanup ¶
type WorkerPhysicalCleanup interface {
// DeleteWorkerArtifacts schedules deletion of the pod and RPC secret
// for the named worker. Called only after the durable runtime row
// has transitioned to terminal. Implementations may also clean up
// in-memory pool state keyed by workerID.
DeleteWorkerArtifacts(workerID int, podName string, reason string)
}
WorkerPhysicalCleanup performs the post-CAS K8s/in-memory side of a worker retirement: pod delete, RPC secret delete, removal from any in-process pool maps. WorkerLifecycle calls it after a durable CAS to terminal has landed.
Cleanup is fire-and-forget (the method returns synchronously after scheduling the goroutine that performs the delete). The lifecycle service does not depend on or report cleanup completion — the orphan reconciler + cleanupOrphanedWorkerPods janitor are the safety net for failed deletes.
type WorkerPool ¶
type WorkerPool interface {
// AcquireWorker returns a worker for a new session. The multi-tenant
// OrgReservedPool (k8s/remote) reuses a hot-idle worker of the requested shape
// for the org or spawns one on demand; the process FlightWorkerPool reuses an
// idle worker / assigns least-loaded. profile is the requested pod shape (nil
// => the default shape) and is honored only by OrgReservedPool.
AcquireWorker(ctx context.Context, profile *WorkerProfile) (*ManagedWorker, error)
// ReleaseWorker decrements the active session count for a worker.
ReleaseWorker(id int)
// RetireWorker removes a worker from the pool and shuts it down.
RetireWorker(id int)
// RetireWorkerIfNoSessions retires a worker only if it has zero active
// sessions after releasing the caller's claim. Returns true if retired.
RetireWorkerIfNoSessions(id int) bool
// Worker returns a worker by ID, or false if not found.
Worker(id int) (*ManagedWorker, bool)
// SpawnMinWorkers pre-warms the pool with count workers at startup. Only the
// process FlightWorkerPool implements this (--process-min-workers); the K8s
// pool spawns on demand, so its implementation is a no-op.
SpawnMinWorkers(count int) error
// HealthCheckLoop runs periodic health checks on all workers.
// onCrash is called when a worker crash is detected.
// onProgress is called with per-session progress data after each successful check.
// Either callback may be nil.
HealthCheckLoop(ctx context.Context, interval time.Duration, onCrash WorkerCrashHandler, onProgress ProgressHandler)
// SetMaxWorkers updates the maximum number of workers. 0 means unlimited.
SetMaxWorkers(n int)
// ShutdownAll stops all workers gracefully.
ShutdownAll()
}
WorkerPool abstracts the lifecycle and scheduling of Flight SQL workers. Two implementations exist:
- FlightWorkerPool: spawns workers as local child processes (default)
- K8sWorkerPool: creates workers as Kubernetes pods (build tag: kubernetes)
func CreateK8sPool ¶
func CreateK8sPool(cfg K8sWorkerPoolConfig) (WorkerPool, error)
CreateK8sPool creates a Kubernetes worker pool if the kubernetes build tag is enabled.
type WorkerProfile ¶
type WorkerProfile struct {
CPU string // normalized k8s quantity (e.g. "8"); the worker's CPU size
Memory string // normalized k8s quantity (e.g. "16Gi"); the worker's memory size
TTL time.Duration // how long the worker stays hot-idle after its last query (reset per query); not part of MatchKey
}
WorkerProfile describes the pod shape a session asked for via connection-string startup options (duckgres.worker_cpu / worker_memory / worker_ttl). It is a match dimension on WorkerAssignment, ORTHOGONAL to Image: a reserved or hot-idle worker may only be handed to a request whose profile Equal()s it.
The nil/zero profile is the DEFAULT profile: empty CPU/Memory (the pool-global request applies). Normalizing the default to empty strings (rather than the literal pool values) is what keeps legacy worker records claimable without a data migration.
Only CPU/Memory are persisted (WorkerRecord) and matched; TTL rides along on the record but is not part of the match identity.
func (*WorkerProfile) Equal ¶
func (wp *WorkerProfile) Equal(other *WorkerProfile) bool
Equal reports whether two profiles match (nil == zero == default).
func (*WorkerProfile) MatchKey ¶
func (wp *WorkerProfile) MatchKey() string
MatchKey is the identity used to decide whether an existing worker can serve a request. A nil profile shares the key of the zero/default profile, so legacy and default requests match the same workers.
func (*WorkerProfile) Parts ¶
func (wp *WorkerProfile) Parts() (cpu, memory string)
Parts returns the persisted/match primitives for the profile, decomposing nil to the default profile. Used to cross the controlplane→configstore package boundary (which cannot reference the WorkerProfile type) without losing the nil==default convention.
Source Files
¶
- capacity_policy.go
- connection_limiter.go
- control.go
- flight_ingress.go
- flight_ingress_metrics.go
- flight_ingress_metrics_stub.go
- handover.go
- janitor.go
- janitor_leader_stub.go
- leader_loop.go
- memory_rebalancer.go
- multitenant_stub.go
- runtime_tracker.go
- sdnotify.go
- session_lifecycle.go
- session_mgr.go
- session_search_path.go
- sni_other.go
- user_secrets.go
- validation.go
- worker_lifecycle.go
- worker_lifecycle_metrics.go
- worker_mgr.go
- worker_pool.go
- worker_profile.go
- worker_state.go