Documentation ¶
Overview ¶
Package migrator provides functionality for loading, parsing, and managing ClickHouse database migration files and directories.
This package handles the core migration lifecycle including:
- Loading migration files from filesystem or embedded sources
- Parsing SQL migration content using the ClickHouse DDL parser
- Managing migration directories with integrity verification
- Tracking migration state and execution history
The migrator package integrates with the parser package to provide structured access to ClickHouse DDL statements within migration files, enabling tools to analyze, validate, and execute schema changes safely.
Core Components ¶
The package provides four main types:
- Migration: Represents a single migration with parsed DDL statements
- MigrationDir: Represents a collection of migrations from a directory
- SumFile: Provides cryptographic integrity verification using chained hashing
- Revision: Records migration execution history and audit information
Basic Usage ¶
Loading a single migration:
    migration, err := migrator.LoadMigration("001_create_users", file)
    if err != nil {
        log.Fatal(err)
    }

    for _, stmt := range migration.Statements {
        if stmt.CreateTable != nil {
            fmt.Printf("CREATE TABLE: %s\n", stmt.CreateTable.Name)
        }
    }
Loading a migration directory:
    migDir, err := migrator.LoadMigrationDir(os.DirFS("./migrations"))
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Found %d migrations\n", len(migDir.Migrations))
Rehashing a migration directory to update integrity verification:
    // After potential modifications to migration files...
    err = migDir.Rehash()
    if err != nil {
        log.Fatal(err)
    }

    // Write updated sum file
    file, err := os.Create("migrations.sum")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    _, err = migDir.SumFile.WriteTo(file)
    if err != nil {
        log.Fatal(err)
    }
Creating a sum file for integrity verification:
    sumFile := migrator.NewSumFile()
    err := sumFile.Add("001_init.sql", strings.NewReader(sqlContent))
    if err != nil {
        log.Fatal(err)
    }

    file, err := os.Create("migrations.sum")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    _, err = sumFile.WriteTo(file)
    if err != nil {
        log.Fatal(err)
    }
Integrity Verification ¶
The SumFile type implements a reverse one-branch Merkle tree using chained SHA256 hashing. This provides tamper evidence for migration files:
- Each migration hash includes the content of the previous migration
- Any modification to any file invalidates all subsequent hashes
- File ordering changes are detectable through hash chain breaks
This ensures migration file integrity and prevents unauthorized modifications in production environments.
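The chaining described above can be sketched as follows. This is a minimal illustration only: the exact byte layout SumFile feeds into SHA256 is an assumption here (each hash is computed over the previous hash concatenated with the current file content), not the package's actual encoding.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// chainHash sketches chained hashing: each entry's hash incorporates the
// previous entry's hash, so modifying any file invalidates every
// subsequent hash in the chain.
func chainHash(files [][]byte) []string {
	hashes := make([]string, 0, len(files))
	prev := []byte{}
	for _, content := range files {
		h := sha256.New()
		h.Write(prev)    // chain in the previous hash
		h.Write(content) // then the current file content
		sum := h.Sum(nil)
		hashes = append(hashes, base64.StdEncoding.EncodeToString(sum))
		prev = sum
	}
	return hashes
}

func main() {
	a := chainHash([][]byte{[]byte("CREATE TABLE a ...;"), []byte("CREATE TABLE b ...;")})
	b := chainHash([][]byte{[]byte("CREATE TABLE a (tampered);"), []byte("CREATE TABLE b ...;")})
	// The second file is unchanged, but its chained hash still differs.
	fmt.Println(a[1] != b[1])
}
```

Because every hash depends on all prior content, verifying only the final entry is enough to detect tampering anywhere earlier in the chain.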
Revision Tracking ¶
The Revision type captures detailed migration execution information:
    revision := &migrator.Revision{
        Version:            "20240101120000_create_users",
        ExecutedAt:         time.Now(),
        ExecutionTime:      2 * time.Second,
        Kind:               migrator.StandardRevision,
        Applied:            5,
        Total:              5,
        Hash:               "migration_content_hash",
        PartialHashes:      []string{"stmt1_hash", "stmt2_hash", ...},
        HousekeeperVersion: "1.0.0",
    }
This provides complete audit trail information for compliance, debugging, and rollback scenarios.
Index ¶
- func IsSnapshot(r io.Reader) (bool, error)
- type ClickHouse
- type Migration
- type MigrationDir
- func (m *MigrationDir) CreateSnapshot(version, description string) (*Snapshot, error)
- func (m *MigrationDir) GetMigrationsAfterSnapshot() []*Migration
- func (m *MigrationDir) GetSnapshot() *Snapshot
- func (m *MigrationDir) HasSnapshot() bool
- func (m *MigrationDir) Rehash() error
- func (m *MigrationDir) Validate() (bool, error)
- type Revision
- type RevisionKind
- type RevisionSet
- func (rs *RevisionSet) Count() int
- func (rs *RevisionSet) GetCompleted(migrationDir *MigrationDir) []*Migration
- func (rs *RevisionSet) GetExecutedVersions() []string
- func (rs *RevisionSet) GetFailed(migrationDir *MigrationDir) []*Migration
- func (rs *RevisionSet) GetLastSnapshot() *Revision
- func (rs *RevisionSet) GetMigrationsAfterSnapshot() []string
- func (rs *RevisionSet) GetPartiallyApplied(migrationDir *MigrationDir) []*Migration
- func (rs *RevisionSet) GetPending(migrationDir *MigrationDir) []*Migration
- func (rs *RevisionSet) GetRevision(migration *Migration) *Revision
- func (rs *RevisionSet) HasRevision(version string) bool
- func (rs *RevisionSet) HasSnapshot() bool
- func (rs *RevisionSet) IsCompleted(migration *Migration) bool
- func (rs *RevisionSet) IsFailed(migration *Migration) bool
- func (rs *RevisionSet) IsPartiallyApplied(migration *Migration) bool
- func (rs *RevisionSet) IsPending(migration *Migration) bool
- type Snapshot
- type SumFile
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsSnapshot ¶
func IsSnapshot(r io.Reader) (bool, error)
IsSnapshot checks if a migration file is a snapshot by examining its content for the snapshot marker comment.
This function reads the beginning of the file to check for the special -- housekeeper:snapshot marker that identifies snapshot files.
Example usage:
    file, err := os.Open("20240810120000_snapshot.sql")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    isSnapshot, err := migrator.IsSnapshot(file)
    if err != nil {
        log.Fatal(err)
    }

    if isSnapshot {
        fmt.Println("This is a snapshot file")
    }
Types ¶
type ClickHouse ¶
type Migration ¶
type Migration struct {
    // Version is the migration identifier, typically derived from the filename
    // or a timestamp. Used for ordering and tracking migration application.
    Version string

    // Statements contains the parsed ClickHouse DDL statements from the
    // migration file. Each statement represents a single DDL operation
    // such as CREATE TABLE, ALTER DATABASE, etc.
    Statements []*parser.Statement

    // IsSnapshot indicates whether this migration is a snapshot that consolidates
    // previous migrations. Snapshot migrations are handled differently during
    // execution - they are not executed as DDL but serve as consolidation points.
    IsSnapshot bool
}
Migration represents a single ClickHouse database migration containing a version identifier and the parsed DDL statements to be executed.
Migrations are typically loaded from .sql files in a migration directory and contain CREATE, ALTER, DROP, and other ClickHouse DDL operations that modify database schema or structure.
Example migration content:
    CREATE TABLE users (id UInt64, name String) ENGINE = MergeTree() ORDER BY id;
    ALTER TABLE users ADD COLUMN email String DEFAULT '';
func LoadMigration ¶
func LoadMigration(version string, r io.Reader) (*Migration, error)
LoadMigration creates a Migration from the provided io.Reader containing ClickHouse DDL statements.
This function parses the SQL content using the ClickHouse DDL parser and creates a Migration structure with the specified version and parsed statements. The version is typically a timestamp or sequential identifier used for migration ordering.
The reader content should contain valid ClickHouse DDL statements such as:
- CREATE DATABASE, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY
- ALTER TABLE, ALTER DATABASE operations
- DROP statements for cleanup
- Any other supported ClickHouse DDL operations
Example usage:
    // Load from string
    sql := `
    CREATE TABLE users (
        id UInt64,
        name String,
        email String DEFAULT ''
    ) ENGINE = MergeTree() ORDER BY id;

    ALTER TABLE users ADD COLUMN created_at DateTime DEFAULT now();
    `

    migration, err := migrator.LoadMigration("001_create_users", strings.NewReader(sql))
    if err != nil {
        log.Fatal(err)
    }

    // Load from file
    file, err := os.Open("001_create_users.sql")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    migration, err = migrator.LoadMigration("001_create_users", file)
    if err != nil {
        log.Fatal(err)
    }

    // Access parsed statements
    fmt.Printf("Migration %s contains %d statements\n", migration.Version, len(migration.Statements))
    for i, stmt := range migration.Statements {
        if stmt.CreateTable != nil {
            table := stmt.CreateTable
            name := table.Name
            if table.Database != nil {
                name = *table.Database + "." + name
            }
            fmt.Printf("  Statement %d: CREATE TABLE %s\n", i+1, name)
        }
        if stmt.AlterTable != nil {
            alter := stmt.AlterTable
            name := alter.Name
            if alter.Database != nil {
                name = *alter.Database + "." + name
            }
            fmt.Printf("  Statement %d: ALTER TABLE %s (%d operations)\n",
                i+1, name, len(alter.Operations))
        }
    }
Returns an error if the reader content contains invalid ClickHouse DDL syntax or if the reader cannot be read.
type MigrationDir ¶
type MigrationDir struct {
    // Migrations contains all migration files found in the directory,
    // sorted in lexical order by filename to ensure deterministic
    // execution order.
    Migrations []*Migration

    // SumFile contains integrity verification data for the migration
    // directory, allowing detection of modified or corrupted migration
    // files. This field is always present and provides cryptographic
    // verification of migration file contents.
    SumFile *SumFile
    // contains filtered or unexported fields
}
MigrationDir represents a collection of migrations loaded from a directory along with integrity verification data.
This structure provides a complete view of a migration directory including all migration files and their associated sum file for integrity checking. The migrations are automatically sorted in lexical order to ensure consistent application ordering.
func LoadMigrationDir ¶
func LoadMigrationDir(dir fs.FS) (*MigrationDir, error)
LoadMigrationDir loads all migration files from the specified filesystem and returns a MigrationDir containing parsed migrations and integrity verification data.
This function walks the provided filesystem in lexical order, loading all .sql files as migrations and any .sum files for integrity verification. The filesystem can be a regular directory, embedded filesystem, or any implementation of fs.FS.
Snapshot files (marked with -- housekeeper:snapshot) are automatically detected and stored separately from regular migrations.
Supported file extensions:
- .sql: Migration files containing ClickHouse DDL statements or snapshot data
- .sum: Sum files containing integrity hashes (currently loaded but not processed)
Example usage:
    // Load from regular filesystem directory
    migDir, err := migrator.LoadMigrationDir(os.DirFS("./migrations"))
    if err != nil {
        log.Fatal(err)
    }

    // Load from embedded filesystem
    //go:embed migrations/*.sql
    var migrationsFS embed.FS

    migDir, err = migrator.LoadMigrationDir(migrationsFS)
    if err != nil {
        log.Fatal(err)
    }

    // Process loaded migrations
    for _, mig := range migDir.Migrations {
        fmt.Printf("Migration %s has %d statements\n", mig.Version, len(mig.Statements))
        for _, stmt := range mig.Statements {
            if stmt.CreateTable != nil {
                fmt.Printf("  CREATE TABLE: %s\n", stmt.CreateTable.Name)
            }
        }
    }

    // Check for snapshot
    if migDir.HasSnapshot() {
        snapshot := migDir.GetSnapshot()
        fmt.Printf("Found snapshot: %s\n", snapshot.Version)
    }
Returns an error if the directory cannot be read or any migration file contains invalid ClickHouse DDL syntax.
func (*MigrationDir) CreateSnapshot ¶
func (m *MigrationDir) CreateSnapshot(version, description string) (*Snapshot, error)
CreateSnapshot generates a new snapshot from all current migrations.
This method creates a snapshot that consolidates all migrations currently in the directory. The snapshot can then be written to a file and the old migration files can be safely removed.
Example usage:
    migDir, err := migrator.LoadMigrationDir(os.DirFS("./migrations"))
    if err != nil {
        log.Fatal(err)
    }

    snapshot, err := migDir.CreateSnapshot(
        "20240810120000_snapshot",
        "Q3 2024 Release",
    )
    if err != nil {
        log.Fatal(err)
    }

    // Write snapshot to file
    file, err := os.Create("migrations/20240810120000_snapshot.sql")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    _, err = snapshot.WriteTo(file)
    if err != nil {
        log.Fatal(err)
    }
func (*MigrationDir) GetMigrationsAfterSnapshot ¶
func (m *MigrationDir) GetMigrationsAfterSnapshot() []*Migration
GetMigrationsAfterSnapshot returns all migrations that are not included in the snapshot.
If no snapshot exists, returns all migrations.
Example usage:
    migDir, err := migrator.LoadMigrationDir(os.DirFS("./migrations"))
    if err != nil {
        log.Fatal(err)
    }

    newMigrations := migDir.GetMigrationsAfterSnapshot()
    fmt.Printf("Found %d migrations after snapshot\n", len(newMigrations))
func (*MigrationDir) GetSnapshot ¶
func (m *MigrationDir) GetSnapshot() *Snapshot
GetSnapshot returns the loaded snapshot, if one exists.
Returns nil if no snapshot was found in the migration directory.
Example usage:
    migDir, err := migrator.LoadMigrationDir(os.DirFS("./migrations"))
    if err != nil {
        log.Fatal(err)
    }

    if snapshot := migDir.GetSnapshot(); snapshot != nil {
        fmt.Printf("Found snapshot: %s (%s)\n",
            snapshot.Version, snapshot.Description)
        fmt.Printf("Includes %d migrations\n", len(snapshot.IncludedMigrations))
    }
func (*MigrationDir) HasSnapshot ¶
func (m *MigrationDir) HasSnapshot() bool
HasSnapshot returns true if a snapshot was loaded from the migration directory.
Example usage:
    migDir, err := migrator.LoadMigrationDir(os.DirFS("./migrations"))
    if err != nil {
        log.Fatal(err)
    }

    if migDir.HasSnapshot() {
        fmt.Println("Directory contains a snapshot")
    }
func (*MigrationDir) Rehash ¶
func (m *MigrationDir) Rehash() error
Rehash reloads all migration files from the filesystem and recalculates the SumFile.
This method is useful for:
- Verifying migration file integrity after potential modifications
- Regenerating the sum file after adding or modifying migrations
- Detecting unauthorized changes to migration files
The method performs the following operations:
- Clears existing migrations and sum file
- Reloads all .sql files from the filesystem in lexical order
- Recalculates the chained SHA256 hashes for each migration
- Updates the SumFile with new integrity verification data
Example usage:
    migDir, err := migrator.LoadMigrationDir(os.DirFS("./migrations"))
    if err != nil {
        log.Fatal(err)
    }

    // After some time or potential changes...
    err = migDir.Rehash()
    if err != nil {
        log.Fatal(err)
    }

    // Write updated sum file
    file, err := os.Create("migrations.sum")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    _, err = migDir.SumFile.WriteTo(file)
    if err != nil {
        log.Fatal(err)
    }
Returns an error if the filesystem cannot be read, any migration file contains invalid SQL, or if the filesystem reference is nil.
func (*MigrationDir) Validate ¶
func (m *MigrationDir) Validate() (bool, error)
Validate verifies the integrity of the MigrationDir by ensuring that all migrations are present in the sum file and that the sum file validates correctly against the current migration content.
This method provides comprehensive validation of the migration directory by:
- Checking that all loaded migrations have corresponding entries in the sum file
- Validating the sum file's chained hashes against the actual migration content
- Detecting any missing or modified migration files
The validation process:
- Verifies that every migration in the directory has an entry in the sum file
- Reads each migration file content from the filesystem
- Uses the sum file's Validate method to verify cryptographic integrity
- Returns false if any migration is missing from sum file or content doesn't match
Example usage:
    // Load migration directory
    migDir, err := migrator.LoadMigrationDir(os.DirFS("./migrations"))
    if err != nil {
        log.Fatal(err)
    }

    // Validate integrity
    isValid, err := migDir.Validate()
    if err != nil {
        log.Fatal(err)
    }

    if isValid {
        fmt.Println("Migration directory is valid and unmodified")
    } else {
        fmt.Println("Migration directory has integrity issues!")
    }

    // Handle validation in migration pipeline
    if !isValid {
        log.Fatal("Migration integrity check failed - cannot proceed")
    }
    fmt.Println("All migrations validated successfully")
Returns false if:
- Any migration is missing from the sum file
- Any migration content doesn't match its stored hash
- The sum file's chained hash validation fails
Returns an error if:
- The filesystem cannot be accessed
- Any migration file cannot be read
- The filesystem reference is nil
Note: This method requires a filesystem reference (fs field) to read migration content. If the MigrationDir was loaded without a filesystem or the reference is nil, an error will be returned.
type Revision ¶
type Revision struct {
    // Version is the unique identifier for the migration, typically
    // a timestamp-based string like "20240101120000_create_users".
    // Used for ordering and referencing specific migrations.
    Version string

    // ExecutedAt records the timestamp when the migration execution
    // began. Used for audit trails and determining migration order
    // in cases where version ordering is ambiguous.
    ExecutedAt time.Time

    // ExecutionTime records the total duration required to execute
    // all statements in the migration. Useful for performance
    // monitoring and identifying slow migrations.
    ExecutionTime time.Duration

    // Kind categorizes the type of revision (migration, snapshot, etc.).
    // Determines how the revision should be processed during rollbacks
    // and migration analysis.
    Kind RevisionKind

    // Error contains the error message if the migration failed during
    // execution. Nil indicates successful execution. Used for debugging
    // failed migrations and determining rollback strategies.
    Error *string

    // Applied records the number of statements that were successfully
    // executed before completion or failure. Combined with Total,
    // this allows partial migration recovery and precise error reporting.
    Applied int

    // Total records the total number of statements in the migration.
    // Used to calculate completion percentage and validate that
    // all expected statements were processed.
    Total int

    // Hash contains the cryptographic hash of the complete migration
    // content, used for integrity verification and detecting
    // unauthorized migration modifications.
    Hash string

    // PartialHashes contains individual hashes for each statement
    // in the migration, enabling fine-grained integrity checking
    // and identification of specific statement modifications.
    PartialHashes []string

    // HousekeeperVersion records the version of the Housekeeper tool
    // that executed the migration. Used for compatibility tracking
    // and debugging version-specific migration behaviors.
    HousekeeperVersion string
}
Revision represents a record of migration execution history, capturing detailed information about when and how a migration was applied to a ClickHouse database.
Revisions provide complete audit trail information including execution timing, success/failure status, and integrity verification data. This information is essential for migration rollback, debugging, and compliance tracking in production environments.
Example usage:
    revision := &migrator.Revision{
        Version:            "20240101120000_create_users",
        ExecutedAt:         time.Now(),
        ExecutionTime:      2 * time.Second,
        Kind:               migrator.StandardRevision,
        Applied:            5,
        Total:              5,
        Hash:               "abc123...",
        PartialHashes:      []string{"hash1", "hash2", "hash3", "hash4", "hash5"},
        HousekeeperVersion: "1.0.0",
    }
type RevisionKind ¶
type RevisionKind string
RevisionKind represents the category of a migration revision, determining how it should be processed and what role it plays in the overall migration lifecycle.
Different revision kinds may have different rollback behaviors, validation requirements, and execution priorities.
const (
    // StandardRevision represents a normal migration execution containing
    // DDL statements that modify database schema. This is the most common
    // revision type for typical migration operations.
    StandardRevision RevisionKind = "migration"

    // SnapshotRevision represents a snapshot marker in the migration
    // history, typically used for marking safe rollback points or
    // significant migration milestones. Snapshots may not contain
    // actual DDL statements but serve as metadata markers.
    SnapshotRevision RevisionKind = "snapshot"
)
RevisionKind constants define the types of migration revisions that can be recorded. These constants categorize different kinds of migration executions for tracking and rollback purposes.
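As a small self-contained illustration of branching on the revision kind, the type and constants are re-declared locally below; real code would use migrator.RevisionKind with migrator.StandardRevision and migrator.SnapshotRevision instead.

```go
package main

import "fmt"

// Local stand-ins for the migrator package's RevisionKind constants,
// re-declared here only so the sketch compiles on its own.
type RevisionKind string

const (
	StandardRevision RevisionKind = "migration"
	SnapshotRevision RevisionKind = "snapshot"
)

// describe maps each revision kind to a short human-readable label.
func describe(k RevisionKind) string {
	switch k {
	case StandardRevision:
		return "normal DDL migration"
	case SnapshotRevision:
		return "snapshot marker"
	default:
		return "unknown revision kind"
	}
}

func main() {
	fmt.Println(describe(StandardRevision)) // prints "normal DDL migration"
	fmt.Println(describe(SnapshotRevision)) // prints "snapshot marker"
}
```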
type RevisionSet ¶
type RevisionSet struct {
    // contains filtered or unexported fields
}
RevisionSet represents a collection of migration revisions with convenient query methods for determining migration execution status.
This abstraction provides a clean interface for checking whether migrations have been executed, failed, or are pending, encapsulating the logic for handling different revision kinds and error states.
func LoadRevisions ¶
func LoadRevisions(ctx context.Context, ch ClickHouse) (*RevisionSet, error)
LoadRevisions loads revisions from ClickHouse and returns them as a RevisionSet.
This provides a clean object-oriented API for querying migration status with intuitive method calls like IsCompleted() and IsPending().
Example usage:
    // Load revisions with the modern API
    revisionSet, err := migrator.LoadRevisions(ctx, client)
    if err != nil {
        log.Fatal(err)
    }

    // Clean, readable status checks
    for _, migration := range migrationDir.Migrations {
        if revisionSet.IsCompleted(migration) {
            fmt.Printf("✓ %s completed\n", migration.Version)
        } else if revisionSet.IsPending(migration) {
            fmt.Printf("⏳ %s pending\n", migration.Version)
        }
    }

    // Bulk operations
    pending := revisionSet.GetPending(migrationDir)
Returns an error if the database query fails.
func NewRevisionSet ¶
func NewRevisionSet(revisions []*Revision) *RevisionSet
NewRevisionSet creates a new RevisionSet from a slice of revisions.
The RevisionSet provides convenient methods for querying migration status without requiring callers to understand the internal revision structure or filtering logic.
Example usage:
    revisionSet, err := migrator.LoadRevisions(ctx, client)
    if err != nil {
        log.Fatal(err)
    }

    // Check if a specific migration is completed
    if revisionSet.IsCompleted(migration) {
        fmt.Printf("Migration %s is completed\n", migration.Version)
    }

    // Get all pending migrations
    pending := revisionSet.GetPending(migrationDir)
    fmt.Printf("Found %d pending migrations\n", len(pending))
func (*RevisionSet) Count ¶
func (rs *RevisionSet) Count() int
Count returns the total number of revisions in the set.
func (*RevisionSet) GetCompleted ¶
func (rs *RevisionSet) GetCompleted(migrationDir *MigrationDir) []*Migration
GetCompleted returns all migrations that have been successfully executed.
This method filters the migrations in a MigrationDir to return only those that have been completed. The order of migrations is preserved from the original MigrationDir.
Example usage:
    completed := revisionSet.GetCompleted(migrationDir)
    fmt.Printf("Found %d completed migrations:\n", len(completed))
    for _, migration := range completed {
        fmt.Printf("  ✓ %s\n", migration.Version)
    }
func (*RevisionSet) GetExecutedVersions ¶
func (rs *RevisionSet) GetExecutedVersions() []string
GetExecutedVersions returns a slice of migration versions that have been successfully executed.
Only successful StandardRevision entries are included. Failed migrations and snapshot revisions are excluded from the results.
The versions are returned in the order they appear in the original revisions slice, which typically corresponds to execution order.
Example usage:
    executed := revisionSet.GetExecutedVersions()
    fmt.Printf("Executed migrations:\n")
    for _, version := range executed {
        fmt.Printf("  ✓ %s\n", version)
    }
func (*RevisionSet) GetFailed ¶
func (rs *RevisionSet) GetFailed(migrationDir *MigrationDir) []*Migration
GetFailed returns all migrations that have been attempted but failed.
This method filters the migrations in a MigrationDir to return only those that have failed during execution. The order of migrations is preserved from the original MigrationDir.
Example usage:
    failed := revisionSet.GetFailed(migrationDir)
    if len(failed) > 0 {
        fmt.Printf("Found %d failed migrations:\n", len(failed))
        for _, migration := range failed {
            revision := revisionSet.GetRevision(migration)
            fmt.Printf("  ✗ %s: %s\n", migration.Version, *revision.Error)
        }
    }
func (*RevisionSet) GetLastSnapshot ¶
func (rs *RevisionSet) GetLastSnapshot() *Revision
GetLastSnapshot returns the most recent snapshot revision.
Returns nil if no snapshot revision exists in the set.
Example usage:
    revisionSet, err := migrator.LoadRevisions(ctx, client)
    if err != nil {
        log.Fatal(err)
    }

    if snapshot := revisionSet.GetLastSnapshot(); snapshot != nil {
        fmt.Printf("Last snapshot: %s at %s\n",
            snapshot.Version, snapshot.ExecutedAt.Format("2006-01-02"))
    }
func (*RevisionSet) GetMigrationsAfterSnapshot ¶
func (rs *RevisionSet) GetMigrationsAfterSnapshot() []string
GetMigrationsAfterSnapshot returns all successfully executed migrations after the last snapshot.
If no snapshot exists, returns all successfully executed migrations.
Example usage:
    revisionSet, err := migrator.LoadRevisions(ctx, client)
    if err != nil {
        log.Fatal(err)
    }

    migrationsAfterSnapshot := revisionSet.GetMigrationsAfterSnapshot()
    fmt.Printf("Found %d migrations after snapshot\n", len(migrationsAfterSnapshot))
func (*RevisionSet) GetPartiallyApplied ¶
func (rs *RevisionSet) GetPartiallyApplied(migrationDir *MigrationDir) []*Migration
GetPartiallyApplied returns all migrations that have been partially executed.
This method filters the migrations in a MigrationDir to return only those that have partial execution (some statements applied but not all). These migrations are candidates for resume operations.
The order of migrations is preserved from the original MigrationDir.
Example usage:
    partiallyApplied := revisionSet.GetPartiallyApplied(migrationDir)
    if len(partiallyApplied) > 0 {
        fmt.Printf("Found %d partially applied migrations:\n", len(partiallyApplied))
        for _, migration := range partiallyApplied {
            revision := revisionSet.GetRevision(migration)
            fmt.Printf("  ⚠ %s: %d/%d statements applied\n",
                migration.Version, revision.Applied, revision.Total)
        }
    }
func (*RevisionSet) GetPending ¶
func (rs *RevisionSet) GetPending(migrationDir *MigrationDir) []*Migration
GetPending returns all migrations that have not been successfully executed.
This method filters the migrations in a MigrationDir to return only those that are pending execution. The order of migrations is preserved from the original MigrationDir.
Example usage:
    pending := revisionSet.GetPending(migrationDir)
    fmt.Printf("Found %d pending migrations:\n", len(pending))
    for _, migration := range pending {
        fmt.Printf("  - %s\n", migration.Version)
    }
func (*RevisionSet) GetRevision ¶
func (rs *RevisionSet) GetRevision(migration *Migration) *Revision
GetRevision returns the revision record for a migration, if it exists.
Returns nil if no revision exists for the migration version.
Example usage:
    if revision := revisionSet.GetRevision(migration); revision != nil {
        fmt.Printf("Executed at: %s\n", revision.ExecutedAt.Format("2006-01-02 15:04:05"))
        fmt.Printf("Execution time: %v\n", revision.ExecutionTime)
    }
func (*RevisionSet) HasRevision ¶
func (rs *RevisionSet) HasRevision(version string) bool
HasRevision returns true if a revision exists for the given version.
func (*RevisionSet) HasSnapshot ¶
func (rs *RevisionSet) HasSnapshot() bool
HasSnapshot returns true if there is at least one snapshot revision.
Example usage:
    if revisionSet.HasSnapshot() {
        fmt.Println("Database has snapshot revisions")
    }
func (*RevisionSet) IsCompleted ¶
func (rs *RevisionSet) IsCompleted(migration *Migration) bool
IsCompleted returns true if the migration has been successfully executed.
A migration is considered completed if:
- There exists a revision with the same version
- The revision kind is StandardRevision
- The revision has no error (successful execution)
- All statements were applied (applied == total)
- The number of statements matches the revision total count
Failed migrations, partially applied migrations, and snapshot revisions are not considered completed.
Example usage:
    if revisionSet.IsCompleted(migration) {
        fmt.Printf("✓ %s is completed\n", migration.Version)
    } else {
        fmt.Printf("⏳ %s needs to be run\n", migration.Version)
    }
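The completion rules listed above can be sketched as a predicate over a local stand-in revision type; the real RevisionSet logic may differ in detail, and the field names below mirror the Revision struct documented earlier.

```go
package main

import "fmt"

// revision is a local stand-in for migrator.Revision, carrying only the
// fields the completion check needs.
type revision struct {
	Kind    string  // "migration" or "snapshot"
	Error   *string // nil means successful execution
	Applied int     // statements successfully executed
	Total   int     // statements in the migration
}

// isCompleted mirrors the documented rules: a standard revision exists,
// it has no error, all statements were applied, and the revision total
// matches the migration's statement count.
func isCompleted(rev *revision, stmtCount int) bool {
	return rev != nil &&
		rev.Kind == "migration" &&
		rev.Error == nil &&
		rev.Applied == rev.Total &&
		rev.Total == stmtCount
}

func main() {
	ok := &revision{Kind: "migration", Applied: 5, Total: 5}
	partial := &revision{Kind: "migration", Applied: 3, Total: 5}
	fmt.Println(isCompleted(ok, 5), isCompleted(partial, 5)) // prints "true false"
}
```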
func (*RevisionSet) IsFailed ¶
func (rs *RevisionSet) IsFailed(migration *Migration) bool
IsFailed returns true if the migration has been attempted but failed.
A migration is considered failed if:
- There exists a revision with the same version
- The revision kind is StandardRevision
- The revision has an error (failed execution)
Example usage:
    if revisionSet.IsFailed(migration) {
        revision := revisionSet.GetRevision(migration)
        fmt.Printf("✗ %s failed: %s\n", migration.Version, *revision.Error)
    }
func (*RevisionSet) IsPartiallyApplied ¶
func (rs *RevisionSet) IsPartiallyApplied(migration *Migration) bool
IsPartiallyApplied returns true if the migration was partially executed.
A migration is considered partially applied if:
- There exists a revision with the same version
- The revision kind is StandardRevision
- Some statements were applied but not all (0 < applied < total)
- The revision may or may not have an error (partial execution can succeed up to the failure point)
This method helps identify migrations that can potentially be resumed from their failure point.
Example usage:
    if revisionSet.IsPartiallyApplied(migration) {
        revision := revisionSet.GetRevision(migration)
        fmt.Printf("⚠ %s partially applied: %d/%d statements executed\n",
            migration.Version, revision.Applied, revision.Total)
    }
func (*RevisionSet) IsPending ¶
func (rs *RevisionSet) IsPending(migration *Migration) bool
IsPending returns true if the migration has not been successfully executed.
A migration is considered pending if it is not completed, regardless of whether it has failed before or has no revision record.
This is equivalent to !IsCompleted(migration).
Example usage:
    if revisionSet.IsPending(migration) {
        fmt.Printf("⏳ %s is pending\n", migration.Version)
    }
type Snapshot ¶
type Snapshot struct {
    // Version is the unique identifier for the snapshot, typically
    // a timestamp-based string like "20240810120000_snapshot".
    Version string

    // Description provides a human-readable description of the snapshot,
    // such as "Q3 2024 Release" or "Post-migration cleanup".
    Description string

    // CreatedAt records when the snapshot was created.
    CreatedAt time.Time

    // IncludedMigrations lists the versions of all migrations that are
    // consolidated in this snapshot, in the order they were applied.
    IncludedMigrations []string

    // CumulativeHash is the SHA256 hash of all included migration content,
    // used for integrity verification.
    CumulativeHash string

    // Statements contains the parsed DDL statements from all included
    // migrations, representing the cumulative schema state.
    Statements []*parser.Statement
}
Snapshot represents a migration snapshot that consolidates all previous migrations into a single point-in-time snapshot.
Snapshots allow for safe deletion of migration files that preceded the snapshot, as all their changes are captured in the snapshot's cumulative SQL. This helps manage large migration histories while maintaining the ability to recreate the schema from scratch.
A snapshot file uses a special comment directive format:
    -- housekeeper:snapshot
    -- version: 20240810120000_snapshot
    -- description: Q3 2024 Release
    -- created_at: 2024-08-10T12:00:00Z
    -- included_migrations: 001_init,002_users,003_products
    -- cumulative_hash: sha256:abc123...
Followed by the cumulative SQL from all included migrations.
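A minimal sketch of reading the "-- key: value" header directives in this format; the actual LoadSnapshot implementation may parse differently, and parseHeader here is a hypothetical helper for illustration only.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseHeader collects "-- key: value" directive lines from the top of a
// snapshot file, stopping at the first line that is not a comment. Lines
// without a ": " separator (like the bare "-- housekeeper:snapshot"
// marker) are skipped.
func parseHeader(src string) map[string]string {
	meta := map[string]string{}
	sc := bufio.NewScanner(strings.NewReader(src))
	for sc.Scan() {
		line := sc.Text()
		if !strings.HasPrefix(line, "-- ") {
			break // header ends where the cumulative SQL begins
		}
		rest := strings.TrimPrefix(line, "-- ")
		if key, val, ok := strings.Cut(rest, ": "); ok {
			meta[key] = val
		}
	}
	return meta
}

func main() {
	src := "-- housekeeper:snapshot\n" +
		"-- version: 20240810120000_snapshot\n" +
		"-- description: Q3 2024 Release\n" +
		"CREATE TABLE users (id UInt64) ENGINE = MergeTree() ORDER BY id;\n"
	meta := parseHeader(src)
	fmt.Println(meta["version"]) // prints "20240810120000_snapshot"
}
```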
func GenerateSnapshot ¶
func GenerateSnapshot(version, description string, migrations []*Migration) (*Snapshot, error)
GenerateSnapshot creates a new snapshot from the provided migrations.
This function consolidates all provided migrations into a single snapshot, calculating the cumulative hash and combining all SQL statements.
Example usage:
migrations := []*migrator.Migration{
migration1,
migration2,
migration3,
}
snapshot, err := migrator.GenerateSnapshot(
"20240810120000_snapshot",
"Q3 2024 Release",
migrations,
)
if err != nil {
log.Fatal(err)
}
// Write snapshot to file
file, err := os.Create("migrations/20240810120000_snapshot.sql")
if err != nil {
log.Fatal(err)
}
defer file.Close()
_, err = snapshot.WriteTo(file)
if err != nil {
log.Fatal(err)
}
func LoadSnapshot ¶
LoadSnapshot reads and parses a snapshot file from the provided reader.
The reader should contain a properly formatted snapshot file with the snapshot marker, metadata headers, and cumulative SQL statements.
Example usage:
file, err := os.Open("20240810120000_snapshot.sql")
if err != nil {
log.Fatal(err)
}
defer file.Close()
snapshot, err := migrator.LoadSnapshot(file)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Snapshot %s includes %d migrations\n",
snapshot.Version, len(snapshot.IncludedMigrations))
func (*Snapshot) ValidateAgainstRevisions ¶
func (c *Snapshot) ValidateAgainstRevisions(revisionSet *RevisionSet) error
ValidateAgainstRevisions verifies that the snapshot is consistent with the revision history in the database.
This checks that all included migrations have been successfully applied according to the revision records.
type SumFile ¶
type SumFile struct {
// contains filtered or unexported fields
}
SumFile provides cryptographic integrity verification for migration files using a reverse one-branch Merkle tree approach with chained SHA256 hashing.
The SumFile maintains a chronologically ordered list of migration file hashes, where each entry builds upon the previous hash to create a tamper-evident chain. This design allows detection of any modification to migration files and ensures the integrity of the complete migration history.
File format (h1 compatible with Go modules):
h1:TotalHashOfAllEntries=
001_init.sql h1:HashOfFirstFile=
002_users.sql h1:ChainedHashIncorporatingPreviousHash=
003_views.sql h1:ChainedHashIncorporatingPreviousHash=
The chained hashing means that:
- Entry 1 hash = SHA256(file1_content)
- Entry 2 hash = SHA256(entry1_hash + file2_content)
- Entry 3 hash = SHA256(entry2_hash + file3_content)
This provides tamper evidence - changing any file or reordering files will invalidate all subsequent hashes in the chain.
func LoadSumFile ¶
LoadSumFile reads and parses a SumFile from the provided reader. The reader should contain a properly formatted sum file with h1-prefixed base64-encoded SHA256 hashes.
Expected format:
h1:TotalHashBase64=
001_init.sql h1:FileHashBase64=
002_users.sql h1:ChainedHashBase64=
The first line contains the total hash of all entries, followed by individual migration entries with their chained hashes.
Example usage:
// Load from file
file, err := os.Open("migrations.sum")
if err != nil {
log.Fatal(err)
}
defer file.Close()
sumFile, err := migrator.LoadSumFile(file)
if err != nil {
log.Fatal(err)
}
// Load from string
sumContent := `h1:abcd1234base64hash=
001_init.sql h1:file1hash=
002_users.sql h1:chainedHash=`
sumFile, err = migrator.LoadSumFile(strings.NewReader(sumContent))
if err != nil {
log.Fatal(err)
}
// The loaded SumFile can now be used for integrity verification
fmt.Println("Sum file loaded successfully")
Returns an error if the reader contains invalid format or corrupted hash data.
func NewSumFile ¶
func NewSumFile() *SumFile
NewSumFile creates a new empty SumFile ready for adding migration entries. The SumFile is initialized with a SHA256 hasher and empty entry list.
Example usage:
sumFile := migrator.NewSumFile()
// Add migrations in order
err := sumFile.Add("001_init.sql", strings.NewReader("CREATE DATABASE test;"))
if err != nil {
log.Fatal(err)
}
err = sumFile.Add("002_users.sql", strings.NewReader("CREATE TABLE users (...);"))
if err != nil {
log.Fatal(err)
}
// Write to file
file, err := os.Create("migrations.sum")
if err != nil {
log.Fatal(err)
}
defer file.Close()
_, err = sumFile.WriteTo(file)
if err != nil {
log.Fatal(err)
}
func (*SumFile) Add ¶
Add appends a new migration entry to the SumFile with chained hash calculation.
This method reads the provided migration content, calculates its hash chained with the previous entry's hash (if any), and adds it to the sum file entries. The chaining means that each hash incorporates the previous hash, creating a tamper-evident chain that detects any modification or reordering.
Hash calculation:
- First entry: SHA256(file_content)
- Subsequent entries: SHA256(previous_hash + file_content)
This method is thread-safe and can be called concurrently, though entries will be processed sequentially to maintain proper hash chaining.
Example usage:
sumFile := migrator.NewSumFile()
// Add first migration
sql1 := "CREATE DATABASE test ENGINE = Atomic;"
err := sumFile.Add("001_init.sql", strings.NewReader(sql1))
if err != nil {
log.Fatal(err)
}
// Add second migration (hash will be chained with first)
sql2 := "CREATE TABLE test.users (id UInt64, name String) ENGINE = MergeTree() ORDER BY id;"
err = sumFile.Add("002_users.sql", strings.NewReader(sql2))
if err != nil {
log.Fatal(err)
}
// Add from file
file, err := os.Open("003_views.sql")
if err != nil {
log.Fatal(err)
}
defer file.Close()
err = sumFile.Add("003_views.sql", file)
if err != nil {
log.Fatal(err)
}
The version parameter should be a unique identifier for the migration, typically the filename without directory path.
Returns an error if the reader cannot be read or hash calculation fails.
func (*SumFile) Validate ¶
Validate verifies the integrity of the SumFile by recalculating chained hashes from the provided migration content and comparing them with stored values.
This method ensures that the migration files have not been modified since the SumFile was created by recomputing the chained hash sequence and comparing each entry against the stored hash values.
The files parameter should be a map where keys are migration versions (matching the entries in this SumFile) and values are io.Reader instances containing the current migration file content.
Validation process:
- Iterates through all entries in lexical order by version
- For each entry, calculates the chained hash using the previous hash and current content
- Compares the calculated hash with the stored hash
- Returns false immediately if any hash mismatch is found
- Returns true only if all hashes match exactly
Example usage:
// Load sum file
sumFile, err := migrator.LoadSumFile(sumFileReader)
if err != nil {
log.Fatal(err)
}
// Prepare migration files for validation
files := make(map[string]io.Reader)
files["20240101120000"] = strings.NewReader("CREATE DATABASE test ENGINE = Atomic;")
files["20240101120100"] = strings.NewReader("CREATE TABLE test.users (id UInt64) ENGINE = MergeTree() ORDER BY id;")
// Validate integrity
isValid, err := sumFile.Validate(files)
if err != nil {
log.Fatal(err)
}
if isValid {
fmt.Println("Migration files are valid and unmodified")
} else {
fmt.Println("Migration files have been modified!")
}
Returns false if:
- Any entry's calculated hash doesn't match the stored hash
- A required migration file is missing from the files map
Returns an error if:
- Any of the provided readers cannot be read
- Hash calculation fails for any entry
Note: This method is thread-safe and does not modify the SumFile.
func (*SumFile) WriteTo ¶
WriteTo writes the complete SumFile to the provided writer in the standard format.
The output format is compatible with Go module sum files (h1 format) and contains:
- First line: Total hash of all entries (currently writes the sum field)
- Subsequent lines: Each migration entry with format "version h1:hash"
Output format:
h1:TotalHashBase64=
001_init.sql h1:FileHashBase64=
002_users.sql h1:ChainedHashBase64=
003_views.sql h1:ChainedHashBase64=
This method implements the io.WriterTo interface for efficient streaming and returns the total number of bytes written.
Example usage:
sumFile := migrator.NewSumFile()
// ... add entries ...
// Write to file
file, err := os.Create("migrations.sum")
if err != nil {
log.Fatal(err)
}
defer file.Close()
bytesWritten, err := sumFile.WriteTo(file)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Wrote %d bytes to sum file\n", bytesWritten)
// Write to buffer for string representation
var buf bytes.Buffer
_, err = sumFile.WriteTo(&buf)
if err != nil {
log.Fatal(err)
}
sumContent := buf.String()
fmt.Println("Sum file content:", sumContent)
// Write to HTTP response or any io.Writer
_, err = sumFile.WriteTo(httpResponseWriter)
if err != nil {
log.Fatal(err)
}
Returns the number of bytes written and any error encountered during writing.