streaming

package
v0.0.0-...-05004e4
Published: Feb 25, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package streaming provides chunked execution with analyzer hibernation for memory-bounded analysis.

Index

Constants

const (

	// KiB is the exported kibibyte constant for use in log formatting.
	KiB = kib

	// MiB is the exported mebibyte constant for use in log formatting.
	MiB = mib
)

Size constants.

const (
	// MinChunkSize is the minimum commits per chunk to amortize hibernation cost.
	MinChunkSize = 50

	// MaxChunkSize is the safety cap on commits per chunk. The primary constraint
	// is the memory budget divided by per-analyzer growth rate, not this cap.
	MaxChunkSize = 3000

	// BaseOverhead is the fixed memory overhead for Go runtime + libgit2 + caches.
	BaseOverhead = 400 * mib

	// DefaultStateGrowthPerCommit is the conservative fallback when
	// AggregateGrowthPerCommit is zero (e.g. in tests or when no analyzers
	// are selected). Equal to DefaultWorkingStateSize + DefaultAvgTCSize.
	DefaultStateGrowthPerCommit = 500 * kib

	// DefaultWorkingStateSize is the fallback per-commit working state estimate.
	DefaultWorkingStateSize = 400 * kib

	// DefaultAvgTCSize is the fallback per-commit TC payload estimate.
	DefaultAvgTCSize = 100 * kib
)

Planner constraints.

const (
	// DefaultReplanThreshold triggers re-planning when observed growth diverges
	// from predicted by more than 25%.
	DefaultReplanThreshold = 0.25

	// DefaultEMAAlpha controls EMA smoothing. 0.3 gives ~3-chunk half-life.
	DefaultEMAAlpha = 0.3
)

Adaptive planner constants.

const (
	// PressureWarningRatio is the fraction of budget at which a warning is logged.
	PressureWarningRatio = 0.80

	// PressureCriticalRatio is the fraction of budget at which early hibernation
	// is triggered to prevent OOM before the next chunk starts.
	PressureCriticalRatio = 0.90
)

Memory pressure detection constants.

const (
	// UsablePercent is the fraction of total budget available after slack reserve.
	UsablePercent = 95

	// WorkStatePercent is the fraction of remaining budget for analyzer working state.
	WorkStatePercent = 60

	// AggStatePercent is the fraction of remaining budget for aggregator state.
	AggStatePercent = 30

	// ChunkMemPercent is the fraction of remaining budget for in-flight data.
	ChunkMemPercent = 10
)

Budget decomposition constants (SPEC section 3.2).

Variables

This section is empty.

Functions

func LogChunkMemory

func LogChunkMemory(ctx context.Context, logger *slog.Logger, entry ChunkMemoryLog)

LogChunkMemory emits a structured log entry with per-chunk memory telemetry.

Types

type AdaptivePlanner

type AdaptivePlanner struct {
	// contains filtered or unexported fields
}

AdaptivePlanner wraps the static Planner with feedback-driven re-planning. After each chunk, it examines three separate metrics (working state growth, TC payload size, aggregator state growth), updates smoothed EMA estimates, and re-plans remaining chunks if any metric diverges beyond a threshold.

func NewAdaptivePlanner

func NewAdaptivePlanner(totalCommits int, memBudget, declaredGrowth, pipelineOverhead int64) *AdaptivePlanner

NewAdaptivePlanner creates an adaptive planner seeded with the declared growth rate.

func (*AdaptivePlanner) DeclaredGrowth

func (ap *AdaptivePlanner) DeclaredGrowth() int64

DeclaredGrowth returns the initial declared growth rate in bytes/commit.

func (*AdaptivePlanner) InitialPlan

func (ap *AdaptivePlanner) InitialPlan() []ChunkBounds

InitialPlan returns the first set of chunk boundaries using the declared growth rate.

func (*AdaptivePlanner) Replan

func (ap *AdaptivePlanner) Replan(obs ReplanObservation) []ChunkBounds

Replan examines three per-chunk metric observations (working state growth, TC payload size, aggregator state growth) and, if any metric diverges from prediction by more than replanThreshold, re-computes chunk boundaries for all chunks after the observed chunk.

Processed chunks [0..obs.ChunkIndex] are never modified (checkpoint safety). The returned slice always covers exactly [0..totalCommits).

func (*AdaptivePlanner) Stats

func (ap *AdaptivePlanner) Stats() AdaptiveStats

Stats returns adaptive planner telemetry.

func (*AdaptivePlanner) TotalCommits

func (ap *AdaptivePlanner) TotalCommits() int

TotalCommits returns the total number of commits being planned.

type AdaptiveStats

type AdaptiveStats struct {
	ReplanCount       int
	FinalGrowthRate   float64
	InitialGrowthRate float64
	FinalWorkGrowth   float64
	FinalTCSize       float64
	FinalAggGrowth    float64
}

AdaptiveStats holds telemetry from the adaptive planner.

type ChunkBounds

type ChunkBounds struct {
	Start int // Inclusive index.
	End   int // Exclusive index.
}

ChunkBounds represents a chunk of commits to process.

type ChunkMemoryLog

type ChunkMemoryLog struct {
	ChunkIndex      int
	HeapBefore      int64
	HeapAfter       int64
	SysAfter        int64
	BudgetUsedPct   float64
	GrowthPerCommit int64
	EMAGrowthRate   float64
	Replanned       bool
}

ChunkMemoryLog holds memory measurements for a single chunk.

type HeapSnapshot

type HeapSnapshot struct {
	HeapInuse int64
	HeapAlloc int64
	Sys       int64 // Total bytes obtained from the OS (Go runtime).
	NumGC     uint32
	TakenAtNS int64
}

HeapSnapshot captures Go runtime memory stats at a point in time.

func TakeHeapSnapshot

func TakeHeapSnapshot() HeapSnapshot

TakeHeapSnapshot reads runtime.MemStats and returns a HeapSnapshot.

type Hibernatable

type Hibernatable interface {
	// Hibernate compresses the analyzer's state to reduce memory usage.
	// Called between chunks during streaming execution.
	Hibernate() error

	// Boot restores the analyzer from hibernated state.
	// Called before processing a new chunk after hibernation.
	Boot() error
}

Hibernatable is an optional interface for analyzers that support hibernation. Analyzers implementing this interface can have their state compressed between chunks to reduce memory usage during streaming execution.

type MemoryPressureLevel

type MemoryPressureLevel int

MemoryPressureLevel indicates how close heap usage is to the budget.

const (
	// PressureNone indicates heap usage is well within budget.
	PressureNone MemoryPressureLevel = iota

	// PressureWarning indicates heap usage exceeds 80% of budget.
	PressureWarning

	// PressureCritical indicates heap usage exceeds 90% of budget.
	PressureCritical
)

func CheckMemoryPressure

func CheckMemoryPressure(heapInuse, memBudget int64) MemoryPressureLevel

CheckMemoryPressure compares the current heap usage against the memory budget and returns the pressure level. Returns PressureNone when budget is zero (unlimited).

type Planner

type Planner struct {
	TotalCommits int
	MemoryBudget int64

	// AggregateGrowthPerCommit is the summed per-commit state growth across all
	// selected leaf analyzers. When zero, DefaultStateGrowthPerCommit is used.
	AggregateGrowthPerCommit int64

	// PipelineOverhead is the estimated memory consumed by caches, workers, and
	// buffers (everything except analyzer state). When positive, it replaces
	// BaseOverhead for more accurate chunk sizing. The budget solver provides
	// this value via EstimateMemoryUsage.
	PipelineOverhead int64
}

Planner calculates chunk boundaries for streaming execution.

func (*Planner) Plan

func (p *Planner) Plan() []ChunkBounds

Plan returns chunk boundaries as [start, end) index pairs.

func (*Planner) PlanFrom

func (p *Planner) PlanFrom(startCommit int) []ChunkBounds

PlanFrom returns chunk boundaries for commits [startCommit..TotalCommits). Used by the adaptive planner to re-plan remaining chunks after observing actual growth rates.

type ReplanObservation

type ReplanObservation struct {
	// ChunkIndex is the zero-based index of the chunk just processed.
	ChunkIndex int

	// Chunk is the bounds of the chunk just processed.
	Chunk ChunkBounds

	// WorkGrowthPerCommit is the observed per-commit working state growth in bytes.
	// Computed as (HeapInuse delta - aggregator state delta) / commits.
	WorkGrowthPerCommit int64

	// TCPayloadPerCommit is the observed per-commit TC payload size in bytes.
	TCPayloadPerCommit int64

	// AggGrowthPerCommit is the observed per-commit aggregator state growth in bytes.
	AggGrowthPerCommit int64

	// CurrentChunks is the current chunk plan (including already-processed chunks).
	CurrentChunks []ChunkBounds
}

ReplanObservation carries per-chunk metric observations for adaptive replanning.

type Schedule

type Schedule struct {
	// Chunks are the planned chunk boundaries.
	Chunks []ChunkBounds

	// ChunkSize is the number of commits per chunk (last chunk may be smaller).
	ChunkSize int

	// BufferingFactor is the pipelining factor (1=single, 2=double, 3=triple).
	BufferingFactor int

	// AggSpillBudget is the maximum bytes of aggregator state before spilling.
	// Zero means no limit (unlimited budget or budget too small).
	AggSpillBudget int64
}

Schedule holds the computed scheduling parameters.

func ComputeSchedule

func ComputeSchedule(cfg SchedulerConfig) Schedule

ComputeSchedule decomposes the memory budget into P + W + A + S regions and computes chunk boundaries, buffering factor, and aggregator spill budget. The buffering factor is the highest value in [1, MaxBuffering] for which ChunkSize >= MinChunkSize. Only the workState region is divided among buffering slots; AggSpillBudget is unaffected.

type SchedulerConfig

type SchedulerConfig struct {
	// TotalCommits is the number of commits to process.
	TotalCommits int

	// MemoryBudget is the user-specified memory budget in bytes. Zero means unlimited.
	MemoryBudget int64

	// PipelineOverhead is the estimated fixed overhead for the pipeline
	// (workers, caches, buffers). When zero, BaseOverhead is used.
	PipelineOverhead int64

	// WorkStatePerCommit is the per-commit working state growth in bytes.
	// When zero, DefaultWorkingStateSize is used.
	WorkStatePerCommit int64

	// AvgTCSize is the average TC payload size per commit in bytes.
	// Currently informational; used for future chunkMem sizing.
	AvgTCSize int64

	// MaxBuffering is the maximum buffering factor (1=single, 2=double, 3=triple).
	// The scheduler iterates from MaxBuffering down to 1, selecting the highest
	// factor where ChunkSize >= MinChunkSize. When zero or negative, treated as 1.
	MaxBuffering int
}

SchedulerConfig holds inputs for the unified budget-aware scheduler.

type SpillCleaner

type SpillCleaner interface {
	CleanupSpills()
}

SpillCleaner is an optional interface for analyzers that create spill files on disk during hibernation. CleanupSpills removes all temp directories and files. It is called by SpillCleanupGuard on normal exit, error exit, and SIGTERM/SIGINT to prevent orphaned temp files.

type SpillCleanupGuard

type SpillCleanupGuard struct {
	// contains filtered or unexported fields
}

SpillCleanupGuard ensures that spill temp directories are removed when the streaming pipeline exits, whether normally, on error, or via signal. Create one via NewSpillCleanupGuard and defer its Close method.

func NewSpillCleanupGuard

func NewSpillCleanupGuard(cleaners []SpillCleaner, logger *slog.Logger) *SpillCleanupGuard

NewSpillCleanupGuard registers SIGTERM and SIGINT handlers that invoke CleanupSpills on all registered analyzers. The caller must defer Close() to ensure cleanup runs on normal/error exit and the signal handler is deregistered.

func (*SpillCleanupGuard) Close

func (g *SpillCleanupGuard) Close()

Close performs spill cleanup (if not already done) and deregisters the signal handler.
