orchestrator

package
v0.0.3 Latest
Published: Apr 18, 2026 License: GPL-3.0 Imports: 25 Imported by: 0

Documentation

Overview

Package orchestrator manages rolling updates from inside the running server. The core flow (Update → Watchdog → Rollback → scheduled) is backend-agnostic. Variant-specific code (Docker container clone, process fork+exec) lives behind the ServerFactory interface in clone_docker.go and clone_process.go.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultChecker

type DefaultChecker struct{}

DefaultChecker wraps the update package's install-target lookup.

func (DefaultChecker) FetchInstallTarget

func (DefaultChecker) FetchInstallTarget(channel, currentVersion string) (string, error)

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator manages rolling updates from inside the running server. The factory field abstracts the variant-specific "create a new server instance" step. The activeInstance field carries the new instance between Update and Watchdog without threading it through the admin handler goroutine. The state machine already serializes those phases, so no additional locking is needed.

func New

func New(
	factory ServerFactory,
	database *db.DB,
	cfg *config.Config,
	version string,
	tasks *task.Store,
	checker updateAPI,
	log *slog.Logger,
	drainFn, undrainFn, exitFn func(),
) *Orchestrator

New creates an Orchestrator wired to a ServerFactory. The factory encapsulates variant-specific server-creation logic; the core flow uses only the interface.

func NewForTest

func NewForTest() *Orchestrator

NewForTest creates a minimal Orchestrator for API tests that only need state management. The update checker returns "already up to date" so background goroutines spawned by handlers exit quickly without panics. The factory is a stub that reports SupportsRollback=true so the admin rollback handler's pre-dispatch check passes; the factory's CreateInstance is never reached because the test exits before the goroutine runs.

func NewForTestNoRollback

func NewForTestNoRollback() *Orchestrator

NewForTestNoRollback is like NewForTest, but its factory reports SupportsRollback=false, which simulates the process backend at the orchestrator layer. The admin rollback handler tests use it to cover the phase 3-8 "backend cannot rollback" 501 branch without plugging in the real processServerFactory (which lives behind a build tag).

func (*Orchestrator) CASState

func (o *Orchestrator) CASState(old, new string) bool

CASState performs a compare-and-swap on the state: the state is set to new only if it currently equals old. It returns true if the swap happened.
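The compare-and-swap semantics can be illustrated with a minimal, self-contained sketch. The stateBox type, the mutex-based implementation, and the state names "idle", "updating", and "rolling_back" are assumptions made for illustration; the package's actual field layout and state names are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// stateBox is a hypothetical stand-in for the orchestrator's state field.
type stateBox struct {
	mu    sync.Mutex
	state string
}

// CASState sets the state to next only when it currently equals old,
// and reports whether the swap happened.
func (b *stateBox) CASState(old, next string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.state != old {
		return false
	}
	b.state = next
	return true
}

// State returns the current state.
func (b *stateBox) State() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.state
}

func main() {
	b := &stateBox{state: "idle"} // state names are illustrative only
	fmt.Println(b.CASState("idle", "updating"))     // true: the first caller wins
	fmt.Println(b.CASState("idle", "rolling_back")) // false: state is no longer "idle"
	fmt.Println(b.State())                          // updating
}
```

Used this way, a handler that loses the swap can reject a second concurrent request instead of starting a competing update.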

func (*Orchestrator) Exit

func (o *Orchestrator) Exit()

Exit signals the main goroutine to call Finish and exit.

func (*Orchestrator) Rollback

func (o *Orchestrator) Rollback(
	ctx context.Context,
	sender task.Sender,
	shutdownFn func(),
) error

Rollback restores the previous version using backup metadata.

  1. Read latest backup metadata
  2. Check for irreversible migrations
  3. Variant-specific prep (Docker: pull old image; process: returns 501 upstream)
  4. Run down migrations to the recorded version
  5. Create old instance (passive mode)
  6. Poll /readyz on old instance
  7. Drain current server
  8. Activate old instance

Steps 1–3 are side-effect-free. Step 4 (down-migration) is the point of no return: if any subsequent step fails, the running server's code no longer matches the database schema. Rather than serve broken requests, the server shuts itself down and logs the backup path for manual recovery.

Rollback requires a factory that SupportsRollback(). The admin handler returns 501 for factories that don't.

func (*Orchestrator) RunScheduled

func (o *Orchestrator) RunScheduled(
	ctx context.Context,
	schedule string,
	channel string,
)

RunScheduled checks for updates on the configured cron schedule. When an update is available, it triggers the full update + watchdog flow. Blocks until ctx is cancelled.

RunScheduled uses o.exitFn to signal the main goroutine. It runs as a bgWg goroutine, so it cannot call Finish directly without deadlocking.
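The signalling pattern can be sketched in isolation. The sketch assumes that Finish waits for the background WaitGroup, which would explain the deadlock but is not stated in this documentation; runBackground, exitCh, and the closure shape of exitFn are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runBackground starts work as a tracked background goroutine. It returns
// once work has requested exit and every tracked goroutine has returned.
func runBackground(work func(exitFn func())) {
	var bgWg sync.WaitGroup
	exitCh := make(chan struct{})
	var once sync.Once
	// exitFn only signals; it never waits, so it is safe to call from
	// inside a goroutine that bgWg is tracking.
	exitFn := func() { once.Do(func() { close(exitCh) }) }

	bgWg.Add(1)
	go func() {
		defer bgWg.Done()
		// Waiting on bgWg from here would wait for this goroutine
		// itself and never return.
		work(exitFn)
	}()

	<-exitCh    // main goroutine learns that exit was requested
	bgWg.Wait() // the "finish" step runs on the main goroutine only
}

func main() {
	runBackground(func(exitFn func()) {
		fmt.Println("update applied, requesting exit")
		exitFn()
	})
	fmt.Println("finished cleanly")
}
```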

func (*Orchestrator) SetState

func (o *Orchestrator) SetState(s string)

SetState sets the orchestrator state directly.

func (*Orchestrator) State

func (o *Orchestrator) State() string

State returns the current orchestrator phase.

func (*Orchestrator) SupportsRollback

func (o *Orchestrator) SupportsRollback() bool

SupportsRollback reports whether the active factory can restart a previous version. The admin handler returns 501 for factories that cannot (process backend).
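A pre-dispatch check of this kind might look like the sketch below. The rollbackCapable interface, stubFactory, the "/rollback" path, and the 202 response on the supported path are assumptions; only the 501 response for unsupported backends comes from the documentation.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// rollbackCapable is a hypothetical narrow view of the orchestrator.
type rollbackCapable interface {
	SupportsRollback() bool
}

// stubFactory reports a fixed rollback capability.
type stubFactory bool

func (s stubFactory) SupportsRollback() bool { return bool(s) }

// rollbackHandler rejects the request with 501 before any work is
// dispatched when the backend cannot roll back.
func rollbackHandler(o rollbackCapable) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !o.SupportsRollback() {
			http.Error(w, "rollback not supported by this backend", http.StatusNotImplemented)
			return
		}
		// Assumption: the real handler would start Rollback in a goroutine here.
		w.WriteHeader(http.StatusAccepted)
	}
}

// statusFor issues one request against the handler and returns the status code.
func statusFor(o rollbackCapable) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/rollback", nil)
	rollbackHandler(o).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(stubFactory(true)))  // 202
	fmt.Println(statusFor(stubFactory(false))) // 501
}
```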

func (*Orchestrator) Update

func (o *Orchestrator) Update(
	ctx context.Context,
	channel string,
	sender task.Sender,
) (bool, error)

Update executes the rolling update. It reports progress to the provided sender and returns true on success (false when already up to date).

On success, the new instance is stashed on o.activeInstance so Watchdog can poll it without the caller threading any opaque handle through the API layer. The state machine serializes Update → Watchdog so the field is only ever read between those phases by one caller.

The caller (API handler or cron trigger) runs this in a goroutine. The context should be the server's background context, not a request context.

func (*Orchestrator) Watchdog

func (o *Orchestrator) Watchdog(
	ctx context.Context,
	watchPeriod time.Duration,
	sender task.Sender,
) error

Watchdog monitors the new server after a successful update. It reads the target instance from o.activeInstance (set by Update) so the admin handler doesn't thread an opaque handle through the API layer.

If the new server becomes unhealthy (3 consecutive failures) within the watch period, the watchdog kills the new instance and un-drains the old server, which resumes serving.

If the new server stays healthy for the full period, Watchdog returns nil and the caller signals the main goroutine so that the old server can exit.

type ServerFactory

type ServerFactory interface {
	// CreateInstance starts the new server instance and blocks until
	// its address is resolvable. On success, the returned instance's
	// Addr() is immediately usable for polling and activation — no
	// async resolution required by the caller. The ctx's deadline
	// (set by the orchestrator from cfg.Proxy.WorkerStartTimeout)
	// bounds address resolution; the remaining budget flows through
	// to waitReady for /readyz polling.
	//
	// ref is the image reference for the Docker variant; the process
	// variant ignores it (it always execs the current binary).
	//
	// extraEnv is a list of KEY=VALUE strings injected into the new
	// instance's environment (activation token, etc.).
	CreateInstance(ctx context.Context, ref string, extraEnv []string, sender task.Sender) (newServerInstance, error)

	// PreUpdate runs variant-specific preparation before the instance
	// is created. Docker pulls the new image; process is a no-op (the
	// binary is already on disk).
	PreUpdate(ctx context.Context, version string, sender task.Sender) error

	// CurrentImageBase returns the image repository (without tag) for
	// the running server. Docker reads it from container inspect;
	// process returns a stable placeholder (the process variant has
	// no equivalent concept).
	CurrentImageBase(ctx context.Context) string

	// CurrentImageTag returns the image tag for the running server.
	// Docker reads it from container inspect; process returns the
	// current version string.
	CurrentImageTag(ctx context.Context) string

	// SupportsRollback indicates whether this factory can restart a
	// previous version. Docker can (pull old image); process cannot
	// (previous binary typically overwritten by upgrade).
	SupportsRollback() bool
}

ServerFactory abstracts "create a new server instance" so the orchestrator's cutover/watchdog/scheduled core stays backend-agnostic. Two implementations exist: Docker container clone (clone_docker.go) and process fork+exec (clone_process.go).

func NewDockerFactory

func NewDockerFactory(c *client.Client, serverID, version string, listenPort func() string) ServerFactory

NewDockerFactory constructs the Docker variant of ServerFactory. listenPort is a closure over cfg.Server.Bind passed in from the wiring site (cmd/blockyard/orchestrator_docker.go).

func NewProcessFactory

func NewProcessFactory(cfg *config.Config, version string) ServerFactory

NewProcessFactory constructs the process variant of ServerFactory. version is the running server's version string, which CurrentImageTag returns as the image tag recorded in backup metadata. This mirrors the version argument of NewDockerFactory, so both factories record a meaningful tag even though only the Docker variant can consume it during automatic rollback.
