Documentation
¶
Index ¶
- func AppImage(app *db.AppRow, serverDefault string) string
- func AppRuntime(app *db.AppRow, cfg config.DockerConfig) string
- func BaseWorkerSpec(srv *Server, app *db.AppRow, workerID, bundleID string) backend.WorkerSpec
- func CleanupTokenDir(bundleServerPath, workerID string)
- func EnsureRProfile(dir string) (string, error)
- func LoadOrCreateWorkerKey(ctx context.Context, vaultClient *integration.Client, cfg *config.Config) (*auth.SigningKey, error)
- func SpawnTokenRefresher(ctx context.Context, bundleServerPath string, signingKey *auth.SigningKey, ...) (tokDir string, cancel func(), err error)
- func WorkerEnv(srv *Server) map[string]string
- type ActiveWorker
- type LayeredWorkerMap
- func (m *LayeredWorkerMap) All() []string
- func (m *LayeredWorkerMap) AppIDs() []string
- func (m *LayeredWorkerMap) ClearDraining(workerID string)
- func (m *LayeredWorkerMap) ClearIdleSince(workerID string) bool
- func (m *LayeredWorkerMap) Count() int
- func (m *LayeredWorkerMap) CountForApp(appID string) int
- func (m *LayeredWorkerMap) Delete(id string)
- func (m *LayeredWorkerMap) ForApp(appID string) []string
- func (m *LayeredWorkerMap) ForAppAvailable(appID string) []string
- func (m *LayeredWorkerMap) Get(id string) (ActiveWorker, bool)
- func (m *LayeredWorkerMap) IdleWorkers(timeout time.Duration) []string
- func (m *LayeredWorkerMap) IsDraining(appID string) bool
- func (m *LayeredWorkerMap) MarkDraining(appID string) []string
- func (m *LayeredWorkerMap) Set(id string, w ActiveWorker)
- func (m *LayeredWorkerMap) SetDraining(workerID string)
- func (m *LayeredWorkerMap) SetIdleSince(workerID string, t time.Time)
- func (m *LayeredWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
- func (m *LayeredWorkerMap) WorkersForServer(serverID string) []string
- type MemoryWorkerMap
- func (m *MemoryWorkerMap) All() []string
- func (m *MemoryWorkerMap) AppIDs() []string
- func (m *MemoryWorkerMap) ClearDraining(workerID string)
- func (m *MemoryWorkerMap) ClearIdleSince(workerID string) bool
- func (m *MemoryWorkerMap) Count() int
- func (m *MemoryWorkerMap) CountForApp(appID string) int
- func (m *MemoryWorkerMap) Delete(id string)
- func (m *MemoryWorkerMap) ForApp(appID string) []string
- func (m *MemoryWorkerMap) ForAppAvailable(appID string) []string
- func (m *MemoryWorkerMap) Get(id string) (ActiveWorker, bool)
- func (m *MemoryWorkerMap) IdleWorkers(timeout time.Duration) []string
- func (m *MemoryWorkerMap) IsDraining(appID string) bool
- func (m *MemoryWorkerMap) MarkDraining(appID string) []string
- func (m *MemoryWorkerMap) Set(id string, w ActiveWorker)
- func (m *MemoryWorkerMap) SetDraining(workerID string)
- func (m *MemoryWorkerMap) SetIdleSince(workerID string, t time.Time)
- func (m *MemoryWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
- func (m *MemoryWorkerMap) WorkersForServer(_ string) []string
- type PackageRequest
- type PackageResponse
- type PostgresWorkerMap
- func (m *PostgresWorkerMap) All() []string
- func (m *PostgresWorkerMap) AppIDs() []string
- func (m *PostgresWorkerMap) ClearDraining(workerID string)
- func (m *PostgresWorkerMap) ClearIdleSince(workerID string) bool
- func (m *PostgresWorkerMap) Count() int
- func (m *PostgresWorkerMap) CountForApp(appID string) int
- func (m *PostgresWorkerMap) Delete(id string)
- func (m *PostgresWorkerMap) ForApp(appID string) []string
- func (m *PostgresWorkerMap) ForAppAvailable(appID string) []string
- func (m *PostgresWorkerMap) Get(id string) (ActiveWorker, bool)
- func (m *PostgresWorkerMap) IdleWorkers(timeout time.Duration) []string
- func (m *PostgresWorkerMap) IsDraining(appID string) bool
- func (m *PostgresWorkerMap) MarkDraining(appID string) []string
- func (m *PostgresWorkerMap) RunReaper(ctx context.Context, threshold, interval time.Duration)
- func (m *PostgresWorkerMap) Set(id string, w ActiveWorker)
- func (m *PostgresWorkerMap) SetDraining(workerID string)
- func (m *PostgresWorkerMap) SetIdleSince(workerID string, t time.Time)
- func (m *PostgresWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
- func (m *PostgresWorkerMap) WorkersForServer(serverID string) []string
- type RedisWorkerMap
- func (m *RedisWorkerMap) All() []string
- func (m *RedisWorkerMap) AppIDs() []string
- func (m *RedisWorkerMap) ClearDraining(workerID string)
- func (m *RedisWorkerMap) ClearIdleSince(workerID string) bool
- func (m *RedisWorkerMap) Count() int
- func (m *RedisWorkerMap) CountForApp(appID string) int
- func (m *RedisWorkerMap) Delete(id string)
- func (m *RedisWorkerMap) ForApp(appID string) []string
- func (m *RedisWorkerMap) ForAppAvailable(appID string) []string
- func (m *RedisWorkerMap) Get(id string) (ActiveWorker, bool)
- func (m *RedisWorkerMap) IdleWorkers(timeout time.Duration) []string
- func (m *RedisWorkerMap) IsDraining(appID string) bool
- func (m *RedisWorkerMap) MarkDraining(appID string) []string
- func (m *RedisWorkerMap) Set(id string, w ActiveWorker)
- func (m *RedisWorkerMap) SetDraining(workerID string)
- func (m *RedisWorkerMap) SetIdleSince(workerID string, t time.Time)
- func (m *RedisWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
- func (m *RedisWorkerMap) WorkersForServer(serverID string) []string
- type Server
- func (srv *Server) AuthDeps() *auth.Deps
- func (srv *Server) BundlePaths(appID, bundleID string) bundle.Paths
- func (srv *Server) CancelTokenRefresher(workerID string)
- func (srv *Server) CleanupInstallMu(workerID string)
- func (srv *Server) ClearTransferring(workerID string)
- func (srv *Server) GetVersion() string
- func (srv *Server) InstallPackage(ctx context.Context, appID, workerID string, req PackageRequest) (PackageResponse, error)
- func (srv *Server) InternalAPIURL() string
- func (srv *Server) IsTransferring(workerID string) bool
- func (srv *Server) RunRefresh(ctx context.Context, app *db.AppRow, m *manifest.Manifest, sender task.Sender) bool
- func (srv *Server) RunRefreshScheduler(ctx context.Context)
- func (srv *Server) RunRollback(ctx context.Context, app *db.AppRow, target string, sender task.Sender)
- func (srv *Server) SetCancelToken(workerID string, cancel func())
- func (srv *Server) SetTransferring(workerID string)
- func (srv *Server) SetUpdateStatus(r *update.Result)
- func (srv *Server) TransferDir(workerID string) string
- func (srv *Server) UpdateAvailableVersion() string
- type WorkerMap
- type WsConnCounter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AppImage ¶ added in v0.0.3
AppImage returns the per-app image override, or the server-wide default.
func AppRuntime ¶ added in v0.0.3
func AppRuntime(app *db.AppRow, cfg config.DockerConfig) string
AppRuntime returns the effective OCI runtime for an app. Fallback chain: app.Runtime → config.RuntimeDefaults[accessType] → config.Runtime.
func BaseWorkerSpec ¶ added in v0.0.3
BaseWorkerSpec returns a WorkerSpec with all fields that are common across spawn sites (coldstart, transfer, API scale-up). Callers fill in site-specific fields like LibDir, TransferDir, TokenDir, MemoryLimit, and CPULimit.
func CleanupTokenDir ¶ added in v0.0.3
func CleanupTokenDir(bundleServerPath, workerID string)
CleanupTokenDir removes the token directory for a worker.
func EnsureRProfile ¶ added in v0.0.3
EnsureRProfile writes the blockyard R profile to dir and returns the path. The file bridges SHINY_HOST and SHINY_PORT env vars to the corresponding R options so bundles don't have to. Safe to call from multiple goroutines; the file is written once.
func LoadOrCreateWorkerKey ¶ added in v0.0.3
func LoadOrCreateWorkerKey( ctx context.Context, vaultClient *integration.Client, cfg *config.Config, ) (*auth.SigningKey, error)
LoadOrCreateWorkerKey resolves the worker signing key. It tries three sources in order:
- The vault (if configured) -- read or generate + store
- File ({bundle_server_path}/.worker-key) -- read existing
- Generate new + write to file
This ensures both the old and new server use the same key during a rolling update. When the vault is not available, the file path provides persistence across restarts.
func SpawnTokenRefresher ¶ added in v0.0.3
func SpawnTokenRefresher( ctx context.Context, bundleServerPath string, signingKey *auth.SigningKey, appID, workerID string, ) (tokDir string, cancel func(), err error)
SpawnTokenRefresher starts a goroutine that refreshes the worker's token file every TTL/2. Returns the token directory and a cancel function. An initial token is written synchronously before SpawnTokenRefresher returns, so the token file is ready before the container starts.
func WorkerEnv ¶ added in v0.0.3
WorkerEnv builds the backend-agnostic environment variable map for worker containers. Always sets BLOCKYARD_API_URL (needed for runtime package installs). Includes vault integration vars when configured. Sets SHINY_HOST per backend so bundles don't have to. Values from server.worker_env are merged in last; blockyard-managed keys win on collision, everything else (e.g. OTEL_*) is passed through.
Types ¶
type ActiveWorker ¶
type ActiveWorker struct {
AppID string
BundleID string // bundle active at spawn time; runtime installs resolve against this
Draining bool // set by graceful drain; no new sessions routed
IdleSince time.Time // zero value = not idle; set when session count hits 0
StartedAt time.Time // when the worker was spawned
}
ActiveWorker represents a running worker tracked by the server. The worker ID is the map key in WorkerMap, not stored here.
type LayeredWorkerMap ¶ added in v0.0.4
type LayeredWorkerMap struct {
// contains filtered or unexported fields
}
LayeredWorkerMap layers a cache WorkerMap over a primary (see #287, parent #262). The primary is the source of truth (Postgres in production); the cache is an optional optimization (Redis).
Reads: cache first; on miss, fall back to primary and populate the cache on the way out.
Writes: primary first; cache mirrored best-effort. Cache errors are swallowed inside the concrete stores, so LayeredWorkerMap just calls both — the primary operation's outcome is the one surfaced.
Aggregate queries (Count, CountForApp, ForApp, MarkDraining, …) always go to the primary: the cache may hold a subset and can't answer accurately.
func NewLayeredWorkerMap ¶ added in v0.0.4
func NewLayeredWorkerMap(primary, cache WorkerMap) *LayeredWorkerMap
func (*LayeredWorkerMap) All ¶ added in v0.0.4
func (m *LayeredWorkerMap) All() []string
func (*LayeredWorkerMap) AppIDs ¶ added in v0.0.4
func (m *LayeredWorkerMap) AppIDs() []string
func (*LayeredWorkerMap) ClearDraining ¶ added in v0.0.4
func (m *LayeredWorkerMap) ClearDraining(workerID string)
func (*LayeredWorkerMap) ClearIdleSince ¶ added in v0.0.4
func (m *LayeredWorkerMap) ClearIdleSince(workerID string) bool
func (*LayeredWorkerMap) Count ¶ added in v0.0.4
func (m *LayeredWorkerMap) Count() int
func (*LayeredWorkerMap) CountForApp ¶ added in v0.0.4
func (m *LayeredWorkerMap) CountForApp(appID string) int
func (*LayeredWorkerMap) Delete ¶ added in v0.0.4
func (m *LayeredWorkerMap) Delete(id string)
func (*LayeredWorkerMap) ForApp ¶ added in v0.0.4
func (m *LayeredWorkerMap) ForApp(appID string) []string
func (*LayeredWorkerMap) ForAppAvailable ¶ added in v0.0.4
func (m *LayeredWorkerMap) ForAppAvailable(appID string) []string
func (*LayeredWorkerMap) Get ¶ added in v0.0.4
func (m *LayeredWorkerMap) Get(id string) (ActiveWorker, bool)
func (*LayeredWorkerMap) IdleWorkers ¶ added in v0.0.4
func (m *LayeredWorkerMap) IdleWorkers(timeout time.Duration) []string
func (*LayeredWorkerMap) IsDraining ¶ added in v0.0.4
func (m *LayeredWorkerMap) IsDraining(appID string) bool
func (*LayeredWorkerMap) MarkDraining ¶ added in v0.0.4
func (m *LayeredWorkerMap) MarkDraining(appID string) []string
MarkDraining writes the drain flag to the primary, then mirrors the state to the cache for each affected worker so a subsequent cache hit returns the new draining flag.
func (*LayeredWorkerMap) Set ¶ added in v0.0.4
func (m *LayeredWorkerMap) Set(id string, w ActiveWorker)
func (*LayeredWorkerMap) SetDraining ¶ added in v0.0.4
func (m *LayeredWorkerMap) SetDraining(workerID string)
func (*LayeredWorkerMap) SetIdleSince ¶ added in v0.0.4
func (m *LayeredWorkerMap) SetIdleSince(workerID string, t time.Time)
func (*LayeredWorkerMap) SetIdleSinceIfZero ¶ added in v0.0.4
func (m *LayeredWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
func (*LayeredWorkerMap) WorkersForServer ¶ added in v0.0.4
func (m *LayeredWorkerMap) WorkersForServer(serverID string) []string
type MemoryWorkerMap ¶ added in v0.0.3
type MemoryWorkerMap struct {
// contains filtered or unexported fields
}
MemoryWorkerMap is a concurrent in-memory map of worker ID → ActiveWorker.
func NewMemoryWorkerMap ¶ added in v0.0.3
func NewMemoryWorkerMap() *MemoryWorkerMap
func (*MemoryWorkerMap) All ¶ added in v0.0.3
func (m *MemoryWorkerMap) All() []string
All returns a snapshot of all worker IDs.
func (*MemoryWorkerMap) AppIDs ¶ added in v0.0.3
func (m *MemoryWorkerMap) AppIDs() []string
AppIDs returns a deduplicated list of app IDs that have active workers.
func (*MemoryWorkerMap) ClearDraining ¶ added in v0.0.3
func (m *MemoryWorkerMap) ClearDraining(workerID string)
ClearDraining clears the draining flag on a single worker by ID.
func (*MemoryWorkerMap) ClearIdleSince ¶ added in v0.0.3
func (m *MemoryWorkerMap) ClearIdleSince(workerID string) bool
ClearIdleSince resets the idle timer (a new session was assigned). Returns true if the worker was idle before clearing.
func (*MemoryWorkerMap) Count ¶ added in v0.0.3
func (m *MemoryWorkerMap) Count() int
func (*MemoryWorkerMap) CountForApp ¶ added in v0.0.3
func (m *MemoryWorkerMap) CountForApp(appID string) int
func (*MemoryWorkerMap) Delete ¶ added in v0.0.3
func (m *MemoryWorkerMap) Delete(id string)
func (*MemoryWorkerMap) ForApp ¶ added in v0.0.3
func (m *MemoryWorkerMap) ForApp(appID string) []string
ForApp returns all worker IDs for a given app (including draining).
func (*MemoryWorkerMap) ForAppAvailable ¶ added in v0.0.3
func (m *MemoryWorkerMap) ForAppAvailable(appID string) []string
ForAppAvailable returns worker IDs for an app that are not draining.
func (*MemoryWorkerMap) Get ¶ added in v0.0.3
func (m *MemoryWorkerMap) Get(id string) (ActiveWorker, bool)
func (*MemoryWorkerMap) IdleWorkers ¶ added in v0.0.3
func (m *MemoryWorkerMap) IdleWorkers(timeout time.Duration) []string
IdleWorkers returns workers that have been idle longer than the given timeout, excluding draining workers (they have their own lifecycle).
func (*MemoryWorkerMap) IsDraining ¶ added in v0.0.3
func (m *MemoryWorkerMap) IsDraining(appID string) bool
IsDraining returns true if any worker for the given app is draining.
func (*MemoryWorkerMap) MarkDraining ¶ added in v0.0.3
func (m *MemoryWorkerMap) MarkDraining(appID string) []string
MarkDraining sets the draining flag on all workers for an app. Returns the list of affected worker IDs.
func (*MemoryWorkerMap) Set ¶ added in v0.0.3
func (m *MemoryWorkerMap) Set(id string, w ActiveWorker)
func (*MemoryWorkerMap) SetDraining ¶ added in v0.0.3
func (m *MemoryWorkerMap) SetDraining(workerID string)
SetDraining sets the draining flag on a single worker by ID. Used by refresh drain-and-replace to drain specific old workers while keeping newly spawned workers available.
func (*MemoryWorkerMap) SetIdleSince ¶ added in v0.0.3
func (m *MemoryWorkerMap) SetIdleSince(workerID string, t time.Time)
SetIdleSince marks when a worker became idle (zero sessions). Called when the last session for a worker is removed.
func (*MemoryWorkerMap) SetIdleSinceIfZero ¶ added in v0.0.3
func (m *MemoryWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
SetIdleSinceIfZero marks when a worker became idle, but only if it isn't already marked. This avoids resetting the timer on repeated ticks while the worker remains idle.
func (*MemoryWorkerMap) WorkersForServer ¶ added in v0.0.3
func (m *MemoryWorkerMap) WorkersForServer(_ string) []string
WorkersForServer returns all worker IDs. In the memory implementation every worker belongs to "this" server — single-node deployments have one process — so the serverID filter is a no-op. The Redis variant does the real filtering.
type PackageRequest ¶ added in v0.0.3
type PackageRequest struct {
Name string `json:"name"` // package name or pkgdepends ref
LoadedNamespaces []string `json:"loaded_namespaces"` // from loadedNamespaces() in R
}
PackageRequest is the body of POST /api/v1/packages.
type PackageResponse ¶ added in v0.0.3
type PackageResponse struct {
Status string `json:"status"` // "ok", "transfer", "error"
Message string `json:"message,omitempty"`
TransferPath string `json:"transfer_path,omitempty"` // set when status == "transfer"
}
PackageResponse is returned by POST /api/v1/packages.
type PostgresWorkerMap ¶ added in v0.0.4
type PostgresWorkerMap struct {
// contains filtered or unexported fields
}
PostgresWorkerMap implements WorkerMap against the blockyard_workers table, making Postgres the source of truth for worker metadata (see #287, parent #262). The same table also backs registry.PostgresRegistry — each store upserts its own column subset so a production spawn that calls Workers.Set before Registry.Set converges to a full row.
idle_since is stored as NULL when the worker is not idle (mapped to Go's zero-value time.Time), mirroring the MemoryWorkerMap semantic used throughout the codebase.
func NewPostgresWorkerMap ¶ added in v0.0.4
func NewPostgresWorkerMap(db *sqlx.DB, serverID string) *PostgresWorkerMap
func (*PostgresWorkerMap) All ¶ added in v0.0.4
func (m *PostgresWorkerMap) All() []string
func (*PostgresWorkerMap) AppIDs ¶ added in v0.0.4
func (m *PostgresWorkerMap) AppIDs() []string
AppIDs returns the deduplicated set of app_ids with at least one tracked worker. Filters out the empty-string default so rows created by registry.PostgresRegistry alone don't leak a bogus "" entry.
func (*PostgresWorkerMap) ClearDraining ¶ added in v0.0.4
func (m *PostgresWorkerMap) ClearDraining(workerID string)
func (*PostgresWorkerMap) ClearIdleSince ¶ added in v0.0.4
func (m *PostgresWorkerMap) ClearIdleSince(workerID string) bool
ClearIdleSince resets idle_since to NULL and reports whether the worker was previously idle. Done in a single UPDATE so the read-modify-write is race-free.
func (*PostgresWorkerMap) Count ¶ added in v0.0.4
func (m *PostgresWorkerMap) Count() int
Count returns the total number of tracked workers. Rows created by registry.PostgresRegistry alone (no Workers.Set ever called) still count — a registered address is a tracked worker.
func (*PostgresWorkerMap) CountForApp ¶ added in v0.0.4
func (m *PostgresWorkerMap) CountForApp(appID string) int
func (*PostgresWorkerMap) Delete ¶ added in v0.0.4
func (m *PostgresWorkerMap) Delete(id string)
func (*PostgresWorkerMap) ForApp ¶ added in v0.0.4
func (m *PostgresWorkerMap) ForApp(appID string) []string
func (*PostgresWorkerMap) ForAppAvailable ¶ added in v0.0.4
func (m *PostgresWorkerMap) ForAppAvailable(appID string) []string
func (*PostgresWorkerMap) Get ¶ added in v0.0.4
func (m *PostgresWorkerMap) Get(id string) (ActiveWorker, bool)
func (*PostgresWorkerMap) IdleWorkers ¶ added in v0.0.4
func (m *PostgresWorkerMap) IdleWorkers(timeout time.Duration) []string
IdleWorkers returns workers idle longer than timeout, excluding draining workers (they're on their own lifecycle).
func (*PostgresWorkerMap) IsDraining ¶ added in v0.0.4
func (m *PostgresWorkerMap) IsDraining(appID string) bool
func (*PostgresWorkerMap) MarkDraining ¶ added in v0.0.4
func (m *PostgresWorkerMap) MarkDraining(appID string) []string
MarkDraining sets draining on every worker for the given app and returns the affected worker IDs. Implemented as a single UPDATE ... RETURNING so the read-modify-write stays atomic.
func (*PostgresWorkerMap) RunReaper ¶ added in v0.0.4
func (m *PostgresWorkerMap) RunReaper(ctx context.Context, threshold, interval time.Duration)
RunReaper deletes blockyard_workers rows whose last_heartbeat has been stale for longer than `threshold`, every `interval`. Blocks until ctx is cancelled. Caller runs it in a goroutine.
Motivation: Redis workermap entries had implicit TTLs that cleaned up after a pod died without graceful shutdown. Postgres doesn't expire rows, so without this sweep a crashed pod's workers linger in the shared blockyard_workers table — Registry.Get hides them via its own last_heartbeat filter, but WorkerMap's Count / ForApp / IdleWorkers / AppIDs would keep reporting ghosts and skew scheduler decisions.
Pick a threshold well above registryTTL so a transient network blip (heartbeat writes briefly blocked) doesn't reap a worker the health poller will resume updating seconds later. cmd/blockyard/main.go uses 5 × registryTTL, giving 60 s of recovery slack on top of the 15 s "Registry considers dead" signal with default health intervals.
func (*PostgresWorkerMap) Set ¶ added in v0.0.4
func (m *PostgresWorkerMap) Set(id string, w ActiveWorker)
func (*PostgresWorkerMap) SetDraining ¶ added in v0.0.4
func (m *PostgresWorkerMap) SetDraining(workerID string)
SetDraining flips draining on a single worker. The WHERE guard matches the "must not create ghost entry" conformance test — if the row doesn't exist, nothing is inserted.
func (*PostgresWorkerMap) SetIdleSince ¶ added in v0.0.4
func (m *PostgresWorkerMap) SetIdleSince(workerID string, t time.Time)
func (*PostgresWorkerMap) SetIdleSinceIfZero ¶ added in v0.0.4
func (m *PostgresWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
SetIdleSinceIfZero sets idle_since only when it's currently NULL (the zero-value representation). Matches the MemoryWorkerMap semantic used to avoid resetting the timer on repeated ticks.
func (*PostgresWorkerMap) WorkersForServer ¶ added in v0.0.4
func (m *PostgresWorkerMap) WorkersForServer(serverID string) []string
type RedisWorkerMap ¶ added in v0.0.3
type RedisWorkerMap struct {
// contains filtered or unexported fields
}
RedisWorkerMap implements WorkerMap using Redis hashes.
Key schema:
{prefix}worker:{workerID} -> hash {app_id, bundle_id, draining, idle_since, started_at, server_id}
No TTL — workers are explicitly deleted on eviction.
func NewRedisWorkerMap ¶ added in v0.0.3
func NewRedisWorkerMap(client *redisstate.Client, serverID string) *RedisWorkerMap
func (*RedisWorkerMap) All ¶ added in v0.0.3
func (m *RedisWorkerMap) All() []string
func (*RedisWorkerMap) AppIDs ¶ added in v0.0.3
func (m *RedisWorkerMap) AppIDs() []string
func (*RedisWorkerMap) ClearDraining ¶ added in v0.0.3
func (m *RedisWorkerMap) ClearDraining(workerID string)
func (*RedisWorkerMap) ClearIdleSince ¶ added in v0.0.3
func (m *RedisWorkerMap) ClearIdleSince(workerID string) bool
func (*RedisWorkerMap) Count ¶ added in v0.0.3
func (m *RedisWorkerMap) Count() int
func (*RedisWorkerMap) CountForApp ¶ added in v0.0.3
func (m *RedisWorkerMap) CountForApp(appID string) int
func (*RedisWorkerMap) Delete ¶ added in v0.0.3
func (m *RedisWorkerMap) Delete(id string)
func (*RedisWorkerMap) ForApp ¶ added in v0.0.3
func (m *RedisWorkerMap) ForApp(appID string) []string
func (*RedisWorkerMap) ForAppAvailable ¶ added in v0.0.3
func (m *RedisWorkerMap) ForAppAvailable(appID string) []string
func (*RedisWorkerMap) Get ¶ added in v0.0.3
func (m *RedisWorkerMap) Get(id string) (ActiveWorker, bool)
func (*RedisWorkerMap) IdleWorkers ¶ added in v0.0.3
func (m *RedisWorkerMap) IdleWorkers(timeout time.Duration) []string
func (*RedisWorkerMap) IsDraining ¶ added in v0.0.3
func (m *RedisWorkerMap) IsDraining(appID string) bool
func (*RedisWorkerMap) MarkDraining ¶ added in v0.0.3
func (m *RedisWorkerMap) MarkDraining(appID string) []string
func (*RedisWorkerMap) Set ¶ added in v0.0.3
func (m *RedisWorkerMap) Set(id string, w ActiveWorker)
func (*RedisWorkerMap) SetDraining ¶ added in v0.0.3
func (m *RedisWorkerMap) SetDraining(workerID string)
func (*RedisWorkerMap) SetIdleSince ¶ added in v0.0.3
func (m *RedisWorkerMap) SetIdleSince(workerID string, t time.Time)
func (*RedisWorkerMap) SetIdleSinceIfZero ¶ added in v0.0.3
func (m *RedisWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
func (*RedisWorkerMap) WorkersForServer ¶ added in v0.0.3
func (m *RedisWorkerMap) WorkersForServer(serverID string) []string
WorkersForServer returns worker IDs whose server_id hash field matches the given serverID. Pattern lifted from ForApp — scan the worker key namespace, pipeline-HGET the server_id field, filter. Same shape as the other server-scoped filters, just on a different hash field.
type Server ¶
type Server struct {
Config *config.Config
Backend backend.Backend
DB *db.DB
Workers WorkerMap
Sessions session.Store
WsConns *WsConnCounter
Registry registry.WorkerRegistry
Tasks *task.Store
LogStore *logstore.Store
ErrorLog *errorlog.Store
Metrics *telemetry.Metrics
// ServerID uniquely identifies this process among peers sharing the
// same Postgres / Redis (see #262). Health-poll, startup-reconcile,
// and graceful-shutdown use it via Workers.WorkersForServer so they
// only act on workers this process owns — otherwise a sibling pod's
// boot would wipe this pod's workers from the shared table.
ServerID string
// Auth fields — nil when [oidc] is not configured (v0 compat).
OIDCClient *auth.OIDCClient
SigningKey *auth.SigningKey
UserSessions *auth.UserSessionStore
// Session token signing key — for credential exchange tokens.
// Derived from session_secret with a different domain string.
SessionTokenKey *auth.SigningKey
// Vault — nil when [vault] is not configured.
VaultClient *integration.Client
VaultTokenCache *integration.VaultTokenCache
// Board-storage provisioner — non-nil only when
// database.board_storage is enabled. Drives first-login
// provisioning from the OIDC callback and deactivation/reactivation
// from the admin update-user endpoint.
BoardStorage *boardstorage.Provisioner
// VaultTokenHealthy reports whether the vault token is valid.
// Non-nil only when AppRole auth is used (token renewal active).
VaultTokenHealthy func() bool
// Redis client — nil when [redis] is not configured.
RedisClient *redisstate.Client
// Audit log — nil when [audit] is not configured.
AuditLog *audit.Log
// System checks — populated during startup, used by the system page
// and API endpoints. Nil until Init is called.
Checker *preflight.Checker
// Package store — nil when not available (no builds yet).
PkgStore *pkgstore.Store
// HMAC key for worker tokens. Persisted via vault or file-based
// fallback so both servers verify the same tokens during a rolling
// update. Independent of SessionSecret and OIDC.
WorkerTokenKey *auth.SigningKey
// Draining is set when the server enters drain mode (SIGUSR1) or
// shutdown (SIGTERM). Health endpoints return 503 when set.
Draining atomic.Bool
// Passive is true when BLOCKYARD_PASSIVE=1 is set. Background
// goroutines are deferred until POST /api/v1/admin/activate.
Passive atomic.Bool
// Bootstrap token state — one-time token that can be exchanged for
// a real PAT via POST /api/v1/bootstrap. Hash is set at startup;
// Redeemed is flipped to true on first successful exchange.
BootstrapTokenHash []byte
BootstrapRedeemed atomic.Bool
// Version is the server version string, set at build time.
Version string
// UpdateStatus holds the most recent CheckLatest result. Nil
// until the first check runs. The richer State enum lets readers
// distinguish "update available" from "ahead of latest" / "dev
// build" / "couldn't reach GitHub" / etc.
UpdateStatus atomic.Pointer[update.Result]
// UpdateLastChecked records the wall-clock time of the most recent
// update check (scheduled or manual). Nil until the first check runs.
UpdateLastChecked atomic.Pointer[time.Time]
// RestoreWG is used in tests to wait for background restore goroutines
// to complete before cleanup. Nil in production.
RestoreWG *sync.WaitGroup
// Hooks for operations that would cause import cycles if called
// directly from server. Set during initialization in main().
EvictWorkerFn func(ctx context.Context, srv *Server, workerID, reason string)
SpawnLogCaptureFn func(ctx context.Context, srv *Server, workerID, appID string)
// contains filtered or unexported fields
}
Server holds all shared state for the running server. Passed by pointer to API handlers, proxy, and background goroutines.
func NewServer ¶
NewServer creates a Server with all in-memory stores initialized. The returned Server owns a fresh per-instance telemetry.Metrics registered against a private prometheus registry, so tests that construct a Server never contend on shared counter state. Production callers should replace Server.Metrics with one registered against prometheus.DefaultRegisterer so the /metrics HTTP handler can scrape it — see NewServerWithDefaultMetrics.
func NewServerWithDefaultMetrics ¶ added in v0.0.3
NewServerWithDefaultMetrics is the production constructor. It creates a Server whose metrics are registered with prometheus.DefaultRegisterer so the promhttp /metrics endpoint can scrape them.
func (*Server) AuthDeps ¶ added in v0.0.2
AuthDeps returns an auth.Deps populated from this server's fields. Used by the router to wire auth handlers and middleware without a circular import.
func (*Server) BundlePaths ¶ added in v0.0.3
BundlePaths returns the filesystem paths for a bundle.
func (*Server) CancelTokenRefresher ¶ added in v0.0.3
CancelTokenRefresher calls and removes the cancel function for a worker. No-op if the worker has no registered cancel function.
func (*Server) CleanupInstallMu ¶ added in v0.0.3
CleanupInstallMu removes the per-worker mutex (called on eviction).
func (*Server) ClearTransferring ¶ added in v0.0.3
ClearTransferring removes the transfer-in-progress flag for a worker.
func (*Server) GetVersion ¶ added in v0.0.3
GetVersion returns the running server version.
func (*Server) InstallPackage ¶ added in v0.0.3
func (srv *Server) InstallPackage( ctx context.Context, appID, workerID string, req PackageRequest, ) (PackageResponse, error)
InstallPackage is the core orchestration for runtime package installation. Uses the same four-phase store-aware flow as the build pipeline, differing in resolution context: the worker's existing /lib is a reference library so pak sees what's installed.
func (*Server) InternalAPIURL ¶ added in v0.0.3
InternalAPIURL returns the URL workers should use to reach this server.
func (*Server) IsTransferring ¶ added in v0.0.3
IsTransferring returns true if a container transfer is in progress for the given worker.
func (*Server) RunRefresh ¶ added in v0.0.3
func (srv *Server) RunRefresh( ctx context.Context, app *db.AppRow, m *manifest.Manifest, sender task.Sender, ) bool
RunRefresh re-resolves dependencies for an unpinned deployment. Returns true if a new worker was spawned (dependencies changed).
func (*Server) RunRefreshScheduler ¶ added in v0.0.3
RunRefreshScheduler checks active apps with a refresh_schedule and triggers refresh at the configured times. Blocks until ctx is cancelled.
func (*Server) RunRollback ¶ added in v0.0.3
func (srv *Server) RunRollback( ctx context.Context, app *db.AppRow, target string, sender task.Sender, )
RunRollback performs a rollback to either the previous refresh or the original build, then drains and replaces workers.
func (*Server) SetCancelToken ¶ added in v0.0.3
SetCancelToken registers a cancel function for a worker's token refresher.
func (*Server) SetTransferring ¶ added in v0.0.3
SetTransferring marks a worker as having a transfer in progress.
func (*Server) SetUpdateStatus ¶ added in v0.0.3
SetUpdateStatus records the latest update check result and stamps UpdateLastChecked with the current time. Satisfies the update.UpdateStore interface so the periodic checker and the UI's manual-refresh handler can both feed results in.
func (*Server) TransferDir ¶ added in v0.0.3
TransferDir returns the host-side transfer directory for a worker.
func (*Server) UpdateAvailableVersion ¶ added in v0.0.3
UpdateAvailableVersion returns the upstream version string when an update is recommended, or "" otherwise. Convenience for consumers (preflight, readyz) that previously only cared about the boolean.
type WorkerMap ¶
type WorkerMap interface {
Get(id string) (ActiveWorker, bool)
Set(id string, w ActiveWorker)
Delete(id string)
Count() int
CountForApp(appID string) int
All() []string
ForApp(appID string) []string
ForAppAvailable(appID string) []string
MarkDraining(appID string) []string
SetDraining(workerID string)
SetIdleSince(workerID string, t time.Time)
SetIdleSinceIfZero(workerID string, t time.Time)
ClearIdleSince(workerID string) bool
IdleWorkers(timeout time.Duration) []string
AppIDs() []string
IsDraining(appID string) bool
ClearDraining(workerID string)
// WorkersForServer returns the worker IDs owned by the given
// server_id. Used by Drainer.waitForIdle during a same-host
// rolling update so the old server waits for its own sessions
// to end, not the new server's fresh sessions. In the memory
// implementation this is equivalent to All() — single-node
// deployments have one server, so every worker belongs to it.
// The Redis implementation filters by the server_id hash field
// that phase 3-3 already writes.
WorkersForServer(serverID string) []string
}
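WorkerMap defines the contract for the active worker map. MemoryWorkerMap is the in-process implementation; RedisWorkerMap and PostgresWorkerMap implement the same interface for state shared between server processes (for example during rolling updates), and LayeredWorkerMap composes a cache WorkerMap over a primary one.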
type WsConnCounter ¶ added in v0.0.4
type WsConnCounter struct {
// contains filtered or unexported fields
}
WsConnCounter tracks active WebSocket connections per worker.
Used to enforce max_sessions_per_worker at WebSocket handshake time and to inform load-balancer assignment and idle detection. A session in blockyard is one active Shiny WebSocket — one browser tab — not one browser cookie, so this counter is what "max sessions" gates against. The session.Store continues to track cookie-level worker pinning for HTTP routing stickiness; it does not gate capacity.
func NewWsConnCounter ¶ added in v0.0.4
func NewWsConnCounter() *WsConnCounter
func (*WsConnCounter) Count ¶ added in v0.0.4
func (c *WsConnCounter) Count(workerID string) int
Count returns the current active WS count for workerID.
func (*WsConnCounter) CountForWorkers ¶ added in v0.0.4
func (c *WsConnCounter) CountForWorkers(ids []string) int
CountForWorkers sums active WS counts across the given worker IDs.
func (*WsConnCounter) Dec ¶ added in v0.0.4
func (c *WsConnCounter) Dec(workerID string)
Dec decrements the count for workerID. Idempotent at zero.
func (*WsConnCounter) DeleteWorker ¶ added in v0.0.4
func (c *WsConnCounter) DeleteWorker(workerID string)
DeleteWorker drops the entry for workerID. Called on eviction so stale counts don't linger against a vanished worker.
func (*WsConnCounter) TryInc ¶ added in v0.0.4
func (c *WsConnCounter) TryInc(workerID string, max int) bool
TryInc atomically increments the count for workerID if it is strictly below max. Returns true on success, false if the worker is already at capacity. A single mutex-protected check-and-increment avoids the race where two concurrent handshakes both observe count < max.