package warehouse
v0.18.3 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 8, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package warehouse provides a set of interfaces and implementations for working with data warehouses.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateTableQuery

func CreateTableQuery(qm QueryMapper, table string, schema *arrow.Schema) (string, error)

CreateTableQuery builds a CREATE TABLE DDL statement for the given table from an Arrow schema, using the QueryMapper to render the warehouse-specific parts.
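A rough, stdlib-only sketch of how the three QueryMapper hooks could be combined into a single statement follows. It does not import this package or Arrow: `column`, `queryMapper`, `toyMapper` and `createTableQuery` are hypothetical stand-ins, and the exact layout of the real output (spacing, line breaks, identifier quoting) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// column is a hypothetical, simplified stand-in for *arrow.Field.
type column struct {
	Name string
	Type string // hypothetical: the Arrow type reduced to a name
}

// queryMapper mirrors the shape of warehouse.QueryMapper, with column in place of *arrow.Field.
type queryMapper interface {
	TablePredicate(table string) string
	Field(c column) (string, error)
	TableSuffix(table string) string
}

// toyMapper is a hypothetical ClickHouse-flavoured mapper.
type toyMapper struct{}

func (toyMapper) TablePredicate(table string) string { return "TABLE IF NOT EXISTS" }

func (toyMapper) TableSuffix(table string) string { return "ENGINE = MergeTree() ORDER BY tuple()" }

// Field maps the few type names this toy mapper knows about.
func (toyMapper) Field(c column) (string, error) {
	switch c.Type {
	case "int64":
		return "Int64", nil
	case "utf8":
		return "String", nil
	default:
		return "", fmt.Errorf("unsupported type %q", c.Type)
	}
}

// createTableQuery assembles "CREATE <predicate> <table> (<columns>) <suffix>".
func createTableQuery(qm queryMapper, table string, cols []column) (string, error) {
	defs := make([]string, 0, len(cols))
	for _, c := range cols {
		typ, err := qm.Field(c)
		if err != nil {
			return "", fmt.Errorf("column %s: %w", c.Name, err)
		}
		defs = append(defs, c.Name+" "+typ)
	}
	q := fmt.Sprintf("CREATE %s %s (\n  %s\n)", qm.TablePredicate(table), table, strings.Join(defs, ",\n  "))
	if suffix := qm.TableSuffix(table); suffix != "" {
		q += " " + suffix
	}
	return q, nil
}
```

A mapper error aborts the whole statement here, so a single unsupported column never produces a partial DDL.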

func FindMissingColumns

func FindMissingColumns(
	tableName string,
	existingFields map[string]*arrow.Field,
	inputSchema *arrow.Schema,
	compatibilityChecker FieldCompatibilityChecker,
) ([]*arrow.Field, error)

FindMissingColumns compares the existing table fields with the input schema and returns the fields that are missing from the table, or an error if existing column types are incompatible with the input schema. It contains the logic shared by the BigQuery and ClickHouse drivers.

Types

type AddColumnCall

type AddColumnCall struct {
	// contains filtered or unexported fields
}

AddColumnCall is a call to the AddColumn method of the MockWarehouseDriver

type ArrowType

type ArrowType struct {
	ArrowDataType arrow.DataType
	Nullable      bool
	Metadata      arrow.Metadata
}

ArrowType represents an Arrow data type with metadata and nullability information

func (ArrowType) Copy

func (a ArrowType) Copy() ArrowType

Copy creates a copy of the ArrowType

type CreateTableCall

type CreateTableCall struct {
	// contains filtered or unexported fields
}

CreateTableCall is a call to the CreateTable method of the MockWarehouseDriver

type Driver

type Driver interface {
	// CreateTable creates a new table with the specified Arrow schema.
	// Returns error if table exists or schema conversion fails.
	// Implementation must convert Arrow types to warehouse-native types.
	CreateTable(table string, schema *arrow.Schema) error

	// AddColumn adds a new column to an existing table.
	AddColumn(table string, field *arrow.Field) error

	// Write inserts batch data into the specified table.
	// Schema must match table structure. Rows contain column_name -> value mappings.
	// Implementation handles type conversion and batch optimization.
	// Returns error on type mismatch, constraint violation, or connection issues.
	Write(ctx context.Context, table string, schema *arrow.Schema, rows []map[string]any) error

	// MissingColumns compares provided schema against existing table structure.
	// Returns fields that exist in schema but not in table.
	// Used for schema drift detection before writes.
	// Returns TableNotFoundError if table doesn't exist.
	MissingColumns(table string, schema *arrow.Schema) ([]*arrow.Field, error)
}

Driver abstracts data warehouse operations for table management and data ingestion. Implementations handle warehouse-specific DDL/DML operations while maintaining compatibility with Apache Arrow schemas.

func NewBatchingDriver

func NewBatchingDriver(
	ctx context.Context,
	driver Driver,
	maxBatchSize int,
	interval time.Duration,
) Driver

NewBatchingDriver creates a batching driver that accumulates writes in memory and flushes them periodically or when the context is cancelled.

func NewConsoleDriver

func NewConsoleDriver() Driver

NewConsoleDriver creates a new console driver that prints data to stdout.

func NewDebuggingDriver added in v0.10.1

func NewDebuggingDriver(driver Driver) Driver

NewDebuggingDriver creates a driver that wraps driver and logs every write to stdout, formatted as JSON.

func NewLoggingDriver

func NewLoggingDriver(driver Driver) Driver

NewLoggingDriver creates a new driver that logs all writes.

func NewNoopDriver added in v0.10.1

func NewNoopDriver() Driver

NewNoopDriver creates a no-op driver whose methods do nothing.

type ErrColumnAlreadyExists

type ErrColumnAlreadyExists struct {
	TableName  string
	ColumnName string
}

ErrColumnAlreadyExists represents an error when a column already exists

func NewColumnAlreadyExistsError

func NewColumnAlreadyExistsError(tableName, columnName string) *ErrColumnAlreadyExists

NewColumnAlreadyExistsError creates a new ErrColumnAlreadyExists

func (*ErrColumnAlreadyExists) Error

func (e *ErrColumnAlreadyExists) Error() string

Error implements the error interface

type ErrInvalidTableName

type ErrInvalidTableName struct {
	TableName string
	Reason    string
}

ErrInvalidTableName represents an error when a table name is invalid

func NewInvalidTableNameError

func NewInvalidTableNameError(tableName, reason string) *ErrInvalidTableName

NewInvalidTableNameError creates a new ErrInvalidTableName

func (*ErrInvalidTableName) Error

func (e *ErrInvalidTableName) Error() string

Error implements the error interface

type ErrMultipleTypeIncompatible

type ErrMultipleTypeIncompatible struct {
	TableName string
	Errors    []*ErrTypeIncompatible
}

ErrMultipleTypeIncompatible represents multiple type incompatibility errors

func NewMultipleTypeIncompatibleError

func NewMultipleTypeIncompatibleError(tableName string, errors []*ErrTypeIncompatible) *ErrMultipleTypeIncompatible

NewMultipleTypeIncompatibleError creates a new ErrMultipleTypeIncompatible

func (*ErrMultipleTypeIncompatible) Error

func (e *ErrMultipleTypeIncompatible) Error() string

Error implements the error interface

type ErrTableAlreadyExists

type ErrTableAlreadyExists struct {
	TableName string
}

ErrTableAlreadyExists represents an error when a table already exists

func NewTableAlreadyExistsError

func NewTableAlreadyExistsError(tableName string) *ErrTableAlreadyExists

NewTableAlreadyExistsError creates a new ErrTableAlreadyExists error.

func (*ErrTableAlreadyExists) Error

func (e *ErrTableAlreadyExists) Error() string

Error implements the error interface

type ErrTableNotFound

type ErrTableNotFound struct {
	TableName string
}

ErrTableNotFound represents an error when a table is not found

func NewTableNotFoundError

func NewTableNotFoundError(tableName string) *ErrTableNotFound

NewTableNotFoundError creates a new ErrTableNotFound

func (*ErrTableNotFound) Error

func (e *ErrTableNotFound) Error() string

Error implements the error interface

type ErrTypeIncompatible

type ErrTypeIncompatible struct {
	TableName     string
	ColumnName    string
	ExistingType  arrow.DataType
	ExpectedType  arrow.DataType
	DetailedError string
}

ErrTypeIncompatible represents an error when a column type is incompatible with the expected type

func NewTypeIncompatibleError

func NewTypeIncompatibleError(
	tableName, columnName string,
	existingType, expectedType arrow.DataType,
) *ErrTypeIncompatible

NewTypeIncompatibleError creates a new ErrTypeIncompatible

func NewTypeIncompatibleErrorWithDetail

func NewTypeIncompatibleErrorWithDetail(
	tableName, columnName string,
	existingType, expectedType arrow.DataType,
	detailedError string,
) *ErrTypeIncompatible

NewTypeIncompatibleErrorWithDetail creates a new ErrTypeIncompatible with detailed error message

func (*ErrTypeIncompatible) Error

func (e *ErrTypeIncompatible) Error() string

Error implements the error interface

type ErrUnsupportedMapping

type ErrUnsupportedMapping struct {
	Type   any
	Mapper string
}

ErrUnsupportedMapping represents an error when an Arrow type is not supported by a mapper

func NewUnsupportedMappingErr

func NewUnsupportedMappingErr(theType any, mapper string) *ErrUnsupportedMapping

NewUnsupportedMappingErr creates a new ErrUnsupportedMapping

func (*ErrUnsupportedMapping) Error

func (e *ErrUnsupportedMapping) Error() string

Error implements the error interface

type ErrUnsupportedWarehouseType

type ErrUnsupportedWarehouseType struct {
	WarehouseType string
	Operation     string
}

ErrUnsupportedWarehouseType represents an error when a warehouse type is not supported

func NewUnsupportedWarehouseTypeError

func NewUnsupportedWarehouseTypeError(warehouseType, operation string) *ErrUnsupportedWarehouseType

NewUnsupportedWarehouseTypeError creates a new ErrUnsupportedWarehouseType

func (*ErrUnsupportedWarehouseType) Error

func (e *ErrUnsupportedWarehouseType) Error() string

Error implements the error interface

type FieldCompatibilityChecker

type FieldCompatibilityChecker interface {
	AreFieldsCompatible(existing, input *arrow.Field) (bool, error)
}

FieldCompatibilityChecker defines the interface for checking field compatibility

type FieldTypeMapper

type FieldTypeMapper[WHT SpecificWarehouseType] interface {
	ArrowToWarehouse(arrowType ArrowType) (WHT, error)
	WarehouseToArrow(warehouseType WHT) (ArrowType, error)
}

FieldTypeMapper provides bidirectional conversion between Arrow types and warehouse-specific types. It is a generic interface parameterized by the warehouse type WHT, which must implement SpecificWarehouseType. It is defined in types.go.

func NewDeferredMapper

func NewDeferredMapper[T SpecificWarehouseType](getMapper func() FieldTypeMapper[T]) FieldTypeMapper[T]

NewDeferredMapper creates a deferred mapper that delegates to the FieldTypeMapper returned by getMapper, for handling circular dependencies between mappers.
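One common source of circularity is a mapper for nested types (such as lists) that must call back into the top-level mapper, which in turn contains the nested mapper. The sketch assumes the deferred mapper calls getMapper lazily, at mapping time. It uses a simplified one-way, string-based mapper; `mapper`, `deferred`, `listMapper`, `rootMapper`, `newRoot` and the ClickHouse-style type names are illustrative only.

```go
package main

import (
	"fmt"
	"strings"
)

// mapper is a one-way simplification of FieldTypeMapper: type name -> warehouse type name.
type mapper interface {
	ToWarehouse(arrowType string) (string, error)
}

// deferred resolves its target mapper only when it is used.
type deferred struct{ get func() mapper }

func (d deferred) ToWarehouse(t string) (string, error) { return d.get().ToWarehouse(t) }

// listMapper handles "list<T>" by delegating T to the element mapper.
type listMapper struct{ elem mapper }

func (l listMapper) ToWarehouse(t string) (string, error) {
	inner, ok := strings.CutPrefix(t, "list<")
	if !ok || !strings.HasSuffix(inner, ">") {
		return "", fmt.Errorf("unsupported type %q", t)
	}
	e, err := l.elem.ToWarehouse(strings.TrimSuffix(inner, ">"))
	if err != nil {
		return "", err
	}
	return "Array(" + e + ")", nil
}

// rootMapper handles scalars and hands everything else to the list mapper.
type rootMapper struct{ list mapper }

func (r rootMapper) ToWarehouse(t string) (string, error) {
	switch t {
	case "int64":
		return "Int64", nil
	case "utf8":
		return "String", nil
	}
	return r.list.ToWarehouse(t)
}

// newRoot wires the cycle root -> list -> (deferred) root.
// The closure reads root only when called, after it has been assigned.
func newRoot() mapper {
	var root mapper
	root = rootMapper{list: listMapper{elem: deferred{get: func() mapper { return root }}}}
	return root
}
```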

func NewTypeMapper

func NewTypeMapper[T SpecificWarehouseType](
	types []FieldTypeMapper[T],
) FieldTypeMapper[T]

NewTypeMapper creates a FieldTypeMapper that combines the provided type mappers.

type MissingColumnResp

type MissingColumnResp struct {
	// contains filtered or unexported fields
}

MissingColumnResp is a response returned by the MissingColumns method of MockWarehouseDriver

type MockDriver

type MockDriver struct {
	Writes     []MockWrittenRows
	WriteError error
}

MockDriver is a mock driver that stores written rows in memory.

func NewMockDriver

func NewMockDriver() *MockDriver

NewMockDriver creates a new mock driver that stores written rows in memory.

func (*MockDriver) AddColumn

func (d *MockDriver) AddColumn(_ string, _ *arrow.Field) error

AddColumn implements Driver

func (*MockDriver) CreateTable

func (d *MockDriver) CreateTable(_ string, _ *arrow.Schema) error

CreateTable implements Driver

func (*MockDriver) MissingColumns

func (d *MockDriver) MissingColumns(_ string, _ *arrow.Schema) ([]*arrow.Field, error)

MissingColumns implements Driver

func (*MockDriver) Write

func (d *MockDriver) Write(_ context.Context, table string, _ *arrow.Schema, rows []map[string]any) error

Write implements Driver

type MockWarehouseDriver

type MockWarehouseDriver struct {
	WriteCallCount    int
	WriteErrors       []error
	WriteCalls        []WriteCall
	CreateTableCalls  []CreateTableCall
	AddColumnCalls    []AddColumnCall
	MissingColumnResp []MissingColumnResp
	// contains filtered or unexported fields
}

MockWarehouseDriver implements the Driver interface for testing

func (*MockWarehouseDriver) AddColumn

func (m *MockWarehouseDriver) AddColumn(table string, field *arrow.Field) error

AddColumn implements warehouse.Driver

func (*MockWarehouseDriver) CreateTable

func (m *MockWarehouseDriver) CreateTable(table string, schema *arrow.Schema) error

CreateTable implements warehouse.Driver

func (*MockWarehouseDriver) GetWriteCallCount

func (m *MockWarehouseDriver) GetWriteCallCount() int

GetWriteCallCount returns the number of write calls

func (*MockWarehouseDriver) GetWriteCalls

func (m *MockWarehouseDriver) GetWriteCalls() []WriteCall

GetWriteCalls returns the write calls

func (*MockWarehouseDriver) MissingColumns

func (m *MockWarehouseDriver) MissingColumns(
	_ string,
	_ *arrow.Schema,
) ([]*arrow.Field, error)

MissingColumns implements warehouse.Driver

func (*MockWarehouseDriver) Write

func (m *MockWarehouseDriver) Write(
	_ context.Context,
	table string,
	schema *arrow.Schema,
	records []map[string]any,
) error

Write implements warehouse.Driver

type MockWrittenRows

type MockWrittenRows struct {
	Table string
	Rows  []map[string]any
}

MockWrittenRows is a collection of rows written to a single table by MockDriver.

type QueryMapper

type QueryMapper interface {
	// TablePredicate returns warehouse-specific table creation prefix.
	// Examples: "TABLE", "TABLE IF NOT EXISTS", "TEMPORARY TABLE"
	TablePredicate(table string) string

	// Field converts Arrow field to warehouse column type definition.
	// Must handle type mapping, nullability, and metadata.
	// Returns string like "VARCHAR(255)", "Int64", "TIMESTAMP", etc.
	Field(*arrow.Field) (string, error)

	// TableSuffix returns warehouse-specific table creation suffix.
	// Examples: "ENGINE = MergeTree()", "OPTIONS (description='...')"
	// May include newlines for multi-line clauses.
	TableSuffix(table string) string
}

QueryMapper defines how SQL DDL queries are constructed from Arrow schemas. It is used by Driver implementations that operate on sql.DB to generate warehouse-specific CREATE TABLE statements.

type Registry

type Registry interface {
	Get(propertyID string) (Driver, error)
}

Registry returns the Driver to use for a given property, identified by its property ID.

func NewStaticBatchedDriverRegistry added in v0.13.0

func NewStaticBatchedDriverRegistry(ctx context.Context, driver Driver) Registry

NewStaticBatchedDriverRegistry creates a new static driver registry that always returns the same driver.

func NewStaticDriverRegistry

func NewStaticDriverRegistry(driver Driver) Registry

NewStaticDriverRegistry creates a new static driver registry that always returns the same driver.

type SpecificWarehouseType

type SpecificWarehouseType interface {
	Format(i any, m arrow.Metadata) (any, error)
}

SpecificWarehouseType defines the interface for warehouse data types

type TypeComparer

type TypeComparer struct {
	// contains filtered or unexported fields
}

TypeComparer holds comparison configuration and custom rules

func NewTypeComparer

func NewTypeComparer(rules ...TypeCompatibilityRule) *TypeComparer

NewTypeComparer creates a new comparer with the specified compatibility rules

func (*TypeComparer) Compare

func (tc *TypeComparer) Compare(expected, actual arrow.DataType, typePath string) TypeComparisonResult

Compare compares the expected and actual Arrow data types, applying the comparer's custom compatibility rules.

type TypeComparisonResult

type TypeComparisonResult struct {
	Equal        bool
	ErrorMessage string
	TypePath     string
}

TypeComparisonResult represents the result of comparing two Arrow data types

func CompareArrowTypes

func CompareArrowTypes(expected, actual arrow.DataType, typePath string) TypeComparisonResult

CompareArrowTypes performs deep comparison of two Arrow data types and returns detailed comparison results. This maintains backward compatibility by using the default TypeComparer.

type TypeCompatibilityRule

type TypeCompatibilityRule func(expected, actual arrow.DataType) (compatible bool, handled bool)

TypeCompatibilityRule defines a function that can determine if two types should be considered compatible. Returns:

  • compatible: whether the types should be considered compatible (only meaningful if handled=true)
  • handled: whether this rule applies to and processed the given type pair. If false, other rules or default comparison logic will be used. If true, the compatibility decision is final.
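The (compatible, handled) contract can be illustrated with a small rule chain. This sketch assumes rules are tried in the order given and that exact equality is the fallback; the real TypeComparer performs a deep Arrow comparison and returns a TypeComparisonResult rather than a bool. `dataType`, `rule`, `comparer`, `isString` and `sameStringFamily` are hypothetical, and type names are plain strings.

```go
package main

// dataType is a hypothetical stand-in for arrow.DataType: just a type name.
type dataType string

// rule mirrors TypeCompatibilityRule's (compatible, handled) contract.
type rule func(expected, actual dataType) (compatible bool, handled bool)

// comparer applies rules in order; the first rule that handles a pair decides.
type comparer struct{ rules []rule }

func (c comparer) compatible(expected, actual dataType) bool {
	for _, r := range c.rules {
		if ok, handled := r(expected, actual); handled {
			return ok // the decision is final once a rule handles the pair
		}
	}
	return expected == actual // default comparison: exact match
}

func isString(t dataType) bool { return t == "utf8" || t == "large_utf8" }

// sameStringFamily treats the two Arrow string types as interchangeable.
func sameStringFamily(expected, actual dataType) (bool, bool) {
	if isString(expected) && isString(actual) {
		return true, true
	}
	return false, false // not handled: fall through to later rules or the default
}
```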

type TypeMapperImpl

type TypeMapperImpl[T SpecificWarehouseType] struct {
	Types []FieldTypeMapper[T]
}

TypeMapperImpl provides type mapping functionality with multiple type mappers

func (*TypeMapperImpl[T]) ArrowToWarehouse

func (m *TypeMapperImpl[T]) ArrowToWarehouse(arrowType ArrowType) (T, error)

ArrowToWarehouse implements FieldTypeMapper

func (*TypeMapperImpl[T]) WarehouseToArrow

func (m *TypeMapperImpl[T]) WarehouseToArrow(warehouseType T) (ArrowType, error)

WarehouseToArrow implements FieldTypeMapper

type TypeNotFoundInRegistryErr

type TypeNotFoundInRegistryErr struct {
	TypeName string
	TypeKind string
}

TypeNotFoundInRegistryErr represents an error when a field type is not found in the registry

func (*TypeNotFoundInRegistryErr) Error

func (e *TypeNotFoundInRegistryErr) Error() string

type WriteCall

type WriteCall struct {
	Table   string
	Schema  *arrow.Schema
	Records []map[string]any
}

WriteCall is a call to the Write method of the MockWarehouseDriver

Directories

Package bigquery provides an implementation of the BigQuery data warehouse.
Package clickhouse provides an implementation of the ClickHouse data warehouse.
Package testutils provides test utilities for the warehouse package.
