framework

package
v0.0.0-...-cd37b43

This package is not in the latest version of its module.

Published: Mar 12, 2026 License: Apache-2.0 Imports: 41 Imported by: 0

Documentation

Overview

Package framework provides orchestration for running analysis pipelines.

Index

Constants

const (
	FileModeCommit = 0o160000
	FileModeTree   = 0o040000
	FileModeBlob   = 0o100644
	FileModeExec   = 0o100755
	FileModeLink   = 0o120000
)

File mode constants for git tree entries.

const DefaultBlobBatchArenaSize = 4 * 1024 * 1024

DefaultBlobBatchArenaSize is the default size of the memory arena for blob loading (4 MiB).

const DefaultDiffCacheSize = 10000

DefaultDiffCacheSize is the default maximum number of diff entries to cache.

Variables

var (
	ErrInvalidSizeFormat = errors.New("invalid size format")
	ErrInvalidGCPercent  = errors.New("invalid GC percent")
)

Sentinel errors for configuration.

var ErrCacheMiss = errors.New("cache miss")

ErrCacheMiss is returned by a cache-backed Fetcher when the key is not found.

var ErrCommitTooLarge = errors.New("commit exceeds max changes cap")

ErrCommitTooLarge is set on BlobData.Error for commits exceeding maxChangesPerCommit. The runner uses errors.Is to distinguish this from fatal pipeline errors and skips the commit instead of aborting.

var ErrNotParallelizable = errors.New("leaf does not implement Parallelizable")

ErrNotParallelizable is returned when a leaf analyzer does not implement analyze.Parallelizable.

var ErrNotStoreWriter = errors.New("analyzer does not implement StoreWriter")

ErrNotStoreWriter is returned when an analyzer does not implement analyze.StoreWriter.

Functions

func CanResumeWithCheckpoint

func CanResumeWithCheckpoint(totalAnalyzers, checkpointableCount int) bool

CanResumeWithCheckpoint returns true if all analyzers support checkpointing.

func DefaultMemoryBudget

func DefaultMemoryBudget() int64

DefaultMemoryBudget returns a sensible memory budget based on available system memory. Returns min(50% of total RAM, 4 GiB), or 0 if detection fails.

func MaybeStartCPUProfile

func MaybeStartCPUProfile(path string) (func(), error)

MaybeStartCPUProfile starts CPU profiling to the given file. Returns a stop function that must be deferred. Returns a no-op if path is empty.

func MaybeWriteHeapProfile

func MaybeWriteHeapProfile(path string, logger *slog.Logger)

MaybeWriteHeapProfile writes a heap profile to the given file. No-op if path is empty. Uses the provided logger for error reporting.

func ParseOptionalSize

func ParseOptionalSize(sizeValue string) (int64, error)

ParseOptionalSize parses a human-readable size string, returning 0 for empty or "0".

func RunStreaming

func RunStreaming(
	ctx context.Context,
	runner *Runner,
	commits []*gitlib.Commit,
	analyzers []analyze.HistoryAnalyzer,
	config StreamingConfig,
) (map[analyze.HistoryAnalyzer]analyze.Report, error)

RunStreaming executes the pipeline in streaming chunks with optional checkpoint support. The scheduler determines the buffering factor (single/double/triple) based on the memory budget. Higher buffering factors overlap more pipeline phases with analyzer consumption but require smaller chunks.

When the commit set is trivially small (single commit) and no streaming features are needed, delegates to Runner.Run for simplicity and lower overhead.

func RunStreamingFromIterator

func RunStreamingFromIterator(
	ctx context.Context,
	runner *Runner,
	iter *gitlib.CommitIter,
	commitCount int,
	analyzers []analyze.HistoryAnalyzer,
	config StreamingConfig,
) (map[analyze.HistoryAnalyzer]analyze.Report, error)

RunStreamingFromIterator executes the pipeline using a commit iterator instead of a pre-loaded commit slice. Commits are loaded chunk-at-a-time from the iterator and freed after processing, keeping memory usage proportional to chunk size rather than total repository size. Double-buffering is not supported in iterator mode; use RunStreaming with a pre-loaded slice for that.

Types

type BlobData

type BlobData struct {
	Commit    *gitlib.Commit
	Index     int
	Changes   gitlib.Changes
	BlobCache map[gitlib.Hash]*gitlib.CachedBlob
	Error     error
}

BlobData holds loaded blob data for a commit.

type BlobPipeline

type BlobPipeline struct {
	SeqWorkerChan  chan<- gitlib.WorkerRequest
	PoolWorkerChan chan<- gitlib.WorkerRequest
	BufferSize     int
	WorkerCount    int
	BlobCache      *cache.LRUBlobCache
	ArenaSize      int

	// Metrics provides per-stage counters for memory triage. Nil-safe.
	Metrics *StageMetrics
	// contains filtered or unexported fields
}

BlobPipeline processes commit batches to load blobs.

func NewBlobPipelineWithCache

func NewBlobPipelineWithCache(
	seqChan chan<- gitlib.WorkerRequest,
	poolChan chan<- gitlib.WorkerRequest,
	bufferSize int,
	workerCount int,
	blobCache *cache.LRUBlobCache,
) *BlobPipeline

NewBlobPipelineWithCache creates a new blob pipeline with an optional global blob cache.

func (*BlobPipeline) Process

func (p *BlobPipeline) Process(ctx context.Context, commits <-chan CommitBatch) <-chan BlobData

Process receives commit batches and outputs blob data.

type BudgetSolver

type BudgetSolver func(budgetBytes int64) (CoordinatorConfig, error)

BudgetSolver resolves a memory budget (in bytes) to a CoordinatorConfig.

type CheckpointParams

type CheckpointParams struct {
	Enabled   bool
	Dir       string
	Resume    bool
	ClearPrev bool
}

CheckpointParams holds checkpoint-related configuration.

type CommitBatch

type CommitBatch struct {
	// Commits in this batch.
	Commits []*gitlib.Commit

	// StartIndex is the index of the first commit in the full sequence.
	StartIndex int

	// BatchID identifies this batch for ordering.
	BatchID int
}

CommitBatch represents a batch of commits for processing.

type CommitData

type CommitData struct {
	Commit        *gitlib.Commit
	Index         int
	Changes       gitlib.Changes
	BlobCache     map[gitlib.Hash]*gitlib.CachedBlob
	FileDiffs     map[string]plumbing.FileDiffData
	UASTChanges   []uast.Change // Pre-computed UAST changes (nil if not computed).
	UASTSpillPath string        // Path to spilled UAST gob file (empty if in-memory).
	Error         error
}

CommitData holds all processed data for a commit.

type CommitStreamer

type CommitStreamer struct {
	// BatchSize is the number of commits per batch.
	BatchSize int

	// Lookahead is the number of batches to prefetch.
	Lookahead int
}

CommitStreamer iterates commits and groups them into batches for efficient processing.

func (*CommitStreamer) Stream

func (s *CommitStreamer) Stream(ctx context.Context, commits []*gitlib.Commit) <-chan CommitBatch

Stream takes a slice of commits and streams them as batches. The output channel is closed when all commits have been sent.

type ConfigParams

type ConfigParams struct {
	Workers         int
	BufferSize      int
	CommitBatchSize int
	BlobCacheSize   string
	DiffCacheSize   int
	BlobArenaSize   string
	MemoryBudget    string
	GCPercent       int
	BallastSize     string
}

ConfigParams holds raw CLI parameter values for building a CoordinatorConfig. All size strings use humanize format (e.g. "256MB", "1GiB").

type Coordinator

type Coordinator struct {

	// StageMetrics provides per-stage high-watermark counters for memory triage.
	// When non-nil, pipeline stages update counters atomically and the sampler reads them.
	StageMetrics *StageMetrics
	// contains filtered or unexported fields
}

Coordinator orchestrates the full data processing pipeline.

func NewCoordinator

func NewCoordinator(repo *gitlib.Repository, config CoordinatorConfig) *Coordinator

NewCoordinator creates a new coordinator for the repository.

func (*Coordinator) Process

func (c *Coordinator) Process(ctx context.Context, commits []*gitlib.Commit) <-chan CommitData

Process runs the full pipeline on a slice of commits. After the returned channel is fully drained, call Stats() to retrieve pipeline timing and cache metrics.

func (*Coordinator) Stats

func (c *Coordinator) Stats() PipelineStats

Stats returns the pipeline stats collected during Process(). Only valid after the channel returned by Process() is fully drained.

type CoordinatorConfig

type CoordinatorConfig struct {
	// BatchConfig configures batch sizes for blob and diff operations.
	BatchConfig gitlib.BatchConfig

	// CommitBatchSize is the number of commits to process in each batch.
	CommitBatchSize int

	// Workers is the number of parallel workers for processing.
	Workers int

	// BufferSize is the size of internal channels.
	BufferSize int

	// BlobCacheSize is the maximum size of the global blob cache in bytes.
	// Set to 0 to disable caching.
	BlobCacheSize int64

	// DiffCacheSize is the maximum number of diff results to cache.
	// Set to 0 to disable caching.
	DiffCacheSize int

	// BlobArenaSize is the size of the memory arena for blob loading.
	// Defaults to 16MB if 0.
	BlobArenaSize int

	// UASTPipelineWorkers is the number of goroutines for parallel UAST parsing
	// in the pipeline stage. Set to 0 to disable the UAST pipeline stage.
	UASTPipelineWorkers int

	// LeafWorkers is the number of goroutines for parallel leaf analyzer consumption.
	// Each worker processes a disjoint subset of commits via Fork/Merge.
	// Set to 0 to disable parallel leaf consumption (serial path).
	LeafWorkers int

	// GCPercent controls Go's GC aggressiveness.
	// Set to 0 to use auto mode (200 when system memory > 32 GiB).
	GCPercent int

	// BallastSize reserves bytes in a long-lived slice to smooth GC behavior.
	// Set to 0 to disable ballast allocation.
	BallastSize int64

	// FirstParent indicates whether the history walk is restricted to the first parent.
	FirstParent bool

	// WorkerTimeout is the maximum time to wait for a worker response before
	// considering it stalled. Set to 0 to disable the watchdog.
	WorkerTimeout time.Duration
}

CoordinatorConfig configures the pipeline coordinator.

func BuildConfigFromParams

func BuildConfigFromParams(params ConfigParams, budgetSolver BudgetSolver) (CoordinatorConfig, int64, error)

BuildConfigFromParams builds a CoordinatorConfig from raw parameters. Returns the config and the memory budget in bytes (0 if not set). The budgetSolver is called when params.MemoryBudget is set; pass nil if memory-budget is not supported.

func DefaultCoordinatorConfig

func DefaultCoordinatorConfig() CoordinatorConfig

DefaultCoordinatorConfig returns the default coordinator configuration.

func (CoordinatorConfig) EstimatedOverhead

func (c CoordinatorConfig) EstimatedOverhead() int64

EstimatedOverhead returns the estimated memory consumed by the pipeline infrastructure (runtime, workers, caches, buffers, native/mmap overhead) — everything except analyzer state. This allows the streaming planner to accurately compute how much memory remains for analyzer state growth.

type DiffCache

type DiffCache struct {
	// contains filtered or unexported fields
}

DiffCache provides an LRU cache for diff results. It caches computed diffs to avoid redundant diff computations. A Bloom filter pre-filters Get lookups to skip lock acquisition for definite misses.

func NewDiffCache

func NewDiffCache(maxEntries int) *DiffCache

NewDiffCache creates a new diff cache with the specified maximum entries. A Bloom filter is initialized to pre-filter lookups, sized for maxEntries.

func (*DiffCache) CacheHits

func (c *DiffCache) CacheHits() int64

CacheHits returns the total cache hit count (atomic, lock-free).

func (*DiffCache) CacheMisses

func (c *DiffCache) CacheMisses() int64

CacheMisses returns the total cache miss count (atomic, lock-free).

func (*DiffCache) Clear

func (c *DiffCache) Clear()

Clear removes all entries from the cache and resets the Bloom filter.

func (*DiffCache) Get

func (c *DiffCache) Get(key DiffKey) (plumbing.FileDiffData, bool)

Get retrieves a cached diff result. Uses a Bloom filter to skip lock acquisition for definite cache misses.

func (*DiffCache) Put

func (c *DiffCache) Put(key DiffKey, diff plumbing.FileDiffData)

Put adds a diff result to the cache.

func (*DiffCache) Stats

func (c *DiffCache) Stats() DiffCacheStats

Stats returns current cache statistics.

type DiffCacheStats

type DiffCacheStats struct {
	Hits       int64
	Misses     int64
	BloomSkips int64 // Lookups short-circuited by the Bloom pre-filter.
	Entries    int
	MaxEntries int
}

DiffCacheStats holds statistics about diff cache usage.

func (DiffCacheStats) HitRate

func (s DiffCacheStats) HitRate() float64

HitRate returns the cache hit rate as a fraction.

type DiffKey

type DiffKey struct {
	OldHash gitlib.Hash
	NewHash gitlib.Hash
}

DiffKey uniquely identifies a diff computation by blob hashes.

type DiffPipeline

type DiffPipeline struct {
	PoolWorkerChan chan<- gitlib.WorkerRequest
	BufferSize     int
	DiffCache      *DiffCache

	// NoBatch disables cross-commit batching. Each diff request fires immediately.
	// Useful for debugging or single-commit analysis.
	NoBatch bool

	// Metrics provides per-stage counters for memory triage. Nil-safe.
	Metrics *StageMetrics
	// contains filtered or unexported fields
}

DiffPipeline processes blob data to compute file diffs.

func NewDiffPipelineWithCache

func NewDiffPipelineWithCache(workerChan chan<- gitlib.WorkerRequest, bufferSize int, cache *DiffCache) *DiffPipeline

NewDiffPipelineWithCache creates a new diff pipeline with an optional diff cache.

func (*DiffPipeline) Process

func (p *DiffPipeline) Process(ctx context.Context, blobs <-chan BlobData) <-chan CommitData

Process receives blob data and outputs commit data with computed diffs.

type FlushableAnalyzer

type FlushableAnalyzer interface {
	Flush()
}

FlushableAnalyzer is optionally implemented by HistoryAnalyzers that hold per-commit state and need to flush it at the end of a chunk to avoid memory leaks.

type PipelineSampler

type PipelineSampler struct {
	// contains filtered or unexported fields
}

PipelineSampler periodically logs comprehensive memory and pipeline metrics during chunk processing. Implements playbook section 2.1: "lightweight periodic sampler (always-on in debug builds)".

func NewPipelineSampler

func NewPipelineSampler(cfg SamplerConfig) *PipelineSampler

NewPipelineSampler creates a sampler. Call Start to begin periodic logging.

func (*PipelineSampler) CaptureT1

func (s *PipelineSampler) CaptureT1()

CaptureT1 forces capture of the t1 (peak) heap profile. Call after the chunk completes if the automatic RSS threshold wasn't hit.

func (*PipelineSampler) Start

func (s *PipelineSampler) Start(ctx context.Context)

Start begins the sampler goroutine. It captures a t0 heap profile immediately and then logs metrics at the configured interval. Cancel the context to stop.

type PipelineStats

type PipelineStats struct {
	BlobDuration time.Duration
	DiffDuration time.Duration
	UASTDuration time.Duration

	BlobCacheHits   int64
	BlobCacheMisses int64
	DiffCacheHits   int64
	DiffCacheMisses int64
}

PipelineStats holds cumulative pipeline metrics for a single Coordinator run. Populated during Process(); valid after the returned channel is fully drained.

func (*PipelineStats) Add

func (s *PipelineStats) Add(other PipelineStats)

Add accumulates another PipelineStats into this one (cross-chunk aggregation).

type Runner

type Runner struct {
	Repo      *gitlib.Repository
	RepoPath  string
	Analyzers []analyze.HistoryAnalyzer
	Config    CoordinatorConfig

	// Tracer is the OTel tracer for creating pipeline spans.
	// When nil, falls back to otel.Tracer("codefang").
	Tracer trace.Tracer

	// CoreCount is the number of leading analyzers in the Analyzers slice that are
	// core (plumbing) analyzers. These run sequentially. Analyzers after CoreCount
	// are leaf analyzers that can be parallelized via Fork/Merge.
	// Set to 0 to disable parallel leaf consumption.
	CoreCount int

	// MemBudget is the user's memory budget in bytes. When positive, overrides
	// the system-RAM-based debug.SetMemoryLimit with a budget-aligned value.
	MemBudget int64

	// TCSink, when set, receives every non-nil TC as commits are consumed.
	// Used by NDJSON streaming output. When set, aggregators are not created
	// and FinalizeWithAggregators is not called.
	TCSink analyze.TCSink

	// AggSpillBudget is the maximum bytes of aggregator state to keep in memory
	// before spilling to disk. Computed by ComputeSchedule from the memory budget.
	// Zero means no limit (unlimited budget or budget too small to decompose).
	AggSpillBudget int64

	// TmpDir is the parent directory for temporary spill files.
	// Empty means os.TempDir() (system default).
	TmpDir string

	// StageMetrics provides per-stage counters for memory triage.
	// When non-nil, passed to Coordinator and updated by pipeline stages.
	StageMetrics *StageMetrics
	// contains filtered or unexported fields
}

Runner orchestrates multiple HistoryAnalyzers over a commit sequence. It always uses the Coordinator pipeline (batch blob load + batch diff in C).

func NewRunnerWithConfig

func NewRunnerWithConfig(
	repo *gitlib.Repository,
	repoPath string,
	config CoordinatorConfig,
	analyzers ...analyze.HistoryAnalyzer,
) *Runner

NewRunnerWithConfig creates a new Runner with custom coordinator configuration.

func (*Runner) AggregatorSpills

func (runner *Runner) AggregatorSpills() []checkpoint.AggregatorSpillEntry

AggregatorSpills returns the current spill state of all aggregators for checkpoint persistence.

func (*Runner) AggregatorStateSize

func (runner *Runner) AggregatorStateSize() int64

AggregatorStateSize returns the sum of EstimatedStateSize() across all non-nil aggregators. Used for three-metric adaptive feedback.

func (*Runner) DiscardAggregatorState

func (runner *Runner) DiscardAggregatorState()

DiscardAggregatorState clears all in-memory cumulative state from aggregators without serialization. Used in streaming timeseries NDJSON mode where per-commit data is drained each chunk and cumulative state (coupling matrices, burndown histories) is never needed for a final report.

func (*Runner) DiscardLeafAnalyzerState

func (runner *Runner) DiscardLeafAnalyzerState()

DiscardLeafAnalyzerState clears cumulative state from leaf history analyzers that implement stateDiscarder. This complements DiscardAggregatorState by freeing state held directly in analyzers (e.g. shotness node coupling maps) rather than in aggregators.

func (*Runner) DrainCommitMeta

func (runner *Runner) DrainCommitMeta() map[string]analyze.CommitMeta

DrainCommitMeta returns the accumulated per-commit metadata and resets the map. Used by streaming timeseries NDJSON to extract metadata between chunks.

func (*Runner) FinalizeToStore

func (runner *Runner) FinalizeToStore(ctx context.Context, store analyze.ReportStore) error

FinalizeToStore writes analyzer results to a ReportStore one analyzer at a time. Analyzers implementing DirectStoreWriter skip FlushAllTicks; all others must implement StoreWriter. Releases each aggregator before moving to the next.

func (*Runner) FinalizeWithAggregators

func (runner *Runner) FinalizeWithAggregators(ctx context.Context) (map[analyze.HistoryAnalyzer]analyze.Report, error)

FinalizeWithAggregators produces reports from all leaf analyzers:

  • Analyzers with aggregators: Collect → FlushAllTicks → ReportFromTICKs
  • Analyzers without aggregators: store empty report.

Closes all aggregators before returning.

func (*Runner) Initialize

func (runner *Runner) Initialize() error

Initialize initializes all analyzers and creates aggregators. Call once before processing chunks.

func (*Runner) InitializeForResume

func (runner *Runner) InitializeForResume(aggSpills []checkpoint.AggregatorSpillEntry) error

InitializeForResume initializes all analyzers and recreates aggregators with saved spill state from a checkpoint. Called instead of Initialize() when resuming from a checkpoint (startChunk > 0).

func (*Runner) LeafAggregators

func (runner *Runner) LeafAggregators() []analyze.Aggregator

LeafAggregators returns the aggregators for leaf analyzers (indices >= CoreCount). Used by streaming timeseries NDJSON to drain per-commit data between chunks.

func (*Runner) LeafAnalyzers

func (runner *Runner) LeafAnalyzers() []analyze.HistoryAnalyzer

LeafAnalyzers returns the history analyzers registered as leaves (non-plumbing).

func (*Runner) ProcessChunk

func (runner *Runner) ProcessChunk(ctx context.Context, commits []*gitlib.Commit, indexOffset, chunkIndex int) (PipelineStats, error)

ProcessChunk processes a chunk of commits without Initialize/Finalize. Use this for streaming mode where Initialize is called once at start and Finalize once at end. The indexOffset is added to the commit index to maintain correct ordering across chunks. chunkIndex is the zero-based chunk number used for span naming.

func (*Runner) ProcessChunkFromData

func (runner *Runner) ProcessChunkFromData(ctx context.Context, data []CommitData, indexOffset, chunkIndex int) (PipelineStats, error)

ProcessChunkFromData consumes pre-fetched CommitData through analyzers, bypassing Coordinator creation. Used by double-buffered chunk pipelining where the pipeline has already run and collected data. Returns zero PipelineStats since the real stats come from the prefetch Coordinator.

func (*Runner) ResetTCCount

func (runner *Runner) ResetTCCount()

ResetTCCount resets the per-chunk TC counter.

func (*Runner) Run

func (runner *Runner) Run(ctx context.Context, commits []*gitlib.Commit) (map[analyze.HistoryAnalyzer]analyze.Report, error)

Run executes all analyzers over the given commits: initialize, consume each commit via pipeline, then finalize.

func (*Runner) SpillAggregators

func (runner *Runner) SpillAggregators() error

SpillAggregators forces all aggregators to flush their in-memory state to disk. Called before saving a checkpoint so that spill files are complete.

func (*Runner) TCCountAccumulated

func (runner *Runner) TCCountAccumulated() int64

TCCountAccumulated returns the number of TCs added since the last reset.

type SamplerConfig

type SamplerConfig struct {
	Logger       *slog.Logger
	Metrics      *StageMetrics
	DumpDir      string
	ChunkIndex   int
	MemBudget    int64
	ProfileAtRSS int64 // RSS in bytes at which to capture t1 profile. 0 = disabled.
}

SamplerConfig configures the pipeline sampler.

type StageMetrics

type StageMetrics struct {
	// Blob pipeline metrics.
	BlobChangesInFlight atomic.Int64 // Number of file changes being processed.
	BlobBytesLoaded     atomic.Int64 // Total blob bytes loaded in current batch.
	BlobCacheEntries    atomic.Int64 // Current global blob cache entry count.
	BlobCacheBytes      atomic.Int64 // Current global blob cache byte size.

	// Diff pipeline metrics.
	DiffItemsQueued  atomic.Int64 // Diff requests pending in batcher.
	DiffCacheEntries atomic.Int64 // Current diff cache entry count.

	// UAST pipeline metrics.
	UASTItemsQueued atomic.Int64 // UAST parse jobs pending.

	// Runner / aggregator metrics.
	AggregatorBytes  atomic.Int64 // Estimated aggregator state size.
	CommitsProcessed atomic.Int64 // Commits processed in current chunk.
	LastChangeCount  atomic.Int64 // File changes in most recent commit.

	// High-watermarks (updated by Record* methods, never decrease within a chunk).
	PeakBlobChanges atomic.Int64 // Max changes seen in any single commit.
	PeakBlobBytes   atomic.Int64 // Max blob bytes loaded in any single batch.
	PeakDiffQueued  atomic.Int64 // Max diff items queued at any point.
}

StageMetrics provides per-stage high-watermark counters for memory triage. All fields are updated atomically by pipeline stages and read by the sampler. Following the playbook: "diff items queued", "bytes of blob content held", "AST cache entries", "results map size".

func (*StageMetrics) RecordBlobBatch

func (m *StageMetrics) RecordBlobBatch(changes, blobBytes int64)

RecordBlobBatch updates blob metrics and high-watermarks for a batch.

func (*StageMetrics) RecordCommit

func (m *StageMetrics) RecordCommit(changeCount int64)

RecordCommit updates per-commit metrics.

func (*StageMetrics) RecordDiffQueue

func (m *StageMetrics) RecordDiffQueue(queued int64)

RecordDiffQueue updates diff queue depth and high-watermark.

func (*StageMetrics) Reset

func (m *StageMetrics) Reset()

Reset clears all counters and watermarks for a new chunk.

func (*StageMetrics) Snapshot

func (m *StageMetrics) Snapshot() StageMetricsSnapshot

Snapshot reads all counters atomically (each field individually).

type StageMetricsSnapshot

type StageMetricsSnapshot struct {
	BlobChangesInFlight int64
	BlobBytesLoaded     int64
	BlobCacheEntries    int64
	BlobCacheBytes      int64
	DiffItemsQueued     int64
	DiffCacheEntries    int64
	UASTItemsQueued     int64
	AggregatorBytes     int64
	CommitsProcessed    int64
	LastChangeCount     int64
	PeakBlobChanges     int64
	PeakBlobBytes       int64
	PeakDiffQueued      int64
}

StageMetricsSnapshot is a point-in-time copy of all stage metrics.

type StreamingConfig

type StreamingConfig struct {
	MemBudget     int64
	Checkpoint    CheckpointParams
	RepoPath      string
	AnalyzerNames []string

	// Logger is the structured logger for streaming operations.
	// When nil, a discard logger is used.
	Logger *slog.Logger

	// DebugTrace enables 100% trace sampling for debugging.
	DebugTrace bool

	// AnalysisMetrics records analysis-specific OTel metrics (commits, chunks,
	// cache stats). Nil-safe: when nil, no metrics are recorded.
	AnalysisMetrics *observability.AnalysisMetrics

	// TCSink, when set, receives every non-nil TC as commits are consumed.
	// Used by NDJSON streaming output. When set, aggregators are not created
	// and FinalizeWithAggregators is not called — results are nil.
	TCSink analyze.TCSink

	// AggSpillBudget is the maximum bytes of aggregator state to keep in memory
	// before spilling to disk. Computed by ComputeSchedule. Zero means no limit.
	AggSpillBudget int64

	// OnChunkComplete, when set, is called after each chunk finishes processing.
	// Used by streaming timeseries NDJSON to drain per-commit data from aggregators
	// between chunks, keeping memory bounded by chunk size.
	OnChunkComplete func(runner *Runner) error

	// SkipFinalize, when true, causes RunStreaming/RunStreamingFromIterator to
	// return empty reports instead of calling FinalizeWithAggregators. Used when
	// OnChunkComplete already handles output (e.g. streaming timeseries NDJSON).
	SkipFinalize bool

	// ReportStore, when set, causes finalization to write results one-at-a-time
	// to the store via FinalizeToStore instead of materializing all reports in
	// memory via FinalizeWithAggregators. Returns empty reports map.
	ReportStore analyze.ReportStore

	// TmpDir is the parent directory for temporary spill files.
	// Empty means os.TempDir() (system default).
	TmpDir string
}

StreamingConfig holds configuration for streaming pipeline execution.

type UASTPipeline

type UASTPipeline struct {
	Parser     *uast.Parser
	Workers    int
	BufferSize int
	PathFilter *pathfilter.Filter
}

UASTPipeline pre-computes UAST changes for each commit in the pipeline, enabling cross-commit parallelism. It sits between DiffPipeline and the serial analyzer consumption loop.

func NewUASTPipeline

func NewUASTPipeline(parser *uast.Parser, workers, bufferSize int) *UASTPipeline

NewUASTPipeline creates a new UAST pipeline stage.

func (*UASTPipeline) Process

func (p *UASTPipeline) Process(ctx context.Context, diffs <-chan CommitData) <-chan CommitData

Process receives commit data with blobs and diffs, and adds pre-computed UAST changes. Multiple commits are processed concurrently by worker goroutines. Output order matches input order via a slot-based approach.
