Documentation
Index ¶
- func BackupDucklakeMetadata() error
- func CompactDataFiles(ctx context.Context, db *DuckDb, updateFunc func(CompactionStatus), ...) error
- func DeletePartition(ctx context.Context, partition *config.Partition, from, to time.Time, ...) (rowCount int, err error)
- func DucklakeCleanup(ctx context.Context, db *DuckDb) error
- func EnsureDuckLakeTable(columns []*schema.ColumnSchema, db *DuckDb, tableName string) error
- func GetLegacyTableViewSchema(ctx context.Context, viewName string, db *DuckDb) (*schema.TableSchema, error)
- func GetLegacyTableViews(ctx context.Context, db *DuckDb) ([]string, error)
- func GetPartitionsForArg(partitionMap map[string]*config.Partition, arg string) ([]string, error)
- func GetTableSchema(ctx context.Context, tableName string, db *DuckDb) (map[string]string, error)
- func GetTables(ctx context.Context, db *DuckDb) ([]string, error)
- func PartitionMatchesPatterns(table, partition string, patterns []*PartitionPattern) bool
- func TableExists(ctx context.Context, tableName string, db *DuckDb) (bool, error)
- type ColumnSchemaChange
- type CompactionStatus
- func (s *CompactionStatus) DurationString() string
- func (s *CompactionStatus) FinalFilesString() any
- func (s *CompactionStatus) InitialFilesString() any
- func (s *CompactionStatus) ProgressPercentString() string
- func (s *CompactionStatus) RowsCompactedString() any
- func (s *CompactionStatus) String() string
- func (s *CompactionStatus) TotalRowsString() any
- func (s *CompactionStatus) UpdateProgress()
- func (s *CompactionStatus) VerboseString() string
- type ConversionError
- type Converter
- func (w *Converter) AddChunk(executionId string, chunk int32) error
- func (w *Converter) CheckTableSchema(db *sql.DB, tableName string, conversionSchema schema.ConversionSchema) (TableSchemaStatus, error)
- func (w *Converter) InferSchemaForJSONLFile(filePath string) (*schema.TableSchema, error)
- func (w *Converter) WaitForConversions(ctx context.Context) error
- type DuckDb
- func (d *DuckDb) Exec(query string, args ...any) (sql.Result, error)
- func (d *DuckDb) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
- func (d *DuckDb) GetTempDir() string
- func (d *DuckDb) Query(query string, args ...any) (*sql.Rows, error)
- func (d *DuckDb) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (d *DuckDb) QueryRow(query string, args ...any) *sql.Row
- func (d *DuckDb) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type DuckDbOpt
- type FileMetadata
- type PartitionPattern
- type RowValidationError
- type SchemaChangeError
- type SqlCommand
- type TableSchemaStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BackupDucklakeMetadata ¶ added in v0.7.0
func BackupDucklakeMetadata() error
BackupDucklakeMetadata creates a timestamped backup of the DuckLake metadata database. The backup file has the format metadata.sqlite.backup.YYYYMMDDHHMMSS, and if a WAL file exists it is also backed up as metadata.sqlite-wal.backup.YYYYMMDDHHMMSS. Any existing backup files are removed so that only the most recent backup is kept.
The backup is created in the same directory as the original database file. If the database file doesn't exist, no backup is created and no error is returned.
Returns an error if the backup operation fails.
func CompactDataFiles ¶ added in v0.7.0
func CompactDataFiles(ctx context.Context, db *DuckDb, updateFunc func(CompactionStatus), reindex bool, patterns ...*PartitionPattern) error
func DeletePartition ¶ added in v0.7.0
func DeletePartition(ctx context.Context, partition *config.Partition, from, to time.Time, db *DuckDb) (rowCount int, err error)
DeletePartition deletes data for the specified partition and date range from the given DuckLake-connected database.
func DucklakeCleanup ¶ added in v0.7.0
func DucklakeCleanup(ctx context.Context, db *DuckDb) error
DucklakeCleanup removes old snapshots and deletes expired and unused parquet files from the DuckDB database.
func EnsureDuckLakeTable ¶ added in v0.7.0
func EnsureDuckLakeTable(columns []*schema.ColumnSchema, db *DuckDb, tableName string) error
EnsureDuckLakeTable determines whether a DuckLake table exists for the given table and, if so, whether its schema needs updating.
func GetLegacyTableViewSchema ¶ added in v0.7.0
func GetLegacyTableViewSchema(ctx context.Context, viewName string, db *DuckDb) (*schema.TableSchema, error)
GetLegacyTableViewSchema retrieves the schema of a table view in the legacy database file (tailpipe.db).
func GetLegacyTableViews ¶ added in v0.7.0
func GetLegacyTableViews(ctx context.Context, db *DuckDb) ([]string, error)
GetLegacyTableViews retrieves the names of all table views in the legacy database file (tailpipe.db).
func GetPartitionsForArg ¶ added in v0.7.0
func GetPartitionsForArg(partitionMap map[string]*config.Partition, arg string) ([]string, error)
GetPartitionsForArg returns the actual partition names that match the given argument. The partitionMap is needed to determine whether a single-part argument refers to a table or a partition.
func GetTableSchema ¶ added in v0.7.0
func GetTableSchema(ctx context.Context, tableName string, db *DuckDb) (map[string]string, error)
GetTableSchema returns the schema of the specified table as a map of column names to their types.
func GetTables ¶ added in v0.7.0
func GetTables(ctx context.Context, db *DuckDb) ([]string, error)
GetTables returns the list of tables in the DuckLake metadata catalog.
func PartitionMatchesPatterns ¶ added in v0.7.0
func PartitionMatchesPatterns(table, partition string, patterns []*PartitionPattern) bool
PartitionMatchesPatterns checks if the given table and partition match any of the provided patterns.
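A sketch of "match any of the provided patterns", assuming each pattern holds a glob for the table name and a glob for the partition name and that globs follow path.Match syntax. Both the partitionPattern type and the glob syntax are assumptions for illustration; the real PartitionPattern fields are not shown here, and the example table and partition names are made up.

```go
package main

import (
	"fmt"
	"path"
)

// partitionPattern is a hypothetical stand-in for PartitionPattern.
type partitionPattern struct {
	Table     string // glob for the table name
	Partition string // glob for the partition name
}

// matchesPatterns reports whether table and partition match any pattern.
// Malformed globs are treated as non-matching.
func matchesPatterns(table, partition string, patterns []*partitionPattern) bool {
	for _, p := range patterns {
		tableOK, err := path.Match(p.Table, table)
		if err != nil || !tableOK {
			continue
		}
		partOK, err := path.Match(p.Partition, partition)
		if err == nil && partOK {
			return true
		}
	}
	return false
}

func main() {
	patterns := []*partitionPattern{{Table: "aws_*", Partition: "*"}}
	fmt.Println(matchesPatterns("aws_cloudtrail_log", "prod", patterns)) // true
	fmt.Println(matchesPatterns("gcp_audit_log", "prod", patterns))      // false
}
```

This sketch returns false for an empty pattern list; how the package itself treats an empty list is not documented here.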
Types ¶
type ColumnSchemaChange ¶ added in v0.7.0
type CompactionStatus ¶ added in v0.7.0
type CompactionStatus struct {
	Message                   string
	InitialFiles              int
	FinalFiles                int
	RowsCompacted             int64
	RowsToCompact             int64
	TotalRows                 int64
	ProgressPercent           float64
	MigrateSource             int               // number of source files migrated
	MigrateDest               int               // number of destination files after migration
	PartitionIndexExpressions map[string]string // the index expression used for migration for each partition
	Duration                  time.Duration     // duration of the compaction process
}
func NewCompactionStatus ¶ added in v0.7.0
func NewCompactionStatus() *CompactionStatus
func (*CompactionStatus) DurationString ¶ added in v0.7.0
func (s *CompactionStatus) DurationString() string
func (*CompactionStatus) FinalFilesString ¶ added in v0.7.0
func (s *CompactionStatus) FinalFilesString() any
func (*CompactionStatus) InitialFilesString ¶ added in v0.7.0
func (s *CompactionStatus) InitialFilesString() any
func (*CompactionStatus) ProgressPercentString ¶ added in v0.7.0
func (s *CompactionStatus) ProgressPercentString() string
func (*CompactionStatus) RowsCompactedString ¶ added in v0.7.0
func (s *CompactionStatus) RowsCompactedString() any
func (*CompactionStatus) String ¶ added in v0.7.0
func (s *CompactionStatus) String() string
func (*CompactionStatus) TotalRowsString ¶ added in v0.7.0
func (s *CompactionStatus) TotalRowsString() any
func (*CompactionStatus) UpdateProgress ¶ added in v0.7.0
func (s *CompactionStatus) UpdateProgress()
func (*CompactionStatus) VerboseString ¶ added in v0.7.0
func (s *CompactionStatus) VerboseString() string
type ConversionError ¶ added in v0.7.0
type ConversionError struct {
	SourceFiles  []string
	BaseError    error
	RowsAffected int64
	// contains filtered or unexported fields
}
func NewConversionError ¶ added in v0.7.0
func NewConversionError(err error, rowsAffected int64, paths ...string) *ConversionError
func (*ConversionError) Error ¶ added in v0.7.0
func (c *ConversionError) Error() string
func (*ConversionError) Merge ¶ added in v0.7.0
func (c *ConversionError) Merge(err error)
Merge adds a second error to the conversion error message.
func (*ConversionError) Unwrap ¶ added in v0.7.0
func (c *ConversionError) Unwrap() error
type Converter ¶ added in v0.7.0
type Converter struct {
	// the partition being collected
	Partition *config.Partition
	// contains filtered or unexported fields
}
Converter executes all the conversions for a single collection. It therefore has a unique execution ID and may convert multiple JSONL files. Each file is assumed to have the filename format <execution_id>_<chunkNumber>.jsonl, so when new input files are available, we simply store the chunk number.
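A sketch of recovering the execution ID and chunk number from the filename format above. The helper parseChunkFilename is hypothetical; splitting on the last underscore (so execution IDs may themselves contain underscores) is an assumption, since the ID format is not documented here. The chunk number is returned as int32 to match the chunk parameter of AddChunk.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
)

// parseChunkFilename splits "<execution_id>_<chunkNumber>.jsonl" into its parts,
// using the last underscore as the separator.
func parseChunkFilename(name string) (executionID string, chunk int32, err error) {
	base := filepath.Base(name)
	if !strings.HasSuffix(base, ".jsonl") {
		return "", 0, fmt.Errorf("%q: not a .jsonl file", name)
	}
	stem := strings.TrimSuffix(base, ".jsonl")
	idx := strings.LastIndex(stem, "_")
	if idx <= 0 || idx == len(stem)-1 {
		return "", 0, fmt.Errorf("%q: expected <execution_id>_<chunkNumber>.jsonl", name)
	}
	n, err := strconv.ParseInt(stem[idx+1:], 10, 32)
	if err != nil {
		return "", 0, fmt.Errorf("%q: invalid chunk number: %w", name, err)
	}
	return stem[:idx], int32(n), nil
}

func main() {
	id, chunk, err := parseChunkFilename("/tmp/exec_abc123_7.jsonl")
	fmt.Println(id, chunk, err) // exec_abc123 7 <nil>
}
```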
func NewParquetConverter ¶ added in v0.7.0
func (*Converter) AddChunk ¶ added in v0.7.0
func (w *Converter) AddChunk(executionId string, chunk int32) error
AddChunk adds a new chunk to the list of scheduledChunks to be processed. If this is the first chunk, it determines whether a full conversionSchema is available yet and, if not, infers one from the chunk. It then signals the scheduler that scheduledChunks are available.
func (*Converter) CheckTableSchema ¶ added in v0.7.0
func (w *Converter) CheckTableSchema(db *sql.DB, tableName string, conversionSchema schema.ConversionSchema) (TableSchemaStatus, error)
CheckTableSchema checks whether the specified table exists in the DuckDB database and compares its schema with the provided schema. It returns a TableSchemaStatus indicating whether the table exists, whether the schema matches, and any differences. This is not used at present but will be used when we implement DuckLake schema evolution handling.
func (*Converter) InferSchemaForJSONLFile ¶ added in v0.7.0
func (w *Converter) InferSchemaForJSONLFile(filePath string) (*schema.TableSchema, error)
type DuckDb ¶ added in v0.2.0
DuckDb provides a wrapper around the sql.DB connection to DuckDB with enhanced error handling for invalid parquet files. It automatically retries operations when encountering invalid parquet files, which can occur when files are being written concurrently. The wrapper also handles installation and loading of required DuckDB extensions, and manages the connection lifecycle.
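func (*DuckDb) ExecContext ¶ added in v0.2.0
func (d *DuckDb) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)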
func (*DuckDb) GetTempDir ¶ added in v0.4.0
func (d *DuckDb) GetTempDir() string
GetTempDir returns the temporary directory configured for DuckDB operations.
func (*DuckDb) QueryContext ¶ added in v0.2.0
func (d *DuckDb) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
type DuckDbOpt ¶ added in v0.2.0
type DuckDbOpt func(*DuckDb)
DuckDbOpt is a function type that modifies a DuckDb instance. It's used to configure DuckDb instances with different options like extensions, database file, and temp directory.
func WithDbFile ¶ added in v0.2.0
WithDbFile sets the database file path for the DuckDb instance. This can be used to specify a persistent database file or an in-memory database.
func WithDuckDbExtensions ¶ added in v0.2.0
WithDuckDbExtensions sets the list of DuckDB extensions to be installed and loaded. These extensions will be installed and loaded when the DuckDb instance is created.
func WithDuckLake ¶ added in v0.7.0
func WithDuckLake() DuckDbOpt
WithDuckLake enables the DuckLake extension for DuckDB.
func WithDuckLakeReadonly ¶ added in v0.7.0
WithDuckLakeReadonly enables the DuckLake extension in read-only mode. filters is an optional list of SQL filter expressions. If filters are specified, a view is created for each table in the database and the filters are applied to the view. If no filters are specified, the DuckLake attachment is set as the default catalog so the tables can be accessed directly.
func WithMaxMemoryMb ¶ added in v0.4.0
WithMaxMemoryMb sets the maximum memory limit for DuckDB. This can be used to control the memory usage of DuckDB operations.
func WithTempDir ¶ added in v0.2.0
WithTempDir sets the temporary directory for DuckDB operations. This directory is used for temporary files during database operations. If not specified, the collection temp directory will be used.
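DuckDbOpt follows Go's functional-options pattern: each With… function returns a closure that mutates the instance being built. The sketch below uses a hypothetical duckDb type, hypothetical lower-case option names and a hypothetical newDuckDb constructor; the real constructor and the option parameter types are not shown in this section, so the string, []string and int parameters are assumptions.

```go
package main

import "fmt"

// duckDb is a hypothetical stand-in holding only the settings the options describe.
type duckDb struct {
	dbFile      string
	extensions  []string
	tempDir     string
	maxMemoryMb int
}

// duckDbOpt mirrors the shape of DuckDbOpt: a function that modifies the instance.
type duckDbOpt func(*duckDb)

func withDbFile(path string) duckDbOpt       { return func(d *duckDb) { d.dbFile = path } }
func withExtensions(exts []string) duckDbOpt { return func(d *duckDb) { d.extensions = exts } }
func withTempDir(dir string) duckDbOpt       { return func(d *duckDb) { d.tempDir = dir } }
func withMaxMemoryMb(mb int) duckDbOpt       { return func(d *duckDb) { d.maxMemoryMb = mb } }

// newDuckDb applies each option in order, so later options override earlier ones.
func newDuckDb(opts ...duckDbOpt) *duckDb {
	d := &duckDb{}
	for _, opt := range opts {
		opt(d)
	}
	return d
}

func main() {
	d := newDuckDb(
		withDbFile("metadata.db"),
		withExtensions([]string{"json"}),
		withTempDir("/tmp/tailpipe"),
		withMaxMemoryMb(512),
	)
	fmt.Printf("%+v\n", *d)
}
```

The pattern keeps the constructor signature stable as new settings are added, since each setting is just another option value.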
type FileMetadata ¶ added in v0.7.0
FileMetadata represents the result of a file metadata query.
func GetPartitionFileMetadata ¶ added in v0.7.0
func GetPartitionFileMetadata(ctx context.Context, tableName, partitionName string, db *DuckDb) (*FileMetadata, error)
GetPartitionFileMetadata gets file metadata for a specific partition from the DuckLake metadata tables.
func GetTableFileMetadata ¶ added in v0.7.0
GetTableFileMetadata gets file metadata for a specific table from the DuckLake metadata tables.
type PartitionPattern ¶ added in v0.7.0
PartitionPattern represents a pattern used to match partitions. It consists of a table pattern and a partition pattern, both of which are used to match a given table and partition name.
func GetPartitionMatchPatternsForArg ¶ added in v0.7.0
func GetPartitionMatchPatternsForArg(partitions []string, arg string) (*PartitionPattern, error)
GetPartitionMatchPatternsForArg parses a single partition argument into a PartitionPattern. The partitions list is needed to determine whether single-part arguments refer to tables or partitions.
func GetPartitionPatternsForArgs ¶ added in v0.7.0
func GetPartitionPatternsForArgs(partitions []string, partitionArgs ...string) ([]*PartitionPattern, error)
GetPartitionPatternsForArgs returns the table and partition patterns for the given partition args. The partitions list is needed to determine whether single-part arguments refer to tables or partitions.
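A sketch of why the partitions list matters when parsing a single argument. It assumes full partition names have the form "table.partition", that a two-part argument maps directly to a table and partition, and that a single-part argument is treated as a table if any known partition belongs to a table of that name and otherwise as a partition name in any table. These rules, the pattern type and patternForArg are hypothetical, not the package's confirmed behaviour; the example names are made up.

```go
package main

import (
	"fmt"
	"strings"
)

// pattern is a hypothetical stand-in for PartitionPattern.
type pattern struct {
	Table     string
	Partition string
}

// patternForArg parses "table.partition" or a single-part argument.
func patternForArg(partitions []string, arg string) (*pattern, error) {
	if arg == "" {
		return nil, fmt.Errorf("empty partition argument")
	}
	parts := strings.Split(arg, ".")
	switch len(parts) {
	case 2:
		return &pattern{Table: parts[0], Partition: parts[1]}, nil
	case 1:
		// Single part: a table name if any known partition belongs to that table.
		for _, p := range partitions {
			if table, _, ok := strings.Cut(p, "."); ok && table == arg {
				return &pattern{Table: arg, Partition: "*"}, nil
			}
		}
		// Otherwise treat it as a partition name in any table.
		return &pattern{Table: "*", Partition: arg}, nil
	default:
		return nil, fmt.Errorf("invalid partition argument %q", arg)
	}
}

func main() {
	partitions := []string{"aws_cloudtrail_log.prod", "aws_cloudtrail_log.dev"}
	for _, arg := range []string{"aws_cloudtrail_log", "prod", "aws_cloudtrail_log.dev"} {
		p, err := patternForArg(partitions, arg)
		if err != nil {
			fmt.Println(arg, "->", err)
			continue
		}
		fmt.Println(arg, "->", *p)
	}
}
```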
func NewPartitionPattern ¶ added in v0.7.0
func NewPartitionPattern(partition *config.Partition) PartitionPattern
type RowValidationError ¶ added in v0.7.0
type RowValidationError struct {
	// contains filtered or unexported fields
}
func NewRowValidationError ¶ added in v0.7.0
func NewRowValidationError(failedRows int64, nullColumns []string) *RowValidationError
func (*RowValidationError) Error ¶ added in v0.7.0
func (e *RowValidationError) Error() string
func (*RowValidationError) Is ¶ added in v0.7.0
func (e *RowValidationError) Is(target error) bool
Is implements the Is(error) bool method that errors.Is consults, to support error comparison.
type SchemaChangeError ¶ added in v0.7.0
type SchemaChangeError struct {
	ChangedColumns []ColumnSchemaChange
}
func (*SchemaChangeError) Error ¶ added in v0.7.0
func (e *SchemaChangeError) Error() string
type SqlCommand ¶ added in v0.7.0
SqlCommand represents a SQL command with its description.
func GetCreateViewsSql ¶ added in v0.7.0
GetCreateViewsSql returns the SQL commands to create views for all tables in the DuckLake catalog, applying the specified filters.
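A sketch of generating one filtered view per table. The catalog name, table name and filter below are made up, and combining multiple filters with AND, quoting identifiers with double quotes, and the exact CREATE VIEW text are assumptions; the statements the package actually returns are not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent double-quotes a SQL identifier, escaping embedded double quotes.
func quoteIdent(s string) string {
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}

// createViewSQL builds one CREATE VIEW per table, selecting from the given catalog
// and AND-ing the filters together (each filter is parenthesised).
func createViewSQL(catalog string, tables, filters []string) []string {
	var where string
	if len(filters) > 0 {
		where = " where (" + strings.Join(filters, ") and (") + ")"
	}
	stmts := make([]string, 0, len(tables))
	for _, t := range tables {
		stmts = append(stmts, fmt.Sprintf("create or replace view %s as select * from %s.%s%s;",
			quoteIdent(t), quoteIdent(catalog), quoteIdent(t), where))
	}
	return stmts
}

func main() {
	for _, s := range createViewSQL("lake", []string{"aws_cloudtrail_log"}, []string{"event_date > '2024-01-01'"}) {
		fmt.Println(s)
	}
}
```

Parenthesising each filter keeps an OR inside one filter from changing how it combines with the others.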
func GetDucklakeInitCommands ¶ added in v0.7.0
func GetDucklakeInitCommands(readonly bool) []SqlCommand
GetDucklakeInitCommands returns the set of SQL commands required to initialize and connect to DuckLake. It is used both when Tailpipe connects to DuckLake and by tailpipe connect to build the init script. It returns an ordered slice of SQL commands.
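A sketch of turning an ordered list of described commands into a runnable init script, with each description emitted as a SQL comment. sqlCommand and its Description and Command fields are a hypothetical mirror of SqlCommand, and the INSTALL/LOAD statements are illustrative only, not the commands GetDucklakeInitCommands actually returns.

```go
package main

import (
	"fmt"
	"strings"
)

// sqlCommand is a hypothetical mirror of SqlCommand: a description plus the SQL text.
type sqlCommand struct {
	Description string
	Command     string
}

// buildInitScript renders commands in order, each preceded by its description
// as a "--" SQL comment, so the result can be run as a single script.
func buildInitScript(cmds []sqlCommand) string {
	var b strings.Builder
	for _, c := range cmds {
		fmt.Fprintf(&b, "-- %s\n%s\n", c.Description, strings.TrimSpace(c.Command))
	}
	return b.String()
}

func main() {
	script := buildInitScript([]sqlCommand{
		{Description: "install the ducklake extension", Command: "install ducklake;"},
		{Description: "load the ducklake extension", Command: "load ducklake;"},
	})
	fmt.Print(script)
}
```

Keeping the commands as an ordered slice means the same list can be executed one by one on a live connection or rendered into a script, as the description above suggests.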
type TableSchemaStatus ¶ added in v0.7.0
type TableSchemaStatus struct {
	TableExists   bool
	SchemaMatches bool
	CanMigrate    bool
	SchemaDiff    string
}
TableSchemaStatus represents the status of a table schema comparison. It indicates whether the table exists, whether the schema matches, and whether the table can be migrated by DuckLake. This is not used at present but will be used when we implement DuckLake schema evolution handling.
func NewTableSchemaStatusFromComparison ¶ added in v0.7.0
func NewTableSchemaStatusFromComparison(existingSchema map[string]schema.ColumnSchema, conversionSchema schema.ConversionSchema) TableSchemaStatus
NewTableSchemaStatusFromComparison compares an existing schema with a conversion schema and returns a TableSchemaStatus indicating whether they match, whether the table can be migrated, and the differences.
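A sketch of such a comparison, simplified to column-name-to-type maps (the shape GetTableSchema returns) instead of schema.ColumnSchema and schema.ConversionSchema. compareSchemas is hypothetical; comparing types case-insensitively, treating a nil existing map as "table does not exist", and treating added columns as migratable while changed or removed columns are not, are all assumptions of this sketch.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// tableSchemaStatus mirrors the exported fields of TableSchemaStatus.
type tableSchemaStatus struct {
	TableExists   bool
	SchemaMatches bool
	CanMigrate    bool
	SchemaDiff    string
}

// compareSchemas compares column-name -> type maps.
func compareSchemas(existing, desired map[string]string) tableSchemaStatus {
	var added, changed, removed []string
	for name, typ := range desired {
		old, ok := existing[name]
		switch {
		case !ok:
			added = append(added, name)
		case !strings.EqualFold(old, typ):
			changed = append(changed, fmt.Sprintf("%s: %s -> %s", name, old, typ))
		}
	}
	for name := range existing {
		if _, ok := desired[name]; !ok {
			removed = append(removed, name)
		}
	}
	// Sort so the diff text is deterministic despite random map iteration order.
	sort.Strings(added)
	sort.Strings(changed)
	sort.Strings(removed)

	var diff []string
	if len(added) > 0 {
		diff = append(diff, "added: "+strings.Join(added, ", "))
	}
	if len(changed) > 0 {
		diff = append(diff, "changed: "+strings.Join(changed, ", "))
	}
	if len(removed) > 0 {
		diff = append(diff, "removed: "+strings.Join(removed, ", "))
	}
	return tableSchemaStatus{
		TableExists:   existing != nil,
		SchemaMatches: len(diff) == 0,
		CanMigrate:    len(changed) == 0 && len(removed) == 0,
		SchemaDiff:    strings.Join(diff, "; "),
	}
}

func main() {
	existing := map[string]string{"id": "BIGINT", "name": "VARCHAR"}
	desired := map[string]string{"id": "bigint", "name": "VARCHAR", "created": "TIMESTAMP"}
	fmt.Printf("%+v\n", compareSchemas(existing, desired))
}
```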
Source Files
- backup.go
- cleanup.go
- compact.go
- compaction_status.go
- compaction_types.go
- conversion_error.go
- convertor.go
- convertor_convert.go
- convertor_schema.go
- convertor_validate.go
- duck_db.go
- duck_db_error.go
- duck_db_options.go
- ducklake_table.go
- file_metadata.go
- partition_key.go
- partition_pattern.go
- read_json_query.go
- reorder_metadata.go
- row_validation_error.go
- schema_change_error.go
- schema_comparison.go
- sql_command.go
- tables.go
- views.go