Documentation ¶
Overview ¶
Package warehouse provides a set of interfaces and implementations for working with data warehouses.
Index ¶
- func CreateTableQuery(qm QueryMapper, table string, schema *arrow.Schema) (string, error)
- func FindMissingColumns(tableName string, existingFields map[string]*arrow.Field, ...) ([]*arrow.Field, error)
- type AddColumnCall
- type ArrowType
- type CreateTableCall
- type Driver
- type ErrColumnAlreadyExists
- type ErrInvalidTableName
- type ErrMultipleTypeIncompatible
- type ErrTableAlreadyExists
- type ErrTableNotFound
- type ErrTypeIncompatible
- type ErrUnsupportedMapping
- type ErrUnsupportedWarehouseType
- type FieldCompatibilityChecker
- type FieldTypeMapper
- type MissingColumnResp
- type MockDriver
- func (d *MockDriver) AddColumn(_ string, _ *arrow.Field) error
- func (d *MockDriver) CreateTable(_ string, _ *arrow.Schema) error
- func (d *MockDriver) MissingColumns(_ string, _ *arrow.Schema) ([]*arrow.Field, error)
- func (d *MockDriver) Write(_ context.Context, table string, _ *arrow.Schema, rows []map[string]any) error
- type MockWarehouseDriver
- func (m *MockWarehouseDriver) AddColumn(table string, field *arrow.Field) error
- func (m *MockWarehouseDriver) CreateTable(table string, schema *arrow.Schema) error
- func (m *MockWarehouseDriver) GetWriteCallCount() int
- func (m *MockWarehouseDriver) GetWriteCalls() []WriteCall
- func (m *MockWarehouseDriver) MissingColumns(_ string, _ *arrow.Schema) ([]*arrow.Field, error)
- func (m *MockWarehouseDriver) Write(_ context.Context, table string, schema *arrow.Schema, ...) error
- type MockWrittenRows
- type QueryMapper
- type Registry
- type SpecificWarehouseType
- type TypeComparer
- type TypeComparisonResult
- type TypeCompatibilityRule
- type TypeMapperImpl
- type TypeNotFoundInRegistryErr
- type WriteCall
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateTableQuery ¶
CreateTableQuery creates a DDL query to create a table from an Arrow schema, using a query mapper.
func FindMissingColumns ¶
func FindMissingColumns(
	tableName string,
	existingFields map[string]*arrow.Field,
	inputSchema *arrow.Schema,
	compatibilityChecker FieldCompatibilityChecker,
) ([]*arrow.Field, error)
FindMissingColumns compares existing table fields with input schema and returns missing fields or type incompatibility errors. This is the common logic extracted from both BigQuery and ClickHouse drivers.
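The comparison described above can be illustrated with a minimal, standard-library-only sketch. Everything in it is a stand-in: `Field`, `Checker`, `TypeMismatch`, `MismatchError`, `findMissing` and `exactMatch` are hypothetical names that only mirror the shape of the documented API, and collecting all mismatches into a single error is an assumption about the behaviour, not a description of the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// Field is a simplified, hypothetical stand-in for *arrow.Field.
type Field struct {
	Name string
	Type string
}

// Checker mirrors the shape of FieldCompatibilityChecker using the stand-in Field.
type Checker interface {
	AreFieldsCompatible(existing, input Field) (bool, error)
}

// TypeMismatch is a hypothetical stand-in for ErrTypeIncompatible.
type TypeMismatch struct {
	Column, Existing, Expected string
}

// MismatchError is a hypothetical stand-in for ErrMultipleTypeIncompatible.
type MismatchError struct {
	Table  string
	Errors []TypeMismatch
}

func (e *MismatchError) Error() string {
	parts := make([]string, 0, len(e.Errors))
	for _, m := range e.Errors {
		parts = append(parts, fmt.Sprintf("%s: have %s, want %s", m.Column, m.Existing, m.Expected))
	}
	return fmt.Sprintf("table %s: incompatible columns: %s", e.Table, strings.Join(parts, "; "))
}

// findMissing returns the input fields that are absent from existing.
// If any shared column is incompatible, it returns a MismatchError instead
// (assumed behaviour for this sketch).
func findMissing(table string, existing map[string]Field, input []Field, c Checker) ([]Field, error) {
	var missing []Field
	var mismatches []TypeMismatch
	for _, in := range input {
		ex, ok := existing[in.Name]
		if !ok {
			missing = append(missing, in)
			continue
		}
		compatible, err := c.AreFieldsCompatible(ex, in)
		if err != nil {
			return nil, err
		}
		if !compatible {
			mismatches = append(mismatches, TypeMismatch{Column: in.Name, Existing: ex.Type, Expected: in.Type})
		}
	}
	if len(mismatches) > 0 {
		return nil, &MismatchError{Table: table, Errors: mismatches}
	}
	return missing, nil
}

// exactMatch treats fields as compatible only when their types are identical.
type exactMatch struct{}

func (exactMatch) AreFieldsCompatible(existing, input Field) (bool, error) {
	return existing.Type == input.Type, nil
}

func main() {
	existing := map[string]Field{"id": {Name: "id", Type: "int64"}}
	input := []Field{{Name: "id", Type: "int64"}, {Name: "email", Type: "utf8"}}
	missing, err := findMissing("users", existing, input, exactMatch{})
	fmt.Println(missing, err)
}
```

A real driver would presumably pass its own checker so that warehouse-specific widenings (for example, an existing 64-bit column accepting 32-bit input) do not count as mismatches.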
Types ¶
type AddColumnCall ¶
type AddColumnCall struct {
	// contains filtered or unexported fields
}
AddColumnCall is a call to the AddColumn method of the MockWarehouseDriver
type CreateTableCall ¶
type CreateTableCall struct {
	// contains filtered or unexported fields
}
CreateTableCall is a call to the CreateTable method of the MockWarehouseDriver
type Driver ¶
type Driver interface {
	// CreateTable creates a new table with the specified Arrow schema.
	// Returns error if table exists or schema conversion fails.
	// Implementation must convert Arrow types to warehouse-native types.
	CreateTable(table string, schema *arrow.Schema) error

	// AddColumn adds a new column to an existing table.
	AddColumn(table string, field *arrow.Field) error

	// Write inserts batch data into the specified table.
	// Schema must match table structure. Rows contain column_name -> value mappings.
	// Implementation handles type conversion and batch optimization.
	// Returns error on type mismatch, constraint violation, or connection issues.
	Write(ctx context.Context, table string, schema *arrow.Schema, rows []map[string]any) error

	// MissingColumns compares provided schema against existing table structure.
	// Returns fields that exist in schema but not in table.
	// Used for schema drift detection before writes.
	// Returns ErrTableNotFound if table doesn't exist.
	MissingColumns(table string, schema *arrow.Schema) ([]*arrow.Field, error)
}
Driver abstracts data warehouse operations for table management and data ingestion. Implementations handle warehouse-specific DDL/DML operations while maintaining compatibility with Apache Arrow schemas.
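One way a caller might combine these methods for schema drift handling is sketched below, using only the standard library. The types are hypothetical stand-ins: `Field` replaces `*arrow.Field`, `schemaDriver` is a reduced version of the interface, `memDriver` is an in-memory fake, and the local `ErrTableNotFound` copies only the documented struct shape (its message text is assumed). `EnsureSchema` is an illustrative helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Field is a simplified, hypothetical stand-in for *arrow.Field.
type Field struct {
	Name string
	Type string
}

// ErrTableNotFound mirrors the documented struct; the message text is assumed.
type ErrTableNotFound struct{ TableName string }

func (e *ErrTableNotFound) Error() string { return "table not found: " + e.TableName }

// schemaDriver is the subset of Driver this sketch needs, on stand-in types.
type schemaDriver interface {
	CreateTable(table string, schema []Field) error
	AddColumn(table string, field Field) error
	MissingColumns(table string, schema []Field) ([]Field, error)
}

// EnsureSchema creates the table if it does not exist, otherwise adds
// whatever columns the schema has that the table lacks.
func EnsureSchema(d schemaDriver, table string, schema []Field) error {
	missing, err := d.MissingColumns(table, schema)
	var notFound *ErrTableNotFound
	if errors.As(err, &notFound) {
		return d.CreateTable(table, schema)
	}
	if err != nil {
		return err
	}
	for _, f := range missing {
		if err := d.AddColumn(table, f); err != nil {
			return err
		}
	}
	return nil
}

// memDriver is an in-memory fake used only to exercise EnsureSchema.
type memDriver struct{ tables map[string][]Field }

func (m *memDriver) CreateTable(table string, schema []Field) error {
	m.tables[table] = append([]Field(nil), schema...)
	return nil
}

func (m *memDriver) AddColumn(table string, f Field) error {
	m.tables[table] = append(m.tables[table], f)
	return nil
}

func (m *memDriver) MissingColumns(table string, schema []Field) ([]Field, error) {
	cols, ok := m.tables[table]
	if !ok {
		return nil, &ErrTableNotFound{TableName: table}
	}
	have := make(map[string]bool, len(cols))
	for _, c := range cols {
		have[c.Name] = true
	}
	var missing []Field
	for _, f := range schema {
		if !have[f.Name] {
			missing = append(missing, f)
		}
	}
	return missing, nil
}

func main() {
	d := &memDriver{tables: map[string][]Field{}}
	v1 := []Field{{Name: "id", Type: "int64"}}
	v2 := []Field{{Name: "id", Type: "int64"}, {Name: "name", Type: "utf8"}}
	fmt.Println(EnsureSchema(d, "events", v1), EnsureSchema(d, "events", v2))
	fmt.Println(d.tables["events"])
}
```

Matching with `errors.As` rather than a type assertion keeps the check working if a driver wraps the error before returning it.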
func NewBatchingDriver ¶
func NewBatchingDriver(
	ctx context.Context,
	driver Driver,
	maxBatchSize int,
	interval time.Duration,
) Driver
NewBatchingDriver creates a batching driver that accumulates writes in memory and flushes them periodically or when the context is cancelled.
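The accumulate-and-flush pattern described here can be sketched with the standard library alone. This is not the package's implementation: `Row`, `Batcher`, `NewBatcher`, `Wait` and `demoBatchSizes` are hypothetical names, and flushing as soon as the buffer reaches the maximum size is an assumption about what `maxBatchSize` does.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Row is a stand-in for one column_name -> value mapping.
type Row map[string]any

// Batcher buffers rows and hands them to sink in batches.
type Batcher struct {
	mu      sync.Mutex
	pending []Row
	max     int
	sink    func([]Row)
	done    chan struct{}
}

// NewBatcher starts a goroutine that flushes every interval and once more
// when ctx is cancelled.
func NewBatcher(ctx context.Context, max int, interval time.Duration, sink func([]Row)) *Batcher {
	b := &Batcher{max: max, sink: sink, done: make(chan struct{})}
	go func() {
		defer close(b.done)
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				b.Flush()
			case <-ctx.Done():
				b.Flush() // final flush so buffered rows are not lost
				return
			}
		}
	}()
	return b
}

// Write buffers rows and flushes immediately once the buffer is full
// (assumed meaning of the maximum batch size).
func (b *Batcher) Write(rows ...Row) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, rows...)
	if len(b.pending) >= b.max {
		b.flushLocked()
	}
}

// Flush sends any buffered rows to the sink.
func (b *Batcher) Flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
}

// flushLocked must be called with b.mu held; the sink runs under the lock,
// so sink calls never overlap.
func (b *Batcher) flushLocked() {
	if len(b.pending) == 0 {
		return
	}
	b.sink(b.pending)
	b.pending = nil
}

// Wait blocks until the background goroutine has done its final flush.
func (b *Batcher) Wait() { <-b.done }

// demoBatchSizes writes three rows with a maximum batch size of two, then
// cancels the context, and reports the size of each flushed batch.
func demoBatchSizes() []int {
	var sizes []int
	ctx, cancel := context.WithCancel(context.Background())
	b := NewBatcher(ctx, 2, time.Hour, func(rows []Row) { sizes = append(sizes, len(rows)) })
	b.Write(Row{"id": 1})
	b.Write(Row{"id": 2})
	b.Write(Row{"id": 3})
	cancel()
	b.Wait()
	return sizes
}

func main() {
	fmt.Println(demoBatchSizes())
}
```

Running the sink while holding the mutex keeps the sketch simple and serializes flushes, at the cost of blocking writers during a slow flush; a production driver might swap the buffer out under the lock and write outside it.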
func NewConsoleDriver ¶
func NewConsoleDriver() Driver
NewConsoleDriver creates a new console driver that prints data to stdout.
func NewDebuggingDriver ¶ added in v0.10.1
NewDebuggingDriver creates a new logging driver that logs all writes to stdout, formatted as JSON.
func NewLoggingDriver ¶
NewLoggingDriver creates a new driver that logs all writes.
func NewNoopDriver ¶ added in v0.10.1
func NewNoopDriver() Driver
NewNoopDriver creates a new noop driver that does nothing.
type ErrColumnAlreadyExists ¶
ErrColumnAlreadyExists represents an error when a column already exists
func NewColumnAlreadyExistsError ¶
func NewColumnAlreadyExistsError(tableName, columnName string) *ErrColumnAlreadyExists
NewColumnAlreadyExistsError creates a new ErrColumnAlreadyExists
func (*ErrColumnAlreadyExists) Error ¶
func (e *ErrColumnAlreadyExists) Error() string
Error implements the error interface
type ErrInvalidTableName ¶
ErrInvalidTableName represents an error when a table name is invalid
func NewInvalidTableNameError ¶
func NewInvalidTableNameError(tableName, reason string) *ErrInvalidTableName
NewInvalidTableNameError creates a new ErrInvalidTableName
func (*ErrInvalidTableName) Error ¶
func (e *ErrInvalidTableName) Error() string
Error implements the error interface
type ErrMultipleTypeIncompatible ¶
type ErrMultipleTypeIncompatible struct {
	TableName string
	Errors    []*ErrTypeIncompatible
}
ErrMultipleTypeIncompatible represents multiple type incompatibility errors
func NewMultipleTypeIncompatibleError ¶
func NewMultipleTypeIncompatibleError(tableName string, errors []*ErrTypeIncompatible) *ErrMultipleTypeIncompatible
NewMultipleTypeIncompatibleError creates a new ErrMultipleTypeIncompatible
func (*ErrMultipleTypeIncompatible) Error ¶
func (e *ErrMultipleTypeIncompatible) Error() string
Error implements the error interface
type ErrTableAlreadyExists ¶
type ErrTableAlreadyExists struct {
	TableName string
}
ErrTableAlreadyExists represents an error when a table already exists
func NewTableAlreadyExistsError ¶
func NewTableAlreadyExistsError(tableName string) *ErrTableAlreadyExists
NewTableAlreadyExistsError creates a new ErrTableAlreadyExists error.
func (*ErrTableAlreadyExists) Error ¶
func (e *ErrTableAlreadyExists) Error() string
Error implements the error interface
type ErrTableNotFound ¶
type ErrTableNotFound struct {
	TableName string
}
ErrTableNotFound represents an error when a table is not found
func NewTableNotFoundError ¶
func NewTableNotFoundError(tableName string) *ErrTableNotFound
NewTableNotFoundError creates a new ErrTableNotFound
func (*ErrTableNotFound) Error ¶
func (e *ErrTableNotFound) Error() string
Error implements the error interface
type ErrTypeIncompatible ¶
type ErrTypeIncompatible struct {
	TableName     string
	ColumnName    string
	ExistingType  arrow.DataType
	ExpectedType  arrow.DataType
	DetailedError string
}
ErrTypeIncompatible represents an error when a column type is incompatible with the expected type
func NewTypeIncompatibleError ¶
func NewTypeIncompatibleError(
	tableName, columnName string,
	existingType, expectedType arrow.DataType,
) *ErrTypeIncompatible
NewTypeIncompatibleError creates a new ErrTypeIncompatible
func NewTypeIncompatibleErrorWithDetail ¶
func NewTypeIncompatibleErrorWithDetail(
	tableName, columnName string,
	existingType, expectedType arrow.DataType,
	detailedError string,
) *ErrTypeIncompatible
NewTypeIncompatibleErrorWithDetail creates a new ErrTypeIncompatible with detailed error message
func (*ErrTypeIncompatible) Error ¶
func (e *ErrTypeIncompatible) Error() string
Error implements the error interface
type ErrUnsupportedMapping ¶
ErrUnsupportedMapping represents an error when an Arrow type is not supported by a mapper
func NewUnsupportedMappingErr ¶
func NewUnsupportedMappingErr(theType any, mapper string) *ErrUnsupportedMapping
NewUnsupportedMappingErr creates a new ErrUnsupportedMapping
func (*ErrUnsupportedMapping) Error ¶
func (e *ErrUnsupportedMapping) Error() string
Error implements the error interface
type ErrUnsupportedWarehouseType ¶
ErrUnsupportedWarehouseType represents an error when a warehouse type is not supported
func NewUnsupportedWarehouseTypeError ¶
func NewUnsupportedWarehouseTypeError(warehouseType, operation string) *ErrUnsupportedWarehouseType
NewUnsupportedWarehouseTypeError creates a new ErrUnsupportedWarehouseType
func (*ErrUnsupportedWarehouseType) Error ¶
func (e *ErrUnsupportedWarehouseType) Error() string
Error implements the error interface
type FieldCompatibilityChecker ¶
type FieldCompatibilityChecker interface {
	AreFieldsCompatible(existing, input *arrow.Field) (bool, error)
}
FieldCompatibilityChecker defines the interface for checking field compatibility
type FieldTypeMapper ¶
type FieldTypeMapper[WHT SpecificWarehouseType] interface {
	ArrowToWarehouse(arrowType ArrowType) (WHT, error)
	WarehouseToArrow(warehouseType WHT) (ArrowType, error)
}
FieldTypeMapper provides bidirectional conversion between Arrow types and warehouse-specific types. It is a generic interface parameterized by the warehouse type WHT, which must implement SpecificWarehouseType. Defined in types.go.
func NewDeferredMapper ¶
func NewDeferredMapper[T SpecificWarehouseType](getMapper func() FieldTypeMapper[T]) FieldTypeMapper[T]
NewDeferredMapper creates a deferred mapper for handling circular dependencies
func NewTypeMapper ¶
func NewTypeMapper[T SpecificWarehouseType](
	types []FieldTypeMapper[T],
) FieldTypeMapper[T]
NewTypeMapper creates a new type mapper with the provided type mappers
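To see why a deferred mapper helps with circular dependencies, consider a list mapper that needs the complete mapper to convert its element type, while the complete mapper in turn contains the list mapper. The sketch below is a simplified, non-generic, one-directional illustration using only the standard library. All names (`mapper`, `chain`, `deferred`, `primitive`, `list`, `newMapper`) and the string-based type notation are hypothetical, and trying the sub-mappers in order is an assumption about how a composite mapper behaves.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// mapper is a one-directional, non-generic stand-in for FieldTypeMapper.
type mapper interface {
	ToWarehouse(arrowType string) (string, error)
}

var errUnsupported = errors.New("unsupported type")

// chain tries each sub-mapper in order and returns the first success
// (assumed behaviour for a mapper built from several mappers).
type chain struct{ types []mapper }

func (c *chain) ToWarehouse(t string) (string, error) {
	for _, m := range c.types {
		if out, err := m.ToWarehouse(t); err == nil {
			return out, nil
		}
	}
	return "", fmt.Errorf("%q: %w", t, errUnsupported)
}

// deferred resolves the real mapper only when a conversion is requested,
// so it can be created before the mapper it points at exists.
type deferred struct{ get func() mapper }

func (d deferred) ToWarehouse(t string) (string, error) {
	return d.get().ToWarehouse(t)
}

// primitive maps scalar type names through a lookup table.
type primitive map[string]string

func (p primitive) ToWarehouse(t string) (string, error) {
	if out, ok := p[t]; ok {
		return out, nil
	}
	return "", errUnsupported
}

// list maps "list<T>" by delegating T to elem, which may be the root mapper.
type list struct{ elem mapper }

func (l list) ToWarehouse(t string) (string, error) {
	if !strings.HasPrefix(t, "list<") || !strings.HasSuffix(t, ">") {
		return "", errUnsupported
	}
	inner, err := l.elem.ToWarehouse(t[len("list<") : len(t)-1])
	if err != nil {
		return "", err
	}
	return "Array(" + inner + ")", nil
}

// newMapper wires the cycle: the list mapper refers back to root through a
// deferred lookup, because root is not yet assigned when list is constructed.
func newMapper() mapper {
	var root *chain
	root = &chain{types: []mapper{
		primitive{"int64": "Int64", "utf8": "String"},
		list{elem: deferred{get: func() mapper { return root }}},
	}}
	return root
}

func main() {
	out, err := newMapper().ToWarehouse("list<list<int64>>")
	fmt.Println(out, err)
}
```

The closure passed to `deferred` captures the `root` variable rather than its value, which is what allows nested lists of any depth to be resolved after construction completes.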
type MissingColumnResp ¶
type MissingColumnResp struct {
	// contains filtered or unexported fields
}
MissingColumnResp is a response to the MissingColumns method of the MockWarehouseDriver
type MockDriver ¶
type MockDriver struct {
	Writes     []MockWrittenRows
	WriteError error
}
MockDriver is a mock driver that stores written rows in memory.
func NewMockDriver ¶
func NewMockDriver() *MockDriver
NewMockDriver creates a new mock driver that stores written rows in memory.
func (*MockDriver) AddColumn ¶
func (d *MockDriver) AddColumn(_ string, _ *arrow.Field) error
AddColumn implements Driver
func (*MockDriver) CreateTable ¶
func (d *MockDriver) CreateTable(_ string, _ *arrow.Schema) error
CreateTable implements Driver
func (*MockDriver) MissingColumns ¶
func (d *MockDriver) MissingColumns(_ string, _ *arrow.Schema) ([]*arrow.Field, error)
MissingColumns implements Driver
type MockWarehouseDriver ¶
type MockWarehouseDriver struct {
	WriteCallCount    int
	WriteErrors       []error
	WriteCalls        []WriteCall
	CreateTableCalls  []CreateTableCall
	AddColumnCalls    []AddColumnCall
	MissingColumnResp []MissingColumnResp
	// contains filtered or unexported fields
}
MockWarehouseDriver implements Driver interface for testing
func (*MockWarehouseDriver) AddColumn ¶
func (m *MockWarehouseDriver) AddColumn(table string, field *arrow.Field) error
AddColumn implements warehouse.Driver
func (*MockWarehouseDriver) CreateTable ¶
func (m *MockWarehouseDriver) CreateTable(table string, schema *arrow.Schema) error
CreateTable implements warehouse.Driver
func (*MockWarehouseDriver) GetWriteCallCount ¶
func (m *MockWarehouseDriver) GetWriteCallCount() int
GetWriteCallCount returns the number of write calls
func (*MockWarehouseDriver) GetWriteCalls ¶
func (m *MockWarehouseDriver) GetWriteCalls() []WriteCall
GetWriteCalls returns the write calls
func (*MockWarehouseDriver) MissingColumns ¶
func (m *MockWarehouseDriver) MissingColumns(_ string, _ *arrow.Schema) ([]*arrow.Field, error)
MissingColumns implements warehouse.Driver
type MockWrittenRows ¶
MockWrittenRows is a collection of rows written to a single table in mock driver.
type QueryMapper ¶
type QueryMapper interface {
	// TablePredicate returns warehouse-specific table creation prefix.
	// Examples: "TABLE", "TABLE IF NOT EXISTS", "TEMPORARY TABLE"
	TablePredicate(table string) string

	// Field converts Arrow field to warehouse column type definition.
	// Must handle type mapping, nullability, and metadata.
	// Returns string like "VARCHAR(255)", "Int64", "TIMESTAMP", etc.
	Field(*arrow.Field) (string, error)

	// TableSuffix returns warehouse-specific table creation suffix.
	// Examples: "ENGINE = MergeTree()", "OPTIONS (description='...')"
	// May include newlines for multi-line clauses.
	TableSuffix(table string) string
}
QueryMapper defines SQL DDL query construction from Arrow schemas. Used by Driver implementations that operate on sql.DB to generate warehouse-specific CREATE TABLE statements.
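As a rough illustration of how the three parts could combine into a statement, the sketch below assembles a CREATE TABLE query from a prefix, per-column type definitions and a suffix. It uses only the standard library. `Field` stands in for `*arrow.Field`, `queryMapper` mirrors the interface on that stand-in, and `createTableQuery` and `clickhouseLike` are hypothetical. The exact layout the real CreateTableQuery emits (spacing, quoting, where the column name is added) is not documented here, so the format below is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// Field is a simplified, hypothetical stand-in for *arrow.Field.
type Field struct {
	Name     string
	Type     string
	Nullable bool
}

// queryMapper mirrors QueryMapper on the stand-in Field type.
type queryMapper interface {
	TablePredicate(table string) string
	Field(f Field) (string, error)
	TableSuffix(table string) string
}

// createTableQuery joins prefix, column definitions and suffix into one
// statement. The layout is an assumption made for this sketch.
func createTableQuery(qm queryMapper, table string, schema []Field) (string, error) {
	cols := make([]string, 0, len(schema))
	for _, f := range schema {
		def, err := qm.Field(f)
		if err != nil {
			return "", fmt.Errorf("field %q: %w", f.Name, err)
		}
		cols = append(cols, f.Name+" "+def)
	}
	q := fmt.Sprintf("CREATE %s %s (%s)", qm.TablePredicate(table), table, strings.Join(cols, ", "))
	if suffix := qm.TableSuffix(table); suffix != "" {
		q += " " + suffix
	}
	return q, nil
}

// clickhouseLike is an illustrative mapper producing ClickHouse-style DDL.
type clickhouseLike struct{}

func (clickhouseLike) TablePredicate(string) string { return "TABLE IF NOT EXISTS" }

func (clickhouseLike) Field(f Field) (string, error) {
	types := map[string]string{"int64": "Int64", "utf8": "String"}
	t, ok := types[f.Type]
	if !ok {
		return "", fmt.Errorf("unsupported type %q", f.Type)
	}
	if f.Nullable {
		t = "Nullable(" + t + ")"
	}
	return t, nil
}

func (clickhouseLike) TableSuffix(string) string {
	return "ENGINE = MergeTree() ORDER BY tuple()"
}

func main() {
	schema := []Field{
		{Name: "id", Type: "int64"},
		{Name: "name", Type: "utf8", Nullable: true},
	}
	q, err := createTableQuery(clickhouseLike{}, "events", schema)
	fmt.Println(q, err)
}
```

Keeping the warehouse-specific pieces behind three small methods means the assembly logic can stay identical across SQL dialects; only the mapper changes.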
type Registry ¶
Registry is a registry of drivers for different properties.
func NewStaticBatchedDriverRegistry ¶ added in v0.13.0
NewStaticBatchedDriverRegistry creates a new static driver registry that always returns the same driver.
func NewStaticDriverRegistry ¶
NewStaticDriverRegistry creates a new static driver registry that always returns the same driver.
type SpecificWarehouseType ¶
SpecificWarehouseType defines the interface for warehouse data types
type TypeComparer ¶
type TypeComparer struct {
	// contains filtered or unexported fields
}
TypeComparer holds comparison configuration and custom rules
func NewTypeComparer ¶
func NewTypeComparer(rules ...TypeCompatibilityRule) *TypeComparer
NewTypeComparer creates a new comparer with the specified compatibility rules
func (*TypeComparer) Compare ¶
func (tc *TypeComparer) Compare(expected, actual arrow.DataType, typePath string) TypeComparisonResult
Compare performs the comparison with custom rules applied
type TypeComparisonResult ¶
TypeComparisonResult represents the result of comparing two Arrow data types
func CompareArrowTypes ¶
func CompareArrowTypes(expected, actual arrow.DataType, typePath string) TypeComparisonResult
CompareArrowTypes performs deep comparison of two Arrow data types and returns detailed comparison results. This maintains backward compatibility by using the default TypeComparer.
type TypeCompatibilityRule ¶
TypeCompatibilityRule defines a function that can determine if two types should be considered compatible. Returns:
- compatible: whether the types should be considered compatible (only meaningful if handled=true)
- handled: whether this rule applies to and processed the given type pair. If false, other rules or default comparison logic will be used. If true, the compatibility decision is final.
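The two return values describe a small chain-of-responsibility protocol, which the following standard-library sketch tries to make concrete. The real function signature of TypeCompatibilityRule is not shown in this documentation, so `rule`, `comparer`, `newComparer`, `widenInts` and `rejectBinary` are hypothetical, types are represented as plain strings, and falling back to strict equality is an assumed default.

```go
package main

import "fmt"

// rule is a hypothetical stand-in for TypeCompatibilityRule, with Arrow
// types represented as plain strings.
type rule func(expected, actual string) (compatible, handled bool)

// comparer applies rules in order; the first rule that reports handled=true
// decides the outcome.
type comparer struct{ rules []rule }

func newComparer(rules ...rule) *comparer { return &comparer{rules: rules} }

func (c *comparer) compatible(expected, actual string) bool {
	for _, r := range c.rules {
		if ok, handled := r(expected, actual); handled {
			return ok // the decision of a handling rule is final
		}
	}
	// No rule applied: fall back to a default comparison (strict equality
	// in this sketch).
	return expected == actual
}

// widenInts accepts an int32 value where int64 is expected and ignores
// every other pair.
func widenInts(expected, actual string) (bool, bool) {
	if expected == "int64" && actual == "int32" {
		return true, true
	}
	return false, false
}

// rejectBinary vetoes any pair involving binary, even identical ones.
func rejectBinary(expected, actual string) (bool, bool) {
	if expected == "binary" || actual == "binary" {
		return false, true
	}
	return false, false
}

func main() {
	c := newComparer(rejectBinary, widenInts)
	fmt.Println(c.compatible("int64", "int32"))
	fmt.Println(c.compatible("utf8", "utf8"))
	fmt.Println(c.compatible("binary", "binary"))
}
```

Because a handling rule ends the search, rule order matters: a veto such as `rejectBinary` needs to come before any broader rule that might accept the same pair.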
type TypeMapperImpl ¶
type TypeMapperImpl[T SpecificWarehouseType] struct {
	Types []FieldTypeMapper[T]
}
TypeMapperImpl provides type mapping functionality with multiple type mappers
func (*TypeMapperImpl[T]) ArrowToWarehouse ¶
func (m *TypeMapperImpl[T]) ArrowToWarehouse(arrowType ArrowType) (T, error)
ArrowToWarehouse implements FieldTypeMapper
func (*TypeMapperImpl[T]) WarehouseToArrow ¶
func (m *TypeMapperImpl[T]) WarehouseToArrow(warehouseType T) (ArrowType, error)
WarehouseToArrow implements FieldTypeMapper
type TypeNotFoundInRegistryErr ¶
TypeNotFoundInRegistryErr represents an error when a field type is not found in the registry
func (*TypeNotFoundInRegistryErr) Error ¶
func (e *TypeNotFoundInRegistryErr) Error() string
Directories ¶
| Path | Synopsis |
|---|---|
| | Package bigquery provides implementation of BigQuery data warehouse |
| | Package clickhouse provides implementation of ClickHouse data warehouse |
| | Package testutils provides test utilities for the warehouse package. |