Documentation
Index
- type DataFrame
- type DataFrameNaFunctions
- type DataFrameReader
- type DataFrameStatFunctions
- type DataFrameWriter
- type GroupedData
- func (gd *GroupedData) Agg(ctx context.Context, exprs ...column.Convertible) (DataFrame, error)
- func (gd *GroupedData) Avg(ctx context.Context, cols ...string) (DataFrame, error)
- func (gd *GroupedData) Count(ctx context.Context) (DataFrame, error)
- func (gd *GroupedData) Max(ctx context.Context, cols ...string) (DataFrame, error)
- func (gd *GroupedData) Mean(ctx context.Context, cols ...string) (DataFrame, error)
- func (gd *GroupedData) Min(ctx context.Context, cols ...string) (DataFrame, error)
- func (gd *GroupedData) Pivot(ctx context.Context, pivotCol string, pivotValues []types.LiteralType) (*GroupedData, error)
- func (gd *GroupedData) Sum(ctx context.Context, cols ...string) (DataFrame, error)
- type ResultCollector
- type SparkSession
- type SparkSessionBuilder
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type DataFrame
type DataFrame interface {
// PlanId returns the plan id of the data frame.
PlanId() int64
// All returns an iterator over all rows of the DataFrame.
All(ctx context.Context) iter.Seq2[types.Row, error]
// Agg aggregates the whole DataFrame (without groups) using the given expressions.
Agg(ctx context.Context, exprs ...column.Convertible) (DataFrame, error)
// AggWithMap aggregates using a map from column name to aggregate function name.
AggWithMap(ctx context.Context, exprs map[string]string) (DataFrame, error)
// Alias creates a new DataFrame with the specified subquery alias
Alias(ctx context.Context, alias string) DataFrame
// ApproxQuantile calculates the approximate quantiles of the given numerical columns.
ApproxQuantile(ctx context.Context, probabilities []float64, relativeError float64, cols ...string) ([][]float64, error)
// Cache persists the DataFrame with the default storage level.
Cache(ctx context.Context) error
// Coalesce returns a new DataFrame that has exactly numPartitions partitions.
//
// Similar to coalesce defined on an RDD, this operation results in a
// narrow dependency, e.g. if you go from 1000 partitions to 100 partitions,
// there will not be a shuffle, instead each of the 100 new partitions will
// claim 10 of the current partitions. If a larger number of partitions is requested,
// it will stay at the current number of partitions.
//
// However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
// this may result in your computation taking place on fewer nodes than
// you like (e.g. one node in the case of numPartitions = 1). To avoid this,
// you can call repartition(). This will add a shuffle step, but means the
// current upstream partitions will be executed in parallel (per whatever
// the current partitioning is).
Coalesce(ctx context.Context, numPartitions int) DataFrame
// Columns returns the list of column names of the DataFrame.
Columns(ctx context.Context) ([]string, error)
// Corr calculates the correlation of two columns of a DataFrame as a double value.
// Currently only supports the Pearson Correlation Coefficient.
Corr(ctx context.Context, col1, col2 string) (float64, error)
// CorrWithMethod calculates the correlation of two columns using the specified method.
CorrWithMethod(ctx context.Context, col1, col2 string, method string) (float64, error)
// Count returns the number of rows in the DataFrame.
Count(ctx context.Context) (int64, error)
// Cov calculates the sample covariance for the given columns, specified by their names, as a
// double value.
Cov(ctx context.Context, col1, col2 string) (float64, error)
// Collect returns the data rows of the current data frame.
Collect(ctx context.Context) ([]types.Row, error)
// CreateTempView creates a temporary view with the given name; the replace and
// global flags control whether an existing view is replaced and whether the
// view is global.
CreateTempView(ctx context.Context, viewName string, replace, global bool) error
// CreateOrReplaceTempView creates a temporary view, replacing an existing view of the same name if present.
CreateOrReplaceTempView(ctx context.Context, viewName string) error
// CreateGlobalTempView creates a global temporary view.
CreateGlobalTempView(ctx context.Context, viewName string) error
// CreateOrReplaceGlobalTempView creates a global temporary view, replacing an existing view of the same name if present.
CreateOrReplaceGlobalTempView(ctx context.Context, viewName string) error
// CrossJoin joins the current DataFrame with another DataFrame using the cross product
CrossJoin(ctx context.Context, other DataFrame) DataFrame
// CrossTab computes a pair-wise frequency table of the given columns. Also known as a
// contingency table.
// The first column of each row will be the distinct values of `col1` and the column names
// will be the distinct values of `col2`. The name of the first column will be `$col1_$col2`.
// Pairs that have no occurrences will have zero as their counts.
CrossTab(ctx context.Context, col1, col2 string) DataFrame
// Cube creates a multi-dimensional cube for the current DataFrame using
// the specified columns, so we can run aggregations on them.
Cube(ctx context.Context, cols ...column.Convertible) *GroupedData
// Describe computes basic statistics for numeric and string columns.
// This includes count, mean, stddev, min, and max.
Describe(ctx context.Context, cols ...string) DataFrame
// Distinct returns a new DataFrame containing the distinct rows in this DataFrame.
Distinct(ctx context.Context) DataFrame
// Drop returns a new DataFrame that drops the specified list of columns.
Drop(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
// DropByName returns a new DataFrame that drops the specified list of columns by name.
DropByName(ctx context.Context, columns ...string) (DataFrame, error)
// DropDuplicates returns a new DataFrame that contains only the unique rows from this DataFrame.
DropDuplicates(ctx context.Context, columns ...string) (DataFrame, error)
// DropNa drops all rows containing any null or NaN values. This is similar to
// PySpark's dropna with how='any'.
DropNa(ctx context.Context, cols ...string) (DataFrame, error)
// DropNaAll drops all rows in which all of the specified columns are null or NaN.
// This is similar to PySpark's dropna with how='all'.
DropNaAll(ctx context.Context, cols ...string) (DataFrame, error)
// DropNaWithThreshold drops rows that have fewer than `threshold` non-null and
// non-NaN values in the specified columns, similar to PySpark's dropna with thresh.
DropNaWithThreshold(ctx context.Context, threshold int32, cols ...string) (DataFrame, error)
// ExceptAll is similar to Subtract but does not perform the distinct operation.
ExceptAll(ctx context.Context, other DataFrame) DataFrame
// Explain returns the string explain plan for the current DataFrame according to the explainMode.
Explain(ctx context.Context, explainMode utils.ExplainMode) (string, error)
// FillNa replaces null values with specified value.
FillNa(ctx context.Context, value types.PrimitiveTypeLiteral, columns ...string) (DataFrame, error)
// FillNaWithValues replaces null values in specified columns (key of the map) with values.
FillNaWithValues(ctx context.Context, values map[string]types.PrimitiveTypeLiteral) (DataFrame, error)
// Filter filters the data frame by a column condition.
Filter(ctx context.Context, condition column.Convertible) (DataFrame, error)
// FilterByString filters the data frame by a string condition.
FilterByString(ctx context.Context, condition string) (DataFrame, error)
// First returns the first row of the DataFrame.
First(ctx context.Context) (types.Row, error)
// FreqItems finds frequent items for the given columns, using a default support threshold.
FreqItems(ctx context.Context, cols ...string) DataFrame
// FreqItemsWithSupport finds frequent items for the given columns with the given minimum support.
FreqItemsWithSupport(ctx context.Context, support float64, cols ...string) DataFrame
// GetStorageLevel returns the storage level of the data frame.
GetStorageLevel(ctx context.Context) (*utils.StorageLevel, error)
// GroupBy groups the DataFrame by the specified columns so that the aggregation
// can be performed on them. See GroupedData for all the available aggregate functions.
GroupBy(cols ...column.Convertible) *GroupedData
// Head returns the first `limit` rows as a list of Row.
Head(ctx context.Context, limit int32) ([]types.Row, error)
// Intersect performs the set intersection of two data frames and only returns distinct rows.
Intersect(ctx context.Context, other DataFrame) DataFrame
// IntersectAll performs the set intersection of two data frames and returns all rows.
IntersectAll(ctx context.Context, other DataFrame) DataFrame
// IsEmpty returns true if the DataFrame is empty.
IsEmpty(ctx context.Context) (bool, error)
// Join joins the current DataFrame with another DataFrame, using the given join condition and join type.
Join(ctx context.Context, other DataFrame, on column.Convertible, joinType utils.JoinType) (DataFrame, error)
// Limit applies a limit on the DataFrame
Limit(ctx context.Context, limit int32) DataFrame
// Melt is an alias for Unpivot.
Melt(ctx context.Context, ids []column.Convertible, values []column.Convertible,
variableColumnName string, valueColumnName string) (DataFrame, error)
// Na returns a DataFrameNaFunctions for handling missing values.
Na() DataFrameNaFunctions
// Offset returns a new DataFrame by skipping the first `offset` rows.
Offset(ctx context.Context, offset int32) DataFrame
// OrderBy is an alias for Sort
OrderBy(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
// Persist persists the DataFrame with the given storage level.
Persist(ctx context.Context, storageLevel utils.StorageLevel) error
// RandomSplit randomly splits the DataFrame with the provided weights.
RandomSplit(ctx context.Context, weights []float64) ([]DataFrame, error)
// Repartition re-partitions a data frame.
Repartition(ctx context.Context, numPartitions int, columns []string) (DataFrame, error)
// RepartitionByRange re-partitions a data frame by range partition.
RepartitionByRange(ctx context.Context, numPartitions int, columns ...column.Convertible) (DataFrame, error)
// Replace returns a new DataFrame, replacing each value in `toReplace` with the
// corresponding value in `values`. The values in `toReplace` and `values` must
// have the same type and can only be numerics, booleans, or strings. Replacement
// values may be null. When replacing, the new value will be cast to the type of
// the existing column.
//
// For numeric replacements, all values to be replaced should have a unique
// floating point representation. If `cols` is set, the replacement is applied
// only to that subset of columns.
Replace(ctx context.Context, toReplace []types.PrimitiveTypeLiteral,
values []types.PrimitiveTypeLiteral, cols ...string) (DataFrame, error)
// Rollup creates a multi-dimensional rollup for the current DataFrame using
// the specified columns, so we can run aggregation on them.
Rollup(ctx context.Context, cols ...column.Convertible) *GroupedData
// SameSemantics returns true if the other DataFrame has the same semantics.
SameSemantics(ctx context.Context, other DataFrame) (bool, error)
// Sample samples a fraction of rows without replacement, using a random seed.
Sample(ctx context.Context, fraction float64) (DataFrame, error)
// SampleWithReplacement samples a fraction of rows, with or without replacement, using a random seed.
SampleWithReplacement(ctx context.Context, withReplacement bool, fraction float64) (DataFrame, error)
// SampleWithSeed samples a fraction of rows without replacement, using the given seed.
SampleWithSeed(ctx context.Context, fraction float64, seed int64) (DataFrame, error)
// SampleWithReplacementAndSeed samples a fraction of rows, with or without replacement, using the given seed.
SampleWithReplacementAndSeed(ctx context.Context, withReplacement bool, fraction float64, seed int64) (DataFrame, error)
// Show uses WriteResult to write the data frame to the console output.
Show(ctx context.Context, numRows int, truncate bool) error
// Schema returns the schema for the current data frame.
Schema(ctx context.Context) (*types.StructType, error)
// Select projects a list of columns from the DataFrame
Select(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
// SelectExpr projects a list of columns from the DataFrame by string expressions
SelectExpr(ctx context.Context, exprs ...string) (DataFrame, error)
// SemanticHash returns the semantic hash of the data frame. The semantic hash can
// be used to determine whether the semantic operations of two data frames are similar.
SemanticHash(ctx context.Context) (int32, error)
// Sort returns a new DataFrame sorted by the specified columns.
Sort(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
// Stat returns a DataFrameStatFunctions for statistical functions.
Stat() DataFrameStatFunctions
// Subtract subtracts the other DataFrame from the current DataFrame, returning
// only distinct rows.
Subtract(ctx context.Context, other DataFrame) DataFrame
// Summary computes the specified statistics for the current DataFrame and returns it
// as a new DataFrame. Available statistics are: "count", "mean", "stddev", "min", "max" and
// arbitrary percentiles specified as a percentage (e.g., "75%"). If no statistics are given,
// this function computes "count", "mean", "stddev", "min", "25%", "50%", "75%", "max".
Summary(ctx context.Context, statistics ...string) DataFrame
// Tail returns the last `limit` rows as a list of Row.
Tail(ctx context.Context, limit int32) ([]types.Row, error)
// Take is an alias for Head.
Take(ctx context.Context, limit int32) ([]types.Row, error)
// ToArrow returns the Arrow representation of the DataFrame.
ToArrow(ctx context.Context) (*arrow.Table, error)
// Union is an alias for UnionAll
Union(ctx context.Context, other DataFrame) DataFrame
// UnionAll returns a new DataFrame containing union of rows in this and another DataFrame.
UnionAll(ctx context.Context, other DataFrame) DataFrame
// UnionByName performs a SQL union operation on two data frames but reorders the schema
// according to the matching columns. If columns are missing, the operation fails with an error.
UnionByName(ctx context.Context, other DataFrame) DataFrame
// UnionByNameWithMissingColumns performs a SQL union operation on two dataframes but reorders the schema
// according to the matching columns. Missing columns are supported.
UnionByNameWithMissingColumns(ctx context.Context, other DataFrame) DataFrame
// Unpersist resets the storage level for this data frame, and if necessary removes it
// from server-side caches.
Unpersist(ctx context.Context) error
// Unpivot unpivots a DataFrame from wide format to long format, optionally leaving
// identifier columns set. This is the reverse of `groupBy(...).pivot(...).agg(...)`,
// except for the aggregation, which cannot be reversed.
//
// This function is useful to massage a DataFrame into a format where some
// columns are identifier columns ("ids"), while all other columns ("values")
// are "unpivoted" to the rows, leaving just two non-id columns, named as given
// by `variableColumnName` and `valueColumnName`.
//
// When no "id" columns are given, the unpivoted DataFrame consists of only the
// "variable" and "value" columns.
//
// The `values` columns must not be empty, so at least one value column is unpivoted.
// When `values` is nil, all non-id columns will be unpivoted.
//
// All "value" columns must share a least common data type. Unless they are the same data type,
// all "value" columns are cast to the nearest common data type. For instance, types
// `IntegerType` and `LongType` are cast to `LongType`, while `IntegerType` and `StringType`
// do not have a common data type and `unpivot` fails.
Unpivot(ctx context.Context, ids []column.Convertible, values []column.Convertible,
variableColumnName string, valueColumnName string) (DataFrame, error)
// WithColumn returns a new DataFrame by adding a column or replacing the
// existing column that has the same name. The column expression must be an
// expression over this DataFrame; attempting to add a column from some other
// DataFrame will raise an error.
//
// Note: This method introduces a projection internally. Therefore, calling it multiple
// times, for instance, via loops in order to add multiple columns can generate big
// plans which can cause performance issues and even `StackOverflowException`.
// To avoid this, use Select with multiple columns at once.
WithColumn(ctx context.Context, colName string, col column.Convertible) (DataFrame, error)
// WithColumns returns a new DataFrame by adding multiple columns or replacing
// existing columns that have the same names.
WithColumns(ctx context.Context, alias ...column.Alias) (DataFrame, error)
// WithColumnRenamed returns a new DataFrame by renaming an existing column.
// This is a no-op if the schema doesn't contain the given column name.
WithColumnRenamed(ctx context.Context, existingName, newName string) (DataFrame, error)
// WithColumnsRenamed returns a new DataFrame by renaming multiple existing columns.
WithColumnsRenamed(ctx context.Context, colsMap map[string]string) (DataFrame, error)
// WithMetadata returns a new DataFrame with the specified metadata for each of the columns.
WithMetadata(ctx context.Context, metadata map[string]string) (DataFrame, error)
// WithWatermark defines an event time watermark for this DataFrame.
WithWatermark(ctx context.Context, eventTime string, delayThreshold string) (DataFrame, error)
// Where filters the data frame by a string condition.
Where(ctx context.Context, condition string) (DataFrame, error)
// Writer returns a data frame writer, which can be used to save the data frame to supported storage.
Writer() DataFrameWriter
// Write is an alias for Writer
// Deprecated: Use Writer
Write() DataFrameWriter
// WriteResult streams the rows of the data frame to the given ResultCollector.
WriteResult(ctx context.Context, collector ResultCollector, numRows int, truncate bool) error
}
DataFrame is a wrapper for a data frame, representing a distributed collection of data rows.
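A minimal end-to-end sketch, assuming the import path shown and a Spark Connect server at an illustrative sc:// address; FilterByString is used so no explicit column expressions are needed:

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/apache/spark-connect-go/v35/spark/sql" // import path is illustrative
    )

    func main() {
        ctx := context.Background()
        spark, err := sql.NewSessionBuilder().Remote("sc://localhost:15002").Build(ctx)
        if err != nil {
            log.Fatal(err)
        }
        defer spark.Stop()

        // range(10) yields rows with a single "id" column.
        df, err := spark.Sql(ctx, "SELECT id FROM range(10)")
        if err != nil {
            log.Fatal(err)
        }
        // Keep rows with id > 2; FilterByString takes a SQL boolean expression.
        df, err = df.FilterByString(ctx, "id > 2")
        if err != nil {
            log.Fatal(err)
        }
        rows, err := df.Collect(ctx)
        if err != nil {
            log.Fatal(err)
        }
        for _, row := range rows {
            fmt.Println(row)
        }
    }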
func NewDataFrame
NewDataFrame creates a new DataFrame
type DataFrameNaFunctions
type DataFrameNaFunctions interface {
Drop(ctx context.Context, cols ...string) (DataFrame, error)
DropAll(ctx context.Context, cols ...string) (DataFrame, error)
DropWithThreshold(ctx context.Context, threshold int32, cols ...string) (DataFrame, error)
Fill(ctx context.Context, value types.PrimitiveTypeLiteral, cols ...string) (DataFrame, error)
FillWithValues(ctx context.Context, values map[string]types.PrimitiveTypeLiteral) (DataFrame, error)
Replace(ctx context.Context, toReplace []types.PrimitiveTypeLiteral,
values []types.PrimitiveTypeLiteral, cols ...string) (DataFrame, error)
}
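These mirror the DropNa/FillNa family on DataFrame. A small sketch, assuming the sql import path from the example above and illustrative column names:

    // dropMissing removes rows where any of the listed columns is null or NaN,
    // equivalent to df.DropNa(ctx, "age", "height").
    func dropMissing(ctx context.Context, df sql.DataFrame) (sql.DataFrame, error) {
        return df.Na().Drop(ctx, "age", "height")
    }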
type DataFrameReader
type DataFrameReader interface {
// Format specifies data format (data source type) for the underlying data, e.g. parquet.
Format(source string) DataFrameReader
// Load reads the underlying data and returns a data frame.
Load(path string) (DataFrame, error)
// Table reads a table from the underlying data source.
Table(name string) (DataFrame, error)
// Option adds an input option for the underlying data source.
Option(key, value string) DataFrameReader
}
DataFrameReader supports reading data from storage and returning a data frame. TODO: needs to implement other methods like Schema(), and also "strongly typed" reading (e.g. Parquet(), Orc(), Csv(), etc.).
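A reading sketch in the builder style (the path is illustrative; mergeSchema is a standard Parquet data source option):

    // readEvents loads a Parquet dataset into a DataFrame.
    func readEvents(spark sql.SparkSession) (sql.DataFrame, error) {
        return spark.Read().
            Format("parquet").
            Option("mergeSchema", "true").
            Load("/data/events")
    }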
func NewDataframeReader
func NewDataframeReader(session *sparkSessionImpl) DataFrameReader
NewDataframeReader creates a new DataFrameReader
type DataFrameStatFunctions
type DataFrameStatFunctions interface {
ApproxQuantile(ctx context.Context, probabilities []float64, relativeError float64, cols ...string) ([][]float64, error)
Cov(ctx context.Context, col1, col2 string) (float64, error)
Corr(ctx context.Context, col1, col2 string) (float64, error)
CorrWithMethod(ctx context.Context, col1, col2 string, method string) (float64, error)
CrossTab(ctx context.Context, col1, col2 string) DataFrame
FreqItems(ctx context.Context, cols ...string) DataFrame
FreqItemsWithSupport(ctx context.Context, support float64, cols ...string) DataFrame
Sample(ctx context.Context, fraction float64) (DataFrame, error)
SampleWithReplacement(ctx context.Context, withReplacement bool, fraction float64) (DataFrame, error)
SampleWithSeed(ctx context.Context, fraction float64, seed int64) (DataFrame, error)
SampleWithReplacementAndSeed(ctx context.Context, withReplacement bool, fraction float64, seed int64) (DataFrame, error)
}
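A sketch of the statistic helpers reached through DataFrame.Stat(); column names are illustrative, and fmt is assumed to be imported:

    func describeRelationship(ctx context.Context, df sql.DataFrame) error {
        stat := df.Stat()
        // Pearson correlation between two numeric columns.
        corr, err := stat.Corr(ctx, "height", "weight")
        if err != nil {
            return err
        }
        fmt.Printf("corr(height, weight) = %f\n", corr)
        // Pair-wise frequency (contingency) table of two categorical columns.
        return stat.CrossTab(ctx, "city", "product").Show(ctx, 20, false)
    }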
type DataFrameWriter
type DataFrameWriter interface {
// Mode specifies saving mode for the data, e.g. Append, Overwrite, ErrorIfExists.
Mode(saveMode string) DataFrameWriter
// Format specifies data format (data source type) for the underlying data, e.g. parquet.
Format(source string) DataFrameWriter
// Save writes data frame to the given path.
Save(ctx context.Context, path string) error
}
DataFrameWriter supports writing data frame to storage.
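A writing sketch; the mode and format strings follow the usual Spark save modes and data source names, and the path is illustrative:

    // writeEvents saves the DataFrame as Parquet, overwriting existing output.
    func writeEvents(ctx context.Context, df sql.DataFrame) error {
        return df.Writer().
            Mode("overwrite").
            Format("parquet").
            Save(ctx, "/data/events_out")
    }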
type GroupedData
type GroupedData struct {
// contains filtered or unexported fields
}
func (*GroupedData) Agg
func (gd *GroupedData) Agg(ctx context.Context, exprs ...column.Convertible) (DataFrame, error)
Agg computes aggregates and returns the result as a DataFrame. The aggregate expressions are passed as column.Column arguments.
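A grouped-aggregation sketch; functions.Col and functions.Sum are assumed to be provided by the client's functions package (verify against your release), and the column names are illustrative:

    // totalByGroup sums the "amount" column per value of "grp".
    func totalByGroup(ctx context.Context, df sql.DataFrame) (sql.DataFrame, error) {
        return df.GroupBy(functions.Col("grp")).
            Agg(ctx, functions.Sum(functions.Col("amount")))
    }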
func (*GroupedData) Count
func (gd *GroupedData) Count(ctx context.Context) (DataFrame, error)
Count computes the count value for each group.
func (*GroupedData) Pivot
func (gd *GroupedData) Pivot(ctx context.Context, pivotCol string, pivotValues []types.LiteralType) (*GroupedData, error)
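Pivot pivots a column of the grouped data, producing one output column per pivot value once a subsequent aggregation is applied. A hedged sketch: functions.Col is an assumption as above, and passing an empty value list so that the server discovers the distinct pivot values mirrors PySpark's behavior but should be verified for this client:

    func pivotSales(ctx context.Context, df sql.DataFrame) (sql.DataFrame, error) {
        grouped := df.GroupBy(functions.Col("product"))
        // Explicit pivot values require a types.LiteralType implementation;
        // an empty list is used here (assumption: the server then computes
        // the distinct values of "year", as in PySpark).
        pivoted, err := grouped.Pivot(ctx, "year", []types.LiteralType{})
        if err != nil {
            return nil, err
        }
        return pivoted.Sum(ctx, "sales")
    }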
type ResultCollector
type ResultCollector interface {
// WriteRow receives a single row from the data frame
WriteRow(values []any)
}
ResultCollector receives a stream of result rows
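Implementing the single-method interface is enough to receive rows from DataFrame.WriteResult. A minimal sketch (sql import path as above):

    // rowPrinter is a minimal ResultCollector that prints every received row.
    type rowPrinter struct{}

    func (rowPrinter) WriteRow(values []any) {
        fmt.Println(values...)
    }

    // printRows streams up to 100 untruncated rows of df into the collector.
    func printRows(ctx context.Context, df sql.DataFrame) error {
        return df.WriteResult(ctx, rowPrinter{}, 100, false)
    }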
type SparkSession
type SparkSession interface {
Read() DataFrameReader
Sql(ctx context.Context, query string) (DataFrame, error)
Stop() error
Table(name string) (DataFrame, error)
CreateDataFrameFromArrow(ctx context.Context, data arrow.Table) (DataFrame, error)
CreateDataFrame(ctx context.Context, data [][]any, schema *types.StructType) (DataFrame, error)
Config() client.RuntimeConfig
}
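A usage sketch tying the session methods together; the table name is illustrative:

    func showTable(ctx context.Context, spark sql.SparkSession) error {
        // Table resolves a saved table by name; Sql runs an arbitrary query.
        df, err := spark.Table("demo.events")
        if err != nil {
            return err
        }
        return df.Show(ctx, 10, false)
    }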
type SparkSessionBuilder
type SparkSessionBuilder struct {
// contains filtered or unexported fields
}
func NewSessionBuilder
func NewSessionBuilder() *SparkSessionBuilder
NewSessionBuilder creates a new session builder for starting a new Spark session.
func (*SparkSessionBuilder) Build
func (s *SparkSessionBuilder) Build(ctx context.Context) (SparkSession, error)
func (*SparkSessionBuilder) Remote
func (s *SparkSessionBuilder) Remote(connectionString string) *SparkSessionBuilder
Remote sets the connection string for the remote connection.
func (*SparkSessionBuilder) WithChannelBuilder
func (s *SparkSessionBuilder) WithChannelBuilder(cb channel.Builder) *SparkSessionBuilder
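Typical builder usage; the sc:// connection string follows the Spark Connect convention, and the address is illustrative:

    // connect builds a session against a local Spark Connect server.
    func connect(ctx context.Context) (sql.SparkSession, error) {
        return sql.NewSessionBuilder().
            Remote("sc://localhost:15002").
            Build(ctx)
    }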