package sql

v35.0.0-...-ffd8320

Published: Mar 17, 2025 License: Apache-2.0 Imports: 24 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DataFrame

type DataFrame interface {
	// PlanId returns the plan id of the data frame.
	PlanId() int64
	All(ctx context.Context) iter.Seq2[types.Row, error]
	Agg(ctx context.Context, exprs ...column.Convertible) (DataFrame, error)
	AggWithMap(ctx context.Context, exprs map[string]string) (DataFrame, error)
	// Alias creates a new DataFrame with the specified subquery alias
	Alias(ctx context.Context, alias string) DataFrame
	ApproxQuantile(ctx context.Context, probabilities []float64, relativeError float64, cols ...string) ([][]float64, error)
	// Cache persists the DataFrame with the default storage level.
	Cache(ctx context.Context) error
	// Coalesce returns a new DataFrame that has exactly numPartitions partitions.
	//
	// Similar to coalesce defined on an RDD, this operation results in a
	// narrow dependency, e.g. if you go from 1000 partitions to 100 partitions,
	// there will not be a shuffle, instead each of the 100 new partitions will
	// claim 10 of the current partitions. If a larger number of partitions is requested,
	// it will stay at the current number of partitions.
	//
	// However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
	// this may result in your computation taking place on fewer nodes than
	// you like (e.g. one node in the case of numPartitions = 1). To avoid this,
	// you can call repartition(). This will add a shuffle step, but means the
	// current upstream partitions will be executed in parallel (per whatever
	// the current partitioning is).
	Coalesce(ctx context.Context, numPartitions int) DataFrame
	// Columns returns the list of column names of the DataFrame.
	Columns(ctx context.Context) ([]string, error)
	// Corr calculates the correlation of two columns of a DataFrame as a double value.
	// Currently only supports the Pearson Correlation Coefficient.
	Corr(ctx context.Context, col1, col2 string) (float64, error)
	CorrWithMethod(ctx context.Context, col1, col2 string, method string) (float64, error)
	// Count returns the number of rows in the DataFrame.
	Count(ctx context.Context) (int64, error)
	// Cov calculates the sample covariance for the given columns, specified by their names, as a
	// double value.
	Cov(ctx context.Context, col1, col2 string) (float64, error)
	// Collect returns the data rows of the current data frame.
	Collect(ctx context.Context) ([]types.Row, error)
	// CreateTempView creates or replaces a temporary view.
	CreateTempView(ctx context.Context, viewName string, replace, global bool) error
	// CreateOrReplaceTempView creates a temporary view, replacing any existing view with the same name.
	CreateOrReplaceTempView(ctx context.Context, viewName string) error
	// CreateGlobalTempView creates a global temporary view.
	CreateGlobalTempView(ctx context.Context, viewName string) error
	// CreateOrReplaceGlobalTempView creates a global temporary view, replacing any existing view with the same name.
	CreateOrReplaceGlobalTempView(ctx context.Context, viewName string) error
	// CrossJoin joins the current DataFrame with another DataFrame using the cross product
	CrossJoin(ctx context.Context, other DataFrame) DataFrame
	// CrossTab computes a pair-wise frequency table of the given columns. Also known as a
	// contingency table.
	// The first column of each row will be the distinct values of `col1` and the column names
	// will be the distinct values of `col2`. The name of the first column will be `$col1_$col2`.
	// Pairs that have no occurrences will have zero as their counts.
	CrossTab(ctx context.Context, col1, col2 string) DataFrame
	// Cube creates a multi-dimensional cube for the current DataFrame using
	// the specified columns, so we can run aggregations on them.
	Cube(ctx context.Context, cols ...column.Convertible) *GroupedData
	// Describe computes basic statistics for numeric and string columns.
	// This includes count, mean, stddev, min, and max.
	Describe(ctx context.Context, cols ...string) DataFrame
	// Distinct returns a new DataFrame containing the distinct rows in this DataFrame.
	Distinct(ctx context.Context) DataFrame
	// Drop returns a new DataFrame that drops the specified list of columns.
	Drop(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
	// DropByName returns a new DataFrame that drops the specified list of columns by name.
	DropByName(ctx context.Context, columns ...string) (DataFrame, error)
	// DropDuplicates returns a new DataFrame that contains only the unique rows from this DataFrame.
	DropDuplicates(ctx context.Context, columns ...string) (DataFrame, error)
	// DropNa drops all rows containing any null or NaN values. This is similar to PySpark's dropna with how=any.
	DropNa(ctx context.Context, cols ...string) (DataFrame, error)
	// DropNaAll drops all rows where all values in the specified columns are null or NaN. This is
	// similar to PySpark's dropna with how=all.
	DropNaAll(ctx context.Context, cols ...string) (DataFrame, error)
	// DropNaWithThreshold drops rows containing null or NaN values in the specified columns, subject to the given threshold.
	DropNaWithThreshold(ctx context.Context, threshold int32, cols ...string) (DataFrame, error)
	// ExceptAll is similar to Subtract but does not perform the distinct operation.
	ExceptAll(ctx context.Context, other DataFrame) DataFrame
	// Explain returns the string explain plan for the current DataFrame according to the explainMode.
	Explain(ctx context.Context, explainMode utils.ExplainMode) (string, error)
	// FillNa replaces null values with specified value.
	FillNa(ctx context.Context, value types.PrimitiveTypeLiteral, columns ...string) (DataFrame, error)
	// FillNaWithValues replaces null values in specified columns (key of the map) with values.
	FillNaWithValues(ctx context.Context, values map[string]types.PrimitiveTypeLiteral) (DataFrame, error)
	// Filter filters the data frame by a column condition.
	Filter(ctx context.Context, condition column.Convertible) (DataFrame, error)
	// FilterByString filters the data frame by a string condition.
	FilterByString(ctx context.Context, condition string) (DataFrame, error)
	// First returns the first row of the DataFrame.
	First(ctx context.Context) (types.Row, error)
	FreqItems(ctx context.Context, cols ...string) DataFrame
	FreqItemsWithSupport(ctx context.Context, support float64, cols ...string) DataFrame
	// GetStorageLevel returns the storage level of the data frame.
	GetStorageLevel(ctx context.Context) (*utils.StorageLevel, error)
	// GroupBy groups the DataFrame by the specified columns so that the aggregation
	// can be performed on them. See GroupedData for all the available aggregate functions.
	GroupBy(cols ...column.Convertible) *GroupedData
	// Head is an alias for Limit
	Head(ctx context.Context, limit int32) ([]types.Row, error)
	// Intersect performs the set intersection of two data frames and only returns distinct rows.
	Intersect(ctx context.Context, other DataFrame) DataFrame
	// IntersectAll performs the set intersection of two data frames and returns all rows.
	IntersectAll(ctx context.Context, other DataFrame) DataFrame
	// IsEmpty returns true if the DataFrame is empty.
	IsEmpty(ctx context.Context) (bool, error)
	// Join joins the current DataFrame with another DataFrame on the given join column, using the specified joinType.
	Join(ctx context.Context, other DataFrame, on column.Convertible, joinType utils.JoinType) (DataFrame, error)
	// Limit applies a limit on the DataFrame
	Limit(ctx context.Context, limit int32) DataFrame
	// Melt is an alias for Unpivot.
	Melt(ctx context.Context, ids []column.Convertible, values []column.Convertible,
		variableColumnName string, valueColumnName string) (DataFrame, error)
	Na() DataFrameNaFunctions
	// Offset returns a new DataFrame by skipping the first `offset` rows.
	Offset(ctx context.Context, offset int32) DataFrame
	// OrderBy is an alias for Sort
	OrderBy(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
	Persist(ctx context.Context, storageLevel utils.StorageLevel) error
	RandomSplit(ctx context.Context, weights []float64) ([]DataFrame, error)
	// Repartition re-partitions a data frame.
	Repartition(ctx context.Context, numPartitions int, columns []string) (DataFrame, error)
	// RepartitionByRange re-partitions a data frame by range partition.
	RepartitionByRange(ctx context.Context, numPartitions int, columns ...column.Convertible) (DataFrame, error)
	// Replace returns a new DataFrame replacing a value with another value.
	// The toReplace and values arguments must have the same type and can only be numerics,
	// booleans, or strings. Values may be nil. When replacing, the new value will be cast
	// to the type of the existing column.
	//
	// For numeric replacements all values to be replaced should have a unique
	// floating point representation. If cols is set, it specifies the subset of columns in
	// which to perform the replacement.
	Replace(ctx context.Context, toReplace []types.PrimitiveTypeLiteral,
		values []types.PrimitiveTypeLiteral, cols ...string) (DataFrame, error)
	// Rollup creates a multi-dimensional rollup for the current DataFrame using
	// the specified columns, so we can run aggregation on them.
	Rollup(ctx context.Context, cols ...column.Convertible) *GroupedData
	// SameSemantics returns true if the other DataFrame has the same semantics.
	SameSemantics(ctx context.Context, other DataFrame) (bool, error)
	// Sample samples a data frame without replacement, using a random seed.
	Sample(ctx context.Context, fraction float64) (DataFrame, error)
	// SampleWithReplacement samples a data frame with or without replacement, using a random seed.
	SampleWithReplacement(ctx context.Context, withReplacement bool, fraction float64) (DataFrame, error)
	// SampleWithSeed samples a data frame without replacement, using the given seed.
	SampleWithSeed(ctx context.Context, fraction float64, seed int64) (DataFrame, error)
	// SampleWithReplacementAndSeed samples a data frame with or without replacement, using the given seed.
	SampleWithReplacementAndSeed(ctx context.Context, withReplacement bool, fraction float64, seed int64) (DataFrame, error)
	// Show uses WriteResult to write the data frame to the console output.
	Show(ctx context.Context, numRows int, truncate bool) error
	// Schema returns the schema for the current data frame.
	Schema(ctx context.Context) (*types.StructType, error)
	// Select projects a list of columns from the DataFrame
	Select(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
	// SelectExpr projects a list of columns from the DataFrame by string expressions
	SelectExpr(ctx context.Context, exprs ...string) (DataFrame, error)
	// SemanticHash returns the semantic hash of the data frame. The semantic hash can be used to
	// understand whether the semantic operations of two data frames are similar.
	SemanticHash(ctx context.Context) (int32, error)
	// Sort returns a new DataFrame sorted by the specified columns.
	Sort(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
	Stat() DataFrameStatFunctions
	// Subtract subtracts the other DataFrame from the current DataFrame, returning only
	// distinct rows.
	Subtract(ctx context.Context, other DataFrame) DataFrame
	// Summary computes the specified statistics for the current DataFrame and returns it
	// as a new DataFrame. Available statistics are: "count", "mean", "stddev", "min", "max" and
	// arbitrary percentiles specified as a percentage (e.g., "75%"). If no statistics are given,
	// this function computes "count", "mean", "stddev", "min", "25%", "50%", "75%", "max".
	Summary(ctx context.Context, statistics ...string) DataFrame
	// Tail returns the last `limit` rows as a list of Row.
	Tail(ctx context.Context, limit int32) ([]types.Row, error)
	// Take is an alias for Limit
	Take(ctx context.Context, limit int32) ([]types.Row, error)
	// ToArrow returns the Arrow representation of the DataFrame.
	ToArrow(ctx context.Context) (*arrow.Table, error)
	// Union is an alias for UnionAll
	Union(ctx context.Context, other DataFrame) DataFrame
	// UnionAll returns a new DataFrame containing union of rows in this and another DataFrame.
	UnionAll(ctx context.Context, other DataFrame) DataFrame
	// UnionByName performs a SQL union operation on two DataFrames but reorders the schema
	// according to the matching columns. If columns are missing, it will throw an error.
	UnionByName(ctx context.Context, other DataFrame) DataFrame
	// UnionByNameWithMissingColumns performs a SQL union operation on two DataFrames but reorders the schema
	// according to the matching columns. Missing columns are supported.
	UnionByNameWithMissingColumns(ctx context.Context, other DataFrame) DataFrame
	// Unpersist resets the storage level for this data frame, and if necessary removes it
	// from server-side caches.
	Unpersist(ctx context.Context) error
	// Unpivot unpivots a DataFrame from wide format to long format, optionally leaving
	// identifier columns set. This is the reverse of `groupBy(...).pivot(...).agg(...)`,
	// except for the aggregation, which cannot be reversed.
	//
	// This function is useful to massage a DataFrame into a format where some
	// columns are identifier columns ("ids"), while all other columns ("values")
	// are "unpivoted" to the rows, leaving just two non-id columns, named as given
	// by `variableColumnName` and `valueColumnName`.
	//
	// When no "id" columns are given, the unpivoted DataFrame consists of only the
	// "variable" and "value" columns.
	//
	// The `values` columns must not be empty, so at least one value must be given to be unpivoted.
	// When `values` is nil, all non-id columns will be unpivoted.
	//
	// All "value" columns must share a least common data type. Unless they are the same data type,
	// all "value" columns are cast to the nearest common data type. For instance, types
	// `IntegerType` and `LongType` are cast to `LongType`, while `IntegerType` and `StringType`
	// do not have a common data type and `unpivot` fails.
	Unpivot(ctx context.Context, ids []column.Convertible, values []column.Convertible,
		variableColumnName string, valueColumnName string) (DataFrame, error)
	// WithColumn returns a new DataFrame by adding a column or replacing the
	// existing column that has the same name. The column expression must be an
	// expression over this DataFrame; attempting to add a column from some other
	// DataFrame will raise an error.
	//
	// Note: This method introduces a projection internally. Therefore, calling it multiple
	// times, for instance, via loops in order to add multiple columns can generate big
	// plans which can cause performance issues and even `StackOverflowException`.
	// To avoid this, use Select with multiple columns at once.
	WithColumn(ctx context.Context, colName string, col column.Convertible) (DataFrame, error)
	WithColumns(ctx context.Context, alias ...column.Alias) (DataFrame, error)
	// WithColumnRenamed returns a new DataFrame by renaming an existing column.
	// This is a no-op if the schema doesn't contain the given column name.
	WithColumnRenamed(ctx context.Context, existingName, newName string) (DataFrame, error)
	// WithColumnsRenamed returns a new DataFrame by renaming multiple existing columns.
	WithColumnsRenamed(ctx context.Context, colsMap map[string]string) (DataFrame, error)
	// WithMetadata returns a new DataFrame with the specified metadata for each of the columns.
	WithMetadata(ctx context.Context, metadata map[string]string) (DataFrame, error)
	WithWatermark(ctx context.Context, eventTime string, delayThreshold string) (DataFrame, error)
	Where(ctx context.Context, condition string) (DataFrame, error)
	// Writer returns a data frame writer, which could be used to save data frame to supported storage.
	Writer() DataFrameWriter
	// Write is an alias for Writer
	// Deprecated: Use Writer
	Write() DataFrameWriter
	// WriteResult streams the data frame to a result collector.
	WriteResult(ctx context.Context, collector ResultCollector, numRows int, truncate bool) error
}

DataFrame is a wrapper for a data frame, representing a distributed collection of data rows.
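
Example (a minimal sketch, not taken from this package's docs: it assumes a SparkSession obtained elsewhere and the module's import path; the query, filter, and helper name are hypothetical):

package main

import (
	"context"
	"fmt"

	"github.com/apache/spark-connect-go/v35/spark/sql"
)

// inspect runs a query, filters it with a SQL expression string, shrinks the
// result to a single partition without a shuffle, and collects the rows.
func inspect(ctx context.Context, spark sql.SparkSession) error {
	df, err := spark.Sql(ctx, "SELECT * FROM range(100)")
	if err != nil {
		return err
	}
	filtered, err := df.FilterByString(ctx, "id % 2 = 0")
	if err != nil {
		return err
	}
	// Coalesce avoids a shuffle; prefer Repartition when increasing parallelism.
	rows, err := filtered.Coalesce(ctx, 1).Collect(ctx)
	if err != nil {
		return err
	}
	for _, row := range rows {
		fmt.Println(row)
	}
	return nil
}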

func NewDataFrame

func NewDataFrame(session *sparkSessionImpl, relation *proto.Relation) DataFrame

NewDataFrame creates a new DataFrame

type DataFrameNaFunctions

type DataFrameNaFunctions interface {
	Drop(ctx context.Context, cols ...string) (DataFrame, error)
	DropAll(ctx context.Context, cols ...string) (DataFrame, error)
	DropWithThreshold(ctx context.Context, threshold int32, cols ...string) (DataFrame, error)
	Fill(ctx context.Context, value types.PrimitiveTypeLiteral, cols ...string) (DataFrame, error)
	FillWithValues(ctx context.Context, values map[string]types.PrimitiveTypeLiteral) (DataFrame, error)
	Replace(ctx context.Context, toReplace []types.PrimitiveTypeLiteral,
		values []types.PrimitiveTypeLiteral, cols ...string) (DataFrame, error)
}
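
A short sketch of the Na() helpers (the column names are hypothetical; imports as in the earlier example):

// dropIncomplete drops rows containing any null or NaN in "age" or "income",
// mirroring DataFrame.DropNa / PySpark's dropna(how="any").
func dropIncomplete(ctx context.Context, df sql.DataFrame) (sql.DataFrame, error) {
	return df.Na().Drop(ctx, "age", "income")
}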

type DataFrameReader

type DataFrameReader interface {
	// Format specifies data format (data source type) for the underlying data, e.g. parquet.
	Format(source string) DataFrameReader
	// Load reads the underlying data and returns a data frame.
	Load(path string) (DataFrame, error)
	// Table reads a table from the underlying data source.
	Table(name string) (DataFrame, error)
	Option(key, value string) DataFrameReader
}

DataFrameReader supports reading data from storage and returning a data frame. TODO: needs to implement other methods like Option(), Schema(), and also "strongly typed" reading (e.g. Parquet(), Orc(), Csv(), etc.).
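
A reader sketch (the path and option are hypothetical placeholders; imports as in the earlier example):

// loadEvents reads a parquet data set into a DataFrame.
func loadEvents(spark sql.SparkSession) (sql.DataFrame, error) {
	return spark.Read().
		Format("parquet").
		Option("mergeSchema", "true").
		Load("/data/events.parquet")
}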

func NewDataframeReader

func NewDataframeReader(session *sparkSessionImpl) DataFrameReader

NewDataframeReader creates a new DataFrameReader

type DataFrameStatFunctions

type DataFrameStatFunctions interface {
	ApproxQuantile(ctx context.Context, probabilities []float64, relativeError float64, cols ...string) ([][]float64, error)
	Cov(ctx context.Context, col1, col2 string) (float64, error)
	Corr(ctx context.Context, col1, col2 string) (float64, error)
	CorrWithMethod(ctx context.Context, col1, col2 string, method string) (float64, error)
	CrossTab(ctx context.Context, col1, col2 string) DataFrame
	FreqItems(ctx context.Context, cols ...string) DataFrame
	FreqItemsWithSupport(ctx context.Context, support float64, cols ...string) DataFrame
	Sample(ctx context.Context, fraction float64) (DataFrame, error)
	SampleWithReplacement(ctx context.Context, withReplacement bool, fraction float64) (DataFrame, error)
	SampleWithSeed(ctx context.Context, fraction float64, seed int64) (DataFrame, error)
	SampleWithReplacementAndSeed(ctx context.Context, withReplacement bool, fraction float64, seed int64) (DataFrame, error)
}
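
A statistics sketch (the column names are hypothetical; imports as in the earlier example):

// summarize prints the Pearson correlation of two columns and the quartiles
// of one of them, computed within 1% relative error.
func summarize(ctx context.Context, df sql.DataFrame) error {
	corr, err := df.Stat().Corr(ctx, "height", "weight")
	if err != nil {
		return err
	}
	quartiles, err := df.Stat().ApproxQuantile(ctx, []float64{0.25, 0.5, 0.75}, 0.01, "height")
	if err != nil {
		return err
	}
	fmt.Println(corr, quartiles)
	return nil
}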

type DataFrameWriter

type DataFrameWriter interface {
	// Mode specifies saving mode for the data, e.g. Append, Overwrite, ErrorIfExists.
	Mode(saveMode string) DataFrameWriter
	// Format specifies data format (data source type) for the underlying data, e.g. parquet.
	Format(source string) DataFrameWriter
	// Save writes data frame to the given path.
	Save(ctx context.Context, path string) error
}

DataFrameWriter supports writing a data frame to storage.
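
A writer sketch (the output path is a hypothetical placeholder; imports as in the earlier example):

// saveEvents writes a DataFrame as parquet, overwriting any existing output.
func saveEvents(ctx context.Context, df sql.DataFrame) error {
	return df.Writer().
		Mode("overwrite").
		Format("parquet").
		Save(ctx, "/data/out.parquet")
}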

type GroupedData

type GroupedData struct {
	// contains filtered or unexported fields
}

func (*GroupedData) Agg

func (gd *GroupedData) Agg(ctx context.Context, exprs ...column.Convertible) (DataFrame, error)

Agg computes aggregates and returns the result as a DataFrame. The aggregate expressions are passed as column.Convertible arguments.
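
A grouped-aggregation sketch. This assumes the module's companion functions package (e.g. github.com/apache/spark-connect-go/v35/spark/sql/functions) provides Col and Expr helpers that satisfy column.Convertible; "department" and "salary" are hypothetical column names:

// salaryByDept groups by one column and sums another via a SQL expression.
func salaryByDept(ctx context.Context, df sql.DataFrame) (sql.DataFrame, error) {
	grouped := df.GroupBy(functions.Col("department"))
	return grouped.Agg(ctx, functions.Expr("sum(salary)"))
}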

func (*GroupedData) Avg

func (gd *GroupedData) Avg(ctx context.Context, cols ...string) (DataFrame, error)

Avg computes the average value for each numeric column for each group.

func (*GroupedData) Count

func (gd *GroupedData) Count(ctx context.Context) (DataFrame, error)

Count computes the number of rows for each group.

func (*GroupedData) Max

func (gd *GroupedData) Max(ctx context.Context, cols ...string) (DataFrame, error)

Max computes the max value for each numeric column for each group.

func (*GroupedData) Mean

func (gd *GroupedData) Mean(ctx context.Context, cols ...string) (DataFrame, error)

Mean computes the average value for each numeric column for each group.

func (*GroupedData) Min

func (gd *GroupedData) Min(ctx context.Context, cols ...string) (DataFrame, error)

Min computes the min value for each numeric column for each group.

func (*GroupedData) Pivot

func (gd *GroupedData) Pivot(ctx context.Context, pivotCol string, pivotValues []types.LiteralType) (*GroupedData, error)

func (*GroupedData) Sum

func (gd *GroupedData) Sum(ctx context.Context, cols ...string) (DataFrame, error)

Sum computes the sum for each numeric column for each group.

type ResultCollector

type ResultCollector interface {
	// WriteRow receives a single row from the data frame
	WriteRow(values []any)
}

ResultCollector receives a stream of result rows.
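
A collector sketch (the type and helper are hypothetical; imports as in the earlier example):

// consoleCollector implements ResultCollector by printing each row's values.
type consoleCollector struct{}

func (consoleCollector) WriteRow(values []any) {
	fmt.Println(values...)
}

// dump streams up to 20 rows of df through the collector, truncating wide cells.
func dump(ctx context.Context, df sql.DataFrame) error {
	return df.WriteResult(ctx, consoleCollector{}, 20, true)
}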

type SparkSession

type SparkSession interface {
	Read() DataFrameReader
	Sql(ctx context.Context, query string) (DataFrame, error)
	Stop() error
	Table(name string) (DataFrame, error)
	CreateDataFrameFromArrow(ctx context.Context, data arrow.Table) (DataFrame, error)
	CreateDataFrame(ctx context.Context, data [][]any, schema *types.StructType) (DataFrame, error)
	Config() client.RuntimeConfig
}
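
A session-usage sketch (the table name is a hypothetical placeholder; imports as in the earlier example):

// countEvents resolves a table through the session and counts its rows.
func countEvents(ctx context.Context, spark sql.SparkSession) (int64, error) {
	df, err := spark.Table("demo.events")
	if err != nil {
		return 0, err
	}
	return df.Count(ctx)
}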

type SparkSessionBuilder

type SparkSessionBuilder struct {
	// contains filtered or unexported fields
}

func NewSessionBuilder

func NewSessionBuilder() *SparkSessionBuilder

NewSessionBuilder creates a new session builder for starting a new Spark session.

func (*SparkSessionBuilder) Build

func (*SparkSessionBuilder) Remote

func (s *SparkSessionBuilder) Remote(connectionString string) *SparkSessionBuilder

Remote sets the connection string for the remote connection.

func (*SparkSessionBuilder) WithChannelBuilder

func (s *SparkSessionBuilder) WithChannelBuilder(cb channel.Builder) *SparkSessionBuilder
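
An end-to-end sketch, assuming Build accepts a context in this version (its signature is not shown above) and that a Spark Connect server listens on the given address:

package main

import (
	"context"
	"log"

	"github.com/apache/spark-connect-go/v35/spark/sql"
)

func main() {
	ctx := context.Background()
	spark, err := sql.NewSessionBuilder().
		Remote("sc://localhost:15002").
		Build(ctx) // assumed signature: Build(ctx) (SparkSession, error)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = spark.Stop() }()

	df, err := spark.Sql(ctx, "SELECT 'hello' AS greeting")
	if err != nil {
		log.Fatal(err)
	}
	if err := df.Show(ctx, 10, false); err != nil {
		log.Fatal(err)
	}
}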
