Documentation
¶
Index ¶
- func AppImage(app *db.AppRow, serverDefault string) string
- func AppRuntime(app *db.AppRow, cfg config.DockerConfig) string
- func BaseWorkerSpec(srv *Server, app *db.AppRow, workerID, bundleID string) backend.WorkerSpec
- func CleanupTokenDir(bundleServerPath, workerID string)
- func EnsureRProfile(dir string) (string, error)
- func LoadOrCreateWorkerKey(ctx context.Context, vaultClient *integration.Client, cfg *config.Config) (*auth.SigningKey, error)
- func SpawnTokenRefresher(ctx context.Context, bundleServerPath string, signingKey *auth.SigningKey, ...) (tokDir string, cancel func(), err error)
- func WorkerEnv(srv *Server) map[string]string
- type ActiveWorker
- type MemoryWorkerMap
- func (m *MemoryWorkerMap) All() []string
- func (m *MemoryWorkerMap) AppIDs() []string
- func (m *MemoryWorkerMap) ClearDraining(workerID string)
- func (m *MemoryWorkerMap) ClearIdleSince(workerID string) bool
- func (m *MemoryWorkerMap) Count() int
- func (m *MemoryWorkerMap) CountForApp(appID string) int
- func (m *MemoryWorkerMap) Delete(id string)
- func (m *MemoryWorkerMap) ForApp(appID string) []string
- func (m *MemoryWorkerMap) ForAppAvailable(appID string) []string
- func (m *MemoryWorkerMap) Get(id string) (ActiveWorker, bool)
- func (m *MemoryWorkerMap) IdleWorkers(timeout time.Duration) []string
- func (m *MemoryWorkerMap) IsDraining(appID string) bool
- func (m *MemoryWorkerMap) MarkDraining(appID string) []string
- func (m *MemoryWorkerMap) Set(id string, w ActiveWorker)
- func (m *MemoryWorkerMap) SetDraining(workerID string)
- func (m *MemoryWorkerMap) SetIdleSince(workerID string, t time.Time)
- func (m *MemoryWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
- func (m *MemoryWorkerMap) WorkersForServer(_ string) []string
- type PackageRequest
- type PackageResponse
- type RedisWorkerMap
- func (m *RedisWorkerMap) All() []string
- func (m *RedisWorkerMap) AppIDs() []string
- func (m *RedisWorkerMap) ClearDraining(workerID string)
- func (m *RedisWorkerMap) ClearIdleSince(workerID string) bool
- func (m *RedisWorkerMap) Count() int
- func (m *RedisWorkerMap) CountForApp(appID string) int
- func (m *RedisWorkerMap) Delete(id string)
- func (m *RedisWorkerMap) ForApp(appID string) []string
- func (m *RedisWorkerMap) ForAppAvailable(appID string) []string
- func (m *RedisWorkerMap) Get(id string) (ActiveWorker, bool)
- func (m *RedisWorkerMap) IdleWorkers(timeout time.Duration) []string
- func (m *RedisWorkerMap) IsDraining(appID string) bool
- func (m *RedisWorkerMap) MarkDraining(appID string) []string
- func (m *RedisWorkerMap) Set(id string, w ActiveWorker)
- func (m *RedisWorkerMap) SetDraining(workerID string)
- func (m *RedisWorkerMap) SetIdleSince(workerID string, t time.Time)
- func (m *RedisWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
- func (m *RedisWorkerMap) WorkersForServer(serverID string) []string
- type Server
- func (srv *Server) AuthDeps() *auth.Deps
- func (srv *Server) BundlePaths(appID, bundleID string) bundle.Paths
- func (srv *Server) CancelTokenRefresher(workerID string)
- func (srv *Server) CleanupInstallMu(workerID string)
- func (srv *Server) ClearTransferring(workerID string)
- func (srv *Server) GetVersion() string
- func (srv *Server) InstallPackage(ctx context.Context, appID, workerID string, req PackageRequest) (PackageResponse, error)
- func (srv *Server) InternalAPIURL() string
- func (srv *Server) IsTransferring(workerID string) bool
- func (srv *Server) RunRefresh(ctx context.Context, app *db.AppRow, m *manifest.Manifest, sender task.Sender) bool
- func (srv *Server) RunRefreshScheduler(ctx context.Context)
- func (srv *Server) RunRollback(ctx context.Context, app *db.AppRow, target string, sender task.Sender)
- func (srv *Server) SetCancelToken(workerID string, cancel func())
- func (srv *Server) SetTransferring(workerID string)
- func (srv *Server) SetUpdateStatus(r *update.Result)
- func (srv *Server) TransferDir(workerID string) string
- func (srv *Server) UpdateAvailableVersion() string
- type WorkerMap
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AppImage ¶ added in v0.0.3
AppImage returns the per-app image override, or the server-wide default.
func AppRuntime ¶ added in v0.0.3
func AppRuntime(app *db.AppRow, cfg config.DockerConfig) string
AppRuntime returns the effective OCI runtime for an app. Fallback chain: app.Runtime → config.RuntimeDefaults[accessType] → config.Runtime.
func BaseWorkerSpec ¶ added in v0.0.3
BaseWorkerSpec returns a WorkerSpec with all fields that are common across spawn sites (coldstart, transfer, API scale-up). Callers fill in site-specific fields like LibDir, TransferDir, TokenDir, MemoryLimit, and CPULimit.
func CleanupTokenDir ¶ added in v0.0.3
func CleanupTokenDir(bundleServerPath, workerID string)
CleanupTokenDir removes the token directory for a worker.
func EnsureRProfile ¶ added in v0.0.3
EnsureRProfile writes the blockyard R profile to dir and returns the path. The file bridges SHINY_HOST and SHINY_PORT env vars to the corresponding R options so bundles don't have to. Safe to call from multiple goroutines; the file is written once.
func LoadOrCreateWorkerKey ¶ added in v0.0.3
func LoadOrCreateWorkerKey( ctx context.Context, vaultClient *integration.Client, cfg *config.Config, ) (*auth.SigningKey, error)
LoadOrCreateWorkerKey resolves the worker signing key. It tries three sources in order:
- OpenBao (if configured) -- read or generate + store
- File ({bundle_server_path}/.worker-key) -- read existing
- Generate new + write to file
This ensures both the old and new server use the same key during a rolling update. When OpenBao is not available, the file path provides persistence across restarts.
func SpawnTokenRefresher ¶ added in v0.0.3
func SpawnTokenRefresher( ctx context.Context, bundleServerPath string, signingKey *auth.SigningKey, appID, workerID string, ) (tokDir string, cancel func(), err error)
SpawnTokenRefresher starts a goroutine that refreshes the worker's token file every TTL/2. Returns the token directory and a cancel function. The goroutine writes an initial token synchronously before returning, so the token file is ready before the container starts.
Types ¶
type ActiveWorker ¶
type ActiveWorker struct {
AppID string
BundleID string // bundle active at spawn time; runtime installs resolve against this
Draining bool // set by graceful drain; no new sessions routed
IdleSince time.Time // zero value = not idle; set when session count hits 0
StartedAt time.Time // when the worker was spawned
}
ActiveWorker represents a running worker tracked by the server. The worker ID is the map key in WorkerMap, not stored here.
type MemoryWorkerMap ¶ added in v0.0.3
type MemoryWorkerMap struct {
// contains filtered or unexported fields
}
MemoryWorkerMap is a concurrent in-memory map of worker ID → ActiveWorker.
func NewMemoryWorkerMap ¶ added in v0.0.3
func NewMemoryWorkerMap() *MemoryWorkerMap
func (*MemoryWorkerMap) All ¶ added in v0.0.3
func (m *MemoryWorkerMap) All() []string
All returns a snapshot of all worker IDs.
func (*MemoryWorkerMap) AppIDs ¶ added in v0.0.3
func (m *MemoryWorkerMap) AppIDs() []string
AppIDs returns a deduplicated list of app IDs that have active workers.
func (*MemoryWorkerMap) ClearDraining ¶ added in v0.0.3
func (m *MemoryWorkerMap) ClearDraining(workerID string)
ClearDraining clears the draining flag on a single worker by ID.
func (*MemoryWorkerMap) ClearIdleSince ¶ added in v0.0.3
func (m *MemoryWorkerMap) ClearIdleSince(workerID string) bool
ClearIdleSince resets the idle timer (a new session was assigned). Returns true if the worker was idle before clearing.
func (*MemoryWorkerMap) Count ¶ added in v0.0.3
func (m *MemoryWorkerMap) Count() int
func (*MemoryWorkerMap) CountForApp ¶ added in v0.0.3
func (m *MemoryWorkerMap) CountForApp(appID string) int
func (*MemoryWorkerMap) Delete ¶ added in v0.0.3
func (m *MemoryWorkerMap) Delete(id string)
func (*MemoryWorkerMap) ForApp ¶ added in v0.0.3
func (m *MemoryWorkerMap) ForApp(appID string) []string
ForApp returns all worker IDs for a given app (including draining).
func (*MemoryWorkerMap) ForAppAvailable ¶ added in v0.0.3
func (m *MemoryWorkerMap) ForAppAvailable(appID string) []string
ForAppAvailable returns worker IDs for an app that are not draining.
func (*MemoryWorkerMap) Get ¶ added in v0.0.3
func (m *MemoryWorkerMap) Get(id string) (ActiveWorker, bool)
func (*MemoryWorkerMap) IdleWorkers ¶ added in v0.0.3
func (m *MemoryWorkerMap) IdleWorkers(timeout time.Duration) []string
IdleWorkers returns workers that have been idle longer than the given timeout, excluding draining workers (they have their own lifecycle).
func (*MemoryWorkerMap) IsDraining ¶ added in v0.0.3
func (m *MemoryWorkerMap) IsDraining(appID string) bool
IsDraining returns true if any worker for the given app is draining.
func (*MemoryWorkerMap) MarkDraining ¶ added in v0.0.3
func (m *MemoryWorkerMap) MarkDraining(appID string) []string
MarkDraining sets the draining flag on all workers for an app. Returns the list of affected worker IDs.
func (*MemoryWorkerMap) Set ¶ added in v0.0.3
func (m *MemoryWorkerMap) Set(id string, w ActiveWorker)
func (*MemoryWorkerMap) SetDraining ¶ added in v0.0.3
func (m *MemoryWorkerMap) SetDraining(workerID string)
SetDraining sets the draining flag on a single worker by ID. Used by refresh drain-and-replace to drain specific old workers while keeping newly spawned workers available.
func (*MemoryWorkerMap) SetIdleSince ¶ added in v0.0.3
func (m *MemoryWorkerMap) SetIdleSince(workerID string, t time.Time)
SetIdleSince marks when a worker became idle (zero sessions). Called when the last session for a worker is removed.
func (*MemoryWorkerMap) SetIdleSinceIfZero ¶ added in v0.0.3
func (m *MemoryWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
SetIdleSinceIfZero marks when a worker became idle, but only if it isn't already marked. This avoids resetting the timer on repeated ticks while the worker remains idle.
func (*MemoryWorkerMap) WorkersForServer ¶ added in v0.0.3
func (m *MemoryWorkerMap) WorkersForServer(_ string) []string
WorkersForServer returns all worker IDs. In the memory implementation every worker belongs to "this" server — single-node deployments have one process — so the serverID filter is a no-op. The Redis variant does the real filtering.
type PackageRequest ¶ added in v0.0.3
type PackageRequest struct {
Name string `json:"name"` // package name or pkgdepends ref
LoadedNamespaces []string `json:"loaded_namespaces"` // from loadedNamespaces() in R
}
PackageRequest is the body of POST /api/v1/packages.
type PackageResponse ¶ added in v0.0.3
type PackageResponse struct {
Status string `json:"status"` // "ok", "transfer", "error"
Message string `json:"message,omitempty"`
TransferPath string `json:"transfer_path,omitempty"` // set when status == "transfer"
}
PackageResponse is returned by POST /api/v1/packages.
type RedisWorkerMap ¶ added in v0.0.3
type RedisWorkerMap struct {
// contains filtered or unexported fields
}
RedisWorkerMap implements WorkerMap using Redis hashes.
Key schema:
{prefix}worker:{workerID} -> hash {app_id, bundle_id, draining, idle_since, started_at, server_id}
No TTL — workers are explicitly deleted on eviction.
func NewRedisWorkerMap ¶ added in v0.0.3
func NewRedisWorkerMap(client *redisstate.Client, serverID string) *RedisWorkerMap
func (*RedisWorkerMap) All ¶ added in v0.0.3
func (m *RedisWorkerMap) All() []string
func (*RedisWorkerMap) AppIDs ¶ added in v0.0.3
func (m *RedisWorkerMap) AppIDs() []string
func (*RedisWorkerMap) ClearDraining ¶ added in v0.0.3
func (m *RedisWorkerMap) ClearDraining(workerID string)
func (*RedisWorkerMap) ClearIdleSince ¶ added in v0.0.3
func (m *RedisWorkerMap) ClearIdleSince(workerID string) bool
func (*RedisWorkerMap) Count ¶ added in v0.0.3
func (m *RedisWorkerMap) Count() int
func (*RedisWorkerMap) CountForApp ¶ added in v0.0.3
func (m *RedisWorkerMap) CountForApp(appID string) int
func (*RedisWorkerMap) Delete ¶ added in v0.0.3
func (m *RedisWorkerMap) Delete(id string)
func (*RedisWorkerMap) ForApp ¶ added in v0.0.3
func (m *RedisWorkerMap) ForApp(appID string) []string
func (*RedisWorkerMap) ForAppAvailable ¶ added in v0.0.3
func (m *RedisWorkerMap) ForAppAvailable(appID string) []string
func (*RedisWorkerMap) Get ¶ added in v0.0.3
func (m *RedisWorkerMap) Get(id string) (ActiveWorker, bool)
func (*RedisWorkerMap) IdleWorkers ¶ added in v0.0.3
func (m *RedisWorkerMap) IdleWorkers(timeout time.Duration) []string
func (*RedisWorkerMap) IsDraining ¶ added in v0.0.3
func (m *RedisWorkerMap) IsDraining(appID string) bool
func (*RedisWorkerMap) MarkDraining ¶ added in v0.0.3
func (m *RedisWorkerMap) MarkDraining(appID string) []string
func (*RedisWorkerMap) Set ¶ added in v0.0.3
func (m *RedisWorkerMap) Set(id string, w ActiveWorker)
func (*RedisWorkerMap) SetDraining ¶ added in v0.0.3
func (m *RedisWorkerMap) SetDraining(workerID string)
func (*RedisWorkerMap) SetIdleSince ¶ added in v0.0.3
func (m *RedisWorkerMap) SetIdleSince(workerID string, t time.Time)
func (*RedisWorkerMap) SetIdleSinceIfZero ¶ added in v0.0.3
func (m *RedisWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)
func (*RedisWorkerMap) WorkersForServer ¶ added in v0.0.3
func (m *RedisWorkerMap) WorkersForServer(serverID string) []string
WorkersForServer returns worker IDs whose server_id hash field matches the given serverID. Pattern lifted from ForApp — scan the worker key namespace, pipeline-HGET the server_id field, filter. Same shape as the other server-scoped filters, just on a different hash field.
type Server ¶
type Server struct {
Config *config.Config
Backend backend.Backend
DB *db.DB
Workers WorkerMap
Sessions session.Store
Registry registry.WorkerRegistry
Tasks *task.Store
LogStore *logstore.Store
ErrorLog *errorlog.Store
Metrics *telemetry.Metrics
// Auth fields — nil when [oidc] is not configured (v0 compat).
OIDCClient *auth.OIDCClient
SigningKey *auth.SigningKey
UserSessions *auth.UserSessionStore
// Session token signing key — for credential exchange tokens.
// Derived from session_secret with a different domain string.
SessionTokenKey *auth.SigningKey
// OpenBao — nil when [openbao] is not configured.
VaultClient *integration.Client
VaultTokenCache *integration.VaultTokenCache
// VaultTokenHealthy reports whether the vault token is valid.
// Non-nil only when AppRole auth is used (token renewal active).
VaultTokenHealthy func() bool
// Redis client — nil when [redis] is not configured.
RedisClient *redisstate.Client
// Audit log — nil when [audit] is not configured.
AuditLog *audit.Log
// System checks — populated during startup, used by the system page
// and API endpoints. Nil until Init is called.
Checker *preflight.Checker
// Package store — nil when not available (no builds yet).
PkgStore *pkgstore.Store
// HMAC key for worker tokens. Persisted via OpenBao or file-based
// fallback so both servers verify the same tokens during a rolling
// update. Independent of SessionSecret and OIDC.
WorkerTokenKey *auth.SigningKey
// Draining is set when the server enters drain mode (SIGUSR1) or
// shutdown (SIGTERM). Health endpoints return 503 when set.
Draining atomic.Bool
// Passive is true when BLOCKYARD_PASSIVE=1 is set. Background
// goroutines are deferred until POST /api/v1/admin/activate.
Passive atomic.Bool
// Bootstrap token state — one-time token that can be exchanged for
// a real PAT via POST /api/v1/bootstrap. Hash is set at startup;
// Redeemed is flipped to true on first successful exchange.
BootstrapTokenHash []byte
BootstrapRedeemed atomic.Bool
// Version is the server version string, set at build time.
Version string
// UpdateStatus holds the most recent CheckLatest result. Nil
// until the first check runs. The richer State enum lets readers
// distinguish "update available" from "ahead of latest" / "dev
// build" / "couldn't reach GitHub" / etc.
UpdateStatus atomic.Pointer[update.Result]
// UpdateLastChecked records the wall-clock time of the most recent
// update check (scheduled or manual). Nil until the first check runs.
UpdateLastChecked atomic.Pointer[time.Time]
// RestoreWG is used in tests to wait for background restore goroutines
// to complete before cleanup. Nil in production.
RestoreWG *sync.WaitGroup
// Hooks for operations that would cause import cycles if called
// directly from server. Set during initialization in main().
EvictWorkerFn func(ctx context.Context, srv *Server, workerID, reason string)
SpawnLogCaptureFn func(ctx context.Context, srv *Server, workerID, appID string)
// contains filtered or unexported fields
}
Server holds all shared state for the running server. Passed by pointer to API handlers, proxy, and background goroutines.
func NewServer ¶
NewServer creates a Server with all in-memory stores initialized. The returned Server owns a fresh per-instance telemetry.Metrics registered against a private prometheus registry, so tests that construct a Server never contend on a shared counter state. Production callers should replace [Server.Metrics] with one registered against prometheus.DefaultRegisterer so the /metrics HTTP handler can scrape it — see NewServerWithDefaultMetrics.
func NewServerWithDefaultMetrics ¶ added in v0.0.3
NewServerWithDefaultMetrics is the production constructor. It creates a Server whose metrics are registered with prometheus.DefaultRegisterer so the promhttp /metrics endpoint can scrape them.
func (*Server) AuthDeps ¶ added in v0.0.2
AuthDeps returns an auth.Deps populated from this server's fields. Used by the router to wire auth handlers and middleware without a circular import.
func (*Server) BundlePaths ¶ added in v0.0.3
BundlePaths returns the filesystem paths for a bundle.
func (*Server) CancelTokenRefresher ¶ added in v0.0.3
CancelTokenRefresher calls and removes the cancel function for a worker. No-op if the worker has no registered cancel function.
func (*Server) CleanupInstallMu ¶ added in v0.0.3
CleanupInstallMu removes the per-worker mutex (called on eviction).
func (*Server) ClearTransferring ¶ added in v0.0.3
ClearTransferring removes the transfer-in-progress flag for a worker.
func (*Server) GetVersion ¶ added in v0.0.3
GetVersion returns the running server version.
func (*Server) InstallPackage ¶ added in v0.0.3
func (srv *Server) InstallPackage( ctx context.Context, appID, workerID string, req PackageRequest, ) (PackageResponse, error)
InstallPackage is the core orchestration for runtime package installation. Uses the same four-phase store-aware flow as the build pipeline, differing in resolution context: the worker's existing /lib is a reference library so pak sees what's installed.
func (*Server) InternalAPIURL ¶ added in v0.0.3
InternalAPIURL returns the URL workers should use to reach this server.
func (*Server) IsTransferring ¶ added in v0.0.3
IsTransferring returns true if a container transfer is in progress for the given worker.
func (*Server) RunRefresh ¶ added in v0.0.3
func (srv *Server) RunRefresh( ctx context.Context, app *db.AppRow, m *manifest.Manifest, sender task.Sender, ) bool
RunRefresh re-resolves dependencies for an unpinned deployment. Returns true if a new worker was spawned (dependencies changed).
func (*Server) RunRefreshScheduler ¶ added in v0.0.3
RunRefreshScheduler checks active apps with a refresh_schedule and triggers refresh at the configured times. Blocks until ctx is cancelled.
func (*Server) RunRollback ¶ added in v0.0.3
func (srv *Server) RunRollback( ctx context.Context, app *db.AppRow, target string, sender task.Sender, )
RunRollback performs a rollback to either the previous refresh or the original build, then drains and replaces workers.
func (*Server) SetCancelToken ¶ added in v0.0.3
SetCancelToken registers a cancel function for a worker's token refresher.
func (*Server) SetTransferring ¶ added in v0.0.3
SetTransferring marks a worker as having a transfer in progress.
func (*Server) SetUpdateStatus ¶ added in v0.0.3
SetUpdateStatus records the latest update check result and stamps UpdateLastChecked with the current time. Satisfies the update.UpdateStore interface so the periodic checker and the UI's manual-refresh handler can both feed results in.
func (*Server) TransferDir ¶ added in v0.0.3
TransferDir returns the host-side transfer directory for a worker.
func (*Server) UpdateAvailableVersion ¶ added in v0.0.3
UpdateAvailableVersion returns the upstream version string when an update is recommended, or "" otherwise. Convenience for consumers (preflight, readyz) that previously only cared about the boolean.
type WorkerMap ¶
type WorkerMap interface {
Get(id string) (ActiveWorker, bool)
Set(id string, w ActiveWorker)
Delete(id string)
Count() int
CountForApp(appID string) int
All() []string
ForApp(appID string) []string
ForAppAvailable(appID string) []string
MarkDraining(appID string) []string
SetDraining(workerID string)
SetIdleSince(workerID string, t time.Time)
SetIdleSinceIfZero(workerID string, t time.Time)
ClearIdleSince(workerID string) bool
IdleWorkers(timeout time.Duration) []string
AppIDs() []string
IsDraining(appID string) bool
ClearDraining(workerID string)
// WorkersForServer returns the worker IDs owned by the given
// server_id. Used by Drainer.waitForIdle during a same-host
// rolling update so the old server waits for its own sessions
// to end, not the new server's fresh sessions. In the memory
// implementation this is equivalent to All() — single-node
// deployments have one server, so every worker belongs to it.
// The Redis implementation filters by the server_id hash field
// phase 3-3 already writes.
WorkersForServer(serverID string) []string
}
WorkerMap defines the contract for the active worker map. MemoryWorkerMap is the in-process implementation; Redis implements the same interface for shared state during rolling updates.