Documentation
¶
Overview ¶
Package catalog provides an interface for Catalog implementations along with a registry for registering catalog implementations.
Subpackages of this package provide some default implementations for select catalog types which will register themselves if imported. For instance, adding the following import:
import _ "github.com/apache/iceberg-go/catalog/rest"
Will register the REST catalog implementation.
Example (MultiTableTransaction) ¶
This example demonstrates atomic multi-table commits using the MultiTableTransaction API. Changes to multiple tables are collected and committed in a single all-or-nothing request.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"context"
"errors"
"fmt"
"iter"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/table"
)
// exampleCatalog is a minimal Catalog + TransactionalCatalog stub
// for the runnable example.
type exampleCatalog struct {
	// tables holds the example's tables keyed by bare table name, i.e.
	// the last component of the identifier that LoadTable looks up.
	tables map[string]*table.Table
}
// CatalogType reports catalog.REST as this stub's catalog type.
func (c *exampleCatalog) CatalogType() catalog.Type { return catalog.REST }
// CommitTransaction provides the method required by
// catalog.TransactionalCatalog. The stub ignores both arguments and
// always returns nil.
func (c *exampleCatalog) CommitTransaction(_ context.Context, _ []table.TableCommit) error {
	return nil
}
// LoadTable returns the table registered under the last component of id
// (the bare table name); any leading namespace components are ignored by
// this stub.
//
// It returns an error when id is empty or when no table with that name
// is present in c.tables.
func (c *exampleCatalog) LoadTable(_ context.Context, id table.Identifier) (*table.Table, error) {
	// Guard first: indexing id[len(id)-1] on an empty identifier would
	// panic with an out-of-range index.
	if len(id) == 0 {
		return nil, errors.New("table identifier must not be empty")
	}

	if t, ok := c.tables[id[len(id)-1]]; ok {
		return t, nil
	}

	return nil, errors.New("table not found")
}
// The methods below are no-op stubs that return zero values. They give
// exampleCatalog the remaining methods listed by the catalog.Catalog
// interface.

// ListTables is a stub that returns a nil iterator.
func (c *exampleCatalog) ListTables(context.Context, table.Identifier) iter.Seq2[table.Identifier, error] {
	return nil
}

// CreateTable is a stub that returns a nil table and a nil error.
func (c *exampleCatalog) CreateTable(context.Context, table.Identifier, *iceberg.Schema, ...catalog.CreateTableOpt) (*table.Table, error) {
	return nil, nil
}

// CommitTable is a stub that returns nil metadata, an empty string, and
// a nil error.
func (c *exampleCatalog) CommitTable(context.Context, table.Identifier, []table.Requirement, []table.Update) (table.Metadata, string, error) {
	return nil, "", nil
}

// DropTable is a stub that always returns nil.
func (c *exampleCatalog) DropTable(context.Context, table.Identifier) error { return nil }

// RenameTable is a stub that returns a nil table and a nil error.
func (c *exampleCatalog) RenameTable(context.Context, table.Identifier, table.Identifier) (*table.Table, error) {
	return nil, nil
}

// CheckTableExists is a stub that always reports false with a nil error.
func (c *exampleCatalog) CheckTableExists(context.Context, table.Identifier) (bool, error) {
	return false, nil
}

// ListNamespaces is a stub that returns a nil slice and a nil error.
func (c *exampleCatalog) ListNamespaces(context.Context, table.Identifier) ([]table.Identifier, error) {
	return nil, nil
}

// CreateNamespace is a stub that always returns nil.
func (c *exampleCatalog) CreateNamespace(context.Context, table.Identifier, iceberg.Properties) error {
	return nil
}

// DropNamespace is a stub that always returns nil.
func (c *exampleCatalog) DropNamespace(context.Context, table.Identifier) error { return nil }

// CheckNamespaceExists is a stub that always reports false with a nil error.
func (c *exampleCatalog) CheckNamespaceExists(context.Context, table.Identifier) (bool, error) {
	return false, nil
}

// LoadNamespaceProperties is a stub that returns nil properties and a
// nil error.
func (c *exampleCatalog) LoadNamespaceProperties(context.Context, table.Identifier) (iceberg.Properties, error) {
	return nil, nil
}

// UpdateNamespaceProperties is a stub that returns an empty
// PropertiesUpdateSummary and a nil error.
func (c *exampleCatalog) UpdateNamespaceProperties(context.Context, table.Identifier, []string, iceberg.Properties) (catalog.PropertiesUpdateSummary, error) {
	return catalog.PropertiesUpdateSummary{}, nil
}
// makeExampleTable builds a table identified by {ns, name} for the
// example. The table has a single required int64 column named "id", no
// partitioning, no sort order, format version "2", and the location
// "s3://bucket/<name>".
//
// It panics if the metadata cannot be built, rather than silently
// handing nil metadata to table.New.
func makeExampleTable(ns, name string) *table.Table {
	schema := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
	)

	meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
		table.UnsortedSortOrder, "s3://bucket/"+name,
		iceberg.Properties{table.PropertyFormatVersion: "2"})
	if err != nil {
		panic(fmt.Errorf("building metadata for example table %s.%s: %w", ns, name, err))
	}

	return table.New(table.Identifier{ns, name}, meta, "", nil, nil)
}
// This example demonstrates atomic multi-table commits using the
// MultiTableTransaction API. Changes to multiple tables are collected
// and committed in a single all-or-nothing request.
//
// Every error returned while staging or committing changes is checked
// and aborts the example via panic, so the success message is printed
// only when each step actually succeeded.
func main() {
	ctx := context.Background()

	tbl1 := makeExampleTable("db", "orders")
	tbl2 := makeExampleTable("db", "inventory")
	cat := &exampleCatalog{
		tables: map[string]*table.Table{
			"orders":    tbl1,
			"inventory": tbl2,
		},
	}

	// Create a multi-table transaction from the catalog.
	mtx, err := catalog.NewMultiTableTransaction(cat)
	if err != nil {
		panic(err)
	}

	// Build changes on each table via individual transactions.
	tx1 := tbl1.NewTransaction()
	if err := tx1.SetProperties(map[string]string{"pipeline": "v2"}); err != nil {
		panic(err)
	}
	if err := mtx.AddTransaction(tx1); err != nil {
		panic(err)
	}

	tx2 := tbl2.NewTransaction()
	if err := tx2.SetProperties(map[string]string{"pipeline": "v2"}); err != nil {
		panic(err)
	}
	if err := mtx.AddTransaction(tx2); err != nil {
		panic(err)
	}

	// Option A: Commit only — caller reloads tables manually.
	if err := mtx.Commit(ctx); err != nil {
		panic(err)
	}
	fmt.Println("committed 2 tables atomically")

	// Option B: CommitAndReload commits and reloads all tables.
	// tables, err := mtx.CommitAndReload(ctx)
}
Output: committed 2 tables atomically
Index ¶
- Variables
- func GetRegisteredCatalogs() []string
- func NamespaceFromIdent(ident table.Identifier) table.Identifier
- func Register(catalogType string, reg Registrar)
- func TableNameFromIdent(ident table.Identifier) string
- func ToIdentifier(ident ...string) table.Identifier
- func Unregister(catalogType string)
- type Catalog
- type CreateTableCfg
- type CreateTableOpt
- type CreateViewCfg
- type CreateViewOpt
- type MultiTableTransaction
- type PropertiesUpdateSummary
- type Registrar
- type RegistrarFunc
- type TransactionalCatalog
- type Type
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNoSuchTable is returned when a table does not exist in the catalog.
	ErrNoSuchTable            = errors.New("table does not exist")
	ErrNoSuchNamespace        = errors.New("namespace does not exist")
	ErrNamespaceAlreadyExists = errors.New("namespace already exists")
	ErrTableAlreadyExists     = errors.New("table already exists")
	ErrCatalogNotFound        = errors.New("catalog type not registered")
	ErrNamespaceNotEmpty      = errors.New("namespace is not empty")
	ErrNoSuchView             = errors.New("view does not exist")
	ErrViewAlreadyExists      = errors.New("view already exists")
	ErrEmptyCommitList        = errors.New("commit list must not be empty")
	ErrMissingIdentifier      = errors.New("every table commit must have a valid identifier")
)
Functions ¶
func GetRegisteredCatalogs ¶ added in v0.2.0
func GetRegisteredCatalogs() []string
GetRegisteredCatalogs returns the list of registered catalog names that can be looked up via Load.
func NamespaceFromIdent ¶
func NamespaceFromIdent(ident table.Identifier) table.Identifier
func Register ¶ added in v0.2.0
func Register(catalogType string, reg Registrar)
Register adds the new catalog type to the registry. If the catalog type is already registered, it will be replaced.
func TableNameFromIdent ¶
func TableNameFromIdent(ident table.Identifier) string
func ToIdentifier ¶ added in v0.2.0
func ToIdentifier(ident ...string) table.Identifier
func Unregister ¶ added in v0.2.0
func Unregister(catalogType string)
Unregister removes the requested catalog factory from the registry.
Types ¶
type Catalog ¶
type Catalog interface {
	// CatalogType returns the type of the catalog.
	CatalogType() Type
	// CreateTable creates a new iceberg table in the catalog using the provided identifier
	// and schema. Options can be used to optionally provide location, partition spec, sort order,
	// and custom properties.
	CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...CreateTableOpt) (*table.Table, error)
	// CommitTable commits the table metadata and updates to the catalog, returning the new metadata.
	CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error)
	// ListTables returns a list of table identifiers in the catalog, with the returned
	// identifiers containing the information required to load the table via that catalog.
	ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error]
	// LoadTable loads a table from the catalog and returns a Table with the metadata.
	LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error)
	// DropTable tells the catalog to drop the table entirely.
	DropTable(ctx context.Context, identifier table.Identifier) error
	// RenameTable tells the catalog to rename a given table by the identifiers
	// provided, and then loads and returns the destination table.
	RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)
	// CheckTableExists reports whether the table exists.
	CheckTableExists(ctx context.Context, identifier table.Identifier) (bool, error)
	// ListNamespaces returns the list of available namespaces, optionally filtering by a
	// parent namespace.
	ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)
	// CreateNamespace tells the catalog to create a new namespace with the given properties.
	CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error
	// DropNamespace tells the catalog to drop the namespace and all tables in that namespace.
	DropNamespace(ctx context.Context, namespace table.Identifier) error
	// CheckNamespaceExists reports whether the namespace exists.
	CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error)
	// LoadNamespaceProperties returns the current properties in the catalog for
	// a given namespace.
	LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)
	// UpdateNamespaceProperties allows removing, adding, and/or updating properties of a namespace.
	UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
		removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)
}
Catalog is the interface for Iceberg table operations such as create, drop, load, list, and others.
func Load ¶ added in v0.2.0
Load allows loading a specific catalog by URI and properties.
This is used alongside Register/Unregister, not only to make catalog loading easier but also to allow custom catalog implementations to be registered and loaded from outside this module.
The name parameter is used to look up the catalog configuration, if one exists, that was loaded from the configuration file, ".iceberg-go.yaml". By default, the config file is loaded from the user's home directory, but the directory can be changed by setting the GOICEBERG_HOME environment variable to the path of the directory containing the ".iceberg-go.yaml" file. The name will also be passed to the registered GetCatalog function.
The catalog registry will be checked for the catalog's type. The "type" property is used first to search the catalog registry, with the passed properties taking priority over any loaded config.
If there is no "type" in the configuration and no "type" in the passed-in properties, then the "uri" property is used to look up the catalog by checking the scheme. Again, if there is a "uri" key set in the passed-in "props", it will take priority over the configuration.
Currently, the following catalog types are registered by default:
"glue" for the AWS Glue Data Catalog. The rest of the URI is ignored; all configuration should be provided using properties such as "glue.region", "glue.endpoint", "glue.max-retries", etc. Default AWS credentials are used if found, or can be overridden by setting "glue.access-key-id", "glue.secret-access-key", and "glue.session-token".
"rest" for a REST API catalog. If the properties have a "uri" key, then that will be used as the REST endpoint; otherwise the URI is used as the endpoint. The REST catalog also registers "http" and "https" so that Load with an http/s URI will automatically load the REST catalog.
type CreateTableCfg ¶ added in v0.3.0
type CreateTableCfg struct {
PartitionSpec *iceberg.PartitionSpec
SortOrder table.SortOrder
// StagedUpdates holds additional table.Update operations that cause
// the REST catalog to use a two-phase staged creation. Phase 1
// sends a minimal create with stage-create=true; phase 2 commits
// with an assert-create requirement and all updates atomically.
// Non-REST catalogs ignore this field.
StagedUpdates []table.Update
// contains filtered or unexported fields
}
CreateTableCfg represents the configuration used for CreateTable operations
func NewCreateTableCfg ¶ added in v0.4.0
func NewCreateTableCfg() CreateTableCfg
type CreateTableOpt ¶ added in v0.2.0
type CreateTableOpt func(*CreateTableCfg)
func WithLocation ¶ added in v0.2.0
func WithLocation(location string) CreateTableOpt
func WithPartitionSpec ¶ added in v0.2.0
func WithPartitionSpec(spec *iceberg.PartitionSpec) CreateTableOpt
func WithProperties ¶ added in v0.2.0
func WithProperties(props iceberg.Properties) CreateTableOpt
func WithSortOrder ¶ added in v0.2.0
func WithSortOrder(order table.SortOrder) CreateTableOpt
func WithStagedUpdates ¶ added in v0.5.0
func WithStagedUpdates(updates ...table.Update) CreateTableOpt
WithStagedUpdates provides additional table.Update operations that cause the REST catalog to use two-phase staged creation. This is useful for atomically creating a table with a custom UUID, initial snapshots, or snapshot references. Use constructors from the table package (e.g. table.NewAssignUUIDUpdate, table.NewAddSnapshotUpdate, table.NewSetSnapshotRefUpdate).
type CreateViewCfg ¶ added in v0.5.0
type CreateViewCfg struct {
// contains filtered or unexported fields
}
CreateViewCfg represents the configuration used for CreateView operations
func NewCreateViewCfg ¶ added in v0.5.0
func NewCreateViewCfg() CreateViewCfg
type CreateViewOpt ¶ added in v0.5.0
type CreateViewOpt func(*CreateViewCfg)
func WithViewLocation ¶ added in v0.5.0
func WithViewLocation(location string) CreateViewOpt
func WithViewProperties ¶ added in v0.5.0
func WithViewProperties(config iceberg.Properties) CreateViewOpt
type MultiTableTransaction ¶ added in v0.6.0
type MultiTableTransaction struct {
// contains filtered or unexported fields
}
MultiTableTransaction collects changes across multiple tables and commits them atomically via [TransactionalCatalog.CommitTransaction].
A MultiTableTransaction must not be used concurrently from multiple goroutines.
Usage:
mtx, err := catalog.NewMultiTableTransaction(cat)
// ... err check ...
tx1 := tbl1.NewTransaction()
tx1.SetProperties(map[string]string{"key": "val"})
mtx.AddTransaction(tx1)
tx2 := tbl2.NewTransaction()
// ... build changes on tx2 ...
mtx.AddTransaction(tx2)
err = mtx.Commit(ctx)
func NewMultiTableTransaction ¶ added in v0.6.0
func NewMultiTableTransaction(cat Catalog) (*MultiTableTransaction, error)
NewMultiTableTransaction creates a new multi-table transaction backed by the given catalog. Returns an error if the catalog does not implement TransactionalCatalog.
func (*MultiTableTransaction) AddTransaction ¶ added in v0.6.0
func (m *MultiTableTransaction) AddTransaction(tx *table.Transaction) error
AddTransaction adds a table transaction to be committed atomically with all other transactions in this multi-table transaction. Returns an error if the transaction is nil, already committed, or targets a table that was already added.
func (*MultiTableTransaction) Commit ¶ added in v0.6.0
func (m *MultiTableTransaction) Commit(ctx context.Context) error
Commit extracts pending changes from all added transactions and commits them atomically. On success, all transactions are marked as committed. On failure, no transactions are marked committed and the caller may retry.
PostCommit hooks are not executed. Because the multi-table commit endpoint returns 204 No Content, callers must call LoadTable for each table individually to obtain updated metadata.
func (*MultiTableTransaction) CommitAndReload ¶ added in v0.6.0
CommitAndReload commits the multi-table transaction atomically and then reloads all affected tables from the catalog. This is a convenience method that combines MultiTableTransaction.Commit with individual LoadTable calls, since the multi-table commit endpoint returns 204 No Content and does not include updated metadata.
On commit failure, no tables are reloaded and the error is returned. On partial reload failure (commit succeeded but a LoadTable fails), the successfully loaded tables are still returned alongside the error.
type PropertiesUpdateSummary ¶
type Registrar ¶ added in v0.2.0
type Registrar interface {
GetCatalog(ctx context.Context, catalogName string, props iceberg.Properties) (Catalog, error)
}
Registrar is a factory for creating Catalog instances, registered via Register for use with Load.
type RegistrarFunc ¶ added in v0.2.0
func (RegistrarFunc) GetCatalog ¶ added in v0.2.0
func (f RegistrarFunc) GetCatalog(ctx context.Context, catalogName string, props iceberg.Properties) (Catalog, error)
type TransactionalCatalog ¶ added in v0.6.0
type TransactionalCatalog interface {
CommitTransaction(ctx context.Context, commits []table.TableCommit) error
}
TransactionalCatalog is an optional interface implemented by catalogs that support atomic multi-table commits. Callers should check for this capability via a type assertion:
if tc, ok := cat.(catalog.TransactionalCatalog); ok {
err := tc.CommitTransaction(ctx, commits)
}
The endpoint is all-or-nothing: either all table changes are applied atomically, or none are. On success the method returns nil; the caller must call LoadTable for each table individually to obtain updated metadata because the server returns 204 No Content.