migrate

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package migrate provides declarative schema diffing and scripted migration running.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MigrateSchemas

func MigrateSchemas() map[string]any

moduleSchemas returns JSON Schema definitions for UI forms.

func NewMigrateApplyStep

func NewMigrateApplyStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewMigrateApplyStep creates a step.migrate_apply instance.

func NewMigrateContractStep

func NewMigrateContractStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewMigrateContractStep creates a step.migrate_contract instance.

func NewMigrateExpandStatusStep

func NewMigrateExpandStatusStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewMigrateExpandStatusStep creates a step.migrate_expand_status instance.

func NewMigrateExpandStep

func NewMigrateExpandStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewMigrateExpandStep creates a step.migrate_expand instance.

func NewMigratePlanStep

func NewMigratePlanStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewMigratePlanStep creates a step.migrate_plan instance.

func NewMigrateRollbackStep

func NewMigrateRollbackStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewMigrateRollbackStep creates a step.migrate_rollback instance.

func NewMigrateRunStep

func NewMigrateRunStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewMigrateRunStep creates a step.migrate_run instance.

func NewMigrateStatusStep

func NewMigrateStatusStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewMigrateStatusStep creates a step.migrate_status instance.

func NewSchemaEvolvePipelineStep

func NewSchemaEvolvePipelineStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewSchemaEvolvePipelineStep creates a step.schema_evolve_pipeline instance.

func NewSchemaEvolveVerifyStep

func NewSchemaEvolveVerifyStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewSchemaEvolveVerifyStep creates a step.schema_evolve_verify instance.

func NewSchemaModule

func NewSchemaModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewSchemaModule creates a new migrate.schema module instance.

func RegisterModule

func RegisterModule(name string, m *SchemaModule) error

RegisterModule adds a named SchemaModule to the global registry.

func SetCDCLookup

func SetCDCLookup(fn func(string) (CDCRestarter, error))

SetCDCLookup registers the function used to resolve a CDC connector by module name.

func SetDBLookup

func SetDBLookup(fn func(string) (SQLExecutor, error))

SetDBLookup registers the function used to resolve a source DB executor by module name.

func SetLakehouseLookup

func SetLakehouseLookup(fn func(string) (LakehouseEvolver, error))

SetLakehouseLookup registers the function used to resolve a lakehouse catalog module.

func SetSchemaRegistryLookup

func SetSchemaRegistryLookup(fn func(string) (SchemaRegistryProvider, error))

SetSchemaRegistryLookup registers the function used to resolve a schema registry module.

func UnregisterModule

func UnregisterModule(name string)

UnregisterModule removes a SchemaModule from the registry.

Types

type AppliedMigration

type AppliedMigration struct {
	Version   int
	AppliedAt time.Time
	Checksum  string
}

AppliedMigration is a migration that has already been run.

type CDCRestarter

type CDCRestarter interface {
	Stop(ctx context.Context) error
	Start(ctx context.Context) error
}

CDCRestarter can stop and restart a CDC connector to pick up schema changes.

type ColumnDef

type ColumnDef struct {
	Name       string `json:"name"               yaml:"name"`
	Type       string `json:"type"               yaml:"type"`
	Nullable   bool   `json:"nullable,omitempty" yaml:"nullable,omitempty"`
	PrimaryKey bool   `json:"primaryKey,omitempty" yaml:"primaryKey,omitempty"`
	Unique     bool   `json:"unique,omitempty"   yaml:"unique,omitempty"`
	Default    string `json:"default,omitempty"  yaml:"default,omitempty"`
}

ColumnDef describes a single column within a table.

type ConnectorAction

type ConnectorAction struct {
	Action string `json:"action"` // "restart", "reconfigure"
	Name   string `json:"name"`
}

ConnectorAction describes an action to take on a CDC connector.

type EvolutionStep

type EvolutionStep struct {
	Target      string         `json:"target"` // source_db, cdc_connector, schema_registry, lakehouse
	Action      string         `json:"action"` // alter_table, restart_connector, register_schema, evolve_table
	SQL         string         `json:"sql,omitempty"`
	Config      map[string]any `json:"config,omitempty"`
	Rollback    string         `json:"rollback"`
	Description string         `json:"description"`
}

EvolutionStep is a single ordered step in a schema evolution plan.

type ExpandContractChange

type ExpandContractChange struct {
	Type      string `json:"type"` // add_column, rename_column, change_type, drop_column
	OldColumn string `json:"oldColumn"`
	NewColumn string `json:"newColumn"`
	NewType   string `json:"newType"`
	Transform string `json:"transform"` // SQL expr (references NEW.<col>); defaults set per type
}

ExpandContractChange describes one change in an expand-contract migration.

type ExpandContractPlan

type ExpandContractPlan struct {
	Table        string
	Changes      []ExpandContractChange
	ExpandSQL    []string // DDL + triggers to apply during expand phase
	ContractSQL  []string // DDL to apply during contract phase
	VerifySQL    string   // SQL to verify all rows have been migrated
	TriggerNames []string // generated trigger names (for status checking)
	NewColumns   []string // new column names added during expand
}

ExpandContractPlan holds all SQL generated for the expand and contract phases.

func BuildExpandContractPlan

func BuildExpandContractPlan(table string, changes []ExpandContractChange) (*ExpandContractPlan, error)

BuildExpandContractPlan generates SQL statements for the expand and contract phases.

Expand phase creates new columns and BEFORE INSERT OR UPDATE triggers that keep old and new columns in sync, enabling dual-version read/write.

Contract phase removes old structures once all consumers have migrated.

type IndexDef

type IndexDef struct {
	Name    string   `json:"name,omitempty"   yaml:"name,omitempty"`
	Columns []string `json:"columns"          yaml:"columns"`
	Unique  bool     `json:"unique,omitempty" yaml:"unique,omitempty"`
}

IndexDef describes an index on a table.

type LakehouseEvolver

type LakehouseEvolver interface {
	EvolveTable(ctx context.Context, namespace []string, table string, change map[string]any) error
}

LakehouseEvolver evolves an Iceberg table schema.

type MigrateApplyStep

type MigrateApplyStep struct {
	// contains filtered or unexported fields
}

MigrateApplyStep executes a previously computed migration plan.

func (*MigrateApplyStep) Execute

func (s *MigrateApplyStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type MigrateContractStep

type MigrateContractStep struct {
	// contains filtered or unexported fields
}

MigrateContractStep executes the contract phase: removes old columns and triggers once all consumers have migrated to the new schema.

func (*MigrateContractStep) Execute

func (s *MigrateContractStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type MigrateExpandStatusStep

type MigrateExpandStatusStep struct {
	// contains filtered or unexported fields
}

MigrateExpandStatusStep checks whether an expand phase is active and safe to contract.

func (*MigrateExpandStatusStep) Execute

func (s *MigrateExpandStatusStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type MigrateExpandStep

type MigrateExpandStep struct {
	// contains filtered or unexported fields
}

MigrateExpandStep executes the expand phase of an expand-contract migration. It adds new columns and installs sync triggers for dual-write operation.

func (*MigrateExpandStep) Execute

func (s *MigrateExpandStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type MigratePlanStep

type MigratePlanStep struct {
	// contains filtered or unexported fields
}

MigratePlanStep diffs the desired schema against the live database.

func (*MigratePlanStep) Execute

func (s *MigratePlanStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type MigrateRollbackStep

type MigrateRollbackStep struct {
	// contains filtered or unexported fields
}

MigrateRollbackStep rolls back N migration scripts.

func (*MigrateRollbackStep) Execute

func (s *MigrateRollbackStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type MigrateRunStep

type MigrateRunStep struct {
	// contains filtered or unexported fields
}

MigrateRunStep runs a specific numbered migration script.

func (*MigrateRunStep) Execute

func (s *MigrateRunStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type MigrateStatusStep

type MigrateStatusStep struct {
	// contains filtered or unexported fields
}

MigrateStatusStep reports the current migration state.

func (*MigrateStatusStep) Execute

func (s *MigrateStatusStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type MigrationPlan

type MigrationPlan struct {
	Changes []SchemaChange
	Safe    bool // true if all changes are non-breaking
}

MigrationPlan is the result of diffing two schema definitions.

func DiffSchema

func DiffSchema(desired, live SchemaDefinition) (MigrationPlan, error)

DiffSchema computes the migration plan to move from live to desired schema. live may be an empty SchemaDefinition (zero value) if the table does not yet exist.

type MigrationRunner

type MigrationRunner struct {
	// contains filtered or unexported fields
}

MigrationRunner executes ordered migration scripts with advisory locking.

func NewMigrationRunner

func NewMigrationRunner(executor SQLExecutor, lockTable string) (*MigrationRunner, error)

NewMigrationRunner creates a runner backed by executor storing state in lockTable. Returns an error if lockTable is not a valid SQL identifier.

func (*MigrationRunner) Apply

func (r *MigrationRunner) Apply(ctx context.Context, scripts []MigrationScript) error

Apply runs all pending migrations from scripts in version order. It acquires a Postgres advisory lock for the duration.

func (*MigrationRunner) EnsureLockTable

func (r *MigrationRunner) EnsureLockTable(ctx context.Context) error

EnsureLockTable creates the migration state table if it does not exist.

func (*MigrationRunner) Rollback

func (r *MigrationRunner) Rollback(ctx context.Context, scripts []MigrationScript, steps int) error

Rollback rolls back the N most recent applied migrations using their down scripts.

func (*MigrationRunner) Status

func (r *MigrationRunner) Status(ctx context.Context, scripts []MigrationScript) (*MigrationState, error)

Status returns the current version and which scripts are still pending.

type MigrationScript

type MigrationScript struct {
	Version     int
	Description string
	UpSQL       string
	DownSQL     string
}

MigrationScript is a versioned up/down SQL pair parsed from disk.

func LoadScripts

func LoadScripts(dir string) ([]MigrationScript, error)

LoadScripts parses NNN_description.up.sql + NNN_description.down.sql pairs from dir. Pairs are sorted by version ascending. A missing .down.sql file is not an error.

type MigrationState

type MigrationState struct {
	Version int
	Applied []AppliedMigration
	Pending []MigrationScript
}

MigrationState describes the current migration state of the database.

type SQLExecutor

type SQLExecutor interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
}

SQLExecutor executes SQL statements against a database connection.

type SchemaAction

type SchemaAction struct {
	Action        string `json:"action"` // "register", "check_compat"
	Subject       string `json:"subject"`
	Schema        string `json:"schema"`
	Compatibility string `json:"compatibility"`
}

SchemaAction describes an action to take in the schema registry.

type SchemaChange

type SchemaChange struct {
	Type        string // add_column, add_table, add_index, widen_type, drop_column, narrow_type, drop_index
	Description string
	SQL         string
	Breaking    bool
}

SchemaChange represents a single DDL change.

type SchemaDefinition

type SchemaDefinition struct {
	Table   string      `json:"table"             yaml:"table"`
	Columns []ColumnDef `json:"columns"           yaml:"columns"`
	Indexes []IndexDef  `json:"indexes,omitempty" yaml:"indexes,omitempty"`
}

SchemaDefinition describes the desired state of a database table.

func IntrospectSchema

func IntrospectSchema(ctx context.Context, exec SQLExecutor, tableName string) (*SchemaDefinition, error)

IntrospectSchema queries the live database for the current schema of tableName. Uses information_schema.columns and pg_indexes to build a SchemaDefinition. Returns nil (not an error) if the table does not exist.

func ParseSchemaDir

func ParseSchemaDir(dir string) ([]SchemaDefinition, error)

ParseSchemaDir parses all .yaml files in a directory as schema definitions.

func ParseSchemaFile

func ParseSchemaFile(path string) (SchemaDefinition, error)

ParseSchemaFile parses a YAML schema definition file.

type SchemaEvolutionPlan

type SchemaEvolutionPlan struct {
	SourceDB       SchemaChange // DDL to apply in the source database
	CDCConnector   ConnectorAction
	SchemaRegistry SchemaAction
	Lakehouse      TableUpdate
	Safe           bool
	Steps          []EvolutionStep
}

SchemaEvolutionPlan coordinates a schema change across the full CDC pipeline.

func BuildEvolutionPlan

func BuildEvolutionPlan(table, namespace string, change SchemaChange, subject, schema string, icebergFields []string) *SchemaEvolutionPlan

BuildEvolutionPlan constructs a SchemaEvolutionPlan from a change description. It does not execute anything — callers execute via executeEvolutionPlan.

type SchemaEvolvePipelineStep

type SchemaEvolvePipelineStep struct {
	// contains filtered or unexported fields
}

SchemaEvolvePipelineStep orchestrates a schema change across the full CDC pipeline: schema registry → lakehouse → source DB → CDC connector restart.

func (*SchemaEvolvePipelineStep) Execute

func (s *SchemaEvolvePipelineStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type SchemaEvolveVerifyStep

type SchemaEvolveVerifyStep struct {
	// contains filtered or unexported fields
}

SchemaEvolveVerifyStep verifies that all pipeline layers have consistent schemas after an evolution. Compares source DB introspection against schema registry.

func (*SchemaEvolveVerifyStep) Execute

func (s *SchemaEvolveVerifyStep) Execute(ctx context.Context, _ map[string]any, _ map[string]map[string]any, _, _, config map[string]any) (*sdk.StepResult, error)

type SchemaModule

type SchemaModule struct {
	// contains filtered or unexported fields
}

SchemaModule implements sdk.ModuleInstance for the migrate.schema module type.

func LookupModule

func LookupModule(name string) (*SchemaModule, error)

LookupModule retrieves a SchemaModule by name.

func (*SchemaModule) Init

func (m *SchemaModule) Init() error

Init validates the module configuration.

func (*SchemaModule) LockTable

func (m *SchemaModule) LockTable() string

LockTable returns the name of the migration state table.

func (*SchemaModule) OnBreakingChange

func (m *SchemaModule) OnBreakingChange() string

OnBreakingChange returns the configured breaking-change policy.

func (*SchemaModule) Schemas

func (m *SchemaModule) Schemas() []SchemaDefinition

Schemas returns the parsed declarative schema definitions.

func (*SchemaModule) Scripts

func (m *SchemaModule) Scripts() []MigrationScript

Scripts returns the loaded migration scripts.

func (*SchemaModule) Start

func (m *SchemaModule) Start(_ context.Context) error

Start parses schema files and/or migration scripts based on the configured strategy, then registers this module in the global registry.

func (*SchemaModule) Stop

func (m *SchemaModule) Stop(_ context.Context) error

Stop deregisters the module.

type SchemaModuleConfig

type SchemaModuleConfig struct {
	Strategy         string       `json:"strategy"         yaml:"strategy"`         // declarative, scripted, or both
	Target           string       `json:"target"           yaml:"target"`           // database module reference
	Schemas          []SchemaPath `json:"schemas"          yaml:"schemas"`          // YAML schema file paths
	MigrationsDir    string       `json:"migrationsDir"    yaml:"migrationsDir"`    // directory of NNN_*.sql files
	LockTable        string       `json:"lockTable"        yaml:"lockTable"`        // defaults to schema_migrations
	OnBreakingChange string       `json:"onBreakingChange" yaml:"onBreakingChange"` // block, warn, blue_green
}

SchemaModuleConfig holds configuration for the migrate.schema module.

type SchemaPath

type SchemaPath struct {
	Path string `json:"path" yaml:"path"`
}

SchemaPath is a reference to a YAML schema file.

type SchemaRegistryProvider

type SchemaRegistryProvider interface {
	RegisterSubject(ctx context.Context, subject, schema string) (int, error)
	CheckCompatibility(ctx context.Context, subject, schema string) (bool, error)
	GetSchemaFields(ctx context.Context, subject string) ([]string, int, error)
}

SchemaRegistryProvider registers schemas and checks compatibility. Also used by the verify step to read schema fields.

type TableUpdate

type TableUpdate struct {
	Action    string           `json:"action"` // "add_column", "widen_type"
	Table     string           `json:"table"`
	Namespace []string         `json:"namespace"`
	Changes   []map[string]any `json:"changes"`
}

TableUpdate describes a change to apply to an Iceberg table.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL