package dataframe

v0.1.0

Warning: This package is not in the latest version of its module.

Published: Jul 16, 2025 License: Apache-2.0, MIT Imports: 19 Imported by: 0

Documentation

Overview

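Package dataframe provides high-performance DataFrame operations backed by Apache Arrow arrays, with both eager (DataFrame) and lazy, optimizable (LazyFrame) execution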

Constants

const (
	// AvgBytesPerCell is the estimated average bytes per DataFrame cell
	AvgBytesPerCell = 8
	// ParallelThreshold is the minimum number of rows to trigger parallel execution
	ParallelThreshold = 1000
	// FilterSelectivity is the default assumed selectivity for filter operations
	FilterSelectivity = 0.5
)

Package-level constants that keep estimates consistent across the debug functionality
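The constants are documented only by their comments. As a rough illustration of how they could combine in a plan-cost estimate, the sketch below computes memory as rows × columns × AvgBytesPerCell, scales the row count by FilterSelectivity for a filter, and compares the input size against ParallelThreshold. The helper estimateFilter and the filterEstimate type are hypothetical; the package's own estimator is unexported and may use these values differently.

```go
package main

import "fmt"

// Local copies of the documented package constants.
const (
	AvgBytesPerCell   = 8
	ParallelThreshold = 1000
	FilterSelectivity = 0.5
)

// filterEstimate is a hypothetical cost estimate for a filter node.
type filterEstimate struct {
	OutputRows  int64
	InputMemory int64 // bytes: rows * columns * AvgBytesPerCell
	Parallel    bool
}

// estimateFilter assumes memory scales with the number of cells and that a
// filter keeps FilterSelectivity of its input rows.
func estimateFilter(rows, columns int64) filterEstimate {
	return filterEstimate{
		OutputRows:  int64(float64(rows) * FilterSelectivity),
		InputMemory: rows * columns * AvgBytesPerCell,
		Parallel:    rows >= ParallelThreshold,
	}
}

func main() {
	e := estimateFilter(10_000, 4)
	fmt.Printf("%+v\n", e) // {OutputRows:5000 InputMemory:320000 Parallel:true}
}
```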

Types

type ActualCost

type ActualCost struct {
	Rows     int64         `json:"rows"`
	Memory   int64         `json:"memory"`
	CPU      time.Duration `json:"cpu"`
	Duration time.Duration `json:"duration"`
}

ActualCost contains actual cost metrics after execution

type AnalysisReport

type AnalysisReport struct {
	Operations  []OperationTrace `json:"operations"`
	Summary     OperationSummary `json:"summary"`
	Bottlenecks []Bottleneck     `json:"bottlenecks"`
	Suggestions []string         `json:"suggestions"`
}

AnalysisReport contains the complete analysis report

type ArrowValueArray

type ArrowValueArray[T any] interface {
	Len() int
	IsNull(int) bool
	Value(int) T
}

ArrowValueArray represents an Arrow array with value access

type BinaryExprInterface

type BinaryExprInterface interface {
	expr.Expr
	Left() expr.Expr
	Op() expr.BinaryOp
	Right() expr.Expr
}

BinaryExprInterface defines the interface for binary expressions

type Bottleneck

type Bottleneck struct {
	Operation string        `json:"operation"`
	Duration  time.Duration `json:"duration"`
	Reason    string        `json:"reason"`
}

Bottleneck represents a performance bottleneck

type ConstantFoldingRule

type ConstantFoldingRule struct{}

ConstantFoldingRule evaluates constant expressions at planning time

func (*ConstantFoldingRule) Apply

func (*ConstantFoldingRule) Name

func (r *ConstantFoldingRule) Name() string

type DataFrame

type DataFrame struct {
	// contains filtered or unexported fields
}

DataFrame represents a table of data with typed columns

func New

func New(series ...ISeries) *DataFrame

New creates a new DataFrame from a slice of ISeries

func (*DataFrame) Column

func (df *DataFrame) Column(name string) (ISeries, bool)

Column returns the series for the given column name

func (*DataFrame) Columns

func (df *DataFrame) Columns() []string

Columns returns the names of all columns in order

func (*DataFrame) Concat

func (df *DataFrame) Concat(others ...*DataFrame) *DataFrame

Concat concatenates multiple DataFrames vertically (row-wise). All DataFrames must have the same column structure

func (*DataFrame) Correlation

func (df *DataFrame) Correlation(col1, col2 string) (float64, error)

Correlation calculates the Pearson correlation coefficient between two numeric columns

func (*DataFrame) Debug

func (df *DataFrame) Debug() *DataFrame

Debug enables debug mode for the DataFrame

func (*DataFrame) Drop

func (df *DataFrame) Drop(names ...string) *DataFrame

Drop returns a new DataFrame without the specified columns

func (*DataFrame) GroupBy

func (df *DataFrame) GroupBy(columns ...string) *GroupBy

GroupBy creates a GroupBy object for the specified columns

func (*DataFrame) HasColumn

func (df *DataFrame) HasColumn(name string) bool

HasColumn reports whether a column with the given name exists

func (*DataFrame) Join

func (df *DataFrame) Join(right *DataFrame, options *JoinOptions) (*DataFrame, error)

Join performs a join operation between two DataFrames
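The algorithm Join uses internally is not documented here. To illustrate one common option, a hash join (the name matches HashJoinStrategy), the sketch builds a map from key to row indices on the right side and probes it with each left row. Plain int keys stand in for Arrow columns, and hashInnerJoin and pair are hypothetical names.

```go
package main

import "fmt"

// pair holds matching row indices from the left and right inputs.
type pair struct{ Left, Right int }

// hashInnerJoin returns index pairs (i, j) with left[i] == right[j],
// ordered by left row, then by right row.
func hashInnerJoin(left, right []int) []pair {
	// Build phase: index every right row by its key.
	build := make(map[int][]int, len(right))
	for j, k := range right {
		build[k] = append(build[k], j)
	}
	// Probe phase: look up each left key.
	var out []pair
	for i, k := range left {
		for _, j := range build[k] {
			out = append(out, pair{i, j})
		}
	}
	return out
}

func main() {
	fmt.Println(hashInnerJoin([]int{1, 2, 3}, []int{3, 1, 1}))
	// [{0 1} {0 2} {2 0}]
}
```

A left or full outer join would additionally emit unmatched rows with a null marker on the missing side.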

func (*DataFrame) Lazy

func (df *DataFrame) Lazy() *LazyFrame

Lazy converts a DataFrame to a LazyFrame

func (*DataFrame) Len

func (df *DataFrame) Len() int

Len returns the number of rows. It assumes all columns have the same length

func (*DataFrame) OptimizedJoin

func (df *DataFrame) OptimizedJoin(right *DataFrame, options *JoinOptions) (*DataFrame, error)

OptimizedJoin performs a join using an automatically selected join strategy

func (*DataFrame) Release

func (df *DataFrame) Release()

Release releases all underlying Arrow memory

func (*DataFrame) RollingWindow

func (df *DataFrame) RollingWindow(column string, windowSize int, operation string) (*DataFrame, error)

RollingWindow applies a rolling window operation to a column

func (*DataFrame) SafeCollectParallel

func (df *DataFrame) SafeCollectParallel() (*DataFrame, error)

SafeCollectParallel performs memory-safe parallel collection

func (*DataFrame) SafeCollectParallelWithMonitoring

func (df *DataFrame) SafeCollectParallelWithMonitoring() (*DataFrame, error)

SafeCollectParallelWithMonitoring performs memory-safe parallel collection with memory monitoring

func (*DataFrame) Select

func (df *DataFrame) Select(names ...string) *DataFrame

Select returns a new DataFrame with only the specified columns

func (*DataFrame) Slice

func (df *DataFrame) Slice(start, end int) *DataFrame

Slice creates a new DataFrame containing rows from start (inclusive) to end (exclusive)

func (*DataFrame) Sort

func (df *DataFrame) Sort(column string, ascending bool) (*DataFrame, error)

Sort returns a new DataFrame sorted by the specified column

func (*DataFrame) SortBy

func (df *DataFrame) SortBy(columns []string, ascending []bool) (*DataFrame, error)

SortBy returns a new DataFrame sorted by multiple columns
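Multi-column sorting compares rows on the first column and consults later columns only on ties, with each column's direction taken from the matching entry in ascending. A sketch that sorts a row-index permutation with sort.SliceStable, assuming int-only columns for simplicity (sortIndices is a hypothetical helper):

```go
package main

import (
	"fmt"
	"sort"
)

// sortIndices returns the row order for columns compared left to right;
// ascending[k] sets the direction for columns[k]. Ties keep input order.
func sortIndices(columns [][]int, ascending []bool) []int {
	n := 0
	if len(columns) > 0 {
		n = len(columns[0])
	}
	idx := make([]int, n)
	for i := range idx {
		idx[i] = i
	}
	sort.SliceStable(idx, func(a, b int) bool {
		ra, rb := idx[a], idx[b]
		for k, col := range columns {
			if col[ra] == col[rb] {
				continue // tie: compare the next column
			}
			if ascending[k] {
				return col[ra] < col[rb]
			}
			return col[ra] > col[rb]
		}
		return false
	})
	return idx
}

func main() {
	dept := []int{2, 1, 2, 1}
	salary := []int{50, 70, 90, 60}
	// dept ascending, then salary descending
	fmt.Println(sortIndices([][]int{dept, salary}, []bool{true, false})) // [1 3 2 0]
}
```

Sorting indices rather than rows lets every column be reordered afterwards with the same permutation.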

func (*DataFrame) String

func (df *DataFrame) String() string

String returns a string representation of the DataFrame

func (*DataFrame) Width

func (df *DataFrame) Width() int

Width returns the number of columns

func (*DataFrame) WindowFunction

func (df *DataFrame) WindowFunction(function string, columns ...string) (*DataFrame, error)

WindowFunction applies a window function to the DataFrame

func (*DataFrame) WindowFunctionWithPartition

func (df *DataFrame) WindowFunctionWithPartition(function, orderBy, partitionBy string) (*DataFrame, error)

WindowFunctionWithPartition applies a window function with optional partitioning

func (*DataFrame) WithConfig

func (df *DataFrame) WithConfig(opConfig config.OperationConfig) *DataFrame

WithConfig returns a new DataFrame with the specified operation configuration

func (*DataFrame) WithDebugConfig

func (df *DataFrame) WithDebugConfig(config DebugConfig) *DataFrame

WithDebugConfig sets the debug configuration for the DataFrame

type DataFrameStats

type DataFrameStats struct {
	Rows    int      `json:"rows"`
	Columns int      `json:"columns"`
	Memory  int64    `json:"memory"`
	Schema  []string `json:"schema"`
}

DataFrameStats contains statistics about a DataFrame

type DebugConfig

type DebugConfig struct {
	Enabled           bool         `json:"enabled"`
	LogLevel          LogLevel     `json:"log_level"`
	ProfileOperations bool         `json:"profile_operations"`
	TrackMemory       bool         `json:"track_memory"`
	ShowOptimizations bool         `json:"show_optimizations"`
	OutputFormat      OutputFormat `json:"output_format"`
}

DebugConfig configures debug mode settings

type DebugExecutionPlan

type DebugExecutionPlan struct {
	RootNode  *PlanNode         `json:"root"`
	Estimated PlanStats         `json:"estimated"`
	Actual    PlanStats         `json:"actual,omitempty"`
	Metadata  DebugPlanMetadata `json:"metadata"`
}

DebugExecutionPlan represents the execution plan for DataFrame operations with debug information

func (*DebugExecutionPlan) RenderJSON

func (plan *DebugExecutionPlan) RenderJSON() ([]byte, error)

RenderJSON renders the execution plan as JSON

func (*DebugExecutionPlan) RenderText

func (plan *DebugExecutionPlan) RenderText() string

RenderText renders the execution plan as text

type DebugPlanMetadata

type DebugPlanMetadata struct {
	CreatedAt     time.Time `json:"created_at"`
	OptimizedAt   time.Time `json:"optimized_at,omitempty"`
	ExecutedAt    time.Time `json:"executed_at,omitempty"`
	Optimizations []string  `json:"optimizations,omitempty"`
}

DebugPlanMetadata contains metadata about the execution plan

type EstimatedCost

type EstimatedCost struct {
	Rows   int64         `json:"rows"`
	Memory int64         `json:"memory"`
	CPU    time.Duration `json:"cpu"`
}

EstimatedCost contains estimated cost metrics

type ExecutionPlan

type ExecutionPlan struct {
	// contains filtered or unexported fields
}

ExecutionPlan represents a planned query execution with metadata

func CreateExecutionPlan

func CreateExecutionPlan(source *DataFrame, operations []LazyOperation) *ExecutionPlan

CreateExecutionPlan analyzes operations and creates an execution plan

type FilterFusionRule

type FilterFusionRule struct{}

FilterFusionRule combines multiple filter operations into a single operation

func (*FilterFusionRule) Apply

func (r *FilterFusionRule) Apply(plan *ExecutionPlan) *ExecutionPlan

func (*FilterFusionRule) Name

func (r *FilterFusionRule) Name() string

type FilterOperation

type FilterOperation struct {
	// contains filtered or unexported fields
}

FilterOperation represents a filter operation

func (*FilterOperation) Apply

func (f *FilterOperation) Apply(df *DataFrame) (*DataFrame, error)

func (*FilterOperation) String

func (f *FilterOperation) String() string

type FunctionExprInterface

type FunctionExprInterface interface {
	expr.Expr
	Name() string
	Args() []expr.Expr
}

FunctionExprInterface defines the interface for function expressions

type GroupBy

type GroupBy struct {
	// contains filtered or unexported fields
}

GroupBy represents a grouped DataFrame for aggregation operations

func (*GroupBy) Agg

func (gb *GroupBy) Agg(aggregations ...*expr.AggregationExpr) *DataFrame

Agg performs aggregation operations on the grouped data
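Conceptually, grouping assigns each row to a bucket keyed by its group-column values, and each aggregation then reduces its bucket. A hash-based sketch for a single string key column and a sum; the real Agg takes *expr.AggregationExpr values, and groupSum, its first-seen output order, and the slice types are assumptions.

```go
package main

import "fmt"

// groupSum sums values per distinct key and returns keys in first-seen order.
func groupSum(keys []string, values []float64) ([]string, []float64) {
	slot := make(map[string]int) // key -> position in the output
	var outKeys []string
	var sums []float64
	for i, k := range keys {
		p, ok := slot[k]
		if !ok {
			p = len(outKeys)
			slot[k] = p
			outKeys = append(outKeys, k)
			sums = append(sums, 0)
		}
		sums[p] += values[i]
	}
	return outKeys, sums
}

func main() {
	k, s := groupSum([]string{"a", "b", "a"}, []float64{1, 2, 3})
	fmt.Println(k, s) // [a b] [4 2]
}
```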

type GroupByOperation

type GroupByOperation struct {
	// contains filtered or unexported fields
}

GroupByOperation represents a group by and aggregation operation

func (*GroupByOperation) Apply

func (g *GroupByOperation) Apply(df *DataFrame) (*DataFrame, error)

func (*GroupByOperation) String

func (g *GroupByOperation) String() string

type ISeries

type ISeries interface {
	Name() string
	Len() int
	DataType() arrow.DataType
	IsNull(index int) bool
	String() string
	Array() arrow.Array
	Release()
}

ISeries provides a type-erased interface for Series of any type

type JoinOperation

type JoinOperation struct {
	// contains filtered or unexported fields
}

JoinOperation represents a join operation

func (*JoinOperation) Apply

func (j *JoinOperation) Apply(df *DataFrame) (*DataFrame, error)

func (*JoinOperation) String

func (j *JoinOperation) String() string

type JoinOptimizer

type JoinOptimizer struct {
	// contains filtered or unexported fields
}

JoinOptimizer selects the optimal join strategy based on table statistics

func NewJoinOptimizer

func NewJoinOptimizer(left, right *DataFrame) *JoinOptimizer

NewJoinOptimizer creates a new join optimizer with table statistics

func (*JoinOptimizer) SelectStrategy

func (jo *JoinOptimizer) SelectStrategy(leftKeys, rightKeys []string) JoinStrategy

SelectStrategy chooses the optimal join strategy based on table characteristics

type JoinOptions

type JoinOptions struct {
	Type      JoinType
	LeftKey   string   // Single join key for left DataFrame
	RightKey  string   // Single join key for right DataFrame
	LeftKeys  []string // Multiple join keys for left DataFrame
	RightKeys []string // Multiple join keys for right DataFrame
}

JoinOptions specifies parameters for join operations

type JoinStrategy

type JoinStrategy int

JoinStrategy represents different join algorithms

const (
	HashJoinStrategy JoinStrategy = iota
	BroadcastJoinStrategy
	MergeJoinStrategy
	OptimizedHashJoinStrategy
)
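How SelectStrategy weighs the statistics is not documented. One plausible heuristic is sketched below against local copies of the JoinStrategy constants and a reduced TableStats: merge join when both sides are sorted on a single join key, broadcast join when either side is small, hash join otherwise. The broadcastLimit threshold and the order of the checks are assumptions, not the package's rules.

```go
package main

import "fmt"

type JoinStrategy int

// Mirrors the documented constant order.
const (
	HashJoinStrategy JoinStrategy = iota
	BroadcastJoinStrategy
	MergeJoinStrategy
	OptimizedHashJoinStrategy
)

// TableStats is a reduced copy of the documented struct.
type TableStats struct {
	RowCount      int
	SortedColumns map[string]bool
}

// broadcastLimit is a hypothetical row-count cutoff for broadcasting.
const broadcastLimit = 1000

func selectStrategy(left, right TableStats, leftKeys, rightKeys []string) JoinStrategy {
	sortedOn := func(s TableStats, keys []string) bool {
		// Only single-key joins qualify: per-column sortedness does not
		// imply sortedness on a composite key.
		return len(keys) == 1 && s.SortedColumns[keys[0]]
	}
	switch {
	case sortedOn(left, leftKeys) && sortedOn(right, rightKeys):
		return MergeJoinStrategy
	case left.RowCount <= broadcastLimit || right.RowCount <= broadcastLimit:
		return BroadcastJoinStrategy
	default:
		return HashJoinStrategy
	}
}

func main() {
	big := TableStats{RowCount: 1_000_000}
	small := TableStats{RowCount: 50}
	id := []string{"id"}
	fmt.Println(selectStrategy(big, small, id, id) == BroadcastJoinStrategy) // true
}
```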

type JoinType

type JoinType int

JoinType represents the type of join operation

const (
	InnerJoin JoinType = iota
	LeftJoin
	RightJoin
	FullOuterJoin
)

type LazyFrame

type LazyFrame struct {
	// contains filtered or unexported fields
}

LazyFrame holds a DataFrame and a sequence of deferred operations

func (*LazyFrame) Collect

func (lf *LazyFrame) Collect() (*DataFrame, error)

Collect executes all deferred operations and returns the resulting DataFrame

func (*LazyFrame) Explain

func (lf *LazyFrame) Explain() DebugExecutionPlan

Explain generates an execution plan without executing the operations

func (*LazyFrame) ExplainAnalyze

func (lf *LazyFrame) ExplainAnalyze() (DebugExecutionPlan, error)

ExplainAnalyze generates an execution plan and executes it with profiling

func (*LazyFrame) Filter

func (lf *LazyFrame) Filter(predicate expr.Expr) *LazyFrame

Filter adds a filter operation to the lazy frame

func (*LazyFrame) GroupBy

func (lf *LazyFrame) GroupBy(columns ...string) *LazyGroupBy

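GroupBy groups the lazy frame by the specified columns and returns a LazyGroupBy; the aggregation itself is added by calling Agg or one of Count, Sum, Mean, Min, or Max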

func (*LazyFrame) Join

func (lf *LazyFrame) Join(right *LazyFrame, options *JoinOptions) *LazyFrame

Join adds a join operation to the lazy frame

func (*LazyFrame) Release

func (lf *LazyFrame) Release()

Release releases the resources held by the lazy frame

func (*LazyFrame) SafeCollectParallel

func (lf *LazyFrame) SafeCollectParallel() (*DataFrame, error)

SafeCollectParallel executes all deferred operations using memory-safe parallel processing

func (*LazyFrame) SafeCollectParallelWithMonitoring

func (lf *LazyFrame) SafeCollectParallelWithMonitoring() (*DataFrame, error)

SafeCollectParallelWithMonitoring executes operations with memory monitoring and adaptive parallelism

func (*LazyFrame) Select

func (lf *LazyFrame) Select(columns ...string) *LazyFrame

Select adds a column selection operation to the lazy frame

func (*LazyFrame) Sort

func (lf *LazyFrame) Sort(column string, ascending bool) *LazyFrame

Sort adds a sort operation to the lazy frame

func (*LazyFrame) SortBy

func (lf *LazyFrame) SortBy(columns []string, ascending []bool) *LazyFrame

SortBy adds a multi-column sort operation to the lazy frame

func (*LazyFrame) String

func (lf *LazyFrame) String() string

String returns a string representation of the lazy frame and its operations

func (*LazyFrame) WithColumn

func (lf *LazyFrame) WithColumn(name string, expr expr.Expr) *LazyFrame

WithColumn adds a column creation/modification operation to the lazy frame

type LazyGroupBy

type LazyGroupBy struct {
	// contains filtered or unexported fields
}

LazyGroupBy represents a lazy groupby operation that can be followed by aggregations

func (*LazyGroupBy) Agg

func (lgb *LazyGroupBy) Agg(aggregations ...*expr.AggregationExpr) *LazyFrame

Agg adds the aggregation operations to the lazy plan and returns the resulting LazyFrame

func (*LazyGroupBy) Count

func (lgb *LazyGroupBy) Count(column string) *LazyFrame

Count creates a count aggregation for the specified column

func (*LazyGroupBy) Max

func (lgb *LazyGroupBy) Max(column string) *LazyFrame

Max creates a max aggregation for the specified column

func (*LazyGroupBy) Mean

func (lgb *LazyGroupBy) Mean(column string) *LazyFrame

Mean creates a mean aggregation for the specified column

func (*LazyGroupBy) Min

func (lgb *LazyGroupBy) Min(column string) *LazyFrame

Min creates a min aggregation for the specified column

func (*LazyGroupBy) Sum

func (lgb *LazyGroupBy) Sum(column string) *LazyFrame

Sum creates a sum aggregation for the specified column

type LazyOperation

type LazyOperation interface {
	Apply(df *DataFrame) (*DataFrame, error)
	String() string
}

LazyOperation represents a deferred operation on a DataFrame
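A LazyFrame pairs a source DataFrame with a list of LazyOperation values that run only when the frame is collected. The same pattern in miniature: a toy frame type ([]int) stands in for *DataFrame, and lazyOp, filterGT, takeFirst, and lazy are hypothetical names. The package may also optimize the operation list before executing it (see QueryOptimizer), which this sketch omits.

```go
package main

import (
	"errors"
	"fmt"
)

// frame stands in for *DataFrame.
type frame []int

// lazyOp mirrors the shape of the documented LazyOperation interface.
type lazyOp interface {
	Apply(f frame) (frame, error)
	String() string
}

type filterGT struct{ min int }

func (o filterGT) Apply(f frame) (frame, error) {
	var out frame
	for _, v := range f {
		if v > o.min {
			out = append(out, v)
		}
	}
	return out, nil
}
func (o filterGT) String() string { return fmt.Sprintf("filter(x > %d)", o.min) }

type takeFirst struct{ n int }

func (o takeFirst) Apply(f frame) (frame, error) {
	if o.n < 0 {
		return nil, errors.New("negative count")
	}
	if o.n > len(f) {
		return f, nil
	}
	return f[:o.n], nil
}
func (o takeFirst) String() string { return fmt.Sprintf("take(%d)", o.n) }

// lazy records operations; nothing runs until collect.
type lazy struct {
	source frame
	ops    []lazyOp
}

func (l lazy) with(op lazyOp) lazy {
	ops := append(append([]lazyOp(nil), l.ops...), op) // copy so l is unchanged
	return lazy{source: l.source, ops: ops}
}

// collect applies the recorded operations in order.
func (l lazy) collect() (frame, error) {
	cur := l.source
	for _, op := range l.ops {
		next, err := op.Apply(cur)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", op, err)
		}
		cur = next
	}
	return cur, nil
}

func main() {
	lf := lazy{source: frame{5, 1, 8, 3, 9}}.with(filterGT{3}).with(takeFirst{2})
	out, err := lf.collect()
	fmt.Println(out, err) // [5 8] <nil>
}
```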

type LogLevel

type LogLevel int

LogLevel defines the verbosity of debug logging

const (
	LogLevelInfo LogLevel = iota
	LogLevelDebug
	LogLevelTrace
)

type MemoryStats

type MemoryStats struct {
	Before int64 `json:"before"`
	After  int64 `json:"after"`
	Delta  int64 `json:"delta"`
}

MemoryStats contains memory usage statistics

type MergeJoinOptimizer

type MergeJoinOptimizer struct {
	// contains filtered or unexported fields
}

MergeJoinOptimizer tracks whether DataFrames are pre-sorted

func NewMergeJoinOptimizer

func NewMergeJoinOptimizer(leftSorted, rightSorted bool, sortKeys []string) *MergeJoinOptimizer

NewMergeJoinOptimizer creates a merge join optimizer

func (*MergeJoinOptimizer) Join

func (mjo *MergeJoinOptimizer) Join(left, right *DataFrame, options *JoinOptions) (*DataFrame, error)

Join performs an optimized merge join
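A merge join walks two inputs that are already sorted on the join key with one cursor each, which is why pre-sortedness matters here: it avoids both a sort and a hash table. A sketch of the inner-join case over ascending int keys that emits every pairing within runs of duplicate keys; mergeInnerJoin and pair are hypothetical names.

```go
package main

import "fmt"

type pair struct{ Left, Right int }

// mergeInnerJoin requires left and right sorted ascending and returns the
// index pairs whose keys are equal.
func mergeInnerJoin(left, right []int) []pair {
	var out []pair
	i, j := 0, 0
	for i < len(left) && j < len(right) {
		switch {
		case left[i] < right[j]:
			i++
		case left[i] > right[j]:
			j++
		default:
			// Find the run of equal keys on each side, then emit the cross product.
			k := left[i]
			iEnd, jEnd := i, j
			for iEnd < len(left) && left[iEnd] == k {
				iEnd++
			}
			for jEnd < len(right) && right[jEnd] == k {
				jEnd++
			}
			for a := i; a < iEnd; a++ {
				for b := j; b < jEnd; b++ {
					out = append(out, pair{a, b})
				}
			}
			i, j = iEnd, jEnd
		}
	}
	return out
}

func main() {
	fmt.Println(mergeInnerJoin([]int{1, 2, 2, 4}, []int{2, 2, 3, 4}))
	// [{1 0} {1 1} {2 0} {2 1} {3 3}]
}
```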

type OperationFusionRule

type OperationFusionRule struct{}

OperationFusionRule combines compatible operations for better performance

func (*OperationFusionRule) Apply

func (*OperationFusionRule) Name

func (r *OperationFusionRule) Name() string

type OperationSummary

type OperationSummary struct {
	TotalOperations int           `json:"total_operations"`
	TotalDuration   time.Duration `json:"total_duration"`
	TotalMemory     int64         `json:"total_memory"`
	ParallelOps     int           `json:"parallel_ops"`
}

OperationSummary contains summary statistics

type OperationTrace

type OperationTrace struct {
	ID          string            `json:"id"`
	Operation   string            `json:"operation"`
	Input       DataFrameStats    `json:"input"`
	Output      DataFrameStats    `json:"output"`
	Duration    time.Duration     `json:"duration"`
	Memory      MemoryStats       `json:"memory"`
	Parallel    bool              `json:"parallel"`
	WorkerCount int               `json:"worker_count,omitempty"`
	Properties  map[string]string `json:"properties"`
}

OperationTrace represents a traced operation during execution

type OptimizationRule

type OptimizationRule interface {
	Apply(plan *ExecutionPlan) *ExecutionPlan
	Name() string
}

OptimizationRule represents a single optimization transformation

type OptimizedHashMap

type OptimizedHashMap struct {
	// contains filtered or unexported fields
}

OptimizedHashMap maps string keys to lists of int values, hashing keys with xxhash for speed

func NewOptimizedHashMap

func NewOptimizedHashMap(estimatedSize int) *OptimizedHashMap

NewOptimizedHashMap creates a new optimized hash map

func (*OptimizedHashMap) Get

func (ohm *OptimizedHashMap) Get(key string) ([]int, bool)

Get returns the values stored for key and reports whether the key is present

func (*OptimizedHashMap) Put

func (ohm *OptimizedHashMap) Put(key string, value int)

Put adds value to the list of values stored under key
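Judging from the Get and Put signatures, the map associates each string key with a list of ints (presumably row indices, for example in a join's build phase). A sketch of that shape follows. It swaps in the standard library's hash/fnv (FNV-1a) for xxhash to avoid an external dependency, and keeps each key next to its values so that two keys with the same 64-bit hash stay separate. The hashMap type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type entry struct {
	key    string
	values []int
}

// hashMap buckets entries by a 64-bit hash of the key.
type hashMap struct {
	buckets map[uint64][]entry
}

func newHashMap(estimatedSize int) *hashMap {
	return &hashMap{buckets: make(map[uint64][]entry, estimatedSize)}
}

func hashKey(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum64()
}

// put appends value to the list stored under key.
func (m *hashMap) put(key string, value int) {
	h := hashKey(key)
	bucket := m.buckets[h]
	for i := range bucket {
		if bucket[i].key == key {
			bucket[i].values = append(bucket[i].values, value)
			return
		}
	}
	m.buckets[h] = append(bucket, entry{key: key, values: []int{value}})
}

// get returns the values for key and whether the key was present.
func (m *hashMap) get(key string) ([]int, bool) {
	for _, e := range m.buckets[hashKey(key)] {
		if e.key == key {
			return e.values, true
		}
	}
	return nil, false
}

func main() {
	m := newHashMap(4)
	m.put("x", 1)
	m.put("y", 2)
	m.put("x", 3)
	v, ok := m.get("x")
	fmt.Println(v, ok) // [1 3] true
}
```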

type OutputFormat

type OutputFormat int

OutputFormat defines the output format for debug information

const (
	OutputFormatText OutputFormat = iota
	OutputFormatJSON
)

type PlanCost

type PlanCost struct {
	Estimated EstimatedCost `json:"estimated"`
	Actual    ActualCost    `json:"actual,omitempty"`
}

PlanCost contains cost information for a plan node

type PlanMetadata

type PlanMetadata struct {
	// contains filtered or unexported fields
}

PlanMetadata contains analysis information about the execution plan

type PlanNode

type PlanNode struct {
	ID          string            `json:"id"`
	Type        string            `json:"type"`        // "Filter", "Select", "GroupBy", etc.
	Description string            `json:"description"` // Human-readable operation description
	Children    []*PlanNode       `json:"children,omitempty"`
	Cost        PlanCost          `json:"cost"`
	Properties  map[string]string `json:"properties"`
}

PlanNode represents a node in the execution plan tree

type PlanStats

type PlanStats struct {
	TotalRows     int64         `json:"total_rows"`
	TotalMemory   int64         `json:"total_memory"`
	TotalDuration time.Duration `json:"total_duration"`
	PeakMemory    int64         `json:"peak_memory"`
	ParallelOps   int           `json:"parallel_ops"`
}

PlanStats contains overall plan statistics

type PredicatePushdownRule

type PredicatePushdownRule struct{}

PredicatePushdownRule moves filter operations earlier in the pipeline

func (*PredicatePushdownRule) Apply

func (*PredicatePushdownRule) Name

func (r *PredicatePushdownRule) Name() string

type ProjectionPushdownRule

type ProjectionPushdownRule struct{}

ProjectionPushdownRule pushes column selections earlier to reduce data processing

func (*ProjectionPushdownRule) Apply

func (*ProjectionPushdownRule) Name

func (r *ProjectionPushdownRule) Name() string

type QueryAnalyzer

type QueryAnalyzer struct {
	// contains filtered or unexported fields
}

QueryAnalyzer analyzes and traces query execution

func NewQueryAnalyzer

func NewQueryAnalyzer(config DebugConfig) *QueryAnalyzer

NewQueryAnalyzer creates a new query analyzer

func (*QueryAnalyzer) GenerateReport

func (qa *QueryAnalyzer) GenerateReport() AnalysisReport

GenerateReport builds an AnalysisReport from the traced operations

func (*QueryAnalyzer) TraceOperation

func (qa *QueryAnalyzer) TraceOperation(
	op string, input *DataFrame, fn func() (*DataFrame, error),
) (*DataFrame, error)

TraceOperation executes fn as operation op on input, records a trace of the execution, and returns fn's result

type QueryOptimizer

type QueryOptimizer struct {
	// contains filtered or unexported fields
}

QueryOptimizer applies optimization rules to improve query performance

func NewQueryOptimizer

func NewQueryOptimizer() *QueryOptimizer

NewQueryOptimizer creates a new optimizer with default rules

func (*QueryOptimizer) Optimize

func (qo *QueryOptimizer) Optimize(plan *ExecutionPlan) *ExecutionPlan

Optimize applies all optimization rules to the execution plan
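A sketch of that loop over a simplified plan (a slice of kind/argument steps), paired with a rule in the spirit of FilterFusionRule that merges adjacent filters into one conjunction. The step type, the rule interface shape, and the single pass over the rules in order are assumptions; the package's ExecutionPlan is opaque.

```go
package main

import "fmt"

// step is a simplified plan entry: a kind ("filter", "select", ...) and its argument.
type step struct{ Kind, Arg string }

type rule interface {
	Apply(plan []step) []step
	Name() string
}

// filterFusion merges runs of adjacent filters into one filter whose
// predicate is the AND of the originals.
type filterFusion struct{}

func (filterFusion) Name() string { return "FilterFusion" }

func (filterFusion) Apply(plan []step) []step {
	var out []step
	for _, s := range plan {
		if n := len(out); n > 0 && s.Kind == "filter" && out[n-1].Kind == "filter" {
			out[n-1].Arg = out[n-1].Arg + " AND " + s.Arg
			continue
		}
		out = append(out, s) // copies s, so the input plan is not modified
	}
	return out
}

// optimize applies every rule once, in order.
func optimize(plan []step, rules ...rule) []step {
	for _, r := range rules {
		plan = r.Apply(plan)
	}
	return plan
}

func main() {
	plan := []step{{"filter", "a > 1"}, {"filter", "b < 5"}, {"select", "a,b"}}
	for _, s := range optimize(plan, filterFusion{}) {
		fmt.Println(s.Kind + ": " + s.Arg)
	}
	// filter: a > 1 AND b < 5
	// select: a,b
}
```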

type SelectOperation

type SelectOperation struct {
	// contains filtered or unexported fields
}

SelectOperation represents a column selection operation

func (*SelectOperation) Apply

func (s *SelectOperation) Apply(df *DataFrame) (*DataFrame, error)

func (*SelectOperation) String

func (s *SelectOperation) String() string

type SortOperation

type SortOperation struct {
	// contains filtered or unexported fields
}

SortOperation represents a sort operation

func (*SortOperation) Apply

func (s *SortOperation) Apply(df *DataFrame) (*DataFrame, error)

func (*SortOperation) String

func (s *SortOperation) String() string

type TableStats

type TableStats struct {
	RowCount      int
	SortedColumns map[string]bool
	Cardinality   map[string]int // distinct values per column
}

TableStats holds statistics about a DataFrame for optimization decisions

type UnaryExprInterface

type UnaryExprInterface interface {
	expr.Expr
	Op() expr.UnaryOp
	Operand() expr.Expr
}

UnaryExprInterface defines the interface for unary expressions

type WithColumnOperation

type WithColumnOperation struct {
	// contains filtered or unexported fields
}

WithColumnOperation represents adding/modifying a column

func (*WithColumnOperation) Apply

func (w *WithColumnOperation) Apply(df *DataFrame) (*DataFrame, error)

func (*WithColumnOperation) String

func (w *WithColumnOperation) String() string
