package passes
v0.2.0
Warning

This package is not in the latest version of its module.

Published: May 27, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package passes holds the optimizer passes. P03 ships an empty DefaultPasses registration (in plan/optimize.go) plus this NoopPass so the fixed-point loop has at least one test subject. P07 fills in DedupSources, FilterPushdown, ProjectionPruning, AggregateFusion, SampleInjection per design/05-dag-executor.md.
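The overview mentions the fixed-point loop in plan.Optimize but does not show it. Below is a minimal sketch of such a loop. It assumes only the Apply/Name method shapes documented on this page. DAG, Pass, shrinkPass, optimize and the maxRounds cap are hypothetical stand-ins, not the real plan package, and the real loop may order or bound its rounds differently.

```go
package main

import (
	"errors"
	"fmt"
)

// DAG is a hypothetical stand-in for plan.DAG; it only tracks a node count.
type DAG struct {
	Nodes int
}

// Pass mirrors the method set documented on this page for plan.Pass:
// Apply returns the (possibly new) DAG, whether anything changed, and an error.
type Pass interface {
	Name() string
	Apply(d *DAG) (*DAG, bool, error)
}

// NoopPass returns the DAG unchanged, like passes.NoopPass.
type NoopPass struct{}

func (NoopPass) Name() string { return "noop" }

func (NoopPass) Apply(d *DAG) (*DAG, bool, error) { return d, false, nil }

// shrinkPass is a hypothetical pass that makes one rewrite per call (removing a
// node) until a single node remains, like the one-change-per-Apply passes here.
type shrinkPass struct{}

func (shrinkPass) Name() string { return "shrink" }

func (shrinkPass) Apply(d *DAG) (*DAG, bool, error) {
	if d.Nodes <= 1 {
		return d, false, nil
	}
	return &DAG{Nodes: d.Nodes - 1}, true, nil
}

// optimize runs every pass in order and repeats the full round until no pass
// reports a change. maxRounds guards against passes that never converge.
func optimize(d *DAG, passes []Pass, maxRounds int) (*DAG, int, error) {
	for round := 1; round <= maxRounds; round++ {
		changed := false
		for _, p := range passes {
			next, ok, err := p.Apply(d)
			if err != nil {
				return nil, round, fmt.Errorf("pass %s: %w", p.Name(), err)
			}
			if ok {
				d, changed = next, true
			}
		}
		if !changed {
			return d, round, nil
		}
	}
	return nil, maxRounds, errors.New("optimizer did not reach a fixed point")
}

func main() {
	out, rounds, err := optimize(&DAG{Nodes: 4}, []Pass{NoopPass{}, shrinkPass{}}, 10)
	fmt.Println(out.Nodes, rounds, err) // 1 4 <nil>
}
```

A pass that reports "changed" forever would spin without the round cap. The cap turns that case into an error instead of a hang.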

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregateFusionPass

type AggregateFusionPass struct{}

AggregateFusionPass merges sibling GroupAggregateNodes that share an input AND a groupby key list. The merged node carries the union of the source nodes' aggregate ops; downstream consumers that referenced either pre-merge node's id rewire to the merged node id (computed deterministically as `fuse:<sorted-ids-join('+')>`).
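Here is a sketch of the merge rule and the deterministic id. It assumes that "sorted" means a plain lexicographic sort of the pre-merge ids and that the group-by list must match in order. The aggNode type and the fusionKey and fuseFirstGroup helpers are hypothetical, not the package's own.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// aggNode is a hypothetical, trimmed-down GroupAggregateNode.
type aggNode struct {
	ID      string
	Input   string
	GroupBy []string
	Ops     []string
}

// fuseID builds the merged id: "fuse:" followed by the sorted ids joined with "+".
func fuseID(ids []string) string {
	sorted := append([]string(nil), ids...)
	sort.Strings(sorted)
	return "fuse:" + strings.Join(sorted, "+")
}

// fusionKey identifies siblings with the same input AND the same group-by list.
// Assumption: group-by order is significant.
func fusionKey(n aggNode) string {
	return n.Input + "|" + strings.Join(n.GroupBy, ",")
}

// fuseFirstGroup merges only the first eligible group (in input order), matching
// the one-group-per-Apply limitation. It returns the merged node, the ids it
// replaces (dependents of these get rewired), and whether a merge happened.
func fuseFirstGroup(nodes []aggNode) (aggNode, []string, bool) {
	groups := map[string][]aggNode{}
	var order []string
	for _, n := range nodes {
		k := fusionKey(n)
		if _, seen := groups[k]; !seen {
			order = append(order, k)
		}
		groups[k] = append(groups[k], n)
	}
	for _, k := range order {
		members := groups[k]
		if len(members) < 2 {
			continue
		}
		var ids, ops []string
		seenOp := map[string]bool{}
		for _, m := range members {
			ids = append(ids, m.ID)
			for _, op := range m.Ops {
				if !seenOp[op] { // union of aggregate ops, no duplicates
					seenOp[op] = true
					ops = append(ops, op)
				}
			}
		}
		merged := aggNode{ID: fuseID(ids), Input: members[0].Input, GroupBy: members[0].GroupBy, Ops: ops}
		return merged, ids, true
	}
	return aggNode{}, nil, false
}

func main() {
	merged, replaced, ok := fuseFirstGroup([]aggNode{
		{ID: "agg-b", Input: "src", GroupBy: []string{"region"}, Ops: []string{"sum(x)"}},
		{ID: "agg-a", Input: "src", GroupBy: []string{"region"}, Ops: []string{"count()", "sum(x)"}},
		{ID: "agg-c", Input: "src", GroupBy: []string{"country"}, Ops: []string{"max(y)"}},
	})
	fmt.Println(ok, merged.ID, merged.Ops, replaced) // true fuse:agg-a+agg-b [sum(x) count()] [agg-b agg-a]
}
```

Sorting the ids before joining them makes the merged id independent of the order in which the siblings were discovered. Re-running the pass therefore yields the same id.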

Limitation (P07): the pass merges at most one fusion group per Apply call. The fixed-point loop runs Apply repeatedly until no further merges happen, so the pass eventually fuses every eligible group. This keeps the rewire logic simple: with only one merged node per call, rewiring its dependents is a single straightforward step.

func (AggregateFusionPass) Apply

func (AggregateFusionPass) Apply(d *plan.DAG) (*plan.DAG, bool, error)

Apply implements plan.Pass.

func (AggregateFusionPass) Name

func (AggregateFusionPass) Name() string

Name implements plan.Pass.

type DedupSourcesPass

type DedupSourcesPass struct{}

DedupSourcesPass coalesces SourceNodes that share an underlying ref. Two SourceNodes pointing at the same `.pulse` ref already share an id today (the SourceNode constructor derives the id from the sha256 of the ref), so the builder de-duplicates them inside Build. This pass exists to handle the rarer case where two builders contribute Sources to the same DAG (e.g. layer composition once P08 lands): identical refs with identical downstream filter chains collapse to one.
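The id sharing described here can be illustrated with Go's crypto/sha256 package. The exact id format produced by the SourceNode constructor is not documented on this page. The "src:" prefix, the 12-character truncation and the sourceID helper below are assumptions made for illustration only.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sourceID derives a node id from a source ref. The format is hypothetical:
// "src:" plus the first 12 hex characters of sha256(ref).
func sourceID(ref string) string {
	sum := sha256.Sum256([]byte(ref))
	return "src:" + hex.EncodeToString(sum[:])[:12]
}

func main() {
	// A builder that keys nodes by id stores a repeated ref only once.
	nodes := map[string]string{}
	for _, ref := range []string{"data/events.pulse", "data/events.pulse", "data/users.pulse"} {
		nodes[sourceID(ref)] = ref
	}
	fmt.Println(len(nodes)) // 2
}
```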

P07 ships the no-op baseline: the builder already shares Source nodes by id, so the pass detects the situation but never has work to do for v1 specs. The TestPrismDedupSources test exercises the canonical case (two SourceNodes with the same ref attached via the raw builder, bypassing the natural id-sharing).

func (DedupSourcesPass) Apply

func (DedupSourcesPass) Apply(d *plan.DAG) (*plan.DAG, bool, error)

Apply implements plan.Pass. It walks the Sources and groups them by ref. When two or more SourceNodes share a ref AND show no downstream divergence before the first node that is neither a filter nor a projection, the pass rewires their dependents to one canonical id and drops the duplicates.
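Here is a sketch of the group-by-ref and rewire steps. It assumes the canonical survivor is the lexicographically smallest id in each group. It deliberately omits the downstream-divergence check described above. The source type and the dedupByRef and applyRewire helpers are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// source is a hypothetical SourceNode: an id plus the underlying ref.
type source struct {
	ID  string
	Ref string
}

// dedupByRef groups sources by ref and keeps the smallest id in each group as
// canonical. It returns the surviving sources and a rewire map old id -> canonical id.
// NOTE: the real pass also requires no downstream divergence; that check is omitted.
func dedupByRef(srcs []source) ([]source, map[string]string) {
	byRef := map[string][]string{}
	for _, s := range srcs {
		byRef[s.Ref] = append(byRef[s.Ref], s.ID)
	}
	refs := make([]string, 0, len(byRef))
	for ref := range byRef {
		refs = append(refs, ref)
	}
	sort.Strings(refs) // deterministic output order

	rewire := map[string]string{}
	var kept []source
	for _, ref := range refs {
		ids := byRef[ref]
		sort.Strings(ids)
		kept = append(kept, source{ID: ids[0], Ref: ref})
		for _, dup := range ids[1:] {
			rewire[dup] = ids[0]
		}
	}
	return kept, rewire
}

// applyRewire points each dependent's input (dependent id -> input id) at the canonical id.
func applyRewire(inputs map[string]string, rewire map[string]string) {
	for dep, in := range inputs {
		if canon, ok := rewire[in]; ok {
			inputs[dep] = canon
		}
	}
}

func main() {
	kept, rewire := dedupByRef([]source{
		{ID: "s2", Ref: "events.pulse"},
		{ID: "s1", Ref: "events.pulse"},
		{ID: "s3", Ref: "users.pulse"},
	})
	inputs := map[string]string{"filter1": "s2", "agg1": "s3"}
	applyRewire(inputs, rewire)
	fmt.Println(len(kept), rewire, inputs)
}
```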

For P07 the practical impact is small (the builder already shares Source nodes by id-from-ref), but the pass framework needs at least one implementation to test the fixed-point loop end-to-end.

func (DedupSourcesPass) Name

func (DedupSourcesPass) Name() string

Name implements plan.Pass.

type FilterPushdownPass

type FilterPushdownPass struct{}

FilterPushdownPass moves FilterNodes that sit immediately downstream of a JoinNode to whichever side of the join exclusively supplies the columns the filter references. Filters touching columns from both sides stay where they are.

Identifier extraction (P07): we lex the expr string and collect every substring that matches `[A-Za-z_][A-Za-z0-9_]*` and is not a reserved word. The resulting set over-approximates the real identifiers (it can pick up fragments of numeric-suffixed literals, etc.) but is conservative: the pass only pushes a filter when EVERY identifier maps exclusively to one side, so a spurious identifier makes the pass bail safely. When P14 ships a proper Pulse expression parser, swap `extractIdentifiers` for the typed AST walk; the rest of the pass is unchanged.
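The identifier heuristic and the all-or-nothing side check can be sketched with Go's standard regexp package. Only the regular expression comes from the description above. The reserved-word list, the pushSide helper and its "left"/"right"/"" results are hypothetical. This version also bails on filters with no identifiers at all, which is an assumption.

```go
package main

import (
	"fmt"
	"regexp"
)

var identRe = regexp.MustCompile(`[A-Za-z_][A-Za-z0-9_]*`)

// reserved is a hypothetical keyword list; the real lexer's list is not documented here.
var reserved = map[string]bool{"and": true, "or": true, "not": true, "true": true, "false": true, "null": true}

// extractIdentifiers returns every identifier-shaped substring that is not reserved.
// It over-approximates: e.g. the "e10" inside the literal "1e10" is also returned.
func extractIdentifiers(expr string) []string {
	var out []string
	for _, m := range identRe.FindAllString(expr, -1) {
		if !reserved[m] {
			out = append(out, m)
		}
	}
	return out
}

// pushSide returns "left" or "right" when every identifier belongs exclusively to
// that side of the join, and "" (leave the filter where it is) otherwise.
func pushSide(expr string, left, right map[string]bool) string {
	ids := extractIdentifiers(expr)
	if len(ids) == 0 {
		return ""
	}
	allLeft, allRight := true, true
	for _, id := range ids {
		inL, inR := left[id], right[id]
		allLeft = allLeft && inL && !inR
		allRight = allRight && inR && !inL
	}
	switch {
	case allLeft:
		return "left"
	case allRight:
		return "right"
	}
	return ""
}

func main() {
	left := map[string]bool{"user_id": true, "country": true}
	right := map[string]bool{"order_id": true, "amount": true}
	fmt.Println(pushSide("user_id > 0", left, right))               // left
	fmt.Println(pushSide("amount > 0", left, right))                // right
	fmt.Println(pushSide("amount > 1e10", left, right) == "")       // true: "e10" is spurious, so bail
	fmt.Println(pushSide("amount > 0 or user_id > 0", left, right) == "") // true: both sides
}
```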

func (FilterPushdownPass) Apply

func (FilterPushdownPass) Apply(d *plan.DAG) (*plan.DAG, bool, error)

Apply implements plan.Pass.

func (FilterPushdownPass) Name

func (FilterPushdownPass) Name() string

Name implements plan.Pass.

type NoopPass

type NoopPass struct{}

NoopPass returns the DAG unchanged. It is useful as a baseline and as a way to exercise the Optimize loop without mutating state.

func (NoopPass) Apply

func (NoopPass) Apply(d *plan.DAG) (*plan.DAG, bool, error)

Apply implements plan.Pass.

func (NoopPass) Name

func (NoopPass) Name() string

Name implements plan.Pass.

type ProjectionPruningPass

type ProjectionPruningPass struct{}

ProjectionPruningPass injects a ProjectNode immediately downstream of every Source whose output schema contains columns the downstream pipeline never reads.

Used-column inference: walk backward from each sink. The set of columns each node needs is the union of its self-referenced columns (filter expressions, groupby/aggregate fields, join keys, project fields, calculate expr, etc.) plus the union of the needs of its dependents. For nodes we cannot statically classify, we conservatively declare ALL upstream columns needed, which makes the pass a no-op in those cases.

P07 ships a minimum-viable implementation: it prunes only when every node downstream of a Source belongs to a small known set (FilterNode, ProjectNode, GroupAggregateNode, JoinNode). Other node types (Calculate / Window / Pivot / Unpivot / Bin / Sort / Limit / Sample / Union) cause the pass to bail conservatively for that Source. They will be handled in a future iteration once their column-set surfaces are exposed via a plan.Labeled extension.
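Here is a sketch of the used-column walk on a simplified node model. The known-kind set and the rule that an unclassified kind means every column is needed follow the description above. The node struct, its Cols field and the neededColumns and prunable helpers are hypothetical. The walk is a plain recursion without memoisation, which is fine for a small acyclic example.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical plan node: its kind, the columns it references itself,
// and the ids of its dependents (downstream consumers).
type node struct {
	Kind       string
	Cols       []string
	Dependents []string
}

// known lists the kinds whose column sets the P07 pass can classify.
var known = map[string]bool{"Filter": true, "Project": true, "GroupAggregate": true, "Join": true}

// neededColumns returns the union of id's own columns and its dependents' needs,
// or ok=false when an unclassified kind appears downstream (i.e. ALL columns needed).
func neededColumns(dag map[string]node, id string) (map[string]bool, bool) {
	n := dag[id]
	need := map[string]bool{}
	for _, c := range n.Cols {
		need[c] = true
	}
	for _, dep := range n.Dependents {
		if !known[dag[dep].Kind] {
			return nil, false // bail conservatively
		}
		sub, ok := neededColumns(dag, dep)
		if !ok {
			return nil, false
		}
		for c := range sub {
			need[c] = true
		}
	}
	return need, true
}

// prunable returns the sorted schema columns that nothing downstream reads.
// nil means no ProjectNode would be injected (bail, or nothing to prune).
func prunable(dag map[string]node, sourceID string, schema []string) []string {
	need, ok := neededColumns(dag, sourceID)
	if !ok {
		return nil
	}
	var drop []string
	for _, c := range schema {
		if !need[c] {
			drop = append(drop, c)
		}
	}
	sort.Strings(drop)
	return drop
}

func main() {
	dag := map[string]node{
		"src": {Kind: "Source", Dependents: []string{"f"}},
		"f":   {Kind: "Filter", Cols: []string{"country"}, Dependents: []string{"agg"}},
		"agg": {Kind: "GroupAggregate", Cols: []string{"country", "revenue"}},
	}
	fmt.Println(prunable(dag, "src", []string{"country", "revenue", "user_agent", "ip"})) // [ip user_agent]
}
```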

func (ProjectionPruningPass) Apply

func (ProjectionPruningPass) Apply(d *plan.DAG) (*plan.DAG, bool, error)

Apply implements plan.Pass.

func (ProjectionPruningPass) Name

func (ProjectionPruningPass) Name() string

Name implements plan.Pass.

type PulseChainFusionPass

type PulseChainFusionPass struct{}

PulseChainFusionPass collapses a source-rooted linear chain of fusable Prism nodes into one PulseChainNode that calls pulse.ProcessChain in a single round-trip. The win is that the upstream SourceNode no longer materialises the full cohort into table.Table: Pulse pushes filters/aggregates down to its columnar reader and returns only the final result rows.

v1 scope (intentionally narrow):

  • Source must have a single dependent (no branching).
  • Eligible absorbed kinds: FilterNode, CalculateNode, GroupAggregateNode, SortNode.
  • GroupAggregate is REQUIRED. A chain with only Filter/Calculate would still pay the source materialisation cost on the Prism side (final result == filtered source ≈ full source rows), so the win is dominated by aggregation. Defer pure filter/calc chains to a follow-up if profiling motivates.
  • Aggregate alias must be Pulse-backed and scalar-emitting (`mode`, `lift`, `share` are excluded by Pulse's chain gate; `wmean`, `ratio`, `ci0`, `ci1` carry per-call Params and are deferred to a follow-up).
  • SortNode terminates absorption.
  • LimitNode is NOT absorbed (Pulse Request has no limit field today); a downstream Limit stays in plan and consumes the chain node's output.
  • Filter / Calculate AFTER the GroupAggregate are not absorbed (Pulse Filterers/Attributes run pre-aggregation; post-agg filtering needs a second chain stage which v1 does not emit).
  • Source refs starting with `cohort:` or `gs://` are skipped (the chain executor needs a direct Pulse-compatible path).
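The v1 scope rules can be read as a single forward walk from the Source. The sketch below encodes them on a hypothetical linear model and returns how many nodes below the Source would be absorbed, or 0 when the chain is ineligible. The chainNode type and the absorbCount and aggsFusable helpers are assumptions. The real pass consults Pulse's own chain gate, whereas this sketch treats any alias outside the listed exclusions as fusable.

```go
package main

import (
	"fmt"
	"strings"
)

// chainNode is a hypothetical, simplified plan node on the linear path below a Source.
type chainNode struct {
	Kind string   // "Filter", "Calculate", "GroupAggregate", "Sort", "Limit", ...
	Aggs []string // aggregate aliases, GroupAggregate only
}

// excludedAggs are the aliases the v1 notes list as not chain-fusable.
var excludedAggs = map[string]bool{
	"mode": true, "lift": true, "share": true, // rejected by Pulse's chain gate
	"wmean": true, "ratio": true, "ci0": true, "ci1": true, // per-call Params, deferred
}

func aggsFusable(aggs []string) bool {
	for _, a := range aggs {
		if excludedAggs[a] {
			return false
		}
	}
	return true
}

// absorbCount applies the v1 rules to one Source and its linear chain.
func absorbCount(ref string, sourceDependents int, chain []chainNode) int {
	if strings.HasPrefix(ref, "cohort:") || strings.HasPrefix(ref, "gs://") {
		return 0
	}
	if sourceDependents != 1 {
		return 0 // branching Source
	}
	absorbed, sawAgg := 0, false
loop:
	for _, n := range chain {
		switch n.Kind {
		case "Filter", "Calculate":
			if sawAgg {
				break loop // post-aggregation stages are not absorbed
			}
			absorbed++
		case "GroupAggregate":
			if sawAgg || !aggsFusable(n.Aggs) {
				break loop
			}
			sawAgg = true
			absorbed++
		case "Sort":
			absorbed++
			break loop // Sort terminates absorption
		default:
			break loop // Limit and every other kind stay in the plan
		}
	}
	if !sawAgg {
		return 0 // GroupAggregate is required
	}
	return absorbed
}

func main() {
	chain := []chainNode{
		{Kind: "Filter"},
		{Kind: "Calculate"},
		{Kind: "GroupAggregate", Aggs: []string{"sum", "count"}}, // aliases assumed fusable
		{Kind: "Sort"},
		{Kind: "Limit"},
	}
	fmt.Println(absorbCount("data/events.pulse", 1, chain))        // 4: Limit stays in the plan
	fmt.Println(absorbCount("gs://bucket/events.pulse", 1, chain)) // 0
}
```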

Pass ordering (plan/passes/register.go): runs after AggregateFusion (so sibling GroupAggregates are already merged into one before we try to absorb) and before SampleInjection (so chain-fused sources are no longer SourceNodes when the sampler looks for them; the row-count probe is unnecessary once Pulse is doing the heavy lifting).

func (PulseChainFusionPass) Apply

func (p PulseChainFusionPass) Apply(d *plan.DAG) (*plan.DAG, bool, error)

Apply implements plan.Pass. It walks each SourceNode root in turn, fuses the first eligible linear chain it finds, and returns; the fixed-point loop in plan.Optimize re-runs Apply until no more fusions happen.

func (PulseChainFusionPass) Name

Name implements plan.Pass.

type SampleInjectionPass

type SampleInjectionPass struct{}

SampleInjectionPass injects a SampleNode below every Source whose header-reported row count exceeds PRISM_RENDER_MAX_MARKS. Pulse v0.10.0 exposes the count via pulse.CountRecords; SourceNode.RowCount wraps it. Sources whose count cannot be read (missing file, unsupported resolver) skip injection silently; the executor will surface the underlying error.

The fixed-point loop is guarded against re-entry: a Source with a SampleNode dependent is considered already-sampled and skipped.

Dependents of the rewired Source are reconstructed via rewireSingleInput; node kinds not covered by that helper (Calculate, Window, Sort, Limit, Bin, Sample, Pivot, Unpivot) are silently left pointing at the Source. The fixed-point loop still terminates because the Source already has a SampleNode dependent (the injected one).

func (SampleInjectionPass) Apply

func (SampleInjectionPass) Apply(d *plan.DAG) (*plan.DAG, bool, error)

Apply implements plan.Pass.

func (SampleInjectionPass) Name

func (SampleInjectionPass) Name() string

Name implements plan.Pass.
