Documentation ¶
Overview ¶
Package framework provides orchestration for running analysis pipelines.
Index ¶
- Constants
- Variables
- func CanResumeWithCheckpoint(totalAnalyzers, checkpointableCount int) bool
- func DefaultMemoryBudget() int64
- func MaybeStartCPUProfile(path string) (func(), error)
- func MaybeWriteHeapProfile(path string, logger *slog.Logger)
- func ParseOptionalSize(sizeValue string) (int64, error)
- func RunStreaming(ctx context.Context, runner *Runner, commits []*gitlib.Commit, ...) (map[analyze.HistoryAnalyzer]analyze.Report, error)
- func RunStreamingFromIterator(ctx context.Context, runner *Runner, iter *gitlib.CommitIter, commitCount int, ...) (map[analyze.HistoryAnalyzer]analyze.Report, error)
- type BlobData
- type BlobPipeline
- type BudgetSolver
- type CheckpointParams
- type CommitBatch
- type CommitData
- type CommitStreamer
- type ConfigParams
- type Coordinator
- type CoordinatorConfig
- type DiffCache
- type DiffCacheStats
- type DiffKey
- type DiffPipeline
- type FlushableAnalyzer
- type PipelineSampler
- type PipelineStats
- type Runner
- func (runner *Runner) AggregatorSpills() []checkpoint.AggregatorSpillEntry
- func (runner *Runner) AggregatorStateSize() int64
- func (runner *Runner) DiscardAggregatorState()
- func (runner *Runner) DiscardLeafAnalyzerState()
- func (runner *Runner) DrainCommitMeta() map[string]analyze.CommitMeta
- func (runner *Runner) FinalizeToStore(ctx context.Context, store analyze.ReportStore) error
- func (runner *Runner) FinalizeWithAggregators(ctx context.Context) (map[analyze.HistoryAnalyzer]analyze.Report, error)
- func (runner *Runner) Initialize() error
- func (runner *Runner) InitializeForResume(aggSpills []checkpoint.AggregatorSpillEntry) error
- func (runner *Runner) LeafAggregators() []analyze.Aggregator
- func (runner *Runner) LeafAnalyzers() []analyze.HistoryAnalyzer
- func (runner *Runner) ProcessChunk(ctx context.Context, commits []*gitlib.Commit, indexOffset, chunkIndex int) (PipelineStats, error)
- func (runner *Runner) ProcessChunkFromData(ctx context.Context, data []CommitData, indexOffset, chunkIndex int) (PipelineStats, error)
- func (runner *Runner) ResetTCCount()
- func (runner *Runner) Run(ctx context.Context, commits []*gitlib.Commit) (map[analyze.HistoryAnalyzer]analyze.Report, error)
- func (runner *Runner) SpillAggregators() error
- func (runner *Runner) TCCountAccumulated() int64
- type SamplerConfig
- type StageMetrics
- type StageMetricsSnapshot
- type StreamingConfig
- type UASTPipeline
Constants ¶
const (
	FileModeCommit = 0o160000
	FileModeTree   = 0o040000
	FileModeBlob   = 0o100644
	FileModeExec   = 0o100755
	FileModeLink   = 0o120000
)
File mode constants for git tree entries.
const DefaultBlobBatchArenaSize = 4 * 1024 * 1024
DefaultBlobBatchArenaSize is the default size of the memory arena for blob loading (4 MiB).
const DefaultDiffCacheSize = 10000
DefaultDiffCacheSize is the default maximum number of diff entries to cache.
Variables ¶
var (
	ErrInvalidSizeFormat = errors.New("invalid size format")
	ErrInvalidGCPercent  = errors.New("invalid GC percent")
)
Sentinel errors for configuration.
var ErrCacheMiss = errors.New("cache miss")
ErrCacheMiss is returned by a cache-backed Fetcher when the key is not found.
var ErrCommitTooLarge = errors.New("commit exceeds max changes cap")
ErrCommitTooLarge is set on BlobData.Error for commits exceeding maxChangesPerCommit. The runner uses errors.Is to distinguish this from fatal pipeline errors and skips the commit instead of aborting.
var ErrNotParallelizable = errors.New("leaf does not implement Parallelizable")
ErrNotParallelizable is returned when a leaf analyzer does not implement analyze.Parallelizable.
var ErrNotStoreWriter = errors.New("analyzer does not implement StoreWriter")
ErrNotStoreWriter is returned when an analyzer does not implement analyze.StoreWriter.
Functions ¶
func CanResumeWithCheckpoint ¶
CanResumeWithCheckpoint returns true if all analyzers support checkpointing.
func DefaultMemoryBudget ¶
func DefaultMemoryBudget() int64
DefaultMemoryBudget returns a sensible memory budget based on available system memory. Returns min(50% of total RAM, 4 GiB), or 0 if detection fails.
func MaybeStartCPUProfile ¶
MaybeStartCPUProfile starts CPU profiling to the given file. Returns a stop function that must be deferred. Returns a no-op if path is empty.
func MaybeWriteHeapProfile ¶
MaybeWriteHeapProfile writes a heap profile to the given file. No-op if path is empty. Uses the provided logger for error reporting.
func ParseOptionalSize ¶
ParseOptionalSize parses a human-readable size string, returning 0 for empty or "0".
func RunStreaming ¶
func RunStreaming(
	ctx context.Context,
	runner *Runner,
	commits []*gitlib.Commit,
	analyzers []analyze.HistoryAnalyzer,
	config StreamingConfig,
) (map[analyze.HistoryAnalyzer]analyze.Report, error)
RunStreaming executes the pipeline in streaming chunks with optional checkpoint support. The scheduler determines the buffering factor (single/double/triple) based on the memory budget. Higher buffering factors overlap more pipeline phases with analyzer consumption but require smaller chunks.
When the commit set is trivially small (a single commit) and no streaming features are needed, it delegates to Runner.Run for simplicity and lower overhead.
func RunStreamingFromIterator ¶
func RunStreamingFromIterator(
	ctx context.Context,
	runner *Runner,
	iter *gitlib.CommitIter,
	commitCount int,
	analyzers []analyze.HistoryAnalyzer,
	config StreamingConfig,
) (map[analyze.HistoryAnalyzer]analyze.Report, error)
RunStreamingFromIterator executes the pipeline using a commit iterator instead of a pre-loaded commit slice. Commits are loaded chunk-at-a-time from the iterator and freed after processing, keeping memory usage proportional to chunk size rather than total repository size. Double-buffering is not supported in iterator mode; use RunStreaming with a pre-loaded slice for that.
Types ¶
type BlobData ¶
type BlobData struct {
Commit *gitlib.Commit
Index int
Changes gitlib.Changes
BlobCache map[gitlib.Hash]*gitlib.CachedBlob
Error error
}
BlobData holds loaded blob data for a commit.
type BlobPipeline ¶
type BlobPipeline struct {
SeqWorkerChan chan<- gitlib.WorkerRequest
PoolWorkerChan chan<- gitlib.WorkerRequest
BufferSize int
WorkerCount int
BlobCache *cache.LRUBlobCache
ArenaSize int
// Metrics provides per-stage counters for memory triage. Nil-safe.
Metrics *StageMetrics
// contains filtered or unexported fields
}
BlobPipeline processes commit batches to load blobs.
func NewBlobPipelineWithCache ¶
func NewBlobPipelineWithCache(
	seqChan chan<- gitlib.WorkerRequest,
	poolChan chan<- gitlib.WorkerRequest,
	bufferSize int,
	workerCount int,
	blobCache *cache.LRUBlobCache,
) *BlobPipeline
NewBlobPipelineWithCache creates a new blob pipeline with an optional global blob cache.
func (*BlobPipeline) Process ¶
func (p *BlobPipeline) Process(ctx context.Context, commits <-chan CommitBatch) <-chan BlobData
Process receives commit batches and outputs blob data.
type BudgetSolver ¶
type BudgetSolver func(budgetBytes int64) (CoordinatorConfig, error)
BudgetSolver resolves a memory budget (in bytes) to a CoordinatorConfig.
type CheckpointParams ¶
CheckpointParams holds checkpoint-related configuration.
type CommitBatch ¶
type CommitBatch struct {
// Commits in this batch.
Commits []*gitlib.Commit
// StartIndex is the index of the first commit in the full sequence.
StartIndex int
// BatchID identifies this batch for ordering.
BatchID int
}
CommitBatch represents a batch of commits for processing.
type CommitData ¶
type CommitData struct {
Commit *gitlib.Commit
Index int
Changes gitlib.Changes
BlobCache map[gitlib.Hash]*gitlib.CachedBlob
FileDiffs map[string]plumbing.FileDiffData
UASTChanges []uast.Change // Pre-computed UAST changes (nil if not computed).
UASTSpillPath string // Path to spilled UAST gob file (empty if in-memory).
Error error
}
CommitData holds all processed data for a commit.
type CommitStreamer ¶
type CommitStreamer struct {
// BatchSize is the number of commits per batch.
BatchSize int
// Lookahead is the number of batches to prefetch.
Lookahead int
}
CommitStreamer iterates commits and groups them into batches for efficient processing.
func (*CommitStreamer) Stream ¶
func (s *CommitStreamer) Stream(ctx context.Context, commits []*gitlib.Commit) <-chan CommitBatch
Stream takes a slice of commits and streams them as batches. The output channel is closed when all commits have been sent.
type ConfigParams ¶
type ConfigParams struct {
Workers int
BufferSize int
CommitBatchSize int
BlobCacheSize string
DiffCacheSize int
BlobArenaSize string
MemoryBudget string
GCPercent int
BallastSize string
}
ConfigParams holds raw CLI parameter values for building a CoordinatorConfig. All size strings use humanize format (e.g. "256MB", "1GiB").
type Coordinator ¶
type Coordinator struct {
// StageMetrics provides per-stage high-watermark counters for memory triage.
// When non-nil, pipeline stages update counters atomically and the sampler reads them.
StageMetrics *StageMetrics
// contains filtered or unexported fields
}
Coordinator orchestrates the full data processing pipeline.
func NewCoordinator ¶
func NewCoordinator(repo *gitlib.Repository, config CoordinatorConfig) *Coordinator
NewCoordinator creates a new coordinator for the repository.
func (*Coordinator) Process ¶
func (c *Coordinator) Process(ctx context.Context, commits []*gitlib.Commit) <-chan CommitData
Process runs the full pipeline on a slice of commits. After the returned channel is fully drained, call Stats() to retrieve pipeline timing and cache metrics.
func (*Coordinator) Stats ¶
func (c *Coordinator) Stats() PipelineStats
Stats returns the pipeline stats collected during Process(). Only valid after the channel returned by Process() is fully drained.
type CoordinatorConfig ¶
type CoordinatorConfig struct {
// BatchConfig configures batch sizes for blob and diff operations.
BatchConfig gitlib.BatchConfig
// CommitBatchSize is the number of commits to process in each batch.
CommitBatchSize int
// Workers is the number of parallel workers for processing.
Workers int
// BufferSize is the size of internal channels.
BufferSize int
// BlobCacheSize is the maximum size of the global blob cache in bytes.
// Set to 0 to disable caching.
BlobCacheSize int64
// DiffCacheSize is the maximum number of diff results to cache.
// Set to 0 to disable caching.
DiffCacheSize int
// BlobArenaSize is the size of the memory arena for blob loading.
// Defaults to 16MB if 0.
BlobArenaSize int
// UASTPipelineWorkers is the number of goroutines for parallel UAST parsing
// in the pipeline stage. Set to 0 to disable the UAST pipeline stage.
UASTPipelineWorkers int
// LeafWorkers is the number of goroutines for parallel leaf analyzer consumption.
// Each worker processes a disjoint subset of commits via Fork/Merge.
// Set to 0 to disable parallel leaf consumption (serial path).
LeafWorkers int
// GCPercent controls Go's GC aggressiveness.
// Set to 0 to use auto mode (200 when system memory > 32 GiB).
GCPercent int
// BallastSize reserves bytes in a long-lived slice to smooth GC behavior.
// Set to 0 to disable ballast allocation.
BallastSize int64
// FirstParent indicates whether the history walk is restricted to the first parent.
FirstParent bool
// WorkerTimeout is the maximum time to wait for a worker response before
// considering it stalled. Set to 0 to disable the watchdog.
WorkerTimeout time.Duration
}
CoordinatorConfig configures the pipeline coordinator.
func BuildConfigFromParams ¶
func BuildConfigFromParams(params ConfigParams, budgetSolver BudgetSolver) (CoordinatorConfig, int64, error)
BuildConfigFromParams builds a CoordinatorConfig from raw parameters. Returns the config and the memory budget in bytes (0 if not set). The budgetSolver is called when params.MemoryBudget is set; pass nil if memory-budget is not supported.
func DefaultCoordinatorConfig ¶
func DefaultCoordinatorConfig() CoordinatorConfig
DefaultCoordinatorConfig returns the default coordinator configuration.
func (CoordinatorConfig) EstimatedOverhead ¶
func (c CoordinatorConfig) EstimatedOverhead() int64
EstimatedOverhead returns the estimated memory consumed by the pipeline infrastructure (runtime, workers, caches, buffers, native/mmap overhead) — everything except analyzer state. This allows the streaming planner to accurately compute how much memory remains for analyzer state growth.
type DiffCache ¶
type DiffCache struct {
// contains filtered or unexported fields
}
DiffCache provides an LRU cache for diff results. It caches computed diffs to avoid redundant diff computations. A Bloom filter pre-filters Get lookups to skip lock acquisition for definite misses.
func NewDiffCache ¶
NewDiffCache creates a new diff cache with the specified maximum entries. A Bloom filter is initialized to pre-filter lookups, sized for maxEntries.
func (*DiffCache) CacheMisses ¶
CacheMisses returns the total cache miss count (atomic, lock-free).
func (*DiffCache) Clear ¶
func (c *DiffCache) Clear()
Clear removes all entries from the cache and resets the Bloom filter.
func (*DiffCache) Get ¶
func (c *DiffCache) Get(key DiffKey) (plumbing.FileDiffData, bool)
Get retrieves a cached diff result. Uses a Bloom filter to skip lock acquisition for definite cache misses.
func (*DiffCache) Put ¶
func (c *DiffCache) Put(key DiffKey, diff plumbing.FileDiffData)
Put adds a diff result to the cache.
func (*DiffCache) Stats ¶
func (c *DiffCache) Stats() DiffCacheStats
Stats returns current cache statistics.
type DiffCacheStats ¶
type DiffCacheStats struct {
Hits int64
Misses int64
BloomSkips int64 // Lookups short-circuited by the Bloom pre-filter.
Entries int
MaxEntries int
}
DiffCacheStats holds statistics about diff cache usage.
func (DiffCacheStats) HitRate ¶
func (s DiffCacheStats) HitRate() float64
HitRate returns the cache hit rate as a fraction.
type DiffPipeline ¶
type DiffPipeline struct {
PoolWorkerChan chan<- gitlib.WorkerRequest
BufferSize int
DiffCache *DiffCache
// NoBatch disables cross-commit batching. Each diff request fires immediately.
// Useful for debugging or single-commit analysis.
NoBatch bool
// Metrics provides per-stage counters for memory triage. Nil-safe.
Metrics *StageMetrics
// contains filtered or unexported fields
}
DiffPipeline processes blob data to compute file diffs.
func NewDiffPipelineWithCache ¶
func NewDiffPipelineWithCache(workerChan chan<- gitlib.WorkerRequest, bufferSize int, cache *DiffCache) *DiffPipeline
NewDiffPipelineWithCache creates a new diff pipeline with an optional diff cache.
func (*DiffPipeline) Process ¶
func (p *DiffPipeline) Process(ctx context.Context, blobs <-chan BlobData) <-chan CommitData
Process receives blob data and outputs commit data with computed diffs.
type FlushableAnalyzer ¶
type FlushableAnalyzer interface {
Flush()
}
FlushableAnalyzer is optionally implemented by HistoryAnalyzers that hold per-commit state and need to flush it at the end of a chunk to avoid memory leaks.
type PipelineSampler ¶
type PipelineSampler struct {
// contains filtered or unexported fields
}
PipelineSampler periodically logs comprehensive memory and pipeline metrics during chunk processing. Implements playbook section 2.1: "lightweight periodic sampler (always-on in debug builds)".
func NewPipelineSampler ¶
func NewPipelineSampler(cfg SamplerConfig) *PipelineSampler
NewPipelineSampler creates a sampler. Call Start to begin periodic logging.
func (*PipelineSampler) CaptureT1 ¶
func (s *PipelineSampler) CaptureT1()
CaptureT1 forces capture of the t1 (peak) heap profile. Call after the chunk completes if the automatic RSS threshold wasn't hit.
func (*PipelineSampler) Start ¶
func (s *PipelineSampler) Start(ctx context.Context)
Start begins the sampler goroutine. It captures a t0 heap profile immediately and then logs metrics at the configured interval. Cancel the context to stop.
type PipelineStats ¶
type PipelineStats struct {
BlobDuration time.Duration
DiffDuration time.Duration
UASTDuration time.Duration
BlobCacheHits int64
BlobCacheMisses int64
DiffCacheHits int64
DiffCacheMisses int64
}
PipelineStats holds cumulative pipeline metrics for a single Coordinator run. Populated during Process(); valid after the returned channel is fully drained.
func (*PipelineStats) Add ¶
func (s *PipelineStats) Add(other PipelineStats)
Add accumulates another PipelineStats into this one (cross-chunk aggregation).
type Runner ¶
type Runner struct {
Repo *gitlib.Repository
RepoPath string
Analyzers []analyze.HistoryAnalyzer
Config CoordinatorConfig
// Tracer is the OTel tracer for creating pipeline spans.
// When nil, falls back to otel.Tracer("codefang").
Tracer trace.Tracer
// CoreCount is the number of leading analyzers in the Analyzers slice that are
// core (plumbing) analyzers. These run sequentially. Analyzers after CoreCount
// are leaf analyzers that can be parallelized via Fork/Merge.
// Set to 0 to disable parallel leaf consumption.
CoreCount int
// MemBudget is the user's memory budget in bytes. When positive, overrides
// the system-RAM-based debug.SetMemoryLimit with a budget-aligned value.
MemBudget int64
// TCSink, when set, receives every non-nil TC as commits are consumed.
// Used by NDJSON streaming output. When set, aggregators are not created
// and FinalizeWithAggregators is not called.
TCSink analyze.TCSink
// AggSpillBudget is the maximum bytes of aggregator state to keep in memory
// before spilling to disk. Computed by ComputeSchedule from the memory budget.
// Zero means no limit (unlimited budget or budget too small to decompose).
AggSpillBudget int64
// TmpDir is the parent directory for temporary spill files.
// Empty means os.TempDir() (system default).
TmpDir string
// StageMetrics provides per-stage counters for memory triage.
// When non-nil, passed to Coordinator and updated by pipeline stages.
StageMetrics *StageMetrics
// contains filtered or unexported fields
}
Runner orchestrates multiple HistoryAnalyzers over a commit sequence. It always uses the Coordinator pipeline (batch blob load + batch diff).
func NewRunnerWithConfig ¶
func NewRunnerWithConfig(
	repo *gitlib.Repository,
	repoPath string,
	config CoordinatorConfig,
	analyzers ...analyze.HistoryAnalyzer,
) *Runner
NewRunnerWithConfig creates a new Runner with custom coordinator configuration.
func (*Runner) AggregatorSpills ¶
func (runner *Runner) AggregatorSpills() []checkpoint.AggregatorSpillEntry
AggregatorSpills returns the current spill state of all aggregators for checkpoint persistence.
func (*Runner) AggregatorStateSize ¶
AggregatorStateSize returns the sum of EstimatedStateSize() across all non-nil aggregators. Used for three-metric adaptive feedback.
func (*Runner) DiscardAggregatorState ¶
func (runner *Runner) DiscardAggregatorState()
DiscardAggregatorState clears all in-memory cumulative state from aggregators without serialization. Used in streaming timeseries NDJSON mode where per-commit data is drained each chunk and cumulative state (coupling matrices, burndown histories) is never needed for a final report.
func (*Runner) DiscardLeafAnalyzerState ¶
func (runner *Runner) DiscardLeafAnalyzerState()
DiscardLeafAnalyzerState clears cumulative state from leaf history analyzers that implement stateDiscarder. This complements DiscardAggregatorState by freeing state held directly in analyzers (e.g. shotness node coupling maps) rather than in aggregators.
func (*Runner) DrainCommitMeta ¶
func (runner *Runner) DrainCommitMeta() map[string]analyze.CommitMeta
DrainCommitMeta returns the accumulated per-commit metadata and resets the map. Used by streaming timeseries NDJSON to extract metadata between chunks.
func (*Runner) FinalizeToStore ¶
FinalizeToStore writes analyzer results to a ReportStore one analyzer at a time. Analyzers implementing DirectStoreWriter skip FlushAllTicks; all others must implement StoreWriter. Releases each aggregator before moving to the next.
func (*Runner) FinalizeWithAggregators ¶
func (runner *Runner) FinalizeWithAggregators(ctx context.Context) (map[analyze.HistoryAnalyzer]analyze.Report, error)
FinalizeWithAggregators produces reports from all leaf analyzers:
- Analyzers with aggregators: Collect → FlushAllTicks → ReportFromTICKs
- Analyzers without aggregators: store empty report.
Closes all aggregators before returning.
func (*Runner) Initialize ¶
Initialize initializes all analyzers and creates aggregators. Call once before processing chunks.
func (*Runner) InitializeForResume ¶
func (runner *Runner) InitializeForResume(aggSpills []checkpoint.AggregatorSpillEntry) error
InitializeForResume initializes all analyzers and recreates aggregators with saved spill state from a checkpoint. Called instead of Initialize() when resuming from a checkpoint (startChunk > 0).
func (*Runner) LeafAggregators ¶
func (runner *Runner) LeafAggregators() []analyze.Aggregator
LeafAggregators returns the aggregators for leaf analyzers (indices >= CoreCount). Used by streaming timeseries NDJSON to drain per-commit data between chunks.
func (*Runner) LeafAnalyzers ¶
func (runner *Runner) LeafAnalyzers() []analyze.HistoryAnalyzer
LeafAnalyzers returns the history analyzers registered as leaves (non-plumbing).
func (*Runner) ProcessChunk ¶
func (runner *Runner) ProcessChunk(ctx context.Context, commits []*gitlib.Commit, indexOffset, chunkIndex int) (PipelineStats, error)
ProcessChunk processes a chunk of commits without Initialize/Finalize. Use this for streaming mode where Initialize is called once at start and Finalize once at end. The indexOffset is added to the commit index to maintain correct ordering across chunks. chunkIndex is the zero-based chunk number used for span naming.
func (*Runner) ProcessChunkFromData ¶
func (runner *Runner) ProcessChunkFromData(ctx context.Context, data []CommitData, indexOffset, chunkIndex int) (PipelineStats, error)
ProcessChunkFromData consumes pre-fetched CommitData through analyzers, bypassing Coordinator creation. Used by double-buffered chunk pipelining where the pipeline has already run and collected data. Returns zero PipelineStats since the real stats come from the prefetch Coordinator.
func (*Runner) ResetTCCount ¶
func (runner *Runner) ResetTCCount()
ResetTCCount resets the per-chunk TC counter.
func (*Runner) Run ¶
func (runner *Runner) Run(ctx context.Context, commits []*gitlib.Commit) (map[analyze.HistoryAnalyzer]analyze.Report, error)
Run executes all analyzers over the given commits: initialize, consume each commit via pipeline, then finalize.
func (*Runner) SpillAggregators ¶
SpillAggregators forces all aggregators to flush their in-memory state to disk. Called before saving a checkpoint so that spill files are complete.
func (*Runner) TCCountAccumulated ¶
TCCountAccumulated returns the number of TCs added since the last reset.
type SamplerConfig ¶
type SamplerConfig struct {
Logger *slog.Logger
Metrics *StageMetrics
DumpDir string
ChunkIndex int
MemBudget int64
ProfileAtRSS int64 // RSS in bytes at which to capture t1 profile. 0 = disabled.
}
SamplerConfig configures the pipeline sampler.
type StageMetrics ¶
type StageMetrics struct {
// Blob pipeline metrics.
BlobChangesInFlight atomic.Int64 // Number of file changes being processed.
BlobBytesLoaded atomic.Int64 // Total blob bytes loaded in current batch.
BlobCacheEntries atomic.Int64 // Current global blob cache entry count.
BlobCacheBytes atomic.Int64 // Current global blob cache byte size.
// Diff pipeline metrics.
DiffItemsQueued atomic.Int64 // Diff requests pending in batcher.
DiffCacheEntries atomic.Int64 // Current diff cache entry count.
// UAST pipeline metrics.
UASTItemsQueued atomic.Int64 // UAST parse jobs pending.
// Runner / aggregator metrics.
AggregatorBytes atomic.Int64 // Estimated aggregator state size.
CommitsProcessed atomic.Int64 // Commits processed in current chunk.
LastChangeCount atomic.Int64 // File changes in most recent commit.
// High-watermarks (updated by Record* methods, never decrease within a chunk).
PeakBlobChanges atomic.Int64 // Max changes seen in any single commit.
PeakBlobBytes atomic.Int64 // Max blob bytes loaded in any single batch.
PeakDiffQueued atomic.Int64 // Max diff items queued at any point.
}
StageMetrics provides per-stage high-watermark counters for memory triage. All fields are updated atomically by pipeline stages and read by the sampler. Following the playbook: "diff items queued", "bytes of blob content held", "AST cache entries", "results map size".
func (*StageMetrics) RecordBlobBatch ¶
func (m *StageMetrics) RecordBlobBatch(changes, blobBytes int64)
RecordBlobBatch updates blob metrics and high-watermarks for a batch.
func (*StageMetrics) RecordCommit ¶
func (m *StageMetrics) RecordCommit(changeCount int64)
RecordCommit updates per-commit metrics.
func (*StageMetrics) RecordDiffQueue ¶
func (m *StageMetrics) RecordDiffQueue(queued int64)
RecordDiffQueue updates diff queue depth and high-watermark.
func (*StageMetrics) Reset ¶
func (m *StageMetrics) Reset()
Reset clears all counters and watermarks for a new chunk.
func (*StageMetrics) Snapshot ¶
func (m *StageMetrics) Snapshot() StageMetricsSnapshot
Snapshot reads all counters atomically (each field individually).
type StageMetricsSnapshot ¶
type StageMetricsSnapshot struct {
BlobChangesInFlight int64
BlobBytesLoaded int64
BlobCacheEntries int64
BlobCacheBytes int64
DiffItemsQueued int64
DiffCacheEntries int64
UASTItemsQueued int64
AggregatorBytes int64
CommitsProcessed int64
LastChangeCount int64
PeakBlobChanges int64
PeakBlobBytes int64
PeakDiffQueued int64
}
StageMetricsSnapshot is a point-in-time copy of all stage metrics.
type StreamingConfig ¶
type StreamingConfig struct {
MemBudget int64
Checkpoint CheckpointParams
RepoPath string
AnalyzerNames []string
// Logger is the structured logger for streaming operations.
// When nil, a discard logger is used.
Logger *slog.Logger
// DebugTrace enables 100% trace sampling for debugging.
DebugTrace bool
// AnalysisMetrics records analysis-specific OTel metrics (commits, chunks,
// cache stats). Nil-safe: when nil, no metrics are recorded.
AnalysisMetrics *observability.AnalysisMetrics
// TCSink, when set, receives every non-nil TC as commits are consumed.
// Used by NDJSON streaming output. When set, aggregators are not created
// and FinalizeWithAggregators is not called — results are nil.
TCSink analyze.TCSink
// AggSpillBudget is the maximum bytes of aggregator state to keep in memory
// before spilling to disk. Computed by ComputeSchedule. Zero means no limit.
AggSpillBudget int64
// OnChunkComplete, when set, is called after each chunk finishes processing.
// Used by streaming timeseries NDJSON to drain per-commit data from aggregators
// between chunks, keeping memory bounded by chunk size.
OnChunkComplete func(runner *Runner) error
// SkipFinalize, when true, causes RunStreaming/RunStreamingFromIterator to
// return empty reports instead of calling FinalizeWithAggregators. Used when
// OnChunkComplete already handles output (e.g. streaming timeseries NDJSON).
SkipFinalize bool
// ReportStore, when set, causes finalization to write results one-at-a-time
// to the store via FinalizeToStore instead of materializing all reports in
// memory via FinalizeWithAggregators. Returns empty reports map.
ReportStore analyze.ReportStore
// TmpDir is the parent directory for temporary spill files.
// Empty means os.TempDir() (system default).
TmpDir string
}
StreamingConfig holds configuration for streaming pipeline execution.
type UASTPipeline ¶
type UASTPipeline struct {
Parser *uast.Parser
Workers int
BufferSize int
PathFilter *pathfilter.Filter
}
UASTPipeline pre-computes UAST changes for each commit in the pipeline, enabling cross-commit parallelism. It sits between DiffPipeline and the serial analyzer consumption loop.
func NewUASTPipeline ¶
func NewUASTPipeline(parser *uast.Parser, workers, bufferSize int) *UASTPipeline
NewUASTPipeline creates a new UAST pipeline stage.
func (*UASTPipeline) Process ¶
func (p *UASTPipeline) Process(ctx context.Context, diffs <-chan CommitData) <-chan CommitData
Process receives commit data with blobs and diffs, and adds pre-computed UAST changes. Multiple commits are processed concurrently by worker goroutines. Output order matches input order via a slot-based approach.