Documentation
¶
Overview ¶
Package iceberg provides utilities for interacting with Iceberg catalogs.
Index ¶
- Constants
- func BenchmarkDataSchema() *iceberg.Schema
- func BenchmarkSchema() *arrow.Schema
- func DeleteWarehouse(ctx context.Context, cfg CatalogConfig) error
- func EnsureNamespace(ctx context.Context, cat catalog.Catalog, namespace string) error
- func EnsureWarehouse(ctx context.Context, cfg CatalogConfig) error
- func GetCachedTPCDSFiles(cfg TPCDSConfig) ([]string, error)
- func IsAlreadyExists(err error) bool
- func IsConflictError(err error) bool
- func LoadOrCreateTable(ctx context.Context, cat catalog.Catalog, namespace, tableName string, ...) (*table.Table, error)
- func NewCatalog(ctx context.Context, cfg CatalogConfig) (*rest.Catalog, error)
- func StoreSalesSchema() *iceberg.Schema
- type CatalogConfig
- type CatalogPool
- type CommitConfig
- type CommitResult
- type DatasetCreator
- func (d *DatasetCreator) CreateAll(ctx context.Context, updateStatus func(string)) error
- func (d *DatasetCreator) CreateNamespaces(ctx context.Context) error
- func (d *DatasetCreator) CreateTables(ctx context.Context) error
- func (d *DatasetCreator) CreateViews(ctx context.Context) error
- func (d *DatasetCreator) DeleteAll(ctx context.Context)
- type ExternalCatalogType
- type GenerateConfig
- type GenerateResult
- type NamespaceInfo
- type TPCDSConfig
- type TPCDSDownloadResult
- type TableInfo
- type Tree
- func (t *Tree) AllNamespaces() []NamespaceInfo
- func (t *Tree) AllTables() []TableInfo
- func (t *Tree) AllViews() []ViewInfo
- func (t *Tree) ChildrenOf(ordinal int) []int
- func (t *Tree) Config() TreeConfig
- func (t *Tree) DepthOf(ordinal int) int
- func (t *Tree) IsLeaf(ordinal int) bool
- func (t *Tree) LeafNamespaces() int
- func (t *Tree) LeafOrdinals() []int
- func (t *Tree) PathToRoot(ordinal int) []string
- func (t *Tree) TableLocation(namespace []string, tableName string) string
- func (t *Tree) TableName(index int) string
- func (t *Tree) TotalNamespaces() int
- func (t *Tree) TotalTables() int
- func (t *Tree) TotalViews() int
- func (t *Tree) ViewName(index int) string
- type TreeConfig
- type ViewInfo
Constants ¶
const ( TPCDSGCSBucket = "beam-tpcds" TPCDSGCSPrefix = "datasets/parquet/nonpartitioned" )
TPC-DS data source constants for GCS.
Variables ¶
This section is empty.
Functions ¶
func BenchmarkDataSchema ¶
BenchmarkDataSchema returns the Iceberg schema for benchmark data files. It mirrors the Arrow schema returned by BenchmarkSchema (defined in parquet.go).
func BenchmarkSchema ¶
BenchmarkSchema returns the Arrow schema for benchmark data files. This schema is simple and avoids decimal types to ensure compatibility.
func DeleteWarehouse ¶
func DeleteWarehouse(ctx context.Context, cfg CatalogConfig) error
DeleteWarehouse deletes a warehouse. For external catalogs such as Polaris, it is a no-op.
func EnsureNamespace ¶
EnsureNamespace creates the namespace if it doesn't exist.
func EnsureWarehouse ¶
func EnsureWarehouse(ctx context.Context, cfg CatalogConfig) error
EnsureWarehouse creates the warehouse if it doesn't exist. It calls the MinIO AIStor Tables API directly, because the standard Iceberg REST catalog API doesn't support warehouse creation. For external catalogs such as Polaris, it is a no-op (the catalog must be created externally).
func GetCachedTPCDSFiles ¶
func GetCachedTPCDSFiles(cfg TPCDSConfig) ([]string, error)
GetCachedTPCDSFiles returns locally cached TPC-DS files if they exist.
func IsAlreadyExists ¶
func IsConflictError ¶
IsConflictError checks if an error is a commit conflict that can be retried.
func LoadOrCreateTable ¶
func LoadOrCreateTable(ctx context.Context, cat catalog.Catalog, namespace, tableName string, schema *iceberg.Schema) (*table.Table, error)
LoadOrCreateTable loads an existing table or creates it if it doesn't exist.
func NewCatalog ¶
NewCatalog creates a new Iceberg REST catalog connection.
func StoreSalesSchema ¶
StoreSalesSchema returns the Iceberg schema for TPC-DS store_sales table.
Types ¶
type CatalogConfig ¶
type CatalogConfig struct {
CatalogURI string
Warehouse string
AccessKey string
SecretKey string
Region string
S3Endpoint string // S3 endpoint for file IO (e.g., http://localhost:9000)
ExternalCatalog ExternalCatalogType // External catalog type (polaris, etc.)
}
CatalogConfig holds configuration for connecting to an Iceberg catalog.
type CatalogPool ¶
type CatalogPool struct {
// contains filtered or unexported fields
}
CatalogPool provides round-robin access to multiple catalog clients.
func NewCatalogPool ¶
func NewCatalogPool(ctx context.Context, catalogURLs []string, baseCfg CatalogConfig) (*CatalogPool, error)
NewCatalogPool creates a pool of catalog clients for round-robin access. It takes a base config and a list of catalog URLs, creating one client per URL.
func (*CatalogPool) First ¶
func (p *CatalogPool) First() *rest.Catalog
First returns the first catalog client (for setup/cleanup operations).
func (*CatalogPool) Get ¶
func (p *CatalogPool) Get() *rest.Catalog
Get returns the next catalog client in round-robin order.
func (*CatalogPool) Len ¶
func (p *CatalogPool) Len() int
Len returns the number of catalogs in the pool.
type CommitConfig ¶
CommitConfig holds configuration for commit retry behavior.
func DefaultCommitConfig ¶
func DefaultCommitConfig() CommitConfig
DefaultCommitConfig returns sensible defaults for commit retries. These match Apache Iceberg's defaults: 4 retries, a 100 ms minimum wait, and a 60 s maximum wait.
type CommitResult ¶
type CommitResult struct {
Success bool
Retries int
Duration time.Duration
Err error
UpdatedTable *table.Table
}
CommitResult holds the result of a commit operation.
func CommitWithRetry ¶
func CommitWithRetry(ctx context.Context, tbl *table.Table, files []string, cfg CommitConfig) CommitResult
CommitWithRetry commits files to an Iceberg table with exponential backoff on conflict.
type DatasetCreator ¶
type DatasetCreator struct {
Catalog *restcat.Catalog
CatalogPool *CatalogPool
Tree *Tree
CatalogURI string
AccessKey string
SecretKey string
Concurrency int
ExternalCatalog ExternalCatalogType
OnProgress func(float64)
}
func (*DatasetCreator) CreateAll ¶
func (d *DatasetCreator) CreateAll(ctx context.Context, updateStatus func(string)) error
func (*DatasetCreator) CreateNamespaces ¶
func (d *DatasetCreator) CreateNamespaces(ctx context.Context) error
func (*DatasetCreator) CreateTables ¶
func (d *DatasetCreator) CreateTables(ctx context.Context) error
func (*DatasetCreator) CreateViews ¶
func (d *DatasetCreator) CreateViews(ctx context.Context) error
func (*DatasetCreator) DeleteAll ¶
func (d *DatasetCreator) DeleteAll(ctx context.Context)
type ExternalCatalogType ¶
type ExternalCatalogType string
ExternalCatalogType represents the type of an external catalog.
const ( ExternalCatalogNone ExternalCatalogType = "" ExternalCatalogPolaris ExternalCatalogType = "polaris" )
External catalog type constants.
type GenerateConfig ¶
GenerateConfig holds configuration for parquet file generation.
type GenerateResult ¶
GenerateResult holds the result of parquet generation.
func GenerateParquetFiles ¶
func GenerateParquetFiles(ctx context.Context, cfg GenerateConfig, progress func(completed, total int64)) (*GenerateResult, error)
GenerateParquetFiles generates test parquet files for benchmarking.
type NamespaceInfo ¶
type TPCDSConfig ¶
type TPCDSConfig struct {
ScaleFactor string // e.g., "sf1", "sf10", "sf100", "sf1000"
Table string // e.g., "store_sales", "customer", etc.
CacheDir string // local directory to cache downloaded files
Concurrency int // download concurrency
}
TPCDSConfig holds configuration for TPC-DS data operations.
func DefaultTPCDSConfig ¶
func DefaultTPCDSConfig() TPCDSConfig
DefaultTPCDSConfig returns a TPCDSConfig populated with sensible defaults.
type TPCDSDownloadResult ¶
type TPCDSDownloadResult struct {
Files []string
TotalBytes int64
DownloadedBytes int64
SkippedFiles int
}
TPCDSDownloadResult holds the result of a download operation.
func DownloadTPCDS ¶
func DownloadTPCDS(ctx context.Context, cfg TPCDSConfig, progress func(completed, total, bytes int64)) (*TPCDSDownloadResult, error)
DownloadTPCDS downloads TPC-DS parquet files from GCS to local cache. Files that already exist locally are skipped.
type Tree ¶
type Tree struct {
// contains filtered or unexported fields
}
func NewTree ¶
func NewTree(cfg TreeConfig) *Tree
func (*Tree) AllNamespaces ¶
func (t *Tree) AllNamespaces() []NamespaceInfo
func (*Tree) ChildrenOf ¶
func (*Tree) Config ¶
func (t *Tree) Config() TreeConfig