package iceberg

Version: v1.4.1
Warning

This package is not in the latest version of its module.

Published: Apr 9, 2026 License: AGPL-3.0 Imports: 35 Imported by: 0

Documentation

Overview

Package iceberg provides utilities for interacting with Iceberg catalogs.

Constants

const (
	TPCDSGCSBucket = "beam-tpcds"
	TPCDSGCSPrefix = "datasets/parquet/nonpartitioned"
)

TPC-DS data source constants for GCS.
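Here is a minimal sketch of how the two constants might combine into a `gs://` URI. The `<scaleFactor>/<table>/` suffix is an assumption for illustration, since the object layout under the prefix is not documented here. `tpcdsURI` is a hypothetical helper and is not part of this package.

```go
package main

import (
	"fmt"
	"path"
)

// Constants as documented above.
const (
	TPCDSGCSBucket = "beam-tpcds"
	TPCDSGCSPrefix = "datasets/parquet/nonpartitioned"
)

// tpcdsURI is a hypothetical helper. It ASSUMES objects live under
// <prefix>/<scaleFactor>/<table>/ in the bucket; verify the real layout.
func tpcdsURI(scaleFactor, table string) string {
	// path.Join (not filepath.Join) keeps forward slashes on every OS.
	return "gs://" + path.Join(TPCDSGCSBucket, TPCDSGCSPrefix, scaleFactor, table) + "/"
}

func main() {
	fmt.Println(tpcdsURI("sf1", "store_sales"))
	// gs://beam-tpcds/datasets/parquet/nonpartitioned/sf1/store_sales/
}
```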

Functions

func BenchmarkDataSchema

func BenchmarkDataSchema() *iceberg.Schema

BenchmarkDataSchema returns the Iceberg schema for benchmark data files. It matches the Arrow schema returned by BenchmarkSchema (defined in parquet.go).

func BenchmarkSchema

func BenchmarkSchema() *arrow.Schema

BenchmarkSchema returns the Arrow schema for benchmark data files. This schema is simple and avoids decimal types to ensure compatibility.

func DeleteWarehouse

func DeleteWarehouse(ctx context.Context, cfg CatalogConfig) error

DeleteWarehouse deletes the warehouse configured in cfg. For external catalogs such as Polaris, it is a no-op.

func EnsureNamespace

func EnsureNamespace(ctx context.Context, cat catalog.Catalog, namespace string) error

EnsureNamespace creates the namespace if it doesn't exist.

func EnsureWarehouse

func EnsureWarehouse(ctx context.Context, cfg CatalogConfig) error

EnsureWarehouse creates the warehouse if it doesn't exist. It calls the MinIO AIStor Tables API directly because the standard Iceberg REST catalog API doesn't support warehouse creation. For external catalogs such as Polaris, this is a no-op (the catalog must be created externally).

func GetCachedTPCDSFiles

func GetCachedTPCDSFiles(cfg TPCDSConfig) ([]string, error)

GetCachedTPCDSFiles returns locally cached TPC-DS files if they exist.

func IsAlreadyExists

func IsAlreadyExists(err error) bool

func IsConflictError

func IsConflictError(err error) bool

IsConflictError reports whether err is a commit conflict that can be retried.

func LoadOrCreateTable

func LoadOrCreateTable(ctx context.Context, cat catalog.Catalog, namespace, tableName string, schema *iceberg.Schema) (*table.Table, error)

LoadOrCreateTable loads an existing table or creates it if it doesn't exist.
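Load-or-create helpers usually need to tolerate a race in which another client creates the table between the failed load and the create. The sketch below shows that pattern against a hypothetical in-memory catalog. `memCatalog`, `errNoSuchTable`, and `errTableExists` are stand-ins, not iceberg-go APIs. The documentation does not say whether LoadOrCreateTable itself reloads the table after an already-exists error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical errors standing in for the catalog's "not found" / "exists" errors.
var (
	errNoSuchTable = errors.New("no such table")
	errTableExists = errors.New("table already exists")
)

// memCatalog is a hypothetical in-memory catalog: identifier -> schema description.
type memCatalog struct {
	mu     sync.Mutex
	tables map[string]string
}

func (c *memCatalog) Load(id string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	s, ok := c.tables[id]
	if !ok {
		return "", errNoSuchTable
	}
	return s, nil
}

func (c *memCatalog) Create(id, schema string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.tables[id]; ok {
		return "", errTableExists
	}
	c.tables[id] = schema
	return schema, nil
}

// loadOrCreate loads id, creating it if missing. If a concurrent writer creates
// the table between Load and Create, it falls back to loading the winner's table.
func loadOrCreate(c *memCatalog, id, schema string) (string, error) {
	t, err := c.Load(id)
	if err == nil {
		return t, nil
	}
	if !errors.Is(err, errNoSuchTable) {
		return "", err
	}
	t, err = c.Create(id, schema)
	if errors.Is(err, errTableExists) {
		return c.Load(id)
	}
	return t, err
}

func main() {
	c := &memCatalog{tables: map[string]string{}}
	t, err := loadOrCreate(c, "bench.events", "id:long")
	fmt.Println(t, err)
}
```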

func NewCatalog

func NewCatalog(ctx context.Context, cfg CatalogConfig) (*rest.Catalog, error)

NewCatalog creates a new Iceberg REST catalog connection.

func StoreSalesSchema

func StoreSalesSchema() *iceberg.Schema

StoreSalesSchema returns the Iceberg schema for the TPC-DS store_sales table.

Types

type CatalogConfig

type CatalogConfig struct {
	CatalogURI      string
	Warehouse       string
	AccessKey       string
	SecretKey       string
	Region          string
	S3Endpoint      string              // S3 endpoint for file IO (e.g., http://localhost:9000)
	ExternalCatalog ExternalCatalogType // External catalog type (polaris, etc.)
}

CatalogConfig holds configuration for connecting to an Iceberg catalog.

type CatalogPool

type CatalogPool struct {
	// contains filtered or unexported fields
}

CatalogPool provides round-robin access to multiple catalog clients.

func NewCatalogPool

func NewCatalogPool(ctx context.Context, catalogURLs []string, baseCfg CatalogConfig) (*CatalogPool, error)

NewCatalogPool creates a pool of catalog clients for round-robin access. It takes a base config and a list of catalog URLs, creating one client per URL.

func (*CatalogPool) First

func (p *CatalogPool) First() *rest.Catalog

First returns the first catalog client (for setup/cleanup operations).

func (*CatalogPool) Get

func (p *CatalogPool) Get() *rest.Catalog

Get returns the next catalog client in round-robin order.

func (*CatalogPool) Len

func (p *CatalogPool) Len() int

Len returns the number of catalogs in the pool.

func (*CatalogPool) URLs

func (p *CatalogPool) URLs() []string

URLs returns the catalog URLs.

type CommitConfig

type CommitConfig struct {
	MaxRetries  int
	BackoffBase time.Duration
	BackoffMax  time.Duration
}

CommitConfig holds configuration for commit retry behavior.

func DefaultCommitConfig

func DefaultCommitConfig() CommitConfig

DefaultCommitConfig returns sensible defaults for commit retries. They match Apache Iceberg's defaults: 4 retries, a 100 ms minimum wait, and a 60 s maximum wait.

type CommitResult

type CommitResult struct {
	Success      bool
	Retries      int
	Duration     time.Duration
	Err          error
	UpdatedTable *table.Table
}

CommitResult holds the result of a commit operation.

func CommitWithRetry

func CommitWithRetry(ctx context.Context, tbl *table.Table, files []string, cfg CommitConfig) CommitResult

CommitWithRetry commits files to an Iceberg table with exponential backoff on conflict.

type DatasetCreator

type DatasetCreator struct {
	Catalog         *restcat.Catalog
	CatalogPool     *CatalogPool
	Tree            *Tree
	CatalogURI      string
	AccessKey       string
	SecretKey       string
	Concurrency     int
	ExternalCatalog ExternalCatalogType
	OnProgress      func(float64)
}

func (*DatasetCreator) CreateAll

func (d *DatasetCreator) CreateAll(ctx context.Context, updateStatus func(string)) error

func (*DatasetCreator) CreateNamespaces

func (d *DatasetCreator) CreateNamespaces(ctx context.Context) error

func (*DatasetCreator) CreateTables

func (d *DatasetCreator) CreateTables(ctx context.Context) error

func (*DatasetCreator) CreateViews

func (d *DatasetCreator) CreateViews(ctx context.Context) error

func (*DatasetCreator) DeleteAll

func (d *DatasetCreator) DeleteAll(ctx context.Context)

type ExternalCatalogType

type ExternalCatalogType string

ExternalCatalogType identifies the type of external catalog.

const (
	ExternalCatalogNone    ExternalCatalogType = ""
	ExternalCatalogPolaris ExternalCatalogType = "polaris"
)

External catalog type constants.
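EnsureWarehouse and DeleteWarehouse are documented as no-ops for external catalogs. Callers therefore mostly need two things from ExternalCatalogType: parsing user input and deciding whether warehouse management applies. The sketch below is hypothetical: `parseExternalCatalog` and `managesWarehouses` are not part of this package. It accepts only the two documented values.

```go
package main

import (
	"fmt"
	"strings"
)

type ExternalCatalogType string

const (
	ExternalCatalogNone    ExternalCatalogType = ""
	ExternalCatalogPolaris ExternalCatalogType = "polaris"
)

// parseExternalCatalog is hypothetical: it normalizes user input and accepts
// only the two documented values.
func parseExternalCatalog(s string) (ExternalCatalogType, error) {
	switch t := ExternalCatalogType(strings.ToLower(strings.TrimSpace(s))); t {
	case ExternalCatalogNone, ExternalCatalogPolaris:
		return t, nil
	default:
		return "", fmt.Errorf("unknown external catalog type %q", s)
	}
}

// managesWarehouses mirrors the documented behavior: warehouse create/delete
// only applies when no external catalog is configured.
func managesWarehouses(t ExternalCatalogType) bool {
	return t == ExternalCatalogNone
}

func main() {
	for _, s := range []string{"", "Polaris", "glue"} {
		t, err := parseExternalCatalog(s)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%q: manages warehouses=%v\n", t, managesWarehouses(t))
	}
}
```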

type GenerateConfig

type GenerateConfig struct {
	OutputDir   string
	NumFiles    int
	RowsPerFile int
	Concurrency int
}

GenerateConfig holds configuration for Parquet file generation.

type GenerateResult

type GenerateResult struct {
	Files      []string
	TotalRows  int64
	TotalBytes int64
}

GenerateResult holds the result of Parquet file generation.

func GenerateParquetFiles

func GenerateParquetFiles(ctx context.Context, cfg GenerateConfig, progress func(completed, total int64)) (*GenerateResult, error)

GenerateParquetFiles generates test Parquet files for benchmarking.

type NamespaceInfo

type NamespaceInfo struct {
	Ordinal   int
	Path      []string
	IsLeaf    bool
	TableIdxs []int
	ViewIdxs  []int
}

type TPCDSConfig

type TPCDSConfig struct {
	ScaleFactor string // e.g., "sf1", "sf10", "sf100", "sf1000"
	Table       string // e.g., "store_sales", "customer", etc.
	CacheDir    string // local directory to cache downloaded files
	Concurrency int    // download concurrency
}

TPCDSConfig holds configuration for TPC-DS data operations.

func DefaultTPCDSConfig

func DefaultTPCDSConfig() TPCDSConfig

DefaultTPCDSConfig returns sensible defaults.

type TPCDSDownloadResult

type TPCDSDownloadResult struct {
	Files           []string
	TotalBytes      int64
	DownloadedBytes int64
	SkippedFiles    int
}

TPCDSDownloadResult holds the result of a download operation.

func DownloadTPCDS

func DownloadTPCDS(ctx context.Context, cfg TPCDSConfig, progress func(completed, total, bytes int64)) (*TPCDSDownloadResult, error)

DownloadTPCDS downloads TPC-DS parquet files from GCS to local cache. Files that already exist locally are skipped.

type TableInfo

type TableInfo struct {
	Index     int
	Name      string
	Namespace []string
	Location  string
}

type Tree

type Tree struct {
	// contains filtered or unexported fields
}

func NewTree

func NewTree(cfg TreeConfig) *Tree

func (*Tree) AllNamespaces

func (t *Tree) AllNamespaces() []NamespaceInfo

func (*Tree) AllTables

func (t *Tree) AllTables() []TableInfo

func (*Tree) AllViews

func (t *Tree) AllViews() []ViewInfo

func (*Tree) ChildrenOf

func (t *Tree) ChildrenOf(ordinal int) []int

func (*Tree) Config

func (t *Tree) Config() TreeConfig

func (*Tree) DepthOf

func (t *Tree) DepthOf(ordinal int) int

func (*Tree) IsLeaf

func (t *Tree) IsLeaf(ordinal int) bool

func (*Tree) LeafNamespaces

func (t *Tree) LeafNamespaces() int

func (*Tree) LeafOrdinals

func (t *Tree) LeafOrdinals() []int

func (*Tree) PathToRoot

func (t *Tree) PathToRoot(ordinal int) []string

func (*Tree) TableLocation

func (t *Tree) TableLocation(namespace []string, tableName string) string

func (*Tree) TableName

func (t *Tree) TableName(index int) string

func (*Tree) TotalNamespaces

func (t *Tree) TotalNamespaces() int

func (*Tree) TotalTables

func (t *Tree) TotalTables() int

func (*Tree) TotalViews

func (t *Tree) TotalViews() int

func (*Tree) ViewName

func (t *Tree) ViewName(index int) string

type TreeConfig

type TreeConfig struct {
	NamespaceWidth   int
	NamespaceDepth   int
	TablesPerNS      int
	ViewsPerNS       int
	ColumnsPerTable  int
	ColumnsPerView   int
	PropertiesPerNS  int
	PropertiesPerTbl int
	PropertiesPerVw  int
	BaseLocation     string
	CatalogName      string
}
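Tree exposes namespaces by integer ordinal (ChildrenOf, DepthOf, PathToRoot), and TreeConfig sizes it by NamespaceWidth and NamespaceDepth. One layout that answers these queries with arithmetic alone, without stored parent pointers, is a complete width-ary tree in heap order. The sketch below assumes that layout, a virtual root at ordinal 0, and a hypothetical `ns<N>` naming scheme. The package's actual numbering is unexported and may differ. Under this layout, NamespaceWidth = W and NamespaceDepth = D give W + W^2 + ... + W^D namespaces, of which W^D are leaves.

```go
package main

import "fmt"

// tree is a hypothetical layout: a complete width-ary namespace tree of the
// given depth stored in heap order. Ordinal 0 is a virtual root (not a
// namespace); real namespaces have ordinals 1..total. Assumes width >= 1.
type tree struct {
	width, depth, total int
}

func newTree(width, depth int) *tree {
	t := &tree{width: width, depth: depth}
	level := 1
	for d := 1; d <= depth; d++ {
		level *= width // namespaces at depth d = width^d
		t.total += level
	}
	return t
}

// childrenOf returns the ordinals directly under o; leaves have none.
func (t *tree) childrenOf(o int) []int {
	var kids []int
	for k := 1; k <= t.width; k++ {
		if c := t.width*o + k; c <= t.total {
			kids = append(kids, c)
		}
	}
	return kids
}

// depthOf counts parent hops from o up to the virtual root.
func (t *tree) depthOf(o int) int {
	d := 0
	for ; o > 0; o = (o - 1) / t.width {
		d++
	}
	return d
}

func (t *tree) isLeaf(o int) bool { return t.depthOf(o) == t.depth }

// pathToRoot returns namespace names root-first, using hypothetical "ns<N>" names.
func (t *tree) pathToRoot(o int) []string {
	var names []string
	for ; o > 0; o = (o - 1) / t.width {
		names = append([]string{fmt.Sprintf("ns%d", o)}, names...)
	}
	return names
}

func main() {
	t := newTree(2, 2)
	fmt.Println(t.total, t.childrenOf(1), t.depthOf(5), t.pathToRoot(5))
	// 6 [3 4] 2 [ns2 ns5]
}
```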

type ViewInfo

type ViewInfo struct {
	Index     int
	Name      string
	Namespace []string
	Location  string
}
