Documentation
¶
Overview ¶
Package bigquery implements a BigQuery SQL pushdown engine for the dataset library.
All computation stays in BigQuery. The engine creates lazy datasets that accumulate SelectedFields and RowRestriction on the Storage Read API. Complex operations (GROUP BY, JOIN, WINDOW) execute SQL Jobs that write to temporary BigQuery tables. Data only reaches local memory when Column().Values() is called.
Usage:
eng, _ := bigquery.NewEngine(ctx, "my-project")
defer eng.Close()
ds := eng.Table("analytics", "events")
result, _ := dataset.From(ds).
Select("region", "revenue").
Filter(dataset.Gt("revenue", 1000)).
Collect()
Index ¶
- func DtypeToBQSQL(dt dataset.DType) string
- type Engine
- func (e *Engine) Abs(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Acos(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) AddCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) AddScalar(col dataset.AnyColumn, val float64) (dataset.AnyColumn, error)
- func (e *Engine) Asin(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Atan(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Atan2(y, x dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) BitAnd(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) BitNot(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) BitOr(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) BitShiftLeft(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)
- func (e *Engine) BitShiftRight(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)
- func (e *Engine) BitXor(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Cast(col dataset.AnyColumn, target dataset.DType) (dataset.AnyColumn, error)
- func (e *Engine) Ceil(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Close() error
- func (e *Engine) Combine(datasets ...dataset.Table) (dataset.Table, error)
- func (e *Engine) Complete(ds dataset.Table, cols ...string) (dataset.Table, error)
- func (e *Engine) Concatenate(ds dataset.Table, col string, from []string, sep string) (dataset.Table, error)
- func (e *Engine) Cos(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Count(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) CumMax(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) CumMin(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) CumSum(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) DenseRank(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) DivCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) DropNA(ds dataset.Table, cols ...string) (dataset.Table, error)
- func (e *Engine) Erf(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Exp(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Fill(col dataset.AnyColumn, dir dataset.FillDirection) (dataset.AnyColumn, error)
- func (e *Engine) Filter(ds dataset.Table, mask dataset.Masker) (dataset.Table, error)
- func (e *Engine) FilterIndices(mask []bool) []int
- func (e *Engine) Floor(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) FromColumns(schema *dataset.Schema, cols ...dataset.AnyColumn) (dataset.Table, error)
- func (e *Engine) Join(left, right dataset.Table, spec dataset.JoinSpec) (dataset.Table, error)
- func (e *Engine) Lag(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)
- func (e *Engine) Lead(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)
- func (e *Engine) Ln(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Log2(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Log10(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Mean(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Median(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) MinMax(col dataset.AnyColumn) (dataset.AnyColumn, dataset.AnyColumn, error)
- func (e *Engine) MulCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) MulScalar(col dataset.AnyColumn, val float64) (dataset.AnyColumn, error)
- func (e *Engine) Name() string
- func (e *Engine) Neg(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) NewBoolColumn(name string, data []bool) dataset.AnyColumn
- func (e *Engine) NewBuilder(schema *dataset.Schema) dataset.Builder
- func (e *Engine) NewFloat64Column(name string, data []float64) dataset.AnyColumn
- func (e *Engine) NewInt64Column(name string, data []int64) dataset.AnyColumn
- func (e *Engine) NewStringColumn(name string, data []string) dataset.AnyColumn
- func (e *Engine) NewTimestampColumn(name string, data []int64) dataset.AnyColumn
- func (e *Engine) PercentRank(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) PivotLonger(ds dataset.Table, spec dataset.PivotLongerSpec) (dataset.Table, error)
- func (e *Engine) PivotWider(ds dataset.Table, spec dataset.PivotWiderSpec) (dataset.Table, error)
- func (e *Engine) Pow(col dataset.AnyColumn, exp float64) (dataset.AnyColumn, error)
- func (e *Engine) Rank(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) ReplaceNA(col dataset.AnyColumn, defaultVal float64) (dataset.AnyColumn, error)
- func (e *Engine) Round(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) RowNumber(n int) (dataset.AnyColumn, error)
- func (e *Engine) Select(col dataset.AnyColumn, indices []int) (dataset.AnyColumn, error)
- func (e *Engine) Separate(ds dataset.Table, col string, into []string, sep string) (dataset.Table, error)
- func (e *Engine) Sigmoid(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Sign(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Sin(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Slice(col dataset.AnyColumn, start, end int) (dataset.AnyColumn, error)
- func (e *Engine) SortIndices(col dataset.AnyColumn) ([]int, error)
- func (e *Engine) Sqrt(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Stack(datasets ...dataset.Table) (dataset.Table, error)
- func (e *Engine) SubCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Sum(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Table(datasetID, tableID string) (dataset.Table, error)
- func (e *Engine) Tan(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Tanh(col dataset.AnyColumn) (dataset.AnyColumn, error)
- func (e *Engine) Variance(col dataset.AnyColumn) (dataset.AnyColumn, error)
- type Option
- type Quota
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DtypeToBQSQL ¶
DtypeToBQSQL maps dataset.DType to BigQuery SQL type names.
Types ¶
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the BigQuery SQL pushdown engine. It holds three GCP clients:
- bqClient: for SQL Jobs (GROUP BY, JOIN, etc.) and table metadata
- readClient: for Storage Read API (Arrow IPC download)
- writeClient: (Phase 4) for Storage Write API (managed writer upload)
func NewEngine ¶
NewEngine creates a BigQuery engine for the given GCP project. Uses Application Default Credentials unless overridden via opts.
func (*Engine) BitShiftLeft ¶
func (*Engine) BitShiftRight ¶
func (*Engine) Complete ¶
Complete generates all combinations of the given columns. SQL: CROSS JOIN of DISTINCT values for each column.
func (*Engine) Concatenate ¶
func (e *Engine) Concatenate(ds dataset.Table, col string, from []string, sep string) (dataset.Table, error)
Concatenate joins multiple string columns into one with a separator. SQL: CONCAT(col1, sep, col2, sep, col3) AS output
func (*Engine) Filter ¶
Filter applies a predicate as a RowRestriction on the bqDataset. No execution happens — just appends to the restriction string.
func (*Engine) FilterIndices ¶
FilterIndices evaluates a boolean mask to indices (local only).
func (*Engine) FromColumns ¶
func (e *Engine) FromColumns(schema *dataset.Schema, cols ...dataset.AnyColumn) (dataset.Table, error)
FromColumns builds a dataset from the given columns. If all columns are bqColumns from the same table, returns a new bqDataset with SelectedFields (column projection — zero-cost). Otherwise, delegates to the arrow engine for local data.
func (*Engine) NewBoolColumn ¶
NewBoolColumn creates a local bool column via the arrow engine.
func (*Engine) NewBuilder ¶
NewBuilder creates a bqBuilder that accumulates rows locally and writes them to BigQuery via the Storage Write API (managedwriter) on Build().
Usage:
b := eng.NewBuilder(schema)
b.Float64("x").Append(1.5)
b.String("name").Append("Alice")
ds, _ := b.Build() // streams to BQ, returns lazy bqDataset
func (*Engine) NewFloat64Column ¶
NewFloat64Column creates a local float64 column via the arrow engine.
func (*Engine) NewInt64Column ¶
NewInt64Column creates a local int64 column via the arrow engine.
func (*Engine) NewStringColumn ¶
NewStringColumn creates a local string column via the arrow engine.
func (*Engine) NewTimestampColumn ¶
NewTimestampColumn creates a local timestamp column via the arrow engine.
func (*Engine) PercentRank ¶
func (*Engine) PivotLonger ¶
PivotLonger gathers multiple columns into name+value pairs. SQL: UNPIVOT (value FOR name IN (col1, col2, ...))
func (*Engine) PivotWider ¶
PivotWider spreads a name column's values into new columns. SQL: SELECT id_cols, MAX(IF(name_col='val1', value_col, NULL)) AS val1, ... FROM source GROUP BY id_cols
func (*Engine) Select ¶
Select applies Take (row gather by indices). For bqColumns: materializes first, then downloads and operates locally. This is inherently non-lazy — indices come from local computation.
func (*Engine) Separate ¶
func (e *Engine) Separate(ds dataset.Table, col string, into []string, sep string) (dataset.Table, error)
Separate splits a string column into multiple columns by a separator. SQL: SPLIT(col, sep)[OFFSET(i)] AS into_i
func (*Engine) Slice ¶
Slice applies a range restriction on a column. For bqColumns: uses RowRestriction "col BETWEEN start AND end" — fully lazy.
func (*Engine) SortIndices ¶
SortIndices is not supported on remote data. Use Frame.Arrange instead, which is expected to generate ORDER BY SQL.
func (*Engine) Table ¶
Table returns a lazy dataset.Table pointing at a BigQuery table. No data is downloaded — only table metadata (schema + row count) is fetched.
type Option ¶
type Option func(*Engine)
Option configures the BigQuery engine.
func WithBQClient ¶
WithBQClient injects a pre-configured BigQuery client (e.g. for emulator tests).
func WithClientOptions ¶
func WithClientOptions(opts ...option.ClientOption) Option
WithClientOptions passes google.golang.org/api/option ClientOption values to the GCP client constructors.
func WithMaxDownloadBytes ¶
WithMaxDownloadBytes sets the maximum download size in bytes.
func WithMaxDownloadRows ¶
WithMaxDownloadRows sets the maximum number of rows that can be downloaded.
func WithMaxQueryBytes ¶
WithMaxQueryBytes limits the maximum bytes processed by SQL Jobs.
func WithWarnDownloadRows ¶
WithWarnDownloadRows sets the row count threshold for download warnings.
type Quota ¶
type Quota struct {
// MaxDownloadRows limits how many rows can be pulled locally via Storage Read API.
// 0 = unlimited. Default: 1_000_000.
MaxDownloadRows int64
// MaxDownloadBytes limits download size in bytes. 0 = unlimited. Default: 1 GB.
MaxDownloadBytes int64
// WarnDownloadRows triggers a log warning above this threshold.
// Default: 100_000.
WarnDownloadRows int64
// DryRun if true, estimates cost before executing SQL Jobs.
// Logs estimated bytes processed. Does NOT execute.
DryRun bool
// MaxQueryBytes limits SQL Job size in bytes. 0 = unlimited.
// Jobs exceeding this are rejected before execution.
MaxQueryBytes int64
}
Quota controls download limits and billing guards.