Documentation ¶
Overview ¶
Package streaming provides chunked execution with analyzer hibernation for memory-bounded analysis.
Index ¶
- Constants
- func LogChunkMemory(ctx context.Context, logger *slog.Logger, entry ChunkMemoryLog)
- type AdaptivePlanner
- type AdaptiveStats
- type ChunkBounds
- type ChunkMemoryLog
- type HeapSnapshot
- type Hibernatable
- type MemoryPressureLevel
- type Planner
- type ReplanObservation
- type Schedule
- type SchedulerConfig
- type SpillCleaner
- type SpillCleanupGuard
Constants ¶
const (
	// KiB is the exported kibibyte constant for use in log formatting.
	KiB = kib

	// MiB is the exported mebibyte constant for use in log formatting.
	MiB = mib
)
Size constants.
const (
	// MinChunkSize is the minimum commits per chunk to amortize hibernation cost.
	MinChunkSize = 50

	// MaxChunkSize is the safety cap on commits per chunk. The primary constraint
	// is the memory budget divided by per-analyzer growth rate, not this cap.
	MaxChunkSize = 3000

	// BaseOverhead is the fixed memory overhead for Go runtime + libgit2 + caches.
	BaseOverhead = 400 * mib

	// DefaultStateGrowthPerCommit is the conservative fallback when
	// AggregateGrowthPerCommit is zero (e.g. in tests or when no analyzers
	// are selected). Equal to DefaultWorkingStateSize + DefaultAvgTCSize.
	DefaultStateGrowthPerCommit = 500 * kib

	// DefaultWorkingStateSize is the fallback per-commit working state estimate.
	DefaultWorkingStateSize = 400 * kib

	// DefaultAvgTCSize is the fallback per-commit TC payload estimate.
	DefaultAvgTCSize = 100 * kib
)
Planner constraints.
const (
	// DefaultReplanThreshold triggers re-planning when observed growth diverges
	// from predicted by more than 25%.
	DefaultReplanThreshold = 0.25

	// DefaultEMAAlpha controls EMA smoothing. 0.3 gives ~3-chunk half-life.
	DefaultEMAAlpha = 0.3
)
Adaptive planner constants.
const (
	// PressureWarningRatio is the fraction of budget at which a warning is logged.
	PressureWarningRatio = 0.80

	// PressureCriticalRatio is the fraction of budget at which early hibernation
	// is triggered to prevent OOM before the next chunk starts.
	PressureCriticalRatio = 0.90
)
Memory pressure detection constants.
const (
	// UsablePercent is the fraction of total budget available after slack reserve.
	UsablePercent = 95

	// WorkStatePercent is the fraction of remaining budget for analyzer working state.
	WorkStatePercent = 60

	// AggStatePercent is the fraction of remaining budget for aggregator state.
	AggStatePercent = 30

	// ChunkMemPercent is the fraction of remaining budget for in-flight data.
	ChunkMemPercent = 10
)
Budget decomposition constants (SPEC section 3.2).
Variables ¶
This section is empty.
Functions ¶
func LogChunkMemory ¶
func LogChunkMemory(ctx context.Context, logger *slog.Logger, entry ChunkMemoryLog)
LogChunkMemory emits a structured log entry with per-chunk memory telemetry.
Types ¶
type AdaptivePlanner ¶
type AdaptivePlanner struct {
// contains filtered or unexported fields
}
AdaptivePlanner wraps the static Planner with feedback-driven re-planning. After each chunk, it examines three separate metrics (working state growth, TC payload size, aggregator state growth), updates smoothed EMA estimates, and re-plans remaining chunks if any metric diverges beyond a threshold.
func NewAdaptivePlanner ¶
func NewAdaptivePlanner(totalCommits int, memBudget, declaredGrowth, pipelineOverhead int64) *AdaptivePlanner
NewAdaptivePlanner creates an adaptive planner seeded with the declared growth rate.
func (*AdaptivePlanner) DeclaredGrowth ¶
func (ap *AdaptivePlanner) DeclaredGrowth() int64
DeclaredGrowth returns the initial declared growth rate in bytes/commit.
func (*AdaptivePlanner) InitialPlan ¶
func (ap *AdaptivePlanner) InitialPlan() []ChunkBounds
InitialPlan returns the first set of chunk boundaries using the declared growth rate.
func (*AdaptivePlanner) Replan ¶
func (ap *AdaptivePlanner) Replan(obs ReplanObservation) []ChunkBounds
Replan examines three per-chunk metric observations (working state growth, TC payload size, aggregator state growth) and, if any metric diverges from prediction by more than replanThreshold, re-computes chunk boundaries for all chunks after the observed chunk.
Processed chunks [0..obs.ChunkIndex] are never modified (checkpoint safety). The returned slice always covers exactly [0..totalCommits).
func (*AdaptivePlanner) Stats ¶
func (ap *AdaptivePlanner) Stats() AdaptiveStats
Stats returns adaptive planner telemetry.
func (*AdaptivePlanner) TotalCommits ¶
func (ap *AdaptivePlanner) TotalCommits() int
TotalCommits returns the total number of commits being planned.
type AdaptiveStats ¶
type AdaptiveStats struct {
ReplanCount int
FinalGrowthRate float64
InitialGrowthRate float64
FinalWorkGrowth float64
FinalTCSize float64
FinalAggGrowth float64
}
AdaptiveStats holds telemetry from the adaptive planner.
type ChunkBounds ¶
ChunkBounds represents a chunk of commits to process.
type ChunkMemoryLog ¶
type ChunkMemoryLog struct {
ChunkIndex int
HeapBefore int64
HeapAfter int64
SysAfter int64
BudgetUsedPct float64
GrowthPerCommit int64
EMAGrowthRate float64
Replanned bool
}
ChunkMemoryLog holds memory measurements for a single chunk.
type HeapSnapshot ¶
type HeapSnapshot struct {
HeapInuse int64
HeapAlloc int64
Sys int64 // Total bytes obtained from the OS (Go runtime).
NumGC uint32
TakenAtNS int64
}
HeapSnapshot captures Go runtime memory stats at a point in time.
func TakeHeapSnapshot ¶
func TakeHeapSnapshot() HeapSnapshot
TakeHeapSnapshot reads runtime.MemStats and returns a HeapSnapshot.
type Hibernatable ¶
type Hibernatable interface {
// Hibernate compresses the analyzer's state to reduce memory usage.
// Called between chunks during streaming execution.
Hibernate() error
// Boot restores the analyzer from hibernated state.
// Called before processing a new chunk after hibernation.
Boot() error
}
Hibernatable is an optional interface for analyzers that support hibernation. Analyzers implementing this interface can have their state compressed between chunks to reduce memory usage during streaming execution.
type MemoryPressureLevel ¶
type MemoryPressureLevel int
MemoryPressureLevel indicates how close heap usage is to the budget.
const (
	// PressureNone indicates heap usage is well within budget.
	PressureNone MemoryPressureLevel = iota

	// PressureWarning indicates heap usage exceeds 80% of budget.
	PressureWarning

	// PressureCritical indicates heap usage exceeds 90% of budget.
	PressureCritical
)
func CheckMemoryPressure ¶
func CheckMemoryPressure(heapInuse, memBudget int64) MemoryPressureLevel
CheckMemoryPressure compares the current heap usage against the memory budget and returns the pressure level. Returns PressureNone when budget is zero (unlimited).
type Planner ¶
type Planner struct {
TotalCommits int
MemoryBudget int64
// AggregateGrowthPerCommit is the summed per-commit state growth across all
// selected leaf analyzers. When zero, DefaultStateGrowthPerCommit is used.
AggregateGrowthPerCommit int64
// PipelineOverhead is the estimated memory consumed by caches, workers, and
// buffers (everything except analyzer state). When positive, it replaces
// BaseOverhead for more accurate chunk sizing. The budget solver provides
// this value via EstimateMemoryUsage.
PipelineOverhead int64
}
Planner calculates chunk boundaries for streaming execution.
func (*Planner) Plan ¶
func (p *Planner) Plan() []ChunkBounds
Plan returns chunk boundaries as [start, end) index pairs.
func (*Planner) PlanFrom ¶
func (p *Planner) PlanFrom(startCommit int) []ChunkBounds
PlanFrom returns chunk boundaries for commits [startCommit..TotalCommits). Used by the adaptive planner to re-plan remaining chunks after observing actual growth rates.
type ReplanObservation ¶
type ReplanObservation struct {
// ChunkIndex is the zero-based index of the chunk just processed.
ChunkIndex int
// Chunk is the bounds of the chunk just processed.
Chunk ChunkBounds
// WorkGrowthPerCommit is the observed per-commit working state growth in bytes.
// Computed as (HeapInuse delta - aggregator state delta) / commits.
WorkGrowthPerCommit int64
// TCPayloadPerCommit is the observed per-commit TC payload size in bytes.
TCPayloadPerCommit int64
// AggGrowthPerCommit is the observed per-commit aggregator state growth in bytes.
AggGrowthPerCommit int64
// CurrentChunks is the current chunk plan (including already-processed chunks).
CurrentChunks []ChunkBounds
}
ReplanObservation carries per-chunk metric observations for adaptive replanning.
type Schedule ¶
type Schedule struct {
// Chunks are the planned chunk boundaries.
Chunks []ChunkBounds
// ChunkSize is the number of commits per chunk (last chunk may be smaller).
ChunkSize int
// BufferingFactor is the pipelining factor (1=single, 2=double, 3=triple).
BufferingFactor int
// AggSpillBudget is the maximum bytes of aggregator state before spilling.
// Zero means no limit (unlimited budget or budget too small).
AggSpillBudget int64
}
Schedule holds the computed scheduling parameters.
func ComputeSchedule ¶
func ComputeSchedule(cfg SchedulerConfig) Schedule
ComputeSchedule decomposes the memory budget into P + W + A + S regions and computes chunk boundaries, buffering factor, and aggregator spill budget. The buffering factor is the highest value in [1, MaxBuffering] for which ChunkSize >= MinChunkSize. Only the workState region is divided among buffering slots; AggSpillBudget is unaffected.
type SchedulerConfig ¶
type SchedulerConfig struct {
// TotalCommits is the number of commits to process.
TotalCommits int
// MemoryBudget is the user-specified memory budget in bytes. Zero means unlimited.
MemoryBudget int64
// PipelineOverhead is the estimated fixed overhead for the pipeline
// (workers, caches, buffers). When zero, BaseOverhead is used.
PipelineOverhead int64
// WorkStatePerCommit is the per-commit working state growth in bytes.
// When zero, DefaultWorkingStateSize is used.
WorkStatePerCommit int64
// AvgTCSize is the average TC payload size per commit in bytes.
// Currently informational; used for future chunkMem sizing.
AvgTCSize int64
// MaxBuffering is the maximum buffering factor (1=single, 2=double, 3=triple).
// The scheduler iterates from MaxBuffering down to 1, selecting the highest
// factor where ChunkSize >= MinChunkSize. When zero or negative, treated as 1.
MaxBuffering int
}
SchedulerConfig holds inputs for the unified budget-aware scheduler.
type SpillCleaner ¶
type SpillCleaner interface {
CleanupSpills()
}
SpillCleaner is an optional interface for analyzers that create spill files on disk during hibernation. CleanupSpills removes all temp directories and files. It is called by SpillCleanupGuard on normal exit, error exit, and SIGTERM/SIGINT to prevent orphaned temp files.
type SpillCleanupGuard ¶
type SpillCleanupGuard struct {
// contains filtered or unexported fields
}
SpillCleanupGuard ensures that spill temp directories are removed when the streaming pipeline exits, whether normally, on error, or via signal. Create one via NewSpillCleanupGuard and defer its Close method.
func NewSpillCleanupGuard ¶
func NewSpillCleanupGuard(cleaners []SpillCleaner, logger *slog.Logger) *SpillCleanupGuard
NewSpillCleanupGuard registers SIGTERM and SIGINT handlers that invoke CleanupSpills on all registered analyzers. The caller must defer Close() to ensure cleanup runs on normal/error exit and the signal handler is deregistered.
func (*SpillCleanupGuard) Close ¶
func (g *SpillCleanupGuard) Close()
Close performs spill cleanup (if not already done) and deregisters the signal handler.