Documentation
¶
Overview ¶
Package migrate provides declarative schema diffing and scripted migration running.
Index ¶
- func MigrateSchemas() map[string]any
- func NewMigrateApplyStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewMigrateContractStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewMigrateExpandStatusStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewMigrateExpandStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewMigratePlanStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewMigrateRollbackStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewMigrateRunStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewMigrateStatusStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewSchemaEvolvePipelineStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewSchemaEvolveVerifyStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewSchemaModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func RegisterModule(name string, m *SchemaModule) error
- func SetCDCLookup(fn func(string) (CDCRestarter, error))
- func SetDBLookup(fn func(string) (SQLExecutor, error))
- func SetLakehouseLookup(fn func(string) (LakehouseEvolver, error))
- func SetSchemaRegistryLookup(fn func(string) (SchemaRegistryProvider, error))
- func UnregisterModule(name string)
- type AppliedMigration
- type CDCRestarter
- type ColumnDef
- type ConnectorAction
- type EvolutionStep
- type ExpandContractChange
- type ExpandContractPlan
- type IndexDef
- type LakehouseEvolver
- type MigrateApplyStep
- type MigrateContractStep
- type MigrateExpandStatusStep
- type MigrateExpandStep
- type MigratePlanStep
- type MigrateRollbackStep
- type MigrateRunStep
- type MigrateStatusStep
- type MigrationPlan
- type MigrationRunner
- func (r *MigrationRunner) Apply(ctx context.Context, scripts []MigrationScript) error
- func (r *MigrationRunner) EnsureLockTable(ctx context.Context) error
- func (r *MigrationRunner) Rollback(ctx context.Context, scripts []MigrationScript, steps int) error
- func (r *MigrationRunner) Status(ctx context.Context, scripts []MigrationScript) (*MigrationState, error)
- type MigrationScript
- type MigrationState
- type SQLExecutor
- type SchemaAction
- type SchemaChange
- type SchemaDefinition
- type SchemaEvolutionPlan
- type SchemaEvolvePipelineStep
- type SchemaEvolveVerifyStep
- type SchemaModule
- func (m *SchemaModule) Init() error
- func (m *SchemaModule) LockTable() string
- func (m *SchemaModule) OnBreakingChange() string
- func (m *SchemaModule) Schemas() []SchemaDefinition
- func (m *SchemaModule) Scripts() []MigrationScript
- func (m *SchemaModule) Start(_ context.Context) error
- func (m *SchemaModule) Stop(_ context.Context) error
- type SchemaModuleConfig
- type SchemaPath
- type SchemaRegistryProvider
- type TableUpdate
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MigrateSchemas ¶
MigrateSchemas returns JSON Schema definitions for UI forms.
func NewMigrateApplyStep ¶
NewMigrateApplyStep creates a step.migrate_apply instance.
func NewMigrateContractStep ¶
NewMigrateContractStep creates a step.migrate_contract instance.
func NewMigrateExpandStatusStep ¶
NewMigrateExpandStatusStep creates a step.migrate_expand_status instance.
func NewMigrateExpandStep ¶
NewMigrateExpandStep creates a step.migrate_expand instance.
func NewMigratePlanStep ¶
NewMigratePlanStep creates a step.migrate_plan instance.
func NewMigrateRollbackStep ¶
NewMigrateRollbackStep creates a step.migrate_rollback instance.
func NewMigrateRunStep ¶
NewMigrateRunStep creates a step.migrate_run instance.
func NewMigrateStatusStep ¶
NewMigrateStatusStep creates a step.migrate_status instance.
func NewSchemaEvolvePipelineStep ¶
NewSchemaEvolvePipelineStep creates a step.schema_evolve_pipeline instance.
func NewSchemaEvolveVerifyStep ¶
NewSchemaEvolveVerifyStep creates a step.schema_evolve_verify instance.
func NewSchemaModule ¶
NewSchemaModule creates a new migrate.schema module instance.
func RegisterModule ¶
func RegisterModule(name string, m *SchemaModule) error
RegisterModule adds a named SchemaModule to the global registry.
func SetCDCLookup ¶
func SetCDCLookup(fn func(string) (CDCRestarter, error))
SetCDCLookup registers the function used to resolve a CDC connector by module name.
func SetDBLookup ¶
func SetDBLookup(fn func(string) (SQLExecutor, error))
SetDBLookup registers the function used to resolve a source DB executor by module name.
func SetLakehouseLookup ¶
func SetLakehouseLookup(fn func(string) (LakehouseEvolver, error))
SetLakehouseLookup registers the function used to resolve a lakehouse catalog module.
func SetSchemaRegistryLookup ¶
func SetSchemaRegistryLookup(fn func(string) (SchemaRegistryProvider, error))
SetSchemaRegistryLookup registers the function used to resolve a schema registry module.
func UnregisterModule ¶
func UnregisterModule(name string)
UnregisterModule removes a SchemaModule from the registry.
Types ¶
type AppliedMigration ¶
AppliedMigration is a migration that has already been run.
type CDCRestarter ¶
CDCRestarter can stop and restart a CDC connector to pick up schema changes.
type ColumnDef ¶
type ColumnDef struct {
Name string `json:"name" yaml:"name"`
Type string `json:"type" yaml:"type"`
Nullable bool `json:"nullable,omitempty" yaml:"nullable,omitempty"`
PrimaryKey bool `json:"primaryKey,omitempty" yaml:"primaryKey,omitempty"`
Unique bool `json:"unique,omitempty" yaml:"unique,omitempty"`
Default string `json:"default,omitempty" yaml:"default,omitempty"`
}
ColumnDef describes a single column within a table.
type ConnectorAction ¶
type ConnectorAction struct {
Action string `json:"action"` // "restart", "reconfigure"
Name string `json:"name"`
}
ConnectorAction describes an action to take on a CDC connector.
type EvolutionStep ¶
type EvolutionStep struct {
Target string `json:"target"` // source_db, cdc_connector, schema_registry, lakehouse
Action string `json:"action"` // alter_table, restart_connector, register_schema, evolve_table
SQL string `json:"sql,omitempty"`
Config map[string]any `json:"config,omitempty"`
Rollback string `json:"rollback"`
Description string `json:"description"`
}
EvolutionStep is a single ordered step in a schema evolution plan.
type ExpandContractChange ¶
type ExpandContractChange struct {
Type string `json:"type"` // add_column, rename_column, change_type, drop_column
OldColumn string `json:"oldColumn"`
NewColumn string `json:"newColumn"`
NewType string `json:"newType"`
Transform string `json:"transform"` // SQL expr (references NEW.<col>); defaults set per type
}
ExpandContractChange describes one change in an expand-contract migration.
type ExpandContractPlan ¶
type ExpandContractPlan struct {
Table string
Changes []ExpandContractChange
ExpandSQL []string // DDL + triggers to apply during expand phase
ContractSQL []string // DDL to apply during contract phase
VerifySQL string // SQL to verify all rows have been migrated
TriggerNames []string // generated trigger names (for status checking)
NewColumns []string // new column names added during expand
}
ExpandContractPlan holds all SQL generated for the expand and contract phases.
func BuildExpandContractPlan ¶
func BuildExpandContractPlan(table string, changes []ExpandContractChange) (*ExpandContractPlan, error)
BuildExpandContractPlan generates SQL statements for the expand and contract phases.
Expand phase creates new columns and BEFORE INSERT OR UPDATE triggers that keep old and new columns in sync, enabling dual-version read/write.
Contract phase removes old structures once all consumers have migrated.
type IndexDef ¶
type IndexDef struct {
Name string `json:"name,omitempty" yaml:"name,omitempty"`
Columns []string `json:"columns" yaml:"columns"`
Unique bool `json:"unique,omitempty" yaml:"unique,omitempty"`
}
IndexDef describes an index on a table.
type LakehouseEvolver ¶
type LakehouseEvolver interface {
EvolveTable(ctx context.Context, namespace []string, table string, change map[string]any) error
}
LakehouseEvolver evolves an Iceberg table schema.
type MigrateApplyStep ¶
type MigrateApplyStep struct {
// contains filtered or unexported fields
}
MigrateApplyStep executes a previously computed migration plan.
type MigrateContractStep ¶
type MigrateContractStep struct {
// contains filtered or unexported fields
}
MigrateContractStep executes the contract phase: removes old columns and triggers once all consumers have migrated to the new schema.
type MigrateExpandStatusStep ¶
type MigrateExpandStatusStep struct {
// contains filtered or unexported fields
}
MigrateExpandStatusStep checks whether an expand phase is active and safe to contract.
type MigrateExpandStep ¶
type MigrateExpandStep struct {
// contains filtered or unexported fields
}
MigrateExpandStep executes the expand phase of an expand-contract migration. It adds new columns and installs sync triggers for dual-write operation.
type MigratePlanStep ¶
type MigratePlanStep struct {
// contains filtered or unexported fields
}
MigratePlanStep diffs the desired schema against the live database.
type MigrateRollbackStep ¶
type MigrateRollbackStep struct {
// contains filtered or unexported fields
}
MigrateRollbackStep rolls back N migration scripts.
type MigrateRunStep ¶
type MigrateRunStep struct {
// contains filtered or unexported fields
}
MigrateRunStep runs a specific numbered migration script.
type MigrateStatusStep ¶
type MigrateStatusStep struct {
// contains filtered or unexported fields
}
MigrateStatusStep reports the current migration state.
type MigrationPlan ¶
type MigrationPlan struct {
Changes []SchemaChange
Safe bool // true if all changes are non-breaking
}
MigrationPlan is the result of diffing two schema definitions.
func DiffSchema ¶
func DiffSchema(desired, live SchemaDefinition) (MigrationPlan, error)
DiffSchema computes the migration plan to move from live to desired schema. live may be an empty SchemaDefinition (zero value) if the table does not yet exist.
type MigrationRunner ¶
type MigrationRunner struct {
// contains filtered or unexported fields
}
MigrationRunner executes ordered migration scripts with advisory locking.
func NewMigrationRunner ¶
func NewMigrationRunner(executor SQLExecutor, lockTable string) (*MigrationRunner, error)
NewMigrationRunner creates a runner backed by executor storing state in lockTable. Returns an error if lockTable is not a valid SQL identifier.
func (*MigrationRunner) Apply ¶
func (r *MigrationRunner) Apply(ctx context.Context, scripts []MigrationScript) error
Apply runs all pending migrations from scripts in version order. It acquires a Postgres advisory lock for the duration.
func (*MigrationRunner) EnsureLockTable ¶
func (r *MigrationRunner) EnsureLockTable(ctx context.Context) error
EnsureLockTable creates the migration state table if it does not exist.
func (*MigrationRunner) Rollback ¶
func (r *MigrationRunner) Rollback(ctx context.Context, scripts []MigrationScript, steps int) error
Rollback rolls back the N most recent applied migrations (N = steps) using their down scripts.
func (*MigrationRunner) Status ¶
func (r *MigrationRunner) Status(ctx context.Context, scripts []MigrationScript) (*MigrationState, error)
Status returns the current version and which scripts are still pending.
type MigrationScript ¶
MigrationScript is a versioned up/down SQL pair parsed from disk.
func LoadScripts ¶
func LoadScripts(dir string) ([]MigrationScript, error)
LoadScripts parses NNN_description.up.sql + NNN_description.down.sql pairs from dir. Pairs are sorted by version ascending. A missing .down.sql file is not an error.
type MigrationState ¶
type MigrationState struct {
Version int
Applied []AppliedMigration
Pending []MigrationScript
}
MigrationState describes the current migration state of the database.
type SQLExecutor ¶
type SQLExecutor interface {
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
}
SQLExecutor executes SQL statements against a database connection.
type SchemaAction ¶
type SchemaAction struct {
Action string `json:"action"` // "register", "check_compat"
Subject string `json:"subject"`
Schema string `json:"schema"`
Compatibility string `json:"compatibility"`
}
SchemaAction describes an action to take in the schema registry.
type SchemaChange ¶
type SchemaChange struct {
Type string // add_column, add_table, add_index, widen_type, drop_column, narrow_type, drop_index
Description string
SQL string
Breaking bool
}
SchemaChange represents a single DDL change.
type SchemaDefinition ¶
type SchemaDefinition struct {
Table string `json:"table" yaml:"table"`
Columns []ColumnDef `json:"columns" yaml:"columns"`
Indexes []IndexDef `json:"indexes,omitempty" yaml:"indexes,omitempty"`
}
SchemaDefinition describes the desired state of a database table.
func IntrospectSchema ¶
func IntrospectSchema(ctx context.Context, exec SQLExecutor, tableName string) (*SchemaDefinition, error)
IntrospectSchema queries the live database for the current schema of tableName. It uses information_schema.columns and pg_indexes to build a SchemaDefinition. It returns a nil *SchemaDefinition (and no error) if the table does not exist.
func ParseSchemaDir ¶
func ParseSchemaDir(dir string) ([]SchemaDefinition, error)
ParseSchemaDir parses all .yaml files in a directory as schema definitions.
func ParseSchemaFile ¶
func ParseSchemaFile(path string) (SchemaDefinition, error)
ParseSchemaFile parses a YAML schema definition file.
type SchemaEvolutionPlan ¶
type SchemaEvolutionPlan struct {
SourceDB SchemaChange // DDL to apply in the source database
CDCConnector ConnectorAction
SchemaRegistry SchemaAction
Lakehouse TableUpdate
Safe bool
Steps []EvolutionStep
}
SchemaEvolutionPlan coordinates a schema change across the full CDC pipeline.
func BuildEvolutionPlan ¶
func BuildEvolutionPlan(table, namespace string, change SchemaChange, subject, schema string, icebergFields []string) *SchemaEvolutionPlan
BuildEvolutionPlan constructs a SchemaEvolutionPlan from a change description. It does not execute anything — the plan is executed separately via the unexported executeEvolutionPlan.
type SchemaEvolvePipelineStep ¶
type SchemaEvolvePipelineStep struct {
// contains filtered or unexported fields
}
SchemaEvolvePipelineStep orchestrates a schema change across the full CDC pipeline: schema registry → lakehouse → source DB → CDC connector restart.
type SchemaEvolveVerifyStep ¶
type SchemaEvolveVerifyStep struct {
// contains filtered or unexported fields
}
SchemaEvolveVerifyStep verifies that all pipeline layers have consistent schemas after an evolution. It compares source DB introspection against the schema registry.
type SchemaModule ¶
type SchemaModule struct {
// contains filtered or unexported fields
}
SchemaModule implements sdk.ModuleInstance for the migrate.schema module type.
func LookupModule ¶
func LookupModule(name string) (*SchemaModule, error)
LookupModule retrieves a SchemaModule by name.
func (*SchemaModule) Init ¶
func (m *SchemaModule) Init() error
Init validates the module configuration.
func (*SchemaModule) LockTable ¶
func (m *SchemaModule) LockTable() string
LockTable returns the name of the migration state table.
func (*SchemaModule) OnBreakingChange ¶
func (m *SchemaModule) OnBreakingChange() string
OnBreakingChange returns the configured breaking-change policy.
func (*SchemaModule) Schemas ¶
func (m *SchemaModule) Schemas() []SchemaDefinition
Schemas returns the parsed declarative schema definitions.
func (*SchemaModule) Scripts ¶
func (m *SchemaModule) Scripts() []MigrationScript
Scripts returns the loaded migration scripts.
type SchemaModuleConfig ¶
type SchemaModuleConfig struct {
Strategy string `json:"strategy" yaml:"strategy"` // declarative, scripted, or both
Target string `json:"target" yaml:"target"` // database module reference
Schemas []SchemaPath `json:"schemas" yaml:"schemas"` // YAML schema file paths
MigrationsDir string `json:"migrationsDir" yaml:"migrationsDir"` // directory of NNN_*.sql files
LockTable string `json:"lockTable" yaml:"lockTable"` // defaults to schema_migrations
OnBreakingChange string `json:"onBreakingChange" yaml:"onBreakingChange"` // block, warn, blue_green
}
SchemaModuleConfig holds configuration for the migrate.schema module.
type SchemaPath ¶
type SchemaPath struct {
Path string `json:"path" yaml:"path"`
}
SchemaPath is a reference to a YAML schema file.
type SchemaRegistryProvider ¶
type SchemaRegistryProvider interface {
RegisterSubject(ctx context.Context, subject, schema string) (int, error)
CheckCompatibility(ctx context.Context, subject, schema string) (bool, error)
GetSchemaFields(ctx context.Context, subject string) ([]string, int, error)
}
SchemaRegistryProvider registers schemas and checks compatibility. It is also used by the verify step to read schema fields.