schedule

package
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2026 License: AGPL-3.0 Imports: 5 Imported by: 0

Documentation

Overview

Package schedule provides flate's dependency-driven reconcile scheduler: a re-entrant fixpoint engine that runs each node's reconcile body on a bounded task pool, parks a body that reports unsatisfied dependencies (Dispatcher OutcomeBlocked), and re-runs it when any of those dependencies advances.

Termination is STRUCTURAL, not timed. Every render emission is a synchronous store write on the body's own task goroutine, completing before the body returns and before the scheduler decrements its in-flight count. Therefore when no body is in flight and the runnable frontier is empty, no new object can ever appear — so any still-parked node's dependencies are provably unproducible. A draining sweep then terminalizes those nodes with the canonical "dependency not found" / cascade / "not ready" statuses. There is no per-dependency timeout and no shared quiescence counter, so the #666 transient-drain false-drop cannot occur: a parked node is never counted in flight, and nothing drops it except the fixpoint, which only fires when nothing is running.

The package depends only on pkg/manifest and pkg/task; all store and controller interaction is behind the Dispatcher seam, so the scheduler is unit-testable against a fake Dispatcher with no store or controllers.

Index

Constants

View Source
const (
	// DrainNone: normal operation — an unsatisfiable dependency parks.
	DrainNone = 0
	// DrainCascade: an absent dependency (and a never-true ReadyExpr)
	// terminalizes as a failure; a present-but-Pending dependency still
	// parks, so a dangling chain fails leaf-first and each level cascades
	// the child's real terminal message upward.
	DrainCascade = 1
	// DrainForce: a present-but-Pending dependency ALSO terminalizes ("not
	// ready"). Reached only when a DrainCascade pass made no progress — i.e.
	// a cross-kind structural cycle the same-kind preflight detector cannot
	// represent; forcing the failure breaks it.
	DrainForce = 2
)

Drain levels passed to Dispatcher.Dispatch. 0 is normal operation; the scheduler escalates only at the structural fixpoint (nothing in flight, nodes still parked).

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher interface {
	// Dispatch invokes id's reconcile body synchronously on the calling
	// goroutine (a task.Service worker) and reports back:
	//   - out: OutcomeTerminal or OutcomeBlocked.
	//   - blocked: unsatisfied dependency ids (non-nil only when Blocked).
	//   - ready: whether id's final store status is Ready (meaningful only
	//     when Terminal) — the scheduler records it so a node parking on id
	//     can tell, without reading the store, whether id is satisfied.
	// drainLevel is one of DrainNone/DrainCascade/DrainForce.
	Dispatch(ctx context.Context, id NodeID, drainLevel int) (out Outcome, blocked []NodeID, ready bool)
}

Dispatcher runs a node's reconcile body. The orchestrator supplies the concrete implementation, closing over the store and the three controllers; the scheduler never sees a store or controller type.

type NodeID

type NodeID = manifest.NamedResource

NodeID identifies a schedulable node — a Kustomization, HelmRelease, or source CR — by its store identity.

type Outcome

type Outcome int

Outcome is what one reconcile-body invocation reported via the Dispatcher.

const (
	// OutcomeTerminal means the body wrote a terminal store status
	// (Ready/Skipped/Failed); the node is done unless a later store event
	// re-emits or resets it.
	OutcomeTerminal Outcome = iota
	// OutcomeBlocked means the body could not proceed because one or more
	// dependencies are unsatisfied; the scheduler parks the node keyed on the
	// returned ids and re-runs it when any advances.
	OutcomeBlocked
)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is a re-entrant fixpoint reconcile driver. Construct with New, Seed the initial node set, wire store events to OnArrival/OnStatusWake/ OnDelete, then call Run.

func New

func New(tasks *task.Service, disp Dispatcher) *Scheduler

New constructs a Scheduler that runs bodies on tasks via disp.

func (*Scheduler) OnArrival

func (s *Scheduler) OnArrival(id NodeID, schedulable bool)

OnArrival is called from the store's EventObjectAdded subscription (which fires only when an object's content actually changed — including a Refire's status reset). schedulable reports whether id is a node the scheduler runs (a Kustomization/HelmRelease/source) versus pure data (a ConfigMap/Secret): a data arrival must WAKE nodes parked on it (a KS waiting on a substituteFrom CM) but is never registered as a runnable node. A content-changed arrival of a terminal node re-dispatches it (the re-run re-reads and is idempotent via fingerprint dedup), which is what restores a Refired producer/source.

func (*Scheduler) OnDelete

func (s *Scheduler) OnDelete(id NodeID)

OnDelete is called when a render-discovered node is removed from the store mid-run. It terminalizes the node in the scheduler and wakes its waiters, which re-Require and route through the absent-dep path.

func (*Scheduler) OnStatusWake

func (s *Scheduler) OnStatusWake(id NodeID, ready, failed bool)

OnStatusWake is called from the store's EventStatusUpdated subscription. It acts only on a TERMINAL store status (Ready or Failed): a parked node gates on Ready/Failed, so intermediate Pending progress writes never change its gate answer and waking on them is pure churn that also widens race windows.

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context)

Run drives the scheduler to a fixpoint, returning when every node is terminal (or ctx is canceled) after the in-flight bodies drain.

func (*Scheduler) Seed

func (s *Scheduler) Seed(ids []NodeID)

Seed registers the initial node set (file-loaded Kustomizations, HelmReleases, and source CRs from Bootstrap) as runnable, in deterministic id order. Duplicates and already-known ids are ignored.

func (*Scheduler) SetRerunAtDrain

func (s *Scheduler) SetRerunAtDrain(fn func(NodeID) bool)

SetRerunAtDrain installs the predicate that decides whether a node re-runs at the structural fixpoint (a selector-only ResourceSet, which has no nameable input provider to park on). It is evaluated in the dispatch goroutine after each Dispatch, so it may read the store. Optional — nil means no node reruns.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL