server

package
v0.0.4

Note: This package is not in the latest version of its module.
Published: Apr 26, 2026 License: GPL-3.0 Imports: 44 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func AppImage added in v0.0.3

func AppImage(app *db.AppRow, serverDefault string) string

AppImage returns the per-app image override, or the server-wide default.

func AppRuntime added in v0.0.3

func AppRuntime(app *db.AppRow, cfg config.DockerConfig) string

AppRuntime returns the effective OCI runtime for an app. Fallback chain: app.Runtime → config.RuntimeDefaults[accessType] → config.Runtime.
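The fallback chain can be illustrated with a minimal sketch. The types `appRow` and `dockerConfig`, the `AccessType` field, and the "empty string means unset" convention are assumptions made for illustration; they are stand-ins for db.AppRow and config.DockerConfig, whose actual fields are not shown here. The runtime names are example values only.

```go
package main

import "fmt"

// appRow and dockerConfig are hypothetical stand-ins for db.AppRow and
// config.DockerConfig; the real field names may differ.
type appRow struct {
	Runtime    string // per-app override; empty is assumed to mean "not set"
	AccessType string // assumed key into RuntimeDefaults
}

type dockerConfig struct {
	Runtime         string            // server-wide default
	RuntimeDefaults map[string]string // per-access-type defaults
}

// effectiveRuntime walks the chain app -> per-access-type default ->
// server default and returns the first non-empty value.
func effectiveRuntime(app appRow, cfg dockerConfig) string {
	if app.Runtime != "" {
		return app.Runtime
	}
	if rt := cfg.RuntimeDefaults[app.AccessType]; rt != "" {
		return rt
	}
	return cfg.Runtime
}

func main() {
	cfg := dockerConfig{
		Runtime:         "runc",
		RuntimeDefaults: map[string]string{"public": "runsc"},
	}
	fmt.Println(effectiveRuntime(appRow{Runtime: "kata"}, cfg))      // app override
	fmt.Println(effectiveRuntime(appRow{AccessType: "public"}, cfg)) // access-type default
	fmt.Println(effectiveRuntime(appRow{AccessType: "acl"}, cfg))    // server default
}
```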

func BaseWorkerSpec added in v0.0.3

func BaseWorkerSpec(srv *Server, app *db.AppRow, workerID, bundleID string) backend.WorkerSpec

BaseWorkerSpec returns a WorkerSpec with all fields that are common across spawn sites (coldstart, transfer, API scale-up). Callers fill in site-specific fields like LibDir, TransferDir, TokenDir, MemoryLimit, and CPULimit.

func CleanupTokenDir added in v0.0.3

func CleanupTokenDir(bundleServerPath, workerID string)

CleanupTokenDir removes the token directory for a worker.

func EnsureRProfile added in v0.0.3

func EnsureRProfile(dir string) (string, error)

EnsureRProfile writes the blockyard R profile to dir and returns the path. The file bridges SHINY_HOST and SHINY_PORT env vars to the corresponding R options so bundles don't have to. Safe to call from multiple goroutines; the file is written once.

func LoadOrCreateWorkerKey added in v0.0.3

func LoadOrCreateWorkerKey(
	ctx context.Context,
	vaultClient *integration.Client,
	cfg *config.Config,
) (*auth.SigningKey, error)

LoadOrCreateWorkerKey resolves the worker signing key. It tries three sources in order:

  1. The vault (if configured) -- read or generate + store
  2. File ({bundle_server_path}/.worker-key) -- read existing
  3. Generate new + write to file

This ensures both the old and new server use the same key during a rolling update. When the vault is not available, the file path provides persistence across restarts.
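The file-based part of this resolution (steps 2 and 3) might look roughly like the sketch below. The vault step is omitted, and the helper name `loadOrCreateKey`, the raw 32-byte key format, and the 0600 file mode are assumptions for illustration rather than details confirmed by the package.

```go
package main

import (
	"bytes"
	"crypto/rand"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// loadOrCreateKey is a hypothetical helper covering only the file
// fallback: read an existing key, or generate one and persist it.
func loadOrCreateKey(path string) ([]byte, error) {
	key, err := os.ReadFile(path)
	if err == nil {
		return key, nil // step 2: reuse the existing key
	}
	if !errors.Is(err, fs.ErrNotExist) {
		return nil, err // unreadable file: surface it rather than overwrite
	}
	// Step 3: generate a new key (32 random bytes is an assumption).
	key = make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		return nil, err
	}
	// 0600 keeps the key readable by the server's user only.
	if err := os.WriteFile(path, key, 0o600); err != nil {
		return nil, err
	}
	return key, nil
}

// stableAcrossCalls simulates a restart: two calls against the same
// directory should yield the same key.
func stableAcrossCalls() (bool, error) {
	dir, err := os.MkdirTemp("", "worker-key-demo")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, ".worker-key")
	first, err := loadOrCreateKey(path)
	if err != nil {
		return false, err
	}
	second, err := loadOrCreateKey(path)
	if err != nil {
		return false, err
	}
	return bytes.Equal(first, second), nil
}

func main() {
	same, err := stableAcrossCalls()
	fmt.Println(same, err)
}
```

Note that this sketch is not safe against two processes creating the file at the same moment; a shared source such as the vault avoids that race.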

func SpawnTokenRefresher added in v0.0.3

func SpawnTokenRefresher(
	ctx context.Context,
	bundleServerPath string,
	signingKey *auth.SigningKey,
	appID, workerID string,
) (tokDir string, cancel func(), err error)

SpawnTokenRefresher starts a goroutine that refreshes the worker's token file every TTL/2. It returns the token directory and a cancel function. The initial token is written synchronously before SpawnTokenRefresher returns, so the token file is ready before the container starts.
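The "write once synchronously, then refresh every TTL/2" pattern can be sketched as follows. The helper `startRefresher` and its `write` callback are hypothetical; token signing and the token directory are left out, and ignoring a failed refresh until the next tick is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// startRefresher is a hypothetical reduction of the pattern. ttl must be
// at least 2ns so that ttl/2 is a valid ticker interval.
func startRefresher(ctx context.Context, ttl time.Duration, write func() error) (func(), error) {
	// The initial write happens before returning, so the caller can rely
	// on the token existing as soon as this function succeeds.
	if err := write(); err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		ticker := time.NewTicker(ttl / 2)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Assumption: a failed refresh is ignored; the next tick retries.
				_ = write()
			}
		}
	}()
	return cancel, nil
}

// initialWrites reports how many writes have happened by the time
// startRefresher returns. With a one-hour TTL no tick can fire yet.
func initialWrites() int {
	var n atomic.Int32
	cancel, err := startRefresher(context.Background(), time.Hour, func() error {
		n.Add(1)
		return nil
	})
	if err != nil {
		return -1
	}
	defer cancel()
	return int(n.Load())
}

func main() {
	fmt.Println(initialWrites()) // prints 1
}
```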

func WorkerEnv added in v0.0.3

func WorkerEnv(srv *Server) map[string]string

WorkerEnv builds the backend-agnostic environment variable map for worker containers. Always sets BLOCKYARD_API_URL (needed for runtime package installs). Includes vault integration vars when configured. Sets SHINY_HOST per backend so bundles don't have to. Values from server.worker_env are merged in last; blockyard-managed keys win on collision, everything else (e.g. OTEL_*) is passed through.
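The collision rule can be shown with a small sketch. The function `mergeEnv` and the example values are hypothetical; only the key name BLOCKYARD_API_URL and the "managed keys win" rule come from the description above.

```go
package main

import "fmt"

// mergeEnv starts from the managed keys and adds user-supplied entries
// only where no managed key with the same name exists.
func mergeEnv(managed, userEnv map[string]string) map[string]string {
	out := make(map[string]string, len(managed)+len(userEnv))
	for k, v := range managed {
		out[k] = v
	}
	for k, v := range userEnv {
		if _, taken := out[k]; !taken {
			out[k] = v // pass-through, e.g. OTEL_* settings
		}
	}
	return out
}

func main() {
	managed := map[string]string{"BLOCKYARD_API_URL": "http://server:8080"}
	userEnv := map[string]string{
		"BLOCKYARD_API_URL": "http://elsewhere", // collides: dropped
		"OTEL_SERVICE_NAME": "my-app",           // passes through
	}
	env := mergeEnv(managed, userEnv)
	fmt.Println(env["BLOCKYARD_API_URL"], env["OTEL_SERVICE_NAME"])
}
```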

Types

type ActiveWorker

type ActiveWorker struct {
	AppID     string
	BundleID  string    // bundle active at spawn time; runtime installs resolve against this
	Draining  bool      // set by graceful drain; no new sessions routed
	IdleSince time.Time // zero value = not idle; set when session count hits 0
	StartedAt time.Time // when the worker was spawned
}

ActiveWorker represents a running worker tracked by the server. The worker ID is the map key in WorkerMap, not stored here.

type LayeredWorkerMap added in v0.0.4

type LayeredWorkerMap struct {
	// contains filtered or unexported fields
}

LayeredWorkerMap layers a cache WorkerMap over a primary (see #287, parent #262). The primary is the source of truth (Postgres in production); the cache is an optional optimization (Redis).

Reads: cache first; on miss, fall back to primary and populate the cache on the way out.

Writes: primary first; cache mirrored best-effort. Cache errors are swallowed inside the concrete stores, so LayeredWorkerMap just calls both — the primary operation's outcome is the one surfaced.

Aggregate queries (Count, CountForApp, ForApp, MarkDraining, …) always go to the primary: the cache may hold a subset and can't answer accurately.
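The read and write paths described above can be sketched with a reduced two-method interface. The `store`, `memStore`, `worker`, and `layered` types are hypothetical simplifications; the real WorkerMap interface is much larger, and the real stores swallow cache errors internally.

```go
package main

import "fmt"

// worker and store are hypothetical, reduced versions of ActiveWorker
// and WorkerMap that keep only what the layering needs.
type worker struct {
	AppID string
}

type store interface {
	Get(id string) (worker, bool)
	Set(id string, w worker)
}

// memStore is a trivial map-backed store used for both layers here.
type memStore map[string]worker

func (s memStore) Get(id string) (worker, bool) { w, ok := s[id]; return w, ok }
func (s memStore) Set(id string, w worker)      { s[id] = w }

type layered struct {
	primary, cache store
}

// Get reads the cache first; on a miss it falls back to the primary and
// populates the cache on the way out.
func (l layered) Get(id string) (worker, bool) {
	if w, ok := l.cache.Get(id); ok {
		return w, true
	}
	w, ok := l.primary.Get(id)
	if ok {
		l.cache.Set(id, w)
	}
	return w, ok
}

// Set writes the primary first, then mirrors the value to the cache.
func (l layered) Set(id string, w worker) {
	l.primary.Set(id, w)
	l.cache.Set(id, w)
}

func main() {
	primary := memStore{"w1": {AppID: "app-a"}}
	cache := memStore{}
	l := layered{primary: primary, cache: cache}

	w, ok := l.Get("w1") // cache miss, served by the primary
	fmt.Println(w.AppID, ok)
	_, cached := cache["w1"] // the miss populated the cache
	fmt.Println(cached)
}
```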

func NewLayeredWorkerMap added in v0.0.4

func NewLayeredWorkerMap(primary, cache WorkerMap) *LayeredWorkerMap

func (*LayeredWorkerMap) All added in v0.0.4

func (m *LayeredWorkerMap) All() []string

func (*LayeredWorkerMap) AppIDs added in v0.0.4

func (m *LayeredWorkerMap) AppIDs() []string

func (*LayeredWorkerMap) ClearDraining added in v0.0.4

func (m *LayeredWorkerMap) ClearDraining(workerID string)

func (*LayeredWorkerMap) ClearIdleSince added in v0.0.4

func (m *LayeredWorkerMap) ClearIdleSince(workerID string) bool

func (*LayeredWorkerMap) Count added in v0.0.4

func (m *LayeredWorkerMap) Count() int

func (*LayeredWorkerMap) CountForApp added in v0.0.4

func (m *LayeredWorkerMap) CountForApp(appID string) int

func (*LayeredWorkerMap) Delete added in v0.0.4

func (m *LayeredWorkerMap) Delete(id string)

func (*LayeredWorkerMap) ForApp added in v0.0.4

func (m *LayeredWorkerMap) ForApp(appID string) []string

func (*LayeredWorkerMap) ForAppAvailable added in v0.0.4

func (m *LayeredWorkerMap) ForAppAvailable(appID string) []string

func (*LayeredWorkerMap) Get added in v0.0.4

func (m *LayeredWorkerMap) Get(id string) (ActiveWorker, bool)

func (*LayeredWorkerMap) IdleWorkers added in v0.0.4

func (m *LayeredWorkerMap) IdleWorkers(timeout time.Duration) []string

func (*LayeredWorkerMap) IsDraining added in v0.0.4

func (m *LayeredWorkerMap) IsDraining(appID string) bool

func (*LayeredWorkerMap) MarkDraining added in v0.0.4

func (m *LayeredWorkerMap) MarkDraining(appID string) []string

MarkDraining writes the drain flag to the primary, then mirrors the state to the cache for each affected worker so a subsequent cache hit returns the new draining flag.

func (*LayeredWorkerMap) Set added in v0.0.4

func (m *LayeredWorkerMap) Set(id string, w ActiveWorker)

func (*LayeredWorkerMap) SetDraining added in v0.0.4

func (m *LayeredWorkerMap) SetDraining(workerID string)

func (*LayeredWorkerMap) SetIdleSince added in v0.0.4

func (m *LayeredWorkerMap) SetIdleSince(workerID string, t time.Time)

func (*LayeredWorkerMap) SetIdleSinceIfZero added in v0.0.4

func (m *LayeredWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)

func (*LayeredWorkerMap) WorkersForServer added in v0.0.4

func (m *LayeredWorkerMap) WorkersForServer(serverID string) []string

type MemoryWorkerMap added in v0.0.3

type MemoryWorkerMap struct {
	// contains filtered or unexported fields
}

MemoryWorkerMap is a concurrent in-memory map of worker ID → ActiveWorker.

func NewMemoryWorkerMap added in v0.0.3

func NewMemoryWorkerMap() *MemoryWorkerMap

func (*MemoryWorkerMap) All added in v0.0.3

func (m *MemoryWorkerMap) All() []string

All returns a snapshot of all worker IDs.

func (*MemoryWorkerMap) AppIDs added in v0.0.3

func (m *MemoryWorkerMap) AppIDs() []string

AppIDs returns a deduplicated list of app IDs that have active workers.

func (*MemoryWorkerMap) ClearDraining added in v0.0.3

func (m *MemoryWorkerMap) ClearDraining(workerID string)

ClearDraining clears the draining flag on a single worker by ID.

func (*MemoryWorkerMap) ClearIdleSince added in v0.0.3

func (m *MemoryWorkerMap) ClearIdleSince(workerID string) bool

ClearIdleSince resets the idle timer (a new session was assigned). Returns true if the worker was idle before clearing.

func (*MemoryWorkerMap) Count added in v0.0.3

func (m *MemoryWorkerMap) Count() int

func (*MemoryWorkerMap) CountForApp added in v0.0.3

func (m *MemoryWorkerMap) CountForApp(appID string) int

func (*MemoryWorkerMap) Delete added in v0.0.3

func (m *MemoryWorkerMap) Delete(id string)

func (*MemoryWorkerMap) ForApp added in v0.0.3

func (m *MemoryWorkerMap) ForApp(appID string) []string

ForApp returns all worker IDs for a given app (including draining).

func (*MemoryWorkerMap) ForAppAvailable added in v0.0.3

func (m *MemoryWorkerMap) ForAppAvailable(appID string) []string

ForAppAvailable returns worker IDs for an app that are not draining.

func (*MemoryWorkerMap) Get added in v0.0.3

func (m *MemoryWorkerMap) Get(id string) (ActiveWorker, bool)

func (*MemoryWorkerMap) IdleWorkers added in v0.0.3

func (m *MemoryWorkerMap) IdleWorkers(timeout time.Duration) []string

IdleWorkers returns workers that have been idle longer than the given timeout, excluding draining workers (they have their own lifecycle).

func (*MemoryWorkerMap) IsDraining added in v0.0.3

func (m *MemoryWorkerMap) IsDraining(appID string) bool

IsDraining returns true if any worker for the given app is draining.

func (*MemoryWorkerMap) MarkDraining added in v0.0.3

func (m *MemoryWorkerMap) MarkDraining(appID string) []string

MarkDraining sets the draining flag on all workers for an app. Returns the list of affected worker IDs.

func (*MemoryWorkerMap) Set added in v0.0.3

func (m *MemoryWorkerMap) Set(id string, w ActiveWorker)

func (*MemoryWorkerMap) SetDraining added in v0.0.3

func (m *MemoryWorkerMap) SetDraining(workerID string)

SetDraining sets the draining flag on a single worker by ID. Used by refresh drain-and-replace to drain specific old workers while keeping newly spawned workers available.

func (*MemoryWorkerMap) SetIdleSince added in v0.0.3

func (m *MemoryWorkerMap) SetIdleSince(workerID string, t time.Time)

SetIdleSince marks when a worker became idle (zero sessions). Called when the last session for a worker is removed.

func (*MemoryWorkerMap) SetIdleSinceIfZero added in v0.0.3

func (m *MemoryWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)

SetIdleSinceIfZero marks when a worker became idle, but only if it isn't already marked. This avoids resetting the timer on repeated ticks while the worker remains idle.
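The idle-timer semantics can be sketched with a mutex-guarded map. The `idleTracker` type is a hypothetical reduction that stores only the idle timestamp; ignoring unknown worker IDs is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// idleTracker is a hypothetical stand-in for the idle bookkeeping of a
// worker map. The zero time means "not idle".
type idleTracker struct {
	mu        sync.Mutex
	idleSince map[string]time.Time
}

func newIdleTracker(ids ...string) *idleTracker {
	tr := &idleTracker{idleSince: make(map[string]time.Time)}
	for _, id := range ids {
		tr.idleSince[id] = time.Time{}
	}
	return tr
}

// SetIdleSinceIfZero records t only when the worker exists and is not
// already marked idle, so repeated ticks do not reset the timer.
func (tr *idleTracker) SetIdleSinceIfZero(id string, t time.Time) {
	tr.mu.Lock()
	defer tr.mu.Unlock()
	if cur, ok := tr.idleSince[id]; ok && cur.IsZero() {
		tr.idleSince[id] = t
	}
}

// ClearIdleSince resets the timer and reports whether the worker was idle.
func (tr *idleTracker) ClearIdleSince(id string) bool {
	tr.mu.Lock()
	defer tr.mu.Unlock()
	cur, ok := tr.idleSince[id]
	if !ok || cur.IsZero() {
		return false
	}
	tr.idleSince[id] = time.Time{}
	return true
}

// demo simulates two idle ticks one minute apart, then a new session.
func demo() (keptFirst, wasIdle, wasIdleAgain bool) {
	tr := newIdleTracker("w1")
	first := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
	tr.SetIdleSinceIfZero("w1", first)
	tr.SetIdleSinceIfZero("w1", first.Add(time.Minute)) // ignored: already idle

	tr.mu.Lock()
	keptFirst = tr.idleSince["w1"].Equal(first)
	tr.mu.Unlock()

	wasIdle = tr.ClearIdleSince("w1")      // true: timer was running
	wasIdleAgain = tr.ClearIdleSince("w1") // false: already cleared
	return
}

func main() {
	fmt.Println(demo()) // prints true true false
}
```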

func (*MemoryWorkerMap) WorkersForServer added in v0.0.3

func (m *MemoryWorkerMap) WorkersForServer(_ string) []string

WorkersForServer returns all worker IDs. In the memory implementation every worker belongs to "this" server — single-node deployments have one process — so the serverID filter is a no-op. The Redis variant does the real filtering.

type PackageRequest added in v0.0.3

type PackageRequest struct {
	Name             string   `json:"name"`              // package name or pkgdepends ref
	LoadedNamespaces []string `json:"loaded_namespaces"` // from loadedNamespaces() in R
}

PackageRequest is the body of POST /api/v1/packages.

type PackageResponse added in v0.0.3

type PackageResponse struct {
	Status       string `json:"status"` // "ok", "transfer", "error"
	Message      string `json:"message,omitempty"`
	TransferPath string `json:"transfer_path,omitempty"` // set when status == "transfer"
}

PackageResponse is returned by POST /api/v1/packages.
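As a rough illustration of the wire format, the two structs can be round-tripped through encoding/json. The struct definitions are copied from above; the helper names, the package name "dplyr", and the transfer path are example values, not taken from the server.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type PackageRequest struct {
	Name             string   `json:"name"`
	LoadedNamespaces []string `json:"loaded_namespaces"`
}

type PackageResponse struct {
	Status       string `json:"status"`
	Message      string `json:"message,omitempty"`
	TransferPath string `json:"transfer_path,omitempty"`
}

// encodeRequest builds the JSON body for POST /api/v1/packages.
func encodeRequest(name string, loaded []string) (string, error) {
	b, err := json.Marshal(PackageRequest{Name: name, LoadedNamespaces: loaded})
	return string(b), err
}

// decodeResponse parses a response body into a PackageResponse.
func decodeResponse(body string) (PackageResponse, error) {
	var r PackageResponse
	err := json.Unmarshal([]byte(body), &r)
	return r, err
}

func main() {
	body, err := encodeRequest("dplyr", []string{"stats", "utils"})
	fmt.Println(body, err)

	// Hypothetical response body; the path is made up for the example.
	resp, err := decodeResponse(`{"status":"transfer","transfer_path":"/transfer/w1"}`)
	fmt.Println(resp.Status, resp.TransferPath, err)
}
```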

type PostgresWorkerMap added in v0.0.4

type PostgresWorkerMap struct {
	// contains filtered or unexported fields
}

PostgresWorkerMap implements WorkerMap against the blockyard_workers table, making Postgres the source of truth for worker metadata (see #287, parent #262). The same table also backs registry.PostgresRegistry — each store upserts its own column subset so a production spawn that calls Workers.Set before Registry.Set converges to a full row.

idle_since is stored as NULL when the worker is not idle (mapped to Go's zero-value time.Time), mirroring the MemoryWorkerMap semantic used throughout the codebase.
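One way to express this NULL-to-zero-value mapping in Go is with sql.NullTime from database/sql. Whether PostgresWorkerMap uses sql.NullTime is an assumption; the helper names below are hypothetical.

```go
package main

import (
	"database/sql"
	"fmt"
	"time"
)

// toColumn maps Go's zero time to SQL NULL and any other time to a
// valid column value.
func toColumn(t time.Time) sql.NullTime {
	return sql.NullTime{Time: t, Valid: !t.IsZero()}
}

// fromColumn maps NULL back to the zero time.
func fromColumn(n sql.NullTime) time.Time {
	if !n.Valid {
		return time.Time{}
	}
	return n.Time
}

// roundTripsZero reports whether a not-idle worker (zero time) becomes
// NULL on write and the zero time again on read.
func roundTripsZero() bool {
	col := toColumn(time.Time{})
	return !col.Valid && fromColumn(col).IsZero()
}

// roundTripsIdle reports whether a real idle timestamp survives unchanged.
func roundTripsIdle() bool {
	idle := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
	col := toColumn(idle)
	return col.Valid && fromColumn(col).Equal(idle)
}

func main() {
	fmt.Println(roundTripsZero(), roundTripsIdle()) // prints true true
}
```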

func NewPostgresWorkerMap added in v0.0.4

func NewPostgresWorkerMap(db *sqlx.DB, serverID string) *PostgresWorkerMap

func (*PostgresWorkerMap) All added in v0.0.4

func (m *PostgresWorkerMap) All() []string

func (*PostgresWorkerMap) AppIDs added in v0.0.4

func (m *PostgresWorkerMap) AppIDs() []string

AppIDs returns the deduplicated set of app_ids with at least one tracked worker. Filters out the empty-string default so rows created by registry.PostgresRegistry alone don't leak a bogus "" entry.

func (*PostgresWorkerMap) ClearDraining added in v0.0.4

func (m *PostgresWorkerMap) ClearDraining(workerID string)

func (*PostgresWorkerMap) ClearIdleSince added in v0.0.4

func (m *PostgresWorkerMap) ClearIdleSince(workerID string) bool

ClearIdleSince resets idle_since to NULL and reports whether the worker was previously idle. Done in a single UPDATE so the read-modify-write is race-free.

func (*PostgresWorkerMap) Count added in v0.0.4

func (m *PostgresWorkerMap) Count() int

Count returns the total number of tracked workers. Rows created by registry.PostgresRegistry alone (no Workers.Set ever called) still count — a registered address is a tracked worker.

func (*PostgresWorkerMap) CountForApp added in v0.0.4

func (m *PostgresWorkerMap) CountForApp(appID string) int

func (*PostgresWorkerMap) Delete added in v0.0.4

func (m *PostgresWorkerMap) Delete(id string)

func (*PostgresWorkerMap) ForApp added in v0.0.4

func (m *PostgresWorkerMap) ForApp(appID string) []string

func (*PostgresWorkerMap) ForAppAvailable added in v0.0.4

func (m *PostgresWorkerMap) ForAppAvailable(appID string) []string

func (*PostgresWorkerMap) Get added in v0.0.4

func (m *PostgresWorkerMap) Get(id string) (ActiveWorker, bool)

func (*PostgresWorkerMap) IdleWorkers added in v0.0.4

func (m *PostgresWorkerMap) IdleWorkers(timeout time.Duration) []string

IdleWorkers returns workers idle longer than timeout, excluding draining workers (they're on their own lifecycle).

func (*PostgresWorkerMap) IsDraining added in v0.0.4

func (m *PostgresWorkerMap) IsDraining(appID string) bool

func (*PostgresWorkerMap) MarkDraining added in v0.0.4

func (m *PostgresWorkerMap) MarkDraining(appID string) []string

MarkDraining sets draining on every worker for the given app and returns the affected worker IDs. Implemented as a single UPDATE ... RETURNING so the read-modify-write stays atomic.

func (*PostgresWorkerMap) RunReaper added in v0.0.4

func (m *PostgresWorkerMap) RunReaper(ctx context.Context, threshold, interval time.Duration)

RunReaper deletes blockyard_workers rows whose last_heartbeat has been stale for longer than `threshold`, every `interval`. Blocks until ctx is cancelled. Caller runs it in a goroutine.

Motivation: Redis workermap entries had implicit TTLs that cleaned up after a pod died without graceful shutdown. Postgres doesn't expire rows, so without this sweep a crashed pod's workers linger in the shared blockyard_workers table — Registry.Get hides them via its own last_heartbeat filter, but WorkerMap's Count / ForApp / IdleWorkers / AppIDs would keep reporting ghosts and skew scheduler decisions.

Pick a threshold well above registryTTL so a transient network blip (heartbeat writes briefly blocked) doesn't reap a worker the health poller will resume updating seconds later. cmd/blockyard/main.go uses 5 × registryTTL, giving 60 s of recovery slack on top of the 15 s "Registry considers dead" signal with default health intervals.

func (*PostgresWorkerMap) Set added in v0.0.4

func (m *PostgresWorkerMap) Set(id string, w ActiveWorker)

func (*PostgresWorkerMap) SetDraining added in v0.0.4

func (m *PostgresWorkerMap) SetDraining(workerID string)

SetDraining flips draining on a single worker. The WHERE guard matches the "must not create ghost entry" conformance test — if the row doesn't exist, nothing is inserted.

func (*PostgresWorkerMap) SetIdleSince added in v0.0.4

func (m *PostgresWorkerMap) SetIdleSince(workerID string, t time.Time)

func (*PostgresWorkerMap) SetIdleSinceIfZero added in v0.0.4

func (m *PostgresWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)

SetIdleSinceIfZero sets idle_since only when it's currently NULL (the zero-value representation). Matches the MemoryWorkerMap semantic used to avoid resetting the timer on repeated ticks.

func (*PostgresWorkerMap) WorkersForServer added in v0.0.4

func (m *PostgresWorkerMap) WorkersForServer(serverID string) []string

type RedisWorkerMap added in v0.0.3

type RedisWorkerMap struct {
	// contains filtered or unexported fields
}

RedisWorkerMap implements WorkerMap using Redis hashes.

Key schema:

{prefix}worker:{workerID}  ->  hash {app_id, bundle_id, draining, idle_since, started_at, server_id}

No TTL — workers are explicitly deleted on eviction.

func NewRedisWorkerMap added in v0.0.3

func NewRedisWorkerMap(client *redisstate.Client, serverID string) *RedisWorkerMap

func (*RedisWorkerMap) All added in v0.0.3

func (m *RedisWorkerMap) All() []string

func (*RedisWorkerMap) AppIDs added in v0.0.3

func (m *RedisWorkerMap) AppIDs() []string

func (*RedisWorkerMap) ClearDraining added in v0.0.3

func (m *RedisWorkerMap) ClearDraining(workerID string)

func (*RedisWorkerMap) ClearIdleSince added in v0.0.3

func (m *RedisWorkerMap) ClearIdleSince(workerID string) bool

func (*RedisWorkerMap) Count added in v0.0.3

func (m *RedisWorkerMap) Count() int

func (*RedisWorkerMap) CountForApp added in v0.0.3

func (m *RedisWorkerMap) CountForApp(appID string) int

func (*RedisWorkerMap) Delete added in v0.0.3

func (m *RedisWorkerMap) Delete(id string)

func (*RedisWorkerMap) ForApp added in v0.0.3

func (m *RedisWorkerMap) ForApp(appID string) []string

func (*RedisWorkerMap) ForAppAvailable added in v0.0.3

func (m *RedisWorkerMap) ForAppAvailable(appID string) []string

func (*RedisWorkerMap) Get added in v0.0.3

func (m *RedisWorkerMap) Get(id string) (ActiveWorker, bool)

func (*RedisWorkerMap) IdleWorkers added in v0.0.3

func (m *RedisWorkerMap) IdleWorkers(timeout time.Duration) []string

func (*RedisWorkerMap) IsDraining added in v0.0.3

func (m *RedisWorkerMap) IsDraining(appID string) bool

func (*RedisWorkerMap) MarkDraining added in v0.0.3

func (m *RedisWorkerMap) MarkDraining(appID string) []string

func (*RedisWorkerMap) Set added in v0.0.3

func (m *RedisWorkerMap) Set(id string, w ActiveWorker)

func (*RedisWorkerMap) SetDraining added in v0.0.3

func (m *RedisWorkerMap) SetDraining(workerID string)

func (*RedisWorkerMap) SetIdleSince added in v0.0.3

func (m *RedisWorkerMap) SetIdleSince(workerID string, t time.Time)

func (*RedisWorkerMap) SetIdleSinceIfZero added in v0.0.3

func (m *RedisWorkerMap) SetIdleSinceIfZero(workerID string, t time.Time)

func (*RedisWorkerMap) WorkersForServer added in v0.0.3

func (m *RedisWorkerMap) WorkersForServer(serverID string) []string

WorkersForServer returns worker IDs whose server_id hash field matches the given serverID. Pattern lifted from ForApp — scan the worker key namespace, pipeline-HGET the server_id field, filter. Same shape as the other server-scoped filters, just on a different hash field.

type Server

type Server struct {
	Config   *config.Config
	Backend  backend.Backend
	DB       *db.DB
	Workers  WorkerMap
	Sessions session.Store
	WsConns  *WsConnCounter
	Registry registry.WorkerRegistry
	Tasks    *task.Store
	LogStore *logstore.Store
	ErrorLog *errorlog.Store
	Metrics  *telemetry.Metrics

	// ServerID uniquely identifies this process among peers sharing the
	// same Postgres / Redis (see #262). Health-poll, startup-reconcile,
	// and graceful-shutdown use it via Workers.WorkersForServer so they
	// only act on workers this process owns — otherwise a sibling pod's
	// boot would wipe this pod's workers from the shared table.
	ServerID string

	// Auth fields — nil when [oidc] is not configured (v0 compat).
	OIDCClient   *auth.OIDCClient
	SigningKey   *auth.SigningKey
	UserSessions *auth.UserSessionStore
	// Session token signing key — for credential exchange tokens.
	// Derived from session_secret with a different domain string.
	SessionTokenKey *auth.SigningKey

	// Vault — nil when [vault] is not configured.
	VaultClient     *integration.Client
	VaultTokenCache *integration.VaultTokenCache

	// Board-storage provisioner — non-nil only when
	// database.board_storage is enabled. Drives first-login
	// provisioning from the OIDC callback and deactivation/reactivation
	// from the admin update-user endpoint.
	BoardStorage *boardstorage.Provisioner

	// VaultTokenHealthy reports whether the vault token is valid.
	// Non-nil only when AppRole auth is used (token renewal active).
	VaultTokenHealthy func() bool

	// Redis client — nil when [redis] is not configured.
	RedisClient *redisstate.Client

	// Audit log — nil when [audit] is not configured.
	AuditLog *audit.Log

	// System checks — populated during startup, used by the system page
	// and API endpoints. Nil until Init is called.
	Checker *preflight.Checker

	// Package store — nil when not available (no builds yet).
	PkgStore *pkgstore.Store

	// HMAC key for worker tokens. Persisted via vault or file-based
	// fallback so both servers verify the same tokens during a rolling
	// update. Independent of SessionSecret and OIDC.
	WorkerTokenKey *auth.SigningKey

	// Draining is set when the server enters drain mode (SIGUSR1) or
	// shutdown (SIGTERM). Health endpoints return 503 when set.
	Draining atomic.Bool

	// Passive is true when BLOCKYARD_PASSIVE=1 is set. Background
	// goroutines are deferred until POST /api/v1/admin/activate.
	Passive atomic.Bool

	// Bootstrap token state — one-time token that can be exchanged for
	// a real PAT via POST /api/v1/bootstrap. Hash is set at startup;
	// Redeemed is flipped to true on first successful exchange.
	BootstrapTokenHash []byte
	BootstrapRedeemed  atomic.Bool

	// Version is the server version string, set at build time.
	Version string

	// UpdateStatus holds the most recent CheckLatest result. Nil
	// until the first check runs. The richer State enum lets readers
	// distinguish "update available" from "ahead of latest" / "dev
	// build" / "couldn't reach GitHub" / etc.
	UpdateStatus atomic.Pointer[update.Result]

	// UpdateLastChecked records the wall-clock time of the most recent
	// update check (scheduled or manual). Nil until the first check runs.
	UpdateLastChecked atomic.Pointer[time.Time]

	// RestoreWG is used in tests to wait for background restore goroutines
	// to complete before cleanup. Nil in production.
	RestoreWG *sync.WaitGroup

	// Hooks for operations that would cause import cycles if called
	// directly from server. Set during initialization in main().
	EvictWorkerFn     func(ctx context.Context, srv *Server, workerID, reason string)
	SpawnLogCaptureFn func(ctx context.Context, srv *Server, workerID, appID string)
	// contains filtered or unexported fields
}

Server holds all shared state for the running server. Passed by pointer to API handlers, proxy, and background goroutines.

func NewServer

func NewServer(cfg *config.Config, be backend.Backend, database *db.DB) *Server

NewServer creates a Server with all in-memory stores initialized. The returned Server owns a fresh per-instance telemetry.Metrics registered against a private prometheus registry, so tests that construct a Server never contend on a shared counter state. Production callers should replace Server.Metrics with one registered against prometheus.DefaultRegisterer so the /metrics HTTP handler can scrape it — see NewServerWithDefaultMetrics.

func NewServerWithDefaultMetrics added in v0.0.3

func NewServerWithDefaultMetrics(cfg *config.Config, be backend.Backend, database *db.DB) *Server

NewServerWithDefaultMetrics is the production constructor. It creates a Server whose metrics are registered with prometheus.DefaultRegisterer so the promhttp /metrics endpoint can scrape them.

func (*Server) AuthDeps added in v0.0.2

func (srv *Server) AuthDeps() *auth.Deps

AuthDeps returns an auth.Deps populated from this server's fields. Used by the router to wire auth handlers and middleware without a circular import.

func (*Server) BundlePaths added in v0.0.3

func (srv *Server) BundlePaths(appID, bundleID string) bundle.Paths

BundlePaths returns the filesystem paths for a bundle.

func (*Server) CancelTokenRefresher added in v0.0.3

func (srv *Server) CancelTokenRefresher(workerID string)

CancelTokenRefresher calls and removes the cancel function for a worker. No-op if the worker has no registered cancel function.

func (*Server) CleanupInstallMu added in v0.0.3

func (srv *Server) CleanupInstallMu(workerID string)

CleanupInstallMu removes the per-worker mutex (called on eviction).

func (*Server) ClearTransferring added in v0.0.3

func (srv *Server) ClearTransferring(workerID string)

ClearTransferring removes the transfer-in-progress flag for a worker.

func (*Server) GetVersion added in v0.0.3

func (srv *Server) GetVersion() string

GetVersion returns the running server version.

func (*Server) InstallPackage added in v0.0.3

func (srv *Server) InstallPackage(
	ctx context.Context,
	appID, workerID string,
	req PackageRequest,
) (PackageResponse, error)

InstallPackage is the core orchestration for runtime package installation. It uses the same four-phase store-aware flow as the build pipeline but differs in resolution context: the worker's existing /lib serves as a reference library, so pak sees what is already installed.

func (*Server) InternalAPIURL added in v0.0.3

func (srv *Server) InternalAPIURL() string

InternalAPIURL returns the URL workers should use to reach this server.

func (*Server) IsTransferring added in v0.0.3

func (srv *Server) IsTransferring(workerID string) bool

IsTransferring returns true if a container transfer is in progress for the given worker.

func (*Server) RunRefresh added in v0.0.3

func (srv *Server) RunRefresh(
	ctx context.Context,
	app *db.AppRow,
	m *manifest.Manifest,
	sender task.Sender,
) bool

RunRefresh re-resolves dependencies for an unpinned deployment. Returns true if a new worker was spawned (dependencies changed).

func (*Server) RunRefreshScheduler added in v0.0.3

func (srv *Server) RunRefreshScheduler(ctx context.Context)

RunRefreshScheduler checks active apps with a refresh_schedule and triggers refresh at the configured times. Blocks until ctx is cancelled.

func (*Server) RunRollback added in v0.0.3

func (srv *Server) RunRollback(
	ctx context.Context,
	app *db.AppRow,
	target string,
	sender task.Sender,
)

RunRollback performs a rollback to either the previous refresh or the original build, then drains and replaces workers.

func (*Server) SetCancelToken added in v0.0.3

func (srv *Server) SetCancelToken(workerID string, cancel func())

SetCancelToken registers a cancel function for a worker's token refresher.

func (*Server) SetTransferring added in v0.0.3

func (srv *Server) SetTransferring(workerID string)

SetTransferring marks a worker as having a transfer in progress.

func (*Server) SetUpdateStatus added in v0.0.3

func (srv *Server) SetUpdateStatus(r *update.Result)

SetUpdateStatus records the latest update check result and stamps UpdateLastChecked with the current time. Satisfies the update.UpdateStore interface so the periodic checker and the UI's manual-refresh handler can both feed results in.

func (*Server) TransferDir added in v0.0.3

func (srv *Server) TransferDir(workerID string) string

TransferDir returns the host-side transfer directory for a worker.

func (*Server) UpdateAvailableVersion added in v0.0.3

func (srv *Server) UpdateAvailableVersion() string

UpdateAvailableVersion returns the upstream version string when an update is recommended, or "" otherwise. Convenience for consumers (preflight, readyz) that previously only cared about the boolean.

type WorkerMap

type WorkerMap interface {
	Get(id string) (ActiveWorker, bool)
	Set(id string, w ActiveWorker)
	Delete(id string)
	Count() int
	CountForApp(appID string) int
	All() []string
	ForApp(appID string) []string
	ForAppAvailable(appID string) []string
	MarkDraining(appID string) []string
	SetDraining(workerID string)
	SetIdleSince(workerID string, t time.Time)
	SetIdleSinceIfZero(workerID string, t time.Time)
	ClearIdleSince(workerID string) bool
	IdleWorkers(timeout time.Duration) []string
	AppIDs() []string
	IsDraining(appID string) bool
	ClearDraining(workerID string)

	// WorkersForServer returns the worker IDs owned by the given
	// server_id. Used by Drainer.waitForIdle during a same-host
	// rolling update so the old server waits for its own sessions
	// to end, not the new server's fresh sessions. In the memory
	// implementation this is equivalent to All() — single-node
	// deployments have one server, so every worker belongs to it.
	// The Redis implementation filters by the server_id hash field
	// phase 3-3 already writes.
	WorkersForServer(serverID string) []string
}

WorkerMap defines the contract for the active worker map. MemoryWorkerMap is the in-process implementation; RedisWorkerMap and PostgresWorkerMap implement the same interface for state shared between servers, for example during rolling updates.

type WsConnCounter added in v0.0.4

type WsConnCounter struct {
	// contains filtered or unexported fields
}

WsConnCounter tracks active WebSocket connections per worker.

Used to enforce max_sessions_per_worker at WebSocket handshake time and to inform load-balancer assignment and idle detection. A session in blockyard is one active Shiny WebSocket — one browser tab — not one browser cookie, so this counter is what "max sessions" gates against. The session.Store continues to track cookie-level worker pinning for HTTP routing stickiness; it does not gate capacity.

func NewWsConnCounter added in v0.0.4

func NewWsConnCounter() *WsConnCounter

func (*WsConnCounter) Count added in v0.0.4

func (c *WsConnCounter) Count(workerID string) int

Count returns the current active WS count for workerID.

func (*WsConnCounter) CountForWorkers added in v0.0.4

func (c *WsConnCounter) CountForWorkers(ids []string) int

CountForWorkers sums active WS counts across the given worker IDs.

func (*WsConnCounter) Dec added in v0.0.4

func (c *WsConnCounter) Dec(workerID string)

Dec decrements the count for workerID. Idempotent at zero.

func (*WsConnCounter) DeleteWorker added in v0.0.4

func (c *WsConnCounter) DeleteWorker(workerID string)

DeleteWorker drops the entry for workerID. Called on eviction so stale counts don't linger against a vanished worker.

func (*WsConnCounter) TryInc added in v0.0.4

func (c *WsConnCounter) TryInc(workerID string, max int) bool

TryInc atomically increments the count for workerID if it is strictly below max. Returns true on success, false if the worker is already at capacity. A single mutex-protected check-and-increment avoids the race where two concurrent handshakes both observe count < max.
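The check-and-increment under a single mutex can be sketched as below. The `connCounter` type is a hypothetical reduction of WsConnCounter; only the behavior described above (strictly-below-max admission, Dec idempotent at zero) is modeled.

```go
package main

import (
	"fmt"
	"sync"
)

// connCounter is a hypothetical, reduced version of a per-worker
// connection counter.
type connCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

func newConnCounter() *connCounter {
	return &connCounter{counts: make(map[string]int)}
}

// TryInc checks and increments under one lock, so two concurrent
// callers can never both observe count < max for the last free slot.
func (c *connCounter) TryInc(workerID string, max int) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.counts[workerID] >= max {
		return false
	}
	c.counts[workerID]++
	return true
}

// Dec decrements the count and never goes below zero.
func (c *connCounter) Dec(workerID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.counts[workerID] > 0 {
		c.counts[workerID]--
	}
}

// Count returns the current count for workerID.
func (c *connCounter) Count(workerID string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[workerID]
}

// admitted fires n concurrent handshakes at one worker and returns how
// many were let in.
func admitted(n, max int) int {
	c := newConnCounter()
	var wg sync.WaitGroup
	var mu sync.Mutex
	ok := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if c.TryInc("w1", max) {
				mu.Lock()
				ok++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return ok
}

func main() {
	fmt.Println(admitted(100, 10)) // prints 10
}
```

A separate "read count, then increment" pair would allow more than max admissions under contention, which is why the two steps share one critical section.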
