bigquery

package
v0.0.0-...-370038a

This package is not in the latest version of its module.
Published: Apr 24, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Package bigquery implements a BigQuery SQL pushdown engine for the dataset library.

All computation stays in BigQuery. The engine creates lazy datasets that accumulate SelectedFields and RowRestriction on the Storage Read API. Complex operations (GROUP BY, JOIN, WINDOW) execute SQL Jobs that write to temporary BigQuery tables. Data only reaches local memory when Column().Values() is called.

Usage:

eng, _ := bigquery.NewEngine(ctx, "my-project")
defer eng.Close()

ds, _ := eng.Table("analytics", "events")

result, _ := dataset.From(ds).
    Select("region", "revenue").
    Filter(dataset.Gt("revenue", 1000)).
    Collect()

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DtypeToBQSQL

func DtypeToBQSQL(dt dataset.DType) string

DtypeToBQSQL maps dataset.DType to BigQuery SQL type names.
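The mapping table itself is not shown on this page. As a hedged sketch (the DType names below are illustrative strings, not the library's actual dataset.DType constants), the lookup likely resembles:

```go
package main

import "fmt"

// dtypeToBQSQL sketches the kind of lookup DtypeToBQSQL performs.
// DTypes are represented as strings here because the real constants
// are not documented on this page.
func dtypeToBQSQL(dt string) string {
	switch dt {
	case "int64":
		return "INT64"
	case "float64":
		return "FLOAT64"
	case "string":
		return "STRING"
	case "bool":
		return "BOOL"
	case "timestamp":
		return "TIMESTAMP"
	default:
		return "STRING" // fallback; the real function may handle this differently
	}
}

func main() {
	fmt.Println(dtypeToBQSQL("float64"))
}
```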

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the BigQuery SQL pushdown engine. It holds three GCP clients:

  • bqClient: for SQL Jobs (GROUP BY, JOIN, etc.) and table metadata
  • readClient: for Storage Read API (Arrow IPC download)
  • writeClient: (Phase 4) for Storage Write API (managed writer upload)

func NewEngine

func NewEngine(ctx context.Context, projectID string, opts ...Option) (*Engine, error)

NewEngine creates a BigQuery engine for the given GCP project. Uses Application Default Credentials unless overridden via opts.

func (*Engine) Abs

func (e *Engine) Abs(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Acos

func (e *Engine) Acos(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) AddCols

func (e *Engine) AddCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) AddScalar

func (e *Engine) AddScalar(col dataset.AnyColumn, val float64) (dataset.AnyColumn, error)

func (*Engine) Asin

func (e *Engine) Asin(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Atan

func (e *Engine) Atan(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Atan2

func (e *Engine) Atan2(y, x dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) BitAnd

func (e *Engine) BitAnd(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) BitNot

func (e *Engine) BitNot(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) BitOr

func (e *Engine) BitOr(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) BitShiftLeft

func (e *Engine) BitShiftLeft(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)

func (*Engine) BitShiftRight

func (e *Engine) BitShiftRight(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)

func (*Engine) BitXor

func (e *Engine) BitXor(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Cast

func (e *Engine) Cast(col dataset.AnyColumn, target dataset.DType) (dataset.AnyColumn, error)

func (*Engine) Ceil

func (e *Engine) Ceil(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Close

func (e *Engine) Close() error

Close releases all clients and cleans up temporary tables.

func (*Engine) Combine

func (e *Engine) Combine(datasets ...dataset.Table) (dataset.Table, error)

func (*Engine) Complete

func (e *Engine) Complete(ds dataset.Table, cols ...string) (dataset.Table, error)

Complete generates all combinations of the given columns. SQL: CROSS JOIN of DISTINCT values for each column.

func (*Engine) Concatenate

func (e *Engine) Concatenate(ds dataset.Table, col string, from []string, sep string) (dataset.Table, error)

Concatenate joins multiple string columns into one with a separator. SQL: CONCAT(col1, sep, col2, sep, col3) AS output
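Building that expression amounts to interleaving column names with a quoted separator literal. A minimal sketch (not the engine's code; it omits the escaping a real implementation would need):

```go
package main

import (
	"fmt"
	"strings"
)

// concatExpr sketches the CONCAT(...) projection Concatenate describes:
// source columns interleaved with the separator as a string literal.
func concatExpr(output string, from []string, sep string) string {
	args := make([]string, 0, 2*len(from))
	for i, c := range from {
		if i > 0 {
			args = append(args, fmt.Sprintf("'%s'", sep))
		}
		args = append(args, c)
	}
	return fmt.Sprintf("CONCAT(%s) AS %s", strings.Join(args, ", "), output)
}

func main() {
	fmt.Println(concatExpr("full_name", []string{"first", "last"}, " "))
}
```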

func (*Engine) Cos

func (e *Engine) Cos(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Count

func (e *Engine) Count(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) CumMax

func (e *Engine) CumMax(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) CumMin

func (e *Engine) CumMin(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) CumSum

func (e *Engine) CumSum(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) DenseRank

func (e *Engine) DenseRank(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) DivCols

func (e *Engine) DivCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) DropNA

func (e *Engine) DropNA(ds dataset.Table, cols ...string) (dataset.Table, error)

func (*Engine) Erf

func (e *Engine) Erf(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Exp

func (e *Engine) Exp(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Fill

func (*Engine) Filter

func (e *Engine) Filter(ds dataset.Table, mask dataset.Masker) (dataset.Table, error)

Filter applies a predicate as a RowRestriction on the bqDataset. No execution happens — just appends to the restriction string.
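Folding successive predicates into one restriction string can be sketched like this (an assumption about the accumulation logic, not the engine's code):

```go
package main

import "fmt"

// andRestriction sketches how a lazy Filter can fold a new predicate
// into an existing Storage Read API RowRestriction: predicates are
// combined with AND instead of being executed.
func andRestriction(existing, pred string) string {
	if existing == "" {
		return pred
	}
	return fmt.Sprintf("(%s) AND (%s)", existing, pred)
}

func main() {
	r := andRestriction("", "revenue > 1000")
	r = andRestriction(r, "region = 'EU'")
	fmt.Println(r)
}
```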

func (*Engine) FilterIndices

func (e *Engine) FilterIndices(mask []bool) []int

FilterIndices evaluates a boolean mask to indices (local only).

func (*Engine) Floor

func (e *Engine) Floor(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) FromColumns

func (e *Engine) FromColumns(schema *dataset.Schema, cols ...dataset.AnyColumn) (dataset.Table, error)

FromColumns builds a dataset from the given columns. If all columns are bqColumns from the same table, returns a new bqDataset with SelectedFields (column projection — zero-cost). Otherwise, delegates to the arrow engine for local data.

func (*Engine) Join

func (e *Engine) Join(left, right dataset.Table, spec dataset.JoinSpec) (dataset.Table, error)

func (*Engine) Lag

func (e *Engine) Lag(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)

func (*Engine) Lead

func (e *Engine) Lead(col dataset.AnyColumn, n int) (dataset.AnyColumn, error)

func (*Engine) Ln

func (e *Engine) Ln(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Log2

func (e *Engine) Log2(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Log10

func (e *Engine) Log10(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Mean

func (e *Engine) Mean(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Median

func (e *Engine) Median(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) MinMax

func (*Engine) MulCols

func (e *Engine) MulCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) MulScalar

func (e *Engine) MulScalar(col dataset.AnyColumn, val float64) (dataset.AnyColumn, error)

func (*Engine) Name

func (e *Engine) Name() string

Name returns "bigquery".

func (*Engine) Neg

func (e *Engine) Neg(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) NewBoolColumn

func (e *Engine) NewBoolColumn(name string, data []bool) dataset.AnyColumn

NewBoolColumn creates a local bool column via the arrow engine.

func (*Engine) NewBuilder

func (e *Engine) NewBuilder(schema *dataset.Schema) dataset.Builder

NewBuilder creates a bqBuilder that accumulates rows locally and writes them to BigQuery via the Storage Write API (managedwriter) on Build().

Usage:

b := eng.NewBuilder(schema)
b.Float64("x").Append(1.5)
b.String("name").Append("Alice")
ds, _ := b.Build() // streams to BQ, returns lazy bqDataset

func (*Engine) NewFloat64Column

func (e *Engine) NewFloat64Column(name string, data []float64) dataset.AnyColumn

NewFloat64Column creates a local float64 column via the arrow engine.

func (*Engine) NewInt64Column

func (e *Engine) NewInt64Column(name string, data []int64) dataset.AnyColumn

NewInt64Column creates a local int64 column via the arrow engine.

func (*Engine) NewStringColumn

func (e *Engine) NewStringColumn(name string, data []string) dataset.AnyColumn

NewStringColumn creates a local string column via the arrow engine.

func (*Engine) NewTimestampColumn

func (e *Engine) NewTimestampColumn(name string, data []int64) dataset.AnyColumn

NewTimestampColumn creates a local timestamp column via the arrow engine.

func (*Engine) PercentRank

func (e *Engine) PercentRank(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) PivotLonger

func (e *Engine) PivotLonger(ds dataset.Table, spec dataset.PivotLongerSpec) (dataset.Table, error)

PivotLonger gathers multiple columns into name+value pairs. SQL: UNPIVOT (value FOR name IN (col1, col2, ...))

func (*Engine) PivotWider

func (e *Engine) PivotWider(ds dataset.Table, spec dataset.PivotWiderSpec) (dataset.Table, error)

PivotWider spreads a name column's values into new columns. SQL: SELECT id_cols, MAX(IF(name_col='val1', value_col, NULL)) AS val1, ... FROM source GROUP BY id_cols
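Generating that query is mechanical once the distinct name values are known. A sketch, assuming a single id column and leaving out the quoting and type handling a real engine needs:

```go
package main

import (
	"fmt"
	"strings"
)

// pivotWiderSQL sketches the MAX(IF(...)) query PivotWider describes,
// with one conditional aggregate per distinct name value.
func pivotWiderSQL(table, idCol, nameCol, valueCol string, names []string) string {
	sel := []string{idCol}
	for _, n := range names {
		sel = append(sel, fmt.Sprintf("MAX(IF(%s = '%s', %s, NULL)) AS %s", nameCol, n, valueCol, n))
	}
	return fmt.Sprintf("SELECT %s FROM %s GROUP BY %s", strings.Join(sel, ", "), table, idCol)
}

func main() {
	fmt.Println(pivotWiderSQL("src", "id", "metric", "val", []string{"clicks", "views"}))
}
```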

func (*Engine) Pow

func (e *Engine) Pow(col dataset.AnyColumn, exp float64) (dataset.AnyColumn, error)

func (*Engine) Rank

func (e *Engine) Rank(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) ReplaceNA

func (e *Engine) ReplaceNA(col dataset.AnyColumn, defaultVal float64) (dataset.AnyColumn, error)

func (*Engine) Round

func (e *Engine) Round(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) RowNumber

func (e *Engine) RowNumber(n int) (dataset.AnyColumn, error)

func (*Engine) Select

func (e *Engine) Select(col dataset.AnyColumn, indices []int) (dataset.AnyColumn, error)

Select applies Take (row gather by indices). For bqColumns: materializes first, then downloads and operates locally. This is inherently non-lazy — indices come from local computation.

func (*Engine) Separate

func (e *Engine) Separate(ds dataset.Table, col string, into []string, sep string) (dataset.Table, error)

Separate splits a string column into multiple columns by a separator. SQL: SPLIT(col, sep)[OFFSET(i)] AS into_i
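The per-column projections can be sketched as below (an illustration of the SPLIT/OFFSET pattern, not the engine's code; separator escaping is omitted):

```go
package main

import (
	"fmt"
	"strings"
)

// separateExprs sketches the SPLIT(col, sep)[OFFSET(i)] projections
// Separate describes, one per target column.
func separateExprs(col string, into []string, sep string) string {
	exprs := make([]string, len(into))
	for i, name := range into {
		exprs[i] = fmt.Sprintf("SPLIT(%s, '%s')[OFFSET(%d)] AS %s", col, sep, i, name)
	}
	return strings.Join(exprs, ", ")
}

func main() {
	fmt.Println(separateExprs("full_name", []string{"first", "last"}, " "))
}
```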

func (*Engine) Sigmoid

func (e *Engine) Sigmoid(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Sign

func (e *Engine) Sign(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Sin

func (e *Engine) Sin(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Slice

func (e *Engine) Slice(col dataset.AnyColumn, start, end int) (dataset.AnyColumn, error)

Slice applies a range restriction on a column. For bqColumns: uses RowRestriction "col BETWEEN start AND end" — fully lazy.
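The restriction string itself is trivial to form; a sketch (the column and bounds below are illustrative):

```go
package main

import "fmt"

// sliceRestriction sketches the BETWEEN restriction Slice describes
// for a lazy range on a bqColumn.
func sliceRestriction(col string, start, end int) string {
	return fmt.Sprintf("%s BETWEEN %d AND %d", col, start, end)
}

func main() {
	fmt.Println(sliceRestriction("row_id", 0, 99))
}
```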

func (*Engine) SortIndices

func (e *Engine) SortIndices(col dataset.AnyColumn) ([]int, error)

SortIndices is not supported on remote data. Use Frame.Arrange instead, which generates ORDER BY SQL.

func (*Engine) Sqrt

func (e *Engine) Sqrt(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Stack

func (e *Engine) Stack(datasets ...dataset.Table) (dataset.Table, error)

func (*Engine) SubCols

func (e *Engine) SubCols(a, b dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Sum

func (e *Engine) Sum(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Table

func (e *Engine) Table(datasetID, tableID string) (dataset.Table, error)

Table returns a lazy dataset.Table pointing at a BigQuery table. No data is downloaded; only table metadata (schema and row count) is fetched.

func (*Engine) Tan

func (e *Engine) Tan(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Tanh

func (e *Engine) Tanh(col dataset.AnyColumn) (dataset.AnyColumn, error)

func (*Engine) Variance

func (e *Engine) Variance(col dataset.AnyColumn) (dataset.AnyColumn, error)

type Option

type Option func(*Engine)

Option configures the BigQuery engine.

func WithBQClient

func WithBQClient(c *bigquery.Client) Option

WithBQClient injects a pre-configured BigQuery client (e.g. for emulator tests).

func WithClientOptions

func WithClientOptions(opts ...option.ClientOption) Option

WithClientOptions passes option.ClientOption values (from google.golang.org/api/option) to the GCP client constructors.

func WithDryRun

func WithDryRun(v bool) Option

WithDryRun enables dry-run mode for SQL Jobs.

func WithMaxDownloadBytes

func WithMaxDownloadBytes(n int64) Option

WithMaxDownloadBytes sets the maximum download size in bytes.

func WithMaxDownloadRows

func WithMaxDownloadRows(n int64) Option

WithMaxDownloadRows sets the maximum number of rows that can be downloaded.

func WithMaxQueryBytes

func WithMaxQueryBytes(n int64) Option

WithMaxQueryBytes limits the maximum bytes processed by SQL Jobs.

func WithWarnDownloadRows

func WithWarnDownloadRows(n int64) Option

WithWarnDownloadRows sets the row count threshold for download warnings.

type Quota

type Quota struct {
	// MaxDownloadRows limits how many rows can be pulled locally via Storage Read API.
	// 0 = unlimited. Default: 1_000_000.
	MaxDownloadRows int64

	// MaxDownloadBytes limits download size in bytes. 0 = unlimited. Default: 1 GB.
	MaxDownloadBytes int64

	// WarnDownloadRows triggers a log warning above this threshold.
	// Default: 100_000.
	WarnDownloadRows int64

	// DryRun, if true, estimates cost instead of executing SQL Jobs:
	// the estimated bytes processed are logged; the job itself is NOT run.
	DryRun bool

	// MaxQueryBytes limits SQL Job size in bytes. 0 = unlimited.
	// Jobs exceeding this are rejected before execution.
	MaxQueryBytes int64
}

Quota controls download limits and billing guards.
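How these limits might gate a download can be sketched as follows. The field names mirror the Quota struct above, but the enforcement logic (reject on hard limits, log on the warn threshold) is an assumption, not the engine's actual code:

```go
package main

import (
	"errors"
	"fmt"
)

// Quota mirrors the fields used by this sketch.
type Quota struct {
	MaxDownloadRows  int64
	MaxDownloadBytes int64
	WarnDownloadRows int64
}

// checkDownload sketches quota enforcement: a zero limit means
// unlimited, hard limits reject the request, and the warn threshold
// only logs.
func checkDownload(q Quota, rows, bytes int64) error {
	if q.MaxDownloadRows > 0 && rows > q.MaxDownloadRows {
		return errors.New("download exceeds MaxDownloadRows")
	}
	if q.MaxDownloadBytes > 0 && bytes > q.MaxDownloadBytes {
		return errors.New("download exceeds MaxDownloadBytes")
	}
	if q.WarnDownloadRows > 0 && rows > q.WarnDownloadRows {
		fmt.Println("warning: large download:", rows, "rows")
	}
	return nil
}

func main() {
	q := Quota{MaxDownloadRows: 1_000_000, WarnDownloadRows: 100_000}
	fmt.Println(checkDownload(q, 2_000_000, 0))
}
```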
