Documentation ¶
Overview ¶
Package dataframe provides high-performance DataFrame operations with optimized HAVING clause evaluation.
Index ¶
- Constants
- type AccessPattern
- type ActualCost
- type AnalysisReport
- type ArrowValueArray
- type BinaryExprInterface
- type Bottleneck
- type CacheLocalityHint
- type ColumnAccessInfo
- type CompiledExpression
- type CompiledHavingEvaluator
- type CompiledOperation
- type ConstantFoldingRule
- type DataFrame
- func (df *DataFrame) Column(name string) (ISeries, bool)
- func (df *DataFrame) Columns() []string
- func (df *DataFrame) Concat(others ...*DataFrame) *DataFrame
- func (df *DataFrame) Correlation(col1, col2 string) (float64, error)
- func (df *DataFrame) Debug() *DataFrame
- func (df *DataFrame) Drop(names ...string) *DataFrame
- func (df *DataFrame) GroupBy(columns ...string) *GroupBy
- func (df *DataFrame) HasColumn(name string) bool
- func (df *DataFrame) Join(right *DataFrame, options *JoinOptions) (*DataFrame, error)
- func (df *DataFrame) Lazy() *LazyFrame
- func (df *DataFrame) Len() int
- func (df *DataFrame) NumRows() int
- func (df *DataFrame) OptimizedJoin(right *DataFrame, options *JoinOptions) (*DataFrame, error)
- func (df *DataFrame) Release()
- func (df *DataFrame) RollingWindow(column string, windowSize int, operation string) (*DataFrame, error)
- func (df *DataFrame) SafeCollectParallel() (*DataFrame, error)
- func (df *DataFrame) SafeCollectParallelWithMonitoring() (*DataFrame, error)
- func (df *DataFrame) Select(names ...string) *DataFrame
- func (df *DataFrame) Slice(start, end int) *DataFrame
- func (df *DataFrame) Sort(column string, ascending bool) (*DataFrame, error)
- func (df *DataFrame) SortBy(columns []string, ascending []bool) (*DataFrame, error)
- func (df *DataFrame) String() string
- func (df *DataFrame) Width() int
- func (df *DataFrame) WindowFunction(function string, columns ...string) (*DataFrame, error)
- func (df *DataFrame) WindowFunctionWithPartition(function, orderBy, partitionBy string) (*DataFrame, error)
- func (df *DataFrame) WithConfig(opConfig config.OperationConfig) *DataFrame
- func (df *DataFrame) WithDebugConfig(_ DebugConfig) *DataFrame
- type DebugConfig
- type DebugContext
- type DebugExecutionPlan
- type DebugPlanMetadata
- type EstimatedCost
- type ExecutionPlan
- type ExpressionPlan
- type FilterFusionRule
- type FilterMaskOperation
- type FilterMemoryPoolManager
- type FilterOperation
- type FunctionExprInterface
- type GroupBy
- type GroupByHavingOperation
- type GroupByOperation
- type HavingMemoryPool
- type HavingOperation
- type HavingPerformanceMetrics
- type ISeries
- type JoinOperation
- type JoinOptimizer
- type JoinOptions
- type JoinStrategy
- type JoinType
- type LazyFrame
- func (lf *LazyFrame) Collect(ctx ...context.Context) (*DataFrame, error)
- func (lf *LazyFrame) Explain() DebugExecutionPlan
- func (lf *LazyFrame) ExplainAnalyze() (DebugExecutionPlan, error)
- func (lf *LazyFrame) Filter(predicate expr.Expr) *LazyFrame
- func (lf *LazyFrame) GroupBy(columns ...string) *LazyGroupBy
- func (lf *LazyFrame) Join(right *LazyFrame, options *JoinOptions) *LazyFrame
- func (lf *LazyFrame) Release()
- func (lf *LazyFrame) SafeCollectParallel() (*DataFrame, error)
- func (lf *LazyFrame) SafeCollectParallelWithMonitoring() (*DataFrame, error)
- func (lf *LazyFrame) Select(columns ...string) *LazyFrame
- func (lf *LazyFrame) Sort(column string, ascending bool) *LazyFrame
- func (lf *LazyFrame) SortBy(columns []string, ascending []bool) *LazyFrame
- func (lf *LazyFrame) String() string
- func (lf *LazyFrame) WithColumn(name string, expr expr.Expr) *LazyFrame
- type LazyGroupBy
- func (lgb *LazyGroupBy) Agg(aggregations ...*expr.AggregationExpr) *LazyFrame
- func (lgb *LazyGroupBy) AggWithHaving(havingPredicate expr.Expr, aggregations ...*expr.AggregationExpr) *LazyFrame
- func (lgb *LazyGroupBy) Count(column string) *LazyFrame
- func (lgb *LazyGroupBy) Having(predicate expr.Expr) *LazyFrame
- func (lgb *LazyGroupBy) Max(column string) *LazyFrame
- func (lgb *LazyGroupBy) Mean(column string) *LazyFrame
- func (lgb *LazyGroupBy) Min(column string) *LazyFrame
- func (lgb *LazyGroupBy) Sum(column string) *LazyFrame
- type LazyOperation
- type LogLevel
- type MemoryLayoutInfo
- type MemoryStats
- type MergeJoinOptimizer
- type OperationFusionRule
- type OperationSummary
- type OperationTrace
- type OperationType
- type OptimizationRule
- type OptimizedHashMap
- type OutputFormat
- type PerformanceHint
- type PlanCost
- type PlanMetadata
- type PlanNode
- type PlanStats
- type PredicatePushdownRule
- type ProjectionPushdownRule
- type QueryAnalyzer
- type QueryOptimizer
- type SelectOperation
- type SortOperation
- type Stats
- type TableStats
- type UnaryExprInterface
- type WithColumnOperation
Constants ¶
const (
	// AvgBytesPerCell is the estimated average bytes per DataFrame cell.
	AvgBytesPerCell = 8
	// ParallelThreshold is the minimum number of rows to trigger parallel execution.
	ParallelThreshold = 1000
	// FilterSelectivity is the default assumed selectivity for filter operations.
	FilterSelectivity = 0.5
)
Package-level constants for consistent usage across debug functionality.
const (
	// DefaultMaxPoolSize defines the default maximum memory pool size (64MB).
	DefaultMaxPoolSize = 64 * 1024 * 1024
	// MinParallelGroupThreshold defines the minimum groups needed for parallel execution.
	MinParallelGroupThreshold = 100
	// DefaultChunkSize defines the default chunk size for parallel processing.
	DefaultChunkSize = 1000
	// MinChunkSize defines the minimum chunk size for processing.
	MinChunkSize = 100
	// MaxChunkSize defines the maximum chunk size for processing.
	MaxChunkSize = 10000
	// MultiColumnThreshold defines the threshold for multi-column cache locality.
	MultiColumnThreshold = 3
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccessPattern ¶ added in v0.3.0
type AccessPattern int
AccessPattern describes how the column will be accessed.
const (
	AccessSequential AccessPattern = iota
	AccessRandom
	AccessGrouped
)
type ActualCost ¶
type ActualCost struct {
Rows int64 `json:"rows"`
Memory int64 `json:"memory"`
CPU time.Duration `json:"cpu"`
Duration time.Duration `json:"duration"`
}
ActualCost contains actual cost metrics after execution.
type AnalysisReport ¶
type AnalysisReport struct {
Operations []OperationTrace `json:"operations"`
Summary OperationSummary `json:"summary"`
Bottlenecks []Bottleneck `json:"bottlenecks"`
Suggestions []string `json:"suggestions"`
}
AnalysisReport contains the complete analysis report.
type ArrowValueArray ¶
ArrowValueArray represents an Arrow array with value access.
type BinaryExprInterface ¶
type BinaryExprInterface interface {
expr.Expr
Left() expr.Expr
Op() expr.BinaryOp
Right() expr.Expr
}
BinaryExprInterface defines the interface for binary expressions.
type Bottleneck ¶
type Bottleneck struct {
Operation string `json:"operation"`
Duration time.Duration `json:"duration"`
Reason string `json:"reason"`
}
Bottleneck represents a performance bottleneck.
type CacheLocalityHint ¶ added in v0.3.0
type CacheLocalityHint int
CacheLocalityHint provides cache optimization hints.
const (
	CacheLocalitySequential CacheLocalityHint = iota
	CacheLocalityRandom
	CacheLocalityGrouped
)
type ColumnAccessInfo ¶ added in v0.3.0
type ColumnAccessInfo struct {
// contains filtered or unexported fields
}
ColumnAccessInfo contains optimized column access information.
type CompiledExpression ¶ added in v0.3.0
type CompiledExpression struct {
// contains filtered or unexported fields
}
CompiledExpression represents a pre-compiled expression for repeated evaluation.
type CompiledHavingEvaluator ¶ added in v0.3.0
type CompiledHavingEvaluator struct {
// contains filtered or unexported fields
}
CompiledHavingEvaluator provides optimized evaluation for HAVING predicates.
func NewCompiledHavingEvaluator ¶ added in v0.3.0
func NewCompiledHavingEvaluator(predicate expr.Expr, hint PerformanceHint) (*CompiledHavingEvaluator, error)
NewCompiledHavingEvaluator creates a new optimized HAVING evaluator.
func (*CompiledHavingEvaluator) CompileExpression ¶ added in v0.3.0
func (c *CompiledHavingEvaluator) CompileExpression() error
CompileExpression compiles the HAVING predicate for optimized evaluation.
func (*CompiledHavingEvaluator) EvaluateAggregated ¶ added in v0.3.0
func (c *CompiledHavingEvaluator) EvaluateAggregated(aggregatedData map[string]arrow.Array) (*array.Boolean, error)
EvaluateAggregated evaluates the HAVING predicate against aggregated data.
func (*CompiledHavingEvaluator) GetMetrics ¶ added in v0.3.0
func (c *CompiledHavingEvaluator) GetMetrics() HavingPerformanceMetrics
GetMetrics returns the current performance metrics.
func (*CompiledHavingEvaluator) Release ¶ added in v0.3.0
func (c *CompiledHavingEvaluator) Release()
Release releases resources held by the evaluator.
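Example (a minimal sketch; predicate and aggregated are assumed to exist, and the explicit CompileExpression call may be redundant if the constructor already compiles the predicate):

	hint := dataframe.PerformanceHint{
		ExpectedGroupCount:    1000,
		ExpectedSelectivity:   0.3,
		EnableParallelization: true,
	}
	evaluator, err := dataframe.NewCompiledHavingEvaluator(predicate, hint)
	if err != nil {
		log.Fatal(err)
	}
	defer evaluator.Release()
	if err := evaluator.CompileExpression(); err != nil {
		log.Fatal(err)
	}
	// aggregated is a map[string]arrow.Array of aggregation results.
	mask, err := evaluator.EvaluateAggregated(aggregated)
	if err != nil {
		log.Fatal(err)
	}
	defer mask.Release()
	fmt.Printf("%+v\n", evaluator.GetMetrics())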
type CompiledOperation ¶ added in v0.3.0
type CompiledOperation struct {
// contains filtered or unexported fields
}
CompiledOperation represents a single optimized operation in the expression plan.
type ConstantFoldingRule ¶
type ConstantFoldingRule struct{}
ConstantFoldingRule evaluates constant expressions at planning time.
func (*ConstantFoldingRule) Apply ¶
func (r *ConstantFoldingRule) Apply(plan *ExecutionPlan) *ExecutionPlan
func (*ConstantFoldingRule) Name ¶
func (r *ConstantFoldingRule) Name() string
type DataFrame ¶
type DataFrame struct {
// contains filtered or unexported fields
}
DataFrame represents a two-dimensional table of data with named, typed columns.
DataFrame provides efficient data manipulation operations through lazy evaluation and automatic parallelization. All operations preserve data integrity and type safety while optimizing for performance with large datasets.
Key characteristics:
- Columnar data storage using Apache Arrow arrays
- Lazy evaluation for query optimization
- Automatic parallel processing for operations on large datasets (>1000 rows)
- Type-safe operations with compile-time verification
- Memory-efficient operations with proper resource management
Example usage:
mem := memory.NewGoAllocator()
names := series.New("name", []string{"Alice", "Bob"}, mem)
ages := series.New("age", []int64{25, 30}, mem)
df := dataframe.New(names, ages)
defer df.Release() // Essential for memory management
// Lazy operations are chained and optimized
result, err := df.Lazy().
	Filter(expr.Col("age").Gt(expr.Lit(25))).
	Select("name").
	Collect()
Memory Management: DataFrames manage Apache Arrow memory and require explicit cleanup via Release(). Always use defer to ensure proper resource cleanup, even in error conditions.
func New ¶
New creates a new DataFrame from one or more Series columns.
All provided Series must have the same length, or the DataFrame creation will panic. Column names must be unique across all Series.
Parameters:
series: One or more ISeries representing the columns of the DataFrame.
Each series becomes a column with the series name as the column name.
Returns:
*DataFrame: A new DataFrame containing the provided columns.
Panics:
- If series have different lengths
- If duplicate column names are provided
- If no series are provided
Example:
mem := memory.NewGoAllocator()
names := series.New("employee", []string{"Alice", "Bob", "Charlie"}, mem)
ages := series.New("age", []int64{25, 30, 35}, mem)
salaries := series.New("salary", []float64{50000.0, 60000.0, 70000.0}, mem)
df := dataframe.New(names, ages, salaries)
defer df.Release()
Memory Management: The DataFrame takes ownership of the provided Series. You must call Release() on the DataFrame to properly clean up memory. The original Series references remain valid but should also be released when no longer needed.
func (*DataFrame) Concat ¶
func (df *DataFrame) Concat(others ...*DataFrame) *DataFrame
Concat concatenates multiple DataFrames vertically (row-wise). All DataFrames must have the same column structure.
func (*DataFrame) Correlation ¶
func (df *DataFrame) Correlation(col1, col2 string) (float64, error)
Correlation calculates the Pearson correlation coefficient between two numeric columns.
func (*DataFrame) Join ¶
func (df *DataFrame) Join(right *DataFrame, options *JoinOptions) (*DataFrame, error)
Join performs a join operation between two DataFrames.
func (*DataFrame) Lazy ¶
func (df *DataFrame) Lazy() *LazyFrame
Lazy converts a DataFrame to a LazyFrame for deferred operations.
This method creates a LazyFrame that wraps the current DataFrame, enabling lazy evaluation and query optimization. Operations added to the LazyFrame are not executed immediately but are instead accumulated in a query plan.
Returns:
*LazyFrame: A new LazyFrame wrapping this DataFrame with an empty operation queue.
Example:
// Convert DataFrame to LazyFrame for chained operations
lazy := df.Lazy()
// Chain operations without immediate execution
result, err := lazy.
	Filter(expr.Col("status").Eq(expr.Lit("active"))).
	Select("id", "name").
	Collect() // Operations execute here
Performance Benefits:
- Operations are optimized before execution
- Automatic parallelization for large datasets
- Memory-efficient streaming processing
- Predicate pushdown reduces data movement
The LazyFrame maintains a reference to the original DataFrame, so both objects should be properly released when no longer needed.
func (*DataFrame) NumRows ¶ added in v0.3.0
func (df *DataFrame) NumRows() int
NumRows returns the number of rows (alias for Len for compatibility).
func (*DataFrame) OptimizedJoin ¶
func (df *DataFrame) OptimizedJoin(right *DataFrame, options *JoinOptions) (*DataFrame, error)
OptimizedJoin performs join using the selected optimal strategy.
func (*DataFrame) Release ¶
func (df *DataFrame) Release()
Release releases all underlying Arrow memory.
func (*DataFrame) RollingWindow ¶
func (df *DataFrame) RollingWindow(column string, windowSize int, operation string) (*DataFrame, error)
RollingWindow applies a rolling window operation to a column.
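Example (a sketch; "mean" is a hypothetical operation name, shown only to illustrate the call shape — consult the supported operation strings):

	// 7-row rolling aggregate over the "price" column (operation name is illustrative).
	rolled, err := df.RollingWindow("price", 7, "mean")
	if err != nil {
		log.Fatal(err)
	}
	defer rolled.Release()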
func (*DataFrame) SafeCollectParallel ¶
func (df *DataFrame) SafeCollectParallel() (*DataFrame, error)
SafeCollectParallel performs memory-safe parallel collection using the new infrastructure.
func (*DataFrame) SafeCollectParallelWithMonitoring ¶
func (df *DataFrame) SafeCollectParallelWithMonitoring() (*DataFrame, error)
SafeCollectParallelWithMonitoring performs memory-safe parallel collection with memory monitoring.
func (*DataFrame) Select ¶
func (df *DataFrame) Select(names ...string) *DataFrame
Select returns a new DataFrame containing only the specified columns.
The operation preserves the order of columns as specified in the names parameter. If a column name doesn't exist, it is silently ignored. This is an eager operation that immediately creates a new DataFrame.
Parameters:
names: Variable number of column names to include in the result.
Column names are case-sensitive and must match exactly.
Returns:
*DataFrame: A new DataFrame with only the selected columns.
If no valid column names are provided, returns an empty DataFrame.
Example:
// Original DataFrame has columns: "name", "age", "salary", "department"
selected := df.Select("name", "age") // Two columns
nameOnly := df.Select("name") // Single column
reordered := df.Select("salary", "name") // Different order
partial := df.Select("name", "invalid") // "invalid" ignored
Performance: Select is an O(k) operation where k is the number of selected columns. No data copying occurs - the new DataFrame shares references to the same underlying Arrow arrays.
Memory Management: The returned DataFrame shares column references with the original DataFrame. Both DataFrames should be released independently when no longer needed.
func (*DataFrame) Slice ¶
func (df *DataFrame) Slice(start, end int) *DataFrame
Slice creates a new DataFrame containing rows from start (inclusive) to end (exclusive).
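Example illustrating the half-open interval:

	first10 := df.Slice(0, 10) // rows 0..9
	defer first10.Release()
	middle := df.Slice(5, 8) // rows 5, 6, 7
	defer middle.Release()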
func (*DataFrame) WindowFunction ¶
func (df *DataFrame) WindowFunction(function string, columns ...string) (*DataFrame, error)
WindowFunction applies a window function to the DataFrame.
func (*DataFrame) WindowFunctionWithPartition ¶
func (df *DataFrame) WindowFunctionWithPartition(function, orderBy, partitionBy string) (*DataFrame, error)
WindowFunctionWithPartition applies a window function with optional partitioning.
func (*DataFrame) WithConfig ¶
func (df *DataFrame) WithConfig(opConfig config.OperationConfig) *DataFrame
WithConfig returns a new DataFrame with the specified operation configuration.
func (*DataFrame) WithDebugConfig ¶
func (df *DataFrame) WithDebugConfig(_ DebugConfig) *DataFrame
WithDebugConfig sets the debug configuration for the DataFrame.
type DebugConfig ¶
type DebugConfig struct {
Enabled bool `json:"enabled"`
LogLevel LogLevel `json:"log_level"`
ProfileOperations bool `json:"profile_operations"`
TrackMemory bool `json:"track_memory"`
ShowOptimizations bool `json:"show_optimizations"`
OutputFormat OutputFormat `json:"output_format"`
}
DebugConfig configures debug mode settings.
type DebugContext ¶ added in v0.3.1
type DebugContext struct {
// contains filtered or unexported fields
}
DebugContext manages debug-related state and provides thread-safe utilities.
func NewDebugContext ¶ added in v0.3.1
func NewDebugContext() *DebugContext
NewDebugContext creates a new debug context.
func (*DebugContext) GenerateTraceID ¶ added in v0.3.1
func (dc *DebugContext) GenerateTraceID() string
GenerateTraceID generates a unique trace ID using atomic operations for thread safety.
type DebugExecutionPlan ¶
type DebugExecutionPlan struct {
RootNode *PlanNode `json:"root"`
Estimated PlanStats `json:"estimated"`
Actual PlanStats `json:"actual,omitempty"`
Metadata DebugPlanMetadata `json:"metadata"`
}
DebugExecutionPlan represents the execution plan for DataFrame operations with debug information.
func (*DebugExecutionPlan) RenderJSON ¶
func (plan *DebugExecutionPlan) RenderJSON() ([]byte, error)
RenderJSON renders the execution plan as JSON.
func (*DebugExecutionPlan) RenderText ¶
func (plan *DebugExecutionPlan) RenderText() string
RenderText renders the execution plan as text.
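Example (a sketch combining LazyFrame.Explain with both renderers):

	plan := df.Lazy().
		Filter(expr.Col("age").Gt(expr.Lit(25))).
		Explain()
	fmt.Println(plan.RenderText())

	jsonBytes, err := plan.RenderJSON()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(jsonBytes))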
type DebugPlanMetadata ¶
type DebugPlanMetadata struct {
CreatedAt time.Time `json:"created_at"`
OptimizedAt time.Time `json:"optimized_at,omitempty"`
ExecutedAt time.Time `json:"executed_at,omitempty"`
Optimizations []string `json:"optimizations,omitempty"`
}
DebugPlanMetadata contains metadata about the execution plan.
type EstimatedCost ¶
type EstimatedCost struct {
Rows int64 `json:"rows"`
Memory int64 `json:"memory"`
CPU time.Duration `json:"cpu"`
}
EstimatedCost contains estimated cost metrics.
type ExecutionPlan ¶
type ExecutionPlan struct {
// contains filtered or unexported fields
}
ExecutionPlan represents a planned query execution with metadata.
func CreateExecutionPlan ¶
func CreateExecutionPlan(source *DataFrame, operations []LazyOperation) *ExecutionPlan
CreateExecutionPlan analyzes operations and creates an execution plan.
type ExpressionPlan ¶ added in v0.3.0
type ExpressionPlan struct {
// contains filtered or unexported fields
}
ExpressionPlan represents an optimized execution plan for an expression.
type FilterFusionRule ¶
type FilterFusionRule struct{}
FilterFusionRule combines multiple filter operations into a single operation.
func (*FilterFusionRule) Apply ¶
func (r *FilterFusionRule) Apply(plan *ExecutionPlan) *ExecutionPlan
func (*FilterFusionRule) Name ¶
func (r *FilterFusionRule) Name() string
type FilterMaskOperation ¶ added in v0.3.1
type FilterMaskOperation interface {
// contains filtered or unexported methods
}
FilterMaskOperation represents an operation that can apply boolean filter masks.
type FilterMemoryPoolManager ¶ added in v0.3.1
type FilterMemoryPoolManager struct {
// contains filtered or unexported fields
}
FilterMemoryPoolManager manages the global memory pool for filter operations.
type FilterOperation ¶
type FilterOperation struct {
// contains filtered or unexported fields
}
FilterOperation represents a filter operation.
func (*FilterOperation) String ¶
func (f *FilterOperation) String() string
type FunctionExprInterface ¶
FunctionExprInterface defines the interface for function expressions.
type GroupBy ¶
type GroupBy struct {
// contains filtered or unexported fields
}
GroupBy represents a grouped DataFrame for aggregation operations.
type GroupByHavingOperation ¶ added in v0.3.0
type GroupByHavingOperation struct {
// contains filtered or unexported fields
}
GroupByHavingOperation combines GroupBy, aggregation, and Having filtering.
func (*GroupByHavingOperation) Apply ¶ added in v0.3.0
func (gh *GroupByHavingOperation) Apply(df *DataFrame) (*DataFrame, error)
Apply performs the group-by, extracts the aggregations referenced by the predicate, computes them, and filters the resulting groups.
func (*GroupByHavingOperation) Release ¶ added in v0.3.0
func (gh *GroupByHavingOperation) Release()
Release returns the cached allocator to the pool for reuse.
func (*GroupByHavingOperation) String ¶ added in v0.3.0
func (gh *GroupByHavingOperation) String() string
String returns a string representation of the operation.
type GroupByOperation ¶
type GroupByOperation struct {
// contains filtered or unexported fields
}
GroupByOperation represents a group by and aggregation operation.
func NewGroupByOperation ¶ added in v0.3.0
func NewGroupByOperation(groupByCols []string, aggregations []*expr.AggregationExpr) *GroupByOperation
NewGroupByOperation creates a new GroupByOperation without HAVING predicate (for backward compatibility).
func NewGroupByOperationWithHaving ¶ added in v0.3.0
func NewGroupByOperationWithHaving(
	groupByCols []string,
	aggregations []*expr.AggregationExpr,
	havingPredicate expr.Expr,
) *GroupByOperation
NewGroupByOperationWithHaving creates a new GroupByOperation with optional HAVING predicate.
func (*GroupByOperation) Apply ¶
func (g *GroupByOperation) Apply(df *DataFrame) (*DataFrame, error)
func (*GroupByOperation) String ¶
func (g *GroupByOperation) String() string
type HavingMemoryPool ¶ added in v0.3.0
type HavingMemoryPool struct {
// contains filtered or unexported fields
}
HavingMemoryPool manages memory allocation for HAVING operations.
func NewHavingMemoryPool ¶ added in v0.3.0
func NewHavingMemoryPool(allocator memory.Allocator) *HavingMemoryPool
NewHavingMemoryPool creates a new memory pool for HAVING operations.
func (*HavingMemoryPool) GetAllocator ¶ added in v0.3.0
func (p *HavingMemoryPool) GetAllocator() memory.Allocator
GetAllocator returns the underlying memory allocator.
func (*HavingMemoryPool) GetBooleanBuilder ¶ added in v0.3.0
func (p *HavingMemoryPool) GetBooleanBuilder() *array.BooleanBuilder
GetBooleanBuilder gets a boolean array builder from the pool.
func (*HavingMemoryPool) PutBooleanBuilder ¶ added in v0.3.0
func (p *HavingMemoryPool) PutBooleanBuilder(builder *array.BooleanBuilder)
PutBooleanBuilder returns a boolean array builder to the pool.
func (*HavingMemoryPool) Release ¶ added in v0.3.0
func (p *HavingMemoryPool) Release()
Release releases the memory pool.
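Example (a sketch of the builder pooling pattern):

	pool := dataframe.NewHavingMemoryPool(memory.NewGoAllocator())
	defer pool.Release()

	builder := pool.GetBooleanBuilder()
	builder.Append(true)
	builder.Append(false)
	mask := builder.NewBooleanArray() // building the array resets the builder
	defer mask.Release()
	pool.PutBooleanBuilder(builder) // return the builder for reuse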
type HavingOperation ¶ added in v0.3.0
type HavingOperation struct {
// contains filtered or unexported fields
}
HavingOperation represents a HAVING clause that filters grouped data based on aggregation predicates.
func NewHavingOperation ¶ added in v0.3.0
func NewHavingOperation(predicate expr.Expr) *HavingOperation
NewHavingOperation creates a new HavingOperation with the given predicate.
func (*HavingOperation) Apply ¶ added in v0.3.0
func (h *HavingOperation) Apply(df *DataFrame) (*DataFrame, error)
Apply filters grouped DataFrame based on the aggregation predicate.
func (*HavingOperation) Name ¶ added in v0.3.0
func (h *HavingOperation) Name() string
Name returns the operation name for debugging.
func (*HavingOperation) String ¶ added in v0.3.0
func (h *HavingOperation) String() string
String returns a string representation of the HAVING operation.
type HavingPerformanceMetrics ¶ added in v0.3.0
type HavingPerformanceMetrics struct {
EvaluationTime time.Duration
CompilationTime time.Duration
MemoryAllocations int64
ParallelEfficiency float64
ThroughputMBps float64
CacheHitRate float64
SelectivityActual float64
ColumnAccessTime time.Duration
ExpressionEvalTime time.Duration
FilterApplicationTime time.Duration
MemoryAllocationTime time.Duration
// contains filtered or unexported fields
}
HavingPerformanceMetrics tracks performance metrics for HAVING operations.
type ISeries ¶
type ISeries interface {
Name() string
Len() int
DataType() arrow.DataType
IsNull(index int) bool
String() string
Array() arrow.Array
Release()
GetAsString(index int) string
}
ISeries provides a type-erased interface for Series of any type.
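Example (a sketch of type-erased row access through a DataFrame column):

	col, ok := df.Column("name")
	if !ok {
		log.Fatal("column not found")
	}
	for i := 0; i < col.Len(); i++ {
		if col.IsNull(i) {
			continue
		}
		fmt.Println(col.GetAsString(i))
	}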
type JoinOperation ¶
type JoinOperation struct {
// contains filtered or unexported fields
}
JoinOperation represents a join operation.
func (*JoinOperation) String ¶
func (j *JoinOperation) String() string
type JoinOptimizer ¶
type JoinOptimizer struct {
// contains filtered or unexported fields
}
JoinOptimizer selects optimal join strategy based on table statistics.
func NewJoinOptimizer ¶
func NewJoinOptimizer(left, right *DataFrame) *JoinOptimizer
NewJoinOptimizer creates a new join optimizer with table statistics.
func (*JoinOptimizer) SelectStrategy ¶
func (jo *JoinOptimizer) SelectStrategy(leftKeys, rightKeys []string) JoinStrategy
SelectStrategy chooses the optimal join strategy based on table characteristics.
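Example (a sketch; left and right are assumed to be existing DataFrames):

	jo := dataframe.NewJoinOptimizer(left, right)
	strategy := jo.SelectStrategy([]string{"id"}, []string{"user_id"})
	if strategy == dataframe.BroadcastJoinStrategy {
		// one side is small enough to broadcast to all workers
	}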
type JoinOptions ¶
type JoinOptions struct {
Type JoinType
LeftKey string // Single join key for left DataFrame
RightKey string // Single join key for right DataFrame
LeftKeys []string // Multiple join keys for left DataFrame
RightKeys []string // Multiple join keys for right DataFrame
}
JoinOptions specifies parameters for join operations.
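Example (a sketch; InnerJoin is a hypothetical JoinType value — consult the JoinType constants for the actual names):

	opts := &dataframe.JoinOptions{
		Type:     dataframe.InnerJoin, // hypothetical constant
		LeftKey:  "id",
		RightKey: "user_id",
	}
	joined, err := left.Join(right, opts)
	if err != nil {
		log.Fatal(err)
	}
	defer joined.Release()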
type JoinStrategy ¶
type JoinStrategy int
JoinStrategy represents different join algorithms.
const (
	HashJoinStrategy JoinStrategy = iota
	BroadcastJoinStrategy
	MergeJoinStrategy
	OptimizedHashJoinStrategy
)
type LazyFrame ¶
type LazyFrame struct {
// contains filtered or unexported fields
}
LazyFrame represents a DataFrame with deferred operations for optimized execution.
LazyFrame implements lazy evaluation, building up a query plan of operations without executing them immediately. This enables powerful optimizations including:
- Query optimization (predicate pushdown, operation fusion)
- Automatic parallelization for large datasets
- Memory-efficient processing through streaming
- Operation reordering for better performance
Operations are only executed when Collect() is called, at which point the entire query plan is optimized and executed in the most efficient manner.
Key characteristics:
- Zero-cost operation chaining until execution
- Automatic parallel execution for datasets > 1000 rows
- Query optimization with predicate pushdown
- Memory-efficient streaming for large operations
- Thread-safe parallel chunk processing
Example usage:
result, err := df.Lazy().
	Filter(expr.Col("age").Gt(expr.Lit(25))).
	Select("name", "department").
	GroupBy("department").
	Agg(expr.Count(expr.Col("*")).As("employee_count")).
	Collect()
The above builds a query plan and executes it optimally, potentially reordering operations and using parallel processing automatically.
func (*LazyFrame) Collect ¶
func (lf *LazyFrame) Collect(ctx ...context.Context) (*DataFrame, error)
Collect executes all accumulated operations and returns the final DataFrame.
This method triggers the execution of the entire query plan built up through lazy operations. The execution is optimized with:
- Query optimization (predicate pushdown, operation fusion)
- Automatic parallelization for large datasets (>1000 rows)
- Memory-efficient chunk processing
- Operation reordering for performance
Parameters:
ctx: Optional context for cancellation support. If provided, the operation
can be canceled before completion.
Returns:
*DataFrame: The result of executing all operations in the query plan.
error: Any error encountered during execution.
Example:
result, err := df.Lazy().
	Filter(expr.Col("age").Gt(expr.Lit(25))).
	GroupBy("department").
	Agg(expr.Mean(expr.Col("salary")).As("avg_salary")).
	Collect()
if err != nil {
	log.Fatal(err)
}
defer result.Release()
Performance:
- Automatic parallel execution for datasets with >1000 rows
- Uses worker pools sized to available CPU cores
- Optimized memory allocation through pooling
- Query plan optimization reduces unnecessary operations
Memory Management: The returned DataFrame is independent and must be released when no longer needed. The LazyFrame and its source DataFrame remain valid after Collect().
Cancellation: If a context is provided, the operation will check for cancellation and return early if the context is canceled. This is useful for long-running operations.
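Example of cancellation (a sketch using a timeout context):

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	result, err := df.Lazy().
		Filter(expr.Col("age").Gt(expr.Lit(25))).
		Collect(ctx) // returns early if ctx is canceled
	if err != nil {
		log.Fatal(err)
	}
	defer result.Release()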
func (*LazyFrame) Explain ¶
func (lf *LazyFrame) Explain() DebugExecutionPlan
Explain generates an execution plan without executing the operations.
func (*LazyFrame) ExplainAnalyze ¶
func (lf *LazyFrame) ExplainAnalyze() (DebugExecutionPlan, error)
ExplainAnalyze generates an execution plan and executes it with profiling.
func (*LazyFrame) GroupBy ¶
func (lf *LazyFrame) GroupBy(columns ...string) *LazyGroupBy
GroupBy adds a group by and aggregation operation to the lazy frame.
func (*LazyFrame) Join ¶
func (lf *LazyFrame) Join(right *LazyFrame, options *JoinOptions) *LazyFrame
Join adds a join operation to the lazy frame.
func (*LazyFrame) SafeCollectParallel ¶
func (lf *LazyFrame) SafeCollectParallel() (*DataFrame, error)
SafeCollectParallel executes all deferred operations using memory-safe parallel processing.
func (*LazyFrame) SafeCollectParallelWithMonitoring ¶
func (lf *LazyFrame) SafeCollectParallelWithMonitoring() (*DataFrame, error)
SafeCollectParallelWithMonitoring executes operations with memory monitoring and adaptive parallelism.
type LazyGroupBy ¶
type LazyGroupBy struct {
// contains filtered or unexported fields
}
LazyGroupBy represents a lazy groupby operation that can be followed by aggregations.
func (*LazyGroupBy) Agg ¶
func (lgb *LazyGroupBy) Agg(aggregations ...*expr.AggregationExpr) *LazyFrame
Agg performs aggregation operations and returns a new LazyFrame.
func (*LazyGroupBy) AggWithHaving ¶ added in v0.3.0
func (lgb *LazyGroupBy) AggWithHaving(havingPredicate expr.Expr, aggregations ...*expr.AggregationExpr) *LazyFrame
AggWithHaving performs aggregation operations with an optional HAVING predicate and returns a new LazyFrame.
func (*LazyGroupBy) Count ¶
func (lgb *LazyGroupBy) Count(column string) *LazyFrame
Count creates a count aggregation for the specified column.
func (*LazyGroupBy) Having ¶ added in v0.3.0
func (lgb *LazyGroupBy) Having(predicate expr.Expr) *LazyFrame
Having adds a HAVING clause to filter grouped data based on aggregation predicates.
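Example (a sketch; it assumes aggregation expressions expose comparison builders such as Gt, mirroring column expressions):

	result, err := df.Lazy().
		GroupBy("department").
		Having(expr.Mean(expr.Col("salary")).Gt(expr.Lit(50000.0))).
		Collect()
	if err != nil {
		log.Fatal(err)
	}
	defer result.Release()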
func (*LazyGroupBy) Max ¶
func (lgb *LazyGroupBy) Max(column string) *LazyFrame
Max creates a max aggregation for the specified column.
func (*LazyGroupBy) Mean ¶
func (lgb *LazyGroupBy) Mean(column string) *LazyFrame
Mean creates a mean aggregation for the specified column.
func (*LazyGroupBy) Min ¶
func (lgb *LazyGroupBy) Min(column string) *LazyFrame
Min creates a min aggregation for the specified column.
func (*LazyGroupBy) Sum ¶
func (lgb *LazyGroupBy) Sum(column string) *LazyFrame
Sum creates a sum aggregation for the specified column.
type LazyOperation ¶
LazyOperation represents a deferred operation on a DataFrame.
type MemoryLayoutInfo ¶ added in v0.3.0
type MemoryLayoutInfo struct {
// contains filtered or unexported fields
}
MemoryLayoutInfo contains memory optimization hints.
type MemoryStats ¶
type MemoryStats struct {
Before int64 `json:"before"`
After int64 `json:"after"`
Delta int64 `json:"delta"`
}
MemoryStats contains memory usage statistics.
type MergeJoinOptimizer ¶
type MergeJoinOptimizer struct {
// contains filtered or unexported fields
}
MergeJoinOptimizer tracks if DataFrames are pre-sorted.
func NewMergeJoinOptimizer ¶
func NewMergeJoinOptimizer(leftSorted, rightSorted bool, sortKeys []string) *MergeJoinOptimizer
NewMergeJoinOptimizer creates a merge join optimizer.
func (*MergeJoinOptimizer) Join ¶
func (mjo *MergeJoinOptimizer) Join(left, right *DataFrame, options *JoinOptions) (*DataFrame, error)
Join performs an optimized merge join.
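Example (a sketch for two DataFrames pre-sorted on "id"; opts is a JoinOptions value as shown above):

	mjo := dataframe.NewMergeJoinOptimizer(true, true, []string{"id"})
	joined, err := mjo.Join(left, right, opts)
	if err != nil {
		log.Fatal(err)
	}
	defer joined.Release()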
type OperationFusionRule ¶
type OperationFusionRule struct{}
OperationFusionRule combines compatible operations for better performance.
func (*OperationFusionRule) Apply ¶
func (r *OperationFusionRule) Apply(plan *ExecutionPlan) *ExecutionPlan
func (*OperationFusionRule) Name ¶
func (r *OperationFusionRule) Name() string
type OperationSummary ¶
type OperationSummary struct {
TotalOperations int `json:"total_operations"`
TotalDuration time.Duration `json:"total_duration"`
TotalMemory int64 `json:"total_memory"`
ParallelOps int `json:"parallel_ops"`
}
OperationSummary contains summary statistics.
type OperationTrace ¶
type OperationTrace struct {
ID string `json:"id"`
Operation string `json:"operation"`
Input Stats `json:"input"`
Output Stats `json:"output"`
Duration time.Duration `json:"duration"`
Memory MemoryStats `json:"memory"`
Parallel bool `json:"parallel"`
WorkerCount int `json:"worker_count,omitempty"`
Properties map[string]string `json:"properties"`
}
OperationTrace represents a traced operation during execution.
type OperationType ¶ added in v0.3.0
type OperationType int
OperationType represents the type of compiled operation.
const (
	OpTypeColumnAccess OperationType = iota
	OpTypeLiteral
	OpTypeAggregation
	OpTypeComparison
	OpTypeArithmetic
	OpTypeLogical
)
type OptimizationRule ¶
type OptimizationRule interface {
Apply(plan *ExecutionPlan) *ExecutionPlan
Name() string
}
OptimizationRule represents a single optimization transformation.
type OptimizedHashMap ¶
type OptimizedHashMap struct {
// contains filtered or unexported fields
}
OptimizedHashMap uses xxhash for better performance.
func NewOptimizedHashMap ¶
func NewOptimizedHashMap(estimatedSize int) *OptimizedHashMap
NewOptimizedHashMap creates a new optimized hash map.
func (*OptimizedHashMap) Get ¶
func (ohm *OptimizedHashMap) Get(key string) ([]int, bool)
Get retrieves values for a key.
func (*OptimizedHashMap) Put ¶
func (ohm *OptimizedHashMap) Put(key string, value int)
Put adds a key-value pair to the hash map.
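Example (a sketch; it assumes Put appends row indices for repeated keys):

	ohm := dataframe.NewOptimizedHashMap(1024)
	ohm.Put("alice", 0)
	ohm.Put("alice", 3)
	if rows, ok := ohm.Get("alice"); ok {
		fmt.Println(rows) // e.g. [0 3]
	}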
type OutputFormat ¶
type OutputFormat int
OutputFormat defines the output format for debug information.
const (
	OutputFormatText OutputFormat = iota
	OutputFormatJSON
)
type PerformanceHint ¶ added in v0.3.0
type PerformanceHint struct {
ExpectedGroupCount int
ExpectedSelectivity float64
PreferMemoryOptimization bool
EnableParallelization bool
MaxMemoryUsage int64
}
PerformanceHint provides optimization hints for evaluation.
type PlanCost ¶
type PlanCost struct {
Estimated EstimatedCost `json:"estimated"`
Actual ActualCost `json:"actual,omitempty"`
}
PlanCost contains cost information for a plan node.
type PlanMetadata ¶
type PlanMetadata struct {
// contains filtered or unexported fields
}
PlanMetadata contains analysis information about the execution plan.
type PlanNode ¶
type PlanNode struct {
ID string `json:"id"`
Type string `json:"type"` // "Filter", "Select", "GroupBy", etc.
Description string `json:"description"` // Human-readable operation description
Children []*PlanNode `json:"children,omitempty"`
Cost PlanCost `json:"cost"`
Properties map[string]string `json:"properties"`
}
PlanNode represents a node in the execution plan tree.
type PlanStats ¶
type PlanStats struct {
TotalRows int64 `json:"total_rows"`
TotalMemory int64 `json:"total_memory"`
TotalDuration time.Duration `json:"total_duration"`
PeakMemory int64 `json:"peak_memory"`
ParallelOps int `json:"parallel_ops"`
}
PlanStats contains overall plan statistics.
type PredicatePushdownRule ¶
type PredicatePushdownRule struct{}
PredicatePushdownRule moves filter operations earlier in the pipeline.
func (*PredicatePushdownRule) Apply ¶
func (r *PredicatePushdownRule) Apply(plan *ExecutionPlan) *ExecutionPlan
func (*PredicatePushdownRule) Name ¶
func (r *PredicatePushdownRule) Name() string
type ProjectionPushdownRule ¶
type ProjectionPushdownRule struct{}
ProjectionPushdownRule pushes column selections earlier to reduce data processing.
func (*ProjectionPushdownRule) Apply ¶
func (r *ProjectionPushdownRule) Apply(plan *ExecutionPlan) *ExecutionPlan
func (*ProjectionPushdownRule) Name ¶
func (r *ProjectionPushdownRule) Name() string
type QueryAnalyzer ¶
type QueryAnalyzer struct {
// contains filtered or unexported fields
}
QueryAnalyzer analyzes and traces query execution.
func NewQueryAnalyzer ¶
func NewQueryAnalyzer(config DebugConfig) *QueryAnalyzer
NewQueryAnalyzer creates a new query analyzer.
func (*QueryAnalyzer) GenerateReport ¶
func (qa *QueryAnalyzer) GenerateReport() AnalysisReport
GenerateReport generates an analysis report.
func (*QueryAnalyzer) TraceOperation ¶
func (qa *QueryAnalyzer) TraceOperation(
	op string,
	input *DataFrame,
	fn func() (*DataFrame, error),
) (*DataFrame, error)
TraceOperation traces an operation execution.
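Example (a sketch wrapping an eager Select in a trace):

	qa := dataframe.NewQueryAnalyzer(dataframe.DebugConfig{
		Enabled:           true,
		ProfileOperations: true,
	})
	result, err := qa.TraceOperation("select", df, func() (*dataframe.DataFrame, error) {
		return df.Select("name", "age"), nil
	})
	if err != nil {
		log.Fatal(err)
	}
	defer result.Release()
	report := qa.GenerateReport()
	fmt.Printf("traced %d operations\n", report.Summary.TotalOperations)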
type QueryOptimizer ¶
type QueryOptimizer struct {
// contains filtered or unexported fields
}
QueryOptimizer applies optimization rules to improve query performance.
func NewQueryOptimizer ¶
func NewQueryOptimizer() *QueryOptimizer
NewQueryOptimizer creates a new optimizer with default rules.
func (*QueryOptimizer) Optimize ¶
func (qo *QueryOptimizer) Optimize(plan *ExecutionPlan) *ExecutionPlan
Optimize applies all optimization rules to the execution plan.
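Example (a sketch; operations would normally be a LazyFrame's accumulated plan):

	// operations: []dataframe.LazyOperation
	plan := dataframe.CreateExecutionPlan(df, operations)
	plan = dataframe.NewQueryOptimizer().Optimize(plan)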
type SelectOperation ¶
type SelectOperation struct {
// contains filtered or unexported fields
}
SelectOperation represents a column selection operation.
func (*SelectOperation) String ¶
func (s *SelectOperation) String() string
type SortOperation ¶
type SortOperation struct {
// contains filtered or unexported fields
}
SortOperation represents a sort operation.
func (*SortOperation) String ¶
func (s *SortOperation) String() string
type Stats ¶ added in v0.3.1
type Stats struct {
Rows int `json:"rows"`
Columns int `json:"columns"`
Memory int64 `json:"memory"`
Schema []string `json:"schema"`
}
Stats contains statistics about a DataFrame.
type TableStats ¶
type TableStats struct {
RowCount int
SortedColumns map[string]bool
Cardinality map[string]int // distinct values per column
}
TableStats holds statistics about a DataFrame for optimization decisions.
type UnaryExprInterface ¶
UnaryExprInterface defines the interface for unary expressions.
type WithColumnOperation ¶
type WithColumnOperation struct {
// contains filtered or unexported fields
}
WithColumnOperation represents adding/modifying a column.
func (*WithColumnOperation) Apply ¶
func (w *WithColumnOperation) Apply(df *DataFrame) (*DataFrame, error)
func (*WithColumnOperation) String ¶
func (w *WithColumnOperation) String() string