Documentation
¶
Overview ¶
Package bigquery implements a BigQuery SQL pushdown engine for the dataset library.
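Computation stays in BigQuery wherever possible. The engine creates lazy datasets that accumulate Storage Read API read options: SelectedFields (column projection) and RowRestriction (row filter). Complex operations (GROUP BY, JOIN, WINDOW) execute SQL Jobs that write to temporary BigQuery tables. Data only reaches local memory when Column().Values() is called, or when an operation documented below explicitly downloads data or delegates to the local arrow engine.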
Usage:
eng, _ := bigquery.NewEngine(ctx, "my-project")
defer eng.Close()
ds, _ := eng.Table("analytics", "events")
result, _ := dataset.From(ds).
Select("region", "revenue").
Filter(dataset.Gt("revenue", 1000)).
Collect()
Index ¶
- Variables
- func DtypeToBQSQL(dt dataset.DType) string
- type Engine
- func (e *Engine) Abs(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Acos(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) AddCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) AddScalar(col dataset.AnyColumn, val float64) (dataset.AnyColumn, error)
- func (e *Engine) Asin(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Atan(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Atan2(y, x dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) BitAnd(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) BitNot(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) BitOr(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) BitShiftLeft(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)
- func (e *Engine) BitShiftRight(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)
- func (e *Engine) BitXor(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Boxplot(yCol, groupCol dataset.AnyColumn, whisker string, notch bool) (dataset.Table, error)
- func (e *Engine) Cast(col dataset.AnyColumn, target dataset.DType) (dataset.AnyColumn, error)
- func (e *Engine) Ceil(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Close() error
- func (e *Engine) Combine(datasets ...dataset.Table) (dataset.Table, error)
- func (e *Engine) Complete(ds dataset.Table, cols ...string) (dataset.Table, error)
- func (e *Engine) Concatenate(ds dataset.Table, col string, from []string, sep string) (dataset.Table, error)
- func (e *Engine) Context() context.Context
- func (e *Engine) Cos(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Count(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) CumMax(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) CumMin(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) CumSum(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) DenseRank(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) DivCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) DropNA(ds dataset.Table, cols ...string) (dataset.Table, error)
- func (e *Engine) Erf(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Exp(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Fill(col dataset.AnyColumn, dir dataset.FillDirection) (dataset.AnyColumn, error)
- func (e *Engine) Filter(ds dataset.Table, mask dataset.Masker) (dataset.Table, error)
- func (e *Engine) FilterIndices(mask []bool) []int
- func (e *Engine) First(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Floor(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) FromColumns(schema *dataset.Schema, cols ...dataset.AnyColumn) (dataset.Table, error)
- func (e *Engine) Histogram(col dataset.AnyColumn, nBins int) (dataset.Table, error)
- func (e *Engine) Join(left, right dataset.Table, spec dataset.JoinSpec) (dataset.Table, error)
- func (e *Engine) KDE(_ context.Context, col dataset.AnyColumn, bandwidth float64, points int) (dataset.Table, error)
- func (e *Engine) Lag(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)
- func (e *Engine) Last(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Lead(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)
- func (e *Engine) LinearFit(xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)
- func (e *Engine) LinearFitSE(xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)
- func (e *Engine) Ln(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) LoessFit(_ context.Context, xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)
- func (e *Engine) LoessFitSE(_ context.Context, xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)
- func (e *Engine) Log2(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Log10(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Mean(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Median(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) MinMax(col dataset.AnyColumn) (dataset.AnyColumn, dataset.AnyColumn, error)
- func (e *Engine) Mode(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) MulCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) MulScalar(col dataset.AnyColumn, val float64) (dataset.AnyColumn, error)
- func (e *Engine) Name() string
- func (e *Engine) Neg(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) NewBoolColumn(name string, data []bool) dataset.AnyColumn
- func (e *Engine) NewBuilder(schema *dataset.Schema) dataset.Builder
- func (e *Engine) NewFloat64Column(name string, data []float64) dataset.AnyColumn
- func (e *Engine) NewInt64Column(name string, data []int64) dataset.AnyColumn
- func (e *Engine) NewStringColumn(name string, data []string) dataset.AnyColumn
- func (e *Engine) NewTimestampColumn(name string, data []int64) dataset.AnyColumn
- func (e *Engine) PercentRank(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Percentile(col dataset.AnyColumn, p float64) (dataset.AnyColumn, error)
- func (e *Engine) PivotLonger(ds dataset.Table, spec dataset.PivotLongerSpec) (dataset.Table, error)
- func (e *Engine) PivotWider(ds dataset.Table, spec dataset.PivotWiderSpec) (dataset.Table, error)
- func (e *Engine) Pow(col dataset.AnyColumn, exp float64) (dataset.AnyColumn, error)
- func (e *Engine) Rank(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) ReplaceNA(col dataset.AnyColumn, defaultVal float64) (dataset.AnyColumn, error)
- func (e *Engine) Round(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) RowNumber(n int) (dataset.AnyColumn, error)
- func (e *Engine) Select(col dataset.AnyColumn, indices []int) (dataset.AnyColumn, error)
- func (e *Engine) Separate(ds dataset.Table, col string, into []string, sep string) (dataset.Table, error)
- func (e *Engine) Sigmoid(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Sign(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Sin(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Slice(col dataset.AnyColumn, start, end int) (dataset.AnyColumn, error)
- func (e *Engine) SortIndices(_ dataset.AnyColumn) ([]int, error)
- func (e *Engine) Sqrt(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Stack(datasets ...dataset.Table) (dataset.Table, error)
- func (e *Engine) StdDev(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) SubCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Sum(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Table(datasetID, tableID string) (dataset.Table, error)
- func (e *Engine) Tan(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Tanh(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Variance(col dataset.AnyColumn) (dataset.AnyColumn, error)
- type Option
- type Quota
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrUnsupportedType is returned for unsupported column types.
	ErrUnsupportedType = errors.New("bigquery: unsupported column type")
	// ErrUnsupportedOp is returned for unsupported operations.
	ErrUnsupportedOp = errors.New("bigquery: unsupported operation")
	// ErrSchemaMismatch is returned when schemas don't match.
	ErrSchemaMismatch = errors.New("bigquery: schema mismatch")
	// ErrNilDataset is returned when a nil dataset is provided.
	ErrNilDataset = errors.New("bigquery: nil dataset")
	// ErrEmptyDataset is returned when an empty dataset is provided.
	ErrEmptyDataset = errors.New("bigquery: empty dataset")
	// ErrLengthMismatch is returned when column lengths don't match.
	ErrLengthMismatch = errors.New("bigquery: column length mismatch")
)
Sentinel errors for the BigQuery engine package.
Functions ¶
func DtypeToBQSQL ¶
DtypeToBQSQL maps dataset.DType to BigQuery SQL type names.
Types ¶
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the BigQuery SQL pushdown engine. It holds three GCP clients:
- bqClient: for SQL Jobs (GROUP BY, JOIN, etc.) and table metadata
- readClient: for Storage Read API (Arrow IPC download)
- writeClient: (Phase 4) for Storage Write API (managed writer upload)
func NewEngine ¶
NewEngine creates a BigQuery engine for the given GCP project. Uses Application Default Credentials unless overridden via opts.
func (*Engine) BitShiftLeft ¶
BitShiftLeft shifts each element left by n bits.
func (*Engine) BitShiftRight ¶
BitShiftRight shifts each element right by n bits.
func (*Engine) Boxplot ¶ added in v0.0.5
func (e *Engine) Boxplot(yCol, groupCol dataset.AnyColumn, whisker string, notch bool) (dataset.Table, error)
Boxplot computes the five-number summary for a numeric column, optionally grouped by a categorical column. Returns a lazy bqDataset — APPROX_QUANTILES, whisker, and notch computation all run server-side in a single SQL query.
func (*Engine) Complete ¶
Complete generates all combinations of the given columns. SQL: CROSS JOIN of DISTINCT values for each column.
func (*Engine) Concatenate ¶
func (e *Engine) Concatenate(ds dataset.Table, col string, from []string, sep string) (dataset.Table, error)
Concatenate joins multiple string columns into one with a separator. SQL: CONCAT(col1, sep, col2, sep, col3) AS output
func (*Engine) Filter ¶
Filter applies a predicate as a RowRestriction on the bqDataset. No execution happens — just appends to the restriction string.
func (*Engine) FilterIndices ¶
FilterIndices evaluates a boolean mask to indices (local only).
func (*Engine) First ¶ added in v0.0.5
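First returns the first element of a column. For BQ columns, generates a SELECT with LIMIT 1 and no ORDER BY. BigQuery does not guarantee a row order without ORDER BY, so which row is returned is not guaranteed to be stable across queries.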
func (*Engine) FromColumns ¶
func (e *Engine) FromColumns(schema *dataset.Schema, cols ...dataset.AnyColumn) (dataset.Table, error)
FromColumns builds a dataset from the given columns. If all columns are bqColumns from the same table, returns a new bqDataset with SelectedFields (column projection — zero-cost). Otherwise, delegates to the arrow engine for local data.
func (*Engine) Histogram ¶ added in v0.0.5
Histogram bins a numeric column into equal-width bins. Returns a lazy bqDataset — computation stays in BigQuery.
func (*Engine) KDE ¶ added in v0.0.5
func (e *Engine) KDE(_ context.Context, col dataset.AnyColumn, bandwidth float64, points int) (dataset.Table, error)
KDE computes kernel density estimation over a numeric column. Returns a lazy bqDataset — grid generation and Gaussian kernel evaluation all run server-side via CROSS JOIN.
func (*Engine) Last ¶ added in v0.0.5
Last returns the last element of a column. For BQ columns, generates a subquery that reverses row order.
func (*Engine) LinearFit ¶ added in v0.0.5
LinearFit computes OLS linear regression y = a + b*x. Returns a lazy bqDataset — coefficients and grid all computed server-side.
func (*Engine) LinearFitSE ¶ added in v0.0.5
LinearFitSE computes OLS regression with 95% confidence bands. Coefficients and residual SE all computed server-side in BigQuery.
func (*Engine) LoessFit ¶ added in v0.0.5
func (e *Engine) LoessFit(_ context.Context, xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)
LoessFit computes locally weighted regression (LOESS) entirely server-side. Uses VECTOR_SEARCH for k-NN neighbor lookup and SQL aggregation for tri-cube weighted least squares — no data download.
func (*Engine) LoessFitSE ¶ added in v0.0.5
func (e *Engine) LoessFitSE(_ context.Context, xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)
LoessFitSE computes LOESS with approximate 95% confidence bands, entirely server-side via VECTOR_SEARCH + SQL aggregation.
func (*Engine) NewBoolColumn ¶
NewBoolColumn creates a local bool column via the arrow engine.
func (*Engine) NewBuilder ¶
NewBuilder creates a bqBuilder that accumulates rows locally and writes them to BigQuery via the Storage Write API (managedwriter) on Build().
Usage:
b := eng.NewBuilder(schema)
b.Float64("x").Append(1.5)
b.String("name").Append("Alice")
ds, _ := b.Build() // streams to BQ, returns lazy bqDataset
func (*Engine) NewFloat64Column ¶
NewFloat64Column creates a local float64 column via the arrow engine.
func (*Engine) NewInt64Column ¶
NewInt64Column creates a local int64 column via the arrow engine.
func (*Engine) NewStringColumn ¶
NewStringColumn creates a local string column via the arrow engine.
func (*Engine) NewTimestampColumn ¶
NewTimestampColumn creates a local timestamp column via the arrow engine.
func (*Engine) PercentRank ¶
PercentRank returns the percent rank (SQL PERCENT_RANK).
func (*Engine) Percentile ¶ added in v0.0.5
Percentile returns the p-th quantile via BQ PERCENTILE_CONT.
func (*Engine) PivotLonger ¶
PivotLonger gathers multiple columns into name+value pairs. SQL: UNPIVOT (value FOR name IN (col1, col2, ...))
func (*Engine) PivotWider ¶
PivotWider spreads a name column's values into new columns. SQL: SELECT id_cols, MAX(IF(name_col='val1', value_col, NULL)) AS val1, ... FROM source GROUP BY id_cols
func (*Engine) RowNumber ¶
RowNumber returns a 1-based sequential row-number column (delegates locally).
func (*Engine) Select ¶
Select applies Take (row gather by indices). For bqColumns: materializes first, then downloads and operates locally. This is inherently non-lazy — indices come from local computation.
func (*Engine) Separate ¶
func (e *Engine) Separate(ds dataset.Table, col string, into []string, sep string) (dataset.Table, error)
Separate splits a string column into multiple columns by a separator. SQL: SPLIT(col, sep)[OFFSET(i)] AS into_i
func (*Engine) Slice ¶
Slice applies a range restriction on a column. For bqColumns: uses RowRestriction "col BETWEEN start AND end" — fully lazy.
func (*Engine) SortIndices ¶
SortIndices is not supported on remote data. Use Frame.Arrange which should generate ORDER BY SQL.
func (*Engine) StdDev ¶ added in v0.0.5
StdDev returns the sample standard deviation via SQL STDDEV_SAMP.
func (*Engine) Table ¶
Table returns a lazy dataset.Table pointing at a BigQuery table. No data is downloaded — only table metadata (schema + row count) is fetched.
type Option ¶
type Option func(*Engine)
Option configures the BigQuery engine.
func WithBQClient ¶
WithBQClient injects a pre-configured BigQuery client (e.g. for emulator tests).
func WithClientOptions ¶
func WithClientOptions(opts ...option.ClientOption) Option
WithClientOptions passes option.ClientOption values (from google.golang.org/api/option) to the GCP client constructors.
func WithMaxDownloadBytes ¶
WithMaxDownloadBytes sets the maximum download size in bytes.
func WithMaxDownloadRows ¶
WithMaxDownloadRows sets the maximum number of rows that can be downloaded.
func WithMaxQueryBytes ¶
WithMaxQueryBytes limits the maximum bytes processed by SQL Jobs.
func WithWarnDownloadRows ¶
WithWarnDownloadRows sets the row count threshold for download warnings.
type Quota ¶
type Quota struct {
// MaxDownloadRows limits how many rows can be pulled locally via Storage Read API.
// 0 = unlimited. Default: 1_000_000.
MaxDownloadRows int64
// MaxDownloadBytes limits download size in bytes. 0 = unlimited. Default: 1 GB.
MaxDownloadBytes int64
// WarnDownloadRows triggers a log warning above this threshold.
// Default: 100_000.
WarnDownloadRows int64
// DryRun, if true, submits SQL Jobs as dry runs: the estimated bytes
// processed are logged and the job itself is NOT executed.
DryRun bool
// MaxQueryBytes limits SQL Job size in bytes. 0 = unlimited.
// Jobs exceeding this are rejected before execution.
MaxQueryBytes int64
}
Quota controls download limits and billing guards.