catalog

package
v0.6.0-rc2
Warning

This package is not in the latest version of its module.

Published: May 21, 2026 License: Apache-2.0 Imports: 13 Imported by: 15

README

Catalog Implementations

Integration Testing

The Catalog implementations can be manually tested using the CLI implemented in the cmd/iceberg folder.

REST Catalog

To test the REST catalog implementation, we have a Docker configuration with a MinIO container and a tabulario/iceberg-rest container.

You can spin up the local catalog by going to the dev/ folder and running docker-compose up. You can then follow the steps of the Iceberg Quickstart tutorial, which we've summarized below.

Set up your Iceberg catalog

First, launch a PySpark console by running:

docker exec -it spark-iceberg pyspark

Once in the PySpark shell, we create a simple table called "taxis" in the "demo.nyc" namespace:

from pyspark.sql.types import DoubleType, FloatType, LongType, StructType, StructField, StringType
schema = StructType([
  StructField("vendor_id", LongType(), True),
  StructField("trip_id", LongType(), True),
  StructField("trip_distance", FloatType(), True),
  StructField("fare_amount", DoubleType(), True),
  StructField("store_and_fwd_flag", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").create()

Finally, we write another DataFrame to the table, which adds new data files:

schema = spark.table("demo.nyc.taxis").schema
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y")
]
df = spark.createDataFrame(data, schema)
df.writeTo("demo.nyc.taxis").append()

Testing with the CLI

Now that the catalog is running and contains a table, you can use the CLI implemented in the cmd/iceberg folder. You will need to set the following environment variables (which can also be found in the docker-compose.yml):

AWS_S3_ENDPOINT=http://localhost:9000
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=admin
AWS_SECRET_ACCESS_KEY=password

With those environment variables set you can now run the CLI:

$ go run ./cmd/iceberg list --catalog rest --uri http://localhost:8181
┌──────┐
| IDs  |
| ---- |
| demo |
└──────┘

You can retrieve the schema of the table:

$ go run ./cmd/iceberg schema --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Current Schema, id=0
├──1: vendor_id: optional long
├──2: trip_id: optional long
├──3: trip_distance: optional float
├──4: fare_amount: optional double
└──5: store_and_fwd_flag: optional string

You can get the file list:

$ go run ./cmd/iceberg files --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Snapshots: rest.demo.nyc.taxis
└─┬Snapshot 7004656639550124164, schema 0: s3://warehouse/demo/nyc/taxis/metadata/snap-7004656639550124164-1-0d533cd4-f0c1-45a6-a691-f2be3abe5491.avro
  └─┬Manifest: s3://warehouse/demo/nyc/taxis/metadata/0d533cd4-f0c1-45a6-a691-f2be3abe5491-m0.avro
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00004-24-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00009-29-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00014-34-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    └──Datafile: s3://warehouse/demo/nyc/taxis/data/00019-39-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet

And so on for the other options available in the CLI.

Documentation

Overview

Package catalog provides an interface for Catalog implementations along with a registry for registering catalog implementations.

Subpackages of this package provide some default implementations for select catalog types which will register themselves if imported. For instance, adding the following import:

import _ "github.com/apache/iceberg-go/catalog/rest"

will register the REST catalog implementation.

Example (MultiTableTransaction)

This example demonstrates atomic multi-table commits using the MultiTableTransaction API. Changes to multiple tables are collected and committed in a single all-or-nothing request.

package main

import (
	"context"
	"errors"
	"fmt"
	"iter"

	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/catalog"
	"github.com/apache/iceberg-go/table"
)

// exampleCatalog is a minimal Catalog + TransactionalCatalog stub
// for the runnable example.
type exampleCatalog struct {
	tables map[string]*table.Table
}

func (c *exampleCatalog) CatalogType() catalog.Type { return catalog.REST }

func (c *exampleCatalog) CommitTransaction(_ context.Context, _ []table.TableCommit) error {
	return nil
}

func (c *exampleCatalog) LoadTable(_ context.Context, id table.Identifier) (*table.Table, error) {
	if t, ok := c.tables[id[len(id)-1]]; ok {
		return t, nil
	}

	return nil, errors.New("table not found")
}

func (c *exampleCatalog) ListTables(context.Context, table.Identifier) iter.Seq2[table.Identifier, error] {
	return nil
}

func (c *exampleCatalog) CreateTable(context.Context, table.Identifier, *iceberg.Schema, ...catalog.CreateTableOpt) (*table.Table, error) {
	return nil, nil
}

func (c *exampleCatalog) CommitTable(context.Context, table.Identifier, []table.Requirement, []table.Update) (table.Metadata, string, error) {
	return nil, "", nil
}

func (c *exampleCatalog) DropTable(context.Context, table.Identifier) error { return nil }

func (c *exampleCatalog) RenameTable(context.Context, table.Identifier, table.Identifier) (*table.Table, error) {
	return nil, nil
}

func (c *exampleCatalog) CheckTableExists(context.Context, table.Identifier) (bool, error) {
	return false, nil
}

func (c *exampleCatalog) ListNamespaces(context.Context, table.Identifier) ([]table.Identifier, error) {
	return nil, nil
}

func (c *exampleCatalog) CreateNamespace(context.Context, table.Identifier, iceberg.Properties) error {
	return nil
}

func (c *exampleCatalog) DropNamespace(context.Context, table.Identifier) error { return nil }

func (c *exampleCatalog) CheckNamespaceExists(context.Context, table.Identifier) (bool, error) {
	return false, nil
}

func (c *exampleCatalog) LoadNamespaceProperties(context.Context, table.Identifier) (iceberg.Properties, error) {
	return nil, nil
}

func (c *exampleCatalog) UpdateNamespaceProperties(context.Context, table.Identifier, []string, iceberg.Properties) (catalog.PropertiesUpdateSummary, error) {
	return catalog.PropertiesUpdateSummary{}, nil
}

func makeExampleTable(ns, name string) *table.Table {
	schema := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
	)
	meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
		table.UnsortedSortOrder, "s3://bucket/"+name,
		iceberg.Properties{table.PropertyFormatVersion: "2"})
	if err != nil {
		panic(err)
	}

	return table.New(table.Identifier{ns, name}, meta, "", nil, nil)
}

// This example demonstrates atomic multi-table commits using the
// MultiTableTransaction API. Changes to multiple tables are collected
// and committed in a single all-or-nothing request.
func main() {
	ctx := context.Background()

	tbl1 := makeExampleTable("db", "orders")
	tbl2 := makeExampleTable("db", "inventory")

	cat := &exampleCatalog{
		tables: map[string]*table.Table{
			"orders":    tbl1,
			"inventory": tbl2,
		},
	}

	// Create a multi-table transaction from the catalog.
	mtx, err := catalog.NewMultiTableTransaction(cat)
	if err != nil {
		panic(err)
	}

	// Build changes on each table via individual transactions.
	tx1 := tbl1.NewTransaction()
	if err := tx1.SetProperties(map[string]string{"pipeline": "v2"}); err != nil {
		panic(err)
	}
	if err := mtx.AddTransaction(tx1); err != nil {
		panic(err)
	}

	tx2 := tbl2.NewTransaction()
	if err := tx2.SetProperties(map[string]string{"pipeline": "v2"}); err != nil {
		panic(err)
	}
	if err := mtx.AddTransaction(tx2); err != nil {
		panic(err)
	}

	// Option A: Commit only — caller reloads tables manually.
	err = mtx.Commit(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Println("committed 2 tables atomically")

	// Option B: CommitAndReload commits and reloads all tables.
	// tables, err := mtx.CommitAndReload(ctx)

}
Output:
committed 2 tables atomically

Constants

This section is empty.

Variables

var (
	// ErrNoSuchTable is returned when a table does not exist in the catalog.
	ErrNoSuchTable            = errors.New("table does not exist")
	ErrNoSuchNamespace        = errors.New("namespace does not exist")
	ErrNamespaceAlreadyExists = errors.New("namespace already exists")
	ErrTableAlreadyExists     = errors.New("table already exists")
	ErrCatalogNotFound        = errors.New("catalog type not registered")
	ErrNamespaceNotEmpty      = errors.New("namespace is not empty")
	ErrNoSuchView             = errors.New("view does not exist")
	ErrViewAlreadyExists      = errors.New("view already exists")
	ErrEmptyCommitList        = errors.New("commit list must not be empty")
	ErrMissingIdentifier      = errors.New("every table commit must have a valid identifier")
)

Functions

func GetRegisteredCatalogs added in v0.2.0

func GetRegisteredCatalogs() []string

GetRegisteredCatalogs returns the list of registered catalog names that can be looked up via Load.

func NamespaceFromIdent

func NamespaceFromIdent(ident table.Identifier) table.Identifier

func Register added in v0.2.0

func Register(catalogType string, reg Registrar)

Register adds the new catalog type to the registry. If the catalog type is already registered, it will be replaced.

func TableNameFromIdent

func TableNameFromIdent(ident table.Identifier) string

func ToIdentifier added in v0.2.0

func ToIdentifier(ident ...string) table.Identifier
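NamespaceFromIdent, TableNameFromIdent, and ToIdentifier have no doc comments here. The sketch below illustrates the behavior they are assumed to have. It is inferred from two things: the CLI accepts dotted names such as demo.nyc.taxis, and the example catalog treats the last identifier element as the table name. The local Identifier type stands in for table.Identifier. The lowercase helpers are hypothetical re-implementations, not the library's code. In particular, splitting a single argument on "." is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// Identifier is a hypothetical stand-in for table.Identifier.
type Identifier []string

// toIdentifier: a single argument is split on "." (assumed behavior);
// several arguments are used as the parts directly.
func toIdentifier(parts ...string) Identifier {
	if len(parts) == 1 {
		if parts[0] == "" {
			return nil
		}
		return Identifier(strings.Split(parts[0], "."))
	}
	return Identifier(parts)
}

// namespaceFromIdent drops the last element (the table name).
func namespaceFromIdent(ident Identifier) Identifier {
	if len(ident) == 0 {
		return nil
	}
	return ident[:len(ident)-1]
}

// tableNameFromIdent returns the last element, or "" for an empty identifier.
func tableNameFromIdent(ident Identifier) string {
	if len(ident) == 0 {
		return ""
	}
	return ident[len(ident)-1]
}

func main() {
	ident := toIdentifier("demo.nyc.taxis")
	fmt.Println(namespaceFromIdent(ident)) // [demo nyc]
	fmt.Println(tableNameFromIdent(ident)) // taxis
}
```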

func Unregister added in v0.2.0

func Unregister(catalogType string)

Unregister removes the requested catalog factory from the registry.

Types

type Catalog

type Catalog interface {
	// CatalogType returns the type of the catalog.
	CatalogType() Type

	// CreateTable creates a new iceberg table in the catalog using the provided identifier
	// and schema. Options can be used to optionally provide location, partition spec, sort order,
	// and custom properties.
	CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...CreateTableOpt) (*table.Table, error)
	// CommitTable commits the table metadata and updates to the catalog, returning the new metadata
	CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error)
	// ListTables returns a list of table identifiers in the catalog, with the returned
	// identifiers containing the information required to load the table via that catalog.
	ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error]
	// LoadTable loads a table from the catalog and returns a Table with the metadata.
	LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error)
	// DropTable tells the catalog to drop the table entirely.
	DropTable(ctx context.Context, identifier table.Identifier) error
	// RenameTable tells the catalog to rename a given table by the identifiers
	// provided, and then loads and returns the destination table
	RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)
	// CheckTableExists returns if the table exists
	CheckTableExists(ctx context.Context, identifier table.Identifier) (bool, error)
	// ListNamespaces returns the list of available namespaces, optionally filtering by a
	// parent namespace
	ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)
	// CreateNamespace tells the catalog to create a new namespace with the given properties
	CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error
	// DropNamespace tells the catalog to drop the namespace and all tables in that namespace
	DropNamespace(ctx context.Context, namespace table.Identifier) error
	// CheckNamespaceExists returns if the namespace exists
	CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error)
	// LoadNamespaceProperties returns the current properties in the catalog for
	// a given namespace
	LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)
	// UpdateNamespaceProperties allows removing, adding, and/or updating properties of a namespace
	UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
		removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)
}

Catalog is the interface for Iceberg table operations such as create, drop, load, and list.

func Load added in v0.2.0

func Load(ctx context.Context, name string, props iceberg.Properties) (Catalog, error)

Load loads a catalog by name and properties.

This is used alongside Register and Unregister, both to make loading catalogs easier and to allow custom catalog implementations to be registered and loaded outside this module.

The name parameter is used to look up the catalog configuration, if one exists, that was loaded from the configuration file, ".iceberg-go.yaml". By default, the config file is loaded from the user's home directory, but the directory can be changed by setting the GOICEBERG_HOME environment variable to the path of the directory containing the ".iceberg-go.yaml" file. The name will also be passed to the registered GetCatalog function.

The catalog registry will be checked for the catalog's type. The "type" property is used first to search the catalog registry, with the passed properties taking priority over any loaded config.

If neither the configuration nor the passed-in properties contain a "type", the "uri" property is used to look up the catalog by its scheme. Again, a "uri" key set in the passed-in props takes priority over the configuration.

Currently, the following catalog types are registered by default:

  • "glue" for the AWS Glue Data Catalog. The rest of the URI is ignored, and all configuration should be provided through properties such as "glue.region", "glue.endpoint", and "glue.max-retries". Default AWS credentials are used if found; they can be overridden by setting "glue.access-key-id", "glue.secret-access-key", and "glue.session-token".

  • "rest" for a REST API catalog. If the properties have a "uri" key, it is used as the REST endpoint; otherwise the URI is used as the endpoint. The REST catalog also registers "http" and "https", so Load with an http or https URI automatically loads the REST catalog.
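The type-resolution order described for Load can be sketched as a small pure function. resolveCatalogType is hypothetical and models only the precedence rules:

- Passed properties override the loaded config.
- "type" wins over "uri".
- The URI scheme is the fallback.

It does not read .iceberg-go.yaml or consult the registry.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// resolveCatalogType is a hypothetical helper: merge config and props
// (props win), prefer "type", and fall back to the scheme of "uri".
func resolveCatalogType(config, props map[string]string) (string, error) {
	merged := make(map[string]string, len(config)+len(props))
	for k, v := range config {
		merged[k] = v
	}
	for k, v := range props {
		merged[k] = v // passed properties take priority over config
	}
	if t := merged["type"]; t != "" {
		return t, nil
	}
	raw := merged["uri"]
	if raw == "" {
		return "", errors.New(`no "type" or "uri" property`)
	}
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	if u.Scheme == "" {
		return "", fmt.Errorf("uri %q has no scheme", raw)
	}
	return u.Scheme, nil
}

func main() {
	t, _ := resolveCatalogType(map[string]string{"type": "glue"}, map[string]string{"type": "rest"})
	fmt.Println(t) // rest
	t, _ = resolveCatalogType(nil, map[string]string{"uri": "http://localhost:8181"})
	fmt.Println(t) // http
}
```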

type CreateTableCfg added in v0.3.0

type CreateTableCfg struct {
	PartitionSpec *iceberg.PartitionSpec
	SortOrder     table.SortOrder
	// StagedUpdates holds additional table.Update operations that cause
	// the REST catalog to use a two-phase staged creation. Phase 1
	// sends a minimal create with stage-create=true; phase 2 commits
	// with an assert-create requirement and all updates atomically.
	// Non-REST catalogs ignore this field.
	StagedUpdates []table.Update
	// contains filtered or unexported fields
}

CreateTableCfg represents the configuration used for CreateTable operations

func NewCreateTableCfg added in v0.4.0

func NewCreateTableCfg() CreateTableCfg

type CreateTableOpt added in v0.2.0

type CreateTableOpt func(*CreateTableCfg)

func WithLocation added in v0.2.0

func WithLocation(location string) CreateTableOpt

func WithPartitionSpec added in v0.2.0

func WithPartitionSpec(spec *iceberg.PartitionSpec) CreateTableOpt

func WithProperties added in v0.2.0

func WithProperties(props iceberg.Properties) CreateTableOpt

func WithSortOrder added in v0.2.0

func WithSortOrder(order table.SortOrder) CreateTableOpt
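The CreateTableOpt constructors follow Go's functional-options pattern. The sketch below shows that pattern using hypothetical local types (createTableCfg, withLocation, withProperties, applyCreateTableOpts) in place of the real ones. This page does not document whether WithProperties replaces or merges existing properties; this sketch simply replaces them.

```go
package main

import "fmt"

// createTableCfg is a hypothetical stand-in for catalog.CreateTableCfg,
// reduced to two settings.
type createTableCfg struct {
	location   string
	properties map[string]string
}

// createTableOpt has the same shape as catalog.CreateTableOpt.
type createTableOpt func(*createTableCfg)

func withLocation(location string) createTableOpt {
	return func(c *createTableCfg) { c.location = location }
}

func withProperties(props map[string]string) createTableOpt {
	return func(c *createTableCfg) { c.properties = props }
}

// applyCreateTableOpts starts from zero values and applies each option in
// order, so a later option overrides an earlier one.
func applyCreateTableOpts(opts ...createTableOpt) createTableCfg {
	var cfg createTableCfg
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := applyCreateTableOpts(
		withLocation("s3://warehouse/demo/nyc/taxis"),
		withProperties(map[string]string{"format-version": "2"}),
	)
	fmt.Println(cfg.location, cfg.properties["format-version"]) // s3://warehouse/demo/nyc/taxis 2
}
```

With the real API, options such as catalog.WithLocation and catalog.WithProperties are passed as the trailing arguments of CreateTable.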

func WithStagedUpdates added in v0.5.0

func WithStagedUpdates(updates ...table.Update) CreateTableOpt

WithStagedUpdates provides additional table.Update operations that cause the REST catalog to use two-phase staged creation. This is useful for atomically creating a table with a custom UUID, initial snapshots, or snapshot references. Use constructors from the table package (e.g. table.NewAssignUUIDUpdate, table.NewAddSnapshotUpdate, table.NewSetSnapshotRefUpdate).

type CreateViewCfg added in v0.5.0

type CreateViewCfg struct {
	// contains filtered or unexported fields
}

CreateViewCfg represents the configuration used for CreateView operations

func NewCreateViewCfg added in v0.5.0

func NewCreateViewCfg() CreateViewCfg

type CreateViewOpt added in v0.5.0

type CreateViewOpt func(*CreateViewCfg)

func WithViewLocation added in v0.5.0

func WithViewLocation(location string) CreateViewOpt

func WithViewProperties added in v0.5.0

func WithViewProperties(config iceberg.Properties) CreateViewOpt

type MultiTableTransaction added in v0.6.0

type MultiTableTransaction struct {
	// contains filtered or unexported fields
}

MultiTableTransaction collects changes across multiple tables and commits them atomically via [TransactionalCatalog.CommitTransaction].

A MultiTableTransaction must not be used concurrently from multiple goroutines.

Usage:

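mtx, err := catalog.NewMultiTableTransaction(cat)
if err != nil {
	return err // cat does not implement TransactionalCatalog
}

tx1 := tbl1.NewTransaction()
if err := tx1.SetProperties(map[string]string{"key": "val"}); err != nil {
	return err
}
if err := mtx.AddTransaction(tx1); err != nil {
	return err
}

tx2 := tbl2.NewTransaction()
// ... build changes on tx2 ...
if err := mtx.AddTransaction(tx2); err != nil {
	return err
}

err = mtx.Commit(ctx)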

func NewMultiTableTransaction added in v0.6.0

func NewMultiTableTransaction(cat Catalog) (*MultiTableTransaction, error)

NewMultiTableTransaction creates a new multi-table transaction backed by the given catalog. Returns an error if the catalog does not implement TransactionalCatalog.

func (*MultiTableTransaction) AddTransaction added in v0.6.0

func (m *MultiTableTransaction) AddTransaction(tx *table.Transaction) error

AddTransaction adds a table transaction to be committed atomically with all other transactions in this multi-table transaction. Returns an error if the transaction is nil, already committed, or targets a table that was already added.

func (*MultiTableTransaction) Commit added in v0.6.0

func (m *MultiTableTransaction) Commit(ctx context.Context) error

Commit extracts pending changes from all added transactions and commits them atomically. On success, all transactions are marked as committed. On failure, no transactions are marked committed and the caller may retry.

PostCommit hooks are not executed. Because the multi-table commit endpoint returns 204 No Content, callers must LoadTable individually to obtain updated metadata.

func (*MultiTableTransaction) CommitAndReload added in v0.6.0

func (m *MultiTableTransaction) CommitAndReload(ctx context.Context) ([]*table.Table, error)

CommitAndReload commits the multi-table transaction atomically and then reloads all affected tables from the catalog. This is a convenience method that combines MultiTableTransaction.Commit with individual LoadTable calls, since the multi-table commit endpoint returns 204 No Content and does not include updated metadata.

On commit failure, no tables are reloaded and the error is returned. On partial reload failure (commit succeeded but a LoadTable fails), the successfully loaded tables are still returned alongside the error.

type PropertiesUpdateSummary

type PropertiesUpdateSummary struct {
	Removed []string `json:"removed"`
	Updated []string `json:"updated"`
	Missing []string `json:"missing"`
}
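The sketch below shows how a catalog might fill in PropertiesUpdateSummary. applyNamespaceUpdate is a hypothetical in-memory implementation:

- A removal that finds its key is reported in Removed.
- A removal whose key is absent is reported in Missing.
- Every key in updates is reported in Updated.

These meanings are inferred from the field names. Any validation a real catalog performs is omitted.

```go
package main

import (
	"fmt"
	"sort"
)

// PropertiesUpdateSummary copies the struct documented above.
type PropertiesUpdateSummary struct {
	Removed []string `json:"removed"`
	Updated []string `json:"updated"`
	Missing []string `json:"missing"`
}

// applyNamespaceUpdate is a hypothetical in-memory update: it mutates props
// and reports which keys were removed, which were set, and which removals
// named a key that was not present.
func applyNamespaceUpdate(props map[string]string, removals []string, updates map[string]string) PropertiesUpdateSummary {
	var s PropertiesUpdateSummary
	for _, k := range removals {
		if _, ok := props[k]; ok {
			delete(props, k)
			s.Removed = append(s.Removed, k)
		} else {
			s.Missing = append(s.Missing, k)
		}
	}
	for k, v := range updates {
		props[k] = v
		s.Updated = append(s.Updated, k)
	}
	sort.Strings(s.Updated) // map iteration order is unspecified
	return s
}

func main() {
	props := map[string]string{"owner": "alice", "env": "dev"}
	s := applyNamespaceUpdate(props, []string{"env", "ttl"}, map[string]string{"owner": "bob"})
	fmt.Printf("%+v\n", s) // {Removed:[env] Updated:[owner] Missing:[ttl]}
}
```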

type Registrar added in v0.2.0

type Registrar interface {
	GetCatalog(ctx context.Context, catalogName string, props iceberg.Properties) (Catalog, error)
}

Registrar is a factory for creating Catalog instances. Implementations are registered with Register so that Load can construct them.

type RegistrarFunc added in v0.2.0

type RegistrarFunc func(context.Context, string, iceberg.Properties) (Catalog, error)

func (RegistrarFunc) GetCatalog added in v0.2.0

func (f RegistrarFunc) GetCatalog(ctx context.Context, catalogName string, props iceberg.Properties) (Catalog, error)

type TransactionalCatalog added in v0.6.0

type TransactionalCatalog interface {
	CommitTransaction(ctx context.Context, commits []table.TableCommit) error
}

TransactionalCatalog is an optional interface implemented by catalogs that support atomic multi-table commits. Callers should check for this capability via a type assertion:

if tc, ok := cat.(catalog.TransactionalCatalog); ok {
    if err := tc.CommitTransaction(ctx, commits); err != nil {
        return err
    }
}

The endpoint is all-or-nothing: either all table changes are applied atomically, or none are. On success the method returns nil; the caller must LoadTable individually to obtain updated metadata because the server returns 204 No Content.

type Type added in v0.2.0

type Type string
const (
	REST     Type = "rest"
	Hive     Type = "hive"
	Glue     Type = "glue"
	DynamoDB Type = "dynamodb"
	SQL      Type = "sql"
	Hadoop   Type = "hadoop"
)
