database

package
v0.7.0
Published: Sep 22, 2025 License: AGPL-3.0 Imports: 33 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func BackupDucklakeMetadata added in v0.7.0

func BackupDucklakeMetadata() error

BackupDucklakeMetadata creates a timestamped backup of the DuckLake metadata database. It creates backup files with the format metadata.sqlite.backup.YYYYMMDDHHMMSS and, if the WAL file exists, also backs it up as metadata.sqlite-wal.backup.YYYYMMDDHHMMSS. It removes any existing backup files so that only the most recent backup is kept.

The backup is created in the same directory as the original database file. If the database file doesn't exist, no backup is created and no error is returned.

Returns an error if the backup operation fails.

func CompactDataFiles added in v0.7.0

func CompactDataFiles(ctx context.Context, db *DuckDb, updateFunc func(CompactionStatus), reindex bool, patterns ...*PartitionPattern) error

func DeletePartition added in v0.7.0

func DeletePartition(ctx context.Context, partition *config.Partition, from, to time.Time, db *DuckDb) (rowCount int, err error)

DeletePartition deletes data for the specified partition and date range from the given DuckLake-connected database.

func DucklakeCleanup added in v0.7.0

func DucklakeCleanup(ctx context.Context, db *DuckDb) error

DucklakeCleanup removes old snapshots and deletes expired and unused parquet files from the DuckDB database.

func EnsureDuckLakeTable added in v0.7.0

func EnsureDuckLakeTable(columns []*schema.ColumnSchema, db *DuckDb, tableName string) error

EnsureDuckLakeTable determines whether a DuckLake table exists for this table and, if so, whether its schema needs updating.

func GetLegacyTableViewSchema added in v0.7.0

func GetLegacyTableViewSchema(ctx context.Context, viewName string, db *DuckDb) (*schema.TableSchema, error)

GetLegacyTableViewSchema retrieves the schema of a table view in the legacy database file (tailpipe.db).

func GetLegacyTableViews added in v0.7.0

func GetLegacyTableViews(ctx context.Context, db *DuckDb) ([]string, error)

GetLegacyTableViews retrieves the names of all table views in the legacy database file (tailpipe.db).

func GetPartitionsForArg added in v0.7.0

func GetPartitionsForArg(partitionMap map[string]*config.Partition, arg string) ([]string, error)

GetPartitionsForArg returns the actual partition names that match the given argument. The partition map is needed to determine whether a single-part argument refers to a table or a partition.

func GetTableSchema added in v0.7.0

func GetTableSchema(ctx context.Context, tableName string, db *DuckDb) (map[string]string, error)

GetTableSchema returns the schema of the specified table as a map of column names to their types

func GetTables added in v0.7.0

func GetTables(ctx context.Context, db *DuckDb) ([]string, error)

GetTables returns the list of tables in the DuckLake metadata catalog

func PartitionMatchesPatterns added in v0.7.0

func PartitionMatchesPatterns(table, partition string, patterns []*PartitionPattern) bool

PartitionMatchesPatterns checks if the given table and partition match any of the provided patterns.
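As an illustration of matching a table and partition against a list of patterns, here is a small sketch. It assumes the patterns are shell-style globs evaluated with path.Match, which this page does not confirm; matchesAny is a hypothetical stand-in and the PartitionPattern type is redeclared locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"path"
)

// PartitionPattern mirrors the documented struct: a table pattern and a partition pattern.
type PartitionPattern struct {
	Table     string
	Partition string
}

// matchesAny is an illustrative stand-in, not the package's implementation.
// Assumption: both patterns are shell-style globs checked with path.Match.
func matchesAny(table, partition string, patterns []*PartitionPattern) bool {
	for _, p := range patterns {
		tableOK, err := path.Match(p.Table, table)
		if err != nil || !tableOK {
			continue // malformed pattern or no table match: try the next pattern
		}
		partOK, err := path.Match(p.Partition, partition)
		if err == nil && partOK {
			return true
		}
	}
	return false
}

func main() {
	patterns := []*PartitionPattern{{Table: "aws_*", Partition: "prod"}}
	fmt.Println(matchesAny("aws_cloudtrail_log", "prod", patterns)) // true
	fmt.Println(matchesAny("aws_cloudtrail_log", "dev", patterns))  // false
}
```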

func TableExists added in v0.7.0

func TableExists(ctx context.Context, tableName string, db *DuckDb) (bool, error)

TableExists checks if a table exists in the DuckLake metadata tables

Types

type ColumnSchemaChange added in v0.7.0

type ColumnSchemaChange struct {
	Name    string
	OldType string
	NewType string
}

type CompactionStatus added in v0.7.0

type CompactionStatus struct {
	Message         string
	InitialFiles    int
	FinalFiles      int
	RowsCompacted   int64
	RowsToCompact   int64
	TotalRows       int64
	ProgressPercent float64

	MigrateSource             int               // number of source files migrated
	MigrateDest               int               // number of destination files after migration
	PartitionIndexExpressions map[string]string // the index expression used for migration for each partition
	Duration                  time.Duration     // duration of the compaction process
}

func NewCompactionStatus added in v0.7.0

func NewCompactionStatus() *CompactionStatus

func (*CompactionStatus) DurationString added in v0.7.0

func (s *CompactionStatus) DurationString() string

func (*CompactionStatus) FinalFilesString added in v0.7.0

func (s *CompactionStatus) FinalFilesString() any

func (*CompactionStatus) InitialFilesString added in v0.7.0

func (s *CompactionStatus) InitialFilesString() any

func (*CompactionStatus) ProgressPercentString added in v0.7.0

func (s *CompactionStatus) ProgressPercentString() string

func (*CompactionStatus) RowsCompactedString added in v0.7.0

func (s *CompactionStatus) RowsCompactedString() any

func (*CompactionStatus) String added in v0.7.0

func (s *CompactionStatus) String() string

func (*CompactionStatus) TotalRowsString added in v0.7.0

func (s *CompactionStatus) TotalRowsString() any

func (*CompactionStatus) UpdateProgress added in v0.7.0

func (s *CompactionStatus) UpdateProgress()

func (*CompactionStatus) VerboseString added in v0.7.0

func (s *CompactionStatus) VerboseString() string

type ConversionError added in v0.7.0

type ConversionError struct {
	SourceFiles  []string
	BaseError    error
	RowsAffected int64
	// contains filtered or unexported fields
}

func NewConversionError added in v0.7.0

func NewConversionError(err error, rowsAffected int64, paths ...string) *ConversionError

func (*ConversionError) Error added in v0.7.0

func (c *ConversionError) Error() string

func (*ConversionError) Merge added in v0.7.0

func (c *ConversionError) Merge(err error)

Merge adds a second error to the conversion error message.

func (*ConversionError) Unwrap added in v0.7.0

func (c *ConversionError) Unwrap() error

type Converter added in v0.7.0

type Converter struct {

	// the partition being collected
	Partition *config.Partition
	// contains filtered or unexported fields
}

Converter executes all the conversions for a single collection. It therefore has a unique executionId and will potentially convert multiple JSONL files. Each file is assumed to have the filename format <execution_id>_<chunkNumber>.jsonl, so when new input files are available, we simply store the chunk number.
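To make the filename convention concrete, here is a minimal sketch that splits a name of the form <execution_id>_<chunkNumber>.jsonl into its two parts. parseChunkFileName is a hypothetical helper, and splitting on the last underscore is an assumption made in case an execution ID itself contains underscores.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseChunkFileName is a hypothetical helper that extracts the execution ID
// and chunk number from a file name of the form <execution_id>_<chunkNumber>.jsonl.
func parseChunkFileName(name string) (string, int32, error) {
	const ext = ".jsonl"
	if !strings.HasSuffix(name, ext) {
		return "", 0, fmt.Errorf("%q does not end in %s", name, ext)
	}
	base := strings.TrimSuffix(name, ext)
	// Assumption: split on the last underscore.
	idx := strings.LastIndex(base, "_")
	if idx <= 0 {
		return "", 0, fmt.Errorf("%q has no <execution_id>_<chunkNumber> separator", name)
	}
	n, err := strconv.ParseInt(base[idx+1:], 10, 32)
	if err != nil {
		return "", 0, fmt.Errorf("invalid chunk number in %q: %w", name, err)
	}
	return base[:idx], int32(n), nil
}

func main() {
	id, chunk, err := parseChunkFileName("exec123_7.jsonl")
	fmt.Println(id, chunk, err) // exec123 7 <nil>
}
```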

func NewParquetConverter added in v0.7.0

func NewParquetConverter(ctx context.Context, cancel context.CancelFunc, executionId string, partition *config.Partition, sourceDir string, tableSchema *schema.TableSchema, statusFunc func(int64, int64, ...error), db *DuckDb) (*Converter, error)

func (*Converter) AddChunk added in v0.7.0

func (w *Converter) AddChunk(executionId string, chunk int32) error

AddChunk adds a new chunk to the list of scheduledChunks to be processed. If this is the first chunk, it determines whether we have a full conversionSchema yet and, if not, infers it from the chunk. It then signals the scheduler that scheduledChunks are available.

func (*Converter) CheckTableSchema added in v0.7.0

func (w *Converter) CheckTableSchema(db *sql.DB, tableName string, conversionSchema schema.ConversionSchema) (TableSchemaStatus, error)

CheckTableSchema checks if the specified table exists in the DuckDB database and compares its schema with the provided schema. It returns a TableSchemaStatus indicating whether the table exists, whether the schema matches, and any differences. This is not used at present but will be used when we implement DuckLake schema evolution handling.

func (*Converter) InferSchemaForJSONLFile added in v0.7.0

func (w *Converter) InferSchemaForJSONLFile(filePath string) (*schema.TableSchema, error)

func (*Converter) WaitForConversions added in v0.7.0

func (w *Converter) WaitForConversions(ctx context.Context) error

WaitForConversions waits for all jobs to be processed or for the context to be cancelled

type DuckDb added in v0.2.0

type DuckDb struct {
	// duckDb connection
	*sql.DB
	// contains filtered or unexported fields
}

DuckDb provides a wrapper around the sql.DB connection to DuckDB with enhanced error handling for invalid parquet files. It automatically retries operations when encountering invalid parquet files, which can occur when files are being written concurrently. The wrapper also handles installation and loading of required DuckDB extensions, and manages the connection lifecycle.

func NewDuckDb added in v0.2.0

func NewDuckDb(opts ...DuckDbOpt) (_ *DuckDb, err error)

func (*DuckDb) Exec added in v0.2.0

func (d *DuckDb) Exec(query string, args ...any) (sql.Result, error)

func (*DuckDb) ExecContext added in v0.2.0

func (d *DuckDb) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)

func (*DuckDb) GetTempDir added in v0.4.0

func (d *DuckDb) GetTempDir() string

GetTempDir returns the temporary directory configured for DuckDB operations

func (*DuckDb) Query added in v0.2.0

func (d *DuckDb) Query(query string, args ...any) (*sql.Rows, error)

func (*DuckDb) QueryContext added in v0.2.0

func (d *DuckDb) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

func (*DuckDb) QueryRow added in v0.2.0

func (d *DuckDb) QueryRow(query string, args ...any) *sql.Row

func (*DuckDb) QueryRowContext added in v0.2.0

func (d *DuckDb) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

type DuckDbOpt added in v0.2.0

type DuckDbOpt func(*DuckDb)

DuckDbOpt is a function type that modifies a DuckDb instance. It's used to configure DuckDb instances with different options like extensions, database file, and temp directory.
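DuckDbOpt follows the functional options pattern. The sketch below reproduces the shape of that pattern with local stand-in types so it compiles on its own; duckDb, its fields, and the lower-case option constructors are hypothetical and do not reflect the package's unexported fields.

```go
package main

import "fmt"

// duckDb is a simplified stand-in for DuckDb; these fields are hypothetical.
type duckDb struct {
	dbFile      string
	tempDir     string
	maxMemoryMb int
}

// duckDbOpt has the same shape as the documented DuckDbOpt:
// a function that modifies the instance being built.
type duckDbOpt func(*duckDb)

func withDbFile(filename string) duckDbOpt { return func(d *duckDb) { d.dbFile = filename } }
func withTempDir(dir string) duckDbOpt     { return func(d *duckDb) { d.tempDir = dir } }
func withMaxMemoryMb(mb int) duckDbOpt     { return func(d *duckDb) { d.maxMemoryMb = mb } }

// newDuckDb applies each option in order to a zero-valued instance.
func newDuckDb(opts ...duckDbOpt) *duckDb {
	d := &duckDb{}
	for _, opt := range opts {
		opt(d)
	}
	return d
}

func main() {
	d := newDuckDb(withDbFile("example.db"), withTempDir("/tmp/example"), withMaxMemoryMb(512))
	fmt.Println(d.dbFile, d.tempDir, d.maxMemoryMb)
}
```

Options that are not passed leave the corresponding field at its zero value, which is where a constructor would typically apply defaults.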

func WithDbFile added in v0.2.0

func WithDbFile(filename string) DuckDbOpt

WithDbFile sets the database file path for the DuckDb instance. This can be used to specify a persistent database file or an in-memory database.

func WithDuckDbExtensions added in v0.2.0

func WithDuckDbExtensions(extensions []string) DuckDbOpt

WithDuckDbExtensions sets the list of DuckDB extensions to be installed and loaded. These extensions will be installed and loaded when the DuckDb instance is created.

func WithDuckLake added in v0.7.0

func WithDuckLake() DuckDbOpt

WithDuckLake enables the DuckLake extension for DuckDB.

func WithDuckLakeReadonly added in v0.7.0

func WithDuckLakeReadonly(filters ...string) DuckDbOpt

WithDuckLakeReadonly enables the DuckLake extension in read-only mode. filters is an optional list of SQL filter expressions. If specified, a view is created for each table in the database and the filters are applied to the view. If no filters are specified, the DuckLake attachment is set as the default catalog so the tables can be accessed directly.

func WithMaxMemoryMb added in v0.4.0

func WithMaxMemoryMb(maxMemoryMb int) DuckDbOpt

WithMaxMemoryMb sets the maximum memory limit for DuckDB. This can be used to control the memory usage of DuckDB operations.

func WithTempDir added in v0.2.0

func WithTempDir(dir string) DuckDbOpt

WithTempDir sets the temporary directory for DuckDB operations. This directory is used for temporary files during database operations. If not specified, the collection temp directory will be used.

type FileMetadata added in v0.7.0

type FileMetadata struct {
	FileSize  int64
	FileCount int64
	RowCount  int64
}

FileMetadata represents the result of a file metadata query

func GetPartitionFileMetadata added in v0.7.0

func GetPartitionFileMetadata(ctx context.Context, tableName, partitionName string, db *DuckDb) (*FileMetadata, error)

GetPartitionFileMetadata gets file metadata for a specific partition from DuckLake metadata tables

func GetTableFileMetadata added in v0.7.0

func GetTableFileMetadata(ctx context.Context, tableName string, db *DuckDb) (*FileMetadata, error)

GetTableFileMetadata gets file metadata for a specific table from DuckLake metadata tables

type PartitionPattern added in v0.7.0

type PartitionPattern struct {
	Table     string
	Partition string
}

PartitionPattern represents a pattern used to match partitions. It consists of a table pattern and a partition pattern, both of which are used to match a given table and partition name.

func GetPartitionMatchPatternsForArg added in v0.7.0

func GetPartitionMatchPatternsForArg(partitions []string, arg string) (*PartitionPattern, error)

GetPartitionMatchPatternsForArg parses a single partition argument into a PartitionPattern. The partitions list is needed to determine whether single-part arguments refer to tables or partitions.
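The table-versus-partition disambiguation can be sketched as follows. Everything here is an assumption for illustration: that partition names have the form table.partition, that "*" acts as the wildcard, and that a single-part argument is treated as a table when some known partition belongs to a table of that name. patternForArg is a hypothetical stand-in, not the package's code.

```go
package main

import (
	"fmt"
	"strings"
)

// PartitionPattern mirrors the documented struct.
type PartitionPattern struct {
	Table     string
	Partition string
}

// patternForArg is a hypothetical sketch of parsing one partition argument.
// Assumptions: known partitions look like "table.partition"; "*" is a wildcard.
func patternForArg(partitions []string, arg string) (*PartitionPattern, error) {
	parts := strings.Split(arg, ".")
	switch len(parts) {
	case 1:
		// Single-part argument: treat it as a table if any known partition
		// belongs to a table with that name, otherwise as a partition name.
		for _, p := range partitions {
			if strings.HasPrefix(p, arg+".") {
				return &PartitionPattern{Table: arg, Partition: "*"}, nil
			}
		}
		return &PartitionPattern{Table: "*", Partition: arg}, nil
	case 2:
		return &PartitionPattern{Table: parts[0], Partition: parts[1]}, nil
	default:
		return nil, fmt.Errorf("invalid partition argument %q", arg)
	}
}

func main() {
	partitions := []string{"aws_cloudtrail_log.prod", "aws_cloudtrail_log.dev"}
	for _, arg := range []string{"aws_cloudtrail_log", "prod", "aws_cloudtrail_log.dev"} {
		p, err := patternForArg(partitions, arg)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> table=%s partition=%s\n", arg, p.Table, p.Partition)
	}
}
```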

func GetPartitionPatternsForArgs added in v0.7.0

func GetPartitionPatternsForArgs(partitions []string, partitionArgs ...string) ([]*PartitionPattern, error)

GetPartitionPatternsForArgs returns the table and partition patterns for the given partition args. The partitions list is needed to determine whether single-part arguments refer to tables or partitions.

func NewPartitionPattern added in v0.7.0

func NewPartitionPattern(partition *config.Partition) PartitionPattern

type RowValidationError added in v0.7.0

type RowValidationError struct {
	// contains filtered or unexported fields
}

func NewRowValidationError added in v0.7.0

func NewRowValidationError(failedRows int64, nullColumns []string) *RowValidationError

func (*RowValidationError) Error added in v0.7.0

func (e *RowValidationError) Error() string

func (*RowValidationError) Is added in v0.7.0

func (e *RowValidationError) Is(target error) bool

Is implements the errors.Is interface to support error comparison

type SchemaChangeError added in v0.7.0

type SchemaChangeError struct {
	ChangedColumns []ColumnSchemaChange
}

func (*SchemaChangeError) Error added in v0.7.0

func (e *SchemaChangeError) Error() string

type SqlCommand added in v0.7.0

type SqlCommand struct {
	Description string
	Command     string
}

SqlCommand represents a SQL command with its description.

func GetCreateViewsSql added in v0.7.0

func GetCreateViewsSql(ctx context.Context, db *DuckDb, filters ...string) ([]SqlCommand, error)

GetCreateViewsSql returns the SQL commands to create views for all tables in the DuckLake catalog, applying the specified filters.

func GetDucklakeInitCommands added in v0.7.0

func GetDucklakeInitCommands(readonly bool) []SqlCommand

GetDucklakeInitCommands returns the set of SQL commands required to initialize and connect to DuckLake. This is used both for tailpipe to connect to DuckLake and for tailpipe connect to build the init script. It returns an ordered slice of SQL commands.

type TableSchemaStatus added in v0.7.0

type TableSchemaStatus struct {
	TableExists   bool
	SchemaMatches bool
	CanMigrate    bool
	SchemaDiff    string
}

TableSchemaStatus represents the status of a table schema comparison. It indicates whether the table exists, whether the schema matches, and whether it can be migrated by DuckLake. This is not used at present but will be used when we implement DuckLake schema evolution handling.

func NewTableSchemaStatusFromComparison added in v0.7.0

func NewTableSchemaStatusFromComparison(existingSchema map[string]schema.ColumnSchema, conversionSchema schema.ConversionSchema) TableSchemaStatus

NewTableSchemaStatusFromComparison compares an existing schema with a conversion schema and returns a TableSchemaStatus indicating whether they match, whether the table can be migrated, and what the differences are.
