server

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2026 License: GPL-3.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AppImage added in v0.0.3

func AppImage(app *db.AppRow, serverDefault string) string

AppImage returns the per-app image override, or the server-wide default.

func AppRuntime added in v0.0.3

func AppRuntime(app *db.AppRow, cfg config.DockerConfig) string

AppRuntime returns the effective OCI runtime for an app. Fallback chain: app.Runtime → config.RuntimeDefaults[accessType] → config.Runtime.

func BaseWorkerSpec added in v0.0.3

func BaseWorkerSpec(srv *Server, app *db.AppRow, workerID, bundleID string) backend.WorkerSpec

BaseWorkerSpec returns a WorkerSpec with all fields that are common across spawn sites (coldstart, transfer, API scale-up). Callers fill in site-specific fields like LibDir, TransferDir, TokenDir, MemoryLimit, and CPULimit.

func CleanupTokenDir added in v0.0.3

func CleanupTokenDir(bundleServerPath, workerID string)

CleanupTokenDir removes the token directory for a worker.

func EnsureRProfile added in v0.0.3

func EnsureRProfile(dir string) (string, error)

EnsureRProfile writes the blockyard R profile to dir and returns the path. The file bridges SHINY_HOST and SHINY_PORT env vars to the corresponding R options so bundles don't have to. Safe to call from multiple goroutines; the file is written once.

func LoadOrCreateWorkerKey added in v0.0.3

func LoadOrCreateWorkerKey(
	ctx context.Context,
	vaultClient *integration.Client,
	cfg *config.Config,
) (*auth.SigningKey, error)

LoadOrCreateWorkerKey resolves the worker signing key. It tries three sources in order:

  1. OpenBao (if configured) -- read or generate + store
  2. File ({bundle_server_path}/.worker-key) -- read existing
  3. Generate new + write to file

This ensures both the old and new server use the same key during a rolling update. When OpenBao is not available, the file path provides persistence across restarts.

func SpawnTokenRefresher added in v0.0.3

func SpawnTokenRefresher(
	ctx context.Context,
	bundleServerPath string,
	signingKey *auth.SigningKey,
	appID, workerID string,
) (tokDir string, cancel func(), err error)

SpawnTokenRefresher starts a goroutine that refreshes the worker's token file every TTL/2. Returns the token directory and a cancel function. The goroutine writes an initial token synchronously before returning, so the token file is ready before the container starts.

func WorkerEnv added in v0.0.3

func WorkerEnv(srv *Server) map[string]string

WorkerEnv builds the environment variable map for worker containers. Always sets BLOCKYARD_API_URL (needed for runtime package installs). Includes Vault/OpenBao integration vars when configured. Sets SHINY_HOST per backend so bundles don't have to.

Types

type ActiveWorker

type ActiveWorker struct {
	AppID     string
	BundleID  string    // bundle active at spawn time; runtime installs resolve against this
	Draining  bool      // set by graceful drain; no new sessions routed
	IdleSince time.Time // zero value = not idle; set when session count hits 0
	StartedAt time.Time // when the worker was spawned
}

ActiveWorker represents a running worker tracked by the server. The worker ID is the map key in WorkerMap, not stored here.

type MemoryWorkerMap added in v0.0.3

type MemoryWorkerMap struct {
	// contains filtered or unexported fields
}

MemoryWorkerMap is a concurrent in-memory map of worker ID → ActiveWorker.

func NewMemoryWorkerMap added in v0.0.3

func NewMemoryWorkerMap() *MemoryWorkerMap

func (*MemoryWorkerMap) All added in v0.0.3

func (m *MemoryWorkerMap) All() []string

All returns a snapshot of all worker IDs.

func (*MemoryWorkerMap) AppIDs added in v0.0.3

func (m *MemoryWorkerMap) AppIDs() []string

AppIDs returns a deduplicated list of app IDs that have active workers.

func (*MemoryWorkerMap) ClearDraining added in v0.0.3

func (m *MemoryWorkerMap) ClearDraining(workerID string)

ClearDraining clears the draining flag on a single worker by ID.

func (*MemoryWorkerMap) ClearIdleSince added in v0.0.3

func (m *MemoryWorkerMap) ClearIdleSince(workerID string) bool

ClearIdleSince resets the idle timer (a new session was assigned). Returns true if the worker was idle before clearing.

func (*MemoryWorkerMap) Count added in v0.0.3

func (m *MemoryWorkerMap) Count() int

func (*MemoryWorkerMap) CountForApp added in v0.0.3

func (m *MemoryWorkerMap) CountForApp(appID string) int

func (*MemoryWorkerMap) Delete added in v0.0.3

func (m *MemoryWorkerMap) Delete(id string)

func (*MemoryWorkerMap) ForApp added in v0.0.3

func (m *MemoryWorkerMap) ForApp(appID string) []string

ForApp returns all worker IDs for a given app (including draining).

func (*MemoryWorkerMap) ForAppAvailable added in v0.0.3

func (m *MemoryWorkerMap) ForAppAvailable(appID string) []string

ForAppAvailable returns worker IDs for an app that are not draining.

func (*MemoryWorkerMap) Get added in v0.0.3

func (m *MemoryWorkerMap) Get(id string) (ActiveWorker, bool)

func (*MemoryWorkerMap) IdleWorkers added in v0.0.3

func (m *MemoryWorkerMap) IdleWorkers(timeout time.Duration) []string

IdleWorkers returns workers that have been idle longer than the given timeout, excluding draining workers (they have their own lifecycle).

func (*MemoryWorkerMap) IsDraining added in v0.0.3

func (m *MemoryWorkerMap) IsDraining(appID string) bool

IsDraining returns true if any worker for the given app is draining.

func (*MemoryWorkerMap) MarkDraining added in v0.0.3

func (m *MemoryWorkerMap) MarkDraining(appID string) []string

MarkDraining sets the draining flag on all workers for an app. Returns the list of affected worker IDs.

func (*MemoryWorkerMap) Set added in v0.0.3

func (m *MemoryWorkerMap) Set(id string, w ActiveWorker)

func (*MemoryWorkerMap) SetDraining added in v0.0.3

func (m *MemoryWorkerMap) SetDraining(workerID string)

SetDraining sets the draining flag on a single worker by ID. Used by refresh drain-and-replace to drain specific old workers while keeping newly spawned workers available.

func (*MemoryWorkerMap) SetIdleSince added in v0.0.3

func (m *MemoryWorkerMap) SetIdleSince(workerID string, t time.Time)

SetIdleSince marks when a worker became idle (zero sessions). Called when the last session for a worker is removed.

func (*MemoryWorkerMap) SetIdleSinceIfZero added in v0.0.3

func (m *MemoryWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)

SetIdleSinceIfZero marks when a worker became idle, but only if it isn't already marked. This avoids resetting the timer on repeated ticks while the worker remains idle.

func (*MemoryWorkerMap) WorkersForServer added in v0.0.3

func (m *MemoryWorkerMap) WorkersForServer(_ string) []string

WorkersForServer returns all worker IDs. In the memory implementation every worker belongs to "this" server — single-node deployments have one process — so the serverID filter is a no-op. The Redis variant does the real filtering.

type PackageRequest added in v0.0.3

type PackageRequest struct {
	Name             string   `json:"name"`              // package name or pkgdepends ref
	LoadedNamespaces []string `json:"loaded_namespaces"` // from loadedNamespaces() in R
}

PackageRequest is the body of POST /api/v1/packages.

type PackageResponse added in v0.0.3

type PackageResponse struct {
	Status       string `json:"status"` // "ok", "transfer", "error"
	Message      string `json:"message,omitempty"`
	TransferPath string `json:"transfer_path,omitempty"` // set when status == "transfer"
}

PackageResponse is returned by POST /api/v1/packages.

type RedisWorkerMap added in v0.0.3

type RedisWorkerMap struct {
	// contains filtered or unexported fields
}

RedisWorkerMap implements WorkerMap using Redis hashes.

Key schema:

{prefix}worker:{workerID}  ->  hash {app_id, bundle_id, draining, idle_since, started_at, server_id}

No TTL — workers are explicitly deleted on eviction.

func NewRedisWorkerMap added in v0.0.3

func NewRedisWorkerMap(client *redisstate.Client, serverID string) *RedisWorkerMap

func (*RedisWorkerMap) All added in v0.0.3

func (m *RedisWorkerMap) All() []string

func (*RedisWorkerMap) AppIDs added in v0.0.3

func (m *RedisWorkerMap) AppIDs() []string

func (*RedisWorkerMap) ClearDraining added in v0.0.3

func (m *RedisWorkerMap) ClearDraining(workerID string)

func (*RedisWorkerMap) ClearIdleSince added in v0.0.3

func (m *RedisWorkerMap) ClearIdleSince(workerID string) bool

func (*RedisWorkerMap) Count added in v0.0.3

func (m *RedisWorkerMap) Count() int

func (*RedisWorkerMap) CountForApp added in v0.0.3

func (m *RedisWorkerMap) CountForApp(appID string) int

func (*RedisWorkerMap) Delete added in v0.0.3

func (m *RedisWorkerMap) Delete(id string)

func (*RedisWorkerMap) ForApp added in v0.0.3

func (m *RedisWorkerMap) ForApp(appID string) []string

func (*RedisWorkerMap) ForAppAvailable added in v0.0.3

func (m *RedisWorkerMap) ForAppAvailable(appID string) []string

func (*RedisWorkerMap) Get added in v0.0.3

func (m *RedisWorkerMap) Get(id string) (ActiveWorker, bool)

func (*RedisWorkerMap) IdleWorkers added in v0.0.3

func (m *RedisWorkerMap) IdleWorkers(timeout time.Duration) []string

func (*RedisWorkerMap) IsDraining added in v0.0.3

func (m *RedisWorkerMap) IsDraining(appID string) bool

func (*RedisWorkerMap) MarkDraining added in v0.0.3

func (m *RedisWorkerMap) MarkDraining(appID string) []string

func (*RedisWorkerMap) Set added in v0.0.3

func (m *RedisWorkerMap) Set(id string, w ActiveWorker)

func (*RedisWorkerMap) SetDraining added in v0.0.3

func (m *RedisWorkerMap) SetDraining(workerID string)

func (*RedisWorkerMap) SetIdleSince added in v0.0.3

func (m *RedisWorkerMap) SetIdleSince(workerID string, t time.Time)

func (*RedisWorkerMap) SetIdleSinceIfZero added in v0.0.3

func (m *RedisWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)

func (*RedisWorkerMap) WorkersForServer added in v0.0.3

func (m *RedisWorkerMap) WorkersForServer(serverID string) []string

WorkersForServer returns worker IDs whose server_id hash field matches the given serverID. Pattern lifted from ForApp — scan the worker key namespace, pipeline-HGET the server_id field, filter. Same shape as the other server-scoped filters, just on a different hash field.

type Server

type Server struct {
	Config   *config.Config
	Backend  backend.Backend
	DB       *db.DB
	Workers  WorkerMap
	Sessions session.Store
	Registry registry.WorkerRegistry
	Tasks    *task.Store
	LogStore *logstore.Store
	ErrorLog *errorlog.Store
	Metrics  *telemetry.Metrics

	// Auth fields — nil when [oidc] is not configured (v0 compat).
	OIDCClient   *auth.OIDCClient
	SigningKey   *auth.SigningKey
	UserSessions *auth.UserSessionStore
	// Session token signing key — for credential exchange tokens.
	// Derived from session_secret with a different domain string.
	SessionTokenKey *auth.SigningKey

	// OpenBao — nil when [openbao] is not configured.
	VaultClient     *integration.Client
	VaultTokenCache *integration.VaultTokenCache

	// VaultTokenHealthy reports whether the vault token is valid.
	// Non-nil only when AppRole auth is used (token renewal active).
	VaultTokenHealthy func() bool

	// Redis client — nil when [redis] is not configured.
	RedisClient *redisstate.Client

	// Audit log — nil when [audit] is not configured.
	AuditLog *audit.Log

	// System checks — populated during startup, used by the system page
	// and API endpoints. Nil until Init is called.
	Checker *preflight.Checker

	// Package store — nil when not available (no builds yet).
	PkgStore *pkgstore.Store

	// HMAC key for worker tokens. Persisted via OpenBao or file-based
	// fallback so both servers verify the same tokens during a rolling
	// update. Independent of SessionSecret and OIDC.
	WorkerTokenKey *auth.SigningKey

	// Draining is set when the server enters drain mode (SIGUSR1) or
	// shutdown (SIGTERM). Health endpoints return 503 when set.
	Draining atomic.Bool

	// Passive is true when BLOCKYARD_PASSIVE=1 is set. Background
	// goroutines are deferred until POST /api/v1/admin/activate.
	Passive atomic.Bool

	// Bootstrap token state — one-time token that can be exchanged for
	// a real PAT via POST /api/v1/bootstrap. Hash is set at startup;
	// Redeemed is flipped to true on first successful exchange.
	BootstrapTokenHash []byte
	BootstrapRedeemed  atomic.Bool

	// Version is the server version string, set at build time.
	Version string

	// UpdateStatus holds the most recent CheckLatest result. Nil
	// until the first check runs. The richer State enum lets readers
	// distinguish "update available" from "ahead of latest" / "dev
	// build" / "couldn't reach GitHub" / etc.
	UpdateStatus atomic.Pointer[update.Result]

	// UpdateLastChecked records the wall-clock time of the most recent
	// update check (scheduled or manual). Nil until the first check runs.
	UpdateLastChecked atomic.Pointer[time.Time]

	// RestoreWG is used in tests to wait for background restore goroutines
	// to complete before cleanup. Nil in production.
	RestoreWG *sync.WaitGroup

	// Hooks for operations that would cause import cycles if called
	// directly from server. Set during initialization in main().
	EvictWorkerFn     func(ctx context.Context, srv *Server, workerID, reason string)
	SpawnLogCaptureFn func(ctx context.Context, srv *Server, workerID, appID string)
	// contains filtered or unexported fields
}

Server holds all shared state for the running server. Passed by pointer to API handlers, proxy, and background goroutines.

func NewServer

func NewServer(cfg *config.Config, be backend.Backend, database *db.DB) *Server

NewServer creates a Server with all in-memory stores initialized. The returned Server owns a fresh per-instance telemetry.Metrics registered against a private prometheus registry, so tests that construct a Server never contend on a shared counter state. Production callers should replace [Server.Metrics] with one registered against prometheus.DefaultRegisterer so the /metrics HTTP handler can scrape it — see NewServerWithDefaultMetrics.

func NewServerWithDefaultMetrics added in v0.0.3

func NewServerWithDefaultMetrics(cfg *config.Config, be backend.Backend, database *db.DB) *Server

NewServerWithDefaultMetrics is the production constructor. It creates a Server whose metrics are registered with prometheus.DefaultRegisterer so the promhttp /metrics endpoint can scrape them.

func (*Server) AuthDeps added in v0.0.2

func (srv *Server) AuthDeps() *auth.Deps

AuthDeps returns an auth.Deps populated from this server's fields. Used by the router to wire auth handlers and middleware without a circular import.

func (*Server) BundlePaths added in v0.0.3

func (srv *Server) BundlePaths(appID, bundleID string) bundle.Paths

BundlePaths returns the filesystem paths for a bundle.

func (*Server) CancelTokenRefresher added in v0.0.3

func (srv *Server) CancelTokenRefresher(workerID string)

CancelTokenRefresher calls and removes the cancel function for a worker. No-op if the worker has no registered cancel function.

func (*Server) CleanupInstallMu added in v0.0.3

func (srv *Server) CleanupInstallMu(workerID string)

CleanupInstallMu removes the per-worker mutex (called on eviction).

func (*Server) ClearTransferring added in v0.0.3

func (srv *Server) ClearTransferring(workerID string)

ClearTransferring removes the transfer-in-progress flag for a worker.

func (*Server) GetVersion added in v0.0.3

func (srv *Server) GetVersion() string

GetVersion returns the running server version.

func (*Server) InstallPackage added in v0.0.3

func (srv *Server) InstallPackage(
	ctx context.Context,
	appID, workerID string,
	req PackageRequest,
) (PackageResponse, error)

InstallPackage is the core orchestration for runtime package installation. Uses the same four-phase store-aware flow as the build pipeline, differing in resolution context: the worker's existing /lib is a reference library so pak sees what's installed.

func (*Server) InternalAPIURL added in v0.0.3

func (srv *Server) InternalAPIURL() string

InternalAPIURL returns the URL workers should use to reach this server.

func (*Server) IsTransferring added in v0.0.3

func (srv *Server) IsTransferring(workerID string) bool

IsTransferring returns true if a container transfer is in progress for the given worker.

func (*Server) RunRefresh added in v0.0.3

func (srv *Server) RunRefresh(
	ctx context.Context,
	app *db.AppRow,
	m *manifest.Manifest,
	sender task.Sender,
) bool

RunRefresh re-resolves dependencies for an unpinned deployment. Returns true if a new worker was spawned (dependencies changed).

func (*Server) RunRefreshScheduler added in v0.0.3

func (srv *Server) RunRefreshScheduler(ctx context.Context)

RunRefreshScheduler checks active apps with a refresh_schedule and triggers refresh at the configured times. Blocks until ctx is cancelled.

func (*Server) RunRollback added in v0.0.3

func (srv *Server) RunRollback(
	ctx context.Context,
	app *db.AppRow,
	target string,
	sender task.Sender,
)

RunRollback performs a rollback to either the previous refresh or the original build, then drains and replaces workers.

func (*Server) SetCancelToken added in v0.0.3

func (srv *Server) SetCancelToken(workerID string, cancel func())

SetCancelToken registers a cancel function for a worker's token refresher.

func (*Server) SetTransferring added in v0.0.3

func (srv *Server) SetTransferring(workerID string)

SetTransferring marks a worker as having a transfer in progress.

func (*Server) SetUpdateStatus added in v0.0.3

func (srv *Server) SetUpdateStatus(r *update.Result)

SetUpdateStatus records the latest update check result and stamps UpdateLastChecked with the current time. Satisfies the update.UpdateStore interface so the periodic checker and the UI's manual-refresh handler can both feed results in.

func (*Server) TransferDir added in v0.0.3

func (srv *Server) TransferDir(workerID string) string

TransferDir returns the host-side transfer directory for a worker.

func (*Server) UpdateAvailableVersion added in v0.0.3

func (srv *Server) UpdateAvailableVersion() string

UpdateAvailableVersion returns the upstream version string when an update is recommended, or "" otherwise. Convenience for consumers (preflight, readyz) that previously only cared about the boolean.

type WorkerMap

type WorkerMap interface {
	Get(id string) (ActiveWorker, bool)
	Set(id string, w ActiveWorker)
	Delete(id string)
	Count() int
	CountForApp(appID string) int
	All() []string
	ForApp(appID string) []string
	ForAppAvailable(appID string) []string
	MarkDraining(appID string) []string
	SetDraining(workerID string)
	SetIdleSince(workerID string, t time.Time)
	SetIdleSinceIfZero(workerID string, t time.Time)
	ClearIdleSince(workerID string) bool
	IdleWorkers(timeout time.Duration) []string
	AppIDs() []string
	IsDraining(appID string) bool
	ClearDraining(workerID string)

	// WorkersForServer returns the worker IDs owned by the given
	// server_id. Used by Drainer.waitForIdle during a same-host
	// rolling update so the old server waits for its own sessions
	// to end, not the new server's fresh sessions. In the memory
	// implementation this is equivalent to All() — single-node
	// deployments have one server, so every worker belongs to it.
	// The Redis implementation filters by the server_id hash field
	// phase 3-3 already writes.
	WorkersForServer(serverID string) []string
}

WorkerMap defines the contract for the active worker map. MemoryWorkerMap is the in-process implementation; Redis implements the same interface for shared state during rolling updates.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL