Documentation ¶
Overview ¶
Package analyze provides read-only pipeline constraint analysis. It identifies the bottleneck stage from interval telemetry and recommends worker allocation. The analyzer does not actuate; it runs in shadow mode only.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Analyzer ¶
type Analyzer struct {
// contains filtered or unexported fields
}
Analyzer periodically evaluates pipeline stages and logs the identified constraint together with worker allocation recommendations. It is read-only.
func NewAnalyzer ¶
NewAnalyzer creates an analyzer that evaluates every interval. Panics if interval <= 0.
func (*Analyzer) AddStage ¶
AddStage registers a stage for analysis. Must be called before Run. Panics if Name is empty, Stats is nil, or Run has started.
func (*Analyzer) CurrentSnapshot ¶
CurrentSnapshot returns the most recent analysis. It returns nil before the first interval.
func (*Analyzer) Run ¶
Run blocks, analyzing every interval until ctx is canceled. Panics if called twice.
type Option ¶
type Option func(*Analyzer)
Option configures an Analyzer.
func WithLogger ¶
WithLogger sets the logger for analyzer output.
type Rebalancer ¶ added in v0.79.0
type Rebalancer struct {
// contains filtered or unexported fields
}
Rebalancer consumes Analyzer snapshots and moves workers between stages. It moves at most one worker per interval and reverts the move if throughput regresses.
func NewRebalancer ¶ added in v0.79.0
func NewRebalancer(analyzer *Analyzer, opts ...RebalancerOption) *Rebalancer
NewRebalancer creates a rebalancer that consumes analyzer snapshots.
func (*Rebalancer) AddStage ¶ added in v0.79.0
func (r *Rebalancer) AddStage(sc StageControl)
AddStage registers a stage for rebalancing. Must be called before Run.
func (*Rebalancer) Disable ¶ added in v0.79.0
func (r *Rebalancer) Disable()
Disable stops actuation. Workers stay where they are.
func (*Rebalancer) Enable ¶ added in v0.79.0
func (r *Rebalancer) Enable()
Enable resumes actuation.
func (*Rebalancer) Enabled ¶ added in v0.79.0
func (r *Rebalancer) Enabled() bool
Enabled returns whether the rebalancer is currently actuating.
func (*Rebalancer) FormatStatus ¶ added in v0.79.0
func (r *Rebalancer) FormatStatus() string
FormatStatus returns a human-readable status line.
type RebalancerOption ¶ added in v0.79.0
type RebalancerOption func(*Rebalancer)
RebalancerOption configures a Rebalancer.
func WithCooldown ¶ added in v0.79.0
func WithCooldown(n int) RebalancerOption
WithCooldown sets the number of intervals to wait after a move.
func WithKillSwitch ¶ added in v0.79.0
func WithKillSwitch(fn func() bool) RebalancerOption
WithKillSwitch sets a function that disables actuation whenever it returns true.
func WithRebalancerLogger ¶ added in v0.79.0
func WithRebalancerLogger(l *log.Logger) RebalancerOption
WithRebalancerLogger sets the logger.
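RebalancerOption follows the functional options pattern. The sketch below illustrates how options such as a cooldown and a kill switch might be applied and consulted. It uses a local, hypothetical rebalancer type rather than the real one, and both the default values and the mayActuate helper are assumptions made for the example.

```go
package main

import "fmt"

// rebalancer is a hypothetical stand-in, not the package's Rebalancer.
type rebalancer struct {
	cooldown   int         // intervals to wait after a move
	killSwitch func() bool // returning true disables actuation
	enabled    bool
}

// option mirrors the shape of RebalancerOption: a function that mutates
// the value being constructed.
type option func(*rebalancer)

func withCooldown(n int) option {
	return func(r *rebalancer) { r.cooldown = n }
}

func withKillSwitch(fn func() bool) option {
	return func(r *rebalancer) { r.killSwitch = fn }
}

func newRebalancer(opts ...option) *rebalancer {
	r := &rebalancer{cooldown: 1, enabled: true} // defaults are assumptions
	for _, o := range opts {
		o(r)
	}
	return r
}

// mayActuate reports whether a move is allowed right now: the rebalancer
// must be enabled and the kill switch, if set, must not be tripped.
func (r *rebalancer) mayActuate() bool {
	return r.enabled && (r.killSwitch == nil || !r.killSwitch())
}

func main() {
	paused := true
	r := newRebalancer(
		withCooldown(3),
		withKillSwitch(func() bool { return paused }),
	)
	fmt.Println(r.cooldown, r.mayActuate()) // 3 false

	paused = false
	fmt.Println(r.mayActuate()) // true
}
```

Evaluating the kill switch on every check, rather than once at construction, lets an operator flip it at runtime without rebuilding the rebalancer.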
type Snapshot ¶
type Snapshot struct {
At time.Time
Constraint string // empty if none identified
Confidence float64 // 0.0-1.0
Stages []StageSnapshot // ordered by registration
// DrumStarvationCount tracks consecutive intervals the identified
// constraint was classified as [StateStarved] (high idle ratio,
// queue draining). A non-zero value is a Step 2 violation — the
// drum is being wasted. Reset when the drum stops being starved.
DrumStarvationCount int
}
Snapshot is the analyzer's output for one interval. It is published via atomic.Pointer. Callers receive a deep copy that is safe to read and retain without synchronization.
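The publish-then-copy behavior described above can be sketched with sync/atomic's generic Pointer type (Go 1.19 or later). The snapshot and publisher types below are trimmed, hypothetical stand-ins used only to show the technique; they are not the package's internals.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// snapshot is a trimmed, hypothetical stand-in for the documented Snapshot.
type snapshot struct {
	Constraint string
	Stages     []string
}

// clone returns a deep copy so that callers never share the Stages
// backing array with the publisher. A nil receiver yields nil.
func (s *snapshot) clone() *snapshot {
	if s == nil {
		return nil
	}
	c := *s
	c.Stages = append([]string(nil), s.Stages...)
	return &c
}

// publisher holds the most recent snapshot behind an atomic pointer.
type publisher struct {
	latest atomic.Pointer[snapshot]
}

// publish stores a private copy of s as the latest snapshot.
func (p *publisher) publish(s *snapshot) { p.latest.Store(s.clone()) }

// current returns a private copy, or nil before the first publish.
func (p *publisher) current() *snapshot { return p.latest.Load().clone() }

func main() {
	var p publisher
	fmt.Println(p.current() == nil) // true

	p.publish(&snapshot{Constraint: "parse", Stages: []string{"read", "parse"}})

	got := p.current()
	got.Stages[0] = "mutated"          // only changes the caller's copy
	fmt.Println(p.current().Stages[0]) // read
}
```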
type StageAnalysis ¶
type StageAnalysis struct {
State StageState
Utilization float64
IdleRatio float64
BlockedRatio float64
QueueGrowth float64
ErrorRate float64
Goodput float64 // successful completions/sec
ArrivalRate float64 // submitted items/sec
CurrentWorkers int
Recommendation int // suggested workers; 0 = no recommendation
RecommendReason string // human-readable explanation
}
StageAnalysis holds the analysis of a single stage for one interval.
type StageControl ¶ added in v0.79.0
type StageControl struct {
Name string
SetWorkers func(int) (int, error)
Stats func() toc.Stats
Policy WorkerPolicy
}
StageControl provides actuation and observation for a stage.
type StageSnapshot ¶ added in v0.78.0
type StageSnapshot struct {
Name string
Analysis StageAnalysis
}
StageSnapshot pairs a stage name with its analysis.
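A consumer of a snapshot will often want the analysis for the stage named as the constraint. The sketch below shows one way to look it up. The types are local copies that include only a subset of the documented fields, constraintStage is a hypothetical helper, and it assumes that Constraint holds the Name of one of the entries in Stages.

```go
package main

import "fmt"

// Local, trimmed copies of the documented types, for illustration only.
type StageAnalysis struct {
	CurrentWorkers  int
	Recommendation  int // suggested workers; 0 = no recommendation
	RecommendReason string
}

type StageSnapshot struct {
	Name     string
	Analysis StageAnalysis
}

type Snapshot struct {
	Constraint string // empty if none identified
	Confidence float64
	Stages     []StageSnapshot
}

// constraintStage is a hypothetical helper that returns the stage whose
// Name matches s.Constraint. It reports false for a nil snapshot, an
// empty Constraint, or a name that matches no stage.
func constraintStage(s *Snapshot) (StageSnapshot, bool) {
	if s == nil || s.Constraint == "" {
		return StageSnapshot{}, false
	}
	for _, st := range s.Stages {
		if st.Name == s.Constraint {
			return st, true
		}
	}
	return StageSnapshot{}, false
}

func main() {
	snap := &Snapshot{
		Constraint: "parse",
		Confidence: 0.8,
		Stages: []StageSnapshot{
			{Name: "read", Analysis: StageAnalysis{CurrentWorkers: 2}},
			{Name: "parse", Analysis: StageAnalysis{
				CurrentWorkers: 2, Recommendation: 3, RecommendReason: "saturated",
			}},
		},
	}
	if st, ok := constraintStage(snap); ok && st.Analysis.Recommendation != 0 {
		a := st.Analysis
		fmt.Printf("%s: %d -> %d workers (%s)\n",
			st.Name, a.CurrentWorkers, a.Recommendation, a.RecommendReason)
	}
}
```

The nil check matters because the most recent analysis is documented as nil before the first interval.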
type StageSpec ¶
type StageSpec struct {
Name string
Stats func() toc.Stats
MinWorkers int // default 1
MaxWorkers int // 0 = unlimited
Scalable bool // false = don't recommend changes
}
StageSpec describes a stage for analysis.
type StageState ¶
type StageState int
StageState classifies a stage's operational state from interval signals.
const (
StateUnknown StageState = iota // insufficient data
StateHealthy // normal operation
StateStarved // high idle, waiting for input
StateBlocked // high output-blocked, downstream-limited
StateSaturated // high busy, low idle/blocked; constraint candidate
StateBroken // elevated errors
)
func (StageState) String ¶
func (s StageState) String() string
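The String method is undocumented here, so the exact labels it returns are not known. As a sketch of how an iota-based enum like this is usually made printable, the example below re-declares the constants locally and maps them to assumed labels; the real output may differ.

```go
package main

import "fmt"

// StageState and its constants are re-declared locally for illustration.
type StageState int

const (
	StateUnknown StageState = iota
	StateHealthy
	StateStarved
	StateBlocked
	StateSaturated
	StateBroken
)

// stateNames holds assumed labels, indexed by state value.
var stateNames = [...]string{
	"unknown", "healthy", "starved", "blocked", "saturated", "broken",
}

// String returns the label for s, or a numeric fallback for values
// outside the declared range.
func (s StageState) String() string {
	if s < 0 || int(s) >= len(stateNames) {
		return fmt.Sprintf("StageState(%d)", int(s))
	}
	return stateNames[s]
}

func main() {
	fmt.Println(StateSaturated) // saturated
	fmt.Println(StageState(42)) // StageState(42)
}
```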
type WorkerPolicy ¶ added in v0.79.0
type WorkerPolicy struct {
Min int // minimum workers (default 1)
Max int // maximum workers (0 = unlimited)
DonateOK bool // can workers be taken from this stage
ReceiveOK bool // can workers be added to this stage
}
WorkerPolicy constrains how the rebalancer treats a stage.
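The field comments suggest how a policy could gate a single-worker move. The sketch below encodes one reading of them: a Min of zero is treated as the documented default of 1, and a Max of zero means no upper bound. The helpers effectiveMin, canDonate, and canReceive are hypothetical and are not part of the package.

```go
package main

import "fmt"

// WorkerPolicy is a local copy of the documented struct.
type WorkerPolicy struct {
	Min       int  // minimum workers (default 1)
	Max       int  // maximum workers (0 = unlimited)
	DonateOK  bool // can workers be taken from this stage
	ReceiveOK bool // can workers be added to this stage
}

// effectiveMin applies the documented default of 1 when Min is unset.
// Treating any non-positive Min as unset is an assumption.
func effectiveMin(p WorkerPolicy) int {
	if p.Min <= 0 {
		return 1
	}
	return p.Min
}

// canDonate reports whether a stage with current workers may give one up
// without dropping below its minimum.
func canDonate(p WorkerPolicy, current int) bool {
	return p.DonateOK && current-1 >= effectiveMin(p)
}

// canReceive reports whether a stage with current workers may take one
// more without exceeding its maximum.
func canReceive(p WorkerPolicy, current int) bool {
	return p.ReceiveOK && (p.Max == 0 || current+1 <= p.Max)
}

func main() {
	p := WorkerPolicy{Max: 4, DonateOK: true, ReceiveOK: true}
	fmt.Println(canDonate(p, 1))  // false: would drop below the default minimum of 1
	fmt.Println(canDonate(p, 2))  // true
	fmt.Println(canReceive(p, 4)) // false: already at Max
}
```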