bigquery

package
v0.0.8 Latest
Warning

This package is not in the latest version of its module.

Published: May 24, 2026 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Package bigquery implements a BigQuery SQL pushdown engine for the dataset library.

All computation stays in BigQuery. The engine creates lazy datasets that accumulate SelectedFields (column projection) and a RowRestriction (row filter) for the Storage Read API. Complex operations (GROUP BY, JOIN, WINDOW) run as SQL Jobs that write their results to temporary BigQuery tables. Data reaches local memory only when Column().Values() is called.

Usage:

eng, err := bigquery.NewEngine(ctx, "my-project")
if err != nil {
    return err
}
defer eng.Close()

ds, err := eng.Table("analytics", "events")
if err != nil {
    return err
}

result, err := dataset.From(ds).
    Select("region", "revenue").
    Filter(dataset.Gt("revenue", 1000)).
    Collect()

Index

Constants

This section is empty.

Variables

var (
	// ErrUnsupportedType is returned for unsupported column types.
	ErrUnsupportedType = errors.New("bigquery: unsupported column type")

	// ErrUnsupportedOp is returned for unsupported operations.
	ErrUnsupportedOp = errors.New("bigquery: unsupported operation")

	// ErrSchemaMismatch is returned when schemas don't match.
	ErrSchemaMismatch = errors.New("bigquery: schema mismatch")

	// ErrNilDataset is returned when a nil dataset is provided.
	ErrNilDataset = errors.New("bigquery: nil dataset")

	// ErrEmptyDataset is returned when an empty dataset is provided.
	ErrEmptyDataset = errors.New("bigquery: empty dataset")

	// ErrLengthMismatch is returned when column lengths don't match.
	ErrLengthMismatch = errors.New("bigquery: column length mismatch")
)

Sentinel errors for the BigQuery engine package.

Functions

func DtypeToBQSQL

func DtypeToBQSQL(dt dataset.DType) string

DtypeToBQSQL maps dataset.DType to BigQuery SQL type names.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the BigQuery SQL pushdown engine. It holds three GCP clients:

  • bqClient: for SQL Jobs (GROUP BY, JOIN, etc.) and table metadata
  • readClient: for Storage Read API (Arrow IPC download)
  • writeClient: (Phase 4) for Storage Write API (managed writer upload)

func NewEngine

func NewEngine(ctx context.Context, projectID string, opts ...Option) (*Engine, error)

NewEngine creates a BigQuery engine for the given GCP project. Uses Application Default Credentials unless overridden via opts.

func (*Engine) Abs

func (e *Engine) Abs(col dataset.AnyColumn) (dataset.AnyColumn, error)

Abs returns the absolute value of each element.

func (*Engine) Acos

func (e *Engine) Acos(col dataset.AnyColumn) (dataset.AnyColumn, error)

Acos returns the arc cosine of each element.

func (*Engine) AddCols

func (e *Engine) AddCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

AddCols returns the element-wise sum of two columns.

func (*Engine) AddScalar

func (e *Engine) AddScalar(col dataset.AnyColumn, val float64) (dataset.AnyColumn, error)

AddScalar adds a scalar value to every element.

func (*Engine) Asin

func (e *Engine) Asin(col dataset.AnyColumn) (dataset.AnyColumn, error)

Asin returns the arc sine of each element.

func (*Engine) Atan

func (e *Engine) Atan(col dataset.AnyColumn) (dataset.AnyColumn, error)

Atan returns the arc tangent of each element.

func (*Engine) Atan2

func (e *Engine) Atan2(y, x dataset.AnyColumn) (dataset.AnyColumn, error)

Atan2 returns the two-argument arc tangent of y/x.

func (*Engine) BitAnd

func (e *Engine) BitAnd(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

BitAnd returns the bitwise AND of two columns.

func (*Engine) BitNot

func (e *Engine) BitNot(col dataset.AnyColumn) (dataset.AnyColumn, error)

BitNot returns the bitwise NOT of each element.

func (*Engine) BitOr

func (e *Engine) BitOr(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

BitOr returns the bitwise OR of two columns.

func (*Engine) BitShiftLeft

func (e *Engine) BitShiftLeft(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)

BitShiftLeft shifts each element left by n bits.

func (*Engine) BitShiftRight

func (e *Engine) BitShiftRight(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)

BitShiftRight shifts each element right by n bits.

func (*Engine) BitXor

func (e *Engine) BitXor(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

BitXor returns the bitwise XOR of two columns.

func (*Engine) Boxplot added in v0.0.5

func (e *Engine) Boxplot(yCol, groupCol dataset.AnyColumn, whisker string, notch bool) (dataset.Table, error)

Boxplot computes the five-number summary for a numeric column, optionally grouped by a categorical column. Returns a lazy bqDataset — APPROX_QUANTILES, whisker, and notch computation all run server-side in a single SQL query.

func (*Engine) Cast

func (e *Engine) Cast(col dataset.AnyColumn, target dataset.DType) (dataset.AnyColumn, error)

Cast converts a column to the target DType via SQL CAST.

func (*Engine) Ceil

func (e *Engine) Ceil(col dataset.AnyColumn) (dataset.AnyColumn, error)

Ceil rounds each element up to the nearest integer.

func (*Engine) Close

func (e *Engine) Close() error

Close releases all clients and cleans up temporary tables.

func (*Engine) Combine

func (e *Engine) Combine(datasets ...dataset.Table) (dataset.Table, error)

Combine horizontally concatenates datasets. It is not pushed down to BigQuery: the data is downloaded first and the operation is then delegated locally.

func (*Engine) Complete

func (e *Engine) Complete(ds dataset.Table, cols ...string) (dataset.Table, error)

Complete generates all combinations of the given columns. SQL: CROSS JOIN of DISTINCT values for each column.
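A minimal sketch of the query shape described for Complete, assuming the column and table names are trusted identifiers inserted verbatim. completeSQL is a hypothetical helper, not part of this package; the exact SQL the engine emits is not shown in the source.

```go
package main

import (
	"fmt"
	"strings"
)

// completeSQL is a hypothetical helper that builds the pattern described for
// Complete: a CROSS JOIN of the DISTINCT values of each requested column.
// Identifiers are inserted verbatim, so they must already be trusted and valid.
func completeSQL(table string, cols []string) string {
	parts := make([]string, len(cols))
	for i, c := range cols {
		parts[i] = fmt.Sprintf("(SELECT DISTINCT %s FROM %s)", c, table)
	}
	return "SELECT * FROM " + strings.Join(parts, " CROSS JOIN ")
}

func main() {
	fmt.Println(completeSQL("sales", []string{"region", "year"}))
}
```

With 3 distinct regions and 4 distinct years, such a query yields 3 × 4 = 12 rows, one per combination.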

func (*Engine) Concatenate

func (e *Engine) Concatenate(ds dataset.Table, col string, from []string, sep string) (dataset.Table, error)

Concatenate joins the string columns named in from into a single column col, separated by sep. SQL: CONCAT(col1, sep, col2, sep, col3) AS col

func (*Engine) Context

func (e *Engine) Context() context.Context

Context returns the engine's lifecycle context.

func (*Engine) Cos

func (e *Engine) Cos(col dataset.AnyColumn) (dataset.AnyColumn, error)

Cos returns the cosine of each element (radians).

func (*Engine) Count

func (e *Engine) Count(col dataset.AnyColumn) (dataset.AnyColumn, error)

Count returns a single-element int64 column with the SQL COUNT.

func (*Engine) CumMax

func (e *Engine) CumMax(col dataset.AnyColumn) (dataset.AnyColumn, error)

CumMax returns the cumulative maximum (SQL MAX OVER window).

func (*Engine) CumMin

func (e *Engine) CumMin(col dataset.AnyColumn) (dataset.AnyColumn, error)

CumMin returns the cumulative minimum (SQL MIN OVER window).

func (*Engine) CumSum

func (e *Engine) CumSum(col dataset.AnyColumn) (dataset.AnyColumn, error)

CumSum returns the cumulative sum (SQL SUM OVER window).

func (*Engine) DenseRank

func (e *Engine) DenseRank(col dataset.AnyColumn) (dataset.AnyColumn, error)

DenseRank returns the dense rank (SQL DENSE_RANK OVER ORDER BY).

func (*Engine) DivCols

func (e *Engine) DivCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

DivCols returns the element-wise quotient of two columns.

func (*Engine) DropNA

func (e *Engine) DropNA(ds dataset.Table, cols ...string) (dataset.Table, error)

DropNA returns a dataset whose rows are filtered with IS NOT NULL on the given columns.

func (*Engine) Erf

func (e *Engine) Erf(col dataset.AnyColumn) (dataset.AnyColumn, error)

Erf returns the error function of each element.

func (*Engine) Exp

func (e *Engine) Exp(col dataset.AnyColumn) (dataset.AnyColumn, error)

Exp returns e raised to the power of each element.

func (*Engine) Fill

Fill forward- or backward-fills null values. It is not pushed down to BigQuery: the data is downloaded first and the operation is then delegated locally.

func (*Engine) Filter

func (e *Engine) Filter(ds dataset.Table, mask dataset.Masker) (dataset.Table, error)

Filter applies a predicate as a RowRestriction on the bqDataset. Nothing is executed; the predicate is only appended to the restriction string.
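A minimal sketch of how successive Filter calls could accumulate into one Storage Read API row restriction string. It assumes predicates are combined with AND and each one is parenthesized; the source does not show the exact combining rule, and appendRestriction is a hypothetical name.

```go
package main

import "fmt"

// appendRestriction is a hypothetical helper: it ANDs a new predicate onto an
// existing row restriction. Each predicate is wrapped in parentheses so that an
// OR inside it cannot change the meaning of the combined expression.
func appendRestriction(existing, pred string) string {
	if existing == "" {
		return "(" + pred + ")"
	}
	return existing + " AND (" + pred + ")"
}

func main() {
	r := appendRestriction("", "revenue > 1000")
	r = appendRestriction(r, "region = 'EU' OR region = 'US'")
	fmt.Println(r)
}
```

Because only a string grows, chaining filters costs nothing until a read session or SQL Job actually uses the restriction.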

func (*Engine) FilterIndices

func (e *Engine) FilterIndices(mask []bool) []int

FilterIndices converts a boolean mask into the indices of its true elements. It runs locally only.
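Because FilterIndices is local-only, its behavior amounts to a simple scan. The sketch below shows that mask-to-indices conversion as a standalone function; filterIndices is an illustrative name, not the method itself.

```go
package main

import "fmt"

// filterIndices returns the positions of the true entries in mask, in order.
func filterIndices(mask []bool) []int {
	idx := make([]int, 0, len(mask))
	for i, keep := range mask {
		if keep {
			idx = append(idx, i)
		}
	}
	return idx
}

func main() {
	fmt.Println(filterIndices([]bool{true, false, true, true})) // [0 2 3]
}
```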

func (*Engine) First added in v0.0.5

func (e *Engine) First(col dataset.AnyColumn) (dataset.AnyColumn, error)

First returns the first element of a column. For BQ columns, it generates a SELECT with LIMIT 1 and no ORDER BY, so it returns whichever row BigQuery reads first; that order is not guaranteed to be stable across queries.

func (*Engine) Floor

func (e *Engine) Floor(col dataset.AnyColumn) (dataset.AnyColumn, error)

Floor rounds each element down to the nearest integer.

func (*Engine) FromColumns

func (e *Engine) FromColumns(schema *dataset.Schema, cols ...dataset.AnyColumn) (dataset.Table, error)

FromColumns builds a dataset from the given columns. If all columns are bqColumns from the same table, returns a new bqDataset with SelectedFields (column projection — zero-cost). Otherwise, delegates to the arrow engine for local data.

func (*Engine) Histogram added in v0.0.5

func (e *Engine) Histogram(col dataset.AnyColumn, nBins int) (dataset.Table, error)

Histogram bins a numeric column into equal-width bins. Returns a lazy bqDataset — computation stays in BigQuery.

func (*Engine) Join

func (e *Engine) Join(left, right dataset.Table, spec dataset.JoinSpec) (dataset.Table, error)

Join creates a lazy SQL JOIN between two BigQuery datasets.

func (*Engine) KDE added in v0.0.5

func (e *Engine) KDE(_ context.Context, col dataset.AnyColumn, bandwidth float64, points int) (dataset.Table, error)

KDE computes kernel density estimation over a numeric column. Returns a lazy bqDataset — grid generation and Gaussian kernel evaluation all run server-side via CROSS JOIN.

func (*Engine) Lag

func (e *Engine) Lag(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)

Lag shifts column values down by n positions (SQL LAG).

func (*Engine) Last added in v0.0.5

func (e *Engine) Last(col dataset.AnyColumn) (dataset.AnyColumn, error)

Last returns the last element of a column. For BQ columns, generates a subquery that reverses row order.

func (*Engine) Lead

func (e *Engine) Lead(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)

Lead shifts column values up by n positions (SQL LEAD).
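The shifting semantics of SQL LAG and LEAD can be shown on a local slice. In this sketch, nil stands in for SQL NULL (an illustrative choice), and the rows are assumed to already be in the window's order; lag, lead and format are hypothetical helpers.

```go
package main

import "fmt"

// lag mirrors SQL LAG(x, n) over an ordered column: row i takes the value
// from row i-n, and rows with no such predecessor become NULL (nil here).
// A negative n looks forward instead, which is how lead reuses it.
func lag(xs []float64, n int) []*float64 {
	out := make([]*float64, len(xs))
	for i := range xs {
		if j := i - n; j >= 0 && j < len(xs) {
			v := xs[j]
			out[i] = &v
		}
	}
	return out
}

// lead mirrors SQL LEAD(x, n): row i takes the value from row i+n, and the
// last n rows become NULL.
func lead(xs []float64, n int) []*float64 {
	return lag(xs, -n)
}

// format renders nil entries as NULL for display.
func format(ps []*float64) string {
	s := "["
	for i, p := range ps {
		if i > 0 {
			s += " "
		}
		if p == nil {
			s += "NULL"
		} else {
			s += fmt.Sprint(*p)
		}
	}
	return s + "]"
}

func main() {
	xs := []float64{1, 2, 3}
	fmt.Println(format(lag(xs, 1)))  // [NULL 1 2]
	fmt.Println(format(lead(xs, 1))) // [2 3 NULL]
}
```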

func (*Engine) LinearFit added in v0.0.5

func (e *Engine) LinearFit(xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)

LinearFit computes OLS linear regression y = a + b*x. Returns a lazy bqDataset — coefficients and grid all computed server-side.

func (*Engine) LinearFitSE added in v0.0.5

func (e *Engine) LinearFitSE(xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)

LinearFitSE computes OLS regression with 95% confidence bands. Coefficients and residual SE all computed server-side in BigQuery.
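For reference, these are the textbook formulas for a 95% confidence band on the mean response of an OLS fit y = a + b·x over n points. The source does not say which critical value the engine uses: a Student t quantile with n − 2 degrees of freedom, as written here, or the normal approximation 1.96.

```latex
\begin{aligned}
\hat{y}(x_0) &= a + b\,x_0, \\
s &= \sqrt{\frac{1}{n-2}\sum_{i=1}^{n}\bigl(y_i - a - b\,x_i\bigr)^2}, \\
\mathrm{SE}(x_0) &= s\,\sqrt{\frac{1}{n} + \frac{(x_0 - \bar{x})^2}{\sum_{i=1}^{n}(x_i - \bar{x})^2}}, \\
\text{band}(x_0) &= \hat{y}(x_0) \pm t_{0.975,\,n-2}\,\mathrm{SE}(x_0).
\end{aligned}
```

Every term reduces to n, the means, and sums of squares and cross-products, so the whole band can be produced by SQL aggregation without downloading rows.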

func (*Engine) Ln

Ln returns the natural logarithm of each element.

func (*Engine) LoessFit added in v0.0.5

func (e *Engine) LoessFit(_ context.Context, xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)

LoessFit computes locally weighted regression (LOESS) entirely server-side. Uses VECTOR_SEARCH for k-NN neighbor lookup and SQL aggregation for tri-cube weighted least squares — no data download.

func (*Engine) LoessFitSE added in v0.0.5

func (e *Engine) LoessFitSE(_ context.Context, xCol, yCol dataset.AnyColumn, nOut int) (dataset.Table, error)

LoessFitSE computes LOESS with approximate 95% confidence bands, entirely server-side via VECTOR_SEARCH + SQL aggregation.

func (*Engine) Log2

func (e *Engine) Log2(col dataset.AnyColumn) (dataset.AnyColumn, error)

Log2 returns the base-2 logarithm of each element.

func (*Engine) Log10

func (e *Engine) Log10(col dataset.AnyColumn) (dataset.AnyColumn, error)

Log10 returns the base-10 logarithm of each element.

func (*Engine) Mean

func (e *Engine) Mean(col dataset.AnyColumn) (dataset.AnyColumn, error)

Mean returns a single-element column containing the SQL AVG.

func (*Engine) Median

func (e *Engine) Median(col dataset.AnyColumn) (dataset.AnyColumn, error)

Median returns the approximate median via BQ APPROX_QUANTILES.

func (*Engine) MinMax

MinMax returns two single-element columns containing SQL MIN and MAX.

func (*Engine) Mode added in v0.0.5

func (e *Engine) Mode(col dataset.AnyColumn) (dataset.AnyColumn, error)

Mode returns the most frequent value via BQ APPROX_TOP_COUNT.
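The BigQuery functions named for Median and Mode return arrays, so a scalar result needs an array index. The sketch below builds plausible expressions; the engine's exact SQL is not shown, and medianExpr, modeExpr and quoteIdent are hypothetical helpers.

```go
package main

import "fmt"

// quoteIdent wraps a column name in BigQuery backticks. It assumes the name
// contains no backticks; a real implementation would validate or escape it.
func quoteIdent(col string) string { return "`" + col + "`" }

// medianExpr: APPROX_QUANTILES(x, 2) returns three values (min, approximate
// median, max), so OFFSET(1) selects the median.
func medianExpr(col string) string {
	return fmt.Sprintf("APPROX_QUANTILES(%s, 2)[OFFSET(1)]", quoteIdent(col))
}

// modeExpr: APPROX_TOP_COUNT(x, 1) returns an array of STRUCT<value, count>
// holding the most frequent value, so [OFFSET(0)].value extracts it.
func modeExpr(col string) string {
	return fmt.Sprintf("APPROX_TOP_COUNT(%s, 1)[OFFSET(0)].value", quoteIdent(col))
}

func main() {
	fmt.Printf("SELECT %s AS median, %s AS mode FROM t\n", medianExpr("revenue"), modeExpr("region"))
}
```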

func (*Engine) MulCols

func (e *Engine) MulCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

MulCols returns the element-wise product of two columns.

func (*Engine) MulScalar

func (e *Engine) MulScalar(col dataset.AnyColumn, val float64) (dataset.AnyColumn, error)

MulScalar multiplies every element by a scalar value.

func (*Engine) Name

func (e *Engine) Name() string

Name returns "bigquery".

func (*Engine) Neg

func (e *Engine) Neg(col dataset.AnyColumn) (dataset.AnyColumn, error)

Neg returns the negation of each element.

func (*Engine) NewBoolColumn

func (e *Engine) NewBoolColumn(name string, data []bool) dataset.AnyColumn

NewBoolColumn creates a local bool column via the arrow engine.

func (*Engine) NewBuilder

func (e *Engine) NewBuilder(schema *dataset.Schema) dataset.Builder

NewBuilder creates a bqBuilder that accumulates rows locally and writes them to BigQuery via the Storage Write API (managedwriter) on Build().

Usage:

b := eng.NewBuilder(schema)
b.Float64("x").Append(1.5)
b.String("name").Append("Alice")
ds, _ := b.Build() // streams to BQ, returns lazy bqDataset

func (*Engine) NewFloat64Column

func (e *Engine) NewFloat64Column(name string, data []float64) dataset.AnyColumn

NewFloat64Column creates a local float64 column via the arrow engine.

func (*Engine) NewInt64Column

func (e *Engine) NewInt64Column(name string, data []int64) dataset.AnyColumn

NewInt64Column creates a local int64 column via the arrow engine.

func (*Engine) NewStringColumn

func (e *Engine) NewStringColumn(name string, data []string) dataset.AnyColumn

NewStringColumn creates a local string column via the arrow engine.

func (*Engine) NewTimestampColumn

func (e *Engine) NewTimestampColumn(name string, data []int64) dataset.AnyColumn

NewTimestampColumn creates a local timestamp column via the arrow engine.

func (*Engine) PercentRank

func (e *Engine) PercentRank(col dataset.AnyColumn) (dataset.AnyColumn, error)

PercentRank returns the percent rank (SQL PERCENT_RANK).

func (*Engine) Percentile added in v0.0.5

func (e *Engine) Percentile(col dataset.AnyColumn, p float64) (dataset.AnyColumn, error)

Percentile returns the p-th quantile via BQ PERCENTILE_CONT.
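PERCENTILE_CONT interpolates linearly between neighboring sorted values, so with p = 0.5 on an even number of values it returns the midpoint of the middle pair. A local sketch of that rule, assuming p lies in [0, 1] and NULLs are already removed; percentileCont is a hypothetical name.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// percentileCont mirrors PERCENTILE_CONT(x, p) semantics: sort the values and
// linearly interpolate at the fractional 0-based rank p*(n-1).
// Assumes a non-empty input with NULLs already removed and 0 <= p <= 1.
func percentileCont(values []float64, p float64) float64 {
	s := append([]float64(nil), values...) // copy so the caller's slice is not reordered
	sort.Float64s(s)
	r := p * float64(len(s)-1)
	lo, hi := int(math.Floor(r)), int(math.Ceil(r))
	return s[lo] + (r-float64(lo))*(s[hi]-s[lo])
}

func main() {
	fmt.Println(percentileCont([]float64{4, 1, 3, 2}, 0.5)) // 2.5
}
```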

func (*Engine) PivotLonger

func (e *Engine) PivotLonger(ds dataset.Table, spec dataset.PivotLongerSpec) (dataset.Table, error)

PivotLonger gathers multiple columns into name+value pairs. SQL: UNPIVOT (value FOR name IN (col1, col2, ...))

func (*Engine) PivotWider

func (e *Engine) PivotWider(ds dataset.Table, spec dataset.PivotWiderSpec) (dataset.Table, error)

PivotWider spreads a name column's values into new columns. SQL: SELECT id_cols, MAX(IF(name_col='val1', value_col, NULL)) AS val1, ... FROM source GROUP BY id_cols
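The conditional-aggregation query above needs one MAX(IF(...)) per output column, so the distinct values of the name column must be known before the SQL is built. This sketch assumes they were obtained beforehand (for example with a SELECT DISTINCT) and are safe to use both as string literals and as column names; pivotWiderSQL is a hypothetical builder.

```go
package main

import (
	"fmt"
	"strings"
)

// pivotWiderSQL builds the conditional-aggregation form of a wide pivot:
// one MAX(IF(name_col = 'v', value_col, NULL)) AS v column per known value v,
// grouped by the id columns. Inputs are inserted verbatim (no escaping).
func pivotWiderSQL(source string, idCols []string, nameCol, valueCol string, values []string) string {
	ids := strings.Join(idCols, ", ")
	var b strings.Builder
	b.WriteString("SELECT " + ids)
	for _, v := range values {
		fmt.Fprintf(&b, ", MAX(IF(%s = '%s', %s, NULL)) AS %s", nameCol, v, valueCol, v)
	}
	fmt.Fprintf(&b, " FROM %s GROUP BY %s", source, ids)
	return b.String()
}

func main() {
	fmt.Println(pivotWiderSQL("scores", []string{"student"}, "subject", "score", []string{"math", "art"}))
}
```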

func (*Engine) Pow

func (e *Engine) Pow(col dataset.AnyColumn, exp float64) (dataset.AnyColumn, error)

Pow raises each element to the given exponent.

func (*Engine) Rank

func (e *Engine) Rank(col dataset.AnyColumn) (dataset.AnyColumn, error)

Rank returns the 1-based rank (SQL RANK OVER ORDER BY).
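Rank, DenseRank and PercentRank differ only in how ties are counted. The local sketch below computes all three for an ascending order, with results aligned to input positions; ranks is a hypothetical helper and ignores NaN handling.

```go
package main

import "fmt"

// ranks computes, for an ascending ORDER BY on xs, what SQL returns for
// RANK (1 + number of strictly smaller values), DENSE_RANK (1 + number of
// distinct smaller values) and PERCENT_RANK ((RANK - 1) / (n - 1), or 0 when n == 1).
// The O(n^2) loop favors clarity over speed.
func ranks(xs []float64) (rank, dense []int, pct []float64) {
	n := len(xs)
	rank, dense, pct = make([]int, n), make([]int, n), make([]float64, n)
	for i, x := range xs {
		smaller := 0
		distinct := map[float64]bool{}
		for _, y := range xs {
			if y < x {
				smaller++
				distinct[y] = true
			}
		}
		rank[i] = smaller + 1
		dense[i] = len(distinct) + 1
		if n > 1 {
			pct[i] = float64(rank[i]-1) / float64(n-1)
		}
	}
	return rank, dense, pct
}

func main() {
	r, d, p := ranks([]float64{10, 20, 20, 30})
	fmt.Println(r, d, p)
}
```

For 10, 20, 20, 30 the tie at 20 makes RANK skip to 4 for the last value, while DENSE_RANK continues at 3.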

func (*Engine) ReplaceNA

func (e *Engine) ReplaceNA(col dataset.AnyColumn, defaultVal float64) (dataset.AnyColumn, error)

ReplaceNA replaces null values with a default via SQL COALESCE.

func (*Engine) Round

func (e *Engine) Round(col dataset.AnyColumn) (dataset.AnyColumn, error)

Round rounds each element to the nearest integer.

func (*Engine) RowNumber

func (e *Engine) RowNumber(n int) (dataset.AnyColumn, error)

RowNumber returns a 1-based sequential row-number column (delegates locally).

func (*Engine) Select

func (e *Engine) Select(col dataset.AnyColumn, indices []int) (dataset.AnyColumn, error)

Select applies Take (a row gather by indices). For bqColumns, it materializes the column first, then downloads it and gathers locally. This is inherently non-lazy because the indices come from local computation.

func (*Engine) Separate

func (e *Engine) Separate(ds dataset.Table, col string, into []string, sep string) (dataset.Table, error)

Separate splits a string column into multiple columns by a separator. SQL: SPLIT(col, sep)[OFFSET(i)] AS into_i
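A sketch of the SELECT list described for Separate, with the separator rendered as an escaped BigQuery string literal. separateSQL and sqlString are hypothetical helpers, and column names are inserted verbatim. Note that in BigQuery, OFFSET raises an error when a row has fewer parts than requested, whereas SAFE_OFFSET returns NULL; which one the engine uses is not stated.

```go
package main

import (
	"fmt"
	"strings"
)

// sqlString renders s as a BigQuery single-quoted string literal, escaping
// backslashes and single quotes.
func sqlString(s string) string {
	return "'" + strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(s) + "'"
}

// separateSQL builds one SPLIT(col, sep)[OFFSET(i)] AS into_i expression per
// output column and joins them into a SELECT list.
func separateSQL(col string, into []string, sep string) string {
	parts := make([]string, len(into))
	for i, name := range into {
		parts[i] = fmt.Sprintf("SPLIT(%s, %s)[OFFSET(%d)] AS %s", col, sqlString(sep), i, name)
	}
	return strings.Join(parts, ", ")
}

func main() {
	fmt.Println(separateSQL("full_name", []string{"first", "last"}, " "))
}
```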

func (*Engine) Sigmoid

func (e *Engine) Sigmoid(col dataset.AnyColumn) (dataset.AnyColumn, error)

Sigmoid returns the logistic sigmoid of each element.

func (*Engine) Sign

func (e *Engine) Sign(col dataset.AnyColumn) (dataset.AnyColumn, error)

Sign returns the sign (-1, 0, or 1) of each element.

func (*Engine) Sin

func (e *Engine) Sin(col dataset.AnyColumn) (dataset.AnyColumn, error)

Sin returns the sine of each element (radians).

func (*Engine) Slice

func (e *Engine) Slice(col dataset.AnyColumn, start, end int) (dataset.AnyColumn, error)

Slice applies a range restriction on a column. For bqColumns, it uses the RowRestriction "col BETWEEN start AND end", which is fully lazy. Note that SQL BETWEEN is inclusive at both ends.

func (*Engine) SortIndices

func (e *Engine) SortIndices(_ dataset.AnyColumn) ([]int, error)

SortIndices is not supported on remote data. Use Frame.Arrange instead, which is intended to generate ORDER BY SQL.

func (*Engine) Sqrt

func (e *Engine) Sqrt(col dataset.AnyColumn) (dataset.AnyColumn, error)

Sqrt returns the square root of each element.

func (*Engine) Stack

func (e *Engine) Stack(datasets ...dataset.Table) (dataset.Table, error)

Stack vertically concatenates BigQuery datasets via UNION ALL.

func (*Engine) StdDev added in v0.0.5

func (e *Engine) StdDev(col dataset.AnyColumn) (dataset.AnyColumn, error)

StdDev returns the sample standard deviation via SQL STDDEV_SAMP.

func (*Engine) SubCols

func (e *Engine) SubCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

SubCols returns the element-wise difference of two columns.

func (*Engine) Sum

func (e *Engine) Sum(col dataset.AnyColumn) (dataset.AnyColumn, error)

Sum returns a single-element column containing the SQL SUM.

func (*Engine) Table

func (e *Engine) Table(datasetID, tableID string) (dataset.Table, error)

Table returns a lazy dataset.Table pointing at a BigQuery table. No data is downloaded; only table metadata (schema and row count) is fetched.

func (*Engine) Tan

func (e *Engine) Tan(col dataset.AnyColumn) (dataset.AnyColumn, error)

Tan returns the tangent of each element (radians).

func (*Engine) Tanh

func (e *Engine) Tanh(col dataset.AnyColumn) (dataset.AnyColumn, error)

Tanh returns the hyperbolic tangent of each element.

func (*Engine) Variance

func (e *Engine) Variance(col dataset.AnyColumn) (dataset.AnyColumn, error)

Variance returns a single-element column containing the SQL VARIANCE (in BigQuery, an alias for VAR_SAMP, the sample variance).

type Option

type Option func(*Engine)

Option configures the BigQuery engine.

func WithBQClient

func WithBQClient(c *bigquery.Client) Option

WithBQClient injects a pre-configured BigQuery client (e.g. for emulator tests).

func WithClientOptions

func WithClientOptions(opts ...option.ClientOption) Option

WithClientOptions passes option.ClientOption values (package google.golang.org/api/option) to the GCP client constructors.

func WithDryRun

func WithDryRun(v bool) Option

WithDryRun sets dry-run mode for SQL Jobs (see Quota.DryRun). When v is true, jobs are cost-estimated but not executed.

func WithMaxDownloadBytes

func WithMaxDownloadBytes(n int64) Option

WithMaxDownloadBytes sets the maximum download size in bytes.

func WithMaxDownloadRows

func WithMaxDownloadRows(n int64) Option

WithMaxDownloadRows sets the maximum number of rows that can be downloaded.

func WithMaxQueryBytes

func WithMaxQueryBytes(n int64) Option

WithMaxQueryBytes limits the maximum bytes processed by SQL Jobs.

func WithWarnDownloadRows

func WithWarnDownloadRows(n int64) Option

WithWarnDownloadRows sets the row count threshold for download warnings.

type Quota

type Quota struct {
	// MaxDownloadRows limits how many rows can be pulled locally via Storage Read API.
	// 0 = unlimited. Default: 1_000_000.
	MaxDownloadRows int64

	// MaxDownloadBytes limits download size in bytes. 0 = unlimited. Default: 1 GB.
	MaxDownloadBytes int64

	// WarnDownloadRows triggers a log warning above this threshold.
	// Default: 100_000.
	WarnDownloadRows int64

	// DryRun, if true, estimates the cost of SQL Jobs instead of running them:
	// it logs the estimated bytes processed and does NOT execute the job.
	DryRun bool

	// MaxQueryBytes limits SQL Job size in bytes. 0 = unlimited.
	// Jobs exceeding this are rejected before execution.
	MaxQueryBytes int64
}

Quota controls download limits and billing guards.
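A minimal sketch of how the documented download limits could be enforced, assuming a limit of 0 means unlimited (as the field comments state), that crossing WarnDownloadRows only logs, and that "1 GB" means 1<<30 bytes. downloadQuota and checkDownload are hypothetical names; the package's actual enforcement code is not shown.

```go
package main

import (
	"fmt"
	"log"
)

// downloadQuota mirrors the download-related fields of Quota.
type downloadQuota struct {
	MaxDownloadRows  int64 // 0 = unlimited
	MaxDownloadBytes int64 // 0 = unlimited
	WarnDownloadRows int64 // log a warning above this many rows
}

// checkDownload rejects downloads over a non-zero limit and only logs when
// the warning threshold is crossed.
func checkDownload(q downloadQuota, rows, bytes int64) error {
	if q.MaxDownloadRows > 0 && rows > q.MaxDownloadRows {
		return fmt.Errorf("download of %d rows exceeds MaxDownloadRows (%d)", rows, q.MaxDownloadRows)
	}
	if q.MaxDownloadBytes > 0 && bytes > q.MaxDownloadBytes {
		return fmt.Errorf("download of %d bytes exceeds MaxDownloadBytes (%d)", bytes, q.MaxDownloadBytes)
	}
	if q.WarnDownloadRows > 0 && rows > q.WarnDownloadRows {
		log.Printf("warning: downloading %d rows (threshold %d)", rows, q.WarnDownloadRows)
	}
	return nil
}

func main() {
	// Documented defaults; "1 GB" is taken as 1<<30 bytes here, an assumption.
	q := downloadQuota{MaxDownloadRows: 1_000_000, MaxDownloadBytes: 1 << 30, WarnDownloadRows: 100_000}
	fmt.Println(checkDownload(q, 250_000, 50<<20)) // logs a warning, prints <nil>
	fmt.Println(checkDownload(q, 2_000_000, 0))
}
```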
