Documentation ¶
Overview ¶
Package nodes holds the DAG node implementations. P02 ships SourceNode — the leaf that reads a .pulse via Resolver and materialises a Table. Future phases (P03+) add Filter, Project, GroupAggregate, Join, etc.
File stub.go centralises the helpers every P03 stub node uses:
- notImplementedErr(kind) — the PRISM_COMPILE_001 error returned by every stubbed Execute body until P04 lands the real impls.
- cloneSchema / appendField / projectFields — pure helpers that compute deterministic output schemas from input schemas plus op parameters, so Schema(in) works in P03 without execution data.
- fingerprintFor(kind, parts...) — sha256-prefixed cache-key component, deterministic across runs.
Stub nodes (everything that is not SourceNode, InlineNode, or SinkNode) wire these helpers in their per-type files (filter.go, project.go, ...). The Execute body is always one line:
return nil, notImplementedErr("FilterNode")
P04 swaps each stub's body for the real implementation; tests that currently assert PRISM_COMPILE_001 flip to assert correct output tables.
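A minimal sketch of how two of the stub helpers might look, for orientation only. The codedError type, the error message text, the "sha256:" prefix, and the NUL separator between parts are all assumptions made for illustration; the real helpers in stub.go are unexported and are not reproduced here.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// codedError is a hypothetical stand-in for the package's real error type.
type codedError struct {
	Code string
	Msg  string
}

func (e *codedError) Error() string { return e.Code + ": " + e.Msg }

// notImplementedErr returns the PRISM_COMPILE_001 error a stubbed Execute
// body would hand back. The message wording is an assumption.
func notImplementedErr(kind string) error {
	return &codedError{Code: "PRISM_COMPILE_001", Msg: kind + ".Execute is not implemented yet"}
}

// fingerprintFor hashes kind and parts into a deterministic cache-key
// component. The NUL separator (assumed) keeps ("a", "b") distinct from ("ab").
func fingerprintFor(kind string, parts ...string) string {
	h := sha256.New()
	h.Write([]byte(kind))
	for _, p := range parts {
		h.Write([]byte{0})
		h.Write([]byte(p))
	}
	return "sha256:" + hex.EncodeToString(h.Sum(nil))
}

func main() {
	fmt.Println(notImplementedErr("FilterNode"))
	fmt.Println(fingerprintFor("FilterNode", "src", "score > 0.5"))
}
```

Because the hash input depends only on kind and parts, the same node parameters yield the same string on every run, which is the property a cache key needs.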
Index ¶
- func DerivePulseChainID(ref string, absorbed []plan.NodeID) plan.NodeID
- type AggOp
- type BinNode
- func (n *BinNode) As() string
- func (n *BinNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *BinNode) Field() string
- func (n *BinNode) Fingerprint() string
- func (n *BinNode) ID() plan.NodeID
- func (n *BinNode) Inputs() []plan.NodeID
- func (n *BinNode) Kind() string
- func (n *BinNode) Params() BinParams
- func (n *BinNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *BinNode) SetBackend(b plan.Backend)
- func (n *BinNode) Summary() string
- type BinParams
- type CalculateNode
- func (n *CalculateNode) As() string
- func (n *CalculateNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *CalculateNode) Expr() string
- func (n *CalculateNode) Fingerprint() string
- func (n *CalculateNode) ID() plan.NodeID
- func (n *CalculateNode) Inputs() []plan.NodeID
- func (n *CalculateNode) Kind() string
- func (n *CalculateNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *CalculateNode) SetBackend(b plan.Backend)
- func (n *CalculateNode) Summary() string
- type FilterNode
- func (n *FilterNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *FilterNode) Expr() string
- func (n *FilterNode) Fingerprint() string
- func (n *FilterNode) ID() plan.NodeID
- func (n *FilterNode) Inputs() []plan.NodeID
- func (n *FilterNode) Kind() string
- func (n *FilterNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *FilterNode) SetBackend(b plan.Backend)
- func (n *FilterNode) Summary() string
- type GroupAggregateNode
- func (n *GroupAggregateNode) Aggs() []AggOp
- func (n *GroupAggregateNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *GroupAggregateNode) Fingerprint() string
- func (n *GroupAggregateNode) Groupby() []string
- func (n *GroupAggregateNode) ID() plan.NodeID
- func (n *GroupAggregateNode) Inputs() []plan.NodeID
- func (n *GroupAggregateNode) Kind() string
- func (n *GroupAggregateNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *GroupAggregateNode) SetBackend(b plan.Backend)
- func (n *GroupAggregateNode) Summary() string
- type InlineNode
- func (n *InlineNode) Execute(_ context.Context, _ []*table.Table) (*table.Table, error)
- func (n *InlineNode) Fingerprint() string
- func (n *InlineNode) ID() plan.NodeID
- func (n *InlineNode) Inputs() []plan.NodeID
- func (n *InlineNode) Kind() string
- func (n *InlineNode) Name() string
- func (n *InlineNode) Schema(_ []*encoding.Schema) (*encoding.Schema, error)
- func (n *InlineNode) Summary() string
- type JoinKind
- type JoinNode
- func (n *JoinNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *JoinNode) Fingerprint() string
- func (n *JoinNode) ID() plan.NodeID
- func (n *JoinNode) Inputs() []plan.NodeID
- func (n *JoinNode) JoinKind() JoinKind
- func (n *JoinNode) Kind() string
- func (n *JoinNode) On() []string
- func (n *JoinNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *JoinNode) Summary() string
- type LimitNode
- func (n *LimitNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *LimitNode) Fingerprint() string
- func (n *LimitNode) ID() plan.NodeID
- func (n *LimitNode) Inputs() []plan.NodeID
- func (n *LimitNode) Kind() string
- func (n *LimitNode) Limit() int
- func (n *LimitNode) Offset() int
- func (n *LimitNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *LimitNode) SetBackend(b plan.Backend)
- func (n *LimitNode) Summary() string
- type PivotNode
- func (n *PivotNode) Execute(_ context.Context, _ []*table.Table) (*table.Table, error)
- func (n *PivotNode) Fingerprint() string
- func (n *PivotNode) ID() plan.NodeID
- func (n *PivotNode) Inputs() []plan.NodeID
- func (n *PivotNode) Kind() string
- func (n *PivotNode) Pivot() string
- func (n *PivotNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *PivotNode) Summary() string
- func (n *PivotNode) Value() string
- type ProjectNode
- func (n *ProjectNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *ProjectNode) Fields() []string
- func (n *ProjectNode) Fingerprint() string
- func (n *ProjectNode) ID() plan.NodeID
- func (n *ProjectNode) Inputs() []plan.NodeID
- func (n *ProjectNode) Kind() string
- func (n *ProjectNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *ProjectNode) SetBackend(b plan.Backend)
- func (n *ProjectNode) Summary() string
- type PulseChainNode
- func (n *PulseChainNode) ChainRequest() *pulsetypes.ChainRequest
- func (n *PulseChainNode) Execute(ctx context.Context, _ []*table.Table) (*table.Table, error)
- func (n *PulseChainNode) Fingerprint() string
- func (n *PulseChainNode) ID() plan.NodeID
- func (n *PulseChainNode) Inputs() []plan.NodeID
- func (n *PulseChainNode) Kind() string
- func (n *PulseChainNode) Ref() string
- func (n *PulseChainNode) Schema(_ []*encoding.Schema) (*encoding.Schema, error)
- func (n *PulseChainNode) StageIDs() []plan.NodeID
- func (n *PulseChainNode) Summary() string
- type SampleNode
- func (n *SampleNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *SampleNode) Fingerprint() string
- func (n *SampleNode) ID() plan.NodeID
- func (n *SampleNode) Inputs() []plan.NodeID
- func (n *SampleNode) Kind() string
- func (n *SampleNode) N() int
- func (n *SampleNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *SampleNode) Seed() *int64
- func (n *SampleNode) SetBackend(b plan.Backend)
- func (n *SampleNode) Summary() string
- type SortKey
- type SortNode
- func (n *SortNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *SortNode) Fingerprint() string
- func (n *SortNode) ID() plan.NodeID
- func (n *SortNode) Inputs() []plan.NodeID
- func (n *SortNode) Kind() string
- func (n *SortNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *SortNode) SetBackend(b plan.Backend)
- func (n *SortNode) Sort() []SortKey
- func (n *SortNode) SortLabel() string
- func (n *SortNode) Summary() string
- type SourceNode
- func (n *SourceNode) Execute(ctx context.Context, _ []*table.Table) (*table.Table, error)
- func (n *SourceNode) FS() afero.Fs
- func (n *SourceNode) Fingerprint() string
- func (n *SourceNode) ID() plan.NodeID
- func (n *SourceNode) Inputs() []plan.NodeID
- func (n *SourceNode) Kind() string
- func (n *SourceNode) OutputSchema() (*encoding.Schema, error)
- func (n *SourceNode) Ref() string
- func (n *SourceNode) RowCount(ctx context.Context) (uint64, error)
- func (n *SourceNode) Schema(_ []*encoding.Schema) (*encoding.Schema, error)
- func (n *SourceNode) Summary() string
- type UnionNode
- func (n *UnionNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *UnionNode) Fingerprint() string
- func (n *UnionNode) ID() plan.NodeID
- func (n *UnionNode) Inputs() []plan.NodeID
- func (n *UnionNode) Kind() string
- func (n *UnionNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *UnionNode) Summary() string
- type UnpivotNode
- func (n *UnpivotNode) Execute(_ context.Context, _ []*table.Table) (*table.Table, error)
- func (n *UnpivotNode) Fingerprint() string
- func (n *UnpivotNode) ID() plan.NodeID
- func (n *UnpivotNode) Inputs() []plan.NodeID
- func (n *UnpivotNode) Kind() string
- func (n *UnpivotNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *UnpivotNode) Summary() string
- func (n *UnpivotNode) Unpivot() []string
- type WindowNode
- func (n *WindowNode) Execute(ctx context.Context, in []*table.Table) (*table.Table, error)
- func (n *WindowNode) Fingerprint() string
- func (n *WindowNode) Frame() []any
- func (n *WindowNode) ID() plan.NodeID
- func (n *WindowNode) Inputs() []plan.NodeID
- func (n *WindowNode) Kind() string
- func (n *WindowNode) Ops() []WindowOp
- func (n *WindowNode) Partitionby() []string
- func (n *WindowNode) Schema(in []*encoding.Schema) (*encoding.Schema, error)
- func (n *WindowNode) SetBackend(b plan.Backend)
- func (n *WindowNode) Sort() []SortKey
- func (n *WindowNode) Summary() string
- type WindowOp
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DerivePulseChainID ¶
DerivePulseChainID builds a stable id for the fused node by hashing the source ref together with the absorbed-node id list. Two equivalent fusions share an id, mirroring SourceNode's content fingerprint convention.
Types ¶
type AggOp ¶
AggOp encodes one aggregate calculation: op (mean, sum, count, ...), the source field (empty for count(*) variants), and the output column name.
type BinNode ¶
type BinNode struct {
// contains filtered or unexported fields
}
BinNode buckets a numeric field.
func (*BinNode) Fingerprint ¶
Fingerprint implements plan.Node.
func (*BinNode) Schema ¶
Schema implements plan.Node. Output schema is input + one F64 field named n.as (the bin edge for each row).
func (*BinNode) SetBackend ¶
SetBackend wires the compile backend that powers Execute.
type BinParams ¶
BinParams captures the optional knobs for a numeric bin operation. Auto=true (the bin: true shorthand) requests automatic bin selection; Maxbins, Step, and Extent override pieces of that selection.
type CalculateNode ¶
type CalculateNode struct {
// contains filtered or unexported fields
}
CalculateNode appends one computed column derived from a Pulse expression.
func NewCalculate ¶
func NewCalculate(id, input plan.NodeID, expr, as string) *CalculateNode
NewCalculate constructs a CalculateNode.
func (*CalculateNode) As ¶
func (n *CalculateNode) As() string
As exposes the output column name for renderers + tests.
func (*CalculateNode) Expr ¶
func (n *CalculateNode) Expr() string
Expr exposes the expression for renderers + tests.
func (*CalculateNode) Fingerprint ¶
func (n *CalculateNode) Fingerprint() string
Fingerprint implements plan.Node.
func (*CalculateNode) Inputs ¶
func (n *CalculateNode) Inputs() []plan.NodeID
Inputs implements plan.Node.
func (*CalculateNode) Schema ¶
Schema implements plan.Node. Output schema is input + one F64 field named n.as. F64 is the conservative bucket for expression results (every Pulse arithmetic expression promotes to float).
func (*CalculateNode) SetBackend ¶
func (n *CalculateNode) SetBackend(b plan.Backend)
SetBackend wires the compile backend that powers Execute.
func (*CalculateNode) Summary ¶
func (n *CalculateNode) Summary() string
Summary implements plan.Labeled.
type FilterNode ¶
type FilterNode struct {
// contains filtered or unexported fields
}
FilterNode applies a Pulse expression predicate to its input table. Execute routes through the injected backend and returns PRISM_COMPILE_001 when no backend is wired, which preserves the P03 stub behaviour for callers that haven't migrated. See D033.
func NewFilter ¶
func NewFilter(id, input plan.NodeID, expr string) *FilterNode
NewFilter constructs a FilterNode with a stable id derived from (input, expr) so two equivalent filters share a fingerprint.
func (*FilterNode) Execute ¶
Execute implements plan.Node. Routes through the injected backend when one is wired; returns PRISM_COMPILE_001 otherwise.
func (*FilterNode) Expr ¶
func (n *FilterNode) Expr() string
Expr exposes the predicate string for renderers + tests.
func (*FilterNode) Fingerprint ¶
func (n *FilterNode) Fingerprint() string
Fingerprint implements plan.Node.
func (*FilterNode) Inputs ¶
func (n *FilterNode) Inputs() []plan.NodeID
Inputs implements plan.Node.
func (*FilterNode) SetBackend ¶
func (n *FilterNode) SetBackend(b plan.Backend)
SetBackend wires the compile backend that powers Execute. The builder calls this after construction so node constructors keep their P03 signatures stable. See D033.
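The construct-then-wire pattern described above can be sketched as follows. This is an illustration under stated assumptions: the Backend interface, its Filter method, the row representation, and the sentinel error are hypothetical stand-ins, since the real plan.Backend surface is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Backend is a hypothetical stand-in for plan.Backend.
type Backend interface {
	Filter(rows []int, expr string) ([]int, error)
}

// errNoBackend models the PRISM_COMPILE_001 fallback (message assumed).
var errNoBackend = errors.New("PRISM_COMPILE_001: no backend wired")

type filterNode struct {
	expr    string
	backend Backend // nil until SetBackend is called
}

// newFilter keeps a backend-free signature, as the P03 constructors do.
func newFilter(expr string) *filterNode { return &filterNode{expr: expr} }

// SetBackend is called by the builder after construction.
func (n *filterNode) SetBackend(b Backend) { n.backend = b }

// Execute delegates to the backend, or falls back to the stub error.
func (n *filterNode) Execute(rows []int) ([]int, error) {
	if n.backend == nil {
		return nil, errNoBackend
	}
	return n.backend.Filter(rows, n.expr)
}

// evenBackend is a toy backend that ignores expr and keeps even values.
type evenBackend struct{}

func (evenBackend) Filter(rows []int, _ string) ([]int, error) {
	var out []int
	for _, r := range rows {
		if r%2 == 0 {
			out = append(out, r)
		}
	}
	return out, nil
}

func main() {
	n := newFilter("x % 2 == 0")
	_, err := n.Execute([]int{1, 2, 3, 4})
	fmt.Println(err) // PRISM_COMPILE_001: no backend wired

	n.SetBackend(evenBackend{})
	out, _ := n.Execute([]int{1, 2, 3, 4})
	fmt.Println(out) // [2 4]
}
```

Injecting the backend through a setter rather than the constructor means existing call sites and tests that build nodes directly keep compiling unchanged.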
type GroupAggregateNode ¶
type GroupAggregateNode struct {
// contains filtered or unexported fields
}
GroupAggregateNode partitions its input by groupby fields and emits one row per partition with each aggregate.
func NewGroupAggregate ¶
func NewGroupAggregate(id, input plan.NodeID, groupby []string, aggs []AggOp) *GroupAggregateNode
NewGroupAggregate constructs a GroupAggregateNode.
func (*GroupAggregateNode) Aggs ¶
func (n *GroupAggregateNode) Aggs() []AggOp
Aggs exposes the aggregate operations for renderers + tests.
func (*GroupAggregateNode) Fingerprint ¶
func (n *GroupAggregateNode) Fingerprint() string
Fingerprint implements plan.Node.
func (*GroupAggregateNode) Groupby ¶
func (n *GroupAggregateNode) Groupby() []string
Groupby exposes the partition keys for renderers + tests.
func (*GroupAggregateNode) ID ¶
func (n *GroupAggregateNode) ID() plan.NodeID
ID implements plan.Node.
func (*GroupAggregateNode) Inputs ¶
func (n *GroupAggregateNode) Inputs() []plan.NodeID
Inputs implements plan.Node.
func (*GroupAggregateNode) Kind ¶
func (n *GroupAggregateNode) Kind() string
Kind implements plan.Labeled.
func (*GroupAggregateNode) Schema ¶
Schema implements plan.Node. Output schema is the projection of input down to groupby fields plus one F64 field per AggOp named by op.As. Result types come from aggregateOutputType (every shipped op is scalar F64 today).
func (*GroupAggregateNode) SetBackend ¶
func (n *GroupAggregateNode) SetBackend(b plan.Backend)
SetBackend wires the compile backend that powers Execute.
func (*GroupAggregateNode) Summary ¶
func (n *GroupAggregateNode) Summary() string
Summary implements plan.Labeled — "by: a,b | aggs: mean(score)->m, ...".
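As an illustration of the label format quoted above, a summary string of that shape could be assembled like this. The aggOp struct and its field names are hypothetical stand-ins for AggOp, and rendering an empty source field as "count()" is an assumption; the real Summary may format count(*) variants differently.

```go
package main

import (
	"fmt"
	"strings"
)

// aggOp is a hypothetical mirror of AggOp: operation, source field
// (empty for count(*) variants), and output column name.
type aggOp struct {
	Op    string
	Field string
	As    string
}

// summary renders "by: a,b | aggs: mean(score)->m, ...".
func summary(groupby []string, aggs []aggOp) string {
	parts := make([]string, len(aggs))
	for i, a := range aggs {
		parts[i] = fmt.Sprintf("%s(%s)->%s", a.Op, a.Field, a.As)
	}
	return "by: " + strings.Join(groupby, ",") + " | aggs: " + strings.Join(parts, ", ")
}

func main() {
	fmt.Println(summary(
		[]string{"a", "b"},
		[]aggOp{{Op: "mean", Field: "score", As: "m"}},
	)) // by: a,b | aggs: mean(score)->m
}
```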
type InlineNode ¶
type InlineNode struct {
// contains filtered or unexported fields
}
InlineNode is the leaf node fed by `data.values` (and optional `data.fields`) declarations. Unlike SourceNode (which reads .pulse bytes via Resolver), InlineNode delegates straight to table.FromInline at Execute time.
Not a stub — table.FromInline shipped in P02; the executor needs real inline-data behaviour in P03 to make any inline-bound spec runnable end-to-end through the pipeline.
func NewInline ¶
func NewInline(id plan.NodeID, name string, values []map[string]any, fields []spec.FieldSpec) *InlineNode
NewInline constructs an InlineNode from raw spec inputs.
func (*InlineNode) Fingerprint ¶
func (n *InlineNode) Fingerprint() string
Fingerprint implements plan.Node. Hashes a canonical JSON encoding of (name, fields, values-with-sorted-keys) so two equivalent inline declarations produce the same fingerprint regardless of original map ordering.
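A sketch of order-independent hashing, assuming SHA-256 over a JSON encoding. The inlinePayload struct and the use of plain strings for fields are simplifications for illustration (the real constructor takes []spec.FieldSpec). The sketch relies on the documented behaviour of encoding/json, which sorts map keys when marshalling.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// inlinePayload is a hypothetical canonical form of an inline declaration.
type inlinePayload struct {
	Name   string           `json:"name"`
	Fields []string         `json:"fields"`
	Values []map[string]any `json:"values"`
}

// inlineFingerprint hashes the canonical JSON encoding. json.Marshal emits
// map keys in sorted order, so key order in the source maps does not matter.
func inlineFingerprint(name string, fields []string, values []map[string]any) (string, error) {
	b, err := json.Marshal(inlinePayload{Name: name, Fields: fields, Values: values})
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	a, _ := inlineFingerprint("d", []string{"x", "y"}, []map[string]any{{"x": 1, "y": "u"}})
	b, _ := inlineFingerprint("d", []string{"x", "y"}, []map[string]any{{"y": "u", "x": 1}})
	fmt.Println(a == b) // true
}
```

Row order in the values slice still affects the hash in this sketch, which seems appropriate since row order is part of the data.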
func (*InlineNode) Inputs ¶
func (n *InlineNode) Inputs() []plan.NodeID
Inputs implements plan.Node. Inline is a leaf — same as Source.
func (*InlineNode) Name ¶
func (n *InlineNode) Name() string
Name exposes the dataset name (empty if unnamed).
type JoinKind ¶
type JoinKind string
JoinKind is the join semantics token: inner|left|outer|anti. The schema computation does not branch on kind in P03 (the column set is the union either way; null behaviour comes in at Execute time, P07).
type JoinNode ¶
type JoinNode struct {
// contains filtered or unexported fields
}
JoinNode hash-joins two inputs on equality. P03 stub.
func (*JoinNode) Execute ¶
Execute implements plan.Node. Hash join body lives in join_execute.go (kept separate for diffability against the P03 stub surface).
func (*JoinNode) Fingerprint ¶
Fingerprint implements plan.Node.
func (*JoinNode) Inputs ¶
Inputs implements plan.Node. Order is (left, right) so the executor can pass the two upstream Tables in the correct slots.
func (*JoinNode) JoinKind ¶
JoinKind exposes the join semantics for renderers + tests. (Named JoinKind not Kind to avoid colliding with plan.Labeled.Kind.)
type LimitNode ¶
type LimitNode struct {
// contains filtered or unexported fields
}
LimitNode keeps the first N rows (with optional offset).
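The limit-with-offset semantics can be sketched over a plain slice. The clamping rules here (negative values treated as zero, out-of-range offsets yielding an empty result) are assumptions for illustration; the real Execute operates on *table.Table and may validate its parameters differently.

```go
package main

import "fmt"

// limitRows skips offset rows, then keeps at most limit rows.
// Negative arguments are clamped to zero (assumed behaviour).
func limitRows[T any](rows []T, limit, offset int) []T {
	if offset < 0 {
		offset = 0
	}
	if offset > len(rows) {
		offset = len(rows)
	}
	if limit < 0 {
		limit = 0
	}
	end := len(rows)
	if limit < end-offset {
		end = offset + limit
	}
	return rows[offset:end]
}

func main() {
	fmt.Println(limitRows([]int{1, 2, 3, 4, 5}, 2, 1)) // [2 3]
}
```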
func (*LimitNode) Fingerprint ¶
Fingerprint implements plan.Node.
func (*LimitNode) SetBackend ¶
SetBackend wires the compile backend that powers Execute.
type PivotNode ¶
type PivotNode struct {
// contains filtered or unexported fields
}
PivotNode reshapes long → wide. P03 stub.
Schema computation cannot determine the output column names without scanning the input data (the cells in the `pivot` column become the new column headers). In P03 we conservatively return the input schema unchanged so DAG validation does not block — the real Schema derivation lands in P04 alongside the real Execute path.
TODO P04: implement schema-on-execute by reading the input table once at Schema() time (or annotating the plan with the discovered categories at build time when the data is available statically).
func (*PivotNode) Fingerprint ¶
Fingerprint implements plan.Node.
type ProjectNode ¶
type ProjectNode struct {
// contains filtered or unexported fields
}
ProjectNode keeps only the named fields from its input.
func NewProject ¶
func NewProject(id, input plan.NodeID, fields []string) *ProjectNode
NewProject constructs a ProjectNode. The fields slice is copied.
func (*ProjectNode) Fields ¶
func (n *ProjectNode) Fields() []string
Fields exposes the field list for renderers + tests.
func (*ProjectNode) Fingerprint ¶
func (n *ProjectNode) Fingerprint() string
Fingerprint implements plan.Node.
func (*ProjectNode) Inputs ¶
func (n *ProjectNode) Inputs() []plan.NodeID
Inputs implements plan.Node.
func (*ProjectNode) Schema ¶
Schema implements plan.Node. Output schema is the projection of the input schema down to fields named in n.fields, in the requested order. Missing field names raise PRISM_PLAN_003.
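The projection rule can be sketched with a simplified schema type. The field struct, its type labels, and the error message are hypothetical; only the PRISM_PLAN_003 code and the requested-order behaviour come from the description above.

```go
package main

import "fmt"

// field is a hypothetical, simplified stand-in for an encoding.Schema entry.
type field struct {
	Name string
	Type string
}

// projectFields returns the input fields named in names, in the requested
// order, and fails with PRISM_PLAN_003 on the first unknown name.
func projectFields(in []field, names []string) ([]field, error) {
	byName := make(map[string]field, len(in))
	for _, f := range in {
		byName[f.Name] = f
	}
	out := make([]field, 0, len(names))
	for _, name := range names {
		f, ok := byName[name]
		if !ok {
			return nil, fmt.Errorf("PRISM_PLAN_003: field %q not in input schema", name)
		}
		out = append(out, f)
	}
	return out, nil
}

func main() {
	in := []field{{Name: "id", Type: "i64"}, {Name: "score", Type: "f64"}, {Name: "tag", Type: "str"}}
	out, err := projectFields(in, []string{"tag", "id"})
	fmt.Println(out, err) // [{tag str} {id i64}] <nil>
}
```

Because the computation needs only the input schema and the field list, it can run at plan time without any execution data.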
func (*ProjectNode) SetBackend ¶
func (n *ProjectNode) SetBackend(b plan.Backend)
SetBackend wires the compile backend that powers Execute.
func (*ProjectNode) Summary ¶
func (n *ProjectNode) Summary() string
Summary implements plan.Labeled.
type PulseChainNode ¶
type PulseChainNode struct {
// contains filtered or unexported fields
}
PulseChainNode collapses a source-rooted linear chain of fusable operators (Filter, Calculate, GroupAggregate, Sort) into a single `pulse.ProcessChain` call. The node is its own root — it opens the .pulse cohort itself rather than consuming an upstream Table, so fusion bypasses the per-source materialization the inmem backend would otherwise pay.
The fusion pass (plan/passes/pulse_chain_fusion.go) constructs the node once eligibility is verified; v1 emits a single chain stage (one types.Request bundling filter + groupby + agg + sort). The ChainRequest envelope is used regardless so adding multi-stage support later is purely additive.
Fallback semantics: if Pulse rejects a stage at execute time with PULSE_CHAIN_NOT_MERGEABLE, Execute wraps it as PRISM_PLAN_CHAIN_NOT_MERGEABLE and the executor surfaces the error like any other PRISM_* AppError. Callers that want to re-run with fusion disabled can swap the optimizer pass list.
func NewPulseChain ¶
func NewPulseChain(id plan.NodeID, ref string, fs afero.Fs, chainReq *pulsetypes.ChainRequest, outSchema *encoding.Schema, summary string, stageIDs []plan.NodeID) *PulseChainNode
NewPulseChain constructs a PulseChainNode. The chainReq must carry the cohort on its first stage; outSchema is the schema synthesised by the final stage (already validated by the fusion pass). Callers pass the absorbed node ids via stageIDs so the fingerprint reflects the upstream plan shape — two equivalent fusions hash identically.
func (*PulseChainNode) ChainRequest ¶
func (n *PulseChainNode) ChainRequest() *pulsetypes.ChainRequest
ChainRequest returns the underlying Pulse chain request for tests and plan renderers.
func (*PulseChainNode) Execute ¶
Execute implements plan.Node. Opens a fresh pulse.Pulse against the node's afero.Fs, runs ProcessChain, and materialises the final stage's row maps into a typed *table.Table.
Pulse errors carrying a recognised code (PULSE_CHAIN_NOT_MERGEABLE, PULSE_PROCESSING_*) are rewrapped as PRISM_PLAN_CHAIN_NOT_MERGEABLE or PRISM_RESOLVE_006 so the executor's NodeError envelope picks up a stable PRISM_* identifier.
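One plausible reading of the rewrapping rule, sketched as a code-to-code mapping. The pairing of PULSE_PROCESSING_* with PRISM_RESOLVE_006 is inferred from the order of the sentence above, and the function name and signature are hypothetical; how the real Execute extracts the Pulse code from an error is not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// prismCode maps a recognised Pulse error code to its PRISM_* identifier.
// It reports false for codes this sketch does not recognise.
func prismCode(pulseCode string) (string, bool) {
	switch {
	case pulseCode == "PULSE_CHAIN_NOT_MERGEABLE":
		return "PRISM_PLAN_CHAIN_NOT_MERGEABLE", true
	case strings.HasPrefix(pulseCode, "PULSE_PROCESSING_"):
		// Assumed pairing: every processing failure maps to one resolve code.
		return "PRISM_RESOLVE_006", true
	}
	return "", false
}

func main() {
	code, ok := prismCode("PULSE_CHAIN_NOT_MERGEABLE")
	fmt.Println(code, ok) // PRISM_PLAN_CHAIN_NOT_MERGEABLE true
}
```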
func (*PulseChainNode) Fingerprint ¶
func (n *PulseChainNode) Fingerprint() string
Fingerprint implements plan.Node. Mixes the source ref and each absorbed stage id so equivalent fusions share a cache key.
func (*PulseChainNode) Inputs ¶
func (n *PulseChainNode) Inputs() []plan.NodeID
Inputs implements plan.Node — the chain node is a leaf (it opens the cohort itself).
func (*PulseChainNode) Ref ¶
func (n *PulseChainNode) Ref() string
Ref returns the source ref for diagnostics and tests.
func (*PulseChainNode) Schema ¶
Schema implements plan.Node. The chain output schema is computed at fusion time via processing.ChainOutputSchema(finalStageReq).
func (*PulseChainNode) StageIDs ¶
func (n *PulseChainNode) StageIDs() []plan.NodeID
StageIDs returns the ids of the upstream Prism nodes this chain replaced, in fusion order.
func (*PulseChainNode) Summary ¶
func (n *PulseChainNode) Summary() string
Summary implements plan.Labeled — human-readable label produced by the fusion pass.
type SampleNode ¶
type SampleNode struct {
// contains filtered or unexported fields
}
SampleNode draws a random subsample of size n.
func NewSample ¶
func NewSample(id, input plan.NodeID, sample int, seed *int64) *SampleNode
NewSample constructs a SampleNode. seed is optional (nil = unseeded).
func (*SampleNode) Fingerprint ¶
func (n *SampleNode) Fingerprint() string
Fingerprint implements plan.Node.
func (*SampleNode) Inputs ¶
func (n *SampleNode) Inputs() []plan.NodeID
Inputs implements plan.Node.
func (*SampleNode) N ¶
func (n *SampleNode) N() int
N exposes the requested sample size for renderers + tests.
func (*SampleNode) Seed ¶
func (n *SampleNode) Seed() *int64
Seed exposes the (optional) deterministic RNG seed. nil means the executor seeds from wall-clock time.
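The optional-seed behaviour can be sketched with math/rand. This is not the package's sampling algorithm, which is not described here; the permutation-based approach, the sorted output, and the function name are assumptions chosen to keep the example short.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
	"time"
)

// sampleIndices picks n distinct row indices out of total. A non-nil seed
// makes the choice reproducible; nil falls back to wall-clock seeding.
func sampleIndices(total, n int, seed *int64) []int {
	s := time.Now().UnixNano()
	if seed != nil {
		s = *seed
	}
	if total < 0 {
		total = 0
	}
	if n < 0 {
		n = 0
	}
	if n > total {
		n = total
	}
	r := rand.New(rand.NewSource(s))
	idx := r.Perm(total)[:n]
	sort.Ints(idx) // keep the original row order in the sample
	return idx
}

func main() {
	seed := int64(42)
	fmt.Println(sampleIndices(10, 3, &seed))
	fmt.Println(sampleIndices(10, 3, &seed)) // same indices as the line above
}
```

Using a pointer for the seed distinguishes "no seed given" from an explicit seed of zero, which a plain int64 could not express.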
func (*SampleNode) SetBackend ¶
func (n *SampleNode) SetBackend(b plan.Backend)
SetBackend wires the compile backend that powers Execute.
type SortNode ¶
type SortNode struct {
// contains filtered or unexported fields
}
SortNode orders rows by one or more (field, order) keys.
func (*SortNode) Fingerprint ¶
Fingerprint implements plan.Node.
func (*SortNode) SetBackend ¶
SetBackend wires the compile backend that powers Execute.
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
SourceNode is the leaf DAG node. It resolves a ref to a .pulse cohort (or shard), materialises every record into typed columns, and emits a *table.Table tagged with the content hash of the on-disk bytes.
SourceNode satisfies the full plan.Node interface (P03 widened it to include Schema(in)). The Schema method delegates to OutputSchema, which is kept for backwards compatibility with the P02 callers (validate.PulseLookup) that already use it.
func New ¶
New constructs a SourceNode. The ref must use one of the four forms documented in resolve/resolver.go. fs is the filesystem the resolver honours at execute time (test code passes afero.NewMemMapFs()). resolver must be non-nil; pass resolve.New(nil) for the default EmptyRegistry-backed implementation.
func (*SourceNode) Execute ¶
Execute implements plan.Node. Reads the resolved payload bytes, computes their xxhash64, then decodes every record into typed column slices and constructs the Table. Row count is gated by limits.TableMaxRows() — once the counter would exceed the cap, Execute returns PRISM_RESOLVE_007 immediately (no further decode).
func (*SourceNode) FS ¶
func (n *SourceNode) FS() afero.Fs
FS returns the afero filesystem this source was constructed with. Optimizer passes that rewrite a source-rooted subtree (e.g. PulseChainFusionPass) reuse this fs so the replacement node still honours an in-memory test environment.
func (*SourceNode) Fingerprint ¶
func (n *SourceNode) Fingerprint() string
Fingerprint implements plan.Node. The source fingerprint is sha256(ref); the cache key builder combines it with Table.Hash() at execute time.
func (*SourceNode) Inputs ¶
func (n *SourceNode) Inputs() []plan.NodeID
Inputs implements plan.Node. SourceNode is a leaf.
func (*SourceNode) OutputSchema ¶
func (n *SourceNode) OutputSchema() (*encoding.Schema, error)
OutputSchema resolves the ref so the schema is discoverable without materialising records. The ReadCloser returned by Resolve is closed immediately. Kept as a public method for backwards compatibility with validate.PulseLookup; new callers should prefer Schema(nil).
func (*SourceNode) Ref ¶
func (n *SourceNode) Ref() string
Ref returns the user-supplied ref string. Useful in error messages and plan-visualisation tooling.
func (*SourceNode) RowCount ¶
func (n *SourceNode) RowCount(ctx context.Context) (uint64, error)
RowCount returns the record count from the cohort's Pulse header without materialising any records. Backed by pulse.CountRecords (available since pulse v0.10.0). Used by the SampleInjection optimizer pass to decide whether to inject a SampleNode below a Source whose row count exceeds PRISM_RENDER_MAX_MARKS.
func (*SourceNode) Schema ¶
Schema implements plan.Node. Source nodes ignore the `in` slice (they have no upstream); the output schema is whatever the resolver reports for the underlying .pulse cohort.
func (*SourceNode) Summary ¶
func (n *SourceNode) Summary() string
Summary implements plan.Labeled — the ref string the user wrote.
type UnionNode ¶
type UnionNode struct {
// contains filtered or unexported fields
}
UnionNode vertically concatenates N inputs. P03 stub.
Schema computation in P03 returns the first input's schema verbatim; the real Execute path (P07) will validate cross-input schema compatibility (same field names, compatible types). Marked here so the eventual swap is mechanical.
TODO P07: validate that every input schema matches in[0] and emit a PRISM_PLAN_* code on mismatch.
func (*UnionNode) Fingerprint ¶
Fingerprint implements plan.Node.
type UnpivotNode ¶
type UnpivotNode struct {
// contains filtered or unexported fields
}
UnpivotNode reshapes wide → long. P03 stub.
Output schema is the input minus the unpivoted fields, plus two new fields: a categorical key column (named as[0], default "key") and a numeric value column (named as[1], default "value").
func NewUnpivot ¶
func NewUnpivot(id, input plan.NodeID, unpivot, as []string) *UnpivotNode
NewUnpivot constructs an UnpivotNode.
func (*UnpivotNode) Fingerprint ¶
func (n *UnpivotNode) Fingerprint() string
Fingerprint implements plan.Node.
func (*UnpivotNode) Inputs ¶
func (n *UnpivotNode) Inputs() []plan.NodeID
Inputs implements plan.Node.
func (*UnpivotNode) Schema ¶
Schema implements plan.Node. Drops the unpivoted fields from the input schema and appends two new fields (key column + value column).
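The unpivot schema rule can be sketched with a simplified schema type. The field struct and the "category" and "f64" type labels are hypothetical; the default names "key" and "value" come from the type description above, and treating an empty string in as like a missing entry is an assumption.

```go
package main

import "fmt"

// field is a hypothetical, simplified stand-in for an encoding.Schema entry.
type field struct {
	Name string
	Type string
}

// unpivotSchema drops the unpivoted fields and appends a key column and a
// value column, named by as[0] and as[1] when those are provided.
func unpivotSchema(in []field, unpivot, as []string) []field {
	drop := make(map[string]bool, len(unpivot))
	for _, name := range unpivot {
		drop[name] = true
	}
	keyName, valueName := "key", "value"
	if len(as) > 0 && as[0] != "" {
		keyName = as[0]
	}
	if len(as) > 1 && as[1] != "" {
		valueName = as[1]
	}
	out := make([]field, 0, len(in)+2)
	for _, f := range in {
		if !drop[f.Name] {
			out = append(out, f)
		}
	}
	return append(out,
		field{Name: keyName, Type: "category"},
		field{Name: valueName, Type: "f64"},
	)
}

func main() {
	in := []field{{Name: "id", Type: "i64"}, {Name: "q1", Type: "f64"}, {Name: "q2", Type: "f64"}}
	fmt.Println(unpivotSchema(in, []string{"q1", "q2"}, nil))
	// [{id i64} {key category} {value f64}]
}
```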
func (*UnpivotNode) Summary ¶
func (n *UnpivotNode) Summary() string
Summary implements plan.Labeled.
func (*UnpivotNode) Unpivot ¶
func (n *UnpivotNode) Unpivot() []string
Unpivot exposes the source fields for renderers + tests.
type WindowNode ¶
type WindowNode struct {
// contains filtered or unexported fields
}
WindowNode applies windowed aggregates / ranks over an input.
func NewWindow ¶
func NewWindow(id, input plan.NodeID, ops []WindowOp, partitionby []string, sort []SortKey, frame []any) *WindowNode
NewWindow constructs a WindowNode. All slices are copied.
func (*WindowNode) Fingerprint ¶
func (n *WindowNode) Fingerprint() string
Fingerprint implements plan.Node.
func (*WindowNode) Frame ¶
func (n *WindowNode) Frame() []any
Frame exposes the frame specification for the executor; nil when the window op does not consume a frame.
func (*WindowNode) Inputs ¶
func (n *WindowNode) Inputs() []plan.NodeID
Inputs implements plan.Node.
func (*WindowNode) Ops ¶
func (n *WindowNode) Ops() []WindowOp
Ops exposes the window operations for renderers + tests.
func (*WindowNode) Partitionby ¶
func (n *WindowNode) Partitionby() []string
Partitionby exposes the partition keys for renderers + executor.
func (*WindowNode) Schema ¶
Schema implements plan.Node. Output schema is input + one F64 field per WindowOp (ranks come back as float to keep downstream arithmetic uniform; real impl can re-type integer ranks if profiling motivates).
func (*WindowNode) SetBackend ¶
func (n *WindowNode) SetBackend(b plan.Backend)
SetBackend wires the compile backend that powers Execute.
func (*WindowNode) Sort ¶
func (n *WindowNode) Sort() []SortKey
Sort exposes the per-partition ordering keys for the executor.