lakehouse

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2026 | License: MIT | Imports: 13 | Imported by: 0

Documentation

Overview

Package lakehouse implements Iceberg lakehouse modules and steps.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LakehouseModuleSchemas

func LakehouseModuleSchemas() []sdk.ModuleSchemaData

LakehouseModuleSchemas returns ModuleSchemaData for all lakehouse module types.

func NewCatalogModule

func NewCatalogModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewCatalogModule creates a new catalog.iceberg module instance.

func NewCompactStep

func NewCompactStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewCompactStep creates a step.lakehouse_compact instance.

func NewCreateTableStep

func NewCreateTableStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewCreateTableStep creates a step.lakehouse_create_table instance.

func NewEvolveSchemaStep

func NewEvolveSchemaStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewEvolveSchemaStep creates a step.lakehouse_evolve_schema instance.

func NewExpireSnapshotsStep

func NewExpireSnapshotsStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewExpireSnapshotsStep creates a step.lakehouse_expire_snapshots instance.

func NewQueryStep

func NewQueryStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewQueryStep creates a step.lakehouse_query instance.

func NewSnapshotStep

func NewSnapshotStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewSnapshotStep creates a step.lakehouse_snapshot instance.

func NewTableModule

func NewTableModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewTableModule creates a new lakehouse.table module instance.

func NewWriteStep

func NewWriteStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewWriteStep creates a step.lakehouse_write instance.

func RegisterCatalog

func RegisterCatalog(name string, client IcebergCatalogClient) error

RegisterCatalog adds a named catalog client to the global registry. It returns an error if a catalog with that name is already registered.

func UnregisterCatalog

func UnregisterCatalog(name string)

UnregisterCatalog removes a catalog client from the registry.

Types

type CatalogConfig

// CatalogConfig holds catalog-level configuration defaults and overrides.
// It is the value returned by IcebergCatalogClient.GetConfig.
//
// Neither field uses omitempty, so both keys are always present in the
// encoded JSON; a nil map is written as null.
//
// NOTE(review): presumably Defaults are applied before client-supplied
// settings and Overrides after them, as in the Iceberg REST config endpoint;
// confirm against the implementation.
type CatalogConfig struct {
	Defaults  map[string]string `json:"defaults"`
	Overrides map[string]string `json:"overrides"`
}

CatalogConfig holds catalog-level configuration defaults and overrides.

type CatalogModule

// CatalogModule implements sdk.ModuleInstance for the catalog.iceberg module
// type. It wraps an IcebergCatalogClient and registers it in the global
// catalog registry.
//
// Lifecycle: Init validates the catalog configuration, Start creates the HTTP
// client, pings the catalog and registers it, and Stop deregisters it.
type CatalogModule struct {
	// contains filtered or unexported fields
}

CatalogModule implements sdk.ModuleInstance for the catalog.iceberg module type. It wraps an IcebergCatalogClient and registers it in the global catalog registry.

func (*CatalogModule) Client

func (m *CatalogModule) Client() IcebergCatalogClient

Client returns the underlying IcebergCatalogClient.

func (*CatalogModule) Init

func (m *CatalogModule) Init() error

Init validates the catalog configuration.

func (*CatalogModule) Start

func (m *CatalogModule) Start(ctx context.Context) error

Start creates the HTTP client, pings the catalog to verify connectivity, and registers the catalog in the global registry.

func (*CatalogModule) Stop

func (m *CatalogModule) Stop(_ context.Context) error

Stop deregisters the catalog from the global registry.

type CreateTableRequest

// CreateTableRequest is the body for creating a new Iceberg table.
//
// The target namespace is not part of the body; it is passed separately to
// IcebergCatalogClient.CreateTable. Name and Schema are always encoded, while
// PartitionSpec, WriteOrder, Properties and Location are omitted from the
// JSON when nil or empty. WriteOrder is encoded under the key "write-order"
// and PartitionSpec under "partition-spec".
type CreateTableRequest struct {
	Name          string            `json:"name"`
	Schema        Schema            `json:"schema"`
	PartitionSpec *PartitionSpec    `json:"partition-spec,omitempty"`
	WriteOrder    *SortOrder        `json:"write-order,omitempty"`
	Properties    map[string]string `json:"properties,omitempty"`
	Location      string            `json:"location,omitempty"`
}

CreateTableRequest is the body for creating a new Iceberg table.

type IcebergCatalogClient

// IcebergCatalogClient is the interface for interacting with an Iceberg REST
// Catalog. Every method takes a context.Context as its first argument.
// Implementations are obtained from NewIcebergCatalogClient or, for catalogs
// registered by name, from LookupCatalog.
type IcebergCatalogClient interface {
	// GetConfig retrieves catalog-level configuration.
	GetConfig(ctx context.Context) (*CatalogConfig, error)

	// ListNamespaces lists namespaces, optionally filtered by parent.
	// NOTE(review): parent is a plain string rather than a Namespace;
	// presumably an empty string means "no filter" — confirm.
	ListNamespaces(ctx context.Context, parent string) ([]Namespace, error)

	// CreateNamespace creates a new namespace with optional properties.
	CreateNamespace(ctx context.Context, ns Namespace, properties map[string]string) error

	// LoadNamespace retrieves namespace info and properties.
	LoadNamespace(ctx context.Context, ns Namespace) (*NamespaceInfo, error)

	// DropNamespace removes a namespace.
	DropNamespace(ctx context.Context, ns Namespace) error

	// UpdateNamespaceProperties sets or removes namespace properties.
	// NOTE(review): removals shares the map[string]string type of updates;
	// presumably only its keys are significant — confirm with the
	// implementation before relying on the values.
	UpdateNamespaceProperties(ctx context.Context, ns Namespace, updates, removals map[string]string) error

	// ListTables lists all tables within a namespace.
	ListTables(ctx context.Context, ns Namespace) ([]TableIdentifier, error)

	// CreateTable creates a new table in the given namespace and returns
	// the metadata of the created table.
	CreateTable(ctx context.Context, ns Namespace, req CreateTableRequest) (*TableMetadata, error)

	// LoadTable retrieves table metadata by identifier.
	LoadTable(ctx context.Context, id TableIdentifier) (*TableMetadata, error)

	// UpdateTable applies schema or metadata updates to a table.
	// requirements carries the preconditions asserted before the updates
	// are applied (see TableRequirement).
	UpdateTable(ctx context.Context, id TableIdentifier, updates []TableUpdate, requirements []TableRequirement) (*TableMetadata, error)

	// DropTable removes a table; if purge is true, data files are also deleted.
	DropTable(ctx context.Context, id TableIdentifier, purge bool) error

	// TableExists returns true if the table exists.
	TableExists(ctx context.Context, id TableIdentifier) (bool, error)

	// ListSnapshots returns all snapshots for a table.
	ListSnapshots(ctx context.Context, id TableIdentifier) ([]Snapshot, error)
}

IcebergCatalogClient is the interface for interacting with an Iceberg REST Catalog.

func LookupCatalog

func LookupCatalog(name string) (IcebergCatalogClient, error)

LookupCatalog retrieves a catalog client by name.

func NewIcebergCatalogClient

func NewIcebergCatalogClient(cfg IcebergClientConfig) (IcebergCatalogClient, error)

NewIcebergCatalogClient creates a new IcebergCatalogClient from config.

type IcebergClientConfig

// IcebergClientConfig holds configuration for the HTTP catalog client
// created by NewIcebergCatalogClient.
type IcebergClientConfig struct {
	// Endpoint is the base URL of the REST catalog (e.g. "https://catalog.example.com/v1").
	Endpoint string `json:"endpoint" yaml:"endpoint"`
	// Token is a static Bearer token for authentication. Note that its
	// JSON/YAML key is "credential", not "token".
	Token string `json:"credential" yaml:"credential"`
	// HTTPTimeout is the HTTP client timeout (default 30s).
	// NOTE(review): time.Duration is an integer type, so encoding/json
	// decodes it from a nanosecond count, not from a string such as "30s";
	// confirm how "httpTimeout" values are parsed from module configuration.
	HTTPTimeout time.Duration `json:"httpTimeout" yaml:"httpTimeout"`
}

IcebergClientConfig holds configuration for the HTTP catalog client.

type Namespace

// Namespace is a multi-level Iceberg namespace (e.g. ["analytics", "raw"]).
// Each element of the slice is one level of the namespace.
type Namespace []string

Namespace is a multi-level Iceberg namespace (e.g. ["analytics", "raw"]).

type NamespaceInfo

// NamespaceInfo holds namespace metadata returned from the catalog; it is
// the result type of IcebergCatalogClient.LoadNamespace.
//
// Namespace identifies the namespace and Properties carries its key/value
// properties. Both keys are always present in the encoded JSON.
type NamespaceInfo struct {
	Namespace  Namespace         `json:"namespace"`
	Properties map[string]string `json:"properties"`
}

NamespaceInfo holds namespace metadata returned from the catalog.

type PartitionField

// PartitionField is a single partition field within a PartitionSpec.
//
// SourceID and FieldID are encoded under the hyphenated keys "source-id" and
// "field-id".
//
// NOTE(review): SourceID presumably refers to the SchemaField.ID of the
// column being partitioned, and Transform to an Iceberg transform name —
// confirm against callers.
type PartitionField struct {
	SourceID  int    `json:"source-id"`
	FieldID   int    `json:"field-id"`
	Name      string `json:"name"`
	Transform string `json:"transform"`
}

PartitionField is a single partition field.

type PartitionSpec

// PartitionSpec describes how data is partitioned.
//
// SpecID is encoded as "spec-id" and Fields lists the partition fields that
// make up the spec. It is used as the optional PartitionSpec of a
// CreateTableRequest.
type PartitionSpec struct {
	SpecID int              `json:"spec-id"`
	Fields []PartitionField `json:"fields"`
}

PartitionSpec describes how data is partitioned.

type Schema

// Schema is an Iceberg table schema.
//
// SchemaID is encoded as "schema-id" and Fields holds the schema's fields.
//
// NOTE(review): Type is presumably the fixed value "struct" used by Iceberg
// for top-level schemas — confirm what callers set here.
type Schema struct {
	SchemaID int           `json:"schema-id"`
	Type     string        `json:"type"`
	Fields   []SchemaField `json:"fields"`
}

Schema is an Iceberg table schema.

type SchemaField

// SchemaField is a single field in an Iceberg table schema.
//
// ID, Name, Type and Required are always encoded; Doc is omitted from the
// JSON when empty. Type is a plain string, so only type names that can be
// written as a string are representable here.
type SchemaField struct {
	ID       int    `json:"id"`
	Name     string `json:"name"`
	Type     string `json:"type"`
	Required bool   `json:"required"`
	Doc      string `json:"doc,omitempty"`
}

SchemaField is a single field in an Iceberg table schema.

type Snapshot

// Snapshot represents an Iceberg table snapshot.
//
// Timestamp is encoded under the key "timestamp-ms", i.e. it is a
// millisecond value. ManifestList is encoded as "manifest-list".
//
// NOTE(review): Timestamp is presumably milliseconds since the Unix epoch
// and ManifestList presumably a file location — confirm.
type Snapshot struct {
	SnapshotID   int64             `json:"snapshot-id"`
	Timestamp    int64             `json:"timestamp-ms"`
	Summary      map[string]string `json:"summary"`
	ManifestList string            `json:"manifest-list"`
}

Snapshot represents an Iceberg table snapshot.

type SortField

// SortField is a single sort field within a SortOrder.
//
// SourceID is encoded as "source-id" and NullOrder as "null-order".
//
// NOTE(review): SourceID presumably refers to a SchemaField.ID; Direction
// and NullOrder are free-form strings here, presumably values such as
// "asc"/"desc" and "nulls-first"/"nulls-last" — confirm.
type SortField struct {
	SourceID  int    `json:"source-id"`
	Transform string `json:"transform"`
	Direction string `json:"direction"`
	NullOrder string `json:"null-order"`
}

SortField is a single sort field.

type SortOrder

// SortOrder describes the sort order for data files.
//
// OrderID is encoded as "order-id" and Fields lists the sort fields. It is
// used as the optional WriteOrder of a CreateTableRequest.
type SortOrder struct {
	OrderID int         `json:"order-id"`
	Fields  []SortField `json:"fields"`
}

SortOrder describes the sort order for data files.

type TableConfig

// TableConfig holds configuration for the lakehouse.table module.
//
// Namespace and Table identify the managed table and Schema declares its
// fields. All four values use the same key in JSON and YAML.
//
// NOTE(review): Catalog is presumably the name under which the catalog was
// registered (see RegisterCatalog / LookupCatalog) — confirm.
type TableConfig struct {
	Catalog   string            `json:"catalog"    yaml:"catalog"`
	Namespace Namespace         `json:"namespace"  yaml:"namespace"`
	Table     string            `json:"table"      yaml:"table"`
	Schema    TableSchemaConfig `json:"schema"     yaml:"schema"`
}

TableConfig holds configuration for the lakehouse.table module.

type TableFieldConfig

// TableFieldConfig holds a single field declaration in a TableSchemaConfig.
//
// Unlike SchemaField it has no ID field, and Doc has no omitempty, so an
// empty doc string is still encoded.
//
// NOTE(review): field IDs are presumably assigned when the declaration is
// converted into a Schema — confirm in TableModule.
type TableFieldConfig struct {
	Name     string `json:"name"     yaml:"name"`
	Type     string `json:"type"     yaml:"type"`
	Required bool   `json:"required" yaml:"required"`
	Doc      string `json:"doc"      yaml:"doc"`
}

TableFieldConfig holds a single field declaration.

type TableIdentifier

// TableIdentifier uniquely identifies an Iceberg table by its namespace and
// name.
//
// The fields carry no struct tags, so encoding/json would use the Go field
// names "Namespace" and "Name" as keys.
type TableIdentifier struct {
	Namespace Namespace
	Name      string
}

TableIdentifier uniquely identifies an Iceberg table.

type TableMetadata

// TableMetadata holds the full metadata for an Iceberg table. It is returned
// by the CreateTable, LoadTable and UpdateTable methods of
// IcebergCatalogClient.
//
// CurrentSnapshotID is a pointer and is omitted from the JSON when nil; all
// other fields are always encoded.
//
// NOTE(review): a nil CurrentSnapshotID presumably means the table has no
// current snapshot, and CurrentSchemaID presumably selects an entry of
// Schemas by its SchemaID — confirm.
type TableMetadata struct {
	FormatVersion     int               `json:"format-version"`
	TableUUID         string            `json:"table-uuid"`
	Location          string            `json:"location"`
	CurrentSchemaID   int               `json:"current-schema-id"`
	Schemas           []Schema          `json:"schemas"`
	CurrentSnapshotID *int64            `json:"current-snapshot-id,omitempty"`
	Snapshots         []Snapshot        `json:"snapshots"`
	Properties        map[string]string `json:"properties"`
}

TableMetadata holds the full metadata for an Iceberg table.

type TableModule

// TableModule implements sdk.ModuleInstance for the lakehouse.table module
// type. It ensures the managed table exists in the catalog on Start.
//
// Lifecycle: Init validates the table configuration and defers the catalog
// lookup to Start; Start creates the table if necessary; Stop is a no-op.
type TableModule struct {
	// contains filtered or unexported fields
}

TableModule implements sdk.ModuleInstance for the lakehouse.table module type. It ensures the managed table exists in the catalog on Start.

func (*TableModule) Init

func (m *TableModule) Init() error

Init validates the table configuration. Catalog lookup is deferred to Start.

func (*TableModule) Start

func (m *TableModule) Start(ctx context.Context) error

Start ensures the managed table exists, creating it if necessary.

func (*TableModule) Stop

func (m *TableModule) Stop(_ context.Context) error

Stop is a no-op; the catalog handles its own lifecycle.

type TableRequirement

// TableRequirement asserts preconditions before applying updates; it is
// passed to IcebergCatalogClient.UpdateTable alongside the updates.
//
// Type names the requirement and Fields carries its arguments; Fields is
// omitted from the JSON when empty.
//
// NOTE(review): as declared, the arguments are nested under a "fields" key
// rather than flattened next to "type" — confirm this matches the wire
// format the catalog expects.
type TableRequirement struct {
	Type   string         `json:"type"`
	Fields map[string]any `json:"fields,omitempty"`
}

TableRequirement asserts preconditions before applying updates.

type TableSchemaConfig

// TableSchemaConfig holds the declared schema for a managed table. It is the
// Schema value of a TableConfig and consists solely of the list of field
// declarations.
type TableSchemaConfig struct {
	Fields []TableFieldConfig `json:"fields" yaml:"fields"`
}

TableSchemaConfig holds the declared schema for a managed table.

type TableUpdate

// TableUpdate represents a single schema or metadata update passed to
// IcebergCatalogClient.UpdateTable.
//
// Action names the update and Fields carries its arguments; Fields is
// omitted from the JSON when empty.
//
// NOTE(review): as declared, the arguments are nested under a "fields" key
// rather than flattened next to "action" — confirm this matches the wire
// format the catalog expects.
type TableUpdate struct {
	Action string         `json:"action"`
	Fields map[string]any `json:"fields,omitempty"`
}

TableUpdate represents a single schema or metadata update.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL