plan

package
v0.1.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: Jun 12, 2026 | License: Apache-2.0 | Imports: 6 | Imported by: 0

Documentation

Constants

const (
	// ScanSourceDataFrame is an in-memory DataFrame scan. The ScanNode.Handle
	// is the *dataframe.DataFrame to materialize.
	ScanSourceDataFrame = "dataframe"
	// ScanSourceFile is a file-backed scan that the executor opens as a
	// streaming RecordReader rather than materializing the whole file. The
	// ScanNode.Handle carries a file-scan descriptor the executor understands
	// (e.g. path + reader options).
	ScanSourceFile = "file"
)
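The two source kinds decide how a scan enters the streaming pipeline. Going by the PhysScan.ExecuteStream description, a file source is opened with ScanStream, while a DataFrame source is materialized with Scan and then adapted with StreamFromDataFrame. The sketch below encodes that routing with a hypothetical trimmed-down scanNode and a hypothetical streamRoute helper; it is an illustration under those assumptions, not the package's own dispatch code.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented constants.
const (
	ScanSourceDataFrame = "dataframe"
	ScanSourceFile      = "file"
)

// scanNode is a hypothetical, trimmed-down ScanNode carrying only the
// source kind and the opaque handle.
type scanNode struct {
	source string
	handle any
}

// streamRoute names the Executor calls a streaming scan would use for each
// source kind. It is an illustrative helper, not part of package plan.
func streamRoute(n *scanNode) (string, error) {
	switch n.source {
	case ScanSourceFile:
		// Open the file as a RecordReader; nothing is materialized up front.
		return "ScanStream", nil
	case ScanSourceDataFrame:
		// Already in memory: materialize (cheap), then adapt into a reader.
		return "Scan+StreamFromDataFrame", nil
	default:
		return "", errors.New("unknown scan source: " + n.source)
	}
}

func main() {
	for _, n := range []*scanNode{
		{source: ScanSourceFile, handle: "events.csv"},
		{source: ScanSourceDataFrame},
		{source: "parquet"},
	} {
		route, err := streamRoute(n)
		fmt.Println(route, err)
	}
}
```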

Variables

This section is empty.

Functions

func ExplainLogical

func ExplainLogical(plan *LogicalPlan) string
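ExplainLogical carries no doc comment here. Assuming it renders the tree by walking Root through each node's Name and Children (the exact output format is not documented), a minimal depth-first renderer over a stand-in node type could look like this; node, op, and explain are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// node mirrors the Name/Children part of the LogicalNode interface.
type node interface {
	Name() string
	Children() []node
}

// op is a hypothetical concrete node used only for this sketch.
type op struct {
	name     string
	children []node
}

func (o *op) Name() string     { return o.name }
func (o *op) Children() []node { return o.children }

// explain renders the tree depth-first, indenting two spaces per level.
func explain(root node) string {
	var b strings.Builder
	var walk func(n node, depth int)
	walk = func(n node, depth int) {
		b.WriteString(strings.Repeat("  ", depth))
		b.WriteString(n.Name())
		b.WriteByte('\n')
		for _, c := range n.Children() {
			walk(c, depth+1)
		}
	}
	walk(root, 0)
	return b.String()
}

func main() {
	scan := &op{name: "Scan"}
	filter := &op{name: "Filter", children: []node{scan}}
	limit := &op{name: "Limit", children: []node{filter}}
	fmt.Print(explain(limit))
}
```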

Types

type AggregateNode

type AggregateNode struct {
	Input     LogicalNode
	GroupKeys []string
	Aggs      []expr.AggNode
	// contains filtered or unexported fields
}

AggregateNode is a grouped aggregation. It is named AggregateNode (not AggNode) to avoid colliding with expr.AggNode, the column-level aggregate it carries in Aggs.

func NewAggregateNode

func NewAggregateNode(input LogicalNode, groupKeys []string, aggs []expr.AggNode) *AggregateNode

func (*AggregateNode) Children

func (a *AggregateNode) Children() []LogicalNode

func (*AggregateNode) Name

func (a *AggregateNode) Name() string

func (*AggregateNode) Schema

func (a *AggregateNode) Schema() *schema.Schema

type Executor

type Executor interface {
	// Scan materializes node.Handle (e.g. a *dataframe.DataFrame) into a
	// DataFrame, optionally honoring the scan-level pushdown annotations
	// (PushedColumns, PushedLimit, PushedFilters).
	Scan(ctx context.Context, node *ScanNode) (any, error)
	Filter(ctx context.Context, df any, predicate expr.Expr) (any, error)
	Project(df any, cols []string) (any, error)
	Limit(df any, n int64) (any, error)
	Sort(ctx context.Context, df any, keys []expr.SortKey) (any, error)
	Aggregate(ctx context.Context, df any, groupKeys []string, aggs []expr.AggNode) (any, error)
	Join(ctx context.Context, left, right any, on string, how string) (any, error)
	WithColumn(df any, e expr.Expr) (any, error)

	// ScanStream opens a file-backed ScanNode as a streaming RecordReader,
	// honoring pushed column/limit annotations where the source supports them.
	ScanStream(ctx context.Context, node *ScanNode) (any, error)
	// FilterStream applies predicate to each batch of reader, yielding a reader
	// over the surviving rows.
	FilterStream(ctx context.Context, reader any, predicate expr.Expr) (any, error)
	// ProjectStream narrows each batch of reader to cols.
	ProjectStream(reader any, cols []string) (any, error)
	// LimitStream stops the stream once n rows have been emitted across batches.
	LimitStream(reader any, n int64) any
	// StreamFromDataFrame adapts a materialized DataFrame handle into a
	// RecordReader so a pipeline-breaking operator's output can be streamed.
	StreamFromDataFrame(df any) (any, error)
}

Executor bridges the plan package to the dataframe package without an import cycle. The dataframe package already imports plan, so plan cannot import dataframe back. Instead, the dataframe package provides a concrete implementation of this interface and injects it into PhysicalPlan.Execute.

All DataFrame values cross this boundary as any: each method receives and returns the opaque handle the implementation understands (a *dataframe.DataFrame), keeping plan free of a dataframe import.
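The import-cycle workaround is plain dependency inversion: the plan side depends only on an interface and passes opaque any handles, and the dataframe side recovers its concrete type with a type assertion. A minimal sketch, using a hypothetical one-method limiter interface and a hypothetical frame type in place of *dataframe.DataFrame:

```go
package main

import "fmt"

// limiter is a one-method slice of the documented Executor interface.
type limiter interface {
	Limit(df any, n int64) (any, error)
}

// runLimit plays the "plan" side: it passes df through without inspecting it.
func runLimit(exec limiter, df any, n int64) (any, error) {
	return exec.Limit(df, n)
}

// frame is a hypothetical stand-in for *dataframe.DataFrame, owned by the
// "dataframe" side.
type frame struct{ rows []int }

// frameExec is the "dataframe" side's implementation of limiter.
type frameExec struct{}

func (frameExec) Limit(df any, n int64) (any, error) {
	f, ok := df.(*frame) // recover the concrete type behind the opaque handle
	if !ok {
		return nil, fmt.Errorf("unexpected handle type %T", df)
	}
	if n >= 0 && n < int64(len(f.rows)) {
		return &frame{rows: f.rows[:n]}, nil
	}
	return f, nil
}

func main() {
	out, err := runLimit(frameExec{}, &frame{rows: []int{1, 2, 3, 4}}, 2)
	fmt.Println(out.(*frame).rows, err) // [1 2] <nil>
}
```

Only the implementing package ever names the concrete type, so the dependency arrow points one way.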

type FilterNode

type FilterNode struct {
	Input     LogicalNode
	Predicate expr.Expr
	// contains filtered or unexported fields
}

func NewFilterNode

func NewFilterNode(input LogicalNode, predicate expr.Expr) *FilterNode

func (*FilterNode) Children

func (f *FilterNode) Children() []LogicalNode

func (*FilterNode) Name

func (f *FilterNode) Name() string

func (*FilterNode) Schema

func (f *FilterNode) Schema() *schema.Schema

type JoinNode

type JoinNode struct {
	Left  LogicalNode
	Right LogicalNode
	On    string
	How   string
	// contains filtered or unexported fields
}

func NewJoinNode

func NewJoinNode(left, right LogicalNode, on, how string) *JoinNode

func (*JoinNode) Children

func (j *JoinNode) Children() []LogicalNode

func (*JoinNode) Name

func (j *JoinNode) Name() string

func (*JoinNode) Schema

func (j *JoinNode) Schema() *schema.Schema

type LimitNode

type LimitNode struct {
	Input LogicalNode
	N     int64
	// contains filtered or unexported fields
}

func NewLimitNode

func NewLimitNode(input LogicalNode, n int64) *LimitNode

func (*LimitNode) Children

func (l *LimitNode) Children() []LogicalNode

func (*LimitNode) Name

func (l *LimitNode) Name() string

func (*LimitNode) Schema

func (l *LimitNode) Schema() *schema.Schema

type LogicalNode

type LogicalNode interface {
	Name() string
	Schema() *schema.Schema
	Children() []LogicalNode
}

type LogicalPlan

type LogicalPlan struct {
	Root LogicalNode
}

func Bind

func Bind(plan *LogicalPlan) (*LogicalPlan, error)

func NewLogicalPlan

func NewLogicalPlan(root LogicalNode) *LogicalPlan

func Optimize

func Optimize(p *LogicalPlan) (*LogicalPlan, error)

Optimize runs the logical optimizer passes and returns a new logical plan. The passes annotate the ScanNode with pushdown hints (PushedFilters, PushedColumns, PushedLimit) that the physical executor may honor during scan.

The annotations are advisory: the operator nodes above the scan (Filter, Project, Limit) are preserved, so the plan stays correct whether or not the executor acts on a hint. Today the executor honors PushedLimit during scan; the filter and column hints are recorded for Explain and future scan sources that can prune at the source (e.g. Parquet column/row-group pruning).
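To make the advisory contract concrete, here is a sketch of a limit-pushdown pass over simplified, hypothetical node types. It records the limit on the scan but leaves the Limit node in place. It only looks through projections, because applying a limit below a filter would cut rows before the filter has seen them; whether Optimize uses exactly this rule is not documented.

```go
package main

import "fmt"

// Hypothetical, simplified logical nodes.
type node interface{}

type scan struct{ pushedLimit int64 } // <0 means "no limit pushed"
type project struct{ input node }
type filter struct{ input node }
type limit struct {
	input node
	n     int64
}

// pushLimit annotates the scan under a Limit when only projections sit in
// between. The Limit node itself is kept, so the hint stays advisory.
func pushLimit(root node) {
	l, ok := root.(*limit)
	if !ok {
		return
	}
	cur := l.input
	for {
		switch n := cur.(type) {
		case *project:
			cur = n.input // projections do not change the row count
		case *scan:
			n.pushedLimit = l.n
			return
		default:
			return // e.g. a filter: cutting rows early would be wrong
		}
	}
}

func main() {
	s1 := &scan{pushedLimit: -1}
	pushLimit(&limit{input: &project{input: s1}, n: 10})

	s2 := &scan{pushedLimit: -1}
	pushLimit(&limit{input: &filter{input: s2}, n: 10})

	fmt.Println(s1.pushedLimit, s2.pushedLimit) // 10 -1
}
```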

type PhysAggregate

type PhysAggregate struct {
	Input     PhysicalNode
	GroupKeys []string
	Aggs      []expr.AggNode
	// contains filtered or unexported fields
}

func (*PhysAggregate) Children

func (n *PhysAggregate) Children() []PhysicalNode

func (*PhysAggregate) Execute

func (n *PhysAggregate) Execute(ctx context.Context, exec Executor) (any, error)

func (*PhysAggregate) Name

func (n *PhysAggregate) Name() string

func (*PhysAggregate) Schema

func (n *PhysAggregate) Schema() *arrow.Schema

type PhysFilter

type PhysFilter struct {
	Input     PhysicalNode
	Predicate expr.Expr
	// contains filtered or unexported fields
}

func (*PhysFilter) Children

func (n *PhysFilter) Children() []PhysicalNode

func (*PhysFilter) Execute

func (n *PhysFilter) Execute(ctx context.Context, exec Executor) (any, error)

func (*PhysFilter) ExecuteStream

func (n *PhysFilter) ExecuteStream(ctx context.Context, exec Executor) (any, error)

ExecuteStream streams PhysFilter's output by filtering each batch of the upstream reader as it arrives.
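Batch-at-a-time filtering can be sketched with a hypothetical reader interface in place of scan.RecordReader: each call pulls one upstream batch, keeps the matching rows, and hands them on, so only one batch is held at a time. Skipping batches with no survivors is a choice made in this sketch, not documented behavior.

```go
package main

import "fmt"

// reader is a hypothetical batch iterator standing in for a RecordReader.
type reader interface {
	Next() ([]int, bool) // next batch, or false when exhausted
}

// sliceReader serves pre-built batches.
type sliceReader struct{ batches [][]int }

func (r *sliceReader) Next() ([]int, bool) {
	if len(r.batches) == 0 {
		return nil, false
	}
	b := r.batches[0]
	r.batches = r.batches[1:]
	return b, true
}

// filterReader applies pred to each upstream batch as it is pulled.
type filterReader struct {
	up   reader
	pred func(int) bool
}

func (f *filterReader) Next() ([]int, bool) {
	for {
		b, ok := f.up.Next()
		if !ok {
			return nil, false
		}
		var kept []int
		for _, v := range b {
			if f.pred(v) {
				kept = append(kept, v)
			}
		}
		if len(kept) > 0 { // skip batches with no surviving rows
			return kept, true
		}
	}
}

func main() {
	up := &sliceReader{batches: [][]int{{1, 2, 3}, {5, 7}, {8, 9}}}
	r := &filterReader{up: up, pred: func(v int) bool { return v%2 == 0 }}
	for b, ok := r.Next(); ok; b, ok = r.Next() {
		fmt.Println(b)
	}
}
```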

func (*PhysFilter) Name

func (n *PhysFilter) Name() string

func (*PhysFilter) Schema

func (n *PhysFilter) Schema() *arrow.Schema

type PhysJoin

type PhysJoin struct {
	Left  PhysicalNode
	Right PhysicalNode
	On    string
	How   string
	// contains filtered or unexported fields
}

func (*PhysJoin) Children

func (n *PhysJoin) Children() []PhysicalNode

func (*PhysJoin) Execute

func (n *PhysJoin) Execute(ctx context.Context, exec Executor) (any, error)

func (*PhysJoin) Name

func (n *PhysJoin) Name() string

func (*PhysJoin) Schema

func (n *PhysJoin) Schema() *arrow.Schema

type PhysLimit

type PhysLimit struct {
	Input PhysicalNode
	N     int64
	// contains filtered or unexported fields
}

func (*PhysLimit) Children

func (n *PhysLimit) Children() []PhysicalNode

func (*PhysLimit) Execute

func (n *PhysLimit) Execute(ctx context.Context, exec Executor) (any, error)

func (*PhysLimit) ExecuteStream

func (n *PhysLimit) ExecuteStream(ctx context.Context, exec Executor) (any, error)

ExecuteStream streams PhysLimit's output, stopping once N rows have been emitted across all batches.
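Stopping after N rows across batches means tracking a running budget, truncating the batch that crosses it, and then not pulling from upstream again. A sketch with the same kind of hypothetical reader (the documented LimitStream itself takes and returns any):

```go
package main

import "fmt"

// reader is a hypothetical batch iterator standing in for a RecordReader.
type reader interface {
	Next() ([]int, bool)
}

// sliceReader serves pre-built batches.
type sliceReader struct{ batches [][]int }

func (r *sliceReader) Next() ([]int, bool) {
	if len(r.batches) == 0 {
		return nil, false
	}
	b := r.batches[0]
	r.batches = r.batches[1:]
	return b, true
}

// limitReader emits at most remaining rows in total. It truncates the batch
// that crosses the limit and then stops without pulling more upstream batches.
type limitReader struct {
	up        reader
	remaining int64
}

func (l *limitReader) Next() ([]int, bool) {
	if l.remaining <= 0 {
		return nil, false
	}
	b, ok := l.up.Next()
	if !ok {
		return nil, false
	}
	if int64(len(b)) > l.remaining {
		b = b[:l.remaining]
	}
	l.remaining -= int64(len(b))
	return b, true
}

func main() {
	up := &sliceReader{batches: [][]int{{1, 2, 3}, {4, 5, 6}, {7}}}
	r := &limitReader{up: up, remaining: 4}
	for b, ok := r.Next(); ok; b, ok = r.Next() {
		fmt.Println(b)
	}
	fmt.Println("upstream batches left:", len(up.batches)) // 1
}
```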

func (*PhysLimit) Name

func (n *PhysLimit) Name() string

func (*PhysLimit) Schema

func (n *PhysLimit) Schema() *arrow.Schema

type PhysProject

type PhysProject struct {
	Input   PhysicalNode
	Columns []string
	// contains filtered or unexported fields
}

func (*PhysProject) Children

func (n *PhysProject) Children() []PhysicalNode

func (*PhysProject) Execute

func (n *PhysProject) Execute(ctx context.Context, exec Executor) (any, error)

func (*PhysProject) ExecuteStream

func (n *PhysProject) ExecuteStream(ctx context.Context, exec Executor) (any, error)

ExecuteStream streams PhysProject's output by narrowing each batch of the upstream reader to Columns.
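Narrowing a columnar batch can reuse the kept column arrays rather than copying them. The sketch below uses a hypothetical ordered batch type in place of an Arrow record; reporting a missing column through an err field is a simplification of this sketch, not how the documented ProjectStream surfaces errors.

```go
package main

import "fmt"

// batch is a hypothetical columnar batch: ordered column names and data.
type batch struct {
	names []string
	cols  [][]int
}

// column looks up a column by name.
func (b batch) column(name string) ([]int, bool) {
	for i, n := range b.names {
		if n == name {
			return b.cols[i], true
		}
	}
	return nil, false
}

type reader interface {
	Next() (batch, bool)
}

type sliceReader struct{ batches []batch }

func (r *sliceReader) Next() (batch, bool) {
	if len(r.batches) == 0 {
		return batch{}, false
	}
	b := r.batches[0]
	r.batches = r.batches[1:]
	return b, true
}

// projectReader narrows each upstream batch to cols, in that order. Kept
// columns share their backing arrays with the input; nothing is copied.
type projectReader struct {
	up   reader
	cols []string
	err  error // set if a requested column is missing
}

func (p *projectReader) Next() (batch, bool) {
	b, ok := p.up.Next()
	if !ok {
		return batch{}, false
	}
	out := batch{names: p.cols}
	for _, name := range p.cols {
		col, found := b.column(name)
		if !found {
			p.err = fmt.Errorf("column %q not found", name)
			return batch{}, false
		}
		out.cols = append(out.cols, col)
	}
	return out, true
}

func main() {
	up := &sliceReader{batches: []batch{
		{names: []string{"id", "age", "score"}, cols: [][]int{{1, 2}, {30, 40}, {90, 80}}},
	}}
	p := &projectReader{up: up, cols: []string{"score", "id"}}
	for b, ok := p.Next(); ok; b, ok = p.Next() {
		fmt.Println(b.names, b.cols) // [score id] [[90 80] [1 2]]
	}
}
```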

func (*PhysProject) Name

func (n *PhysProject) Name() string

func (*PhysProject) Schema

func (n *PhysProject) Schema() *arrow.Schema

type PhysScan

type PhysScan struct {
	Node *ScanNode
	// contains filtered or unexported fields
}

PhysScan reads the ScanNode's source, honoring whichever pushdown annotations the executor supports.

func (*PhysScan) Children

func (n *PhysScan) Children() []PhysicalNode

func (*PhysScan) Execute

func (n *PhysScan) Execute(ctx context.Context, exec Executor) (any, error)

func (*PhysScan) ExecuteStream

func (n *PhysScan) ExecuteStream(ctx context.Context, exec Executor) (any, error)

ExecuteStream streams the scan: a file source is opened directly as a RecordReader, while an in-memory DataFrame source is materialized (cheaply, since it is already in memory) and then adapted into a reader.
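Adapting an in-memory source into a reader, which is what StreamFromDataFrame is described as doing, can be as simple as slicing the materialized data into batches. A sketch with a hypothetical frameReader over a single int column; the batch size is an assumption, since the document does not say how the adapter chunks its input.

```go
package main

import "fmt"

// frameReader adapts fully materialized rows into a stream of batches of at
// most batchSize rows. Batches are subslices of the original data, so the
// adaptation copies nothing.
type frameReader struct {
	rows      []int
	batchSize int
}

func (r *frameReader) Next() ([]int, bool) {
	if len(r.rows) == 0 {
		return nil, false
	}
	n := r.batchSize
	if n <= 0 || n > len(r.rows) {
		n = len(r.rows) // clamp to what is left; a non-positive size means one batch
	}
	b := r.rows[:n]
	r.rows = r.rows[n:]
	return b, true
}

func main() {
	r := &frameReader{rows: []int{1, 2, 3, 4, 5}, batchSize: 2}
	for b, ok := r.Next(); ok; b, ok = r.Next() {
		fmt.Println(b)
	}
}
```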

func (*PhysScan) Name

func (n *PhysScan) Name() string

func (*PhysScan) Schema

func (n *PhysScan) Schema() *arrow.Schema

type PhysSort

type PhysSort struct {
	Input PhysicalNode
	Keys  []expr.SortKey
	// contains filtered or unexported fields
}

func (*PhysSort) Children

func (n *PhysSort) Children() []PhysicalNode

func (*PhysSort) Execute

func (n *PhysSort) Execute(ctx context.Context, exec Executor) (any, error)

func (*PhysSort) Name

func (n *PhysSort) Name() string

func (*PhysSort) Schema

func (n *PhysSort) Schema() *arrow.Schema

type PhysWithColumn

type PhysWithColumn struct {
	Input PhysicalNode
	Expr  expr.Expr
	// contains filtered or unexported fields
}

func (*PhysWithColumn) Children

func (n *PhysWithColumn) Children() []PhysicalNode

func (*PhysWithColumn) Execute

func (n *PhysWithColumn) Execute(ctx context.Context, exec Executor) (any, error)

func (*PhysWithColumn) Name

func (n *PhysWithColumn) Name() string

func (*PhysWithColumn) Schema

func (n *PhysWithColumn) Schema() *arrow.Schema

type PhysicalNode

type PhysicalNode interface {
	Name() string
	Schema() *arrow.Schema
	Children() []PhysicalNode
	Execute(ctx context.Context, exec Executor) (any, error)
}

PhysicalNode is one executable operator. Execute runs the node against the injected Executor and returns the resulting DataFrame as an opaque handle.

type PhysicalPlan

type PhysicalPlan struct {
	Root PhysicalNode
}

func Lower

func Lower(p *LogicalPlan) (*PhysicalPlan, error)

Lower translates an (optimized) logical plan into an executable physical plan.

func NewPhysicalPlan

func NewPhysicalPlan(root PhysicalNode) *PhysicalPlan

func (*PhysicalPlan) Execute

func (p *PhysicalPlan) Execute(ctx context.Context, exec Executor) (any, error)

Execute runs the physical plan against exec and returns the resulting DataFrame handle (a *dataframe.DataFrame).

func (*PhysicalPlan) ExecuteStream

func (p *PhysicalPlan) ExecuteStream(ctx context.Context, exec Executor) (any, error)

ExecuteStream runs the physical plan in streaming mode, returning a RecordReader handle (an any wrapping a scan.RecordReader) instead of a DataFrame. Nodes that implement StreamingNode stream natively; every other node materializes its result via Execute, and that result is then adapted into a reader. This fallback is exactly the pipeline-breaking contract.

func (*PhysicalPlan) Explain

func (p *PhysicalPlan) Explain() string

Explain renders the physical plan tree with operator names and any scan-level pushdown annotations recorded by the optimizer.

type ProjectNode

type ProjectNode struct {
	Input   LogicalNode
	Columns []string
	// contains filtered or unexported fields
}

func NewProjectNode

func NewProjectNode(input LogicalNode, columns []string) *ProjectNode

func (*ProjectNode) Children

func (p *ProjectNode) Children() []LogicalNode

func (*ProjectNode) Name

func (p *ProjectNode) Name() string

func (*ProjectNode) Schema

func (p *ProjectNode) Schema() *schema.Schema

type ScanNode

type ScanNode struct {

	// Handle is the opaque source object the executor scans (e.g. a
	// *dataframe.DataFrame). It is typed as any to keep plan free of a
	// dataframe import.
	Handle any

	// Pushdown annotations produced by Optimize. They are advisory: the
	// executor honors whichever it understands and the surviving logical
	// nodes above the scan still produce a correct result if it does not.
	PushedFilters []expr.Expr
	PushedColumns []string
	PushedLimit   int64 // <0 means "no limit pushed"
	// contains filtered or unexported fields
}
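A scan that honors both the PushedColumns and PushedLimit hints, treating a negative limit as "nothing pushed", might look like the sketch below. table, scanHints, and scanWithHints are hypothetical, and treating an empty PushedColumns as "all columns" is an assumption; per the Optimize notes, the executor currently acts only on PushedLimit.

```go
package main

import "fmt"

// table is a hypothetical in-memory source: ordered column names and data.
type table struct {
	names []string
	cols  [][]int
}

// scanHints mirrors the ScanNode pushdown fields used here.
type scanHints struct {
	pushedColumns []string // empty means "all columns" (assumption)
	pushedLimit   int64    // <0 means "no limit pushed"
}

// scanWithHints honors both hints. Because the hints are advisory, returning
// src unchanged would also be correct: the operators above still apply.
func scanWithHints(src table, h scanHints) (table, error) {
	out := src
	if len(h.pushedColumns) > 0 {
		out = table{}
		for _, want := range h.pushedColumns {
			idx := -1
			for i, n := range src.names {
				if n == want {
					idx = i
					break
				}
			}
			if idx < 0 {
				return table{}, fmt.Errorf("pushed column %q not in source", want)
			}
			out.names = append(out.names, want)
			out.cols = append(out.cols, src.cols[idx])
		}
	}
	if h.pushedLimit >= 0 {
		limited := make([][]int, len(out.cols)) // new slice; src.cols is untouched
		for i, c := range out.cols {
			if int64(len(c)) > h.pushedLimit {
				c = c[:h.pushedLimit]
			}
			limited[i] = c
		}
		out.cols = limited
	}
	return out, nil
}

func main() {
	src := table{names: []string{"id", "score", "age"}, cols: [][]int{{1, 2, 3}, {90, 80, 70}, {30, 40, 50}}}
	got, err := scanWithHints(src, scanHints{pushedColumns: []string{"score"}, pushedLimit: 2})
	fmt.Println(got.names, got.cols, err) // [score] [[90 80]] <nil>
}
```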

func NewScanNode

func NewScanNode(schema *schema.Schema, source string) *ScanNode

func (*ScanNode) Children

func (s *ScanNode) Children() []LogicalNode

func (*ScanNode) Name

func (s *ScanNode) Name() string

func (*ScanNode) Schema

func (s *ScanNode) Schema() *schema.Schema

func (*ScanNode) Source

func (s *ScanNode) Source() string

type SortNode

type SortNode struct {
	Input LogicalNode
	Keys  []expr.SortKey
	// contains filtered or unexported fields
}

func NewSortNode

func NewSortNode(input LogicalNode, keys []expr.SortKey) *SortNode

func (*SortNode) Children

func (s *SortNode) Children() []LogicalNode

func (*SortNode) Name

func (s *SortNode) Name() string

func (*SortNode) Schema

func (s *SortNode) Schema() *schema.Schema

type StreamingNode

type StreamingNode interface {
	PhysicalNode
	// ExecuteStream runs the node and returns a streaming reader handle.
	ExecuteStream(ctx context.Context, exec Executor) (any, error)
}

StreamingNode is a physical node that can produce its result as a streaming RecordReader handle (carried as any; see Executor) instead of a materialized DataFrame. Streamable operators (Scan over a file, Filter, Project, Limit) transform batches in flight. Pipeline-breaking operators (Sort, Aggregate, Join, WithColumn) cannot emit a correct batch until they have seen all input, so they materialize via Execute and then stream the result through the executor's StreamFromDataFrame. PhysicalPlan.ExecuteStream applies this fallback to every node that does not implement StreamingNode.

type WithColumnNode

type WithColumnNode struct {
	Input LogicalNode
	Expr  expr.Expr
	// contains filtered or unexported fields
}

func NewWithColumnNode

func NewWithColumnNode(input LogicalNode, e expr.Expr) *WithColumnNode

func (*WithColumnNode) Children

func (w *WithColumnNode) Children() []LogicalNode

func (*WithColumnNode) Name

func (w *WithColumnNode) Name() string

func (*WithColumnNode) Schema

func (w *WithColumnNode) Schema() *schema.Schema
