nodes

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: May 27, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package nodes holds the DAG node implementations. P02 ships SourceNode — the leaf that reads a .pulse via Resolver and materialises a Table. Future phases (P03+) add Filter, Project, GroupAggregate, Join, etc.

File stub.go centralises the helpers every P03 stub node uses:

  • notImplementedErr(kind) — the PRISM_COMPILE_001 error returned by every stubbed Execute body until P04 lands the real impls.
  • cloneSchema / appendField / projectFields — pure helpers that compute deterministic output schemas from input schemas plus op parameters, so Schema(in) works in P03 without execution data.
  • fingerprintFor(kind, parts...) — sha256-prefixed cache-key component, deterministic across runs.
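A minimal sketch of what a deterministic fingerprint helper along these lines could look like. The body is an assumption, since the real fingerprintFor is unexported. In particular, the length-prefixed framing and the literal "sha256:" prefix are illustrative guesses at what "sha256-prefixed" means.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// fingerprintFor is a hypothetical stand-in for the unexported helper.
// It hashes the node kind followed by each parameter. Every part is
// written with a length prefix so ("ab", "c") and ("a", "bc") differ.
func fingerprintFor(kind string, parts ...string) string {
	h := sha256.New()
	for _, p := range append([]string{kind}, parts...) {
		fmt.Fprintf(h, "%d:%s", len(p), p)
	}
	// The "sha256:" prefix is an assumed convention, not a confirmed one.
	return "sha256:" + hex.EncodeToString(h.Sum(nil))
}

func main() {
	fmt.Println(fingerprintFor("FilterNode", "src", "score > 10"))
}
```

Because the hash depends only on its arguments, the same (kind, parts) pair yields the same string on every run, which is the property a cache key needs.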

Stub nodes (everything that is not SourceNode, InlineNode, or SinkNode) wire these helpers in their per-type files (filter.go, project.go, ...). The Execute body is always one line:

return nil, notImplementedErr("FilterNode")

P04 swaps each stub's body for the real implementation; tests that currently assert PRISM_COMPILE_001 flip to assert correct output tables.
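The stub pattern described above can be sketched as follows. The codedError type, the message text, and the codeOf helper are hypothetical; only the helper name notImplementedErr and the code PRISM_COMPILE_001 come from the package documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// codedError is a hypothetical error type carrying a stable code.
type codedError struct {
	Code string
	Msg  string
}

func (e *codedError) Error() string { return e.Code + ": " + e.Msg }

// notImplementedErr mirrors the helper named in the docs (body assumed).
func notImplementedErr(kind string) error {
	return &codedError{Code: "PRISM_COMPILE_001", Msg: kind + ".Execute is not implemented yet"}
}

// executeStub plays the role of a stubbed one-line Execute body.
func executeStub() ([]string, error) {
	return nil, notImplementedErr("FilterNode")
}

// codeOf extracts the code from an error chain, or "" if there is none.
func codeOf(err error) string {
	var ce *codedError
	if errors.As(err, &ce) {
		return ce.Code
	}
	return ""
}

func main() {
	_, err := executeStub()
	fmt.Println(codeOf(err)) // PRISM_COMPILE_001
}
```

A test written against the stub would assert on the code rather than the message, so it can later be flipped to assert on the output table without touching error-formatting details.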

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DerivePulseChainID

func DerivePulseChainID(ref string, absorbed []plan.NodeID) plan.NodeID

DerivePulseChainID builds a stable id for the fused node by hashing the source ref together with the absorbed-node id list. Two equivalent fusions share an id, mirroring SourceNode's content fingerprint convention.

Types

type AggOp

type AggOp struct {
	Op    string
	Field string
	As    string
}

AggOp encodes one aggregate calculation: op (mean, sum, count, ...), the source field (empty for count(*) variants), and the output column name.

func (AggOp) String

func (a AggOp) String() string

String returns a stable text form used in fingerprints and renderer labels.
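As an illustration, a text form in the style of the GroupAggregateNode summary label ("mean(score)->m") could be produced like this. The struct shape is copied from the declaration above. The method body is an assumption, and rendering an empty Field as "*" is a guess.

```go
package main

import "fmt"

// AggOp copies the exported struct shape from the documentation.
type AggOp struct {
	Op    string
	Field string
	As    string
}

// String is an assumed rendering in the "op(field)->as" style.
// Showing an empty Field as "*" is a guess for count(*) variants.
func (a AggOp) String() string {
	field := a.Field
	if field == "" {
		field = "*"
	}
	return fmt.Sprintf("%s(%s)->%s", a.Op, field, a.As)
}

func main() {
	fmt.Println(AggOp{Op: "mean", Field: "score", As: "m"}) // mean(score)->m
	fmt.Println(AggOp{Op: "count", As: "n"})                // count(*)->n
}
```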

type BinNode

type BinNode struct {
	// contains filtered or unexported fields
}

BinNode buckets a numeric field.

func NewBin

func NewBin(id, input plan.NodeID, field, as string, params BinParams) *BinNode

NewBin constructs a BinNode.

func (*BinNode) As

func (n *BinNode) As() string

As exposes the output field name for renderers + tests.

func (*BinNode) Execute

func (n *BinNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node via the injected backend.

func (*BinNode) Field

func (n *BinNode) Field() string

Field exposes the source field for renderers + tests.

func (*BinNode) Fingerprint

func (n *BinNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*BinNode) ID

func (n *BinNode) ID() plan.NodeID

ID implements plan.Node.

func (*BinNode) Inputs

func (n *BinNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*BinNode) Kind

func (n *BinNode) Kind() string

Kind implements plan.Labeled.

func (*BinNode) Params

func (n *BinNode) Params() BinParams

Params exposes the bin parameters for the executor.

func (*BinNode) Schema

func (n *BinNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Output schema is input + one F64 field named n.as (the bin edge for each row).

func (*BinNode) SetBackend

func (n *BinNode) SetBackend(b plan.Backend)

SetBackend wires the compile backend that powers Execute.

func (*BinNode) Summary

func (n *BinNode) Summary() string

Summary implements plan.Labeled.

type BinParams

type BinParams struct {
	Auto    bool
	Maxbins *int
	Step    *float64
	Extent  []float64
}

BinParams captures the optional knobs for a numeric bin operation. Auto=true (the bin: true shorthand) requests automatic bin selection; Maxbins, Step, and Extent override pieces of that selection.

func (BinParams) String

func (b BinParams) String() string

String returns a deterministic text form for fingerprints.

type CalculateNode

type CalculateNode struct {
	// contains filtered or unexported fields
}

CalculateNode appends one computed column derived from a Pulse expression.

func NewCalculate

func NewCalculate(id, input plan.NodeID, expr, as string) *CalculateNode

NewCalculate constructs a CalculateNode.

func (*CalculateNode) As

func (n *CalculateNode) As() string

As exposes the output column name for renderers + tests.

func (*CalculateNode) Execute

func (n *CalculateNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node via the injected backend.

func (*CalculateNode) Expr

func (n *CalculateNode) Expr() string

Expr exposes the expression for renderers + tests.

func (*CalculateNode) Fingerprint

func (n *CalculateNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*CalculateNode) ID

func (n *CalculateNode) ID() plan.NodeID

ID implements plan.Node.

func (*CalculateNode) Inputs

func (n *CalculateNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*CalculateNode) Kind

func (n *CalculateNode) Kind() string

Kind implements plan.Labeled.

func (*CalculateNode) Schema

func (n *CalculateNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Output schema is input + one F64 field named n.as. F64 is the conservative bucket for expression results (every Pulse arithmetic expression promotes to float).

func (*CalculateNode) SetBackend

func (n *CalculateNode) SetBackend(b plan.Backend)

SetBackend wires the compile backend that powers Execute.

func (*CalculateNode) Summary

func (n *CalculateNode) Summary() string

Summary implements plan.Labeled.

type FilterNode

type FilterNode struct {
	// contains filtered or unexported fields
}

FilterNode applies a Pulse expression predicate to its input table. Execute routes through the injected backend; when no backend is wired it returns PRISM_COMPILE_001, which preserves the P03 stub behaviour for callers that haven't migrated. See D033.

func NewFilter

func NewFilter(id, input plan.NodeID, expr string) *FilterNode

NewFilter constructs a FilterNode with a stable id derived from (input, expr) so two equivalent filters share a fingerprint.

func (*FilterNode) Execute

func (n *FilterNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node. Routes through the injected backend when one is wired; returns PRISM_COMPILE_001 otherwise.

func (*FilterNode) Expr

func (n *FilterNode) Expr() string

Expr exposes the predicate string for renderers + tests.

func (*FilterNode) Fingerprint

func (n *FilterNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*FilterNode) ID

func (n *FilterNode) ID() plan.NodeID

ID implements plan.Node.

func (*FilterNode) Inputs

func (n *FilterNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*FilterNode) Kind

func (n *FilterNode) Kind() string

Kind implements plan.Labeled.

func (*FilterNode) Schema

func (n *FilterNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Filter does not change the schema.

func (*FilterNode) SetBackend

func (n *FilterNode) SetBackend(b plan.Backend)

SetBackend wires the compile backend that powers Execute. The builder calls this after construction so node constructors keep their P03 signatures stable. See D033.
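A reduced sketch of the setter-injection pattern described here. The backend interface, the row type ([]int), and the toy positiveOnly backend are all hypothetical simplifications; the real plan.Backend and table.Table are not reproduced.

```go
package main

import (
	"errors"
	"fmt"
)

// backend is a hypothetical, much smaller stand-in for plan.Backend.
type backend interface {
	Filter(rows []int, expr string) ([]int, error)
}

// errNoBackend stands in for the PRISM_COMPILE_001 error.
var errNoBackend = errors.New("PRISM_COMPILE_001: no backend wired")

type filterNode struct {
	expr string
	b    backend // nil until setBackend is called
}

// newFilter keeps a backend-free signature, as the constructors do.
func newFilter(expr string) *filterNode { return &filterNode{expr: expr} }

func (n *filterNode) setBackend(b backend) { n.b = b }

func (n *filterNode) execute(rows []int) ([]int, error) {
	if n.b == nil {
		return nil, errNoBackend
	}
	return n.b.Filter(rows, n.expr)
}

// positiveOnly is a toy backend: it ignores expr and keeps values > 0.
type positiveOnly struct{}

func (positiveOnly) Filter(rows []int, _ string) ([]int, error) {
	var out []int
	for _, r := range rows {
		if r > 0 {
			out = append(out, r)
		}
	}
	return out, nil
}

func main() {
	n := newFilter("x > 0")
	_, err := n.execute([]int{-1, 2, 3})
	fmt.Println(err)

	n.setBackend(positiveOnly{})
	out, _ := n.execute([]int{-1, 2, 3})
	fmt.Println(out) // [2 3]
}
```

Injecting through a setter rather than a constructor argument means existing call sites that build nodes do not need to change when a backend becomes available.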

func (*FilterNode) Summary

func (n *FilterNode) Summary() string

Summary implements plan.Labeled.

type GroupAggregateNode

type GroupAggregateNode struct {
	// contains filtered or unexported fields
}

GroupAggregateNode partitions its input by groupby fields and emits one row per partition with each aggregate.

func NewGroupAggregate

func NewGroupAggregate(id, input plan.NodeID, groupby []string, aggs []AggOp) *GroupAggregateNode

NewGroupAggregate constructs a GroupAggregateNode.

func (*GroupAggregateNode) Aggs

func (n *GroupAggregateNode) Aggs() []AggOp

Aggs exposes the aggregate operations for renderers + tests.

func (*GroupAggregateNode) Execute

func (n *GroupAggregateNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node via the injected backend.

func (*GroupAggregateNode) Fingerprint

func (n *GroupAggregateNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*GroupAggregateNode) Groupby

func (n *GroupAggregateNode) Groupby() []string

Groupby exposes the partition keys for renderers + tests.

func (*GroupAggregateNode) ID

func (n *GroupAggregateNode) ID() plan.NodeID

ID implements plan.Node.

func (*GroupAggregateNode) Inputs

func (n *GroupAggregateNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*GroupAggregateNode) Kind

func (n *GroupAggregateNode) Kind() string

Kind implements plan.Labeled.

func (*GroupAggregateNode) Schema

func (n *GroupAggregateNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Output schema is the projection of input down to groupby fields plus one F64 field per AggOp named by op.As. Result types come from aggregateOutputType (every shipped op is scalar F64 today).

func (*GroupAggregateNode) SetBackend

func (n *GroupAggregateNode) SetBackend(b plan.Backend)

SetBackend wires the compile backend that powers Execute.

func (*GroupAggregateNode) Summary

func (n *GroupAggregateNode) Summary() string

Summary implements plan.Labeled — "by: a,b | aggs: mean(score)->m, ...".

type InlineNode

type InlineNode struct {
	// contains filtered or unexported fields
}

InlineNode is the leaf node fed by `data.values` (and optional `data.fields`) declarations. Unlike SourceNode (which reads .pulse bytes via Resolver), InlineNode delegates straight to table.FromInline at Execute time.

Not a stub — table.FromInline shipped in P02; the executor needs real inline-data behaviour in P03 to make any inline-bound spec runnable end-to-end through the pipeline.

func NewInline

func NewInline(id plan.NodeID, name string, values []map[string]any, fields []spec.FieldSpec) *InlineNode

NewInline constructs an InlineNode from raw spec inputs.

func (*InlineNode) Execute

func (n *InlineNode) Execute(_ context.Context, _ []*table.Table) (*table.Table, error)

Execute implements plan.Node.

func (*InlineNode) Fingerprint

func (n *InlineNode) Fingerprint() string

Fingerprint implements plan.Node. Hashes a canonical JSON encoding of (name, fields, values-with-sorted-keys) so two equivalent inline declarations produce the same fingerprint regardless of original map ordering.
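The order-independence property can be sketched with the standard library alone, because encoding/json writes map keys in sorted order. The function name and payload layout below are assumptions, and the fields component is left out for brevity.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// inlineFingerprint is a hypothetical helper that hashes a JSON encoding
// of (name, values). json.Marshal emits map keys sorted, so the bytes do
// not depend on the order in which the maps were populated.
func inlineFingerprint(name string, values []map[string]any) (string, error) {
	payload := struct {
		Name   string           `json:"name"`
		Values []map[string]any `json:"values"`
	}{Name: name, Values: values}
	b, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	a := []map[string]any{{"x": 1, "y": "a"}}
	b := []map[string]any{{"y": "a", "x": 1}}
	fa, _ := inlineFingerprint("pts", a)
	fb, _ := inlineFingerprint("pts", b)
	fmt.Println(fa == fb) // true
}
```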

func (*InlineNode) ID

func (n *InlineNode) ID() plan.NodeID

ID implements plan.Node.

func (*InlineNode) Inputs

func (n *InlineNode) Inputs() []plan.NodeID

Inputs implements plan.Node. Inline is a leaf — same as Source.

func (*InlineNode) Kind

func (n *InlineNode) Kind() string

Kind implements plan.Labeled.

func (*InlineNode) Name

func (n *InlineNode) Name() string

Name exposes the dataset name (empty if unnamed).

func (*InlineNode) Schema

func (n *InlineNode) Schema(_ []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Computes via table.FromInline the first time it is called, then caches the result. Errors propagate.

func (*InlineNode) Summary

func (n *InlineNode) Summary() string

Summary implements plan.Labeled.

type JoinKind

type JoinKind string

JoinKind is the join semantics token: inner|left|outer|anti. The schema computation does not branch on kind in P03 (the column set is the union either way; null behaviour comes in at Execute time, P07).

const (
	JoinInner JoinKind = "inner"
	JoinLeft  JoinKind = "left"
	JoinOuter JoinKind = "outer"
	JoinAnti  JoinKind = "anti"
)

type JoinNode

type JoinNode struct {
	// contains filtered or unexported fields
}

JoinNode hash-joins two inputs on equality. P03 stub.

func NewJoin

func NewJoin(id, left, right plan.NodeID, on []string, kind JoinKind, maxRows int) *JoinNode

NewJoin constructs a JoinNode.

func (*JoinNode) Execute

func (n *JoinNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node. Hash join body lives in join_execute.go (kept separate for diffability against the P03 stub surface).

func (*JoinNode) Fingerprint

func (n *JoinNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*JoinNode) ID

func (n *JoinNode) ID() plan.NodeID

ID implements plan.Node.

func (*JoinNode) Inputs

func (n *JoinNode) Inputs() []plan.NodeID

Inputs implements plan.Node. Order is (left, right) so the executor can pass the two upstream Tables in the correct slots.

func (*JoinNode) JoinKind

func (n *JoinNode) JoinKind() JoinKind

JoinKind exposes the join semantics for renderers + tests. It is named JoinKind rather than Kind to avoid colliding with plan.Labeled.Kind.

func (*JoinNode) Kind

func (n *JoinNode) Kind() string

Kind implements plan.Labeled.

func (*JoinNode) On

func (n *JoinNode) On() []string

On exposes the join keys for renderers + tests.

func (*JoinNode) Schema

func (n *JoinNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Output schema is the union of left and right schemas, dropping right-side duplicates of the join keys.
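A minimal sketch of that schema rule, using a hypothetical field type in place of encoding.Schema. It only drops right-side columns whose names are join keys; how the real implementation treats other name collisions is not stated, so none is attempted here.

```go
package main

import "fmt"

// field is a hypothetical stand-in for one schema entry.
type field struct{ Name, Type string }

// joinSchema returns left's fields followed by right's fields,
// skipping right-side fields that are join keys.
func joinSchema(left, right []field, on []string) []field {
	keys := make(map[string]bool, len(on))
	for _, k := range on {
		keys[k] = true
	}
	out := append([]field(nil), left...) // copy so left is not aliased
	for _, f := range right {
		if keys[f.Name] {
			continue
		}
		out = append(out, f)
	}
	return out
}

func main() {
	left := []field{{"id", "I64"}, {"name", "String"}}
	right := []field{{"id", "I64"}, {"score", "F64"}}
	fmt.Println(joinSchema(left, right, []string{"id"}))
}
```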

func (*JoinNode) Summary

func (n *JoinNode) Summary() string

Summary implements plan.Labeled.

type LimitNode

type LimitNode struct {
	// contains filtered or unexported fields
}

LimitNode keeps the first N rows (with optional offset).

func NewLimit

func NewLimit(id, input plan.NodeID, limit, offset int) *LimitNode

NewLimit constructs a LimitNode.
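The limit-with-offset semantics can be illustrated on a plain slice. This is a sketch under stated assumptions: the helper name is hypothetical, negative offsets are clamped to zero, and a non-positive limit yields no rows. The package documentation does not say how LimitNode treats those edge cases.

```go
package main

import "fmt"

// limitRows returns at most limit rows starting at offset.
// Edge-case handling here is assumed, not taken from the package.
func limitRows(rows []string, limit, offset int) []string {
	if offset < 0 {
		offset = 0
	}
	if limit <= 0 || offset >= len(rows) {
		return nil
	}
	end := len(rows)
	if limit < end-offset { // compare this way to avoid offset+limit overflow
		end = offset + limit
	}
	return rows[offset:end]
}

func main() {
	rows := []string{"a", "b", "c", "d", "e"}
	fmt.Println(limitRows(rows, 2, 1))  // [b c]
	fmt.Println(limitRows(rows, 10, 4)) // [e]
}
```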

func (*LimitNode) Execute

func (n *LimitNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node via the injected backend.

func (*LimitNode) Fingerprint

func (n *LimitNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*LimitNode) ID

func (n *LimitNode) ID() plan.NodeID

ID implements plan.Node.

func (*LimitNode) Inputs

func (n *LimitNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*LimitNode) Kind

func (n *LimitNode) Kind() string

Kind implements plan.Labeled.

func (*LimitNode) Limit

func (n *LimitNode) Limit() int

Limit exposes the row cap for renderers + tests.

func (*LimitNode) Offset

func (n *LimitNode) Offset() int

Offset exposes the offset for renderers + tests.

func (*LimitNode) Schema

func (n *LimitNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Limit never changes the schema.

func (*LimitNode) SetBackend

func (n *LimitNode) SetBackend(b plan.Backend)

SetBackend wires the compile backend that powers Execute.

func (*LimitNode) Summary

func (n *LimitNode) Summary() string

Summary implements plan.Labeled.

type PivotNode

type PivotNode struct {
	// contains filtered or unexported fields
}

PivotNode reshapes long → wide. P03 stub.

Schema computation cannot determine the output column names without scanning the input data (the cells in the `pivot` column become the new column headers). In P03 we conservatively return the input schema unchanged so DAG validation does not block — the real Schema derivation lands in P04 alongside the real Execute path.

TODO P04: implement schema-on-execute by reading the input table once at Schema() time (or annotating the plan with the discovered categories at build time when the data is available statically).

func NewPivot

func NewPivot(id, input plan.NodeID, pivot, value string, groupby []string, op string) *PivotNode

NewPivot constructs a PivotNode.

func (*PivotNode) Execute

func (n *PivotNode) Execute(_ context.Context, _ []*table.Table) (*table.Table, error)

Execute implements plan.Node. P03 stub.

func (*PivotNode) Fingerprint

func (n *PivotNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*PivotNode) ID

func (n *PivotNode) ID() plan.NodeID

ID implements plan.Node.

func (*PivotNode) Inputs

func (n *PivotNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*PivotNode) Kind

func (n *PivotNode) Kind() string

Kind implements plan.Labeled.

func (*PivotNode) Pivot

func (n *PivotNode) Pivot() string

Pivot exposes the column whose distinct values become headers.

func (*PivotNode) Schema

func (n *PivotNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Conservative default; see the PivotNode type note above. TODO P04.

func (*PivotNode) Summary

func (n *PivotNode) Summary() string

Summary implements plan.Labeled.

func (*PivotNode) Value

func (n *PivotNode) Value() string

Value exposes the source value column.

type ProjectNode

type ProjectNode struct {
	// contains filtered or unexported fields
}

ProjectNode keeps only the named fields from its input.

func NewProject

func NewProject(id, input plan.NodeID, fields []string) *ProjectNode

NewProject constructs a ProjectNode. The fields slice is copied.

func (*ProjectNode) Execute

func (n *ProjectNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node via the injected backend.

func (*ProjectNode) Fields

func (n *ProjectNode) Fields() []string

Fields exposes the field list for renderers + tests.

func (*ProjectNode) Fingerprint

func (n *ProjectNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*ProjectNode) ID

func (n *ProjectNode) ID() plan.NodeID

ID implements plan.Node.

func (*ProjectNode) Inputs

func (n *ProjectNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*ProjectNode) Kind

func (n *ProjectNode) Kind() string

Kind implements plan.Labeled.

func (*ProjectNode) Schema

func (n *ProjectNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Output schema is the projection of the input schema down to fields named in n.fields, in the requested order. Missing field names raise PRISM_PLAN_003.
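The projection rule can be sketched as below. The field type and function body are hypothetical; the error is a plain formatted error that merely embeds the PRISM_PLAN_003 code, not the package's real error type.

```go
package main

import "fmt"

// field is a hypothetical stand-in for one schema entry.
type field struct{ Name, Type string }

// projectFields keeps the named fields in the requested order and
// fails on the first name that the input schema does not contain.
func projectFields(in []field, names []string) ([]field, error) {
	byName := make(map[string]field, len(in))
	for _, f := range in {
		byName[f.Name] = f
	}
	out := make([]field, 0, len(names))
	for _, name := range names {
		f, ok := byName[name]
		if !ok {
			return nil, fmt.Errorf("PRISM_PLAN_003: field %q not in input schema", name)
		}
		out = append(out, f)
	}
	return out, nil
}

func main() {
	in := []field{{"id", "I64"}, {"name", "String"}, {"score", "F64"}}
	out, err := projectFields(in, []string{"score", "id"})
	fmt.Println(out, err)
	_, err = projectFields(in, []string{"missing"})
	fmt.Println(err)
}
```

Because the output follows the requested order rather than the input order, a projection can also be used to reorder columns.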

func (*ProjectNode) SetBackend

func (n *ProjectNode) SetBackend(b plan.Backend)

SetBackend wires the compile backend that powers Execute.

func (*ProjectNode) Summary

func (n *ProjectNode) Summary() string

Summary implements plan.Labeled.

type PulseChainNode

type PulseChainNode struct {
	// contains filtered or unexported fields
}

PulseChainNode collapses a source-rooted linear chain of fusable operators (Filter, Calculate, GroupAggregate, Sort) into a single `pulse.ProcessChain` call. The node is its own root — it opens the .pulse cohort itself rather than consuming an upstream Table, so fusion bypasses the per-source materialization the inmem backend would otherwise pay.

The fusion pass (plan/passes/pulse_chain_fusion.go) constructs the node once eligibility is verified; v1 emits a single chain stage (one types.Request bundling filter + groupby + agg + sort). The ChainRequest envelope is used regardless so adding multi-stage support later is purely additive.

Fallback semantics: if Pulse rejects a stage at execute time with PULSE_CHAIN_NOT_MERGEABLE, Execute wraps it as PRISM_PLAN_CHAIN_NOT_MERGEABLE and the executor surfaces the error like any other PRISM_* AppError. Callers that want to re-run with fusion disabled can swap the optimizer pass list.

func NewPulseChain

func NewPulseChain(
	id plan.NodeID,
	ref string,
	fs afero.Fs,
	chainReq *pulsetypes.ChainRequest,
	outSchema *encoding.Schema,
	summary string,
	stageIDs []plan.NodeID,
) *PulseChainNode

NewPulseChain constructs a PulseChainNode. The chainReq must carry the cohort on its first stage; outSchema is the schema synthesised by the final stage (already validated by the fusion pass). Callers pass the absorbed node ids via stageIDs so the fingerprint reflects the upstream plan shape — two equivalent fusions hash identically.

func (*PulseChainNode) ChainRequest

func (n *PulseChainNode) ChainRequest() *pulsetypes.ChainRequest

ChainRequest returns the underlying Pulse chain request for tests and plan renderers.

func (*PulseChainNode) Execute

func (n *PulseChainNode) Execute(ctx context.Context, _ []*table.Table) (*table.Table, error)

Execute implements plan.Node. Opens a fresh pulse.Pulse against the node's afero.Fs, runs ProcessChain, and materialises the final stage's row maps into a typed *table.Table.

Pulse errors carrying a recognised code (PULSE_CHAIN_NOT_MERGEABLE, PULSE_PROCESSING_*) are rewrapped as PRISM_PLAN_CHAIN_NOT_MERGEABLE or PRISM_RESOLVE_006 so the executor's NodeError envelope picks up a stable PRISM_* identifier.
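A sketch of this kind of code translation using the standard errors package. The codedError type and helpers are hypothetical. Pairing PULSE_PROCESSING_* with PRISM_RESOLVE_006 is inferred from the order in which the sentence above lists the codes, and the PULSE_PROCESSING_IO value is an invented example suffix.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// codedError is a hypothetical error type with a stable code and a cause.
type codedError struct {
	Code string
	Err  error
}

func (e *codedError) Error() string { return e.Code + ": " + e.Err.Error() }
func (e *codedError) Unwrap() error { return e.Err }

// pulseErr builds an upstream-style error for the demonstration.
func pulseErr(code, msg string) error {
	return &codedError{Code: code, Err: errors.New(msg)}
}

// rewrap translates recognised upstream codes and keeps the original
// error in the chain. Unrecognised errors pass through unchanged.
func rewrap(err error) error {
	var ce *codedError
	if !errors.As(err, &ce) {
		return err
	}
	switch {
	case ce.Code == "PULSE_CHAIN_NOT_MERGEABLE":
		return &codedError{Code: "PRISM_PLAN_CHAIN_NOT_MERGEABLE", Err: err}
	case strings.HasPrefix(ce.Code, "PULSE_PROCESSING_"):
		return &codedError{Code: "PRISM_RESOLVE_006", Err: err} // assumed pairing
	}
	return err
}

// codeOf returns the outermost code in the chain, or "" if none.
func codeOf(err error) string {
	var ce *codedError
	if errors.As(err, &ce) {
		return ce.Code
	}
	return ""
}

func main() {
	fmt.Println(codeOf(rewrap(pulseErr("PULSE_CHAIN_NOT_MERGEABLE", "stage rejected"))))
	fmt.Println(codeOf(rewrap(pulseErr("PULSE_PROCESSING_IO", "read failed")))) // hypothetical suffix
}
```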

func (*PulseChainNode) Fingerprint

func (n *PulseChainNode) Fingerprint() string

Fingerprint implements plan.Node. Mixes the source ref and each absorbed stage id so equivalent fusions share a cache key.

func (*PulseChainNode) ID

func (n *PulseChainNode) ID() plan.NodeID

ID implements plan.Node.

func (*PulseChainNode) Inputs

func (n *PulseChainNode) Inputs() []plan.NodeID

Inputs implements plan.Node — the chain node is a leaf (it opens the cohort itself).

func (*PulseChainNode) Kind

func (n *PulseChainNode) Kind() string

Kind implements plan.Labeled.

func (*PulseChainNode) Ref

func (n *PulseChainNode) Ref() string

Ref returns the source ref for diagnostics and tests.

func (*PulseChainNode) Schema

func (n *PulseChainNode) Schema(_ []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. The chain output schema is computed at fusion time via processing.ChainOutputSchema(finalStageReq).

func (*PulseChainNode) StageIDs

func (n *PulseChainNode) StageIDs() []plan.NodeID

StageIDs returns the ids of the upstream Prism nodes this chain replaced, in fusion order.

func (*PulseChainNode) Summary

func (n *PulseChainNode) Summary() string

Summary implements plan.Labeled — human-readable label produced by the fusion pass.

type SampleNode

type SampleNode struct {
	// contains filtered or unexported fields
}

SampleNode draws a random subsample of size n.

func NewSample

func NewSample(id, input plan.NodeID, sample int, seed *int64) *SampleNode

NewSample constructs a SampleNode. seed is optional (nil = unseeded).

func (*SampleNode) Execute

func (n *SampleNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node via the injected backend.

func (*SampleNode) Fingerprint

func (n *SampleNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*SampleNode) ID

func (n *SampleNode) ID() plan.NodeID

ID implements plan.Node.

func (*SampleNode) Inputs

func (n *SampleNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*SampleNode) Kind

func (n *SampleNode) Kind() string

Kind implements plan.Labeled.

func (*SampleNode) N

func (n *SampleNode) N() int

N exposes the requested sample size for renderers + tests.

func (*SampleNode) Schema

func (n *SampleNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Sampling preserves the schema.

func (*SampleNode) Seed

func (n *SampleNode) Seed() *int64

Seed exposes the (optional) deterministic RNG seed. nil means the executor seeds from wall-clock time.
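The optional-seed behaviour can be sketched with math/rand. The helper name and the choice of a permutation prefix as the sampling method are assumptions; the package does not document which sampling algorithm the executor uses.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// sampleIndices picks up to n distinct row indices out of total.
// A nil seed falls back to wall-clock time, so runs differ; a non-nil
// seed makes the selection reproducible.
func sampleIndices(total, n int, seed *int64) []int {
	s := time.Now().UnixNano()
	if seed != nil {
		s = *seed
	}
	if total < 0 {
		total = 0
	}
	if n < 0 {
		n = 0
	}
	if n > total {
		n = total
	}
	r := rand.New(rand.NewSource(s))
	return r.Perm(total)[:n] // prefix of a random permutation
}

func main() {
	seed := int64(42)
	fmt.Println(sampleIndices(10, 3, &seed))
	fmt.Println(sampleIndices(10, 3, &seed)) // same indices as the line above
	fmt.Println(sampleIndices(10, 3, nil))   // varies between runs
}
```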

func (*SampleNode) SetBackend

func (n *SampleNode) SetBackend(b plan.Backend)

SetBackend wires the compile backend that powers Execute.

func (*SampleNode) Summary

func (n *SampleNode) Summary() string

Summary implements plan.Labeled.

type SortKey

type SortKey struct {
	Field string
	Order string // "asc"|"desc"; empty defaults to "asc"
}

SortKey is a (field, order) pair consumed by SortNode and by the window operator.

func (SortKey) String

func (s SortKey) String() string

String returns a stable text form for fingerprints.

type SortNode

type SortNode struct {
	// contains filtered or unexported fields
}

SortNode orders rows by one or more (field, order) keys.

func NewSort

func NewSort(id, input plan.NodeID, sort []SortKey) *SortNode

NewSort constructs a SortNode. The sort slice is copied.

func (*SortNode) Execute

func (n *SortNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node via the injected backend.

func (*SortNode) Fingerprint

func (n *SortNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*SortNode) ID

func (n *SortNode) ID() plan.NodeID

ID implements plan.Node.

func (*SortNode) Inputs

func (n *SortNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*SortNode) Kind

func (n *SortNode) Kind() string

Kind implements plan.Labeled.

func (*SortNode) Schema

func (n *SortNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Sort never changes the schema.

func (*SortNode) SetBackend

func (n *SortNode) SetBackend(b plan.Backend)

SetBackend wires the compile backend that powers Execute.

func (*SortNode) Sort

func (n *SortNode) Sort() []SortKey

Sort exposes the keys for renderers + tests.

func (*SortNode) SortLabel

func (n *SortNode) SortLabel() string

SortLabel returns a renderer-friendly summary like "score:desc,name:asc".
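A small sketch of how such a label could be assembled. The SortKey shape and the "asc" default come from the declaration above; the method and function bodies are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// SortKey copies the exported struct shape from the documentation.
type SortKey struct {
	Field string
	Order string // "asc"|"desc"; empty defaults to "asc"
}

// String renders "field:order", applying the documented default.
func (s SortKey) String() string {
	order := s.Order
	if order == "" {
		order = "asc"
	}
	return s.Field + ":" + order
}

// sortLabel joins the keys with commas (hypothetical free function).
func sortLabel(keys []SortKey) string {
	parts := make([]string, len(keys))
	for i, k := range keys {
		parts[i] = k.String()
	}
	return strings.Join(parts, ",")
}

func main() {
	keys := []SortKey{{Field: "score", Order: "desc"}, {Field: "name"}}
	fmt.Println(sortLabel(keys)) // score:desc,name:asc
}
```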

func (*SortNode) Summary

func (n *SortNode) Summary() string

Summary implements plan.Labeled.

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

SourceNode is the leaf DAG node. It resolves a ref to a .pulse cohort (or shard), materialises every record into typed columns, and emits a *table.Table tagged with the content hash of the on-disk bytes.

SourceNode satisfies the full plan.Node interface (P03 widened it to include Schema(in)). The Schema method delegates to OutputSchema, which is kept for backwards compatibility with the P02 callers (validate.PulseLookup) that already use it.

func New

func New(ref string, fs afero.Fs, r resolve.Resolver) *SourceNode

New constructs a SourceNode. The ref must use one of the four forms documented in resolve/resolver.go. fs is the filesystem the resolver honours at execute time (test code passes afero.NewMemMapFs()). r must be non-nil; pass resolve.New(nil) for the default EmptyRegistry-backed implementation.

func (*SourceNode) Execute

func (n *SourceNode) Execute(ctx context.Context, _ []*table.Table) (*table.Table, error)

Execute implements plan.Node. Reads the resolved payload bytes, computes their xxhash64, then decodes every record into typed column slices and constructs the Table. Row count is gated by limits.TableMaxRows() — once the counter would exceed the cap, Execute returns PRISM_RESOLVE_007 immediately (no further decode).

func (*SourceNode) FS

func (n *SourceNode) FS() afero.Fs

FS returns the afero filesystem this source was constructed with. Optimizer passes that rewrite a source-rooted subtree (e.g. PulseChainFusionPass) reuse this fs so the replacement node still honours an in-memory test environment.

func (*SourceNode) Fingerprint

func (n *SourceNode) Fingerprint() string

Fingerprint implements plan.Node. Source fingerprint = sha256(ref). Combined with Table.Hash() at execute time by the cache key builder.

func (*SourceNode) ID

func (n *SourceNode) ID() plan.NodeID

ID implements plan.Node.

func (*SourceNode) Inputs

func (n *SourceNode) Inputs() []plan.NodeID

Inputs implements plan.Node. SourceNode is a leaf.

func (*SourceNode) Kind

func (n *SourceNode) Kind() string

Kind implements plan.Labeled.

func (*SourceNode) OutputSchema

func (n *SourceNode) OutputSchema() (*encoding.Schema, error)

OutputSchema resolves the ref so the schema is discoverable without materialising records. The ReadCloser returned by Resolve is closed immediately. Kept as a public method for backwards compatibility with validate.PulseLookup; new callers should prefer Schema(nil).

func (*SourceNode) Ref

func (n *SourceNode) Ref() string

Ref returns the user-supplied ref string. Useful in error messages and plan-visualisation tooling.

func (*SourceNode) RowCount

func (n *SourceNode) RowCount(ctx context.Context) (uint64, error)

RowCount returns the record count from the cohort's Pulse header without materialising any records. Backed by pulse.CountRecords (available since pulse v0.10.0). Used by the SampleInjection optimizer pass to decide whether to inject a SampleNode below a Source whose row count exceeds PRISM_RENDER_MAX_MARKS.

func (*SourceNode) Schema

func (n *SourceNode) Schema(_ []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Source nodes ignore the `in` slice (they have no upstream); the output schema is whatever the resolver reports for the underlying .pulse cohort.

func (*SourceNode) Summary

func (n *SourceNode) Summary() string

Summary implements plan.Labeled — the ref string the user wrote.

type UnionNode

type UnionNode struct {
	// contains filtered or unexported fields
}

UnionNode vertically concatenates N inputs. P03 stub.

Schema computation in P03 returns the first input's schema verbatim; the real Execute path (P07) will validate cross-input schema compatibility (same field names, compatible types). Marked here so the eventual swap is mechanical.

TODO P07: validate that every input schema matches in[0] and emit a PRISM_PLAN_* code on mismatch.

func NewUnion

func NewUnion(id plan.NodeID, inputs []plan.NodeID) *UnionNode

NewUnion constructs a UnionNode. The inputs slice is copied.

func (*UnionNode) Execute

func (n *UnionNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node. Body lives in union_execute.go.

func (*UnionNode) Fingerprint

func (n *UnionNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*UnionNode) ID

func (n *UnionNode) ID() plan.NodeID

ID implements plan.Node.

func (*UnionNode) Inputs

func (n *UnionNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*UnionNode) Kind

func (n *UnionNode) Kind() string

Kind implements plan.Labeled.

func (*UnionNode) Schema

func (n *UnionNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Returns the first input's schema; Execute validates that every other input matches it (PRISM_PLAN_004).

func (*UnionNode) Summary

func (n *UnionNode) Summary() string

Summary implements plan.Labeled — number of inputs being concatenated.

type UnpivotNode

type UnpivotNode struct {
	// contains filtered or unexported fields
}

UnpivotNode reshapes wide → long. P03 stub.

Output schema is the input minus the unpivoted fields, plus two new fields: a categorical key column (named as[0], default "key") and a numeric value column (named as[1], default "value").

func NewUnpivot

func NewUnpivot(id, input plan.NodeID, unpivot, as []string) *UnpivotNode

NewUnpivot constructs an UnpivotNode.

func (*UnpivotNode) Execute

func (n *UnpivotNode) Execute(_ context.Context, _ []*table.Table) (*table.Table, error)

Execute implements plan.Node. P03 stub.

func (*UnpivotNode) Fingerprint

func (n *UnpivotNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*UnpivotNode) ID

func (n *UnpivotNode) ID() plan.NodeID

ID implements plan.Node.

func (*UnpivotNode) Inputs

func (n *UnpivotNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*UnpivotNode) Kind

func (n *UnpivotNode) Kind() string

Kind implements plan.Labeled.

func (*UnpivotNode) Schema

func (n *UnpivotNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Drops the unpivoted fields from the input schema and appends two new fields (key column + value column).
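The schema rule can be sketched as follows, with the "key" and "value" defaults taken from the UnpivotNode type note. The field type and the "String" and "F64" type tags are hypothetical placeholders for the categorical and numeric column types.

```go
package main

import "fmt"

// field is a hypothetical stand-in for one schema entry.
type field struct{ Name, Type string }

// unpivotSchema drops the unpivoted fields and appends a key column
// and a value column, named by as[0] and as[1] when provided.
func unpivotSchema(in []field, unpivot, as []string) []field {
	drop := make(map[string]bool, len(unpivot))
	for _, u := range unpivot {
		drop[u] = true
	}
	keyName, valueName := "key", "value"
	if len(as) > 0 && as[0] != "" {
		keyName = as[0]
	}
	if len(as) > 1 && as[1] != "" {
		valueName = as[1]
	}
	out := make([]field, 0, len(in)+2)
	for _, f := range in {
		if !drop[f.Name] {
			out = append(out, f)
		}
	}
	// Type tags below are placeholders, not the package's real type names.
	return append(out, field{keyName, "String"}, field{valueName, "F64"})
}

func main() {
	in := []field{{"id", "I64"}, {"q1", "F64"}, {"q2", "F64"}}
	fmt.Println(unpivotSchema(in, []string{"q1", "q2"}, nil))
	fmt.Println(unpivotSchema(in, []string{"q1", "q2"}, []string{"quarter", "revenue"}))
}
```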

func (*UnpivotNode) Summary

func (n *UnpivotNode) Summary() string

Summary implements plan.Labeled.

func (*UnpivotNode) Unpivot

func (n *UnpivotNode) Unpivot() []string

Unpivot exposes the source fields for renderers + tests.

type WindowNode

type WindowNode struct {
	// contains filtered or unexported fields
}

WindowNode applies windowed aggregates / ranks over an input.

func NewWindow

func NewWindow(id, input plan.NodeID, ops []WindowOp, partitionby []string, sort []SortKey, frame []any) *WindowNode

NewWindow constructs a WindowNode. All slices are copied.

func (*WindowNode) Execute

func (n *WindowNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)

Execute implements plan.Node via the injected backend.

func (*WindowNode) Fingerprint

func (n *WindowNode) Fingerprint() string

Fingerprint implements plan.Node.

func (*WindowNode) Frame

func (n *WindowNode) Frame() []any

Frame exposes the frame specification for the executor; nil when the window op does not consume a frame.

func (*WindowNode) ID

func (n *WindowNode) ID() plan.NodeID

ID implements plan.Node.

func (*WindowNode) Inputs

func (n *WindowNode) Inputs() []plan.NodeID

Inputs implements plan.Node.

func (*WindowNode) Kind

func (n *WindowNode) Kind() string

Kind implements plan.Labeled.

func (*WindowNode) Ops

func (n *WindowNode) Ops() []WindowOp

Ops exposes the window operations for renderers + tests.

func (*WindowNode) Partitionby

func (n *WindowNode) Partitionby() []string

Partitionby exposes the partition keys for renderers + the executor.

func (*WindowNode) Schema

func (n *WindowNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)

Schema implements plan.Node. Output schema is input + one F64 field per WindowOp (ranks come back as float to keep downstream arithmetic uniform; real impl can re-type integer ranks if profiling motivates).

func (*WindowNode) SetBackend

func (n *WindowNode) SetBackend(b plan.Backend)

SetBackend wires the compile backend that powers Execute.

func (*WindowNode) Sort

func (n *WindowNode) Sort() []SortKey

Sort exposes the per-partition ordering keys for the executor.

func (*WindowNode) Summary

func (n *WindowNode) Summary() string

Summary implements plan.Labeled.

type WindowOp

type WindowOp struct {
	Op    string
	Field string
	As    string
	Param *float64
}

WindowOp is one windowed calculation: op (rank, dense_rank, lag, lead, sum, mean, ...), source field, output name, optional parameter.

func (WindowOp) String

func (w WindowOp) String() string

String returns a stable text form for fingerprints.
